# Licensed under the GPLv3 - see LICENSE
"""Wrappers for PSRFTIS Header Data Units (HDUs)."""
from collections import namedtuple
from astropy import units as u
from astropy.time import Time, TimeDelta
from astropy.coordinates import EarthLocation
from astropy.coordinates import Latitude, Longitude
from astropy.io import fits
from astropy.utils import lazyproperty
import numpy as np
import operator
from .psrfits_htm_parser import HDU_TEMPLATES
__all__ = ["HDU_map", "HDUWrapper", "PSRFITSPrimaryHDU",
"SubintHDU", "PSRSubintHDU"]
[docs]class HDUWrapper:
def __init__(self, hdu=None, verify=True, hdu_type=None):
if hdu is None:
template = HDU_TEMPLATES[hdu_type]
if hdu_type == 'PRIMARY':
self.hdu = template.copy()
else:
# Make an indirect copy that doesn't create data.
self.hdu = template.__class__(data=fits.DELAYED,
header=template.header.copy())
else:
self.hdu = hdu
if verify:
self.verify()
[docs] def verify(self):
assert isinstance(self.header, fits.Header)
[docs] def copy(self):
return self.__class__(self.hdu.copy())
@property
def header(self):
return self.hdu.header
[docs] def close(self):
del self.hdu
[docs] def get_hdu_list(self):
# Build HDU list
# TODO, this need to be modified when combining multilpe HDU.
hdu_list = []
if hasattr(self, 'primary_hdu'):
hdu_list.append(self.primary_hdu.hdu)
hdu_list.append(self.hdu)
print(type(hdu_list))
return fits.HDUList(hdu_list)
[docs]class PSRFITSPrimaryHDU(HDUWrapper):
"""Wrapper for PSRFITS primary HDU, providing baseband-style properties.
Parameters
----------
hdu : `~astropy.io.fits.PrimaryHDU`
PSRFITS primary HDU instance.
Notes
-----
Frequencies are defined to be in the center of the channels.
"""
_properties = ('location', 'start_time', 'observatory', 'frequency',
'ra', 'dec', 'shape', 'sample_rate')
def __init__(self, hdu=None, verify=True):
# When input hdu is None, an empty Primary header will be initialized.
super().__init__(hdu, hdu_type="PRIMARY", verify=verify)
[docs] def verify(self):
assert self.header['SIMPLE'], "The HDU is not a FITS primary HDU."
assert self.header['FITSTYPE'] == "PSRFITS", \
"The header is not from a PSRFITS file."
@property
def location(self):
try:
return EarthLocation(self.header['ANT_X'],
self.header['ANT_Y'],
self.header['ANT_Z'], u.m)
except (KeyError, TypeError):
# Sometimes PSRFITS uses '*' to indicate no data.
# TODO: should this be AttributeError instead?
return None
@location.setter
def location(self, loc):
""" Location setter. input should be an Astropy EarthLocation"""
# TODO, for the space base observatory, Earth Location may not apply.
self.hdu.header['ANT_X'] = loc.x.to_value(u.m)
self.hdu.header['ANT_Y'] = loc.y.to_value(u.m)
self.hdu.header['ANT_Z'] = loc.z.to_value(u.m)
@property
def start_time(self):
return (Time(float(self.header['STT_IMJD']), format='mjd', precision=9,
location=self.location)
+ TimeDelta(float(self.header['STT_SMJD']),
float(self.header['STT_OFFS']),
format='sec', scale='tai'))
@start_time.setter
def start_time(self, time):
""" Set the start_time, the input value should be an Time object"""
# Should we allow time set location
if time.location is not None:
self.location = time.location
mjd_int = int(time.mjd)
mjd_frac = (time - Time(mjd_int, scale=time.scale, format='mjd'))
frac_sec, int_sec = np.modf(mjd_frac.to(u.s).value)
self.hdu.header['STT_IMJD'] = '{0:05d}'.format(mjd_int)
self.hdu.header['STT_SMJD'] = '{}'.format(int(int_sec))
self.hdu.header['STT_OFFS'] = '{0:17.15f}'.format(frac_sec)
self.hdu.header['DATE-OBS'] = time.fits
@property
def telescope(self):
return self.header['TELESCOP']
@telescope.setter
def telescope(self, value):
self.hdu.header['TELESCOP'] = value
@property
def frequency(self):
try:
n_chan = int(self.header['OBSNCHAN'])
c_freq = float(self.header['OBSFREQ'])
bw = float(self.header['OBSBW'])
except (KeyError, ValueError):
return None
chan_bw = bw / n_chan
# According to the PSRFITS definition document, channels are
# numbered 1 to n_chan, with the zeroth channel assumed removed
# and c_freq is the frequency of channel n_nchan / 2. We use
# (n_chan + 1) // 2 to ensure this makes sense for n_chan = 1
# and is consistent with the document at least for even n_chan.
freq = c_freq + (np.arange(1, n_chan + 1)
- ((n_chan + 1) // 2)) * chan_bw
return u.Quantity(freq, u.MHz, copy=False)
@frequency.setter
def frequency(self, freq):
"""Frequency setter. The input should be the frequency array."""
freq = freq.to_value(u.MHz)
n_chan = len(freq)
# add the channel 0
# we assume the frequency resolution is the same across the band.
freq_pad = np.insert(freq, 0, 2 * freq[0] - freq[1])
c_chan = freq_pad[((n_chan + 1) // 2)]
bw = freq_pad.ptp()
self.hdu.header['OBSNCHAN'] = n_chan
self.hdu.header['OBSFREQ'] = c_chan
self.hdu.header['OBSBW'] = bw
@property
def sideband(self):
return np.where(self.hdu.header['OBSBW'] > 0, np.int8(1), np.int8(-1))
@sideband.setter
def sideband(self, sideband):
assert np.all(np.abs(sideband) == 1), "sideband should be +/- 1"
self.hdu.header['OBSBW'] = sideband * abs(self.hdu.header['OBSBW'])
@property
def ra(self):
return Longitude(self.header['RA'], unit=u.hourangle)
@ra.setter
def ra(self, value):
self.hdu.header['RA'] = value.to_string(sep=':', pad=True)
@property
def dec(self):
return Latitude(self.header['DEC'], unit=u.deg)
@dec.setter
def dec(self, value):
self.hdu.header['DEC'] = value.to_string(sep=':', alwayssign=True,
pad=True)
@property
def obs_mode(self):
return self.header['OBS_MODE']
@obs_mode.setter
def obs_mode(self, value):
assert value in {'PSR', 'CAL', 'SEARCH'}, \
"obs_mode can only be 'PSR', 'CAL', or 'SEARCH'."
self.hdu.header['OBS_MODE'] = value
[docs]class SubintHDU(HDUWrapper):
"""Base for PSRFITS SUBINT HDU wrappers.
Parameters
----------
hdu : `~astropy.io.fits.BinTableHDU` instance
The PSRFITS table HDU of SUBINT type.
primary : `~baseband_tasks.io.psrfits.PSRFITSPrimaryHDU`
The wrapped PSRFITS main header.
verify: bool, optional
Whether to do basic verification. Default is `True`.
Notes
-----
Right now we are assuming the data rows are continuous in time and the
frequencies do not vary.
"""
_properties = ('samples_per_frame', 'sample_shape', 'shape',
'start_time', 'sample_rate',
'polarization', 'frequency', 'bandwidth', 'sideband')
"""Possibly settable properties that this class provides."""
# NOTE: order of the above matters, as some of the later ones may
# need earlier ones (e.g., one cannot set start_time without a shape).
_sample_shape_maker = namedtuple('SampleShape', 'nbin, nchan, npol')
_shape_maker = namedtuple('Shape', 'nsample, nbin, nchan, npol')
def __new__(cls, hdu=None, primary_hdu=None, verify=True):
# Map Subint subclasses;
# TODO: switch to__init_subclass__ when we only support python>=3.6.
try:
mode = primary_hdu.obs_mode
cls = subint_map[mode]
except AttributeError:
raise ValueError("need a primary HDU to determine the mode.")
except KeyError:
raise ValueError("'{}' is not a valid mode.".format(mode))
return super().__new__(cls)
def __init__(self, hdu=None, primary_hdu=None, verify=True):
self.primary_hdu = primary_hdu
self.offset = 0
super().__init__(hdu, verify=verify, hdu_type='SUBINT')
self.sample_label = ('nbin', 'nchan', 'npol')
[docs] def verify(self):
assert self.header['EXTNAME'].strip() == "SUBINT", \
"Input HDU is not a SUBINT type."
assert isinstance(self.primary_hdu, PSRFITSPrimaryHDU), \
"Primary HDU needs to be a PSRFITSPrimaryHDU instance."
@property
def mode(self):
return self.primary_hdu.obs_mode
@property
def start_time(self):
# Note: subclasses can use or override this.
return self.primary_hdu.start_time
@property
def _has_data(self):
# TODO: surely there is a better way...
return (self.hdu._file is not None
or self.hdu._buffer is not None
or self.hdu._has_data)
@property
def data(self):
if not self._has_data:
try:
dims = tuple(self.sample_shape)[::-1]
except AttributeError:
raise AttributeError('can only initialize data '
'if sample_shape is set.') from None
# It really seems necessary to change a private attribute...
self.hdu.columns.change_attrib('DATA', '_dims', dims)
repr_dims = repr(dims).replace(' ', '')
self.hdu.columns.change_attrib('DATA', 'dim', repr_dims)
self.hdu.data = np.zeros(self.nrow, self.hdu.columns.dtype)
return self.hdu.data
@data.deleter
def data(self):
del self.hdu.data
del self.hdu.columns._dims
del self.hdu.columns.dtype
@property
def nrow(self):
return self.header['NAXIS2']
@nrow.setter
def nrow(self, value):
if self.hdu._has_data:
raise AttributeError('can only set nrow on empty HDU. '
'Delete data first.')
self.hdu.header['NAXIS2'] = operator.index(value)
@property
def nchan(self):
nchan = self.header['NCHAN']
if nchan == '*':
raise AttributeError('nchan has not yet been set.')
return nchan
@nchan.setter
def nchan(self, value):
if self._has_data:
raise AttributeError('can only set nchan on empty HDU. '
'Delete data first.')
self.hdu.header['NCHAN'] = operator.index(value)
@property
def npol(self):
npol = self.header['NPOL']
if npol == '*':
raise AttributeError('npol has not yet been set.')
return npol
@npol.setter
def npol(self, value):
if self._has_data:
raise AttributeError('can only set npol on empty HDU. '
'Delete data first.')
self.hdu.header['NPOL'] = operator.index(value)
@property
def nbin(self):
nbin = self.header['NBIN']
if nbin == '*':
raise AttributeError('nbin has not yet been set.')
return nbin
@nbin.setter
def nbin(self, value):
if self._has_data:
raise AttributeError('can only set nbin on empty HDU. '
'Delete data first.')
self.hdu.header['NBIN'] = operator.index(value)
@property
def sample_shape(self):
return self._sample_shape_maker(self.nbin, self.nchan, self.npol)
@sample_shape.setter
def sample_shape(self, shape):
self.nbin = shape[0]
self.nchan = shape[1]
self.npol = shape[2]
@property
def shape(self):
return self._shape_maker(self.nrow * self.samples_per_frame,
self.nbin, self.nchan, self.npol)
@shape.setter
def shape(self, shape):
assert shape[0] % self.samples_per_frame == 0, (
'shape has to an integer multiple of samples_per_frame={}'
.format(self.samples_per_frame))
self.sample_shape = shape[1:]
self.nrow = shape[0] // self.samples_per_frame
@property
def polarization(self):
pol_type = self.header['POL_TYPE']
# split into equal parts using zip;
# see https://docs.python.org/3.5/library/functions.html#zip
return np.array([map(''.join, zip(*[iter(self.header['POL_TYPE'])]
* (len(pol_type) // self.npol)))])
@polarization.setter
def polarization(self, value):
""" Setter for polarization labels.
Parameter
---------
value : array-like
The names of polarization
"""
# check if the input value length matches the npol
assert len(value) == self.npol, \
("The input polarization name does not match the number of"
" polarizations.")
self.hdu.header['POL_TYPE'] = ''.join(value)
@property
def frequency(self):
if 'DAT_FREQ' in self.data.names:
freqs = u.Quantity(self.data['DAT_FREQ'][0], u.MHz, copy=False)
else:
freqs = super().frequency
if freqs is not None:
freqs = freqs.reshape(-1, 1)
return freqs
@frequency.setter
def frequency(self, freqs):
freqs_value = np.atleast_1d(freqs.to_value(u.MHz))
if self.nchan != len(freqs_value):
raise ValueError("Frequency size has to match the channle number "
"'nchan'.")
self.data['DAT_FREQ'] = np.broadcast_to(freqs_value, (self.nrow,
self.nchan))
# TODO I am not sure if this is the best way to get the channel
# bandwidth.
if self.nchan > 1:
self.hdu.header['CHAN_BW'] = freqs_value[1] - freqs_value[0]
else:
pass
if self.primary_hdu.frequency is None:
self.primary_hdu.frequency = freqs
@property
def bandwidth(self):
try:
return np.abs(self.header['CHAN_BW']) * u.MHz
except TypeError:
return None
@bandwidth.setter
def bandwidth(self, bandwidth):
bandwidth = bandwidth.to_value(u.MHz)
if self.sideband == -1:
bandwidth *= -1
self.header['CHAN_BW'] = bandwidth
@property
def sideband(self):
try:
return np.where(self.header['CHAN_BW'] > 0,
np.int8(1), np.int8(-1))
except TypeError:
return None
@sideband.setter
def sideband(self, sideband):
assert np.all(np.abs(sideband) == 1), "sideband should be +/- 1"
self.header['CHAN_BW'] = sideband * abs(self.header['CHAN_BW'])
@lazyproperty
def dtype(self):
"""Data type of the data. Inferred from ``read_data_row(0)``."""
return self.read_data_row(0).dtype
[docs] def read_data_row(self, index, weighted=False):
if index >= self.nrow:
raise EOFError("cannot read from beyond end of input SUBINT HDU.")
row = self.data[index]
# Reversed the header shape to match the data
data_scale = row['DAT_SCL'].reshape(-1, 1)
data_off_set = row['DAT_OFFS'].reshape(-1, 1)
try:
zero_off = self.header['ZERO_OFF']
# Sometimes zero_off equals * or some such
float(zero_off)
except Exception:
zero_off = 0
result = (row['DATA'] - zero_off) * data_scale + data_off_set
if weighted and 'DAT_WTS' in self.data.names:
result *= row['DAT_WTS'].reshape(-1, 1)
return result
[docs]class PSRSubintHDU(SubintHDU):
"""Wrapper for PSRFITS SUBINT HDUs, providing baseband-style properties.
Parameters
----------
hdu : `~astropy.io.fits.BinTableHDU` instance
The PSRFITS table HDU of SUBINT type.
primary : `~baseband_tasks.io.psrfits.PSRFITSPrimaryHDU`
The wrapped PSRFITS main header.
verify: bool, optional
Whether to do basic verification. Default is `True`.
Notes
-----
Right now we are assuming the data rows are continuous in time and the
frequency are the same.
"""
[docs] def verify(self):
super().verify()
assert self.mode.upper() == 'PSR', \
"Header HDU is not in the folding mode."
assert int(self.header['NBIN']) > 1, \
"Invalid 'NBIN' field in the header."
# Check frequency
if 'DAT_FREQ' in self.data.names:
freqs = u.Quantity(self.data['DAT_FREQ'],
u.MHz, copy=False)
assert np.array_equiv(freqs[0], freqs), \
"Frequencies are not all the same for different rows."
# NOTE some files has large amount of TSUBING differ. comment this part
# for right now.
# tsubint = self.data['TSUBINT']
# assert all(np.isclose(tsubint[0], tsubint, atol=1e-1)), \
# "TSUBINT differ by large amounts in different rows."
d_shape_raw = self.data['DATA'].shape
d_shape_header = (self.nbin, self.nchan, self.npol)
# The shape has to be inversed since FITS is in the Fortran order.
assert d_shape_raw == (self.nrow,) + d_shape_header[::-1], \
"Data shape does not match with the header information."
@property
def start_time(self):
"""Start time of the first sub-integration.
Notes
-----
The start time is accurate to one pulse period. This calculation below
is consistent with PSRCHIVE's definition
(defined in psrchive/Base/Classes/Integration.C)
"""
start_time = super().start_time
if "OFFS_SUB" in self.data.names:
offset0 = (self.data['OFFS_SUB'][0]
- self.data['TSUBINT'][0] * self.samples_per_frame / 2)
start_time += u.Quantity(offset0, u.s, copy=False)
return start_time
@start_time.setter
def start_time(self, time):
"""
Note
----
this sets the start time of the HDU, not the file start time.
"""
try:
_ = self.primary_hdu.start_time
except ValueError:
self.primary_hdu.start_time = time
dt = 0
else:
dt = (time - self.primary_hdu.start_time).to(u.s).value
self.data['OFFS_SUB'][0] = dt + self.data['TSUBINT'] / 2
@property
def samples_per_frame(self):
return 1
@property
def sample_rate(self):
# NOTE we are assuming TSUBINT is uniform; tested in verify,
# but as individual numbers seem to vary, take the mean.
# TODO: check whether there really isn't a better way!.
sample_time = u.Quantity(self.data['TSUBINT'], u.s).mean()
return 1.0 / sample_time
@sample_rate.setter
def sample_rate(self, value):
sample_time = 1.0 / value
self.data['TSUBINT'] = sample_time.to_value(u.s)
[docs] def close(self):
super().close()
self.primary_hdu.close()
HDU_map = {'PRIMARY': PSRFITSPrimaryHDU,
'SUBINT': SubintHDU}
# TODO maybe we should just use the HDU_map. Is there any psrfits file that
# does not have SUBINT data?
subint_map = {'PSR': PSRSubintHDU}
# TODO: add search HDU
# TODO: actually use this template!!
hdu_list_template = {'PSR': {'primary': PSRFITSPrimaryHDU,
'data': PSRSubintHDU}}