Source code for baseband.mark4.payload

# Licensed under the GPLv3 - see LICENSE
"""
Definitions for VLBI Mark 4 payloads.

Implements a Mark4Payload class used to store payload words, and decode to
or encode from a data array.

For the specification, see
https://www.haystack.mit.edu/tech/vlbi/mark5/docs/230.3.pdf
"""
import sys
from collections import namedtuple

import numpy as np

from ..base.payload import PayloadBase
from ..base.encoding import encode_2bit_base, decoder_levels
from ..base.utils import fixedvalue
from .header import MARK4_DTYPES


__all__ = ['reorder32', 'reorder64', 'init_luts', 'decode_8chan_2bit_fanout4',
           'encode_8chan_2bit_fanout4', 'Mark4Payload']

#  2bit/fanout4 use the following in decoding 32 and 64 track data:
if sys.byteorder == 'big':  # pragma: no cover
    def reorder32(x):
        """Reorder 32-track bits to bring signs & magnitudes together."""
        return (((x & 0x55AA55AA))
                | ((x & 0xAA00AA00) >> 9)
                | ((x & 0x00550055) << 9))

    def reorder64(x):
        """Reorder 64-track bits to bring signs & magnitudes together."""
        return (((x & 0x55AA55AA55AA55AA))
                | ((x & 0xAA00AA00AA00AA00) >> 9)
                | ((x & 0x0055005500550055) << 9))

    def reorder64_Ft(x):
        """Reorder 64-track bits to bring signs & magnitudes together.

        Special version for the Ft station, which has unusual settings.
        """
        return (((x & 0xAFFAFFFFAFFAFFFF))
                | ((x & 0x0005000000050000) << 12)
                | ((x & 0x5000000050000000) >> 12))
