Source code for baseband.guppi.payload

# Licensed under the GPLv3 - see LICENSE
"""Payload for GUPPI format."""
from collections import namedtuple

import numpy as np

from ..base.payload import PayloadBase


__all__ = ['GUPPIPayload']


def decode_8bit(words):
    return words.view(np.int8, np.ndarray).astype(np.float32)


def encode_8bit(values):
    return np.clip(np.rint(values), -128, 127).astype(np.int8)


[docs] class GUPPIPayload(PayloadBase): """Container for decoding and encoding GUPPI payloads. Parameters ---------- words : `~numpy.ndarray` Array containg LSB unsigned words (with the right size) that encode the payload. header : `~baseband.guppi.GUPPIHeader` Header that provides information about how the payload is encoded. If not given, the following arguments have to be passed in. bps : int, optional Number of bits per sample part (i.e., per channel and per real or imaginary component). Default: 8. sample_shape : tuple, optional Shape of the samples; e.g., (nchan,). Default: (). complex_data : bool, optional Whether data are complex. Default: `False`. channels_first : bool, optional Whether the encoded payload is stored as (nchan, nsample, npol), rather than (nsample, nchan, npol). Default: `True`. """ _decoders = { 8: decode_8bit} _encoders = { 8: encode_8bit} _dtype_word = np.dtype('int8') _memmap = True _sample_shape_maker = namedtuple('SampleShape', 'npol, nchan') def __init__(self, words, *, header=None, sample_shape=(), bps=8, complex_data=False, channels_first=True): super().__init__(words, header=header, sample_shape=sample_shape, bps=bps, complex_data=complex_data) self.channels_first = (channels_first if header is None else header.channels_first) # If channels first, _item_to_slices must act on per-channel words. By # resetting self._bpfs, we allow _item_to_slices to work unmodified. self._true_bpfs = self._bpfs # Save the true bpfs regardless. if self.channels_first: self._bpfs //= self.sample_shape.nchan
[docs] @classmethod def fromdata(cls, data, header=None, bps=8, channels_first=True): """Encode data as a payload. Parameters ---------- data : `~numpy.ndarray` Data to be encoded. The trailing dimensions are taken as the sample shape, normally (npol, nchan). header : `~baseband.guppi.GUPPIHeader`, optional If given, used to infer the ``bps`` and ``channels_first``. bps : int, optional Bits per elementary sample, used only if ``header`` is `None`. Default: 8. channels_first : bool, optional If `True`, encode data (nchan, nsample, npol). otherwise as (nsample, nchan, npol). Used only if ``header`` is `None`. Default: `True`. """ return super().fromdata(data, header=header, bps=bps, channels_first=channels_first)
def __len__(self): """Number of samples in the payload.""" return self.nbytes * 8 // self._true_bpfs def _decode(self, words, words_slice): if self.channels_first: # Before decoding, reshape so channels fall along first axis. decoded_words = super()._decode( words.reshape(self.sample_shape.nchan, -1)[:, words_slice]) # Reshape to (nsample, nchan, npol), as expected by data_slice. return decoded_words.T.reshape(-1, *self.sample_shape) else: # Transpose result to allow data_slice to assume (npol, nchan). return (super()._decode(words[words_slice]) .reshape(-1, self.sample_shape.nchan, self.sample_shape.npol) .transpose(0, 2, 1)) def __getitem__(self, item=()): # GUPPI data may be stored as (nsample, nchan, npol) or, if # channels-first, (nchan, nsample, npol), both of which require # reshaping to get the usual order of (nsample, npol, nchan). words_slice, data_slice = self._item_to_slices(item) return self._decode(self.words, words_slice)[data_slice] def __setitem__(self, item, data): words_slice, data_slice = self._item_to_slices(item) data = np.asanyarray(data) # Check if the new data spans an entire word and is correctly shaped. # If so, skip decoding. If not, decode appropriate words and insert # new data. if not (data_slice == (slice(None),) and data.shape[-2:] == self.sample_shape and data.dtype.kind == self.dtype.kind): current_data = self._decode(self.words, words_slice) current_data[data_slice] = data data = current_data # Reshape before separating real and complex components. if self.channels_first: encoded_words = (self._encode(data.transpose(2, 0, 1)) .reshape(self.sample_shape.nchan, -1) .view(self._dtype_word)) self.words.reshape(self.sample_shape.nchan, -1)[:, words_slice] = ( encoded_words) else: encoded_words = (self._encode(data.transpose(0, 2, 1)) .ravel().view(self._dtype_word)) self.words[words_slice] = encoded_words data = property(__getitem__, doc="Full decoded payload.")