Source code for baseband.guppi.header
# Licensed under the GPLv3 - see LICENSE
"""
Definitions for GUPPI headers.
Implements a GUPPIHeader class that reads & writes FITS-like headers from file.
"""
import operator
import astropy.units as u
from astropy.io import fits
from astropy.time import Time, TimeDelta
__all__ = ['GUPPIHeader']
[docs]class GUPPIHeader(fits.Header):
"""GUPPI baseband file format header.
Parameters
----------
*args : str or iterable
If a string, parsed as a GUPPI header from a file, otherwise
as for the `astropy.io.fits.Header` baseclass.
verify : bool, optional
Whether to do minimal verification that the header is consistent with
the GUPPI standard. Default: `True`.
mutable : bool, optional
Whether to allow the header to be changed after initialisation.
Default: `True`.
**kwargs
Any further header keywords to be set.
Notes
-----
Like `~astropy.io.fits.Header`, the initialiser does not accept keyword
arguments to populate an array - instead, one must pass an iterable. In
order to ensure keywords are kept in the right order, one should pass on
values as a tuple, not as a dict. E.g., to copy a header, one should not
do ``GUPPIHeader({key: header[key] for key in header})``, but rather::
GUPPIHeader(((key, header[key]) for key in header))
or, to also keep the comments::
GUPPIHeader(((key, (header[key], header.comments[key]))
for key in header))
"""
_properties = ('payload_nbytes', 'frame_nbytes', 'bps', 'nchan', 'npol',
'sample_shape', 'sample_rate', 'sideband', 'overlap',
'samples_per_frame', 'offset', 'start_time',
'time')
"""Properties accessible/usable in initialisation for all headers."""
_defaults = [('BACKEND', 'GUPPI'),
('BLOCSIZE', 0),
('STT_OFFS', 0),
('PKTIDX', 0),
('OVERLAP', 0),
('SRC_NAME', 'unset'),
('TELESCOP', 'unset'),
('PKTFMT', '1SFA'),
('PKTSIZE', 8192),
('NBITS', 8),
('NPOL', 1),
('OBSNCHAN', 1)]
supported_formats = {'1SFA', 'SIMPLE'}
"""GUPPI formats that are known to work.
'1SFA' is used for all modes other than FAST4K (which is only used
for total intensity). 'SIMPLE' is from DSPSR, and used to support
time-first payloads. See
https://safe.nrao.edu/wiki/pub/Main/JoeBrandt/guppi_status_shmem.pdf
If a format is not in this set, yet is known to work, a PR would be
most welcome.
"""
def __init__(self, *args, verify=True, mutable=True, **kwargs):
# Comments handled by fits.Header__init__().
super().__init__(*args, **kwargs)
self.mutable = mutable
# Empty header always OK, since things will be added to it.
if len(self) and verify:
self.verify()
[docs] def verify(self):
"""Basic check of integrity."""
# Same check as dspsr's dsp::GUPPIFile::is_valid
assert all(key in self for key in ('BLOCSIZE',
'PKTIDX'))
# We could check here for self['PKTFMT'] in self.supported_formats
# but that would break reading of unsupported but working formats,
# so instead this becomes just a warning in file_info.
[docs] def copy(self):
"""Create a mutable and independent copy of the header."""
# This method exists because io.fits.Header.copy has a docstring that
# refers to `Header` which breaks sphinx...
return super().copy()
[docs] @classmethod
def fromfile(cls, fh, verify=True):
"""Reads in GUPPI header block from a file.
Parameters
----------
fh : filehandle
To read data from.
verify: bool, optional
Whether to do basic checks on whether the header is valid. Verify
is automatically called by `~astropy.io.fits.Header.fromstring`, so
this flag exists only to standardize the API.
"""
header_start = fh.tell()
# Find the size of the header. GUPPI header entries are 80 char long
# with <=8 char keyword names. "=" is always the 9th char.
line = '========='
while line[8] in ('=', ' '):
line = fh.read(80).decode('ascii')
if line[:3] == 'END':
break
if line == '':
raise EOFError
header_end = fh.tell()
fh.seek(header_start)
hdr = fh.read(header_end - header_start).decode('ascii')
# Create the header using the base class.
self = cls.fromstring(hdr)
self.mutable = False
if verify:
self.verify()
# GUPPI headers are not a proper FITS standard, and we're reading
# from a file that the user likely cannot control, so let's not bother
# with card verification (this avoids warnings in repr(); gh-282)
for c in self.cards:
c._verified = True
return self
[docs] def tofile(self, fh):
"""Write GUPPI file header to filehandle.
Uses `~astropy.io.fits.Header.tostring`.
"""
fh.write(self.tostring(padding=False).encode('ascii'))
[docs] @classmethod
def fromkeys(cls, *args, verify=True, mutable=True, **kwargs):
"""Initialise a header from keyword values.
Like fromvalues, but without any interpretation of keywords.
Note that this just passes kwargs to the class initializer as a dict
(for compatibility with fits.Header). It is present for compatibility
with other header classes only.
"""
return cls(kwargs, *args, verify=verify, mutable=mutable)
[docs] @classmethod
def fromvalues(cls, **kwargs):
"""Initialise a header from parsed values.
Here, the parsed values must be given as keyword arguments, i.e., for
any ``header``, ``cls.fromvalues(**header) == header``.
However, unlike for the ``fromkeys`` class method, data can also be set
using arguments named after header methods, such as ``time``.
Furthermore, some header defaults are set in ``GUPPIHeader._defaults``.
"""
self = cls(cls._defaults, verify=False)
self.update(**kwargs)
return self
[docs] def update(self, *, verify=True, **kwargs):
"""Update the header with new values.
Here, any keywords matching properties are processed as well, in the
order set by the class (in ``_properties``), and after all other
keywords have been processed.
Parameters
----------
verify : bool, optional
If `True` (default), verify integrity after updating.
**kwargs
Arguments used to set keywords and properties.
"""
# Remove kwargs that set properties, in correct order.
extras = [(key, kwargs.pop(key)) for key in self._properties
if key in kwargs]
# Update the normal keywords.
super().update(kwargs)
# Now set the properties.
for attr, value in extras:
setattr(self, attr, value)
if verify:
self.verify()
def __setitem__(self, key, value):
if not self.mutable:
raise TypeError("immutable {0} does not support assignment."
.format(type(self).__name__))
super().__setitem__(key.upper(), value)
@property
def nbytes(self):
"""Size of the header in bytes."""
return (len(self) + 1) * 80
@property
def payload_nbytes(self):
"""Size of the payload in bytes."""
return int(self['BLOCSIZE'])
@payload_nbytes.setter
def payload_nbytes(self, payloadsize):
self['BLOCSIZE'] = payloadsize
@property
def frame_nbytes(self):
"""Size of the frame in bytes."""
return self.nbytes + self.payload_nbytes
@frame_nbytes.setter
def frame_nbytes(self, frame_nbytes):
self.payload_nbytes = frame_nbytes - self.nbytes
@property
def bps(self):
"""Bits per elementary sample."""
return int(self['NBITS'])
@bps.setter
def bps(self, bps):
self['NBITS'] = bps
@property
def complex_data(self):
"""Whether the data are complex."""
return int(self['OBSNCHAN']) != 1
@property
def npol(self):
"""Number of polarisations."""
return int(self['NPOL']) // (2 if self.complex_data else 1)
@npol.setter
def npol(self, npol):
self['NPOL'] = npol * (2 if self.complex_data else 1)
@property
def nchan(self):
"""Number of channels."""
return int(self['OBSNCHAN'])
@nchan.setter
def nchan(self, nchan):
self['OBSNCHAN'] = operator.index(nchan)
@property
def sample_shape(self):
"""Shape of a sample in the payload (npol, nchan)."""
return self.npol, self.nchan
@sample_shape.setter
def sample_shape(self, sample_shape):
# Need to set nchan first to properly set npol, since nchan is
# connected to complex_data.
self.nchan = sample_shape[1]
self.npol = sample_shape[0]
@property
def _bpcs(self):
"""Bits per complete sample."""
# NPOL includes factor of 2 for real/complex components.
return int(self['OBSNCHAN']) * int(self['NPOL']) * self.bps
@property
def sample_rate(self):
"""Number of complete samples per second.
Can be set with a negative quantity to set `sideband`. Overlap samples
are not included in the rate.
"""
return (1. / float(self['TBIN'])) * u.Hz
@sample_rate.setter
def sample_rate(self, sample_rate):
self['TBIN'] = 1. / abs(sample_rate.to_value(u.Hz))
bw = (sample_rate.to_value(u.MHz) * int(self['OBSNCHAN'])
/ (1 if self.complex_data else 2))
self['OBSBW'] = bw
@property
def sideband(self):
"""True if upper sideband."""
return float(self['OBSBW']) > 0
@sideband.setter
def sideband(self, sideband):
self['OBSBW'] = (1 if sideband else -1) * abs(self['OBSBW'])
@property
def channels_first(self):
"""True if encoded payload ordering is (nchan, nsample, npol)."""
# Called ``time-ordered`` in DSPSR.
return self['PKTFMT'] != 'SIMPLE'
@channels_first.setter
def channels_first(self, channels_first):
self['PKTFMT'] = '1SFA' if bool(channels_first) else 'SIMPLE'
@property
def samples_per_frame(self):
"""Number of complete samples in the frame, including overlap."""
return self.payload_nbytes * 8 // self._bpcs
@samples_per_frame.setter
def samples_per_frame(self, samples_per_frame):
old_payload_nbytes = self.payload_nbytes
self.payload_nbytes = (samples_per_frame * self._bpcs + 7) // 8
if self.samples_per_frame != samples_per_frame:
exc = ValueError("header cannot store {} samples per frame. "
"Nearest is {}.".format(samples_per_frame,
self.samples_per_frame))
self.payload_nbytes = old_payload_nbytes
raise exc
@property
def overlap(self):
"""Number of complete samples that overlap with the next frame."""
return int(self['OVERLAP'])
@overlap.setter
def overlap(self, overlap):
self['OVERLAP'] = operator.index(overlap)
@property
def offset(self):
"""Offset from start of observation in units of time."""
# PKTIDX only counts valid packets, not overlap ones.
return ((self['PKTIDX'] * self['PKTSIZE'] * 8 // self._bpcs)
* float(self['TBIN'])) * u.s
@offset.setter
def offset(self, offset):
self['PKTIDX'] = int((offset / (float(self['TBIN']) * u.s)
/ self['PKTSIZE']
* ((self._bpcs + 7) // 8)).to(u.one).round())
@property
def start_time(self):
"""Start time of the observation."""
return (Time(self['STT_IMJD'], scale='utc', format='mjd')
+ TimeDelta(self['STT_SMJD'], self['STT_OFFS'], format='sec'))
@start_time.setter
def start_time(self, start_time):
start_time = Time(start_time, scale='utc')
imjd = int(start_time.mjd)
# Calculate differences from start of day.
djd = start_time - Time(imjd, format='mjd', scale='utc')
# Correct for possible rounding errors.
imjd += int(djd.jd)
# Get seconds. Should now be guaranteed to be between 0 and 86400
# (or 86401 if a leap-second day).
seconds = (start_time - Time(imjd, format='mjd', scale='utc')).sec
self['STT_IMJD'] = imjd
self['STT_SMJD'], self['STT_OFFS'] = divmod(seconds, 1)
@property
def time(self):
"""Start time of the part of the observation covered by this header."""
return self.start_time + self.offset
@time.setter
def time(self, time):
"""Set header time.
If ``start_time`` is not already set, this sets it using the time and
``offset``. Otherwise, this sets ``offset`` using the time and
``start_time``.
Parameters
----------
time : `~astropy.time.Time`
Time for the first sample associated with this header.
"""
if 'STT_IMJD' not in self.keys():
self.start_time = time - self.offset
else:
self.offset = time - self.start_time
def _ipython_key_completions_(self):
# Enables tab-completion of header keys in IPython.
return self.keys()
def __eq__(self, other):
"""Whether headers have the same keys with the same values."""
return all(self.get(k, None) == other.get(k, None)
for k in (set(self.keys()) | set(other.keys())))
def __repr__(self):
name = self.__class__.__name__
vals = super().__repr__()
return ('<{0} {1}>'.format(name, ("\n " + len(name) * " ").join(
[v.rstrip() for v in vals.split('\n')])))