# Licensed under the GPLv3 - see LICENSE
"""Common classes for accessing baseband data as binary files and streams.
For access as binary files, the main `~baseband.base.base.FileBase` class
provides a base to which simple methods such as ``read_frame`` and
``write_frame`` can be added. Unlike normal binary file readers, the base can
be pickled: it stores the name and re-opens the file if unpickled. The
`~baseband.base.base.VLBIFileReaderBase` adds additional methods useful for
streams with small frames, to locate frames and determine the frame rate.
For access as streams, the `~baseband.base.base.StreamReaderBase` and
`~baseband.base.base.StreamWriterBase` classes allow one to process data in
terms of frames, and access them as streams of samples via the ``read`` and
``write`` methods, providing a number of private methods which can be
overridden as needed for specific data formats.
The `~baseband.base.base.FileOpener` and `~baseband.base.base.FileInfo`
classes are to help create the ``open`` and ``info`` functions that are
expected to exist for each format.
"""
import io
import functools
import inspect
import textwrap
import warnings
import operator
import pathlib
from collections import namedtuple
from contextlib import contextmanager
import numpy as np
from numpy.lib.stride_tricks import as_strided
import astropy.units as u
from astropy.utils import lazyproperty, isiterable
from ..helpers import sequentialfile as sf
from .offsets import RawOffsets
from .file_info import FileReaderInfo, StreamReaderInfo
from .utils import byte_array
__all__ = ['HeaderNotFoundError',
'FileBase', 'VLBIFileReaderBase',
'StreamBase', 'StreamReaderBase',
'VLBIStreamReaderBase', 'StreamWriterBase',
'FileInfo', 'FileOpener']
[docs]class FileBase:
"""File wrapper, used to add frame methods to a binary data file.
The underlying file is stored in ``fh_raw`` and all attributes that do not
exist on the class itself are looked up on it.
Parameters
----------
fh_raw : filehandle
Filehandle of the raw binary data file.
Notes
-----
A subclass for reading should define ``read_header``, ``read_frame``, and
``get_frame_rate`` methods, and also set an ``info`` property. Often the
standard one will suffice, i.e., ``info = FileReaderInfo()``.
A subclass for writing should define a ``write_frame`` method.
"""
fh_raw = None
def __init__(self, fh_raw):
self.fh_raw = fh_raw
def __getattr__(self, attr):
"""Try to get things on the current open file if it is not on self."""
if not attr.startswith('_'):
try:
return getattr(self.fh_raw, attr)
except AttributeError:
pass
# __getattribute__ to raise appropriate error.
return self.__getattribute__(attr)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs] def close(self):
self.fh_raw.close()
[docs] @contextmanager
def temporary_offset(self, offset=None, whence=0):
"""Context manager for temporarily seeking to another file position.
To be used as part of a ``with`` statement::
with fh_raw.temporary_offset() [as fh_raw]:
with-block
On exiting the ``with-block``, the file pointer is moved back to its
original position. As a convenience, one can pass on the offset
to seek to when entering the context manager. Parameters are as
for :meth:`io.IOBase.seek`.
"""
oldpos = self.tell()
try:
if offset is not None:
self.seek(offset, whence)
yield self
finally:
self.seek(oldpos)
def __repr__(self):
return "{0}(fh_raw={1})".format(self.__class__.__name__, self.fh_raw)
def __getstate__(self):
if self.writable():
raise TypeError('cannot pickle file opened for writing')
state = self.__dict__.copy()
# IOBase instances cannot be pickled, but we can just reopen them
# when we are unpickled. Anything else may have internal state that
# needs preserving (e.g., SequentialFile), so we will assume
# it takes care of this itself.
if isinstance(self.fh_raw, io.IOBase):
fh = state.pop('fh_raw')
state['fh_info'] = {
'offset': 'closed' if fh.closed else fh.tell(),
'filename': fh.name,
'mode': fh.mode}
return state
def __setstate__(self, state):
fh_info = state.pop('fh_info', None)
if fh_info is not None:
fh = io.open(fh_info['filename'], fh_info['mode'])
if fh_info['offset'] != 'closed':
fh.seek(fh_info['offset'])
else:
fh.close()
state['fh_raw'] = fh
self.__dict__.update(state)
[docs]class VLBIFileReaderBase(FileBase):
"""VLBI wrapped file reader base class.
Parameters
----------
fh_raw : filehandle
Filehandle of the raw binary data file.
Notes
-----
A subclass should define ``read_header`` and ``read_frame`` methods.
This baseclass includes a base ``locate_frames`` method that can search
the file for a header patter. It can be overridden by a version that just
passes in the relevant pattern.
Also defined is are a ``find_header`` method which combines the above with
``read_header``, and a basic ``get_frame_rate`` methods which scans the
file for headers and determines the maximum frame number that occurs
before the jump down (which is taken to be for the next second, as in all
standard VLBI formats). This method assumes headers have a 'frame_nr'
item, and define a ``payload_nbytes`` property.
"""
info = FileReaderInfo()
[docs] def locate_frames(self, pattern, *, mask=None, frame_nbytes=None,
offset=0, forward=True, maximum=None, check=1):
"""Use a pattern to locate frame starts near the current position.
Note that the current position is always included.
Parameters
----------
pattern : header, ~numpy.ndaray, bytes, int, or iterable of int
Synchronization pattern to look for. If a header or header class,
:meth:`~baseband.base.header.VLBIHeaderBase.invariant_pattern`
is used to create a masked pattern, using invariant keys from
:meth:`~baseband.base.header.VLBIHeaderBase.invariants`.
If an `~numpy.ndarray` or `bytes` instance, a byte array view is
taken. If an (iterable of) int, the integers need to be unsigned
32 bit and will be interpreted as little-endian.
mask : ~numpy.ndarray, bytes, int, or iterable of int.
Bit mask for the pattern, with 1 indicating a given bit will
be used the comparison.
frame_nbytes : int, optional
Frame size in bytes. Defaults to the frame size in any header
passed in.
offset : int, optional
Offset from the frame start that the pattern occurs. Any
offsets inferred from masked entries are added to this (hence,
no offset needed when a header is passed in as ``pattern``).
forward : bool, optional
Seek forward if `True` (default), backward if `False`.
maximum : int, optional
Maximum number of bytes to search away from the present location.
Default: search twice the frame size if given, otherwise 1 million
(extra bytes to avoid partial patterns will be added).
Use 0 to check only at the current position.
check : int or tuple of int, optional
Frame offsets where another sync pattern should be present
(if inside the file). Ignored if ``frame_nbytes`` is not given.
Default: 1, i.e., a sync pattern should be present one
frame after the one found (independent of ``forward``),
thus helping to guarantee the frame is not corrupted.
Returns
-------
locations : list of int
Locations of sync patterns within the range scanned,
in order of proximity to the starting position.
"""
if hasattr(pattern, 'invariant_pattern'):
if frame_nbytes is None:
frame_nbytes = pattern.frame_nbytes
pattern, mask = pattern.invariant_pattern()
pattern = byte_array(pattern)
if mask is not None:
mask = byte_array(mask)
useful = np.nonzero(mask)[0]
useful_slice = slice(useful[0], useful[-1]+1)
mask = mask[useful_slice]
pattern = pattern[useful_slice]
offset += useful_slice.start
if maximum is None:
maximum = (2 * frame_nbytes if frame_nbytes else 1000000) - 1
if check is None or frame_nbytes is None:
check = np.array([], dtype=int)
check_min = check_max = 0
else:
check = np.atleast_1d(check) * frame_nbytes
# For Numpy >=1.15, can just be check.min(initial=0) in the
# calculation of start, stop below.
check_min = min(check.min(), 0)
check_max = max(check.max(), 0)
if frame_nbytes is None:
# If not set, we just let it stand in for the extra bytes
# that should be read to ensure we can even see a pattern.
frame_nbytes = offset + pattern.size
with self.temporary_offset() as fh:
# Calculate the fiducial start of the region we are looking in.
if forward:
seek_start = fh.tell()
else:
seek_start = fh.tell() - maximum
# Determine what part of the file to read, including the extra
# bits for doing the checking. Note that we ensure not to start
# before the start of the file, but we do not check for ending
# after the end of the file, as we want to avoid a possibly
# expensive seek (e.g., if the file is a SequentialFile of a lot
# of individual files). We rely instead on reading fewer bytes
# than requested if we are close to the end.
start = max(seek_start + offset + check_min, 0)
stop = max(seek_start + maximum + 1 + check_max + frame_nbytes,
start)
fh.seek(start)
data = fh.read(stop-start)
# Since we may have hit the end of the file, check what the actual
# stop position was (needed at end).
stop = start + len(data)
# We normally have read more than really needed (to check for EOF);
# select what we actually want and convert to an array of bytes.
size = min(maximum + 1 + check_max - check_min,
stop - start - pattern.size)
if size <= 0:
return []
data = np.frombuffer(data, dtype='u1', count=size+pattern.size)
# We match in two steps, first matching just the first pattern byte,
# and then matching the rest in one step using a strided array.
# The hope is that this gives a good compromise in speed: not
# iterate in python for each pattern byte, yet not doing pattern
# size times more comparisons than needed in C.
if mask is None:
match = data[:-pattern.size] == pattern[0]
else:
match = ((data[:-pattern.size] ^ pattern[0]) & mask[0]) == 0
matches = np.nonzero(match)[0]
# Re-stride data so that it looks like each element is followed by
# all others, and check those all in one go.
strided = as_strided(data[1:], strides=(1, 1),
shape=(size, pattern.size-1))
if mask is None:
match = strided[matches] == pattern[1:]
else:
match = ((strided[matches] ^ pattern[1:]) & mask[1:]) == 0
matches = matches[match.all(-1)]
if not forward:
# Order by proximity to the file position.
matches = matches[::-1]
# Convert matches to a list of actual file positions.
matches += start - offset
matches = matches.tolist()
# Keep only matches for which
# (1) the location is in the requested range of current +/- maximum,
# (2) the associated frames completely fit in the file, and
# (3) the pattern is also present at the requested check points.
# The range in positions actually wanted. Here, we guarantee not
# only (1) but also (2) by using the recalculated `stop` from above,
# which is limited by the file size.
loc_start = max(seek_start, 0)
loc_stop = min(seek_start+maximum+1, stop-frame_nbytes+1)
# And the range in which it was possible to check positions.
check_start = start
check_stop = stop-offset-pattern.size
locations = [loc for loc in matches
if (loc_start <= loc < loc_stop
and all(c in matches for c in loc+check
if check_start <= c < check_stop))]
return locations
[docs] def get_frame_rate(self, offset=0):
"""Determine the number of frames per second.
The method cycles through headers, starting from the start of the file,
finding the largest frame number before it jumps back to 0 for a new
second.
Parameters
----------
offset : int or None, optional
Offset at which to start searching. Default is 0, i.e., the start
of the file. Use `None` to start at the current position.
Returns
-------
frame_rate : `~astropy.units.Quantity`
Frames per second.
Raises
------
`EOFError`
If the file contains less than one second of data.
"""
with self.temporary_offset(offset):
header = header0 = self.read_header()
frame_nr0 = header0['frame_nr']
while header['frame_nr'] == frame_nr0:
self.seek(header.payload_nbytes, 1)
header = self.read_header()
max_frame = frame_nr0
while header['frame_nr'] > 0:
max_frame = max(header['frame_nr'], max_frame)
self.seek(header.payload_nbytes, 1)
header = self.read_header()
return (max_frame + 1) * u.Hz
[docs]class StreamBase:
"""Baseband file wrapper, allowing access as a stream of data.
Common methods between stream readers and writers, mostly just
dealing with getting the arguments on initialization and providing
access to them via properties.
"""
# Apart from verify, instances are meant to be immutable.
_sample_shape_maker = None
_frame_index = None
def __init__(self, fh_raw, header0, *, squeeze=True, **kwargs):
# Required arguments.
self.fh_raw = fh_raw
self._header0 = header0
# Arguments with defaults.
self._squeeze = bool(squeeze)
# Arguments that can override or complement information from header.
for attr, getter in [
('bps', operator.index),
('complex_data', bool),
('samples_per_frame', operator.index),
('sample_shape', tuple),
('sample_rate', None)]:
value = kwargs.pop(attr, None)
if value is None:
value = getattr(header0, attr, None)
if getter is not None and value is not None:
value = getter(value)
setattr(self, '_'+attr, value)
if kwargs:
raise TypeError('got unexpected keyword(s): {}'
.format(', '.join(kwargs.keys())))
# Pre-calculate.
self._frame_rate = (self.sample_rate / self.samples_per_frame).to(u.Hz)
# Initialize.
self.offset = 0
# Ensure that we have a sample_shape.
self.sample_shape
@property
def squeeze(self):
"""Whether data arrays have dimensions with length unity removed.
If `True`, data read out has such dimensions removed, and data
passed in for writing has them inserted.
"""
return self._squeeze
@property
def _unsliced_shape(self):
unsliced_shape = self._sample_shape
if self._sample_shape_maker is not None:
return self._sample_shape_maker(*unsliced_shape)
else:
return unsliced_shape
@lazyproperty
def sample_shape(self):
"""Shape of a complete sample (possibly subset or squeezed)."""
if not self.squeeze:
return self._unsliced_shape
sample_shape = self._unsliced_shape
squeezed_shape = tuple(dim for dim in sample_shape if dim > 1)
fields = getattr(sample_shape, '_fields', None)
if fields is None:
return squeezed_shape
fields = [field for field, dim in zip(fields, sample_shape) if dim > 1]
shape_cls = namedtuple('SampleShape', ','.join(fields))
return shape_cls(*squeezed_shape)
def _get_time(self, header):
"""Get time from a header."""
# Subclasses can override this if information is needed beyond that
# provided in the header.
return header.time
def _set_time(self, header, time):
"""Set time in a header."""
# Subclasses can override this if information is needed beyond that
# provided in the header. Use update since that some classes will
# do extra work after any setting (e.g., CRC update).
header.update(time=time)
def _get_index(self, header):
"""Infer the index of the frame header relative to the first frame."""
# This base implementation uses the time, but can be overridden
# with faster methods in subclasses.
dt = self._get_time(header) - self.start_time
return int(round((dt * self._frame_rate).to_value(u.one)))
def _set_index(self, header, index):
"""Set frame header index relative to the first frame."""
# Can be overridden if there is a simpler way than using the time,
# or if other properties need to be set.
self._set_time(header,
time=self.start_time + index / self._frame_rate)
@lazyproperty
def start_time(self):
"""Start time of the file.
See also `time` for the time of the sample pointer's current offset.
"""
return self._get_time(self.header0)
@property
def time(self):
"""Time of the sample pointer's current offset in file.
See also `start_time` for the start time of the file.
"""
return self.tell(unit='time')
@property
def header0(self):
"""First header of the file."""
return self._header0
@property
def bps(self):
"""Bits per elementary sample."""
return self._bps
@property
def complex_data(self):
"""Whether the data are complex."""
return self._complex_data
@property
def samples_per_frame(self):
"""Number of complete samples per frame."""
return self._samples_per_frame
@property
def sample_rate(self):
"""Number of complete samples per second."""
return self._sample_rate
[docs] def tell(self, unit=None):
"""Current offset in the file.
Parameters
----------
unit : `~astropy.units.Unit` or str, optional
Time unit the offset should be returned in. By default, no unit
is used, i.e., an integer enumerating samples is returned. For the
special string 'time', the absolute time is calculated.
Returns
-------
offset : int, `~astropy.units.Quantity`, or `~astropy.time.Time`
Offset in current file (or time at current position).
"""
if unit is None:
return self.offset
# "isinstance" avoids costly comparisons of an actual unit with 'time'.
if not isinstance(unit, u.UnitBase) and unit == 'time':
return self.start_time + self.tell(unit=u.s)
return (self.offset / self.sample_rate).to(unit)
def __getattr__(self, attr):
"""Try to get things on the current open file if it is not on self."""
if attr in {'readable', 'writable', 'seekable', 'closed', 'name'}:
return getattr(self.fh_raw, attr)
# __getattribute__ to raise appropriate error.
return self.__getattribute__(attr)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs] def close(self):
self.fh_raw.close()
def __repr__(self):
return ("<{s.__class__.__name__} name={s.name} offset={s.offset}\n"
" sample_rate={s.sample_rate},"
" samples_per_frame={s.samples_per_frame},\n"
" sample_shape={s.sample_shape}, bps={s.bps},\n"
" {sub}start_time={s.start_time.isot}>"
.format(s=self, sub=('subset={0}, '.format(self.subset)
if self.subset else '')))
[docs]class StreamReaderBase(StreamBase):
"""Base for all stream readers, providing standard reading methods.
Parameters
----------
fh_raw : filehandle
Should be opened in binary mode for reading, and usually wrapped
in a given format;s ``FileReader``.
header0 : header instance
First header of the file, with information like the start time, etc.
squeeze : bool, optional
If `True` (default), remove any dimensions of length unity from
decoded data.
subset : indexing object or tuple of objects, optional
Specific components of the complete sample to decode (after possible
squeezing).
fill_value : float or complex, optional
Value to use for invalid or missing data. Default: 0.
verify : bool, optional
Whether to do basic checks of frame integrity when reading. The first
frameset of the stream is always checked. Default: `True`.
**kwargs
Information that supplements that of the header. In particular, any
format should define ``bps``, ``complex_data``, ``samples_per_frame``,
and ``sample_shape`` (pre-squeeze and subset). Furthermore,
``sample_rate`` should be provided if the raw file reader cannot
determine it.
Notes
-----
Accessing frames happens via a number of private methods, which can be
overridden by subclasses to deal with their peculiarities.
The process starts in `~baseband.base.base.StreamReaderBase.read` with
``_get_frame(offset)``, which determines the ``index`` of the frame a
given offset falls in, and then either uses ``_read_frame(index)`` to
read it or returns a possibly cached frame, together with the
``sample_offset`` at which ``offset`` falls within the frame. Then, data
is retrieved as needed until the frame is exhausted, at which point the
process repeats.
The ``_read_frame(index)`` method in turn takes a number of steps.
First, the binary file reader is moved to the correct position using
``_seek_frame(index)``, then for the base reader, the actual reading is
done using ``_fh_raw_read_frame()`` and a check is made that the right
frame was read.
Various formats override various steps. For instance, DADA overrides
``_fh_raw_read_frame()`` to take care of last frames, which are often
longer than the fixed lengths recorded in the header. For GUPPI,
``_get_frame`` is overridden to take into account overlap between
frames, which should not be skipped for the last frame. For GSB,
``_seek_frame`` and ``_fh_raw_read_frame`` are both overridden to take
into account that header and data are read from different files.
"""
info = StreamReaderInfo()
def __init__(self, fh_raw, header0, *,
squeeze=True, subset=(), fill_value=0., verify=True,
**kwargs):
self._subset = (() if subset is None
else subset if isinstance(subset, tuple)
else (subset,))
self._fill_value = float(fill_value)
self.verify = verify
super().__init__(fh_raw, header0, squeeze=squeeze, **kwargs)
@lazyproperty
def sample_rate(self):
sample_rate = super().sample_rate
if sample_rate is None:
try:
sample_rate = (self.samples_per_frame
* self.fh_raw.get_frame_rate()).to(u.MHz)
except Exception as exc:
exc.args += ("the sample rate could not be auto-detected. "
"This can happen if the file is too short to "
"determine the sample rate, or because it is "
"corrupted. Try passing in an explicit "
"`sample_rate`.",)
raise
return sample_rate
@property
def verify(self):
"""Whether to do consistency checks on frames being read."""
return self._verify
@verify.setter
def verify(self, verify):
self._verify = bool(verify) if verify != 'fix' else verify
@property
def subset(self):
"""Specific components of the complete sample to decode.
The order of dimensions is the same as for `sample_shape`. Set by
the class initializer.
"""
return self._subset
def _squeeze_and_subset(self, data):
"""Possibly remove unit dimensions and subset the given data.
The first dimension (sample number) is never removed.
"""
if self.squeeze:
data = data.reshape(data.shape[:1]
+ tuple(sh for sh in data.shape[1:] if sh > 1))
if self.subset:
data = data[(slice(None),) + self.subset]
return data
@lazyproperty
def sample_shape(self):
"""Shape of a complete sample (possibly subset or squeezed)."""
# First apply base class, which squeezes if needed.
sample_shape = super().sample_shape
if not self.subset:
return sample_shape
# Now apply subset to a dummy sample that has the sample number as its
# value (where 13 is to bring bad luck to over-complicated subsets).
dummy_data = np.arange(13.)
dummy_sample = np.moveaxis(
(np.zeros(sample_shape)[..., np.newaxis] + dummy_data), -1, 0)
try:
dummy_subset = dummy_sample[(slice(None),) + self.subset]
# Check whether subset was in range and whether sample numbers were
# preserved (latter should be, but check anyway).
assert 0 not in dummy_subset.shape
assert np.all(np.moveaxis(dummy_subset, 0, -1) == dummy_data)
except (IndexError, AssertionError) as exc:
exc.args += ("subset {} cannot be used to properly index "
"{}samples with shape {}.".format(
self.subset, "squeezed " if self.squeeze else "",
sample_shape),)
raise exc
# We got the shape. We only bother trying to associate names with the
# dimensions when we know for sure what happened in the subsetting.
subset_shape = dummy_subset.shape[1:]
if (not hasattr(sample_shape, '_fields') or subset_shape == ()
or len(self.subset) > len(sample_shape)):
return subset_shape
# We can only associate names when indexing each dimension separately
# gives a consistent result with the complete subset.
subset_axis = 0
fields = []
subset = self.subset + (slice(None),) * (len(sample_shape)
- len(self.subset))
try:
for field, sample_dim, item in zip(sample_shape._fields,
sample_shape, subset):
subset_dim = np.empty(sample_dim)[item].shape
assert len(subset_dim) <= 1 # No advanced multi-d indexing.
if len(subset_dim) == 1:
# If this dimension was not removed and matches that
# of the real subset, we now have a field label for it.
assert subset_dim[0] == subset_shape[subset_axis]
fields.append(field)
subset_axis += 1
except Exception:
# Things did not make sense, probably some advanced indexing;
# Just don't worry about having a named tuple.
return subset_shape
shape_cls = namedtuple('SampleShape', ','.join(fields))
return shape_cls(*subset_shape)
@lazyproperty
def _last_header(self):
"""Header of the last file for this stream."""
# Base implementation. Seek forward rather than backward, as last
# frame can have missing bytes.
with self.fh_raw.temporary_offset() as fh_raw:
file_size = fh_raw.seek(0, 2)
nframes, fframe = divmod(file_size, self.header0.frame_nbytes)
fh_raw.seek((nframes - 1) * self.header0.frame_nbytes)
return fh_raw.read_header()
# Override the following so we can refer to stop_time in the docstring.
@property
def start_time(self):
"""Start time of the file.
See also `time` for the time of the sample pointer's current offset,
and (if available) `stop_time` for the time at the end of the file.
"""
return super().start_time
@property
def time(self):
"""Time of the sample pointer's current offset in file.
See also `start_time` for the start time, and (if available)
`stop_time` for the end time, of the file.
"""
return super().time
@lazyproperty
def stop_time(self):
"""Time at the end of the file, just after the last sample.
See also `start_time` for the start time of the file, and `time` for
the time of the sample pointer's current offset.
"""
return (self._get_time(self._last_header)
+ (self.samples_per_frame / self.sample_rate))
@lazyproperty
def _nsample(self):
"""Number of complete samples in the stream data."""
return int(((self.stop_time - self.start_time)
* self.sample_rate).to(u.one).round())
@property
def shape(self):
"""Shape of the (squeezed/subset) stream data."""
return (self._nsample,) + self.sample_shape
@property
def size(self):
"""Total number of component samples in the (squeezed/subset) stream
data.
"""
prod = 1
for dim in self.shape:
prod *= dim
return prod
@property
def ndim(self):
"""Number of dimensions of the (squeezed/subset) stream data."""
return len(self.shape)
@property
def fill_value(self):
"""Value to use for invalid or missing data. Default: 0."""
return self._fill_value
@property
def dtype(self):
# TODO: arguably, this should be inferred from an actual payload.
return np.dtype(np.complex64 if self.complex_data else np.float32)
[docs] def readable(self):
"""Whether the file can be read and decoded."""
return self.fh_raw.readable and self.fh_raw.info.readable
[docs] def seek(self, offset, whence=0):
"""Change the sample pointer position.
This works like a normal filehandle seek, but the offset is in samples
(or a relative or absolute time).
Parameters
----------
offset : int, `~astropy.units.Quantity`, or `~astropy.time.Time`
Offset to move to. Can be an (integer) number of samples,
an offset in time units, or an absolute time. For the latter
two, the pointer will be moved to the nearest integer sample.
whence : {0, 1, 2, 'start', 'current', or 'end'}, optional
Like regular seek, the offset is taken to be from the start if
``whence=0`` (default), from the current position if 1,
and from the end if 2. One can alternativey use 'start',
'current', or 'end' for 0, 1, or 2, respectively. Ignored if
``offset`` is a time.
"""
try:
offset = operator.index(offset)
except Exception:
try:
offset = offset - self.start_time
except Exception:
pass
else:
whence = 0
offset = int((offset * self.sample_rate).to(u.one).round())
if whence == 0 or whence == 'start':
self.offset = offset
elif whence == 1 or whence == 'current':
self.offset += offset
elif whence == 2 or whence == 'end':
self.offset = self.shape[0] + offset
else:
raise ValueError("invalid 'whence'; should be 0 or 'start', 1 or "
"'current', or 2 or 'end'.")
return self.offset
[docs] def read(self, count=None, out=None):
"""Read a number of complete samples.
Parameters
----------
count : int or None, optional
Number of complete samples to read. If `None` (default) or
negative, the number of samples left. Ignored if ``out`` is given.
out : None or array, optional
Array to store the samples in. If given, ``count`` will be inferred
from the first dimension; the remaining dimensions should equal
`sample_shape`.
Returns
-------
out : `~numpy.ndarray` of float or complex
The first dimension is sample-time, and the remaining ones are
as given by `sample_shape`.
"""
if self.closed:
raise ValueError("I/O operation on closed stream.")
samples_left = self.shape[0] - self.offset
if out is None:
if count is None or count < 0:
count = max(0, samples_left)
out = np.empty((count,) + self.sample_shape, dtype=self.dtype)
else:
assert out.shape[1:] == self.sample_shape, (
"'out' must have trailing shape {}".format(self.sample_shape))
count = out.shape[0]
if count > samples_left:
raise EOFError("cannot read from beyond end of input.")
offset0 = self.offset
sample = 0
while sample < count:
# For current position, get frame plus offset in that frame.
frame, sample_offset = self._get_frame(self.offset)
nsample = min(count - sample, len(frame) - sample_offset)
data = frame[sample_offset:sample_offset + nsample]
data = self._squeeze_and_subset(data)
# Copy to relevant part of output.
out[sample:sample + nsample] = data
sample += nsample
# Explicitly set offset (leaving get_frame free to adjust it).
self.offset = offset0 + sample
return out
def _get_frame(self, offset):
"""Get a frame that includes given offset.
Finds the index corresponding to the needed frame, assuming frames
are all the same length. If not already cached, it retrieves a
frame by calling ``self._read_frame(index)``.
Parameters
----------
offset : int
Offset in the stream for which a frame should be found.
Returns
-------
frame : `~baseband.base.frame.FrameBase`
Frame holding the sample at ``offset``.
sample_offset : int
Offset within the frame corresponding to ``offset``.
"""
frame_index, sample_offset = divmod(offset, self.samples_per_frame)
if frame_index != self._frame_index:
self._frame = self._read_frame(frame_index)
self._frame.fill_value = self.fill_value
self._frame_index = frame_index
return self._frame, sample_offset
def _read_frame(self, index):
"""Base implementation of reading a frame.
It cannot handle bad frames, and contains pieces which subclasses
can override as needed (or override the whole thing).
"""
self._seek_frame(index)
frame = self._fh_raw_read_frame()
# Check whether we actually got the right frame.
if self.verify and self._get_index(frame) != index:
raise ValueError('wrong frame number.')
return frame
def _seek_frame(self, index):
"""Move the underlying file pointer to the frame of the given index."""
return self.fh_raw.seek(index * self.header0.frame_nbytes)
def _fh_raw_read_frame(self):
"""Read a frame at the current position of the underlying file."""
return self.fh_raw.read_frame(verify=self.verify)
def __getstate__(self):
state = self.__dict__.copy()
# Remove cached frame and associate index, since these
# are almost certainly not needed and can be regenerated.
# Sample_shape can also be regenerated, yet not easily pickled,
# if the namedtuple is defined on the class, so we remove it too.
for item in ('_frame', '_frame_index', 'sample_shape'):
state.pop(item, None)
return state
def __setstate__(self, state):
self.__dict__.update(state)
[docs]class VLBIStreamReaderBase(StreamReaderBase):
"""Base for VLBI stream readers.
This extends the base class specifically for the case that the underlying
file has small subsequent frames, which may have gaps, and adds methods
that allow fixing these.
Arguments are as for `~baseband.base.base.StreamReaderBase`.
Notes
-----
The code assumes that the raw file reader has a ``find_header`` method,
which can be used to find next or previous frames from any position, and
thus help identify and skip bad frames. It is used in ``_last_header``
to be sure to find a full last frame.
The class also overrides the ``_read_frame`` method which one that checks
that the frame after the one requested is the right one, and attemps
recovery if anything fails, using ``_bad_frame(index, frame, exc)``.
Subclasses can overrides parts of the class. For instance, VDIF overrides
``_fh_raw_read_frame`` and ``_bad_frame`` to take care of the fact that
data comes in sets of frames from multiple threads, not individual ones.
"""
_next_index = None
def __init__(self, fh_raw, header0, **kwargs):
super().__init__(fh_raw, header0, **kwargs)
self._raw_offsets = RawOffsets(frame_nbytes=self.header0.frame_nbytes)
@lazyproperty
def _last_header(self):
"""Last header of the file."""
with self.fh_raw.temporary_offset(
-self.header0.frame_nbytes, 2) as fh_raw:
try:
return fh_raw.find_header(self.header0, forward=False,
check=(-1, 1))
except HeaderNotFoundError as exc:
exc.args += ("corrupt VLBI frame? No frame in last {0} bytes."
.format(2 * self.header0.frame_nbytes),)
raise
def _seek_frame(self, index):
"""Move the underlying file pointer to the frame of the given index."""
return self.fh_raw.seek(self._raw_offsets[index])
def _read_frame(self, index):
"""Base implementation of reading a VLBI frame.
It can handle bad frames to some extent, and contains pieces
which subclasses can override as needed.
"""
self._seek_frame(index)
if not self.verify:
return self._fh_raw_read_frame()
# If we are reading with care, we read also a frame ahead
# to make sure that is not corrupted. If such a frame has
# been kept, we use it now. Otherwise, we read a new frame.
# We always remove the cached copy, since we cannot be sure
# it is even the right number.
if index == self._next_index:
frame = self._next_frame
frame_index = index
self.fh_raw.seek(frame.nbytes, 1)
self._next_index = self._next_frame = None
else:
self._next_index = self._next_frame = None
try:
frame = self._fh_raw_read_frame()
frame_index = self._get_index(frame)
except Exception as exc:
return self._bad_frame(index, None, exc)
# Check whether we actually got the right frame.
if frame_index != index:
return self._bad_frame(index, frame,
ValueError('wrong frame number.'))
# In either case, we check the next frame (if there is one).
try:
with self.fh_raw.temporary_offset():
self._next_frame = self._fh_raw_read_frame()
self._next_index = self._get_index(self._next_frame)
except Exception as exc:
return self._bad_frame(index, frame, exc)
return frame
def _bad_frame(self, index, frame, exc):
"""Deal with a bad frame.
Parameters
----------
index : int
Frame index that is to be read.
frame : `~baseband.base.frame.VLBIFrameBase` or None
Frame that was read without failure. If not `None`, either
the frame index is wrong or the next frame could not be read.
exc : Exception
Exception that led to the call.
"""
if (frame is not None and self._get_index(frame) == index
and index == self._get_index(self._last_header)):
# If we got an exception because we're trying to read beyond the
# last frame, the frame is almost certainly OK, so keep it.
return frame
if self.verify != 'fix':
raise exc
msg = 'problem loading frame {}.'.format(index)
# Where should we be?
raw_offset = self._seek_frame(index)
# See if we're in the right place. First ensure we have a header.
# Here, it is more important that it is a good one than that we go
# too far, so we insist on two consistent frames after it. We
# increase the maximum a bit to be able to jump over bad bits.
self.fh_raw.seek(raw_offset)
try:
header = self.fh_raw.find_header(
self.header0, forward=True, check=(1, 2),
maximum=3*self.header0.frame_nbytes)
except HeaderNotFoundError:
exc.args += (msg + ' Cannot find header nearby.',)
raise exc
# Don't yet know how to deal with excess data.
header_index = self._get_index(header)
if header_index < index:
exc.args += (msg + ' There appears to be excess data.',)
raise exc
# Go backward until we find previous frame, storing offsets
# as we go. We again increase the maximum since we may need
# to jump over a bad bit.
while header_index >= index:
raw_pos = self.fh_raw.tell()
header1_index = header_index
self.fh_raw.seek(-1, 1)
try:
header = self.fh_raw.find_header(
self.header0, forward=False,
maximum=4*self.header0.frame_nbytes)
except HeaderNotFoundError:
exc.args += (msg + ' Could not find previous index.',)
raise exc
header_index = self._get_index(header)
# While we are at it, update the list of known indices.
self._raw_offsets[header1_index] = raw_pos
# Move back to position of last good header (header1_index).
self.fh_raw.seek(raw_pos)
if header1_index > index:
# Frame is missing!
msg += ' The frame seems to be missing.'
# Just reuse old frame with new index and set to invalid.
frame = self._frame
frame.header.mutable = True
frame.valid = False
self._set_index(frame, index)
else:
assert header1_index == index, \
'at this point, we should have a good header.'
if raw_pos != raw_offset:
msg += ' Stream off by {0} bytes.'.format(raw_offset
- raw_pos)
# Above, we should have added information about
# this index in our offset table.
assert index in self._raw_offsets.frame_nr
# At this point, reading the frame should always work,
# and we know there is a header right after it.
frame = self._fh_raw_read_frame()
assert self._get_index(frame) == index
warnings.warn(msg)
return frame
def __getstate__(self):
state = super().__getstate__()
# Remove additional cached frames and indices.
for item in ('_next_frame', '_next_index'):
state.pop(item, None)
return state
[docs]class StreamWriterBase(StreamBase):
"""Base for all stream writers, providing a standard write method.
Parameters
----------
fh_raw : filehandle
Should be opened in binary mode for writer, and usually wrapped
in a given format's ``FileWriter``.
header0 : header instance
Header for the first frame, holding time information, etc. Can instead
give keyword arguments to construct a header (see ``**kwargs``).
squeeze : bool, optional
If `True` (default), writer accepts squeezed arrays as input, and adds
any dimensions of length unity.
**kwargs
Information that supplements that of the header. In particular, any
format should define ``bps``, ``complex_data``, ``samples_per_frame``,
``sample_shape`` (on file), and ``sample_rate``.
Notes
-----
Accessing frames happens via a number of private methods, which can be
overridden by subclasses to deal with their peculiarities.
The process starts in `~baseband.base.base.StreamWriterBase.write` with
``_get_frame(offset)``, which determines the ``index`` of the frame a
given offset falls in, and then either uses ``_make_frame(index)`` to
create it, or returns a possibly cached frame, together with the
``sample_offset`` at which ``offset`` falls within the frame. Then, data
is inserted as needed until the frame is exhausted, at which point the
frame is written to disk using ``_fh_raw_write_frame`` and the process
repeats.
Various formats override various steps. For instance, for DADA and GUPPI,
``_make_frame()`` and ``_fh_raw_write_frame()`` are overridden to work
with memory mapped files. For GSB, ``_fh_raw_write_frame()`` is overridden
to take into account that header and data are read from different files.
"""
def _unsqueeze(self, data):
new_shape = list(data.shape)
for i, dim in enumerate(self._unsliced_shape):
if dim == 1:
new_shape.insert(i + 1, 1)
return data.reshape(new_shape)
[docs] def write(self, data, valid=True):
"""Write data, buffering by frames as needed.
Parameters
----------
data : `~numpy.ndarray`
Piece of data to be written, with sample dimensions as given by
`sample_shape`. This should be properly scaled to make best use
of the dynamic range delivered by the encoding.
valid : bool, optional
Whether the current data are valid. Default: `True`.
"""
assert data.shape[1:] == self.sample_shape, (
"'data' should have trailing shape {}".format(self.sample_shape))
if self.squeeze:
data = self._unsqueeze(data)
count = data.shape[0]
offset0 = self.offset
sample = 0
while sample < count:
frame, sample_offset = self._get_frame(self.offset)
nsample = min(count - sample, len(frame) - sample_offset)
sample_end = sample_offset + nsample
frame[sample_offset:sample_end] = data[sample:sample+nsample]
frame.valid &= valid
if sample_end == len(frame):
self._fh_raw_write_frame(frame)
sample += nsample
# Explicitly set offset (leaving get_frame free to adjust it).
self.offset = offset0 + sample
def _get_frame(self, offset):
# Nearly identical to StreamReaderBase version, but not
# quite worth separating it out.
frame_index, sample_offset = divmod(offset, self.samples_per_frame)
if frame_index != self._frame_index:
self._frame = self._make_frame(frame_index)
self._frame_index = frame_index
return self._frame, sample_offset
_get_frame.__doc__ = StreamReaderBase._get_frame.__doc__
def _make_frame(self, index):
# Default implementation assumes that an initial _frame was
# set up and just re-uses it with a new index.
self._set_index(self._frame, index)
self._frame.valid = True
return self._frame
def _fh_raw_write_frame(self, frame):
# Default implementation is to assume that the frame knows how to
# write itself to the underlying file.
frame.tofile(self.fh_raw)
[docs] def close(self):
extra = self.offset % self.samples_per_frame
if extra != 0:
warnings.warn("closing with partial buffer remaining. "
"Writing padded frame, marked as invalid.")
self.write(np.zeros((self.samples_per_frame - extra,)
+ self.sample_shape), valid=False)
assert self.offset % self.samples_per_frame == 0
return super().close()
[docs]class FileInfo:
"""File information collector.
The instance can be used as a function on a file name to get
information from that file, by opening it and retrieving ``info``.
Parameters
----------
opener : callable
The function to use to open files
Notes
-----
The class is perhaps most easily used via the class method
`~baseband.base.base.FileInfo.create`.
"""
def __init__(self, opener):
self.open = opener
def _get_info(self, name, mode, **kwargs):
"""Open a file in the given mode and retrieve info."""
try:
with self.open(name, mode=mode, **kwargs) as fh:
return fh.info
except Exception as exc:
if isinstance(exc, FileNotFoundError):
raise
return exc
[docs] def is_ok(self, info):
"""Wether the item returned by _get_info has valid information."""
return not isinstance(info, Exception) and info
[docs] def get_file_info(self, name, **kwargs):
"""Open a file in binary mode and retrieve info.
Any keyword arguments that were required to open the file will
be stored as a ``used_kwargs`` attribute on the returned ``info``.
Parameters
----------
name : str or filehandle
Item to be opened for reading in binary mode.
**kwargs
Any keyword arguments that might be required to open the
file successfully (e.g., ``decade`` for Mark 4).
Returns
-------
info : `~baseband.base.file_info.FileReaderInfo`
Information on the file. Will evaluate as `False` if the
file was not in the right format.
Raises
------
FileNotFoundError
If the file does not exist.
Notes
-----
Getting information for an existing file should never fail. If an
`Exception` is raised or returned, it is a bug in the file reader.
"""
info = self._get_info(name, 'rb')
# If right format, check if arguments were missing.
if self.is_ok(info):
used_kwargs = {key: kwargs[key] for key in info.missing
if key in kwargs}
if used_kwargs:
info2 = self._get_info(name, mode='rb', **used_kwargs)
if self.is_ok(info2):
info = info2
else:
# An exception due to the arguments must have occurred.
info.missing = {k: v for (k, v) in kwargs.items()
if k not in used_kwargs}
info.errors[f"kwargs={kwargs}"] = info2
info.used_kwargs = used_kwargs
return info
[docs] def get_stream_info(self, name, file_info, **kwargs):
"""Open a file in stream mode and retrieve info.
Any keyword arguments that were required to open the file will
be stored as a ``used_kwargs`` attribute on the returned ``info``.
Parameters
----------
name : str or filehandle
Item to be opened for reading in stream mode.
file_info : `~baseband.base.file_info.FileReaderInfo`
Information gleaned from opening in binary mode.
**kwargs
Any keyword arguments that might be required to open the
file successfully (e.g., ``decade`` for Mark 4).
Returns
-------
info : `~baseband.base.file_info.StreamReaderInfo`
Information on the file. Will evaluate as `False` if the
file was not in the right format. Will return `None` if no
sample rate information was present, or an `Exception` if
the opening as a stream failed.
"""
frame_rate = file_info.frame_rate
used_kwargs = file_info.used_kwargs.copy()
# Somewhat inelegant special-cases for sample_rate and verify.
if frame_rate is None:
if 'sample_rate' in kwargs:
used_kwargs['sample_rate'] = kwargs['sample_rate']
else:
# frame rate will already be marked as missing in
# file_info.
return None
if 'verify' in kwargs:
used_kwargs['verify'] = kwargs['verify']
stream_info = self._get_info(name, mode='rs', **used_kwargs)
if self.is_ok(stream_info):
stream_info.used_kwargs = used_kwargs
return stream_info
[docs] def __call__(self, name, **kwargs):
"""Collect baseband file information.
First try opening as a binary file and check whether the file is
of the correct format. If so, and no required information is missing,
re-open as a stream, and get information like the start time,
sample rate, etc.
Parameters
----------
name : str or filehandle, or sequence of str
File name, filehandle, or sequence of file names.
**kwargs
Any other arguments the opener needs to open as a stream.
Returns
-------
info
:class:`~baseband.base.file_info.FileReaderInfo` or
:class:`~baseband.base.file_info.StreamReaderInfo`.
In addition to the normal ``info`` attributes, also stored
are attributes about what happened to the keyword arguments:
``used_kwargs``, ``consistent_kwargs``, ``inconsistent_kwargs``
and ``irrelevant_kwargs``.
"""
# NOTE: getting info should never fail or even emit warnings.
# Hence, warnings or errors should not be suppressed here, but
# rather in the info implementations.
file_info = self.get_file_info(name, **kwargs)
if not file_info or file_info.missing:
return file_info
stream_info = self.get_stream_info(name, file_info, **kwargs)
if not self.is_ok(stream_info):
if isinstance(stream_info, Exception):
# Unexpected errors. Put it in file_info so there is a record.
file_info.errors['stream'] = str(stream_info)
return file_info
self.check_consistency(stream_info, **kwargs)
return stream_info
[docs] def check_consistency(self, info, **kwargs):
"""Check consistency between info and the given arguments.
The keyword arguments will be sorted into those that were used
by the file opener and those that were unused, with the latter
split in those that had consistent, inconsistent, or irrelevant
information. They are stored on the ``info`` instance in
``used_kwargs``, ``consistent_kwargs``, ``inconsistent_kwargs``
and ``irrelevant_kwargs`` attributes, respectively.
Parameters
----------
info : `~baseband.base.file_info.StreamReaderInfo`
Information gleaned from a file opened in stream reading mode.
**kwargs
Keyword arguments passed to the opener.
"""
# Store what happened to the kwargs, so one can decide if there are
# inconsistencies or other problems.
info.consistent_kwargs = {}
info.inconsistent_kwargs = {}
info.irrelevant_kwargs = {}
for key, value in kwargs.items():
if key in info.used_kwargs:
continue
consistent = self.check_key(key, value, info)
if consistent is None:
info.irrelevant_kwargs[key] = value
elif consistent:
info.consistent_kwargs[key] = value
else:
info.inconsistent_kwargs[key] = value
return info
[docs] def check_key(self, key, value, info):
"""Check consistency for a given key and value.
Parameters
----------
key : str
Name of the key.
value : object
Corresponding value.
info : `~baseband.base.file_info.StreamReaderInfo`
Information collected by opening a file in stream reader mode.
Returns
-------
consistent : True, False, or None
Whether the information on ``info`` for ``key`` is consistent
with ``value``. `None` if it could not be determined.
"""
info_value = getattr(info, key, None)
if info_value is None:
info_value = getattr(info.file_info, key, None)
if info_value is not None:
return info_value == value
if key == 'nchan':
sample_shape = info.shape[1:]
if sample_shape is not None:
# If we passed nchan, and info doesn't have it, but does have a
# sample shape, check that consistency with that, either in
# being equal to `sample_shape.nchan` or equal to the product
# of all elements (e.g., a VDIF file with 8 threads and 1
# channel per thread is consistent with nchan=8).
return (getattr(sample_shape, 'nchan', -1) == value
or np.prod(sample_shape) == value)
elif key in {'ref_time', 'kday', 'decade'}:
start_time = info.start_time
if start_time is not None:
if key == 'ref_time':
return abs(value - start_time).jd < 500
elif key == 'kday':
return int(start_time.mjd / 1000.) * 1000 == value
else: # decade
return int(start_time.isot[:3]) * 10 == value
return None
[docs] def wrapped(self, module=None, doc=None):
"""Wrap as a function named info, replacing docstring and module."""
@functools.wraps(self.__call__)
def info(*args, **kwargs):
return self(*args, **kwargs)
if doc:
info.__doc__ = doc
# This ensures the function becomes visible to sphinx.
if module:
info.__module__ = module
return info
[docs] @classmethod
def create(cls, ns):
"""Create an info getter for the given namespace.
This assumes that the namespace contains an ``open`` function, which
is used to create an instance of the info class that is wrapped in a
function with ``__module__`` set to the calling module (inferred
from the namespace).
Parameters
----------
ns : dict
Namespace to look in. Generally, pass in ``globals()`` at the
call site.
"""
module = ns.get('__name__', None)
for key in ns:
if key.endswith('StreamReader'):
fmt = key.replace('StreamReader', '')
break
else: # pragma: no cover
fmt = None
opener = ns['open']
info = cls(opener)
doc = textwrap.dedent(info.__call__.__doc__)
if (fmt is not None
and info.__call__.__doc__ is FileInfo.__call__.__doc__):
doc = doc.replace(
'Collect baseband file information.',
f'Collect {fmt} file information.')
return info.wrapped(module=module, doc=doc)
[docs]class FileOpener:
"""File opener for a baseband format.
Each instance can be used as a function to open a baseband stream.
It is probably best used inside a wrapper, so that the documentation
can reflect the docstring of ``__call__`` rather than of this class.
Parameters
----------
fmt : str
Name of the baseband format
classes : dict
With the file/stream reader/writer classes keyed by names equal to
'FileReader', 'FileWriter', 'StreamReader', 'StreamWriter' prefixed
by ``fmt``. Typically, one will pass in ``classes=globals()``.
header_class : `~baseband.base.header.VLBIHeaderBase` subclass
Used to instantiate a header from keywords as needed.
"""
FileNameSequencer = sf.FileNameSequencer
"""Sequencer used for templates."""
non_header_keys = {'squeeze', 'subset', 'fill_value', 'verify',
'file_size'}
"""keyword arguments that should never be used to create a header."""
_name = None
def __init__(self, fmt, classes, header_class):
self.fmt = fmt
self.classes = classes
self.header_class = header_class
[docs] def normalize_mode(self, mode):
if mode in self.classes:
return mode
if mode[::-1] in self.classes:
return mode[::-1]
if mode in {'r', 'w'}:
return mode + 's'
raise ValueError(f'invalid mode: {mode} '
f'({self.fmt} supports {set(self.classes)}).')
def _get_type(self, name):
try:
pathlib.Path(name)
except TypeError:
if hasattr(name, 'read') or hasattr(name, 'write'):
return 'fh'
elif isiterable(name):
return 'sequence'
raise ValueError("name '{name}' not understood.")
if isinstance(name, str) and '{' in name and '}' in name:
return 'template'
else:
return 'name'
[docs] def get_type(self, name):
"""Infer the type of file name is pointing to.
Options are 'fh', 'name', 'sequence', and 'template'.
"""
if self._name is not name:
self._type = self._get_type(name)
self._name = name
return self._type
[docs] def is_sequence(self, name):
"""Whether name is an (implied) sequence of files."""
return self.get_type(name) in ('template', 'sequence')
[docs] def is_template(self, name):
"""Whether name is a template for a sequence of files."""
return self.get_type(name) == 'template'
[docs] def is_name(self, name):
"""Whether name is a name of a file."""
return self.get_type(name) == 'name'
[docs] def is_fh(self, name):
"""Whether name is a filehandle."""
return self.get_type(name) == 'fh'
[docs] def get_fns(self, name, mode, kwargs):
"""Convert a template into a file-name sequencer.
Any keywords needed to fill the template are popped from kwargs.
"""
try:
fns_kwargs = dict(self.get_header0(kwargs))
except Exception:
fns_kwargs = {}
fns_kwargs.update(kwargs)
fns = self.FileNameSequencer(name, fns_kwargs)
for key in set(fns.items).intersection(kwargs):
kwargs.pop(key)
return fns
[docs] def get_fh(self, name, mode, kwargs={}):
"""Ensure name is a filehandle, opening it if necessary."""
if mode == 'wb' and self.is_sequence(name):
raise ValueError(f"{self.fmt} does not support writing to a "
f"sequence or template in binary mode.")
if self.is_fh(name):
return name
if self.is_template(name):
name = self.get_fns(name, mode, kwargs)
open_kwargs = {'mode': (mode[0].replace('w', 'w+')
+ mode[1].replace('s', 'b'))}
if self.is_sequence(name):
opener = sf.open
if 'file_size' in kwargs:
open_kwargs['file_size'] = kwargs.pop('file_size')
else:
opener = io.open
return opener(name, **open_kwargs)
[docs] def __call__(self, name, mode='rs', **kwargs):
"""
Open baseband file(s) for reading or writing.
Opened as a binary file, one gets a wrapped filehandle that adds
methods to read/write a frame. Opened as a stream, the handle is
wrapped further, with methods such as reading and writing to the file
as if it were a stream of samples.
Parameters
----------
name : str or filehandle, or sequence of str
File name, filehandle, sequence of file names, or template
(file name(s) can be `~pathlib.Path` but template has to be `str`).
mode : {'rb', 'wb', 'rs', or 'ws'}, optional
Whether to open for reading or writing, and as a regular binary
file or as a stream. Default: 'rs', for reading a stream.
**kwargs
Additional arguments when opening the file as a stream.
"""
mode = self.normalize_mode(mode)
if mode == 'ws':
# Stream writing always needs a header. Construct from
# other keywords if necessary.
kwargs['header0'] = self.get_header0(kwargs)
fh = self.get_fh(name, mode, kwargs)
try:
return self.classes[mode](fh, **kwargs)
except Exception:
if fh is not name:
fh.close()
raise
[docs] def wrapped(self, module=None, doc=None):
"""Wrap as a function named open, replacing docstring and module."""
@functools.wraps(self.__call__)
def open(*args, **kwargs):
return self(*args, **kwargs)
if doc:
open.__doc__ = doc
# This ensures the function becomes visible to sphinx.
if module:
open.__module__ = module
return open
[docs] @classmethod
def create(cls, ns, doc=None):
"""Create a standard opener for the given namespace.
This assumes that the namespace contains file and stream readers
and writers, as well as a header class, with standard names,
``<fmt>FileReader``, ``<fmt>FileWriter``, ``<fmt>StreamReader``,
``<fmt>StreamWriter``, and ``<fmt>Header``, where ``fmt`` is the
name of the format (which is inferred by looking for a
``*StreamReader`` entry).
The opener is instantiated using the format and the above classes,
and then a wrapping function is created with ``__module__`` set to
the ``__name__`` of the namespace, and with the documentation of
its ``__call__`` method extended with ``doc``.
Parameters
----------
ns : dict
Namespace to look in. Generally, pass in ``globals()`` at the
call site.
doc : str, optional
Extra documentation to add to that of the opener's ``__call__``
method.
"""
module = ns.get('__name__', None)
for key in ns:
if key.endswith('StreamReader'):
fmt = key.replace('StreamReader', '')
break
else: # noqa
raise ValueError('namespace does not contain a StreamReader, '
'so fmt cannot be guessed.')
classes = {mode: ns[fmt + cls_type] for (mode, cls_type) in {
'rb': 'FileReader',
'wb': 'FileWriter',
'rs': 'StreamReader',
'ws': 'StreamWriter'}.items()}
header_class = ns.get(fmt+'Header')
opener = cls(fmt, classes, header_class)
if doc is not None:
doc = textwrap.dedent(opener.__call__.__doc__) + doc
if (opener.__call__.__doc__ is FileOpener.__call__.__doc__):
doc = doc.replace(
'Open baseband file(s) for reading or writing.',
f'Open {fmt} file(s) for reading or writing.')
return opener.wrapped(module=module, doc=doc)