Source code for baseband.gsb.file_info

# Licensed under the GPLv3 - see LICENSE
from astropy import units as u

from ..base.file_info import (info_item, InfoBase,
                              FileReaderInfo, StreamReaderInfo)


def file_size(fh):
    offset = fh.tell()
    try:
        return fh.seek(0, 2)
    finally:
        fh.seek(offset)


class GSBTimeStampInfo(InfoBase):
    """Standardized information on a timestamp file reader.

    The ``info`` descriptor has a number of standard attributes, which are
    determined from arguments passed in opening the file, from the first header
    (``info.header0``) and from possibly scanning the file to determine the
    duration of frames.
    """
    attr_names = ('format', 'mode', 'number_of_frames', 'frame_rate',
                  'start_time', 'readable', 'missing', 'errors', 'warnings')
    mode = info_item(needs='header0', doc=(
        "Mode in which data was taken: 'phased' or 'rawdump'."))
    # We do not subclass FileReaderInfo, since the logic is a bit different.
    # Still, these ones can be used directly.
    start_time = FileReaderInfo.start_time
    frame_rate = FileReaderInfo.frame_rate
    checks = FileReaderInfo.checks
    errors = FileReaderInfo.errors
    warnings = FileReaderInfo.warnings

    # We cannot know whether the stream is readable without the raw files.
    readable = None
    missing = info_item(default={
        'raw': 'need raw binary files for the stream reader'}, copy=True)

    @info_item
    def header0(self):
        with self._parent.temporary_offset(0) as fh:
            return fh.read_timestamp()

    @info_item(needs='header0')
    def format(self):
        return 'gsb'

    @info_item(needs='header0')
    def number_of_frames(self):
        with self._parent.temporary_offset() as fh:
            fh_size = fh.seek(0, 2)
            # Guess based on a fixed header size.  In reality, this
            # may be an overestimate as the headers can grow in size,
            # or an underestimate as the last header may be partial.
            # So, search around to be sure.
            guess = max(fh_size // self.header0.nbytes, 1)
            while self.header0.seek_offset(guess) > fh_size:
                guess -= 1
            while self.header0.seek_offset(guess) < fh_size:
                guess += 1

            # Now see if there is indeed a nice header before.
            fh.seek(self.header0.seek_offset(guess-1))
            line_tuple = fh.readline().split()
            # But realize that sometimes an incomplete header is written.
            if (len(" ".join(line_tuple))
                    < len(" ".join(self.header0.words))):
                self.warnings['number_of_frames'] = (
                    'last header is incomplete and is ignored')
                retry = True
            else:
                # Check last header is readable.
                try:
                    self.header0.__class__(line_tuple).time
                except Exception as exc:
                    self.warnings['number_of_frames'] = (
                        'last header failed to read ({}) and is ignored'
                        .format(str(exc)))
                    retry = True
                else:
                    retry = False
            if retry:
                guess -= 1
                fh.seek(self.header0.seek_offset(guess-1))
                self.header0.fromfile(fh).time

        return guess


class GSBStreamReaderInfo(StreamReaderInfo):
    attr_names = list(StreamReaderInfo.attr_names)
    attr_names.insert(attr_names.index('readable'), 'bandwidth')
    attr_names.insert(attr_names.index('readable'), 'n_raw')
    attr_names.insert(attr_names.index('readable'), 'payload_nbytes')
    attr_names = tuple(attr_names)

    payload_nbytes = info_item(needs='_parent', doc=(
        'Number of bytes per payload (in each raw file).'))

    @info_item
    def frame0(self):
        """First frame from the file."""
        return self._parent._read_frame(0)

    # Bit of a hack, but the base reader one suffices here with
    # the frame0 override above.
    decodable = FileReaderInfo.decodable

    @info_item
    def file_info(self):
        """Information from timestamp file."""
        fh_ts_info = self._parent.fh_ts.info
        fh_ts_info.missing.pop('raw', None)
        return fh_ts_info

    @info_item(needs='shape')
    def bandwidth(self):
        """Bandwidth covered by the stream."""
        return (self.sample_rate * self.shape[-1]
                / (1 if self.complex_data else 2)).to(u.MHz)

    @info_item
    def n_raw(self):
        """Number of raw streams (per polarization)."""
        fh_raw = self._parent.fh_raw
        return len(fh_raw[0]) if isinstance(fh_raw, (list, tuple)) else 1

    @info_item(needs=('file_info', 'payload_nbytes', 'n_raw'), default=False)
    def consistent(self):
        """Whether timestamp and raw files are consistent in length."""
        pl_nbytes = self.payload_nbytes
        nchan = self._parent._unsliced_shape[-1]
        expected_size = int(((self.stop_time-self.start_time)
                             * self.sample_rate * nchan
                             * self.bps * (2 if self.complex_data else 1)
                             // (8 * self.n_raw)).to(u.one).round())
        fh_raw = self._parent.fh_raw
        if self.file_info.mode == 'rawdump':
            fh_raw = [[fh_raw]]

        msg = ''
        try:
            for pair in fh_raw:
                for fh in pair:
                    fs = file_size(fh)
                    if fs % pl_nbytes != 0 and 'non-integer' not in msg:
                        msg += ('raw file contains non-integer number ({}) '
                                'of payloads.'.format(fs / pl_nbytes))

                    consistent = fs >= expected_size
                    if not consistent:
                        emsg = 'raw file size smaller than expected.'
                        ratio = fs / expected_size
                        if len(pair) == 1 and 0.5 <= ratio < 0.6:
                            emsg = (emsg[:-1] + ' by {} factor of two. '
                                    'Are you missing the second raw file?'
                                    .format('a' if ratio == 0.5
                                            else 'about a'))
                        raise EOFError(emsg)

                    if fs > expected_size and 'more bytes' not in msg:
                        msg += 'raw file contains more bytes than expected.'
        finally:
            if msg:
                self.warnings['consistent'] = msg

        # As a final sanity check, try reading the final sample of the file.
        old_offset = self._parent.tell()
        try:
            self._parent.seek(-1, 2)
            self._parent.read(1)
        finally:
            self._parent.seek(old_offset)

        return True

    @info_item(needs='frame0', default=False)
    def readable(self):
        """Whether the file is readable and decodable."""
        self.checks['decodable'] = self.decodable
        self.checks['consistent'] = self.consistent
        return all(bool(v) for v in self.checks.values())