# Source code for baseband.guppi.payload

# Licensed under the GPLv3 - see LICENSE
"""Payload for GUPPI format."""
from collections import namedtuple

import numpy as np

from ..base.payload import PayloadBase


__all__ = ['GUPPIPayload']


def decode_8bit(words):
    """Decode 8-bit words into single-precision floating point values.

    The input is reinterpreted as signed 8-bit integers (viewed as a plain
    `~numpy.ndarray`) and then converted to ``float32``.
    """
    signed = words.view(np.int8, np.ndarray)
    return signed.astype(np.float32)


def encode_8bit(values):
    """Encode values as signed 8-bit integers.

    Values are rounded to the nearest integer, limited to the range
    [-128, 127], and then cast to ``int8``.
    """
    rounded = np.rint(values)
    limited = np.clip(rounded, -128, 127)
    return limited.astype(np.int8)


class GUPPIPayload(PayloadBase):
    """Container for decoding and encoding GUPPI payloads.

    Parameters
    ----------
    words : `~numpy.ndarray`
        Array containing LSB unsigned words (with the right size) that
        encode the payload.
    header : `~baseband.guppi.GUPPIHeader`
        Header that provides information about how the payload is encoded.
        If not given, the following arguments have to be passed in.
    bps : int, optional
        Number of bits per sample part (i.e., per channel and per real or
        imaginary component).  Default: 8.
    sample_shape : tuple, optional
        Shape of the samples; e.g., (nchan,).  Default: ().
    complex_data : bool, optional
        Whether data are complex.  Default: `False`.
    channels_first : bool, optional
        Whether the encoded payload is stored as (nchan, nsample, npol),
        rather than (nsample, nchan, npol).  Default: `True`.
    """

    # Only 8 bits per sample part is supported by the coders below.
    _decoders = {
        8: decode_8bit}

    _encoders = {
        8: encode_8bit}

    _dtype_word = np.dtype('int8')

    _memmap = True
    _sample_shape_maker = namedtuple('SampleShape', 'npol, nchan')

    def __init__(self, words, *, header=None, sample_shape=(), bps=8,
                 complex_data=False, channels_first=True):
        super().__init__(words, header=header, sample_shape=sample_shape,
                         bps=bps, complex_data=complex_data)
        # A header, when given, takes precedence over the keyword argument.
        self.channels_first = (channels_first if header is None
                               else header.channels_first)
        # If channels first, _item_to_slices must act on per-channel words.
        # By resetting self._bpfs, we allow _item_to_slices to work
        # unmodified.
        self._true_bpfs = self._bpfs  # Save the true bpfs regardless.
        if self.channels_first:
            self._bpfs //= self.sample_shape.nchan

    @classmethod
    def fromdata(cls, data, header=None, bps=8, channels_first=True):
        """Encode data as a payload.

        Parameters
        ----------
        data : `~numpy.ndarray`
            Data to be encoded. The last dimension is taken as the number of
            channels.
        header : `~baseband.guppi.GUPPIHeader`, optional
            If given, used to infer the ``bps`` and ``channels_first``.
        bps : int, optional
            Bits per elementary sample, used if ``header`` is `None`.
            Default: 8.
        channels_first : bool, optional
            Whether encoded data should be ordered as (nchan, nsample, npol),
            used if ``header`` is `None`.  Default: `True`.

        Raises
        ------
        ValueError
            If no encoder is available for the requested ``bps``.
        """
        if header is not None:
            bps = header.bps
            channels_first = header.channels_first

        sample_shape = data.shape[1:]
        complex_data = data.dtype.kind == 'c'

        try:
            encoder = cls._encoders[bps]
        except KeyError as err:
            # Chain explicitly so the traceback shows the lookup failure as
            # the cause rather than as an error raised during handling.
            raise ValueError("{0} cannot encode data with {1} bits"
                             .format(cls.__name__, bps)) from err
        # If channels-first, switch to (nchan, nsample, npol); otherwise use
        # (nsample, nchan, npol).
        if channels_first:
            data = data.transpose(2, 0, 1)
        else:
            data = data.transpose(0, 2, 1)

        # Split complex values into a trailing axis of (real, imag) pairs.
        if complex_data:
            data = data.view((data.real.dtype, (2,)))

        words = encoder(data).ravel().view(cls._dtype_word)
        return cls(words, sample_shape=sample_shape, bps=bps,
                   complex_data=complex_data, channels_first=channels_first)

    def __len__(self):
        """Number of samples in the payload."""
        # Use the true bits per full sample, since self._bpfs may have been
        # divided by the number of channels in __init__.
        return self.nbytes * 8 // self._true_bpfs

    def __getitem__(self, item=()):
        """Decode (part of) the payload, ordered as (nsample, npol, nchan)."""
        # GUPPI data may be stored as (nsample, nchan, npol) or, if
        # channels-first, (nchan, nsample, npol), both of which require
        # reshaping to get the usual order of (nsample, npol, nchan).
        decoder = self._decoders[self._coder]

        # If we want to decode the entire dataset.
        if item == () or item == slice(None):
            data = decoder(self.words)
            if self.complex_data:
                data = data.view(self.dtype)
            if self.channels_first:
                # Reshape to (nchan, nsample, npol); transpose to usual order.
                return (data.reshape(self.sample_shape.nchan, -1,
                                     self.sample_shape.npol)
                        .transpose(1, 2, 0))
            else:
                # Reshape to (nsample, nchan, npol); transpose to usual order.
                return (data.reshape(-1, self.sample_shape.nchan,
                                     self.sample_shape.npol)
                        .transpose(0, 2, 1))

        words_slice, data_slice = self._item_to_slices(item)

        if self.channels_first:
            # Reshape words so channels fall along first axis, then decode.
            decoded_words = decoder(self.words.reshape(
                self.sample_shape.nchan, -1)[:, words_slice])
            # Reshape to (nsample, nchan, npol), then use data_slice.
            return (decoded_words.view(self.dtype).T
                    .reshape(-1, *self.sample_shape)[data_slice])
        else:
            # data_slice assumes (npol, nchan), so transpose before using it.
            return (decoder(self.words[words_slice]).view(self.dtype)
                    .reshape(-1, self.sample_shape.nchan,
                             self.sample_shape.npol)
                    .transpose(0, 2, 1)[data_slice])

    def __setitem__(self, item, data):
        """Encode ``data`` and store it in the words selected by ``item``."""
        if item == () or item == slice(None):
            words_slice = data_slice = slice(None)
        else:
            words_slice, data_slice = self._item_to_slices(item)

        data = np.asanyarray(data)
        # Check if the new data spans an entire word and is correctly shaped.
        # If so, skip decoding.  If not, decode appropriate words and insert
        # new data.
        if not (data_slice == slice(None)
                and data.shape[-2:] == self.sample_shape
                and data.dtype.kind == self.dtype.kind):
            decoder = self._decoders[self._coder]
            if self.channels_first:
                decoded_words = decoder(np.ascontiguousarray(
                    self.words.reshape(
                        self.sample_shape.nchan, -1)[:, words_slice]))
                current_data = (decoded_words.view(self.dtype)
                                .T.reshape(-1, *self.sample_shape))
            else:
                current_data = (decoder(self.words[words_slice])
                                .view(self.dtype)
                                .reshape(-1, self.sample_shape.nchan,
                                         self.sample_shape.npol)
                                .transpose(0, 2, 1))

            current_data[data_slice] = data
            data = current_data

        # Reshape before separating real and complex components.
        if self.channels_first:
            data = data.reshape(-1, self.sample_shape.nchan).T
        else:
            data = data.transpose(0, 2, 1)

        # Separate real and complex components.
        if data.dtype.kind == 'c':
            data = data.view((data.real.dtype, (2,)))

        # Select encoder.
        encoder = self._encoders[self._coder]

        # Reshape and encode words.
        if self.channels_first:
            self.words.reshape(self.sample_shape.nchan, -1)[:, words_slice] = (
                encoder(data).reshape(self.sample_shape.nchan, -1)
                .view(self._dtype_word))
        else:
            self.words[words_slice] = (encoder(data.ravel())
                                       .view(self._dtype_word))

    # Relies on the default ``item=()`` of __getitem__ to decode everything.
    data = property(__getitem__, doc="Full decoded payload.")