Source code for baseband.gsb.base

# Licensed under the GPLv3 - see LICENSE
import warnings

import numpy as np
import astropy.units as u
from astropy.utils import lazyproperty

from ..base.base import (
    FileBase,
    StreamBase, StreamReaderBase, StreamWriterBase,
    FileOpener, FileInfo)
from .header import GSBHeader
from .payload import GSBPayload
from .frame import GSBFrame
from .file_info import GSBTimeStampInfo, GSBStreamReaderInfo


__all__ = ['GSBTimeStampIO', 'GSBFileReader', 'GSBFileWriter',
           'GSBStreamBase', 'GSBStreamReader', 'GSBStreamWriter',
           'open', 'info']


[docs]class GSBTimeStampIO(FileBase): """Simple reader/writer for GSB time stamp files. Wraps a binary filehandle, providing methods `read_timestamp`, `write_timestamp`, and `get_frame_rate`. Parameters ---------- fh_raw : filehandle Filehandle to the timestamp file, opened in binary mode. """ info = GSBTimeStampInfo()
[docs] def read_timestamp(self): """Read a single timestamp. Returns ------- frame : `~baseband.gsb.GSBHeader` With a ``.time`` property that returns the time encoded. """ return GSBHeader.fromfile(self.fh_raw)
[docs] def write_timestamp(self, header=None, **kwargs): """Write a single timestamp. Parameters ---------- header : `~baseband.gsb.GSBHeader`, optional Header holding time to be written to disk. Can instead give keyword arguments to construct a header. **kwargs : If ``header`` is not given, these are used to initialize one. """ if header is None: header = GSBHeader.fromvalues(**kwargs) header.tofile(self.fh_raw)
[docs] def get_frame_rate(self): """Determine the number of frames per second. The frame rate is inferred from the first two timestamps. Returns ------- frame_rate : `~astropy.units.Quantity` Frames per second. """ with self.temporary_offset(0): timestamp0 = self.read_timestamp() timestamp1 = self.read_timestamp() return (1. / (timestamp1.time - timestamp0.time)).to(u.Hz)
[docs]class GSBFileReader(FileBase): """Simple reader for GSB data files. Wraps a binary filehandle, providing a `read_payload` method to help interpret the data. Parameters ---------- payload_nbytes : int Number of bytes to read. nchan : int, optional Number of channels. Default: 1. bps : int, optional Bits per elementary sample, i.e. per real or imaginary component for complex data. Default: 4. complex_data : bool, optional Whether data are complex. Default: `False`. """ def __init__(self, fh_raw, payload_nbytes, nchan=1, bps=4, complex_data=False): self.payload_nbytes = payload_nbytes self.nchan = nchan self.bps = bps self.complex_data = complex_data super().__init__(fh_raw) def __repr__(self): return ("{name}(fh_raw={s.fh_raw}, payload_nbytes={s.payload_nbytes}, " "nchan={s.nchan}, bps={s.bps}, complex_data={s.complex_data})" .format(name=self.__class__.__name__, s=self))
[docs] def read_payload(self): """Read a single block. Returns ------- frame : `~baseband.gsb.GSBPayload` With a ``.data`` property that returns the data encoded. """ return GSBPayload.fromfile(self.fh_raw, payload_nbytes=self.payload_nbytes, sample_shape=(self.nchan,), bps=self.bps, complex_data=self.complex_data)
[docs]class GSBFileWriter(FileBase): """Simple writer for GSB data files. Adds `write_payload` method to the basic binary file wrapper. """
[docs] def write_payload(self, data, bps=4): """Write single data block. Parameters ---------- data : `~numpy.ndarray` or `~baseband.gsb.GSBPayload` If an array, ``bps`` needs to be passed in. bps : int, optional Bits per elementary sample, to use when encoding the payload. Ignored if ``data`` is a GSB payload. Default: 4. """ if not isinstance(data, GSBPayload): data = GSBPayload.fromdata(data, bps=bps) return data.tofile(self.fh_raw)
[docs]class GSBStreamBase(StreamBase): """Base for GSB streams.""" _sample_shape_maker = GSBPayload._sample_shape_maker def __init__(self, fh_ts, fh_raw, header0, sample_rate=None, samples_per_frame=None, payload_nbytes=None, nchan=None, bps=None, complex_data=None, **kwargs): self.fh_ts = fh_ts rawdump = header0.mode == 'rawdump' if isinstance(fh_raw, (tuple, list)): assert not rawdump for pair in fh_raw: assert isinstance(pair, (tuple, list)) assert len(pair) == len(fh_raw[0]) elif not rawdump: fh_raw = ((fh_raw,),) complex_data = (complex_data if complex_data is not None else (False if rawdump else True)) bps = bps if bps is not None else (4 if rawdump else 8) nchan = nchan if nchan is not None else (1 if rawdump else 512) bpfs = bps * nchan * (2 if complex_data else 1) default_frame_rate = (1e8/6/2**22)*u.Hz nfiles = 1 if rawdump else len(fh_raw[0]) # By default, GSB payloads are always 4 MB (which can combine to # frames of 8 MB for phased if two streams are used). if payload_nbytes is None: if samples_per_frame is None: if sample_rate is None: payload_nbytes = 2**22 else: payload_nbytes = int((sample_rate / default_frame_rate * bpfs / 8 / nfiles) .to(u.one).round()) else: payload_nbytes = samples_per_frame * bpfs // (8 * nfiles) if samples_per_frame is None: samples_per_frame = payload_nbytes * 8 // bpfs * nfiles elif samples_per_frame != payload_nbytes*nfiles*8/bpfs: raise ValueError('inconsistent samples_per_frame, bps, ' 'complex_data, and payload_nbytes') if sample_rate is None: sample_rate = samples_per_frame * default_frame_rate sample_shape = (nchan,) if rawdump else (len(fh_raw), nchan) super().__init__( fh_raw, header0, sample_rate=sample_rate, samples_per_frame=samples_per_frame, sample_shape=sample_shape, bps=bps, complex_data=complex_data, **kwargs) self._payload_nbytes = payload_nbytes @property def payload_nbytes(self): """Number of bytes per payload, divided by the number of raw files.""" return self._payload_nbytes def __getattr__(self, attr): """Try to get things on the current open file if it is not on self.""" if attr in {'readable', 'writable', 'seekable', 'closed', 'name'}: fh_raw = (self.fh_raw if self.header0.mode == 'rawdump' else self.fh_raw[0][0]) try: return getattr(fh_raw, attr) except AttributeError: # pragma: no cover pass # __getattribute__ to raise appropriate error. return self.__getattribute__(attr) def _set_index(self, header, index): if self.header0.mode == 'phased': time_offset = index / self._frame_rate # mem_block is a rotating modulo-8 value with no meaning. header.update(gps_time=self.header0.gps_time + time_offset, pc_time=self.header0.pc_time + time_offset, seq_nr=self.header0['seq_nr'] + index, mem_block=(self.header0['mem_block'] + index) % 8) else: super()._set_index(header, index)
[docs] def close(self): self.fh_ts.close() if self.header0.mode == 'rawdump': self.fh_raw.close() else: for fh_pair in self.fh_raw: for fh in fh_pair: fh.close()
def __repr__(self): if isinstance(self.fh_raw, (list, tuple)): data_name = tuple(tuple(p.name.split('/')[-1] for p in pol) for pol in self.fh_raw) else: data_name = self.fh_raw.name return ("<{s.__class__.__name__} header={s.fh_ts.name}" " offset= {s.offset}\n data={dn}\n" " sample_rate={s.sample_rate:.5g}," " samples_per_frame={s.samples_per_frame},\n" " sample_shape={s.sample_shape}, bps={s.bps},\n" " {sub}start_time={s.start_time.isot}>" .format(s=self, dn=data_name, sub=( 'subset={0}, '.format(self.subset) if self.subset else '')))
[docs]class GSBStreamReader(GSBStreamBase, StreamReaderBase): """GSB format reader. Allows access to GSB files as a continuous series of samples. Requires both a timestamp and one or more corresponding raw data files. Parameters ---------- fh_ts : filehandle For reading timestamps. fh_raw : filehandle, or nested tuple of filehandles Raw binary data filehandle(s). A single file is needed for rawdump, and a tuple for phased. For a nested tuple, the outer tuple determines the number of polarizations, and the inner tuple(s) the number of streams per polarization. E.g., ``((polL1, polL2), (polR1, polR2))`` for two streams per polarization. A single tuple is interpreted as streams of a single polarization. sample_rate : `~astropy.units.Quantity`, optional Number of complete samples per second, i.e. the rate at which each channel of each polarization is sampled. If `None`, will be inferred assuming the frame rate is exactly 0.25165824 s. samples_per_frame : int, optional Number of complete samples per frame (possibly combining two files). Can give ``payload_nbytes`` instead. payload_nbytes : int, optional Number of bytes per payload (in each raw file separately). If both ``samples_per_frame`` and ``payload_nbytes`` are `None`, ``payload_nbytes`` is set to ``2**22`` (4 MiB). nchan : int, optional Number of channels. Default: 1 for rawdump, 512 for phased. bps : int, optional Bits per elementary sample, i.e. per real or imaginary component for complex data. Default: 4 for rawdump, 8 for phased. complex_data : bool, optional Whether data are complex. Default: `False` for rawdump, `True` for phased. squeeze : bool, optional If `True` (default), remove any dimensions of length unity from decoded data. subset : indexing object or tuple of objects, optional Specific components of the complete sample to decode (after possibly squeezing). If a single indexing object is passed, it selects (available) polarizations. If a tuple is passed, the first selects polarizations and the second selects channels. If the tuple is empty (default), all components are read. verify : bool, optional Whether to do basic checks of frame integrity when reading. The first frame of the stream is always checked. Default: `True`. """ # TODO: right we are not really compatible with StreamReaderBase, # since we need to access multiple files. Can this be solved with # FileWriter/FileReader classes that handle timestamps and multiple blocks, # combining these into a frame? def __init__(self, fh_ts, fh_raw, sample_rate=None, samples_per_frame=None, payload_nbytes=None, nchan=None, bps=None, complex_data=None, squeeze=True, subset=(), verify=True): fh_ts = GSBTimeStampIO(fh_ts) header0 = fh_ts.read_timestamp() super().__init__( fh_ts, fh_raw, header0, sample_rate=sample_rate, samples_per_frame=samples_per_frame, payload_nbytes=payload_nbytes, nchan=nchan, bps=bps, complex_data=complex_data, squeeze=squeeze, subset=subset, verify=verify) self.fh_ts.seek(0) # Replace fh_raw with GSBFileReader instances. fr_kwargs = dict(payload_nbytes=self._payload_nbytes, nchan=self._unsliced_shape.nchan, bps=self.bps, complex_data=self.complex_data) if header0.mode == 'rawdump': self.fh_raw = GSBFileReader(self.fh_raw, **fr_kwargs) else: self.fh_raw = [[GSBFileReader(fh, **fr_kwargs) for fh in fh_pair] for fh_pair in self.fh_raw] info = GSBStreamReaderInfo() @lazyproperty def _last_header(self): """Last header of the timestamp file.""" with self.fh_ts.temporary_offset() as fh: # Guess based on a fixed header size. In reality, this # may be an overestimate as the headers can grow in size, # or an underestimate as the last header may be partial. # So, search around to be sure. fh_size = fh.seek(0, 2) guess = max(fh_size // self.header0.nbytes, 1) while self.header0.seek_offset(guess) > fh_size: guess -= 1 while self.header0.seek_offset(guess) < fh_size: guess += 1 # Now see if there is indeed a nice header before. fh.seek(self.header0.seek_offset(guess-1)) last_line = fh.readline() last_line_tuple = last_line.split() # But realize that sometimes an incomplete header is written. try: if (len(" ".join(last_line_tuple)) < len(" ".join(self.header0.words))): raise EOFError last_header = self.header0.__class__(last_line_tuple) # Check header time can be parsed. last_header.time except Exception: warnings.warn("The last header entry, '{0}', has an incorect " "length. Using the second-to-last entry instead." .format(last_line)) fh.seek(self.header0.seek_offset(guess-2)) last_line_tuple = fh.readline().split() last_header = self.header0.__class__(last_line_tuple) return last_header
[docs] def readable(self): """Whether the file can be read and decoded.""" return self.info.readable
def _seek_frame(self, index): self.fh_ts.seek(self.header0.seek_offset(index)) if self.header0.mode == 'rawdump': self.fh_raw.seek(index * self._payload_nbytes) else: for fh_pair in self.fh_raw: for fh in fh_pair: fh.seek(index * self._payload_nbytes) def _fh_raw_read_frame(self): return GSBFrame.fromfile(self.fh_ts, self.fh_raw, payload_nbytes=self._payload_nbytes, sample_shape=self._unsliced_shape, bps=self.bps, complex_data=self.complex_data, verify=self.verify)
[docs]class GSBStreamWriter(GSBStreamBase, StreamWriterBase): """GSB format writer. Encodes and writes sequences of samples to file. Parameters ---------- fh_ts : filehandle For writing time stamps to storage. fh_raw : filehandle, or nested tuple of filehandles For writing raw binary data to storage. A single file is needed for rawdump, and a tuple for phased. For a nested tuple, the outer tuple determines the number of polarizations, and the inner tuple(s) the number of streams per polarization. E.g., ``((polL1, polL2), (polR1, polR2))`` for two streams per polarization. A single tuple is interpreted as streams of a single polarization. header0 : `~baseband.gsb.GSBHeader` Header for the first frame, holding time information, etc. sample_rate : `~astropy.units.Quantity`, optional Number of complete samples per second, i.e. the rate at which each channel of each polarization is sampled. If not given, will be inferred assuming the frame rate is exactly 0.25165824 s. samples_per_frame : int, optional Number of complete samples per frame (possibly combining two files). Can give ``payload_nbytes`` instead. payload_nbytes : int, optional Number of bytes per payload (in each raw file separately). If both ``samples_per_frame`` and ``payload_nbytes`` are `None`, ``payload_nbytes`` is set to ``2**22`` (4 MiB). nchan : int, optional Number of channels. Default: 1 for rawdump, 512 for phased. bps : int, optional Bits per elementary sample, i.e. per real or imaginary component for complex data. Default: 4 for rawdump, 8 for phased. complex_data : bool, optional Whether data are complex. Default: `False` for rawdump, `True` for phased. squeeze : bool, optional If `True` (default), `write` accepts squeezed arrays as input, and adds any dimensions of length unity. """ def __init__(self, fh_ts, fh_raw, header0=None, sample_rate=None, samples_per_frame=None, payload_nbytes=None, nchan=None, bps=None, complex_data=None, squeeze=True): fh_ts = GSBTimeStampIO(fh_ts) super().__init__( fh_ts, fh_raw, header0, sample_rate=sample_rate, samples_per_frame=samples_per_frame, payload_nbytes=payload_nbytes, nchan=nchan, bps=bps, complex_data=complex_data, squeeze=squeeze) self._frame = GSBFrame.fromdata( np.zeros((self.samples_per_frame,) + self._unsliced_shape, (np.complex64 if self.complex_data else np.float32)), header=self.header0.copy(), bps=self.bps) def _fh_raw_write_frame(self, frame): assert frame.valid frame.tofile(self.fh_ts, self.fh_raw)
[docs] def flush(self): self.fh_ts.flush() try: self.fh_raw.flush() except AttributeError: for fh_pair in self.fh_raw: for fh in fh_pair: fh.flush()
class GSBFileOpener(FileOpener): non_header_keys = FileOpener.non_header_keys | {'raw'} # TODO: think whether the scheme with using FileReader can be made to work. def __call__(self, name, mode='rs', **kwargs): mode = self.normalize_mode(mode) # For binary or timestamp files, the normal opener works fine. if mode[1] != 's': return super().__call__(name, mode, **kwargs) # But for stream mode, we need to open both raw and timestamp. raw = kwargs.pop('raw', None) if raw is None: raise TypeError("stream missing required argument 'raw'.") fh = self.get_fh(name, mode[0]+'t') stream_mode = kwargs.pop('header_mode', 'phased' if isinstance(raw, (list, tuple)) else 'rawdump') if stream_mode == 'rawdump': fh_raw = self.get_fh(raw, mode[0]+'b') else: if not isinstance(raw, (list, tuple)): raw = ((raw,),) elif not isinstance(raw[0], (list, tuple)): raw = (raw,) fh_raw = tuple(tuple(self.get_fh(p, mode[0]+'b') for p in pol) for pol in raw) if mode == 'ws' and 'header0' not in kwargs: kwargs['mode'] = stream_mode kwargs['header0'] = self.get_header0(kwargs) try: return self.classes[mode](fh, fh_raw=fh_raw, **kwargs) except Exception: if fh is not name: fh.close() if isinstance(raw, (list, tuple)): for pol, polfh in zip(raw, fh_raw): for p, pfh in zip(pol, polfh): if pfh is not p: pfh.close() elif fh_raw is not raw: fh_raw.close() raise open = GSBFileOpener('GSB', header_class=GSBHeader, classes={ 'rt': GSBTimeStampIO, 'wt': GSBTimeStampIO, 'rb': GSBFileReader, 'wb': GSBFileWriter, 'rs': GSBStreamReader, 'ws': GSBStreamWriter}).wrapped(module=__name__, doc=""" Open GSB file(s) for reading or writing. A GSB data set contains a text header file and one or more raw data files. When the file is opened as text, one gets a standard filehandle, but with methods to read/write timestamps. When it is opened as a binary, one similarly gets methods to read/write frames. Opened as a stream, the file is interpreted as a timestamp file, but raw files need to be given too. This allows access to the stream(s) as series of samples. Parameters ---------- name : str Filename of timestamp or raw data file. mode : {'rb', 'wb', 'rt', 'wt', 'rs', or 'ws'}, optional Whether to open for reading or writing, and as a regular text or binary file (for timestamps and data, respectively) or as a stream. Default: 'rs', for reading a stream. **kwargs Additional arguments when opening the file as a stream. --- For both reading and writing of streams : raw : str or (tuple of) tuple of str Name of files holding payload data. A single file is needed for rawdump, and a tuple for phased. For a nested tuple, the outer tuple determines the number of polarizations, and the inner tuple(s) the number of streams per polarization. E.g., ``((polL1, polL2), (polR1, polR2))`` for two streams per polarization. A single tuple is interpreted as streams of a single polarization. sample_rate : `~astropy.units.Quantity`, optional Number of complete samples per second, i.e. the rate at which each channel of each polarization is sampled. If `None`, will be inferred assuming the frame rate is exactly 251.658240 ms. samples_per_frame : int, optional Number of complete samples per frame. Can give ``payload_nbytes`` instead. payload_nbytes : int, optional Number of bytes per payload (in each raw file separately). If both ``samples_per_frame`` and ``payload_nbytes`` are `None`, ``payload_nbytes`` is set to ``2**22`` (4 MiB). nchan : int, optional Number of channels. Default: 1 for rawdump, 512 for phased. bps : int, optional Bits per elementary sample, i.e. per real or imaginary component for complex data. Default: 4 for rawdump, 8 for phased. complex_data : bool, optional Whether data are complex. Default: `False` for rawdump, `True` for phased. squeeze : bool, optional If `True` (default) and reading, remove any dimensions of length unity from decoded data. If `True` and writing, accept squeezed arrays as input, and adds any dimensions of length unity. --- For reading only : (see `~baseband.gsb.base.GSBStreamReader`) subset : indexing object or tuple of objects, optional Specific components of the complete sample to decode (after possibly squeezing). If a single indexing object is passed, it selects (available) polarizations. If a tuple is passed, the first selects polarizations and the second selects channels. If the tuple is empty (default), all components are read. verify : bool, optional Whether to do basic checks of frame integrity when reading. The first frame of the stream is always checked. Default: `True`. --- For writing only : (see `~baseband.gsb.base.GSBStreamWriter`) header0 : `~baseband.gsb.GSBHeader` Header for the first frame, holding time information, etc. Can instead give keyword arguments to construct a header. **kwargs If no header is given, an attempt is made to construct one from these. For a standard header, this would include the following. --- Header keywords : (see :meth:`~baseband.gsb.GSBHeader.fromvalues`) time : `~astropy.time.Time` Start time of the file. header_mode : 'rawdump' or 'phased', optional Used to explicitly set the mode of the GSB stream. Default: 'rawdump' if only a single raw file is present, or 'phased' otherwise. seq_nr : int, optional Frame number, only used for phased (default: 0). Returns ------- Filehandle :class:`~baseband.gsb.base.GSBTimeStampIO` (timestamp), or :class:`~baseband.gsb.base.GSBFileReader` or :class:`~baseband.gsb.base.GSBFileWriter` (binary), or :class:`~baseband.gsb.base.GSBStreamReader` or :class:`~baseband.gsb.base.GSBStreamWriter` (stream) """) class GSBFileInfo(FileInfo): def get_file_info(self, name, **kwargs): info = self._get_info(name, 'rt') if self.is_ok(info): info.used_kwargs = {} if 'raw' in kwargs: info.missing.pop('raw') info.used_kwargs['raw'] = kwargs['raw'] return info def get_stream_info(self, name, file_info, **kwargs): used_kwargs = file_info.used_kwargs for key in ('sample_rate', 'payload_nbytes', 'samples_per_frame'): if key in kwargs: used_kwargs[key] = kwargs[key] stream_info = self._get_info(name, mode='rs', **used_kwargs) if self.is_ok(stream_info): stream_info.used_kwargs = used_kwargs return stream_info info = GSBFileInfo.create(globals())