Source code for baseband.guppi.header

# Licensed under the GPLv3 - see LICENSE
Definitions for GUPPI headers.

Implements a GUPPIHeader class that reads & writes FITS-like headers from file.
import operator

import numpy as np
import astropy.units as u
from import fits
from astropy.time import Time

__all__ = ['GUPPIHeader']

[docs]class GUPPIHeader(fits.Header): """GUPPI baseband file format header. Parameters ---------- *args : str or iterable If a string, parsed as a GUPPI header from a file, otherwise as for the `` baseclass. verify : bool, optional Whether to do minimal verification that the header is consistent with the GUPPI standard. Default: `True`. mutable : bool, optional Whether to allow the header to be changed after initialisation. Default: `True`. **kwargs Any further header keywords to be set. Notes ----- Like ``, the initialiser does not accept keyword arguments to populate an array - instead, one must pass an iterable. In order to ensure keywords are kept in the right order, one should pass on values as a tuple, not as a dict. E.g., to copy a header, one should not do ``GUPPIHeader({key: header[key] for key in header})``, but rather:: GUPPIHeader(((key, header[key]) for key in header)) or, to also keep the comments:: GUPPIHeader(((key, (header[key], header.comments[key])) for key in header)) """ _properties = ('payload_nbytes', 'frame_nbytes', 'bps', 'nchan', 'npol', 'sample_shape', 'sample_rate', 'sideband', 'overlap', 'samples_per_frame', 'offset', 'start_time', 'time') """Properties accessible/usable in initialisation for all headers.""" _defaults = [('BACKEND', 'GUPPI'), ('BLOCSIZE', 0), ('STT_OFFS', 0), ('PKTIDX', 0), ('OVERLAP', 0), ('SRC_NAME', 'unset'), ('TELESCOP', 'unset'), ('PKTFMT', '1SFA'), ('PKTSIZE', 8192), ('NBITS', 8), ('NPOL', 1), ('OBSNCHAN', 1)] def __init__(self, *args, verify=True, mutable=True, **kwargs): # Comments handled by fits.Header__init__(). super().__init__(*args, **kwargs) self.mutable = mutable # Empty header always OK, since things will be added to it. if len(self) and verify: self.verify()
[docs] def verify(self): """Basic check of integrity.""" assert all(key in self for key in ('BLOCSIZE', 'PKTIDX')) # '1SFA' is used for all modes other than FAST4K (which is only used # for total intensity). 'SIMPLE' is from DSPSR, and used to support # time-first payloads. See # assert self['PKTFMT'] in ('1SFA', 'SIMPLE')
[docs] def copy(self): """Create a mutable and independent copy of the header.""" # This method exists because io.fits.Header.copy has a docstring that # refers to `Header` which breaks sphinx... return super().copy()
[docs] @classmethod def fromfile(cls, fh, verify=True): """Reads in GUPPI header block from a file. Parameters ---------- fh : filehandle To read data from. verify: bool, optional Whether to do basic checks on whether the header is valid. Verify is automatically called by ``, so this flag exists only to standardize the API. """ header_start = fh.tell() # Find the size of the header. GUPPI header entries are 80 char long # with <=8 char keyword names. "=" is always the 9th char. line = '=========' while line[8] in ('=', ' '): line ='ascii') if line[:3] == 'END': break if line == '': raise EOFError header_end = fh.tell() hdr = - header_start).decode('ascii') # Create the header using the base class. self = cls.fromstring(hdr) self.mutable = False if verify: self.verify() # GUPPI headers are not a proper FITS standard, and we're reading # from a file that the user likely cannot control, so let's not bother # with card verification (this avoids warnings in repr(); gh-282) for c in c._verified = True return self
[docs] def tofile(self, fh): """Write GUPPI file header to filehandle. Uses ``. """ fh.write(self.tostring(padding=False).encode('ascii'))
[docs] @classmethod def fromkeys(cls, *args, verify=True, mutable=True, **kwargs): """Initialise a header from keyword values. Like fromvalues, but without any interpretation of keywords. Note that this just passes kwargs to the class initializer as a dict (for compatibility with fits.Header). It is present for compatibility with other header classes only. """ return cls(kwargs, *args, verify=verify, mutable=mutable)
[docs] @classmethod def fromvalues(cls, **kwargs): """Initialise a header from parsed values. Here, the parsed values must be given as keyword arguments, i.e., for any ``header``, ``cls.fromvalues(**header) == header``. However, unlike for the ``fromkeys`` class method, data can also be set using arguments named after header methods, such as ``time``. Furthermore, some header defaults are set in ``GUPPIHeader._defaults``. """ self = cls(cls._defaults, verify=False) self.update(**kwargs) return self
[docs] def update(self, *, verify=True, **kwargs): """Update the header with new values. Here, any keywords matching properties are processed as well, in the order set by the class (in ``_properties``), and after all other keywords have been processed. Parameters ---------- verify : bool, optional If `True` (default), verify integrity after updating. **kwargs Arguments used to set keywords and properties. """ # Remove kwargs that set properties, in correct order. extras = [(key, kwargs.pop(key)) for key in self._properties if key in kwargs] # Update the normal keywords. super().update(kwargs) # Now set the properties. for attr, value in extras: setattr(self, attr, value) if verify: self.verify()
def __setitem__(self, key, value): if not self.mutable: raise TypeError("immutable {0} does not support assignment." .format(type(self).__name__)) super().__setitem__(key.upper(), value) @property def nbytes(self): """Size of the header in bytes.""" return (len(self) + 1) * 80 @property def payload_nbytes(self): """Size of the payload in bytes.""" return self['BLOCSIZE'] @payload_nbytes.setter def payload_nbytes(self, payloadsize): self['BLOCSIZE'] = payloadsize @property def frame_nbytes(self): """Size of the frame in bytes.""" return self.nbytes + self.payload_nbytes @frame_nbytes.setter def frame_nbytes(self, frame_nbytes): self.payload_nbytes = frame_nbytes - self.nbytes @property def bps(self): """Bits per elementary sample.""" return self['NBITS'] @bps.setter def bps(self, bps): self['NBITS'] = bps @property def complex_data(self): """Whether the data are complex.""" return self['OBSNCHAN'] != 1 @property def npol(self): """Number of polarisations.""" return self['NPOL'] // (2 if self.complex_data else 1) @npol.setter def npol(self, npol): self['NPOL'] = npol * (2 if self.complex_data else 1) @property def nchan(self): """Number of channels.""" return self['OBSNCHAN'] @nchan.setter def nchan(self, nchan): self['OBSNCHAN'] = operator.index(nchan) @property def sample_shape(self): """Shape of a sample in the payload (npol, nchan).""" return self.npol, self.nchan @sample_shape.setter def sample_shape(self, sample_shape): # Need to set nchan first to properly set npol, since nchan is # connected to complex_data. self.nchan = sample_shape[1] self.npol = sample_shape[0] @property def _bpcs(self): """Bits per complete sample.""" # NPOL includes factor of 2 for real/complex components. return self['OBSNCHAN'] * self['NPOL'] * self.bps @property def sample_rate(self): """Number of complete samples per second. Can be set with a negative quantity to set `sideband`. Overlap samples are not included in the rate. """ return (1. / self['TBIN']) * u.Hz @sample_rate.setter def sample_rate(self, sample_rate): self['TBIN'] = 1. / abs(sample_rate.to_value(u.Hz)) bw = (sample_rate.to_value(u.MHz) * self['OBSNCHAN'] / (1 if self.complex_data else 2)) self['OBSBW'] = bw @property def sideband(self): """True if upper sideband.""" return self['OBSBW'] > 0 @sideband.setter def sideband(self, sideband): self['OBSBW'] = (1 if sideband else -1) * abs(self['OBSBW']) @property def channels_first(self): """True if encoded payload ordering is (nchan, nsample, npol).""" # Called ``time-ordered`` in DSPSR. return self['PKTFMT'] != 'SIMPLE' @channels_first.setter def channels_first(self, channels_first): self['PKTFMT'] = '1SFA' if bool(channels_first) else 'SIMPLE' @property def samples_per_frame(self): """Number of complete samples in the frame, including overlap.""" return self.payload_nbytes * 8 // self._bpcs @samples_per_frame.setter def samples_per_frame(self, samples_per_frame): old_payload_nbytes = self.payload_nbytes self.payload_nbytes = (samples_per_frame * self._bpcs + 7) // 8 if self.samples_per_frame != samples_per_frame: exc = ValueError("header cannot store {} samples per frame. " "Nearest is {}.".format(samples_per_frame, self.samples_per_frame)) self.payload_nbytes = old_payload_nbytes raise exc @property def overlap(self): """Number of complete samples that overlap with the next frame.""" return self['OVERLAP'] @overlap.setter def overlap(self, overlap): self['OVERLAP'] = operator.index(overlap) @property def offset(self): """Offset from start of observation in units of time.""" # PKTIDX only counts valid packets, not overlap ones. return self['STT_OFFS'] + ((self['PKTIDX'] * self['PKTSIZE'] * 8 // self._bpcs) * self['TBIN'] * u.s) @offset.setter def offset(self, offset): self['PKTIDX'] = int((offset / (self['TBIN'] * u.s) / self['PKTSIZE'] * ((self._bpcs + 7) // 8)).to( @property def start_time(self): """Start time of the observation.""" return (Time(self['STT_IMJD'], scale='utc', format='mjd') + self['STT_SMJD'] * u.s) @start_time.setter def start_time(self, start_time): start_time = Time(start_time, scale='utc', format='isot', precision=9) self['STT_IMJD'] = int(start_time.mjd) self['STT_SMJD'] = int(np.around( (start_time - Time(self['STT_IMJD'], format='mjd', scale=start_time.scale)).sec)) @property def time(self): """Start time of the part of the observation covered by this header.""" return self.start_time + self.offset @time.setter def time(self, time): """Set header time. If ``start_time`` is not already set, this sets it using the time and ``offset``. Otherwise, this sets ``offset`` using the time and ``start_time``. Parameters ---------- time : `~astropy.time.Time` Time for the first sample associated with this header. """ if 'STT_IMJD' not in self.keys(): self.start_time = time - self.offset else: self.offset = time - self.start_time def _ipython_key_completions_(self): # Enables tab-completion of header keys in IPython. return self.keys() def __eq__(self, other): """Whether headers have the same keys with the same values.""" return all(self.get(k, None) == other.get(k, None) for k in (set(self.keys()) | set(other.keys()))) def __repr__(self): name = self.__class__.__name__ vals = super().__repr__() return('<{0} {1}>'.format(name, ("\n " + len(name) * " ").join( [v.rstrip() for v in vals.split('\n')])))