Source code for

# Licensed under the GPLv3 - see LICENSE
"""Payload for HDF5 format."""
from functools import reduce
import operator

import numpy as np

from baseband.vdif import VDIFPayload
from baseband.base.payload import PayloadBase

__all__ = ['HDF5Payload', 'HDF5RawPayload', 'HDF5CodedPayload',
           'HDF5DatasetWrapper', 'DTYPE_C4']

# Ideally, we'd use 'r' and 'i' here, to match the use for
# other complex numbers inside h5py, but unfortunately that
# needs a numpy 'c4' dtype to actually exist.
DTYPE_C4 = np.dtype([('real', '<f2'), ('imag', '<f2')])
"""Numpy dtype used to encode half-precision complex numbers."""

[docs]class HDF5Payload: """Container for decoding and encoding HDF5 payloads. The data will be taken to represent their values directly unless the header has a ``bps`` attribute, or ``bps`` is given explicitly. Parameters ---------- words : `~h5py.Dataset` Array containg data as stored in the HDF5 file, which possibly are encoded similar to a VDIF payload. header : ``, optional Header providing information about whether, and if so, how the payload is encoded. If not given and if the data are encoded, then the following should be passed in. sample_shape : tuple, optional Shape of the samples; e.g., (nchan,). Default: (). bps : int, optional Number of bits per sample part (i.e., per channel and per real or imaginary component). No default. complex_data : bool, optional Whether data are complex. Default: `False`. """ def __new__(cls, words, header=None, **kwargs): if 'bps' in kwargs or hasattr(header, 'bps'): cls = HDF5CodedPayload else: cls = HDF5RawPayload return super().__new__(cls)
[docs] @classmethod def fromfile(cls, fh, header=None): """Get payload words from HDF5 file or group. Parameters ---------- fh : `~h5py.File` or `~h5py.Group` Handle to the HDF5 file/group which has an 'payload' dataset. If the payload does not exist, it will be created. header : ``, optional Must be given for encoded payloads, or to create a payload. """ if 'payload' in fh: return cls(fh['payload'], header) if hasattr(header, 'bps'): nsample = reduce(operator.mul, header.sample_shape, header.samples_per_frame) shape = ((header.bps * (2 if header.complex_data else 1) * nsample + 31) // 32,) else: shape = (header.samples_per_frame,) + header.sample_shape words = fh.create_dataset('payload', shape=shape, dtype=header.encoded_dtype) return cls(words, header)
[docs]class HDF5DatasetWrapper: """Make a HDF5 Dataset look a bit more like ndarray. In particular, adds ``nbytes`` and ``itemsize`` properties, and implements a ``view`` method. """ def __init__(self, words): self.words = words def __getattr__(self, attr): if not attr.startswith('_'): try: return getattr(self.words, attr) except AttributeError: pass return self.__getattribute__(attr) @property def nbytes(self): return self.words.size * self.itemsize @property def itemsize(self): return self.words.dtype.itemsize def __getitem__(self, item): return self.words[item] def __setitem__(self, item, value): self.words[item] = value
[docs] def view(self, *args, **kwargs): # Needed in case a whole data set is decoded in one go. return self.words[:].view(*args, **kwargs)
[docs]class HDF5RawPayload(HDF5DatasetWrapper, HDF5Payload): def __init__(self, words, header=None): self.words = words if header is not None: self._dtype = header.dtype assert header.encoded_dtype == words.dtype assert header.sample_shape == self.sample_shape assert header.samples_per_frame == len(self) elif words.dtype == DTYPE_C4: self._dtype = np.dtype('c8') else: self._dtype = words.dtype @property def data(self): return self[()] @property def sample_shape(self): return self.words.shape[1:] def __len__(self): return len(self.words) def __getitem__(self, item): result = super().__getitem__(item) if result.dtype == DTYPE_C4: result = result.view(DTYPE_C4['real']).astype('f4').view('c8') return result.astype(self.dtype, copy=False) def __setitem__(self, item, value): if self.words.dtype == DTYPE_C4: value = (value.view(value.real.dtype) .astype(DTYPE_C4['real']).view(DTYPE_C4)) super().__setitem__(item, value) @property def dtype(self): """Numeric type of the decoded data array.""" return self._dtype
[docs]class HDF5CodedPayload(HDF5Payload, PayloadBase): _decoders = VDIFPayload._decoders _encoders = VDIFPayload._encoders def __init__(self, words, header=None, sample_shape=(), bps=None, complex_data=False): # Wrap the h5py.Dataset since it misses a few ndarray attributes. # In particular, nbytes, itemsize. words = HDF5DatasetWrapper(words) if header is not None: sample_shape = header.sample_shape bps = header.bps complex_data = header.complex_data super().__init__(words, sample_shape=sample_shape, bps=bps, complex_data=complex_data)