# Licensed under the GPLv3 - see LICENSE
"""
Definitions for VLBI VDIF frames and frame sets.
Implements a VDIFFrame class that can be used to hold a header and a
payload, providing access to the values encoded in both. It also defines
a VDIFFrameSet class that combines a set of frames from different threads.
For the VDIF specification, see https://www.vlbi.org/vdif
"""
import numpy as np
from ..base.frame import FrameBase
from .header import VDIFHeader, VDIFBaseHeader
from .payload import VDIFPayload
__all__ = ['VDIFFrame', 'VDIFFrameSet']
class VDIFFrame(FrameBase):
"""Representation of a VDIF data frame, consisting of a header and payload.
Parameters
----------
header : `~baseband.vdif.VDIFHeader`
Wrapper around the encoded header words, providing access to the
header information.
payload : `~baseband.vdif.VDIFPayload`
Wrapper around the payload, providing mechanisms to decode it.
valid : bool, optional
Whether the data are valid. Default: inferred from header.
Note that ``header`` is changed in-place if `True` or `False`.
verify : bool
Whether or not to do basic assertions that check the integrity
(e.g., that channel information and whether or not data are complex
are consistent between header and data). Default: `True`.
Notes
-----
The Frame can also be instantiated using class methods:
fromfile : read header and payload from a filehandle
fromdata : encode data as payload
Of course, one can also do the opposite:
tofile : method to write header and payload to filehandle
data : property that yields full decoded payload
One can decode part of the payload by indexing or slicing the frame.
If the frame does not contain valid data, all values returned are set
to ``self.fill_value``.
A number of properties are defined: `shape`, `dtype` and `size` are
the shape, type and number of complete samples of the data array, and
`nbytes` the frame size in bytes. Furthermore, the frame acts as a
dictionary, with keys those of the header. Any attribute that is not
defined on the frame itself, such as ``.time``, will be looked up on the
header as well.
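Examples
--------
An illustrative sketch, using the small sample file bundled with
baseband (the exact sample shape and header keys depend on the file
that is read):

>>> from baseband import vdif
>>> from baseband.data import SAMPLE_VDIF
>>> with open(SAMPLE_VDIF, 'rb') as fh:
...     frame = vdif.VDIFFrame.fromfile(fh)
>>> data = frame[:10]    # decode the first 10 complete samples
>>> frame_nr = frame['frame_nr']    # same as frame.header['frame_nr']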
"""
_header_class = VDIFHeader
_payload_class = VDIFPayload
def verify(self):
"""Verify integrity.
Checks consistency between the header information and payload
data shape and type.
"""
super().verify()
assert self.header.complex_data == (self.payload.dtype.kind == 'c')
assert self.payload.shape == (self.header.samples_per_frame,
self.header.nchan)
@property
def valid(self):
"""Whether frame contains valid data.
This is just the opposite of the ``invalid_data`` item in the header.
If set, that header item is adjusted correspondingly.
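For example, setting ``frame.valid = False`` sets
``frame.header['invalid_data']`` to `True`.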
"""
return not self.header['invalid_data']
@valid.setter
def valid(self, valid):
self.header['invalid_data'] = not valid
@classmethod
def fromfile(cls, fh, edv=None, verify=True):
"""Read a frame from a filehandle.
Parameters
----------
fh : filehandle
From which the header and payload are read.
edv : int, False, or None, optional
Extended Data Version. `False` is for legacy headers. If `None`
(default), it will be determined from the words themselves.
verify : bool, optional
Whether or not to do basic assertions that check the integrity
(e.g., that channel information and whether or not data are complex
are consistent between header and data). Default: `True`.
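Examples
--------
A sketch of reading a frame from the sample file bundled with baseband;
the EDV passed in here (3) is an assumption about that file and only
adds a consistency check, so it can also be omitted:

>>> from baseband import vdif
>>> from baseband.data import SAMPLE_VDIF
>>> with open(SAMPLE_VDIF, 'rb') as fh:
...     frame = vdif.VDIFFrame.fromfile(fh, edv=3)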
"""
header = cls._header_class.fromfile(fh, edv, verify)
payload = cls._payload_class.fromfile(fh, header=header)
# Since header was (optionally) verified, and payload was initialized
# using header, frame verification is unnecessary.
return cls(header, payload, verify=False)
@classmethod
def from_mark5b_frame(cls, mark5b_frame, verify=True, **kwargs):
"""Construct an Mark5B over VDIF frame (EDV=0xab).
Any additional keywords can be used to set VDIF header properties
not found in the Mark 5B header (such as station).
See https://vlbi.org/wp-content/uploads/2019/03/vdif_extension_0xab.pdf
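Examples
--------
An illustrative sketch using the Mark 5B sample file bundled with
baseband; the ``kday``, ``nchan`` and ``bps`` values are assumptions
appropriate for that file and will differ for other data:

>>> from baseband import mark5b, vdif
>>> from baseband.data import SAMPLE_MARK5B
>>> with open(SAMPLE_MARK5B, 'rb') as fh:
...     m5_frame = mark5b.Mark5BFrame.fromfile(fh, kday=56000,
...                                            nchan=8, bps=2)
>>> frame = vdif.VDIFFrame.from_mark5b_frame(m5_frame)
>>> hex(frame.header.edv)    # Mark 5B over VDIF extension
'0xab'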
"""
m5h, m5pl = mark5b_frame.header, mark5b_frame.payload
header = cls._header_class.from_mark5b_header(
m5h, nchan=m5pl.sample_shape.nchan, bps=m5pl.bps,
invalid_data=not mark5b_frame.valid, **kwargs)
payload = cls._payload_class(m5pl.words, header)
return cls(header, payload, verify)
class VDIFFrameSet:
"""Representation of a set of VDIF frames, combining different threads.
Parameters
----------
frames : list of `~baseband.vdif.VDIFFrame`
Should all cover the same time span.
header0 : `~baseband.vdif.VDIFHeader`
First header of the frame set. If `None` (default), it is extracted
from ``frames[0]``.
Notes
-----
The FrameSet can also be instantiated using class methods:
fromfile : read frames from a filehandle, optionally selecting threads
fromdata : encode data as a set of frames
Of course, one can also do the opposite:
tofile : write frames to filehandle
data : property that yields full decoded frame payloads
One can decode part of the payload by indexing or slicing the frame set.
If a frame does not contain valid data, all values returned for it are
set to ``self.fill_value``.
A number of properties are defined: `shape`, `dtype` and `size` are
the shape, type and number of complete samples of the data array, and
`nbytes` the frame set size in bytes. Like a VDIFFrame, the frame set acts
as a dictionary, with keys those of the header of the first frame
(available via ``.header0``). Any attribute that is not defined on the
frame set itself, such as ``.time``, will also be looked up on the header.
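Examples
--------
An illustrative sketch using the sample file bundled with baseband (the
number of threads, channels, and samples per frame depend on the file
that is read):

>>> from baseband import vdif
>>> from baseband.data import SAMPLE_VDIF
>>> with open(SAMPLE_VDIF, 'rb') as fh:
...     frameset = vdif.VDIFFrameSet.fromfile(fh)
>>> frameset.shape == (len(frameset),) + frameset.sample_shape
True
>>> data = frameset[:100, 1]    # decode 100 samples of one thread
>>> thread_ids = frameset['thread_id']    # one entry per frame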
"""
def __init__(self, frames, header0=None):
self.frames = frames
if header0 is None:
self.header0 = self.frames[0].header
else:
self.header0 = header0
@classmethod
def fromfile(cls, fh, thread_ids=None, edv=None, verify=True):
"""Read a frame set from a file, starting at the current location.
Parameters
----------
fh : filehandle
Handle to the VDIF file. Should be at the location where the
frames are read from.
thread_ids : list or None, optional
The thread ids that should be read. If `None` (default), continue
reading threads as long as the frame number does not increase.
edv : int or None, optional
The expected extended data version for the VDIF Header. If `None`
(default), use that of the first frame. (Passing it in slightly
improves file integrity checking.)
verify : bool, optional
Whether to do (light) sanity checks on the header. Default: `True`.
Returns
-------
frameset : `~baseband.vdif.VDIFFrameSet`
Its ``frames`` property holds a list of frames (in order of either
their ``thread_id`` or following the input ``thread_ids`` list).
Use the `data` attribute to convert to an array.
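Examples
--------
A sketch selecting a subset of threads from the sample file bundled
with baseband; the thread ids used here are assumed to be present in
that file:

>>> from baseband import vdif
>>> from baseband.data import SAMPLE_VDIF
>>> with open(SAMPLE_VDIF, 'rb') as fh:
...     frameset = vdif.VDIFFrameSet.fromfile(fh, thread_ids=[2, 3])
>>> len(frameset.frames)    # frames follow the order of thread_ids
2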
"""
header0 = VDIFHeader.fromfile(fh, edv, verify)
edv = header0.edv
frame_nr = header0['frame_nr']
frames = {}
header = header0
while True:
thread_id = header['thread_id']
if header['frame_nr'] != frame_nr or thread_id in frames:
# Exit if we got to the next frame, moving offset back to
# before the header that belongs to the next frame.
# Note that a duplicate thread can happen for a frame_rate
# of 1/s (see gh-485) or perhaps an unfortunate gap.
# Sadly, it seems we cannot always rely on header['seconds'].
fh.seek(-header.nbytes, 1)
break
if thread_ids is None or thread_id in thread_ids:
payload = VDIFPayload.fromfile(fh, header=header)
# Since header was (optionally) verified, and payload was
# initialized using header, no frame verification is needed.
frames[thread_id] = VDIFFrame(header, payload, verify=False)
else:
fh.seek(header.payload_nbytes, 1)
try:
header = VDIFHeader.fromfile(fh, edv, verify)
except (EOFError, AssertionError):
if thread_ids is None or len(frames) == len(thread_ids):
break
else:
raise
if thread_ids and len(frames) < len(thread_ids):
raise OSError("could not find all requested frames.")
# Turn dict of frames into a list, following order given by
# thread_ids, or just sorting by their own thread_id
if thread_ids is None:
thread_ids = sorted(frames.keys())
frames = [frames[tid] for tid in thread_ids]
return cls(frames, header0)
def tofile(self, fh):
"""Write all encoded frames to filehandle."""
for frame in self.frames:
frame.tofile(fh)
@classmethod
def fromdata(cls, data, headers=None, verify=True, **kwargs):
"""Construct a set of frames from data and headers.
Parameters
----------
data : `~numpy.ndarray`
Array holding complex or real data to be encoded. Dimensions
should be (samples_per_frame, nthread, nchan).
headers : `~baseband.vdif.VDIFHeader`, list of same, or None
If a single header, a list with increasing ``thread_id`` is
generated. If not given, will attempt to generate a header from
the keyword arguments.
verify : bool
Whether or not to do basic assertions that check the integrity
(e.g., that channel information and whether or not data are complex
are consistent between header and data). Default: `True`.
**kwargs
If ``header`` is not given, these are used to initialize one.
Returns
-------
frameset : `~baseband.vdif.VDIFFrameSet`
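Examples
--------
A sketch of a round trip, re-encoding data read from the sample file
bundled with baseband; for new data, one would instead pass a header
constructed with ``VDIFHeader.fromvalues`` or give the corresponding
keyword arguments:

>>> from baseband import vdif
>>> from baseband.data import SAMPLE_VDIF
>>> with open(SAMPLE_VDIF, 'rb') as fh:
...     old = vdif.VDIFFrameSet.fromfile(fh)
>>> new = vdif.VDIFFrameSet.fromdata(old.data, old.header0)
>>> new.shape == old.shape
True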
"""
assert data.ndim == 3
if not isinstance(headers, (list, tuple)):
if headers is None:
kwargs.setdefault('thread_id', 0)
header = VDIFHeader.fromvalues(verify=verify, **kwargs)
else:
header = headers.copy()
header['thread_id'] = 0
headers = [header]
for thread_id in range(1, data.shape[1]):
header = header.copy()
header['thread_id'] = thread_id
headers.append(header)
frames = [VDIFFrame.fromdata(d, h, verify=verify)
for d, h in zip(data.transpose(1, 0, 2), headers)]
return cls(frames)
@property
def nbytes(self):
"""Size of the encoded frame in bytes."""
return len(self.frames) * self.frames[0].nbytes
@property
def sample_shape(self):
"""Shape of a sample in the frameset (nthread, nchan)."""
return (len(self.frames),) + self.frames[0].sample_shape
def __len__(self):
"""Number of samples in the frameset."""
return len(self.frames[0])
@property
def shape(self):
"""Shape of the frameset data."""
return (len(self),) + self.sample_shape
@property
def size(self):
"""Total number of component samples in the frameset data."""
prod = 1
for dim in self.shape:
prod *= dim
return prod
@property
def ndim(self):
"""Number of dimensions of the frameset data."""
return len(self.shape)
@property
def dtype(self):
"""Numeric type of the frameset data."""
return self.frames[0].dtype
@property
def valid(self):
"""Whether frameset contains valid data."""
valid = np.array([frame.valid for frame in self.frames])
return valid[0] if len(np.unique(valid)) == 1 else valid
@valid.setter
def valid(self, valid):
valid = np.broadcast_to(valid, (len(self.frames),))
for f, v in zip(self.frames, valid):
f.valid = v
@property
def fill_value(self):
"""Value to replace invalid data in the frameset."""
return self.frames[0].fill_value
@fill_value.setter
def fill_value(self, fill_value):
for frame in self.frames:
frame.fill_value = fill_value
def _get_frames(self, item):
"""Get frames and other information required to obtain given item.
Parameters
----------
item : int, slice, or tuple
Sample indices. An int represents a single sample, a slice a
sample range, and a tuple of ints/slices a range for multi-frame
and multi-channel data.
Returns
-------
frames : list
List of frames needed for this slice. For a list of length unity,
``single_frame`` determines whether it is an index or unit-length
slice.
frame_item : int, slice, or tuple
The item that should be gotten/set for each frame
single_sample, single_frame, single_channel : bool
Whether the sample, frame, or channel axes are simple indices,
and thus whether the corresponding dimension should be removed.
Notes
-----
The sample part of ``item`` is restricted to (tuples of) ints or
slices, so one cannot access non-contiguous samples using advanced
indexing. Furthermore, if ``item`` is a slice, a negative increment
cannot be used. The function is unable to parse payloads whose words
have unused space (e.g., VDIF files with 20 bits/sample).
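For example, for ``item = (slice(0, 10), 1, 0)`` on a frame set holding
eight frames with one channel each, ``frames`` contains just the frame
at index 1, ``frame_item`` is ``(slice(0, 10), 0)``, and
``single_sample``, ``single_frame``, ``single_channel`` are `False`,
`True`, `True`.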
"""
if item == ():
return self.frames, (), False, False, False
if not isinstance(item, tuple):
return self.frames, item, not isinstance(item, slice), False, False
single_sample = not isinstance(item[0], slice)
if len(item) == 1:
return self.frames, item[0], single_sample, False, False
single_channel = (len(item) > 2
and np.empty(self.shape[2:])[item[2:]].ndim == 0)
frame_indices = np.arange(len(self.frames))[item[1]]
assert frame_indices.ndim <= 1
single_frame = frame_indices.ndim == 0
frames = [self.frames[i] for i in np.atleast_1d(frame_indices)]
return (frames, item[:1] + item[2:],
single_sample, single_frame, single_channel)
# Header behaves as a dictionary, while Payload can be indexed/sliced.
# Let frameset behave appropriately.
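# For example (illustrative): frameset['station_id'] returns the single
# value shared by all frames, frameset['thread_id'] an array with one
# entry per frame, and frameset[10:20, 1] the decoded samples 10:20 of
# the frame at index 1.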
def __getitem__(self, item=()):
if isinstance(item, str):
# Header behaves as a dictionary. Assume base keywords are the
# same for every frame, except for thread_id (must be different)
# and invalid_data (can be different).
if item == 'thread_id':
return np.array([f.header[item] for f in self.frames])
elif (item != 'invalid_data'
and item in VDIFBaseHeader._header_parser.keys()):
return self.header0[item]
values = np.array([f.header[item] for f in self.frames])
return values[0] if len(np.unique(values)) == 1 else values
(frames, frame_item,
single_sample, single_frame, single_channel) = self._get_frames(item)
data0 = frames[0][frame_item]
if single_frame:
return data0
if single_sample:
swapped = data = np.empty((len(frames),) + data0.shape,
dtype=self.dtype)
else:
data = np.empty((data0.shape[0], len(frames))
+ data0.shape[1:], dtype=self.dtype)
swapped = data.swapaxes(0, 1)
swapped[0] = data0
for i, frame in enumerate(frames[1:]):
swapped[i+1] = frame[frame_item]
return data
def __setitem__(self, item, data):
if isinstance(item, str):
# Headers behave as dictionaries; except for thread_id and
# invalid_data, base header properties must be set to the same value
# for all frames.
data = np.broadcast_to(data, (len(self.frames),))
if item == 'thread_id':
if len(np.unique(data)) != len(self.frames):
raise ValueError("all thread ids should be unique.")
elif (item != 'invalid_data'
and item in VDIFBaseHeader._header_parser.keys()):
if data.strides != (0,) and len(np.unique(data)) > 1:
raise ValueError("base header keys should be identical.")
for f, value in zip(self.frames, data):
f.header[item] = value
return
(frames, frame_item,
single_sample, single_frame, single_channel) = self._get_frames(item)
if single_frame:
frames[0][frame_item] = data
return
data = np.asanyarray(data)
if single_channel:
if single_sample and data.ndim <= 1:
swapped = np.broadcast_to(data, (len(frames),))
else:
new_shape = (data.shape[0], len(frames))
swapped = np.broadcast_to(data, new_shape).swapaxes(0, 1)
else:
if single_sample or data.ndim <= 2:
new_shape = (len(frames),) + data.shape[1:]
swapped = np.broadcast_to(data, new_shape)
else:
new_shape = (data.shape[0], len(frames)) + data.shape[2:]
swapped = np.broadcast_to(data, new_shape).swapaxes(0, 1)
for frame, frame_data in zip(frames, swapped):
frame[frame_item] = frame_data
data = property(__getitem__, doc="Full decoded frame set.")
def keys(self):
return self.header0.keys()
def _ipython_key_completions_(self):
# Enables tab-completion of header keys in IPython.
return self.keys()
def __contains__(self, key):
return key in self.header0
def __getattr__(self, attr):
if attr in self.header0._properties:
if attr in VDIFBaseHeader._properties:
return getattr(self.header0, attr)
values = np.hstack([getattr(f.header, attr) for f in self.frames])
return values[0] if len(np.unique(values)) == 1 else values
else:
return self.__getattribute__(attr)
# For tests, it is useful to define equality.
def __eq__(self, other):
return (type(self) is type(other)
and len(self.frames) == len(other.frames)
and self.header0 == other.header0
and all(f1 == f2 for f1, f2 in zip(self.frames, other.frames)))