Source code for baseband.vlbi_base.base

# Licensed under the GPLv3 - see LICENSE
from __future__ import division, unicode_literals, print_function

import io
import warnings
import numpy as np
import operator
from collections import namedtuple
import astropy.units as u
from astropy.utils import lazyproperty
from .file_info import VLBIFileReaderInfo, VLBIStreamReaderInfo
from ..helpers import sequentialfile as sf


__all__ = ['VLBIFileBase', 'VLBIFileReaderBase', 'VLBIStreamBase',
           'VLBIStreamReaderBase', 'VLBIStreamWriterBase', 'make_opener']


[docs]class VLBIFileBase(object): """VLBI file wrapper, used to add frame methods to a binary data file. The underlying file is stored in ``fh_raw`` and all attributes that do not exist on the class itself are looked up on it. Parameters ---------- fh_raw : filehandle Filehandle of the raw binary data file. """ def __init__(self, fh_raw): self.fh_raw = fh_raw def __getattr__(self, attr): """Try to get things on the current open file if it is not on self.""" if not attr.startswith('_'): try: return getattr(self.fh_raw, attr) except AttributeError: pass # __getattribute__ to raise appropriate error. return self.__getattribute__(attr) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] def close(self): self.fh_raw.close()
def __repr__(self): return "{0}(fh_raw={1})".format(self.__class__.__name__, self.fh_raw)
[docs]class VLBIFileReaderBase(VLBIFileBase): """VLBI wrapped file reader base class. Typically, a subclass will define ``read_header``, ``read_frame``, and ``find_header`` methods. This baseclass includes a `get_frame_rate` method which determines the frame rate by scanning the file for headers, looking for the maximum frame number that occurs before the jump down for the next second. This method requires the subclass to define a ``read_header`` method and assumes headers have a 'frame_nr' item, and define a ``payload_nbytes`` property (as do all standard VLBI formats). Parameters ---------- fh_raw : filehandle Filehandle of the raw binary data file. """ info = VLBIFileReaderInfo()
[docs] def get_frame_rate(self): """Determine the number of frames per second. The method cycles through headers, starting from the start of the file, finding the largest frame number before it jumps back to 0 for a new second. Returns ------- frame_rate : `~astropy.units.Quantity` Frames per second. Raises ------ `EOFError` If the file contains less than one second of data. """ oldpos = self.tell() self.seek(0) try: header = header0 = self.read_header() frame_nr0 = header0['frame_nr'] while header['frame_nr'] == frame_nr0: self.seek(header.payload_nbytes, 1) header = self.read_header() max_frame = frame_nr0 while header['frame_nr'] > 0: max_frame = max(header['frame_nr'], max_frame) self.seek(header.payload_nbytes, 1) header = self.read_header() finally: self.seek(oldpos) return (max_frame + 1) * u.Hz
[docs]class VLBIStreamBase(object): """VLBI file wrapper, allowing access as a stream of data.""" _sample_shape_maker = None def __init__(self, fh_raw, header0, sample_rate, samples_per_frame, unsliced_shape, bps, complex_data, squeeze, subset=(), fill_value=0., verify=True): self.fh_raw = fh_raw self._header0 = header0 self._bps = bps self._complex_data = complex_data self.samples_per_frame = samples_per_frame self.sample_rate = sample_rate self.offset = 0 self._fill_value = fill_value if self._sample_shape_maker is not None: self._unsliced_shape = self._sample_shape_maker(*unsliced_shape) else: self._unsliced_shape = unsliced_shape self._squeeze = bool(squeeze) if subset is None: subset = () elif not isinstance(subset, tuple): subset = (subset,) self._subset = subset self._sample_shape = self._get_sample_shape() self._frame_index = None self.verify = verify @property def squeeze(self): """Whether data arrays have dimensions with length unity removed. If `True`, data read out has such dimensions removed, and data passed in for writing has them inserted. """ return self._squeeze @property def subset(self): """Specific components of the complete sample to decode. The order of dimensions is the same as for `sample_shape`. Set by the class initializer. """ return self._subset def _get_sample_shape(self): """Get shape of possibly squeezed samples.""" if not self.squeeze: return self._unsliced_shape sample_shape = self._unsliced_shape squeezed_shape = tuple(dim for dim in sample_shape if dim > 1) fields = getattr(sample_shape, '_fields', None) if fields is None: return squeezed_shape fields = [field for field, dim in zip(fields, sample_shape) if dim > 1] shape_cls = namedtuple('SampleShape', ','.join(fields)) return shape_cls(*squeezed_shape) @property def sample_shape(self): """Shape of a complete sample (possibly subset or squeezed).""" return self._sample_shape def _get_time(self, header): """Get time from a header.""" # Subclasses can override this if information is needed beyond that # provided in the header. return header.time @lazyproperty def start_time(self): """Start time of the file. See also `time` for the time of the sample pointer's current offset, and (if available) `stop_time` for the time at the end of the file. """ return self._get_time(self.header0) @property def time(self): """Time of the sample pointer's current offset in file. See also `start_time` for the start time, and (if available) `stop_time` for the end time, of the file. """ return self.tell(unit='time') @property def header0(self): """First header of the file.""" return self._header0 @property def bps(self): """Bits per elementary sample.""" return self._bps @property def complex_data(self): """Whether the data are complex.""" return self._complex_data @property def samples_per_frame(self): """Number of complete samples per frame.""" return self._samples_per_frame @samples_per_frame.setter def samples_per_frame(self, samples_per_frame): try: self._samples_per_frame = operator.index(samples_per_frame) except Exception: raise TypeError("samples per frame must have an integer value.") @property def sample_rate(self): """Number of complete samples per second.""" return self._sample_rate @sample_rate.setter def sample_rate(self, sample_rate): # Check if sample_rate is a time rate. try: sample_rate.to(u.Hz) except u.UnitsError as exc: exc.args += ("sample rate must have units of 1 / time.",) raise self._sample_rate = sample_rate @property def verify(self): """Whether to do consistency checks on frames being read.""" return self._verify @verify.setter def verify(self, verify): self._verify = bool(verify)
[docs] def tell(self, unit=None): """Current offset in the file. Parameters ---------- unit : `~astropy.units.Unit` or str, optional Time unit the offset should be returned in. By default, no unit is used, i.e., an integer enumerating samples is returned. For the special string 'time', the absolute time is calculated. Returns ------- offset : int, `~astropy.units.Quantity`, or `~astropy.time.Time` Offset in current file (or time at current position). """ if unit is None: return self.offset if unit == 'time': return self.start_time + self.tell(unit=u.s) return (self.offset / self.sample_rate).to(unit)
def __getattr__(self, attr): """Try to get things on the current open file if it is not on self.""" if attr in {'readable', 'writable', 'seekable', 'closed', 'name'}: try: return getattr(self.fh_raw, attr) except AttributeError: pass # __getattribute__ to raise appropriate error. return self.__getattribute__(attr) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] def close(self): self.fh_raw.close()
def __repr__(self): return ("<{s.__class__.__name__} name={s.name} offset={s.offset}\n" " sample_rate={s.sample_rate}," " samples_per_frame={s.samples_per_frame},\n" " sample_shape={s.sample_shape}, bps={s.bps},\n" " {sub}start_time={s.start_time.isot}>" .format(s=self, sub=('subset={0}, '.format(self.subset) if self.subset else '')))
[docs]class VLBIStreamReaderBase(VLBIStreamBase): def __init__(self, fh_raw, header0, sample_rate, samples_per_frame, unsliced_shape, bps, complex_data, squeeze, subset, fill_value, verify): if sample_rate is None: try: sample_rate = (samples_per_frame * fh_raw.get_frame_rate()).to(u.MHz) except Exception as exc: exc.args += ("the sample rate could not be auto-detected. " "This can happen if the file is too short to " "determine the sample rate, or because it is " "corrupted. Try passing in an explicit " "`sample_rate`.",) raise super(VLBIStreamReaderBase, self).__init__( fh_raw, header0, sample_rate, samples_per_frame, unsliced_shape, bps, complex_data, squeeze, subset, fill_value, verify) info = VLBIStreamReaderInfo() def _squeeze_and_subset(self, data): """Possibly remove unit dimensions and subset the given data. The first dimension (sample number) is never removed. """ if self.squeeze: data = data.reshape(data.shape[:1] + tuple(sh for sh in data.shape[1:] if sh > 1)) if self.subset: data = data[(slice(None),) + self.subset] return data def _get_sample_shape(self): """Get shape by applying squeeze and subset to a dummy data sample.""" # First apply base class, which squeezes if needed. sample_shape = super(VLBIStreamReaderBase, self)._get_sample_shape() if not self.subset: return sample_shape # Now apply subset to a dummy sample that has the sample number as its # value (where 13 is to bring bad luck to over-complicated subsets). dummy_data = np.arange(13.) dummy_sample = np.rollaxis( # Use moveaxis when numpy_min>=1.11 (np.zeros(sample_shape)[..., np.newaxis] + dummy_data), -1) try: dummy_subset = dummy_sample[(slice(None),) + self.subset] # Check whether subset was in range and whether sample numbers were # preserved (latter should be, but check anyway). assert 0 not in dummy_subset.shape assert np.all(dummy_subset == dummy_data.reshape( (-1,) + (1,) * (dummy_subset.ndim - 1))) except (IndexError, AssertionError) as exc: exc.args += ("subset {} cannot be used to properly index " "{}samples with shape {}.".format( self.subset, "squeezed " if self.squeeze else "", sample_shape),) raise exc # We got the shape. We only bother trying to associate names with the # dimensions when we know for sure what happened in the subsetting. subset_shape = dummy_subset.shape[1:] if (not hasattr(sample_shape, '_fields') or subset_shape == () or len(self.subset) > len(sample_shape)): return subset_shape # We can only associate names when indexing each dimension separately # gives a consistent result with the complete subset. subset_axis = 0 fields = [] subset = self.subset + (slice(None),) * (len(sample_shape) - len(self.subset)) try: for field, sample_dim, item in zip(sample_shape._fields, sample_shape, subset): subset_dim = np.empty(sample_dim)[item].shape assert len(subset_dim) <= 1 # No advanced multi-d indexing. if len(subset_dim) == 1: # If this dimension was not removed and matches that # of the real subset, we now have a field label for it. assert subset_dim[0] == subset_shape[subset_axis] fields.append(field) subset_axis += 1 except Exception: # Things did not make sense, probably some advanced indexing; # Just don't worry about having a named tuple. return subset_shape shape_cls = namedtuple('SampleShape', ','.join(fields)) return shape_cls(*subset_shape) @lazyproperty def _last_header(self): """Last header of the file.""" raw_offset = self.fh_raw.tell() self.fh_raw.seek(-self.header0.frame_nbytes, 2) last_header = self.fh_raw.find_header(forward=False) self.fh_raw.seek(raw_offset) if last_header is None: raise ValueError("corrupt VLBI frame? No frame in last {0} bytes." .format(10 * self.header0.frame_nbytes)) return last_header @lazyproperty def stop_time(self): """Time at the end of the file, just after the last sample. See also `start_time` for the start time of the file, and `time` for the time of the sample pointer's current offset. """ return (self._get_time(self._last_header) + (self.samples_per_frame / self.sample_rate).to(u.s)) @lazyproperty def _nsample(self): """Number of complete samples in the stream data.""" return int(((self.stop_time - self.start_time) * self.sample_rate).to(u.one).round()) @property def shape(self): """Shape of the (squeezed/subset) stream data.""" return (self._nsample,) + self.sample_shape @property def size(self): """Total number of component samples in the (squeezed/subset) stream data. """ prod = 1 for dim in self.shape: prod *= dim return prod @property def ndim(self): """Number of dimensions of the (squeezed/subset) stream data.""" return len(self.shape) @property def fill_value(self): """Value to use for invalid or missing data. Default: 0.""" return self._fill_value
[docs] def seek(self, offset, whence=0): """Change the stream position. This works like a normal filehandle seek, but the offset is in samples (or a relative or absolute time). Parameters ---------- offset : int, `~astropy.units.Quantity`, or `~astropy.time.Time` Offset to move to. Can be an (integer) number of samples, an offset in time units, or an absolute time. whence : {0, 1, 2, 'start', 'current', or 'end'}, optional Like regular seek, the offset is taken to be from the start if ``whence=0`` (default), from the current position if 1, and from the end if 2. One can alternativey use 'start', 'current', or 'end' for 0, 1, or 2, respectively. Ignored if ``offset`` is a time. """ try: offset = operator.index(offset) except Exception: try: offset = offset - self.start_time except Exception: pass else: whence = 0 offset = int((offset * self.sample_rate).to(u.one).round()) if whence == 0 or whence == 'start': self.offset = offset elif whence == 1 or whence == 'current': self.offset += offset elif whence == 2 or whence == 'end': self.offset = self._nsample + offset else: raise ValueError("invalid 'whence'; should be 0 or 'start', 1 or " "'current', or 2 or 'end'.") return self.offset
@property def dtype(self): return np.complex64 if self.complex_data else np.float32
[docs] def read(self, count=None, out=None): """Read a number of complete (or subset) samples. The range retrieved can span multiple frames. Parameters ---------- count : int or None, optional Number of complete/subset samples to read. If `None` (default) or negative, the whole file is read. Ignored if ``out`` is given. out : None or array, optional Array to store the data in. If given, ``count`` will be inferred from the first dimension; the other dimension should equal `sample_shape`. Returns ------- out : `~numpy.ndarray` of float or complex The first dimension is sample-time, and the remainder given by `sample_shape`. """ if out is None: if count is None or count < 0: count = self._nsample - self.offset if count < 0: raise EOFError("cannot read from beyond end of file.") out = np.empty((count,) + self.sample_shape, dtype=self.dtype) else: assert out.shape[1:] == self.sample_shape, ( "'out' should have trailing shape {}".format(self.sample_shape)) count = out.shape[0] offset0 = self.offset sample = 0 while count > 0: # For current position, get frame plus offset in that frame. frame_index, sample_offset = divmod(self.offset, self.samples_per_frame) if frame_index != self._frame_index: self._frame = self._read_frame(frame_index) self._frame_index = frame_index frame = self._frame nsample = min(count, len(frame) - sample_offset) data = frame[sample_offset:sample_offset + nsample] data = self._squeeze_and_subset(data) # Copy to relevant part of output. out[sample:sample + nsample] = data sample += nsample # Explicitly set offset (just in case read_frame adjusts it too). self.offset = offset0 + sample count -= nsample return out
[docs]class VLBIStreamWriterBase(VLBIStreamBase): def __init__(self, fh_raw, header0, sample_rate, samples_per_frame, unsliced_shape, bps, complex_data, squeeze, subset, fill_value, verify): if sample_rate is None: raise ValueError("must pass in an explicit `sample_rate`.") super(VLBIStreamWriterBase, self).__init__( fh_raw, header0, sample_rate, samples_per_frame, unsliced_shape, bps, complex_data, squeeze, subset, fill_value, verify) def _unsqueeze(self, data): new_shape = list(data.shape) for i, dim in enumerate(self._unsliced_shape): if dim == 1: new_shape.insert(i + 1, 1) return data.reshape(new_shape)
[docs] def write(self, data, valid=True): """Write data, buffering by frames as needed. Parameters ---------- data : `~numpy.ndarray` Piece of data to be written, with sample dimensions as given by `sample_shape`. This should be properly scaled to make best use of the dynamic range delivered by the encoding. valid : bool, optional Whether the current data are valid. Default: `True`. """ assert data.shape[1:] == self.sample_shape, ( "'data' should have trailing shape {}".format(self.sample_shape)) if self.squeeze: data = self._unsqueeze(data) count = data.shape[0] offset0 = self.offset sample = 0 while count > 0: frame_index, sample_offset = divmod(self.offset, self.samples_per_frame) if frame_index != self._frame_index: self._frame = self._make_frame(frame_index) self._frame_index = frame_index self._valid = valid else: self._valid &= valid nsample = min(count, len(self._frame) - sample_offset) sample_end = sample_offset + nsample self._frame[sample_offset:sample_end] = data[sample: sample + nsample] if sample_end == self.samples_per_frame: self._write_frame(self._frame, valid=self._valid) # Be sure we do not reuse this frame (might also be needed # to write memmaps to disk). del self._frame self._frame_index = None sample += nsample # Explicitly set offset (just in case write_frame adjusts it too). self.offset = offset0 + sample count -= nsample
def _write_frame(self, frame, valid=True): # Default implementation is to assume this is a frame that can write # the underlying binary file. frame.valid = valid frame.tofile(self.fh_raw)
[docs] def close(self): extra = self.offset % self.samples_per_frame if extra != 0: warnings.warn("closing with partial buffer remaining. " "Writing padded frame, marked as invalid.") self.write(np.zeros((self.samples_per_frame - extra,) + self.sample_shape), valid=False) assert self.offset % self.samples_per_frame == 0 return super(VLBIStreamWriterBase, self).close()
default_open_doc = """Open baseband file(s) for reading or writing. Opened as a binary file, one gets a wrapped filehandle that adds methods to read/write a frame. Opened as a stream, the handle is wrapped further, with methods such as reading and writing to the file as if it were a stream of samples. Parameters ---------- name : str or filehandle, or sequence of str File name, filehandle, or sequence of file names (see Notes). mode : {'rb', 'wb', 'rs', or 'ws'}, optional Whether to open for reading or writing, and as a regular binary file or as a stream. Default: 'rs', for reading a stream. **kwargs Additional arguments when opening the file as a stream. """
[docs]def make_opener(fmt, classes, doc='', append_doc=True): """Create a baseband file opener. Parameters ---------- fmt : str Name of the baseband format. classes : dict With the file/stream reader/writer classes keyed by names equal to 'FileReader', 'FileWriter', 'StreamReader', 'StreamWriter' prefixed by ``fmt``. Typically, one will pass in ``classes=globals()``. doc : str, optional If given, used to define the docstring of the opener. append_doc : bool, optional If `True` (default), append ``doc`` to the default docstring rather than override it. """ module = classes.get('__name__', None) classes = {cls_type: classes[fmt + cls_type] for cls_type in ('FileReader', 'FileWriter', 'StreamReader', 'StreamWriter')} def open(name, mode='rs', **kwargs): # If sequentialfile object, check that it's opened properly. if isinstance(name, sf.SequentialFileBase): assert (('r' in mode and name.mode == 'rb') or ('w' in mode and name.mode == 'w+b')), ( "open only accepts sequential files opened in 'rb' " "mode for reading or 'w+b' mode for writing.") # If passed some kind of list, open a sequentialfile object. if isinstance(name, (tuple, list, sf.FileNameSequencer)): if 'r' in mode: name = sf.open(name, 'rb') else: file_size = kwargs.pop('file_size', None) name = sf.open(name, 'w+b', file_size=file_size) # Select FileReader/Writer for binary, StreamReader/Writer for stream. if 'b' in mode: cls_type = 'File' else: cls_type = 'Stream' # Select reading or writing. Check if ``name`` is a filehandle, and # open it if not. if 'w' in mode: cls_type += 'Writer' got_fh = hasattr(name, 'write') if not got_fh: name = io.open(name, 'w+b') elif 'r' in mode: cls_type += 'Reader' got_fh = hasattr(name, 'read') if not got_fh: name = io.open(name, 'rb') else: raise ValueError("only support opening {0} file for reading " "or writing (mode='r' or 'w')." .format(fmt)) # Try wrapping ``name`` with file or stream reader (``name`` is a # binary filehandle at this point). try: return classes[cls_type](name, **kwargs) except Exception as exc: if not got_fh: try: name.close() except Exception: # pragma: no cover pass raise exc # Load custom documentation for format. open.__doc__ = (default_open_doc.replace('baseband', fmt) + doc if append_doc else doc) # This ensures the function becomes visible to sphinx. if module: open.__module__ = module return open