Source code for baseband.guppi.payload

# Licensed under the GPLv3 - see LICENSE
"""Payload for GUPPI format."""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
from collections import namedtuple

import numpy as np

from ..vlbi_base.payload import VLBIPayloadBase


__all__ = ['GUPPIPayload']


def decode_8bit(words):
    """Decode 8-bit signed integer data to float32."""
    return words.view(np.int8, np.ndarray).astype(np.float32)


def encode_8bit(values):
    """Encode values as 8-bit signed integers, rounding and clipping."""
    return np.clip(np.rint(values), -128, 127).astype(np.int8)
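
The two coders are each other's inverse for values that are whole numbers
within the signed 8-bit range; a minimal round-trip sketch (illustrative
only, not part of the module):

    import numpy as np

    raw = np.array([-5, 0, 7, 100], dtype=np.int8)
    floats = decode_8bit(raw)                     # float32 copies of the raw values
    assert floats.dtype == np.float32
    assert np.all(encode_8bit(floats) == raw)     # lossless round trip
    assert encode_8bit(np.array([300.])) == 127   # out-of-range values are clipped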


class GUPPIPayload(VLBIPayloadBase):
    """Container for decoding and encoding GUPPI payloads.

    Parameters
    ----------
    words : `~numpy.ndarray`
        Array containing LSB unsigned words (with the right size) that
        encode the payload.
    header : `~baseband.guppi.GUPPIHeader`
        Header that provides information about how the payload is encoded.
        If not given, the following arguments have to be passed in.
    bps : int, optional
        Number of bits per sample part (i.e., per channel and per real or
        imaginary component).  Default: 8.
    sample_shape : tuple, optional
        Shape of the samples; e.g., (npol, nchan).  Default: ().
    complex_data : bool, optional
        Whether data are complex.  Default: `False`.
    channels_first : bool, optional
        Whether the encoded payload is stored as (nchan, nsample, npol),
        rather than (nsample, nchan, npol).  Default: `True`.
    """

    _decoders = {8: decode_8bit}
    _encoders = {8: encode_8bit}
    _dtype_word = np.dtype('int8')

    _sample_shape_maker = namedtuple('SampleShape', 'npol, nchan')

    def __init__(self, words, header=None, sample_shape=(), bps=8,
                 complex_data=False, channels_first=True):
        if header is not None:
            bps = header.bps
            sample_shape = header.sample_shape
            complex_data = header.complex_data
            channels_first = header.channels_first
        super(GUPPIPayload, self).__init__(words, sample_shape=sample_shape,
                                           bps=bps, complex_data=complex_data)
        self.channels_first = channels_first
        # If channels first, _item_to_slices must act on per-channel words.
        # By resetting self._bpfs, we allow _item_to_slices to work
        # unmodified.
        self._true_bpfs = self._bpfs  # Save the true bpfs regardless.
        if self.channels_first:
            self._bpfs //= self.sample_shape.nchan
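
    # Illustrative arithmetic (a sketch, not part of the class): for 8-bit
    # complex data with npol=2 and nchan=4, the base class sets the bits per
    # complete sample to
    #     _bpfs = 8 * 2 * 2 * 4 = 128,
    # and with channels_first the division by nchan leaves
    #     _bpfs = 128 // 4 = 32
    # bits per sample within a single channel, the stride _item_to_slices
    # needs when the words are stored channel by channel.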

    @classmethod
    def fromfile(cls, fh, header=None, memmap=False, payload_nbytes=None,
                 **kwargs):
        """Read or map encoded data in file.

        Parameters
        ----------
        fh : filehandle
            Handle to the file which will be read or mapped.
        header : `~baseband.guppi.GUPPIHeader`, optional
            If given, used to infer ``payload_nbytes``, ``bps``,
            ``sample_shape``, ``complex_data`` and ``channels_first``.  If
            not given, those have to be passed in.
        memmap : bool, optional
            If `False` (default), read from file.  Otherwise, map the file
            in memory (see `~numpy.memmap`).
        payload_nbytes : int, optional
            Number of bytes to read (default: as given in ``header``,
            ``cls._nbytes``, or, for mapping, to the end of the file).
        **kwargs
            Additional arguments are passed on to the class initializer.
            These are only needed if ``header`` is not given.
        """
        if payload_nbytes is None:
            payload_nbytes = (cls._nbytes if header is None
                              else header.payload_nbytes)

        if not memmap:
            return super(GUPPIPayload, cls).fromfile(
                fh, header=header, payload_nbytes=payload_nbytes, **kwargs)

        # Map the rest of the file if payload_nbytes is still unknown.
        words_shape = (None if payload_nbytes is None
                       else (payload_nbytes // cls._dtype_word.itemsize,))
        if hasattr(fh, 'memmap'):
            words = fh.memmap(dtype=cls._dtype_word, shape=words_shape)
        else:
            mode = fh.mode.replace('b', '')
            offset = fh.tell()
            words = np.memmap(fh, mode=mode, dtype=cls._dtype_word,
                              offset=offset, shape=words_shape)
            fh.seek(offset + words.size * words.dtype.itemsize)

        return cls(words, header=header, **kwargs)
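
    # Usage sketch for `fromfile` (the file name is hypothetical, and the
    # GUPPI header is assumed to have been read first so it can supply the
    # decoding parameters):
    #
    #     from baseband.guppi import GUPPIHeader, GUPPIPayload
    #     with open('example.raw', 'rb') as fh:
    #         header = GUPPIHeader.fromfile(fh)
    #         payload = GUPPIPayload.fromfile(fh, header=header, memmap=True)
    #         print(payload.shape)   # (nsample, npol, nchan)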

    @classmethod
    def fromdata(cls, data, header=None, bps=8, channels_first=True):
        """Encode data as a payload.

        Parameters
        ----------
        data : `~numpy.ndarray`
            Data to be encoded.  The last dimension is taken as the number
            of channels.
        header : `~baseband.guppi.GUPPIHeader`, optional
            If given, used to infer the ``bps`` and ``channels_first``.
        bps : int, optional
            Bits per elementary sample, used if ``header`` is `None`.
            Default: 8.
        channels_first : bool, optional
            Whether encoded data should be ordered as (nchan, nsample,
            npol), used if ``header`` is `None`.  Default: `True`.
        """
        if header is not None:
            bps = header.bps
            channels_first = header.channels_first
        sample_shape = data.shape[1:]
        complex_data = data.dtype.kind == 'c'
        try:
            encoder = cls._encoders[bps]
        except KeyError:
            raise ValueError("{0} cannot encode data with {1} bits"
                             .format(cls.__name__, bps))
        # If channels-first, switch to (nchan, nsample, npol); otherwise use
        # (nsample, nchan, npol).
        if channels_first:
            data = data.transpose(2, 0, 1)
        else:
            data = data.transpose(0, 2, 1)
        if complex_data:
            data = data.view((data.real.dtype, (2,)))
        words = encoder(data).ravel().view(cls._dtype_word)
        return cls(words, sample_shape=sample_shape, bps=bps,
                   complex_data=complex_data, channels_first=channels_first)
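
    # Usage sketch for `fromdata` (illustrative shapes; input data is
    # ordered as (nsample, npol, nchan)):
    #
    #     data = np.zeros((16, 2, 4), dtype=np.complex64)
    #     payload = GUPPIPayload.fromdata(data, bps=8, channels_first=True)
    #     assert payload.shape == (16, 2, 4)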

    def __len__(self):
        """Number of samples in the payload."""
        return self.nbytes * 8 // self._true_bpfs

    def __getitem__(self, item=()):
        # GUPPI data may be stored as (nsample, nchan, npol) or, if
        # channels-first, (nchan, nsample, npol), both of which require
        # reshaping to get the usual order of (nsample, npol, nchan).
        decoder = self._decoders[self._coder]

        # If we want to decode the entire dataset.
        if item == () or item == slice(None):
            data = decoder(self.words)
            if self.complex_data:
                data = data.view(self.dtype)
            if self.channels_first:
                # Reshape to (nchan, nsample, npol); transpose to usual order.
                return (data.reshape(self.sample_shape.nchan, -1,
                                     self.sample_shape.npol)
                        .transpose(1, 2, 0))
            else:
                # Reshape to (nsample, nchan, npol); transpose to usual order.
                return (data.reshape(-1, self.sample_shape.nchan,
                                     self.sample_shape.npol)
                        .transpose(0, 2, 1))

        words_slice, data_slice = self._item_to_slices(item)

        if self.channels_first:
            # Reshape words so channels fall along first axis, then decode.
            decoded_words = decoder(
                self.words.reshape(self.sample_shape.nchan,
                                   -1)[:, words_slice])
            # Reshape to (nsample, npol, nchan), then use data_slice.
            return (decoded_words.view(self.dtype).T
                    .reshape(-1, *self.sample_shape)[data_slice])
        else:
            # data_slice assumes (npol, nchan), so transpose before using it.
            return (decoder(self.words[words_slice]).view(self.dtype)
                    .reshape(-1, self.sample_shape.nchan,
                             self.sample_shape.npol)
                    .transpose(0, 2, 1)[data_slice])

    def __setitem__(self, item, data):
        if item == () or item == slice(None):
            words_slice = data_slice = slice(None)
        else:
            words_slice, data_slice = self._item_to_slices(item)

        data = np.asanyarray(data)
        # Check if the new data spans an entire word and is correctly shaped.
        # If so, skip decoding.  If not, decode appropriate words and insert
        # new data.
        if not (data_slice == slice(None)
                and data.shape[-2:] == self.sample_shape
                and data.dtype.kind == self.dtype.kind):
            decoder = self._decoders[self._coder]
            if self.channels_first:
                decoded_words = decoder(np.ascontiguousarray(
                    self.words.reshape(self.sample_shape.nchan,
                                       -1)[:, words_slice]))
                current_data = (decoded_words.view(self.dtype)
                                .T.reshape(-1, *self.sample_shape))
            else:
                current_data = (decoder(self.words[words_slice])
                                .view(self.dtype)
                                .reshape(-1, self.sample_shape.nchan,
                                         self.sample_shape.npol)
                                .transpose(0, 2, 1))
            current_data[data_slice] = data
            data = current_data

        # Reshape before separating real and complex components.
        if self.channels_first:
            data = data.reshape(-1, self.sample_shape.nchan).T
        else:
            data = data.transpose(0, 2, 1)

        # Separate real and complex components.
        if data.dtype.kind == 'c':
            data = data.view((data.real.dtype, (2,)))

        # Select encoder.
        encoder = self._encoders[self._coder]
        # Reshape and encode words.
        if self.channels_first:
            self.words.reshape(self.sample_shape.nchan,
                               -1)[:, words_slice] = (
                encoder(data).reshape(self.sample_shape.nchan, -1)
                .view(self._dtype_word))
        else:
            self.words[words_slice] = (encoder(data.ravel())
                                       .view(self._dtype_word))

    data = property(__getitem__, doc="Full decoded payload.")
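
As a closing usage sketch (assuming only NumPy and the class above; the
values are whole numbers within the 8-bit range, so encoding is lossless),
decoding, slicing and in-place assignment behave as follows:

    import numpy as np
    from baseband.guppi.payload import GUPPIPayload

    # Eight real-valued samples with 2 polarizations and 4 channels.
    data = (np.arange(8 * 2 * 4, dtype=np.float32) - 32.).reshape(8, 2, 4)
    payload = GUPPIPayload.fromdata(data, bps=8, channels_first=True)

    assert np.all(payload[:] == data)         # full decode: (nsample, npol, nchan)
    assert np.all(payload[2:5] == data[2:5])  # slices decode only the needed words

    payload[2:5] = 0.                         # assignment re-encodes in place
    assert np.all(payload[2:5] == 0.)
    assert np.all(payload[:2] == data[:2])    # other samples are untouched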