Source code for baseband.gsb.header

# Licensed under the GPLv3 - see LICENSE
"""
Definitions for GSB Headers, using the timestamp files.

Somewhat out of data description for phased data:
http://gmrt.ncra.tifr.res.in/gmrt_hpage/sub_system/gmrt_gsb/GSB_beam_timestamp_note_v1.pdf
and for rawdump data
http://gmrt.ncra.tifr.res.in/gmrt_hpage/sub_system/gmrt_gsb/GSB_rawdump_data_format_v2.pdf
"""
import os

import numpy as np
from astropy import units as u
from astropy.time import Time, TimeString
from astropy.time.formats import erfa  # astropy-version independent.

from ..base.header import HeaderParserBase, ParsedHeaderBase, ParserDict


__all__ = ['TimeGSB', 'GSBHeader', 'GSBRawdumpHeader', 'GSBPhasedHeader']


[docs]class TimeGSB(TimeString): """GSB header date-time format ``YYYY MM DD HH MM SS 0.SSSSSSSSS``. For example, ``2000 01 01 00 00 00 0.000000000`` is midnight on January 1, 2000. """ # Implicitly uses the metaclass astropy.time.formats.TimeFormatMeta to # register with astropy.Time. name = 'gsb'
[docs] def set_jds(self, val1, val2): """Parse the time strings contained in val1 and set jd1, jd2""" iterator = np.nditer([val1, None, None, None, None, None, None], op_dtypes=([val1.dtype] + 5 * [np.intc] + [np.double])) try: for val, iy, im, id, ihr, imin, dsec in iterator: timestr = val.item() components = timestr.split() iy[...], im[...], id[...], ihr[...], imin[...], sec = ( int(component) for component in components[:-1]) dsec[...] = sec + float(components[-1]) except Exception: raise ValueError('Time {0} does not match {1} format' .format(timestr, self.name)) self.jd1, self.jd2 = erfa.dtf2d( self.scale.upper().encode('utf8'), *iterator.operands[1:])
[docs] def to_value(self, parent=None): scale = self.scale.upper().encode('ascii'), iys, ims, ids, ihmsfs = erfa.d2dtf(scale, self.precision, self.jd1, self.jd2) ihrs = ihmsfs['h'] imins = ihmsfs['m'] isecs = ihmsfs['s'] ifracs = ihmsfs['f'] fmt = ('{0:04d} {1:02d} {2:02d} {3:02d} {4:02d} {5:02d} ' '0.{6:0' + str(self.precision) + 'd}') outs = [] for iy, im, id, ihr, imin, isec, ifracsec in np.nditer( [iys, ims, ids, ihrs, imins, isecs, ifracs]): outs.append(fmt.format(int(iy), int(im), int(id), int(ihr), int(imin), int(isec), int(ifracsec))) return np.array(outs).reshape(self.jd1.shape)
value = property(to_value)
def make_parser(index, length, forward=None, backward=None, default=None): if length > 1: index = slice(index, index + length) def parser(items): return forward(items[index]) return parser def make_setter(index, length, forward, backward, default=None): def setter(items, value): value = backward(value) if length == 1: items[index] = value else: for i, val in enumerate(value): items[index + i] = val return items return setter def get_default(index, length, forward, backward, default=None): return default class GSBHeaderParser(HeaderParserBase): """Parser & setter for GSB timestamp keywords. A dictionary of header keywords, with values that describe how they are encoded in the GSB header. Initialisation is as a normal dict, with (ordered) key, value pairs, with each value a tuple containing: index : int Index into the header words for this key. length : int Number of words included in this key. forward : callable Function to decode the data. backward : callable Function to encode the value. default : object or None Possible default value. Notes ----- For GSB, this parsing technique is a bit of overkill, but it allows re-use of handy methods from the VLBI header parser. """ parsers = ParserDict(make_parser) setters = ParserDict(make_setter) defaults = ParserDict(get_default)
[docs]class GSBHeader(ParsedHeaderBase): """GSB Header, based on a line from a timestamp file. Parameters ---------- words : list of str, or None If `None`, set to a list of empty strings for later initialisation. mode : str or None, optional Mode in which data was taken: 'phased' or 'rawdump'. If `None`, it is determined from the words. nbytes : int or None, optional Number of characters in the header, including trailing blank spaces and carriage returns. If `None`, is determined from the words assuming one trailing blank space and one CR. verify : bool, optional Whether to do basic verification of integrity. Default: `True`. Returns ------- header : `GSBHeader` subclass As appropriate for the mode. """ _mode = None _gsb_header_classes = {} def __new__(cls, words=None, mode=None, nbytes=None, utc_offset=5.5*u.hr, verify=True): if cls is GSBHeader: if mode is None: if words is None: raise TypeError("cannot construct an empty GSB header " "without knowing the mode.") mode = 'rawdump' if len(words) == 7 else 'phased' cls = cls._gsb_header_classes.get(mode) # We intialise GSBHeader subclasses, so their __init__ will be called. return super().__new__(cls) def __init__(self, words, mode=None, nbytes=None, utc_offset=5.5*u.hr, verify=True): if words is None: words = [''] * self._number_of_words if mode is not None: self._mode = mode self._nbytes = nbytes self.utc_offset = utc_offset super().__init__(words, verify=verify)
[docs] def verify(self): assert self.mode == self.__class__._mode assert len(self.words) == self._number_of_words
@property def mode(self): """Mode in which data was taken: 'phased' or 'rawdump'.""" return self._mode @property def nbytes(self): """Size of the header in characters. Assumes the string terminates in one blank space and one carriage return. """ if self._nbytes is None: self._nbytes = len(' '.join(self.words) + os.linesep) return self._nbytes
[docs] @classmethod def fromfile(cls, fh, *args, **kwargs): """Read GSB Header from a line from a timestamp file. Arguments are the same as for class initialisation. The header constructed will be immutable. """ start_pos = fh.tell() s = fh.readline() if s == '': raise EOFError nbytes = fh.tell() - start_pos return cls(tuple(s.split()), mode=None, nbytes=nbytes, *args, **kwargs)
[docs] def tofile(self, fh): """Write GSB header as a line to the filehandle.""" return fh.write(' '.join(self.words) + '\n')
[docs] @classmethod def fromvalues(cls, mode=None, nbytes=None, *args, **kwargs): if mode is None and cls._mode is None: if set(kwargs.keys()) & {'pc', 'pc_time', 'seq_nr', 'mem_block'}: mode = 'phased' else: raise TypeError("cannot construct a GSB header from " "values without knowing the mode.") return super().fromvalues(mode, nbytes, *args, **kwargs)
[docs] @classmethod def fromkeys(cls, mode=None, nbytes=None, *args, **kwargs): if mode is None and cls._mode is None: if set(kwargs.keys()) & {'pc', 'seq_nr', 'mem_block'}: mode = 'phased' else: mode = 'rawdump' return super().fromkeys(mode, nbytes, *args, **kwargs)
[docs] def seek_offset(self, n, nbytes=None): """Offset in bytes needed to move a file pointer to another header. Some GSB headers have variable size and hence one cannot trivially jump to another entry in a timestamp file. This routine allows one to calculate the offset required to move the file pointer ``n`` headers. Parameters ---------- n : int The number of headers to move to, relative to the present header. nbytes : int, optional The size in bytes of the present header (if not given, will use the header's `nbytes` property). """ if nbytes is None: nbytes = self.nbytes return n * nbytes
[docs]class GSBRawdumpHeader(GSBHeader): """GSB rawdump header.""" _mode = 'rawdump' _number_of_words = 7 _gps_time_precision = 9 _properties = ('gps_time', 'time') _header_parser = GSBHeaderParser( (('gps', (0, 7, ' '.join, str.split)),)) @property def gps_time(self): return Time(self['gps'], format='gsb', precision=self._gps_time_precision) - self.utc_offset @gps_time.setter def gps_time(self, time): t = time + self.utc_offset t.precision = self._gps_time_precision self['gps'] = t.gsb time = gps_time
[docs]class GSBPhasedHeader(GSBRawdumpHeader): """GSB phased header.""" _mode = 'phased' _number_of_words = GSBRawdumpHeader._number_of_words + 7 + 2 _pc_time_precision = 6 _properties = ('time', 'pc_time') + GSBRawdumpHeader._properties _header_parser = GSBHeaderParser( (('pc', (0, 7, ' '.join, str.split)), ('gps', (7, 7, ' '.join, str.split)), ('seq_nr', (14, 1, int, str, 0)), ('mem_block', (15, 1, int, str, 0)))) @property def pc_time(self): return Time(self['pc'], format='gsb', precision=self._pc_time_precision) - self.utc_offset @pc_time.setter def pc_time(self, time): t = time + self.utc_offset t.precision = self._pc_time_precision self['pc'] = t.gsb @property def time(self): return self.gps_time @time.setter def time(self, time): self.gps_time = time self.pc_time = time
[docs] def seek_offset(self, n, nbytes=None): """Offset in bytes needed to move a file pointer to another header. GSB headers for phased data differ in size depending on the sequence number, making it impossible to trivially jump to another entry in a timestamp file. This routine allows one to calculate the offset required to move the file pointer ``n`` headers. Parameters ---------- n : int The number of headers to move to, relative to the present header. nbytes : int, optional The size in bytes of the present header (if not given, will use the header's `nbytes` property). """ if nbytes is None: nbytes = self.nbytes # Initial guess assuming all headers have same nbytes. guess = n * nbytes # Get number of digits of current sequence number. seq = self['seq_nr'] ndseq = len(str(seq)) # Find the sequence number we're trying to reach. seq_sub_targ = seq + n # And get number of digits for this value. ndtarg = len(str(seq_sub_targ)) # If numbers not the same, correct appropriately. while ndseq != ndtarg: if n > 0: next_power_of_ten = int('1' + ndseq * '0') guess += seq_sub_targ - next_power_of_ten ndseq += 1 else: next_power_of_ten = int('1' + (ndseq - 1) * '0') guess += next_power_of_ten - seq_sub_targ ndseq -= 1 return guess
GSBHeader._gsb_header_classes.update(rawdump=GSBRawdumpHeader, phased=GSBPhasedHeader)