# Source code for baseband_tasks.io.psrfits.core

# Licensed under the GPLv3 - see LICENSE
"""Interfaces for dealing with PSRFITS fold-mode data."""

from astropy.io import fits
from astropy import log
from collections import defaultdict

from baseband_tasks.base import BaseTaskBase
from .hdu import HDU_map, subint_map


# Public names of this module, i.e., what ``from ... import *`` exports.
__all__ = ['open', 'get_readers', 'get_writer', 'PSRFITSReader',
           'PSRFITSWriter']


def open(filename, mode='r', **kwargs):
    """Open a PSRFITS file for reading or writing.

    Parameters
    ----------
    filename : str
        PSRFITS file name.
    mode : {'r', 'w'}, optional
        Whether to open for reading (default) or writing.
    **kwargs
        Additional arguments for opening the fits file and creating the
        reader or writer.

    --- For opening the fits file :

    memmap : bool, optional
        Is memory mapping to be used? By default, this value is obtained from
        the configuration item ``astropy.io.fits.Conf.use_memmap`` (the
        default of which, in turn, is `True`).

    --- For creating the reader :

    weighted : bool, optional
        Whether the data should be weighted along the frequency axis.
        Default is `True`.
    verify : bool, optional
        Whether to do basic checks on the PSRFITS HDUs.  Default is `True`.

    --- For creating the writer :

    primary_hdu : ~baseband_tasks.io.psrfits.PSRFITSPrimaryHDU
        Currently required to be constructed before opening the file.
        This limitation will be lifted in future.
    **kwargs
        Further arguments to set up the SUBINT HDU.  These can include
        'start_time', 'sample_rate', 'sample_shape', 'shape',
        'samples_per_frame', 'polarization', 'frequency'.

    Returns
    -------
    reader or writer
        For mode 'r', the single reader returned by `get_readers`; for
        mode 'w', the writer returned by `get_writer`.

    Raises
    ------
    RuntimeError
        If, in read mode, the number of readers built from the file is not
        exactly one (only a single SUBINT HDU is currently supported).
    ValueError
        If ``mode`` is not 'r' or 'w', or if no ``primary_hdu`` is passed
        in for mode 'w'.

    Notes
    -----
    The current version only opens and reads one SUBINT HDU and ignores all
    other types of HDUs.
    """
    if mode == 'r':
        memmap = kwargs.pop('memmap', None)
        hdu_list = fits.open(filename, 'readonly', memmap=memmap)
        try:
            reader_list = get_readers(hdu_list, **kwargs)
            # TODO: allow support for more than one HDU.
            if len(reader_list) != 1:
                raise RuntimeError("Current reader can only read one SUBINT "
                                   "HDU.")
        except Exception:
            # No reader is handed back, so nobody else could close the
            # file; do it here before propagating the error.
            hdu_list.close()
            raise
        # TODO Returning one HDU for now.  But in the future, we may want to
        # return a group of it.
        return reader_list[0]

    if mode == 'w':
        # Build Writer from scratch
        primary_hdu = kwargs.pop('primary_hdu', None)
        if primary_hdu is None:
            raise ValueError("need a primary hdu/meta data for building a"
                             " PSRFITS file.")
        return get_writer(filename, primary_hdu, **kwargs)

    raise ValueError("Unknown mode '{}'. Currently only modes 'r' and 'w' "
                     "are supported.".format(mode))


