Source code for baseband.guppi.header
# Licensed under the GPLv3 - see LICENSE
"""
Definitions for GUPPI headers.
Implements a GUPPIHeader class that reads & writes FITS-like headers from file.
"""
import operator
import numpy as np
import astropy.units as u
from astropy.io import fits
from astropy.time import Time
__all__ = ['GUPPIHeader']
[docs]class GUPPIHeader(fits.Header):
"""GUPPI baseband file format header.
Parameters
----------
*args : str or iterable
If a string, parsed as a GUPPI header from a file, otherwise
as for the `astropy.io.fits.Header` baseclass.
verify : bool, optional
Whether to do minimal verification that the header is consistent with
the GUPPI standard. Default: `True`.
mutable : bool, optional
Whether to allow the header to be changed after initialisation.
Default: `True`.
**kwargs
Any further header keywords to be set.
Notes
-----
Like `~astropy.io.fits.Header`, the initialiser does not accept keyword
arguments to populate an array - instead, one must pass an iterable. In
order to ensure keywords are kept in the right order, one should pass on
values as a tuple, not as a dict. E.g., to copy a header, one should not
do ``GUPPIHeader({key: header[key] for key in header})``, but rather::
GUPPIHeader(((key, header[key]) for key in header))
or, to also keep the comments::
GUPPIHeader(((key, (header[key], header.comments[key]))
for key in header))
"""
_properties = ('payload_nbytes', 'frame_nbytes', 'bps', 'nchan', 'npol',
'sample_shape', 'sample_rate', 'sideband', 'overlap',
'samples_per_frame', 'offset', 'start_time',
'time')
"""Properties accessible/usable in initialisation for all headers."""
_defaults = [('BACKEND', 'GUPPI'),
('BLOCSIZE', 0),
('STT_OFFS', 0),
('PKTIDX', 0),
('OVERLAP', 0),
('SRC_NAME', 'unset'),
('TELESCOP', 'unset'),
('PKTFMT', '1SFA'),
('PKTSIZE', 8192),
('NBITS', 8),
('NPOL', 1),
('OBSNCHAN', 1)]
def __init__(self, *args, verify=True, mutable=True, **kwargs):
# Comments handled by fits.Header__init__().
super().__init__(*args, **kwargs)
self.mutable = mutable
# Empty header always OK, since things will be added to it.
if len(self) and verify:
self.verify()
[docs] def verify(self):
"""Basic check of integrity."""
assert all(key in self for key in ('BLOCSIZE',
'PKTIDX'))
# '1SFA' is used for all modes other than FAST4K (which is only used
# for total intensity). 'SIMPLE' is from DSPSR, and used to support
# time-first payloads. See
# https://safe.nrao.edu/wiki/pub/Main/JoeBrandt/guppi_status_shmem.pdf
assert self['PKTFMT'] in ('1SFA', 'SIMPLE')
[docs] def copy(self):
"""Create a mutable and independent copy of the header."""
# This method exists because io.fits.Header.copy has a docstring that
# refers to `Header` which breaks sphinx...
return super().copy()
[docs] @classmethod
def fromfile(cls, fh, verify=True):
"""Reads in GUPPI header block from a file.
Parameters
----------
fh : filehandle
To read data from.
verify: bool, optional
Whether to do basic checks on whether the header is valid. Verify
is automatically called by `~astropy.io.fits.Header.fromstring`, so
this flag exists only to standardize the API.
"""
header_start = fh.tell()
# Find the size of the header. GUPPI header entries are 80 char long
# with <=8 char keyword names. "=" is always the 9th char.
line = '========='
while line[8] in ('=', ' '):
line = fh.read(80).decode('ascii')
if line[:3] == 'END':
break
if line == '':
raise EOFError
header_end = fh.tell()
fh.seek(header_start)
hdr = fh.read(header_end - header_start).decode('ascii')
# Create the header using the base class.
self = cls.fromstring(hdr)
self.mutable = False
if verify:
self.verify()
# GUPPI headers are not a proper FITS standard, and we're reading
# from a file that the user likely cannot control, so let's not bother
# with card verification (this avoids warnings in repr(); gh-282)
for c in self.cards:
c._verified = True
return self
[docs] def tofile(self, fh):
"""Write GUPPI file header to filehandle.
Uses `~astropy.io.fits.Header.tostring`.
"""
fh.write(self.tostring(padding=False).encode('ascii'))
[docs] @classmethod
def fromkeys(cls, *args, verify=True, mutable=True, **kwargs):
"""Initialise a header from keyword values.
Like fromvalues, but without any interpretation of keywords.
Note that this just passes kwargs to the class initializer as a dict
(for compatibility with fits.Header). It is present for compatibility
with other header classes only.
"""
return cls(kwargs, *args, verify=verify, mutable=mutable)
[docs] @classmethod
def fromvalues(cls, **kwargs):
"""Initialise a header from parsed values.
Here, the parsed values must be given as keyword arguments, i.e., for
any ``header``, ``cls.fromvalues(**header) == header``.
However, unlike for the ``fromkeys`` class method, data can also be set
using arguments named after header methods, such as ``time``.
Furthermore, some header defaults are set in ``GUPPIHeader._defaults``.
"""
self = cls(cls._defaults, verify=False)
self.update(**kwargs)
return self
[docs] def update(self, *, verify=True, **kwargs):
"""Update the header with new values.
Here, any keywords matching properties are processed as well, in the
order set by the class (in ``_properties``), and after all other
keywords have been processed.
Parameters
----------
verify : bool, optional
If `True` (default), verify integrity after updating.
**kwargs
Arguments used to set keywords and properties.
"""
# Remove kwargs that set properties, in correct order.
extras = [(key, kwargs.pop(key)) for key in self._properties
if key in kwargs]
# Update the normal keywords.
super().update(kwargs)
# Now set the properties.
for attr, value in extras:
setattr(self, attr, value)
if verify:
self.verify()
def __setitem__(self, key, value):
if not self.mutable:
raise TypeError("immutable {0} does not support assignment."
.format(type(self).__name__))
super().__setitem__(key.upper(), value)
@property
def nbytes(self):
"""Size of the header in bytes."""
return (len(self) + 1) * 80
@property
def payload_nbytes(self):
"""Size of the payload in bytes."""
return self['BLOCSIZE']
@payload_nbytes.setter
def payload_nbytes(self, payloadsize):
self['BLOCSIZE'] = payloadsize
@property
def frame_nbytes(self):
"""Size of the frame in bytes."""
return self.nbytes + self.payload_nbytes
@frame_nbytes.setter
def frame_nbytes(self, frame_nbytes):
self.payload_nbytes = frame_nbytes - self.nbytes
@property
def bps(self):
"""Bits per elementary sample."""
return self['NBITS']
@bps.setter
def bps(self, bps):
self['NBITS'] = bps
@property
def complex_data(self):
"""Whether the data are complex."""
return self['OBSNCHAN'] != 1
@property
def npol(self):
"""Number of polarisations."""
return self['NPOL'] // (2 if self.complex_data else 1)
@npol.setter
def npol(self, npol):
self['NPOL'] = npol * (2 if self.complex_data else 1)
@property
def nchan(self):
"""Number of channels."""
return self['OBSNCHAN']
@nchan.setter
def nchan(self, nchan):
self['OBSNCHAN'] = operator.index(nchan)
@property
def sample_shape(self):
"""Shape of a sample in the payload (npol, nchan)."""
return self.npol, self.nchan
@sample_shape.setter
def sample_shape(self, sample_shape):
# Need to set nchan first to properly set npol, since nchan is
# connected to complex_data.
self.nchan = sample_shape[1]
self.npol = sample_shape[0]
@property
def _bpcs(self):
"""Bits per complete sample."""
# NPOL includes factor of 2 for real/complex components.
return self['OBSNCHAN'] * self['NPOL'] * self.bps
@property
def sample_rate(self):
"""Number of complete samples per second.
Can be set with a negative quantity to set `sideband`. Overlap samples
are not included in the rate.
"""
return (1. / self['TBIN']) * u.Hz
@sample_rate.setter
def sample_rate(self, sample_rate):
self['TBIN'] = 1. / abs(sample_rate.to_value(u.Hz))
bw = (sample_rate.to_value(u.MHz) * self['OBSNCHAN']
/ (1 if self.complex_data else 2))
self['OBSBW'] = bw
@property
def sideband(self):
"""True if upper sideband."""
return self['OBSBW'] > 0
@sideband.setter
def sideband(self, sideband):
self['OBSBW'] = (1 if sideband else -1) * abs(self['OBSBW'])
@property
def channels_first(self):
"""True if encoded payload ordering is (nchan, nsample, npol)."""
# Called ``time-ordered`` in DSPSR.
return self['PKTFMT'] != 'SIMPLE'
@channels_first.setter
def channels_first(self, channels_first):
self['PKTFMT'] = '1SFA' if bool(channels_first) else 'SIMPLE'
@property
def samples_per_frame(self):
"""Number of complete samples in the frame, including overlap."""
return self.payload_nbytes * 8 // self._bpcs
@samples_per_frame.setter
def samples_per_frame(self, samples_per_frame):
old_payload_nbytes = self.payload_nbytes
self.payload_nbytes = (samples_per_frame * self._bpcs + 7) // 8
if self.samples_per_frame != samples_per_frame:
exc = ValueError("header cannot store {} samples per frame. "
"Nearest is {}.".format(samples_per_frame,
self.samples_per_frame))
self.payload_nbytes = old_payload_nbytes
raise exc
@property
def overlap(self):
"""Number of complete samples that overlap with the next frame."""
return self['OVERLAP']
@overlap.setter
def overlap(self, overlap):
self['OVERLAP'] = operator.index(overlap)
@property
def offset(self):
"""Offset from start of observation in units of time."""
# PKTIDX only counts valid packets, not overlap ones.
return self['STT_OFFS'] + ((self['PKTIDX'] * self['PKTSIZE'] * 8
// self._bpcs) * self['TBIN'] * u.s)
@offset.setter
def offset(self, offset):
self['PKTIDX'] = int((offset / (self['TBIN'] * u.s) / self['PKTSIZE']
* ((self._bpcs + 7) // 8)).to(u.one).round())
@property
def start_time(self):
"""Start time of the observation."""
return (Time(self['STT_IMJD'], scale='utc', format='mjd')
+ self['STT_SMJD'] * u.s)
@start_time.setter
def start_time(self, start_time):
start_time = Time(start_time, scale='utc', format='isot', precision=9)
self['STT_IMJD'] = int(start_time.mjd)
self['STT_SMJD'] = int(np.around(
(start_time - Time(self['STT_IMJD'], format='mjd',
scale=start_time.scale)).sec))
@property
def time(self):
"""Start time of the part of the observation covered by this header."""
return self.start_time + self.offset
@time.setter
def time(self, time):
"""Set header time.
If ``start_time`` is not already set, this sets it using the time and
``offset``. Otherwise, this sets ``offset`` using the time and
``start_time``.
Parameters
----------
time : `~astropy.time.Time`
Time for the first sample associated with this header.
"""
if 'STT_IMJD' not in self.keys():
self.start_time = time - self.offset
else:
self.offset = time - self.start_time
def _ipython_key_completions_(self):
# Enables tab-completion of header keys in IPython.
return self.keys()
def __eq__(self, other):
"""Whether headers have the same keys with the same values."""
return all(self.get(k, None) == other.get(k, None)
for k in (set(self.keys()) | set(other.keys())))
def __repr__(self):
name = self.__class__.__name__
vals = super().__repr__()
return('<{0} {1}>'.format(name, ("\n " + len(name) * " ").join(
[v.rstrip() for v in vals.split('\n')])))