Source code for baseband.mark5b.header

# Licensed under the GPLv3 - see LICENSE
"""
Definitions for VLBI Mark5B Headers.

Implements a Mark5BHeader class used to store header words, and decode/encode
the information therein.

For the specification, see
https://www.haystack.mit.edu/tech/vlbi/mark5/docs/Mark%205B%20users%20manual.pdf
"""
import numpy as np
import astropy.units as u
from astropy.time import Time

from ..base.header import HeaderParser, VLBIHeaderBase, four_word_struct
from ..base.utils import bcd_decode, bcd_encode, fixedvalue, CRC


__all__ = ['CRC16', 'crc16', 'Mark5BHeader']

CRC16 = 0x18005
"""CRC polynomial used for Mark 5B Headers, as a check on the time code.

x^16 + x^15 + x^2 + 1, i.e., 0x18005.
See page 11 of https://www.haystack.mit.edu/tech/vlbi/mark5/docs/230.3.pdf
(defined there for VLBA headers).

This is also CRC-16-IBM mentioned in
https://en.wikipedia.org/wiki/Cyclic_redundancy_check
"""
crc16 = CRC(CRC16)


[docs]class Mark5BHeader(VLBIHeaderBase): """Decoder/encoder of a Mark5B Frame Header. See page 15 of https://www.haystack.mit.edu/tech/vlbi/mark5/docs/Mark%205B%20users%20manual.pdf Parameters ---------- words : tuple of int, or None Four 32-bit unsigned int header words. If `None`, set to a tuple of zeros for later initialisation. kday : int or None Explicit thousands of MJD of the observation time (needed to remove ambiguity in the Mark 5B time stamp). Can instead pass an approximate ``ref_time``. ref_time : `~astropy.time.Time` or None Reference time within 500 days of the observation time, used to infer the full MJD. Used only if ``kday`` is not given. verify : bool, optional Whether to do basic verification of integrity. Default: `True`. Returns ------- header : `Mark5BHeader` """ _header_parser = HeaderParser( (('sync_pattern', (0, 0, 32, 0xABADDEED)), ('user', (1, 16, 16)), ('internal_tvg', (1, 15, 1)), ('frame_nr', (1, 0, 15)), ('bcd_jday', (2, 20, 12)), ('bcd_seconds', (2, 0, 20)), ('bcd_fraction', (3, 16, 16)), ('crc', (3, 0, 16)))) _sync_pattern = _header_parser.defaults['sync_pattern'] _invariants = {'sync_pattern'} """Keys of invariant parts in all Mark 5B headers.""" _stream_invariants = _invariants | {'user'} """Keys of invariant parts in a given Mark 5B stream.""" _struct = four_word_struct _properties = ('payload_nbytes', 'frame_nbytes', 'complex_data', 'kday', 'jday', 'seconds', 'fraction', 'time') """Properties accessible/usable in initialisation.""" kday = None """Thousands of MJD, to complement ``jday`` from header.""" def __init__(self, words, kday=None, ref_time=None, verify=True): if kday is not None: self.kday = kday super().__init__(words, verify=verify) if kday is None and ref_time is not None: self.infer_kday(ref_time)
[docs] def verify(self): """Verify header integrity.""" assert len(self.words) == 4 assert self['sync_pattern'] == self._sync_pattern assert self.kday is None or (33000 < self.kday < 400000) if self.kday is not None: assert self.kday % 1000 == 0, "kday must be thousands of MJD."
[docs] def copy(self, **kwargs): return super().copy(kday=self.kday, **kwargs)
[docs] @classmethod def fromvalues(cls, *, verify=True, **kwargs): """Initialise a header from parsed values. Here, the parsed values must be given as keyword arguments, i.e., for any ``header = cls(<data>)``, ``cls.fromvalues(**header) == header``. However, unlike for the :meth:`Mark5BHeader.fromkeys` class method, data can also be set using arguments named after methods, such as ``jday`` and ``seconds``. Given defaults: sync_pattern : 0xABADDEED Values set by other keyword arguments (if present): bcd_jday : from ``jday`` or ``time`` bcd_seconds : from ``seconds`` or ``time`` bcd_fraction : from ``fraction`` or ``time`` (may need ``frame_rate``) frame_nr : from ``time`` (may need ``frame_rate``) """ # This method exists only to be able to override the docstring. return super().fromvalues(verify=verify, **kwargs)
[docs] def update(self, *, time=None, frame_rate=None, crc=None, verify=True, **kwargs): """Update the header by setting keywords or properties. Here, any keywords matching header keys are applied first, and any remaining ones are used to set header properties, in the order set by the class (in ``_properties``). Parameters ---------- time : `~astropy.time.Time`, optional A possible new time. This is updated last. frame_rate : `~astropy.units.Quantity`, optional The frame rate to use in calculating the frame number from the time. Needed for times at non-integer seconds. crc : int or None, optional If `None` (default), recalculate the CRC after updating. verify : bool, optional If `True` (default), verify integrity after updating. **kwargs Arguments used to set keywords and properties. """ super().update(verify=False, **kwargs) if time is not None: self.set_time(time, frame_rate=frame_rate) if crc is None: # Do not use words 2 & 3 directly, so that this works also if part # of a VDIF header, where the time information is in words 7 & 8. stream = ((((self['bcd_jday'] << 20) + self['bcd_seconds']) << 16) + self['bcd_fraction']) crc = crc16(stream) self['crc'] = crc if verify: self.verify()
[docs] def infer_kday(self, ref_time): """Uses a reference time to set a header's ``kday``. Parameters ---------- ref_time : `~astropy.time.Time` Reference time within 500 days of the observation time. """ self.kday = np.around(ref_time.mjd - self.jday, decimals=-3).astype(int)
@fixedvalue def payload_nbytes(cls): """Size of the payload in bytes (always 10000 for Mark5B).""" return 10000 # 2500 words @fixedvalue def frame_nbytes(cls): """Size of the frame in bytes (always 10016 for Mark5B).""" return cls.nbytes + cls.payload_nbytes @fixedvalue def complex_data(cls): """Whether the data are complex (always False for Mark5B).""" return False @property def jday(self): """Last three digits of MJD (decoded from 'bcd_jday').""" return bcd_decode(self['bcd_jday']) @jday.setter def jday(self, jday): self['bcd_jday'] = bcd_encode(jday) @property def seconds(self): """Integer seconds on day (decoded from 'bcd_seconds').""" return bcd_decode(self['bcd_seconds']) @seconds.setter def seconds(self, seconds): self['bcd_seconds'] = bcd_encode(seconds) @property def fraction(self): """Fractional seconds (decoded from 'bcd_fraction'). The fraction is stored to 0.1 ms accuracy. Following mark5access, this is "unrounded" to give the exact time of the start of the frame for any total bit rate below 512 Mbps. For rates above this value, it is no longer guaranteed that subsequent frames have unique rates. Note to the above: since a Mark5B frame contains 80000 bits, the total bit rate for which times can be unique would in principle be 800 Mbps. However, standard VLBI only uses bit rates that are powers of 2 in MHz. """ ns = bcd_decode(self['bcd_fraction']) * 100000 # "Unround" the nanoseconds, and turn into fractional seconds. return (156250 * ((ns + 156249) // 156250)) / 1e9 @fraction.setter def fraction(self, fraction): ns = np.around(fraction * 1.e9) # From inspecting sample files, the fraction appears to be truncated, # not rounded. fraction = (ns / 100000).astype(int) self['bcd_fraction'] = bcd_encode(fraction)
[docs] def get_time(self, frame_rate=None): """Convert year, BCD time code to Time object. Calculate time using `jday`, `seconds`, and `fraction` properties (which reflect the bcd-encoded 'bcd_jday', 'bcd_seconds' and 'bcd_fraction' header items), plus `kday` from the initialisation. See https://www.haystack.mit.edu/tech/vlbi/mark5/docs/Mark%205B%20users%20manual.pdf Note that some non-compliant files do not have 'bcd_fraction' set. For those, the time can still be calculated using the header's 'frame_nr' by passing in a frame rate. Furthermore, fractional seconds are stored only to 0.1 ms accuracy. In the code, this is "unrounded" to give the exact time of the start of the frame for any total bit rate below 512 Mbps. For higher rates, it is no longer guaranteed that subsequent frames have unique `fraction`, and one should pass in an explicit frame rate instead. Parameters ---------- frame_rate : `~astropy.units.Quantity`, optional Used to calculate the fractional second from the frame number instead of from the header's `fraction`. Returns ------- `~astropy.time.Time` """ frame_nr = self['frame_nr'] if frame_nr == 0: fraction = 0. elif frame_rate is None: fraction = self.fraction if fraction == 0.: raise ValueError('header does not provide correct fractional ' 'second (it is zero for non-zero frame ' 'number). Please pass in a frame_rate.') else: fraction = (frame_nr / frame_rate).to_value(u.s) return Time(self.kday + self.jday, (self.seconds + fraction) / 86400, format='mjd', scale='utc', precision=9)
[docs] def set_time(self, time, frame_rate=None): """ Convert Time object to BCD timestamp elements and 'frame_nr'. For non-integer seconds, the frame number will be calculated if not given explicitly. Doing so requires the frame rate. Parameters ---------- time : `~astropy.time.Time` The time to use for this header. frame_rate : `~astropy.units.Quantity`, optional For calculating 'frame_nr' from the fractional seconds. """ self.kday = int(time.mjd // 1000) * 1000 self.jday = int(time.mjd - self.kday) seconds = time - Time(self.kday + self.jday, format='mjd') int_sec = int(seconds.sec) fraction = seconds - int_sec * u.s # Round to nearest ns to handle timestamp difference errors. if abs(fraction) < 1. * u.ns: frame_nr = 0 frac_sec = 0. elif abs(1. * u.s - fraction) < 1. * u.ns: int_sec += 1 frame_nr = 0 frac_sec = 0. else: if frame_rate is None: raise ValueError("cannot calculate frame rate. Pass it " "in explicitly.") frame_nr = int((fraction * frame_rate).to(u.one).round()) fraction = frame_nr / frame_rate if abs(fraction - 1. * u.s) < 1. * u.ns: int_sec += 1 frame_nr = 0 frac_sec = 0. else: frac_sec = fraction.to(u.s).value self.seconds = int_sec self.fraction = frac_sec self['frame_nr'] = frame_nr
time = property(get_time, set_time)