def get_readers(hdu_list, **kwargs):
    """Wrap the supported HDUs of an opened PSRFITS file in readers.

    Parameters
    ----------
    hdu_list : iterable of FITS HDUs
        Opened PSRFITS file, e.g., the result of ``astropy.io.fits.open``.
        Each item needs a ``name`` attribute.
    **kwargs
        Additional arguments for creating the readers.  Passed on to
        `~baseband_tasks.io.psrfits.PSRFITSReader`.

    Returns
    -------
    readers : list
        One `~baseband_tasks.io.psrfits.PSRFITSReader` for every supported
        HDU other than the primary one.

    Raises
    ------
    ValueError
        If ``hdu_list`` does not contain exactly one 'PRIMARY' HDU.
    """
    # Group the supported HDUs by name; anything else is skipped with a
    # warning.
    buffers = defaultdict(list)
    for index, hdu in enumerate(hdu_list):
        if hdu.name in HDU_map:
            buffers[hdu.name].append(hdu)
        else:
            log.warning("Skipping HDU {} ({}), as it is not a supported "
                        "PSRFITS HDU.".format(index, hdu.name))

    primary = buffers.pop('PRIMARY', [])
    if len(primary) != 1:
        raise ValueError("HDUList `{}` does not have a header"
                         " HDU or have more than one header"
                         " HDU.".format(hdu_list))

    primary_hdu = HDU_map['PRIMARY'](primary[0])
    # Wrap every remaining HDU first, so that a failure here happens
    # before any reader is constructed.
    psrfits_hdus = [HDU_map[name](hdu, primary_hdu)
                    for name, hdus in buffers.items()
                    for hdu in hdus]
    # Build reader on the HDUs
    return [PSRFITSReader(hdu, **kwargs) for hdu in psrfits_hdus]


def get_writer(filename, hdu, **kwargs):
    """Set up a PSRFITS SUBINT HDU and a writer for it.

    Parameters
    ----------
    filename : str or filehandle
        File to eventually write the FITS data to.
    hdu : PSRFITS primary HDU
        Instance of ``HDU_map['PRIMARY']``.  Its ``obs_mode`` selects which
        SUBINT HDU class is created.
    **kwargs
        Values for the properties of the new SUBINT HDU (the names listed
        in its ``_properties``).  Entries that are `None` are ignored.

    Returns
    -------
    writer : `~baseband_tasks.io.psrfits.PSRFITSWriter`

    Raises
    ------
    ValueError
        If ``hdu`` is not a PSRFITS primary HDU.
    KeyError
        If ``hdu.obs_mode`` is not a known observation mode.
    TypeError
        If any keyword argument is left over after setting the properties.
    """
    # TODO: not sure if the best solution is to build from a primary HDU.
    # Might be nicer if we can construct from kwargs.  Might want to use
    # the hdu_list_template in hdu.py
    primary_cls = HDU_map['PRIMARY']
    if not isinstance(hdu, primary_cls):
        raise ValueError("need a PSRFITSPrimaryHDU to write FITS data")

    # TODO, is there any PSRFITS has no subint hdu?
    try:
        subint_cls = subint_map[hdu.obs_mode]
    except KeyError as exc:
        # Extend the original error with a hint on what would be valid.
        exc.args += ("observation mode '{}' is not a valid"
                     " mode. Available modes are: {}"
                     .format(hdu.obs_mode, list(subint_map.keys())),)
        raise exc

    subint_hdu = subint_cls(primary_hdu=hdu)

    # Take input from the keywords
    # TODO: in analogy with baseband, should there be a .fromvalues() class
    # method?  We should not be accessing private properties here...
    for name in subint_hdu._properties:
        value = kwargs.pop(name, None)
        if value is None:
            continue
        setattr(subint_hdu, name, value)

    if kwargs:
        raise TypeError('unused keyword arguments: {} '.format(kwargs))

    # TODO: should the file already opened for writing here?  Not good
    # to get an error if the file exists only when the writer is closed!
    return PSRFITSWriter(filename, subint_hdu)


