Source code for baseband.vlbi_base.utils

# Licensed under the GPLv3 - see LICENSE
from __future__ import division, unicode_literals, print_function

from operator import index

import numpy as np
import six

__all__ = ['bcd_decode', 'bcd_encode', 'CRC']


[docs]def bcd_decode(value): try: # Far faster than my routine for scalars return int('{:x}'.format(index(value))) except TypeError as exc: # Might still be an array try: assert issubclass(value.dtype.type, np.integer) except Exception: raise exc bcd = value factor = 1 result = np.zeros_like(value) while np.any(bcd > 0): bcd, digit = divmod(bcd, 0x10) if np.any(digit > 9): if not np.isscalar(digit): value = value[digit > 9][0] raise ValueError("invalid BCD encoded value {0}={1}." .format(value, hex(value))) result += digit * factor factor *= 10 return result
[docs]def bcd_encode(value): try: # Far faster than my routine for scalars return int('{:d}'.format(index(value)), base=16) except TypeError as exc: # Might still be an array try: assert issubclass(value.dtype.type, np.integer) except Exception: raise exc result = np.zeros_like(value) result = 0 factor = 1 while np.any(value > 0): value, digit = divmod(value, 10) result += digit*factor factor *= 16 return result
[docs]class CRC(object): """Cyclic Redundancy Check for a bitstream. See https://en.wikipedia.org/wiki/Cyclic_redundancy_check Once initialised, the instance can be used as a function that calculates the CRC, or one can use the `check` method to check that the CRC at the end of a stream is correct. Parameters ---------- polynomial : int Binary encoded CRC divisor. For instance, that used by Mark 4 headers is 0x180f, or x^12 + x^11 + x^3 + x^2 + x + 1. """ def __init__(self, polynomial): self.polynomial = polynomial self.pol_bin = np.array( [int(bit) for bit in '{:b}'.format(polynomial)], dtype=np.int8) def __len__(self): return self.pol_bin.size - 1
[docs] def __call__(self, stream): """Calculate CRC for the given stream. Parameters ---------- stream : array of bool or unsigned int The dimension is treated as the index into the bits. For a single stream, the array should thus be of type `bool`. Integers represent multiple streams. E.g., for a 64-track Mark 4 header, the stream would be an array of ``np.uint64`` words. Returns ------- crc : array The crc will have the same dtype as the input stream. """ stream = np.hstack((stream, np.zeros((len(self),), stream.dtype))) return self._crc(stream)
[docs] def check(self, stream): """Check that the CRC at the end of the stream is correct. Parameters ---------- stream : array of bool or unsigned int The dimension is treated as the index into the bits. For a single stream, the array should thus be of type `bool`. Integers represent multiple streams. E.g., for a 64-track Mark 4 header, the stream would be an array of ``np.uint64`` words. Returns ------- ok : bool `True` if the calculated CRC is all zero (which should be the case if the CRC at the end of the stream is correct). """ return np.all(self._crc(stream.copy()) == 0)
def _crc(self, stream): """Internal function to calculate the CRC. Note that the stream is changed in-place. """ pol_bin = (-self.pol_bin).astype(stream.dtype) for i in range(0, len(stream) - len(self)): stream[i:i+pol_bin.size] ^= (pol_bin & stream[i]) return stream[-len(self):]
if not six.PY2: # ignoring python < 3.5; could add if you wish from math import gcd else: from fractions import gcd as fgcd def gcd(a, b): return abs(fgcd(a, b)) def lcm(a, b): """Calculate the least common multiple of a and b.""" return abs(a * b) // gcd(a, b)