else:
[docs] def reorder32(x): """Reorder 32-track bits to bring signs & magnitudes together.""" return (((x & 0xAA55AA55)) | ((x & 0x55005500) >> 7) | ((x & 0x00AA00AA) << 7))
# Can speed this up from 140 to 132 us by predefining bit patterns as # array scalars. Inplace calculations do not seem to help much.
[docs] def reorder64(x): """Reorder 64-track bits to bring signs & magnitudes together.""" return (((x & 0xAA55AA55AA55AA55)) | ((x & 0x5500550055005500) >> 7) | ((x & 0x00AA00AA00AA00AA) << 7))
def reorder64_Ft(x): """Reorder 64-track bits to bring signs & magnitudes together. Special version for the Ft station, which has unusual settings. """ return (((x & 0xFFFFFAAFFFFFFAAF)) | ((x & 0x0000050000000500) >> 4) | ((x & 0x0000005000000050) << 4)) # Check on 2015-JUL-12: C code: 738811025863578102 -> 738829572664316278 # 118, 209, 53, 244, 148, 217, 64, 10 # reorder64(np.array([738811025863578102], dtype=np.uint64)) # # array([738829572664316278], dtype=uint64) # reorder64(np.array([738811025863578102], dtype=np.uint64)).view(np.uint8) # # array([118, 209, 53, 244, 148, 217, 64, 10], dtype=uint8) # decode_2bit_64track_fanout4( # np.array([738811025863578102], dtype=np.int64)).astype(int).T # -1 1 3 1 array([[-1, 1, 3, 1], # 1 1 3 -3 [ 1, 1, 3, -3], # 1 -3 1 3 [ 1, -3, 1, 3], # -3 1 3 3 [-3, 1, 3, 3], # -3 1 1 -1 [-3, 1, 1, -1], # -3 -3 -3 1 [-3, -3, -3, 1], # 1 -1 1 3 [ 1, -1, 1, 3], # -1 -1 -3 -3 [-1, -1, -3, -3]])
[docs]def init_luts(): """Set up the look-up tables for levels as a function of input byte.""" # Organisation by bits is quite odd for Mark 4. b = np.arange(256)[:, np.newaxis] # lut1bit i = np.arange(8) # For all 1-bit modes; if set, sign=-1, so need to get item 0. lut1bit = decoder_levels[1][((b >> i) & 1) ^ 1] i = np.arange(4) # fanout 1 @ 8/16t, fanout 4 @ 32/64t s = i*2 # 0, 2, 4, 6 m = s+1 # 1, 3, 5, 7 lut2bit1 = decoder_levels[2][2*(b >> s & 1) + (b >> m & 1)] # fanout 2 @ 8/16t, fanout 1 @ 32/64t s = i + (i//2)*2 # 0, 1, 4, 5 m = s + 2 # 2, 3, 6, 7 lut2bit2 = decoder_levels[2][2*(b >> s & 1) + (b >> m & 1)] # fanout 4 @ 8/16t, fanout 2 @ 32/64t s = i # 0, 1, 2, 3 m = s+4 # 4, 5, 6, 7 lut2bit3 = decoder_levels[2][2*(b >> s & 1) + (b >> m & 1)] return lut1bit, lut2bit1, lut2bit2, lut2bit3
lut1bit, lut2bit1, lut2bit2, lut2bit3 = init_luts() # Look-up table for the number of bits in a byte. nbits = ((np.arange(256)[:, np.newaxis] >> np.arange(8) & 1) .sum(1).astype(np.int16)) def decode_2chan_2bit_fanout4(frame): """Decode payload for 2 channels using 2 bits, fan-out 4 (16 tracks).""" # header['magnitude_bit'] = 00001111,00001111 # makes sense with lut2bit3 # header['fan_out'] = 01230123,01230123 # header['converter_id'] = 00000000,11111111 # header['lsb_output'] = 11111111,11111111 # After reshape: byte 0: ch0/s0, ch0/s1, ch0/s2, ch0/s3, + mag. # byte 1: ch1/s0, ch1/s1, ch1/s2, ch1/s3, + mag. frame = frame.view(np.uint8).reshape(-1, 2) # The look-up table splits each data word into the above 8 measurements, # the transpose pushes channels first and fanout last, and the reshape # flattens the fanout. return lut2bit3.take(frame, axis=0).transpose(1, 0, 2).reshape(2, -1).T def encode_2chan_2bit_fanout4(values): """Encode payload for 2 channels using 2 bits, fan-out 4 (16 tracks).""" # Reverse reshaping (see above). values = values.reshape(-1, 4, 2).transpose(0, 2, 1) bitvalues = encode_2bit_base(values) # Values are -3, -1, +1, 3 -> 00, 01, 10, 11; get first bit (sign) as 1, # second bit (magnitude) as 16. reorder_bits = np.array([0, 16, 1, 17], dtype=np.uint8) reorder_bits.take(bitvalues, out=bitvalues) bitvalues <<= np.array([0, 1, 2, 3], dtype=np.uint8) out = np.bitwise_or.reduce(bitvalues, axis=-1).ravel().view('<u2') return out def decode_4chan_2bit_fanout4(frame): """Decode payload for 4 channels using 2 bits, fan-out 4 (32 tracks).""" # Bitwise reordering of tracks, to align sign and magnitude bits, # reshaping to get VLBI channels in sequential, but wrong order. frame = reorder32(frame.view(np.uint32)).view(np.uint8).reshape(-1, 4) # Correct ordering. frame = frame.take(np.array([0, 2, 1, 3]), axis=1) # The look-up table splits each data byte into 4 measurements. # Using transpose ensures channels are first, then time samples, then # those 4 measurements, so the reshape orders the samples correctly. # Another transpose ensures samples are the first dimension. return lut2bit1.take(frame.T, axis=0).reshape(4, -1).T def encode_4chan_2bit_fanout4(values): """Encode payload for 4 channels using 2 bits, fan-out 4 (32 tracks).""" reorder_channels = np.array([0, 2, 1, 3]) values = values[:, reorder_channels].reshape(-1, 4, 4).transpose(0, 2, 1) bitvalues = encode_2bit_base(values) reorder_bits = np.array([0, 2, 1, 3], dtype=np.uint8) reorder_bits.take(bitvalues, out=bitvalues) bitvalues <<= np.array([0, 2, 4, 6], dtype=np.uint8) out = np.bitwise_or.reduce(bitvalues, axis=-1).ravel().view(np.uint32) return reorder32(out).view('<u4') def decode_8chan_2bit_fanout2(frame): """Decode payload for 8 channels using 2 bits, fan-out 4 (32 tracks).""" # header['magnitude_bit'] = 00001111,00001111,00001111,00001111 # makes sense with lut2bit3 # header['fan_out'] = 00110011,00110011,00110011,00110011 # i.e., s0s0,s1s1,m0m0,m1m1 for each byte # header['converter_id'] = 02020202,13131313,02020202,13131313 # header['lsb_output'] = 00000000,00000000,11111111,11111111 # After reshape: byte 0: ch0/s0, ch4/s0, ch0/s1, ch4/s1, + mag. # byte 1: ch1/s0, ch5/s0, ch1/s1, ch5/s1, + mag. # byte 2: ch2/s0, ch6/s0, ch2/s1, ch6/s1, + mag. # byte 3: ch3/s0, ch7/s0, ch3/s1, ch7/s1, + mag. frame = frame.view(np.uint8).reshape(-1, 4) # The look-up table splits each data word into the above 16 measurements. # the first reshape means one gets time, channel&0x3, sample, channel&0x4 # the transpose makes this channel&0x4, channel&0x3, time, sample. # the second reshape (which makes a copy) gets one just channel, time, # and the final transpose time, channel. return (lut2bit3.take(frame, axis=0).reshape(-1, 4, 2, 2) .transpose(3, 1, 0, 2).reshape(8, -1).T) def encode_8chan_2bit_fanout2(values): """Encode payload for 8 channels using 2 bits, fan-out 2 (32 tracks).""" # words encode 16 values (see above) in order ch&0x3, sample, ch&0x4 values = (values.reshape(-1, 2, 2, 4).transpose(0, 3, 1, 2) .reshape(-1, 4, 4)) bitvalues = encode_2bit_base(values) # values are -3, -1, +1, 3 -> 00, 01, 10, 11; # get first bit (sign) as 1, second bit (magnitude) as 16 reorder_bits = np.array([0, 16, 1, 17], dtype=np.uint8) reorder_bits.take(bitvalues, out=bitvalues) bitvalues <<= np.array([0, 1, 2, 3], dtype=np.uint8) out = np.bitwise_or.reduce(bitvalues, axis=-1).ravel().view('<u4') return out def decode_16chan_2bit_fanout2_ft(frame): """Decode payload for 16 channels using 2 bits, fan-out 2 (64 tracks).""" # These Fortaleza files have an unusual ordering: # header['magnitude_bit'] = 00000101,10101111,00001111,00001111 * 2 # White later two are as for lut2bit3, the first two are different. # header['fan_out'] = 00110011 * 8 # i.e., s0s0,s1s1,m0m0,m1m1 for the later bytes. # header['converter_id'] = 03030303,04040404,15151515,26262626, # 7a7a7a7a,7b7b7b7b,8c8c8c8c,9d9d9d9d # 0, 7 are doubled; those have lsb & usb; # header['lsb_output'] = 00001010,00001010,00000000,00000000 * 2 # Should take magnitude bit literally, # byte 0: 0u/s0, 3u/s0, 0u/s1, 3u/s1, 0l/s0, 3u/m0, 0l/s1, 3u/m1 # byte 1: 0u/m0, 4u/s0, 0u/m1, 4u/s1, 0l/m0, 4u/m0, 0l/m1, 4u/m1 # byte 2: 1u/s0, 5u/s0, 1u/s1, 5u/s1, 1u/m0, 5u/m0, 1u/m1, 5u/m1 # byte 3: 2u/s0, 6u/s0, 2u/s1, 6u/s1, 2u/m0, 6u/m0, 2u/m1, 6u/m1 # byte 4: 7u/s0, au/s0, 7u/s1, au/s1, 7l/s0, au/m0, 7l/s1, au/m1 # byte 5: 7u/m0, bu/s0, 7u/m1, bu/s1, 7l/m0, bu/m0, 7l/m1, bu/m1 # byte 6: 8u/s0, cu/s0, 8u/s1, cu/s1, 8u/m0, cu/m0, 8u/m1, cu/m1 # byte 7: 9u/s0, du/s0, 9u/s1, du/s1, 9u/m0, du/m0, 9u/m1, du/m1 # This means the re-ordering is different from the usual: we just # need to shift bits around in bytes 0,1,4,5: frame = reorder64_Ft(frame.view(np.uint64)) # This leaves samples as # byte 0: 0u/s0, 3u/s0, 0u/s1, 3u/s1, 0u/m0, 3u/m0, 0u/m1, 3u/m1 # byte 1: 0l/s0, 4u/s0, 0l/s1, 4u/s1, 0l/m0, 4u/m0, 0l/m1, 4u/m1 # byte 2: 1u/s0, 5u/s0, 1u/s1, 5u/s1, 1u/m0, 5u/m0, 1u/m1, 5u/m1 # byte 3: 2u/s0, 6u/s0, 2u/s1, 6u/s1, 2u/m0, 6u/m0, 2u/m1, 6u/m1 # byte 4: 7u/s0, au/s0, 7u/s1, au/s1, 7u/m0, au/m0, 7u/m1, au/m1 # byte 5: 7l/s0, bu/s0, 7l/s1, bu/s1, 7l/m0, bu/m0, 7l/m1, bu/m1 # byte 6: 8u/s0, cu/s0, 8u/s1, cu/s1, 8u/m0, cu/m0, 8u/m1, cu/m1 # byte 7: 9u/s0, du/s0, 9u/s1, du/s1, 9u/m0, du/m0, 9u/m1, du/m1 frame = frame.view(np.uint8).reshape(-1, 8) # The look-up table splits each data word into the above 32 measurements. # Translating 0u=0, 0l=1, 1..6=2..7, 7u=8, 7l=9, 8..d=a..f, the # first reshape yieds time, channel&0x8, channel&0x3, sample, channel&0x4 # transpose makes this channel&0x8, channel&0x4, channel&0x3, time, sample. # and the second reshape (which makes a copy) gets one just time, channel, # and the final transpose time, channel. return (lut2bit3.take(frame, axis=0).reshape(-1, 2, 4, 2, 2) .transpose(1, 4, 2, 0, 3).reshape(16, -1).T) def encode_16chan_2bit_fanout2_ft(values): """Encode payload for 16 channels using 2 bits, fan-out 2 (64 tracks).""" # words encode 32 values (above) in order ch&0x8, ch&0x3, sample, ch&0x4 # First reshape goes to time, sample, ch&0x8, ch&0x4, ch&0x3, # transpose makes this time, ch&0x8, ch&0x3, sample, ch&0x4. # second reshape future bytes, sample+ch&0x4. values = (values.reshape(-1, 2, 2, 2, 4).transpose(0, 2, 4, 1, 3) .reshape(-1, 4)) bitvalues = encode_2bit_base(values) # values are -3, -1, +1, 3 -> 00, 01, 10, 11; # get first bit (sign) as 1, second bit (magnitude) as 16 reorder_bits = np.array([0, 16, 1, 17], dtype=np.uint8) reorder_bits.take(bitvalues, out=bitvalues) bitvalues <<= np.array([0, 1, 2, 3], dtype=np.uint8) out = np.bitwise_or.reduce(bitvalues, axis=-1).ravel().view(np.uint64) return reorder64_Ft(out).view('<u8')
[docs]def decode_8chan_2bit_fanout4(frame): """Decode payload for 8 channels using 2 bits, fan-out 4 (64 tracks).""" # Bitwise reordering of tracks, to align sign and magnitude bits, # reshaping to get VLBI channels in sequential, but wrong order. frame = reorder64(frame.view(np.uint64)).view(np.uint8).reshape(-1, 8) # Correct ordering. frame = frame.take(np.array([0, 2, 1, 3, 4, 6, 5, 7]), axis=1) # The look-up table splits each data byte into 4 measurements. # Using transpose ensures channels are first, then time samples, then # those 4 measurements, so the reshape orders the samples correctly. # Another transpose ensures samples are the first dimension. return lut2bit1.take(frame.T, axis=0).reshape(8, -1).T
[docs]def encode_8chan_2bit_fanout4(values): """Encode payload for 8 channels using 2 bits, fan-out 4 (64 tracks).""" reorder_channels = np.array([0, 2, 1, 3, 4, 6, 5, 7]) values = values[:, reorder_channels].reshape(-1, 4, 8).transpose(0, 2, 1) bitvalues = encode_2bit_base(values) reorder_bits = np.array([0, 2, 1, 3], dtype=np.uint8) reorder_bits.take(bitvalues, out=bitvalues) bitvalues <<= np.array([0, 2, 4, 6], dtype=np.uint8) out = np.bitwise_or.reduce(bitvalues, axis=-1).ravel().view(np.uint64) return reorder64(out).view('<u8')
[docs]class Mark4Payload(PayloadBase): """Container for decoding and encoding Mark 4 payloads. Parameters ---------- words : `~numpy.ndarray` Array containg LSB unsigned words (with the right size) that encode the payload. header : `~baseband.mark4.Mark4Header`, optional If given, used to infer the number of channels, bps, and fanout. sample_shape : tuple Shape of the samples (e.g., (nchan,)). Default: (1,). bps : int, optional Number of bits per sample, used if ``header`` is not given. Default: 2. fanout : int, optional Number of tracks every bit stream is spread over, used if ``header`` is not given. Default: 1. magnitude_bit : int, optional Magnitude bits for all tracks packed together. Used to index encoder and decoder. Default: assume standard Mark 4 payload, for which number of channels, bps, and fanout suffice. Notes ----- The total number of tracks is ``nchan * bps * fanout``. """ _dtype_word = None # Decoders keyed by (nchan, nbit, fanout). _encoders = {(2, 2, 4): encode_2chan_2bit_fanout4, (4, 2, 4): encode_4chan_2bit_fanout4, (8, 2, 2): encode_8chan_2bit_fanout2, (8, 2, 4): encode_8chan_2bit_fanout4, (16, 0xf0faf050f0faf05, 2): encode_16chan_2bit_fanout2_ft} _decoders = {(2, 2, 4): decode_2chan_2bit_fanout4, (4, 2, 4): decode_4chan_2bit_fanout4, (8, 2, 2): decode_8chan_2bit_fanout2, (8, 2, 4): decode_8chan_2bit_fanout4, (16, 0xf0faf050f0faf05, 2): decode_16chan_2bit_fanout2_ft} _sample_shape_maker = namedtuple('SampleShape', 'nchan') def __init__(self, words, header=None, *, sample_shape=(1,), bps=2, fanout=1, magnitude_bit=None, complex_data=False): if header is not None: magnitude_bit = header['magnitude_bit'] bps = 2 if magnitude_bit.any() else 1 ta = header.track_assignment if bps == 1 or np.all(magnitude_bit[ta] == [False, True]): magnitude_bit = None else: magnitude_bit = (np.packbits(magnitude_bit) .view(header.stream_dtype).item()) ntrack = header.ntrack fanout = header.fanout sample_shape = (ntrack // (bps * fanout),) self._nbytes = header.payload_nbytes else: ntrack = sample_shape[0] * bps * fanout magnitude_bit = None self._dtype_word = MARK4_DTYPES[ntrack] self.fanout = fanout super().__init__(words, sample_shape=sample_shape, bps=bps, complex_data=complex_data) self._coder = (self.sample_shape.nchan, (self.bps if magnitude_bit is None else magnitude_bit), self.fanout) @fixedvalue def complex_data(self): return False
[docs] @classmethod def fromfile(cls, fh, header=None, **kwargs): """Read payload from filehandle and decode it into data. Parameters ---------- fh : filehandle From which data is read. header : `~baseband.mark4.Mark4Header` Used to infer ``payload_nbytes``, ``bps``, ``sample_shape``, and ``dtype``. If not given, those have to be passed in. """ if header is not None: kwargs.setdefault('dtype', header.stream_dtype) return super().fromfile(fh, header=header, **kwargs)
[docs] @classmethod def fromdata(cls, data, header): """Encode data as payload, using header information.""" if data.dtype.kind == 'c': raise ValueError("Mark4 format does not support complex data.") if header.sample_shape != data.shape[1:]: raise ValueError("header is for {0} channels but data has {1}" .format(header.nchan, data.shape[-1])) words = np.empty(header.payload_nbytes // header.stream_dtype.itemsize, header.stream_dtype) self = cls(words, header) self[:] = data return self