class PSRFITSReader(BaseTaskBase):
    """Wrapper for reading PSRFITS HDUs.

    Parameters
    ----------
    ih : wrapped PSRFITS HDU
        The input fits table HDU, wrapped in an interface from
        `~baseband_tasks.io.psrfits`.
    dtype : `~numpy.dtype`, optional
        Dtype of the samples.  Default: inferred from ``hdu``.
    weighted : bool, optional
        Whether to weight the data along the frequency axis using the
        'DAT_WTS' column.  Default of `True` should suffice for most
        purposes, but sometimes the weights are incorrect.

    --- **kwargs : meta data for the stream, which usually include

    frequency : `~astropy.units.Quantity`, optional
        Frequencies for each channel.  Should be broadcastable to the
        sample shape.  Default: inferred from ``hdu``.
    sideband : array, optional
        Whether frequencies are upper (+1) or lower (-1) sideband.
        Should be broadcastable to the sample shape.
        Default: inferred from ``hdu``.
    polarization : array or (nested) list of char, optional
        Polarization labels.  Should broadcast to the sample shape,
        i.e., the labels are in the correct axis.  For instance,
        ``['X', 'Y']``, or ``[['L'], ['R']]``.
        Default: inferred from ``hdu``.
    """
    # Note: this very light-weight wrapper around SubintHDU is mostly here
    # because eventually it might unify different/multiple HDUs.

    def __init__(self, ih, *, dtype=None, weighted=True, **kwargs):
        # Stored before the base class is initialized.
        self.weighted = weighted
        super().__init__(ih, dtype=dtype, **kwargs)

    def _read_frame(self, frame_index):
        """Read data row ``frame_index`` from the HDU as one frame.

        The row is transposed and reshaped to
        ``(samples_per_frame,) + sample_shape``.
        """
        res = self.ih.read_data_row(frame_index, weighted=self.weighted).T
        return res.reshape((self.samples_per_frame, ) + self.sample_shape)

    def close(self):
        """Close the reader as well as the underlying FITS file."""
        # Get the file handle before closing the base class.
        # NOTE(review): presumably the base class drops its reference to
        # ``ih`` on close -- confirm.
        fits_file = self.ih.hdu._file
        try:
            super().close()
        finally:
            # Close the file even if the base class close fails.
            fits_file.close()


class PSRFITSWriter:
    """Interface class for writing the PSRFITS HDUs.

    Parameters
    ----------
    filename : str
        Output file name.
    hdu : wrapped PSRFITS HDU
        The output fits table HDU, wrapped in an interface from
        `~baseband_tasks.io.psrfits`.

    Notes
    -----
    Currently, only writing of the PSRFITS primary HDU and SUBINT HDU is
    supported.  Nothing is written to disk until `close` is called, which
    also happens on leaving a ``with`` block.
    """
    # TODO: this should have some of the attributes from the underlying HDU,
    # in particular sample_shape, sample_rate, etc.  May be good to split
    # off a ReaderBase from base.Base, and then define a WriterBase as well.

    def __init__(self, filename, hdu):
        self.filename = filename
        self.hdu = hdu
        # Number of samples written so far.
        self.offset = 0
        # Set once the file has been written, to make close() idempotent.
        self._closed = False

    def write(self, data):
        """Store data in the 'DATA' column of the HDU.

        Parameters
        ----------
        data : array
            Samples to write, with the sample number along the first axis.

        Raises
        ------
        RuntimeError
            If the data would extend beyond ``hdu.shape[0]`` samples.
        """
        nsample = data.shape[0]
        open_space = self.hdu.shape[0] - self.offset - nsample
        # TODO This may have to change if we want to write data to
        # multiple files
        if open_space < 0:
            raise RuntimeError("Not enough space for input data.")
        # FIXME add scaling
        # We need to have a hdu data setter, otherwise, this will not
        # scaled right.  That should also deal with this reshaping;
        # i.e., do implement a write_data_row function!
        # NOTE(review): the reshape below seems to assume that ``data``
        # holds exactly ``samples_per_frame`` samples -- confirm.
        data_row = data.reshape(self.hdu.samples_per_frame, -1)
        self.hdu.data['DATA'][
            0, self.offset:self.offset + nsample] = data_row
        self.offset += nsample

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        """Dump the data to the underlying file.

        The primary and data HDUs are written out together.  The output
        file is not overwritten if it already exists, in which case the
        error from `astropy.io.fits` propagates.  Calling this again after
        a successful write does nothing.  The reference to the underlying
        HDU is kept.
        """
        if self._closed:
            # Already written; a second writeto would fail on the
            # existing file.
            return
        # dump the data out
        # First convert the psrfits hdu to HDUList
        fits_hdu_list = fits.HDUList([self.hdu.primary_hdu.hdu,
                                      self.hdu.hdu])
        fits_hdu_list.writeto(self.filename, overwrite=False)
        self._closed = True