# Licensed under the GPLv3 - see LICENSE
"""Payload for GUPPI format."""
from collections import namedtuple
import numpy as np
from ..base.payload import PayloadBase
__all__ = ['GUPPIPayload']
def decode_8bit(words):
return words.view(np.int8, np.ndarray).astype(np.float32)
def encode_8bit(values):
return np.clip(np.rint(values), -128, 127).astype(np.int8)
[docs]class GUPPIPayload(PayloadBase):
"""Container for decoding and encoding GUPPI payloads.
Parameters
----------
words : `~numpy.ndarray`
Array containg LSB unsigned words (with the right size) that
encode the payload.
header : `~baseband.guppi.GUPPIHeader`
Header that provides information about how the payload is encoded.
If not given, the following arguments have to be passed in.
bps : int, optional
Number of bits per sample part (i.e., per channel and per real or
imaginary component). Default: 8.
sample_shape : tuple, optional
Shape of the samples; e.g., (nchan,). Default: ().
complex_data : bool, optional
Whether data are complex. Default: `False`.
channels_first : bool, optional
Whether the encoded payload is stored as (nchan, nsample, npol),
rather than (nsample, nchan, npol). Default: `True`.
"""
_decoders = {
8: decode_8bit}
_encoders = {
8: encode_8bit}
_dtype_word = np.dtype('int8')
_memmap = True
_sample_shape_maker = namedtuple('SampleShape', 'npol, nchan')
def __init__(self, words, *, header=None, sample_shape=(), bps=8,
complex_data=False, channels_first=True):
super().__init__(words, header=header, sample_shape=sample_shape,
bps=bps, complex_data=complex_data)
self.channels_first = (channels_first if header is None
else header.channels_first)
# If channels first, _item_to_slices must act on per-channel words. By
# resetting self._bpfs, we allow _item_to_slices to work unmodified.
self._true_bpfs = self._bpfs # Save the true bpfs regardless.
if self.channels_first:
self._bpfs //= self.sample_shape.nchan
[docs] @classmethod
def fromdata(cls, data, header=None, bps=8, channels_first=True):
"""Encode data as a payload.
Parameters
----------
data : `~numpy.ndarray`
Data to be encoded. The last dimension is taken as the number of
channels.
header : `~baseband.guppi.GUPPIHeader`, optional
If given, used to infer the ``bps`` and ``channels_first``.
bps : int, optional
Bits per elementary sample, used if ``header`` is `None`.
Default: 8.
channels_first : bool, optional
Whether encoded data should be ordered as (nchan, nsample, npol),
used if ``header`` is `None`. Default: `True`.
"""
if header is not None:
bps = header.bps
channels_first = header.channels_first
sample_shape = data.shape[1:]
complex_data = data.dtype.kind == 'c'
try:
encoder = cls._encoders[bps]
except KeyError:
raise ValueError("{0} cannot encode data with {1} bits"
.format(cls.__name__, bps))
# If channels-first, switch to (nchan, nsample, npol); otherwise use
# (nsample, nchan, npol).
if channels_first:
data = data.transpose(2, 0, 1)
else:
data = data.transpose(0, 2, 1)
if complex_data:
data = data.view((data.real.dtype, (2,)))
words = encoder(data).ravel().view(cls._dtype_word)
return cls(words, sample_shape=sample_shape, bps=bps,
complex_data=complex_data, channels_first=channels_first)
def __len__(self):
"""Number of samples in the payload."""
return self.nbytes * 8 // self._true_bpfs
def __getitem__(self, item=()):
# GUPPI data may be stored as (nsample, nchan, npol) or, if
# channels-first, (nchan, nsample, npol), both of which require
# reshaping to get the usual order of (nsample, npol, nchan).
decoder = self._decoders[self._coder]
# If we want to decode the entire dataset.
if item == () or item == slice(None):
data = decoder(self.words)
if self.complex_data:
data = data.view(self.dtype)
if self.channels_first:
# Reshape to (nchan, nsample, npol); transpose to usual order.
return (data.reshape(self.sample_shape.nchan, -1,
self.sample_shape.npol)
.transpose(1, 2, 0))
else:
# Reshape to (nsample, nchan, npol); transpose to usual order.
return (data.reshape(-1, self.sample_shape.nchan,
self.sample_shape.npol)
.transpose(0, 2, 1))
words_slice, data_slice = self._item_to_slices(item)
if self.channels_first:
# Reshape words so channels fall along first axis, then decode.
decoded_words = decoder(self.words.reshape(self.sample_shape.nchan,
-1)[:, words_slice])
# Reshape to (nsample, nchan, npol), then use data_slice.
return (decoded_words.view(self.dtype).T
.reshape(-1, *self.sample_shape)[data_slice])
else:
# data_slice assumes (npol, nchan), so transpose before using it.
return (decoder(self.words[words_slice]).view(self.dtype)
.reshape(-1, self.sample_shape.nchan,
self.sample_shape.npol)
.transpose(0, 2, 1)[data_slice])
def __setitem__(self, item, data):
if item == () or item == slice(None):
words_slice = data_slice = slice(None)
else:
words_slice, data_slice = self._item_to_slices(item)
data = np.asanyarray(data)
# Check if the new data spans an entire word and is correctly shaped.
# If so, skip decoding. If not, decode appropriate words and insert
# new data.
if not (data_slice == slice(None)
and data.shape[-2:] == self.sample_shape
and data.dtype.kind == self.dtype.kind):
decoder = self._decoders[self._coder]
if self.channels_first:
decoded_words = decoder(np.ascontiguousarray(
self.words.reshape(
self.sample_shape.nchan, -1)[:, words_slice]))
current_data = (decoded_words.view(self.dtype)
.T.reshape(-1, *self.sample_shape))
else:
current_data = (decoder(self.words[words_slice])
.view(self.dtype)
.reshape(-1, self.sample_shape.nchan,
self.sample_shape.npol)
.transpose(0, 2, 1))
current_data[data_slice] = data
data = current_data
# Reshape before separating real and complex components.
if self.channels_first:
data = data.reshape(-1, self.sample_shape.nchan).T
else:
data = data.transpose(0, 2, 1)
# Separate real and complex components.
if data.dtype.kind == 'c':
data = data.view((data.real.dtype, (2,)))
# Select encoder.
encoder = self._encoders[self._coder]
# Reshape and encode words.
if self.channels_first:
self.words.reshape(self.sample_shape.nchan, -1)[:, words_slice] = (
encoder(data).reshape(self.sample_shape.nchan, -1)
.view(self._dtype_word))
else:
self.words[words_slice] = (encoder(data.ravel())
.view(self._dtype_word))
data = property(__getitem__, doc="Full decoded payload.")