Source code for baseband.vdif.payload

# Licensed under the GPLv3 - see LICENSE
"""
Definitions for VLBI VDIF payloads.

Implements a VDIFPayload class used to store payload words, and decode to
or encode from a data array.

See the `VDIF specification page <https://www.vlbi.org/vdif>`_ for payload
specifications.
"""
from collections import namedtuple

import numpy as np

from ..base.payload import PayloadBase
from ..base.encoding import (
    encode_1bit_base, encode_2bit_base, encode_4bit_base,
    decoder_levels, decode_8bit, encode_8bit)


__all__ = ['init_luts', 'decode_1bit', 'decode_2bit', 'decode_4bit',
           'encode_1bit', 'encode_2bit', 'encode_4bit', 'VDIFPayload']


[docs]def init_luts(): """Sets up the look-up tables for levels as a function of input byte. Returns ------- lut1bit, lut2bit, lut4but : `~numpy.ndarray` Look-up table for decoding bytes to samples of 1, 2, and 4 bits, resp. Notes ----- Look-up tables are two-dimensional arrays whose first axis is indexed by byte value (in uint8 form) and whose second axis represents sample temporal order. Table values are decoded sample values. Sec. 10 in the `VDIF Specification <https://vlbi.org/wp-content/uploads/2019/03/VDIF_specification_Release_1.1.1.pdf>`_ states that samples are encoded by offset-binary, such that all 0 bits is lowest and all 1 bits is highest. I.e., for 2-bit sampling, the order is 00, 01, 10, 11. These are decoded using `~baseband.base.encoding.decoder_levels`. For example, the 2-bit sample sequence ``-1, -1, 1, 1`` is encoded as ``0b10100101`` (or ``165`` in uint8 form). To translate this back to sample values, access ``lut2bit`` using the byte as the key:: >>> lut2bit[0b10100101] array([-1., -1., 1., 1.], dtype=float32) """ b = np.arange(256)[:, np.newaxis] # 1-bit mode i = np.arange(8) lut1bit = decoder_levels[1][(b >> i) & 1] # 2-bit mode i = np.arange(0, 8, 2) lut2bit = decoder_levels[2][(b >> i) & 3] # 4-bit mode i = np.arange(0, 8, 4) lut4bit = decoder_levels[4][(b >> i) & 0xf] return lut1bit, lut2bit, lut4bit
lut1bit, lut2bit, lut4bit = init_luts()
[docs]def decode_1bit(words): b = words.view(np.uint8) return lut1bit.take(b, axis=0)
shift1bit = np.arange(0, 8).astype(np.uint8)
[docs]def encode_1bit(values): """Encodes values using 1 bit per sample, packing the result into bytes.""" bitvalues = encode_1bit_base(values.reshape(-1, 8)) return np.packbits(bitvalues[:, ::-1])
[docs]def decode_2bit(words): """Decodes data stored using 2 bits per sample.""" b = words.view(np.uint8) return lut2bit.take(b, axis=0)
shift2bit = np.arange(0, 8, 2).astype(np.uint8)
[docs]def encode_2bit(values): """Encodes values using 2 bits per sample, packing the result into bytes. """ bitvalues = encode_2bit_base(values.reshape(-1, 4)) bitvalues <<= shift2bit return np.bitwise_or.reduce(bitvalues, axis=-1)
[docs]def decode_4bit(words): """Decodes data stored using 4 bits per sample.""" b = words.view(np.uint8) return lut4bit.take(b, axis=0)
shift04 = np.array([0, 4], np.uint8)
[docs]def encode_4bit(values): """Encodes values using 4 bits per sample, packing the result into bytes. """ b = encode_4bit_base(values).reshape(-1, 2) b <<= shift04 return b[:, 0] | b[:, 1]
[docs]class VDIFPayload(PayloadBase): """Container for decoding and encoding VDIF payloads. Parameters ---------- words : `~numpy.ndarray` Array containg LSB unsigned words (with the right size) that encode the payload. header : `~baseband.vdif.VDIFHeader` If given, used to infer the number of channels, bps, and whether the data are complex. sample_shape : tuple Shape of the samples (e.g., (nchan,)). Default: (1,). bps : int, optional Bits per elementary sample, used if ``header`` is not given. Default: 2. complex_data : bool, optional Whether the data are complex, used if ``header`` is not given. Default: `False`. """ _decoders = {1: decode_1bit, 2: decode_2bit, 4: decode_4bit, 8: decode_8bit} _encoders = {1: encode_1bit, 2: encode_2bit, 4: encode_4bit, 8: encode_8bit} _sample_shape_maker = namedtuple('SampleShape', 'nchan') def __init__(self, words, header=None, sample_shape=(1,), bps=2, complex_data=False): if header is not None and header.edv == 0xab: # Mark5B payload from ..mark5b import Mark5BPayload self._decoders = Mark5BPayload._decoders self._encoders = Mark5BPayload._encoders super().__init__(words, header=header, sample_shape=sample_shape, bps=bps, complex_data=complex_data) # Recalculate bpfs: samples do not cross word boundaries. if (self.bps & (self.bps - 1)) != 0: if self.sample_shape != (1,): raise ValueError("multi-channel VDIF data requires " "bits per sample that is a power of two.") spw = 32 // self._bpfs if (spw & (spw - 1)) == 0: self._bpfs = 32 // spw else: raise ValueError( "cannot yet sensibly handle {} data with bps={}" .format('complex' if self.complex_data else 'real', bps))
[docs] @classmethod def fromdata(cls, data, header=None, bps=2, edv=None): """Encode data as payload, using header information. Parameters ---------- data : `~numpy.ndarray` Values to be encoded. header : `~baseband.vdif.VDIFHeader`, optional If given, used to infer the encoding, and to verify the number of channels and whether the data are complex. bps : int, optional Bits per elementary sample, used if ``header`` is not given. Default: 2. edv : int, optional Should be given if ``header`` is not given and the payload is encoded as Mark 5B data (i.e., ``edv=0xab``). """ if (edv if header is None else header.edv) == 0xab: # Mark5B payload from ..mark5b import Mark5BPayload bps = bps if header is None else header.bps m5pl = Mark5BPayload.fromdata(data, bps=bps) return cls(m5pl.words, header, sample_shape=data.shape[1:], bps=bps, complex_data=False) else: return super().fromdata(data, header=header, bps=bps)