# Licensed under the GPLv3 - see LICENSE
import io
import warnings
import operator
from collections import namedtuple
from contextlib import contextmanager
import numpy as np
from numpy.lib.stride_tricks import as_strided
import astropy.units as u
from astropy.utils import lazyproperty
from ..helpers import sequentialfile as sf
from .offsets import RawOffsets
from .file_info import VLBIFileReaderInfo, VLBIStreamReaderInfo
from .utils import byte_array
__all__ = ['HeaderNotFoundError',
'VLBIFileBase', 'VLBIFileReaderBase', 'VLBIStreamBase',
'VLBIStreamReaderBase', 'VLBIStreamWriterBase',
'make_opener']
[docs]class VLBIFileBase:
"""VLBI file wrapper, used to add frame methods to a binary data file.
The underlying file is stored in ``fh_raw`` and all attributes that do not
exist on the class itself are looked up on it.
Parameters
----------
fh_raw : filehandle
Filehandle of the raw binary data file.
"""
def __init__(self, fh_raw):
self.fh_raw = fh_raw
def __getattr__(self, attr):
"""Try to get things on the current open file if it is not on self."""
if not attr.startswith('_'):
try:
return getattr(self.fh_raw, attr)
except AttributeError:
pass
# __getattribute__ to raise appropriate error.
return self.__getattribute__(attr)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs] def close(self):
self.fh_raw.close()
[docs] @contextmanager
def temporary_offset(self):
"""Context manager for temporarily seeking to another file position.
To be used as part of a ``with`` statement::
with fh_raw.temporary_offset() [as fh_raw]:
with-block
On exiting the ``with-block``, the file pointer is moved back to its
original position.
"""
oldpos = self.tell()
try:
yield self
finally:
self.seek(oldpos)
def __repr__(self):
return "{0}(fh_raw={1})".format(self.__class__.__name__, self.fh_raw)
[docs]class VLBIFileReaderBase(VLBIFileBase):
"""VLBI wrapped file reader base class.
Typically, a subclass will define ``read_header``, ``read_frame``,
and ``find_header`` methods. This baseclass includes a `get_frame_rate`
method which determines the frame rate by scanning the file for headers,
looking for the maximum frame number that occurs before the jump down
for the next second. This method requires the subclass to define a
``read_header`` method and assumes headers have a 'frame_nr' item, and
define a ``payload_nbytes`` property (as do all standard VLBI formats).
Parameters
----------
fh_raw : filehandle
Filehandle of the raw binary data file.
"""
info = VLBIFileReaderInfo()
[docs] def locate_frames(self, pattern, *, mask=None, frame_nbytes=None,
offset=0, forward=True, maximum=None, check=1):
"""Use a pattern to locate frame starts near the current position.
Note that the current position is always included.
Parameters
----------
pattern : header, ~numpy.ndaray, bytes, int, or iterable of int
Synchronization pattern to look for. If a header or header class,
:meth:`~baseband.vlbi_base.header.VLBIHeaderBase.invariant_pattern`
is used to create a masked pattern, using invariant keys from
:meth:`~baseband.vlbi_base.header.VLBIHeaderBase.invariants`.
If an `~numpy.ndarray` or `bytes` instance, a byte array view is
taken. If an (iterable of) int, the integers need to be unsigned
32 bit and will be interpreted as little-endian.
mask : ~numpy.ndarray, bytes, int, or iterable of int.
Bit mask for the pattern, with 1 indicating a given bit will
be used the comparison.
frame_nbytes : int, optional
Frame size in bytes. Defaults to the frame size in any header
passed in.
offset : int, optional
Offset from the frame start that the pattern occurs. Any
offsets inferred from masked entries are added to this (hence,
no offset needed when a header is passed in as ``pattern``).
forward : bool, optional
Seek forward if `True` (default), backward if `False`.
maximum : int, optional
Maximum number of bytes to search away from the present location.
Default: search twice the frame size if given, otherwise 1 million
(extra bytes to avoid partial patterns will be added).
Use 0 to check only at the current position.
check : int or tuple of int, optional
Frame offsets where another sync pattern should be present
(if inside the file). Ignored if ``frame_nbytes`` is not given.
Default: 1, i.e., a sync pattern should be present one
frame after the one found (independent of ``forward``),
thus helping to guarantee the frame is not corrupted.
Returns
-------
locations : list of int
Locations of sync patterns within the range scanned,
in order of proximity to the starting position.
"""
if hasattr(pattern, 'invariant_pattern'):
if frame_nbytes is None:
frame_nbytes = pattern.frame_nbytes
pattern, mask = pattern.invariant_pattern()
pattern = byte_array(pattern)
if mask is not None:
mask = byte_array(mask)
useful = np.nonzero(mask)[0]
useful_slice = slice(useful[0], useful[-1]+1)
mask = mask[useful_slice]
pattern = pattern[useful_slice]
offset += useful_slice.start
if maximum is None:
maximum = (2 * frame_nbytes if frame_nbytes else 1000000) - 1
if check is None or frame_nbytes is None:
check = np.array([], dtype=int)
check_min = check_max = 0
else:
check = np.atleast_1d(check) * frame_nbytes
# For Numpy >=1.15, can just be check.min(initial=0) in the
# calculation of start, stop below.
check_min = min(check.min(), 0)
check_max = max(check.max(), 0)
if frame_nbytes is None:
# If not set, we just let it stand in for the extra bytes
# that should be read to ensure we can even see a pattern.
frame_nbytes = offset + pattern.size
with self.temporary_offset() as fh:
# Calculate the fiducial start of the region we are looking in.
if forward:
seek_start = fh.tell()
else:
seek_start = fh.tell() - maximum
# Determine what part of the file to read, including the extra
# bits for doing the checking. Note that we ensure not to start
# before the start of the file, but we do not check for ending
# after the end of the file, as we want to avoid a possibly
# expensive seek (e.g., if the file is a SequentialFile of a lot
# of individual files). We rely instead on reading fewer bytes
# than requested if we are close to the end.
start = max(seek_start + offset + check_min, 0)
stop = max(seek_start + maximum + 1 + check_max + frame_nbytes,
start)
fh.seek(start)
data = fh.read(stop-start)
# Since we may have hit the end of the file, check what the actual
# stop position was (needed at end).
stop = start + len(data)
# We normally have read more than really needed (to check for EOF);
# select what we actually want and convert to an array of bytes.
size = min(maximum + 1 + check_max - check_min,
stop - start - pattern.size)
if size <= 0:
return []
data = np.frombuffer(data, dtype='u1', count=size+pattern.size)
# We match in two steps, first matching just the first pattern byte,
# and then matching the rest in one step using a strided array.
# The hope is that this gives a good compromise in speed: not
# iterate in python for each pattern byte, yet not doing pattern
# size times more comparisons than needed in C.
if mask is None:
match = data[:-pattern.size] == pattern[0]
else:
match = ((data[:-pattern.size] ^ pattern[0]) & mask[0]) == 0
matches = np.nonzero(match)[0]
# Re-stride data so that it looks like each element is followed by
# all others, and check those all in one go.
strided = as_strided(data[1:], strides=(1, 1),
shape=(size, pattern.size-1))
if mask is None:
match = strided[matches] == pattern[1:]
else:
match = ((strided[matches] ^ pattern[1:]) & mask[1:]) == 0
matches = matches[match.all(-1)]
if not forward:
# Order by proximity to the file position.
matches = matches[::-1]
# Convert matches to a list of actual file positions.
matches += start - offset
matches = matches.tolist()
# Keep only matches for which
# (1) the location is in the requested range of current +/- maximum,
# (2) the associated frames completely fit in the file, and
# (3) the pattern is also present at the requested check points.
# The range in positions actually wanted. Here, we guarantee not
# only (1) but also (2) by using the recalculated `stop` from above,
# which is limited by the file size.
loc_start = max(seek_start, 0)
loc_stop = min(seek_start+maximum+1, stop-frame_nbytes+1)
# And the range in which it was possible to check positions.
check_start = start
check_stop = stop-offset-pattern.size
locations = [loc for loc in matches
if (loc_start <= loc < loc_stop
and all(c in matches for c in loc+check
if check_start <= c < check_stop))]
return locations
[docs] def get_frame_rate(self):
"""Determine the number of frames per second.
The method cycles through headers, starting from the start of the file,
finding the largest frame number before it jumps back to 0 for a new
second.
Returns
-------
frame_rate : `~astropy.units.Quantity`
Frames per second.
Raises
------
`EOFError`
If the file contains less than one second of data.
"""
with self.temporary_offset():
self.seek(0)
header = header0 = self.read_header()
frame_nr0 = header0['frame_nr']
while header['frame_nr'] == frame_nr0:
self.seek(header.payload_nbytes, 1)
header = self.read_header()
max_frame = frame_nr0
while header['frame_nr'] > 0:
max_frame = max(header['frame_nr'], max_frame)
self.seek(header.payload_nbytes, 1)
header = self.read_header()
return (max_frame + 1) * u.Hz
[docs]class VLBIStreamBase:
"""VLBI file wrapper, allowing access as a stream of data."""
_sample_shape_maker = None
def __init__(self, fh_raw, header0, sample_rate, samples_per_frame,
unsliced_shape, bps, complex_data, squeeze, subset=(),
fill_value=0., verify=True):
self.fh_raw = fh_raw
self._header0 = header0
self._bps = bps
self._complex_data = complex_data
self.samples_per_frame = samples_per_frame
self.sample_rate = sample_rate
self._frame_rate = (sample_rate / samples_per_frame).to(u.Hz)
self.offset = 0
self._fill_value = fill_value
if self._sample_shape_maker is not None:
self._unsliced_shape = self._sample_shape_maker(*unsliced_shape)
else:
self._unsliced_shape = unsliced_shape
self._squeeze = bool(squeeze)
if subset is None:
subset = ()
elif not isinstance(subset, tuple):
subset = (subset,)
self._subset = subset
self._sample_shape = self._get_sample_shape()
self._frame_index = None
self.verify = verify
@property
def squeeze(self):
"""Whether data arrays have dimensions with length unity removed.
If `True`, data read out has such dimensions removed, and data
passed in for writing has them inserted.
"""
return self._squeeze
@property
def subset(self):
"""Specific components of the complete sample to decode.
The order of dimensions is the same as for `sample_shape`. Set by
the class initializer.
"""
return self._subset
def _get_sample_shape(self):
"""Get shape of possibly squeezed samples."""
if not self.squeeze:
return self._unsliced_shape
sample_shape = self._unsliced_shape
squeezed_shape = tuple(dim for dim in sample_shape if dim > 1)
fields = getattr(sample_shape, '_fields', None)
if fields is None:
return squeezed_shape
fields = [field for field, dim in zip(fields, sample_shape) if dim > 1]
shape_cls = namedtuple('SampleShape', ','.join(fields))
return shape_cls(*squeezed_shape)
@property
def sample_shape(self):
"""Shape of a complete sample (possibly subset or squeezed)."""
return self._sample_shape
def _get_time(self, header):
"""Get time from a header."""
# Subclasses can override this if information is needed beyond that
# provided in the header.
return header.time
def _set_time(self, header, time):
"""Set time in a header."""
# Subclasses can override this if information is needed beyond that
# provided in the header.
header.time = time
@lazyproperty
def start_time(self):
"""Start time of the file.
See also `time` for the time of the sample pointer's current offset.
"""
return self._get_time(self.header0)
@property
def time(self):
"""Time of the sample pointer's current offset in file.
See also `start_time` for the start time of the file.
"""
return self.tell(unit='time')
@property
def header0(self):
"""First header of the file."""
return self._header0
@property
def bps(self):
"""Bits per elementary sample."""
return self._bps
@property
def complex_data(self):
"""Whether the data are complex."""
return self._complex_data
@property
def samples_per_frame(self):
"""Number of complete samples per frame."""
return self._samples_per_frame
@samples_per_frame.setter
def samples_per_frame(self, samples_per_frame):
try:
self._samples_per_frame = operator.index(samples_per_frame)
except Exception as exc:
exc.args += ("samples per frame must have an integer value.",)
raise exc
@property
def sample_rate(self):
"""Number of complete samples per second."""
return self._sample_rate
@sample_rate.setter
def sample_rate(self, sample_rate):
# Check if sample_rate is a time rate.
try:
sample_rate.to(u.Hz)
except u.UnitsError as exc:
exc.args += ("sample rate must have units of 1 / time.",)
raise
self._sample_rate = sample_rate
@property
def verify(self):
"""Whether to do consistency checks on frames being read."""
return self._verify
@verify.setter
def verify(self, verify):
self._verify = bool(verify) if verify != 'fix' else verify
[docs] def tell(self, unit=None):
"""Current offset in the file.
Parameters
----------
unit : `~astropy.units.Unit` or str, optional
Time unit the offset should be returned in. By default, no unit
is used, i.e., an integer enumerating samples is returned. For the
special string 'time', the absolute time is calculated.
Returns
-------
offset : int, `~astropy.units.Quantity`, or `~astropy.time.Time`
Offset in current file (or time at current position).
"""
if unit is None:
return self.offset
# "isinstance" avoids costly comparisons of an actual unit with 'time'.
if not isinstance(unit, u.UnitBase) and unit == 'time':
return self.start_time + self.tell(unit=u.s)
return (self.offset / self.sample_rate).to(unit)
def __getattr__(self, attr):
"""Try to get things on the current open file if it is not on self."""
if attr in {'readable', 'writable', 'seekable', 'closed', 'name'}:
try:
return getattr(self.fh_raw, attr)
except AttributeError:
pass
# __getattribute__ to raise appropriate error.
return self.__getattribute__(attr)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs] def close(self):
self.fh_raw.close()
def __repr__(self):
return ("<{s.__class__.__name__} name={s.name} offset={s.offset}\n"
" sample_rate={s.sample_rate},"
" samples_per_frame={s.samples_per_frame},\n"
" sample_shape={s.sample_shape}, bps={s.bps},\n"
" {sub}start_time={s.start_time.isot}>"
.format(s=self, sub=('subset={0}, '.format(self.subset)
if self.subset else '')))
[docs]class VLBIStreamReaderBase(VLBIStreamBase):
def __init__(self, fh_raw, header0, sample_rate, samples_per_frame,
unsliced_shape, bps, complex_data, squeeze, subset,
fill_value, verify):
if sample_rate is None:
try:
sample_rate = (samples_per_frame
* fh_raw.get_frame_rate()).to(u.MHz)
except Exception as exc:
exc.args += ("the sample rate could not be auto-detected. "
"This can happen if the file is too short to "
"determine the sample rate, or because it is "
"corrupted. Try passing in an explicit "
"`sample_rate`.",)
raise
super().__init__(
fh_raw, header0, sample_rate, samples_per_frame, unsliced_shape,
bps, complex_data, squeeze, subset, fill_value, verify)
if hasattr(header0, 'frame_nbytes'):
self._raw_offsets = RawOffsets(frame_nbytes=header0.frame_nbytes)
info = VLBIStreamReaderInfo()
def _squeeze_and_subset(self, data):
"""Possibly remove unit dimensions and subset the given data.
The first dimension (sample number) is never removed.
"""
if self.squeeze:
data = data.reshape(data.shape[:1]
+ tuple(sh for sh in data.shape[1:] if sh > 1))
if self.subset:
data = data[(slice(None),) + self.subset]
return data
def _get_sample_shape(self):
"""Get shape by applying squeeze and subset to a dummy data sample."""
# First apply base class, which squeezes if needed.
sample_shape = super()._get_sample_shape()
if not self.subset:
return sample_shape
# Now apply subset to a dummy sample that has the sample number as its
# value (where 13 is to bring bad luck to over-complicated subsets).
dummy_data = np.arange(13.)
dummy_sample = np.rollaxis( # Use moveaxis when numpy_min>=1.11
(np.zeros(sample_shape)[..., np.newaxis] + dummy_data), -1)
try:
dummy_subset = dummy_sample[(slice(None),) + self.subset]
# Check whether subset was in range and whether sample numbers were
# preserved (latter should be, but check anyway).
assert 0 not in dummy_subset.shape
assert np.all(dummy_subset == dummy_data.reshape(
(-1,) + (1,) * (dummy_subset.ndim - 1)))
except (IndexError, AssertionError) as exc:
exc.args += ("subset {} cannot be used to properly index "
"{}samples with shape {}.".format(
self.subset, "squeezed " if self.squeeze else "",
sample_shape),)
raise exc
# We got the shape. We only bother trying to associate names with the
# dimensions when we know for sure what happened in the subsetting.
subset_shape = dummy_subset.shape[1:]
if (not hasattr(sample_shape, '_fields') or subset_shape == ()
or len(self.subset) > len(sample_shape)):
return subset_shape
# We can only associate names when indexing each dimension separately
# gives a consistent result with the complete subset.
subset_axis = 0
fields = []
subset = self.subset + (slice(None),) * (len(sample_shape)
- len(self.subset))
try:
for field, sample_dim, item in zip(sample_shape._fields,
sample_shape, subset):
subset_dim = np.empty(sample_dim)[item].shape
assert len(subset_dim) <= 1 # No advanced multi-d indexing.
if len(subset_dim) == 1:
# If this dimension was not removed and matches that
# of the real subset, we now have a field label for it.
assert subset_dim[0] == subset_shape[subset_axis]
fields.append(field)
subset_axis += 1
except Exception:
# Things did not make sense, probably some advanced indexing;
# Just don't worry about having a named tuple.
return subset_shape
shape_cls = namedtuple('SampleShape', ','.join(fields))
return shape_cls(*subset_shape)
@lazyproperty
def _last_header(self):
"""Last header of the file."""
with self.fh_raw.temporary_offset() as fh_raw:
fh_raw.seek(-self.header0.frame_nbytes, 2)
try:
return fh_raw.find_header(self.header0, forward=False,
check=(-1, 1))
except HeaderNotFoundError as exc:
exc.args += ("corrupt VLBI frame? No frame in last {0} bytes."
.format(2 * self.header0.frame_nbytes),)
raise
# Override the following so we can refer to stop_time in the docstring.
@property
def start_time(self):
"""Start time of the file.
See also `time` for the time of the sample pointer's current offset,
and (if available) `stop_time` for the time at the end of the file.
"""
return super().start_time
@property
def time(self):
"""Time of the sample pointer's current offset in file.
See also `start_time` for the start time, and (if available)
`stop_time` for the end time, of the file.
"""
return super().time
@lazyproperty
def stop_time(self):
"""Time at the end of the file, just after the last sample.
See also `start_time` for the start time of the file, and `time` for
the time of the sample pointer's current offset.
"""
return (self._get_time(self._last_header)
+ (self.samples_per_frame / self.sample_rate).to(u.s))
@lazyproperty
def _nsample(self):
"""Number of complete samples in the stream data."""
return int(((self.stop_time - self.start_time)
* self.sample_rate).to(u.one).round())
@property
def shape(self):
"""Shape of the (squeezed/subset) stream data."""
return (self._nsample,) + self.sample_shape
@property
def size(self):
"""Total number of component samples in the (squeezed/subset) stream
data.
"""
prod = 1
for dim in self.shape:
prod *= dim
return prod
@property
def ndim(self):
"""Number of dimensions of the (squeezed/subset) stream data."""
return len(self.shape)
@property
def fill_value(self):
"""Value to use for invalid or missing data. Default: 0."""
return self._fill_value
[docs] def seek(self, offset, whence=0):
"""Change the stream position.
This works like a normal filehandle seek, but the offset is in samples
(or a relative or absolute time).
Parameters
----------
offset : int, `~astropy.units.Quantity`, or `~astropy.time.Time`
Offset to move to. Can be an (integer) number of samples,
an offset in time units, or an absolute time.
whence : {0, 1, 2, 'start', 'current', or 'end'}, optional
Like regular seek, the offset is taken to be from the start if
``whence=0`` (default), from the current position if 1,
and from the end if 2. One can alternativey use 'start',
'current', or 'end' for 0, 1, or 2, respectively. Ignored if
``offset`` is a time.
"""
try:
offset = operator.index(offset)
except Exception:
try:
offset = offset - self.start_time
except Exception:
pass
else:
whence = 0
offset = int((offset * self.sample_rate).to(u.one).round())
if whence == 0 or whence == 'start':
self.offset = offset
elif whence == 1 or whence == 'current':
self.offset += offset
elif whence == 2 or whence == 'end':
self.offset = self._nsample + offset
else:
raise ValueError("invalid 'whence'; should be 0 or 'start', 1 or "
"'current', or 2 or 'end'.")
return self.offset
@property
def dtype(self):
# TODO: arguably, this should be inferred from an actual payload.
return np.dtype(np.complex64 if self.complex_data else np.float32)
[docs] def readable(self):
"""Whether the file can be read and decoded."""
return self.fh_raw.readable and self.fh_raw.info.readable
[docs] def read(self, count=None, out=None):
"""Read a number of complete (or subset) samples.
The range retrieved can span multiple frames.
Parameters
----------
count : int or None, optional
Number of complete/subset samples to read. If `None` (default) or
negative, the whole file is read. Ignored if ``out`` is given.
out : None or array, optional
Array to store the data in. If given, ``count`` will be inferred
from the first dimension; the other dimension should equal
`sample_shape`.
Returns
-------
out : `~numpy.ndarray` of float or complex
The first dimension is sample-time, and the remainder given by
`sample_shape`.
"""
if out is None:
if count is None or count < 0:
count = self._nsample - self.offset
if count < 0:
raise EOFError("cannot read from beyond end of file.")
out = np.empty((count,) + self.sample_shape, dtype=self.dtype)
else:
assert out.shape[1:] == self.sample_shape, (
"'out' must have trailing shape {}".format(self.sample_shape))
count = out.shape[0]
offset0 = self.offset
sample = 0
while sample < count:
# For current position, get frame plus offset in that frame.
frame_index, sample_offset = divmod(self.offset,
self.samples_per_frame)
if frame_index != self._frame_index:
self._frame = self._read_frame(frame_index)
self._frame.fill_value = self.fill_value
self._frame_index = frame_index
frame = self._frame
nsample = min(count - sample, len(frame) - sample_offset)
data = frame[sample_offset:sample_offset + nsample]
data = self._squeeze_and_subset(data)
# Copy to relevant part of output.
out[sample:sample + nsample] = data
sample += nsample
# Explicitly set offset (just in case read_frame adjusts it too).
self.offset = offset0 + sample
return out
_next_index = None
def _read_frame(self, index):
"""Base implementation of reading a frame.
This contains two pieces which subclasses can override as needed
(or override the whole thing).
"""
self._seek_frame(index)
if not self.verify:
return self._fh_raw_read_frame()
# If we are reading with care, we read also a frame ahead
# to make sure that is not corrupted. If such a frame has
# been kept, we use it now. Otherwise, we read a new frame.
# We always remove the cached copy, since we cannot be sure
# it is even the right number.
if index == self._next_index:
frame = self._next_frame
frame_index = index
self.fh_raw.seek(frame.nbytes, 1)
self._next_index = self._next_frame = None
else:
self._next_index = self._next_frame = None
try:
frame = self._fh_raw_read_frame()
frame_index = self._tell_frame(frame)
except Exception as exc:
return self._bad_frame(index, None, exc)
# Check whether we actually got the right frame.
if frame_index != index:
return self._bad_frame(index, frame,
ValueError('wrong frame number.'))
# In either case, we check the next frame (if there is one).
try:
with self.fh_raw.temporary_offset():
self._next_frame = self._fh_raw_read_frame()
self._next_index = self._tell_frame(self._next_frame)
except Exception as exc:
return self._bad_frame(index, frame, exc)
return frame
def _bad_frame(self, index, frame, exc):
"""Deal with a bad frame.
Parameters
----------
index : int
Frame index that is to be read.
frame : `~baseband.vlbi_base.frame.VLBIFrameBase` or None
Frame that was read without failure. If not `None`, either
the frame index is wrong or the next frame could not be read.
exc : Exception
Exception that led to the call.
"""
if (frame is not None and self._tell_frame(frame) == index
and index == self._tell_frame(self._last_header)):
# If we got an exception because we're trying to read beyond the
# last frame, the frame is almost certainly OK, so keep it.
return frame
if self.verify != 'fix':
raise exc
msg = 'problem loading frame {}.'.format(index)
# Where should we be?
raw_offset = self._seek_frame(index)
# See if we're in the right place. First ensure we have a header.
# Here, it is more important that it is a good one than that we go
# too far, so we insist on two consistent frames after it. We
# increase the maximum a bit to be able to jump over bad bits.
self.fh_raw.seek(raw_offset)
try:
header = self.fh_raw.find_header(
self.header0, forward=True, check=(1, 2),
maximum=3*self.header0.frame_nbytes)
except HeaderNotFoundError:
exc.args += (msg + ' Cannot find header nearby.',)
raise exc
# Don't yet know how to deal with excess data.
header_index = self._tell_frame(header)
if header_index < index:
exc.args += (msg + ' There appears to be excess data.')
raise exc
# Go backward until we find previous frame, storing offsets
# as we go. We again increase the maximum since we may need
# to jump over a bad bit.
while header_index >= index:
raw_pos = self.fh_raw.tell()
header1 = header
header1_index = header_index
self.fh_raw.seek(-1, 1)
try:
header = self.fh_raw.find_header(
self.header0, forward=False,
maximum=4*self.header0.frame_nbytes)
except HeaderNotFoundError:
exc.args += (msg + ' Could not find previous index.',)
raise exc
header_index = self._tell_frame(header)
# While we are at it, update the list of known indices.
self._raw_offsets[header1_index] = raw_pos
# Move back to position of last good header (header1).
self.fh_raw.seek(raw_pos)
if header1_index > index:
# Frame is missing!
msg += ' The frame seems to be missing.'
# Construct a missing frame.
header = header1.copy()
self._set_time(header, self.time)
frame = self._frame.__class__(header, self._frame.payload,
valid=False)
else:
assert header1_index == index, \
'at this point, we should have a good header.'
if raw_pos != raw_offset:
msg += ' Stream off by {0} bytes.'.format(raw_offset
- raw_pos)
# Above, we should have added information about
# this index in our offset table.
assert index in self._raw_offsets.frame_nr
# At this point, reading the frame should always work,
# and we know there is a header right after it.
frame = self._fh_raw_read_frame()
assert self._tell_frame(frame) == index
warnings.warn(msg)
return frame
def _seek_frame(self, index):
"""Move the underlying file pointer to the frame of the given index."""
return self.fh_raw.seek(self._raw_offsets[index])
def _fh_raw_read_frame(self):
"""Read a frame at the current position of the underlying file."""
return self.fh_raw.read_frame(verify=self.verify)
def _tell_frame(self, frame):
"""Get the index of the frame relative to the first frame."""
dt = self._get_time(frame) - self.start_time
return int(round((dt * self._frame_rate).to_value(u.one)))
[docs]class VLBIStreamWriterBase(VLBIStreamBase):
def __init__(self, fh_raw, header0, sample_rate, samples_per_frame,
unsliced_shape, bps, complex_data, squeeze, subset,
fill_value, verify):
if sample_rate is None:
raise ValueError("must pass in an explicit `sample_rate`.")
super().__init__(
fh_raw, header0, sample_rate, samples_per_frame, unsliced_shape,
bps, complex_data, squeeze, subset, fill_value, verify)
def _unsqueeze(self, data):
new_shape = list(data.shape)
for i, dim in enumerate(self._unsliced_shape):
if dim == 1:
new_shape.insert(i + 1, 1)
return data.reshape(new_shape)
[docs] def write(self, data, valid=True):
"""Write data, buffering by frames as needed.
Parameters
----------
data : `~numpy.ndarray`
Piece of data to be written, with sample dimensions as given by
`sample_shape`. This should be properly scaled to make best use
of the dynamic range delivered by the encoding.
valid : bool, optional
Whether the current data are valid. Default: `True`.
"""
assert data.shape[1:] == self.sample_shape, (
"'data' should have trailing shape {}".format(self.sample_shape))
if self.squeeze:
data = self._unsqueeze(data)
count = data.shape[0]
offset0 = self.offset
sample = 0
while sample < count:
frame_index, sample_offset = divmod(self.offset,
self.samples_per_frame)
if frame_index != self._frame_index:
self._frame = self._make_frame(frame_index)
self._frame_index = frame_index
self._valid = valid
else:
self._valid &= valid
nsample = min(count - sample, len(self._frame) - sample_offset)
sample_end = sample_offset + nsample
self._frame[sample_offset:sample_end] = data[sample:
sample + nsample]
if sample_end == self.samples_per_frame:
self._write_frame(self._frame, valid=self._valid)
# Be sure we do not reuse this frame (might also be needed
# to write memmaps to disk).
del self._frame
self._frame_index = None
sample += nsample
# Explicitly set offset (just in case write_frame adjusts it too).
self.offset = offset0 + sample
def _write_frame(self, frame, valid=True):
# Default implementation is to assume this is a frame that can write
# the underlying binary file.
frame.valid = valid
frame.tofile(self.fh_raw)
[docs] def close(self):
extra = self.offset % self.samples_per_frame
if extra != 0:
warnings.warn("closing with partial buffer remaining. "
"Writing padded frame, marked as invalid.")
self.write(np.zeros((self.samples_per_frame - extra,)
+ self.sample_shape), valid=False)
assert self.offset % self.samples_per_frame == 0
return super().close()
default_open_doc = """Open baseband file(s) for reading or writing.
Opened as a binary file, one gets a wrapped filehandle that adds
methods to read/write a frame. Opened as a stream, the handle is
wrapped further, with methods such as reading and writing to the file
as if it were a stream of samples.
Parameters
----------
name : str or filehandle, or sequence of str
File name, filehandle, or sequence of file names (see Notes).
mode : {'rb', 'wb', 'rs', or 'ws'}, optional
Whether to open for reading or writing, and as a regular binary
file or as a stream. Default: 'rs', for reading a stream.
**kwargs
Additional arguments when opening the file as a stream.
"""
[docs]def make_opener(fmt, classes, doc='', append_doc=True):
"""Create a baseband file opener.
Parameters
----------
fmt : str
Name of the baseband format.
classes : dict
With the file/stream reader/writer classes keyed by names equal to
'FileReader', 'FileWriter', 'StreamReader', 'StreamWriter' prefixed by
``fmt``. Typically, one will pass in ``classes=globals()``.
doc : str, optional
If given, used to define the docstring of the opener.
append_doc : bool, optional
If `True` (default), append ``doc`` to the default docstring rather
than override it.
"""
module = classes.get('__name__', None)
classes = {cls_type: classes[fmt + cls_type]
for cls_type in ('FileReader', 'FileWriter',
'StreamReader', 'StreamWriter')}
def open(name, mode='rs', **kwargs):
# If sequentialfile object, check that it's opened properly.
if isinstance(name, sf.SequentialFileBase):
assert (('r' in mode and name.mode == 'rb')
or ('w' in mode and name.mode == 'w+b')), (
"open only accepts sequential files opened in 'rb' "
"mode for reading or 'w+b' mode for writing.")
# If passed some kind of list, open a sequentialfile object.
if isinstance(name, (tuple, list, sf.FileNameSequencer)):
if 'r' in mode:
name = sf.open(name, 'rb')
else:
file_size = kwargs.pop('file_size', None)
name = sf.open(name, 'w+b', file_size=file_size)
# Select FileReader/Writer for binary, StreamReader/Writer for stream.
if 'b' in mode:
cls_type = 'File'
else:
cls_type = 'Stream'
# Select reading or writing. Check if ``name`` is a filehandle, and
# open it if not.
if 'w' in mode:
cls_type += 'Writer'
got_fh = hasattr(name, 'write')
if not got_fh:
name = io.open(name, 'w+b')
elif 'r' in mode:
cls_type += 'Reader'
got_fh = hasattr(name, 'read')
if not got_fh:
name = io.open(name, 'rb')
else:
raise ValueError("only support opening {0} file for reading "
"or writing (mode='r' or 'w')."
.format(fmt))
# Try wrapping ``name`` with file or stream reader (``name`` is a
# binary filehandle at this point).
try:
return classes[cls_type](name, **kwargs)
except Exception as exc:
if not got_fh:
try:
name.close()
except Exception: # pragma: no cover
pass
raise exc
# Load custom documentation for format.
open.__doc__ = (default_open_doc.replace('baseband', fmt) + doc
if append_doc else doc)
# This ensures the function becomes visible to sphinx.
if module:
open.__module__ = module
return open