Source code for baseband_tasks.integration

# Licensed under the GPLv3 - see LICENSE
"""Tasks for integration over time and pulse phase."""

import operator
import warnings

import numpy as np
from astropy import units as u
from astropy.time import Time
from astropy.utils import ShapedLikeNDArray

from .base import BaseTaskBase


__all__ = ['Integrate', 'Fold', 'Stack']


class _FakeOutput(ShapedLikeNDArray):
    """Pretend output class that diverts setting.

    It subclasses `~astropy.utils.ShapedLikeNDArray` to mimic all the
    shape properties of `~numpy.ndarray` given a (fake) shape.
    """

    def __init__(self, shape, setitem):
        self._shape = shape
        self._setitem = setitem

    def __setitem__(self, item, value):
        # Call back on out[item] = value.
        return self._setitem(item, value)

    # The two required parts for ShapedLikeNDArray.
    @property
    def shape(self):
        return self._shape

    def _apply(self, *args, **kwargs):
        raise NotImplementedError("No _apply possible for _FakeOutput")

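# A minimal sketch of how ``_FakeOutput`` diverts item assignment (purely
# illustrative; the names ``collected``, ``grab``, and ``fake`` are made up):
#
#     collected = []
#
#     def grab(item, value):
#         collected.append((item, value.sum()))
#
#     fake = _FakeOutput((100, 2), setitem=grab)
#     fake[0:10] = np.ones((10, 2))   # routed to grab(slice(0, 10), ...)
#
# ``Integrate._read_frame`` below passes such a fake output to
# ``self.ih.read(out=...)``, so every chunk the underlying reader "writes"
# is instead handed to ``Integrate._integrate``.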

def is_index(n):
    """Helper that checks whether n is suitable for indexing."""
    try:
        operator.index(n)
    except TypeError:
        return False
    else:
        return True

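# For example (illustrative only):
#
#     is_index(3)             # True
#     is_index(np.int64(3))   # True: numpy integers define __index__
#     is_index(3.)            # False
#     is_index(3. * u.s)      # False: a Quantity step, not a sample count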

class Integrate(BaseTaskBase):
    """Integrate a stream stepwise.

    Parameters
    ----------
    ih : task or `baseband` stream reader
        Input data stream, with time as the first axis.
    step : int or `~astropy.units.Quantity`, optional
        Interval over which to integrate.  For time intervals, should have
        units of time.  For phase intervals, units should be consistent with
        what the ``phase`` callable returns.  If no ``phase`` callable is
        passed in, then if ``step`` is integer, it is taken to be a number
        of samples in the underlying stream that should be integrated over,
        and if ``step`` is omitted, integration is over all samples.
    phase : callable
        Should return full pulse phase (i.e., including cycle count) for
        given input times (passed in as `~astropy.time.Time`).  The output
        should be compatible with ``step``, i.e., generally an
        `~astropy.units.Quantity` with angular units.
    start : `~astropy.time.Time` or int, optional
        Time or offset at which to start the integration.  If an offset or
        if ``step`` is integer, the actual start time will be the
        underlying sample time nearest to the requested one.
        Default: 0 (start of stream).
    average : bool, optional
        Whether the output should be the average of all entries that
        contributed to it, or rather the sum, in a structured array that
        holds both ``'data'`` and ``'count'`` items.
    samples_per_frame : int, optional
        Number of samples to process in one go.  This can be used to
        optimize the process.  With many samples per bin, the default of 1
        should be OK.
    dtype : `~numpy.dtype`, optional
        Output dtype.  Generally, the default of the dtype of the
        underlying stream is good enough, but can be used to increase
        precision.  Note that if ``average=True``, it is the user's
        responsibility to pass in a structured dtype.

    Notes
    -----
    If there are not many samples per bin, either set ``samples_per_frame``
    to a larger number or ensure that ``samples_per_frame`` of the
    underlying stream is not small (larger than, say, 20).  If both are
    small, there will be a relatively large overhead in calculating phases.

    Since time or phase bins are typically not an integer multiple of the
    underlying bin spacing, the integrated samples will generally not
    contain the same number of samples.  The actual number of samples is
    counted, and for ``average=True``, the sums have been divided by these
    counts, with bins with no points set to ``NaN``.  For
    ``average=False``, the arrays returned by ``read`` are structured
    arrays with ``data`` and ``count`` fields.

    .. warning:: The format for ``average=False`` may change in the future.

    """

    def __init__(self, ih, step=None, phase=None, *,
                 start=0, average=True, samples_per_frame=1, dtype=None):
        self._start = start
        self._step = step

        ih_start = ih.seek(start)
        ih_n_sample = ih.shape[0] - ih_start
        if ih_start < 0 or ih_n_sample < 0:
            raise ValueError("'start' is not within the underlying stream.")

        if isinstance(start, Time):
            # We may not be at an integer sample.
            ih_start += ((start - ih.time) * ih.sample_rate).to_value(u.one)
        else:
            start = ih.time

        if step is None:
            step = ih_n_sample

        if is_index(step):
            assert phase is None, 'cannot pass in phase and integer step'
            sample_rate = ih.sample_rate / step
            n_sample = ih_n_sample / step
        else:
            stop = ih.stop_time
            if phase is not None:
                start = phase(start)
                stop = phase(stop)
            sample_rate = 1 / step
            n_sample = ((stop - start) * sample_rate).to_value(u.one)

        # Initialize value for _get_offsets.
        self._mean_offset_size = n_sample / ih_n_sample
        self._sample_start = start

        # Calculate output shape.
        n_sample = int(n_sample + 0.5*self._mean_offset_size)
        assert n_sample >= 1, "time per frame larger than total time in stream"
        shape = (n_sample,) + ih.sample_shape

        # Set start_time or indicate time should be inferred from ih
        # (see _tell_time).
        if isinstance(start, Time) and sample_rate.unit.is_equivalent(u.Hz):
            start_time = start
        else:
            start_time = False

        # Output dtype.  TODO: upgrade by default?
        if dtype is None:
            if average:
                dtype = ih.dtype
            else:
                dtype = np.dtype([('data', ih.dtype), ('count', int)])

        super().__init__(ih, shape=shape, sample_rate=sample_rate,
                         samples_per_frame=samples_per_frame,
                         start_time=start_time, dtype=dtype)
        self.average = average
        self._phase = phase
        self._ih_start = ih_start

    def _tell_time(self, offset):
        if self._start_time:
            return super()._tell_time(offset)
        else:
            return self.ih._tell_time(self._get_offsets(offset))

    def _get_offsets(self, samples, precision=1.e-3, max_iter=10):
        """Get offsets in the underlying stream nearest to samples.

        For a phase callable, this is done by iteratively guessing offsets,
        calculating their associated phase, and updating, until the change
        in guessed offset is less than ``precision`` or more than
        ``max_iter`` iterations are done.

        Phase is assumed to increase monotonically with time.
        """
        if self._phase is None:
            return (np.around(samples / self._mean_offset_size
                              + self._ih_start).astype(int))

        # Requested phases relative to start (we work relative to the start
        # to avoid rounding errors for large cycle counts).  Also, we want
        # *not* to use the Phase class, as it makes interpolation tricky.
        phase = np.ravel(samples) / self.sample_rate
        # Initial guesses for the associated offsets.
        ih_mean_phase_size = self._mean_offset_size / self.sample_rate
        offsets = (phase / ih_mean_phase_size).to_value(u.one)
        # In order to update guesses, below we interpolate phase in offset.
        # Add known boundaries to ensure we do not go out of bounds there.
        all_offsets = np.hstack((0, offsets,
                                 self.ih.shape[0]-self._ih_start))
        # Associated phases relative to start phase;
        # all but start (=0) and stop will be overwritten.
        all_ih_phase = all_offsets * ih_mean_phase_size
        # Add in base offset in underlying file.
        all_offsets += self._ih_start
        # Select the parts we are going to modify (in-place).
        offsets = all_offsets[1:-1]
        ih_phase = all_ih_phase[1:-1]
        mask = np.ones(offsets.shape, bool)
        it = 0
        while np.any(mask) and it < max_iter:
            # Use mask to avoid calculating more phases than necessary.
            # First calculate the phase associated with the current offset
            # guesses.
            old_offsets = offsets[mask]
            ih_time = self.ih.start_time + old_offsets / self.ih.sample_rate
            # TODO: the conversion is necessary because Quantity(Phase)
            # doesn't convert the two doubles to float internally.
            ih_phase[mask] = ((self._phase(ih_time) - self._sample_start)
                              .astype(ih_phase.dtype, copy=False))
            # Next, interpolate in known phases to get improved offsets.
            offsets[mask] = np.interp(phase[mask], all_ih_phase, all_offsets)
            # Finally, update mask.
            mask[mask] = abs(offsets[mask] - old_offsets) > precision
            it += 1

        if it >= max_iter:  # pragma: no cover
            warnings.warn('offset calculation did not converge. '
                          'This should not happen!')

        shape = getattr(samples, 'shape', ())
        return offsets.round().astype(int).reshape(shape)

    def _read_frame(self, frame_index):
        """Determine which samples to read, and integrate over them.

        Uses the ``_get_offsets`` method to determine where in the
        underlying stream the samples should be gotten from.

        Integration is done by setting up a fake output array whose setter
        calls back to the ``_integrate`` method that does the actual
        summing.
        """
        # Get offsets in the underlying stream for the current samples (and
        # the next one to get the upper edge).  For integration over time
        # intervals, these offsets are not necessarily evenly spaced.
        sample0 = frame_index * self.samples_per_frame
        n_sample = min(self.samples_per_frame, self.shape[0]-sample0)
        samples = np.arange(sample0, sample0+n_sample+1)
        offsets = self._get_offsets(samples)
        self.ih.seek(offsets[0])
        offsets -= offsets[0]
        # Set up fake output with a shape that tells the reader of the
        # underlying stream how many samples should be read (and a
        # remaining part that should pass consistency checks), and which
        # has a callback for the actual setting of output in the reader.
        integrating_out = _FakeOutput((offsets[-1],) + self.ih.sample_shape,
                                      setitem=self._integrate)
        # Set up real output and store information used in self._integrate.
        frame = np.zeros((n_sample,) + self.sample_shape, dtype=self.dtype)
        if self.average:
            ndim_ih_sample = len(self.ih.sample_shape)
            self._frame = {
                'data': frame,
                'count': np.zeros(frame.shape[:-ndim_ih_sample]
                                  + (1,)*ndim_ih_sample, dtype=int)}
        else:
            self._frame = frame

        self._offsets = offsets
        # Do the actual reading.
        self.ih.read(out=integrating_out)
        if self.average:
            frame /= self._frame['count']

        return frame

    def _integrate(self, item, data):
        """Sum data in the correct samples.

        Here, ``item`` will be a slice with start and stop being indices in
        the underlying stream relative to the start of the current output
        frame, and ``data`` the corresponding slice of the underlying
        stream.
        """
        # Note that this is not entirely trivial even for integrating over
        # an integer number of samples, since underlying data frames do not
        # necessarily contain integer multiples of this number of samples.
        #
        # First find all samples that have any overlap with the slice,
        # i.e., for which start < offset_right and stop > offset_left.
        # Here, we use the offsets in the underlying stream for each sample
        # in the current frame, plus the one just above, i.e., f[0]=0,
        # f[1], f[2], ..., f[n].  (E.g., bin 0 should be included only when
        # start < f[1]; bin n-1 only when stop > f[n-1].)
        start = np.searchsorted(self._offsets[1:], item.start, side='right')
        stop = np.searchsorted(self._offsets[:-1], item.stop, side='left')
        # Calculate corresponding indices in ``data`` by extracting the
        # offsets that have any overlap (we take one more -- guaranteed to
        # exist -- so that we can count the number of items more easily),
        # subtracting the start offset, and clipping to the right range.
        indices = self._offsets[start:stop + 1] - item.start
        indices[0] = 0
        indices[-1] = item.stop - item.start
        # Finally, sum within slices constructed from consecutive indices
        # (reduceat always adds the end point itself).
        self._frame['data'][start:stop] += np.add.reduceat(data, indices[:-1])
        self._frame['count'][start:stop] += (
            np.diff(indices).reshape((-1,) + (1,) * (data.ndim - 1)))
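

# A minimal usage sketch for ``Integrate`` (assumptions: ``fh`` is an open
# baseband stream reader, e.g. from ``baseband.open``, and the 1-second step
# is made up):
#
#     from baseband_tasks.integration import Integrate
#
#     integrator = Integrate(fh, step=1. * u.s)
#     power = integrator.read()      # one integrated sample per second
#
#     # With average=False, sums and counts can be inspected separately.
#     sums = Integrate(fh, step=1. * u.s, average=False).read()
#     mean = sums['data'] / sums['count']
#
# The per-bin summing in ``_integrate`` leans on ``np.add.reduceat``; as a
# standalone illustration with toy numbers:
#
#     data = np.arange(5.)                # 5 raw samples in a slice
#     edges = np.array([0, 2, 5])         # offsets of 2 output bins
#     np.add.reduceat(data, edges[:-1])   # -> array([1., 9.])
#     np.diff(edges)                      # -> array([2, 3]), the counts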


class Fold(Integrate):
    """Fold pulse profiles in fixed time intervals.

    Parameters
    ----------
    ih : task or `baseband` stream reader
        Input data stream, with time as the first axis.
    n_phase : int
        Number of bins per pulse period.
    phase : callable
        Should return pulse phases (with or without cycle count) for given
        input time(s), passed in as an `~astropy.time.Time` object.  The
        output can be an `~astropy.units.Quantity` with angular units or a
        regular array of float (in which case units of cycles are assumed).
    step : int or `~astropy.units.Quantity`, optional
        Number of input samples or time interval over which to fold.
        If not given, the whole file will be folded into a single profile.
    start : `~astropy.time.Time` or int, optional
        Time or offset at which to start the integration.  If an offset or
        if ``step`` is integer, the actual start time will be the
        underlying sample time nearest to the requested one.
        Default: 0 (start of stream).
    average : bool, optional
        Whether the output pulse profile should be the average of all
        entries that contributed to it, or rather the sum, in a structured
        array that holds both ``'data'`` and ``'count'`` items.
    samples_per_frame : int, optional
        Number of sample times to process in one go.  This can be used to
        optimize the process, though in general the default of 1 should
        work.
    dtype : `~numpy.dtype`, optional
        Output dtype.  Generally, the default of the dtype of the
        underlying stream is good enough, but can be used to increase
        precision.  Note that if ``average=True``, it is the user's
        responsibility to pass in a structured dtype.

    See Also
    --------
    Stack : to integrate over pulse phase and create pulse stacks

    Notes
    -----
    If there are only few input samples per phase bin (i.e., its inverse
    sample rate is similar to the time per phase bin), then it is important
    to ensure the ``samples_per_frame`` of the underlying stream is not
    small (larger than, say, 20), to avoid a large overhead in calculating
    phases.

    Since the sample time is not necessarily an integer multiple of the
    pulse period, the returned profiles will generally not contain the same
    number of samples in each phase bin.  The actual number of samples is
    counted, and for ``average=True``, the sums have been divided by these
    counts, with bins with no points set to ``NaN``.  For
    ``average=False``, the arrays returned by ``read`` are structured
    arrays with ``data`` and ``count`` fields.

    .. warning:: The format for ``average=False`` may change in the future.

    """

    def __init__(self, ih, n_phase, phase, step=None, *,
                 start=0, average=True, samples_per_frame=1, dtype=None):
        super().__init__(ih, step=step, start=start, average=average,
                         samples_per_frame=samples_per_frame, dtype=dtype)
        # And ensure we reshape it to cycles.
        self._shape = (self._shape[0], n_phase) + ih.sample_shape
        self.n_phase = n_phase
        self.phase = phase

    def _read_frame(self, frame_index):
        # Before calling the underlying implementation, get the start time
        # in the underlying frame, to be used to calculate phases in
        # _integrate.
        offset0 = self._get_offsets(frame_index * self.samples_per_frame)
        self.ih.seek(offset0)
        self._raw_time = self.ih.time
        return super()._read_frame(frame_index)

    def _integrate(self, item, raw):
        # Get sample and phase indices.
        raw_items = np.arange(item.start, item.stop)
        if self.samples_per_frame == 1:
            sample_index = 0
        else:
            sample_index = np.searchsorted(self._offsets[1:], raw_items)

        # TODO: allow having a phase reference.
        phases = self.phase(self._raw_time + raw_items / self.ih.sample_rate)
        phase_index = ((phases % (1. * u.cycle)).to_value(u.cycle)
                       * self.n_phase).astype(int)
        # Do the actual folding, adding the data to the sums and counts.
        # TODO: np.add.at is not very efficient; replace?
        np.add.at(self._frame['data'], (sample_index, phase_index), raw)
        np.add.at(self._frame['count'], (sample_index, phase_index), 1)
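

# A minimal usage sketch for ``Fold`` (assumptions: ``fh`` is an open
# baseband stream reader, and the constant 0.3-s period of the toy
# ``phase`` callable is made up -- real use would wrap a pulsar timing
# solution, for instance from ``baseband_tasks.phases``):
#
#     from baseband_tasks.integration import Fold
#
#     t0 = fh.start_time
#
#     def phase(t):
#         return ((t - t0) / (0.3 * u.s)).to(u.one) * u.cycle
#
#     folder = Fold(fh, n_phase=64, phase=phase, step=10. * u.s)
#     profiles = folder.read()   # shape (n_step, 64) + fh.sample_shape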


class Stack(BaseTaskBase):
    """Create a stream of pulse profiles.

    Parameters
    ----------
    ih : task or `baseband` stream reader
        Input data stream, with time as the first axis.
    n_phase : int
        Number of bins per pulse period.
    phase : callable
        Should return pulse phases for given input time(s), passed in as an
        `~astropy.time.Time` object.  The output should be an array of
        float, and has to include the cycle count.
    start : `~astropy.time.Time` or int, optional
        Time or offset at which to start the integration.  If an offset or
        if ``step`` is integer, the actual start time will be the
        underlying sample time nearest to the requested one.
        Default: 0 (start of stream).
    average : bool, optional
        Whether the output pulse profile should be the average of all
        entries that contributed to it, or rather the sum, in a structured
        array that holds both ``'data'`` and ``'count'`` items.
    samples_per_frame : int, optional
        Number of sample times to process in one go.  This can be used to
        optimize the process, though with many samples per pulse period the
        default of 1 should be fine.
    dtype : `~numpy.dtype`, optional
        Output dtype.  Generally, the default of the dtype of the
        underlying stream is good enough, but can be used to increase
        precision.  Note that if ``average=True``, it is the user's
        responsibility to pass in a structured dtype.

    See Also
    --------
    Fold : to calculate pulse profiles integrated over a given amount of time

    Notes
    -----
    One can follow this with a `~baseband_tasks.integration.Integrate` task
    to average over multiple pulses.

    If there are only few input samples per cycle, one can avoid a large
    overhead in calculating phases by ensuring ``samples_per_frame`` of the
    underlying stream is not too small (larger than, say, 20).

    Since phase bins are typically not an integer multiple of the
    underlying bin spacing, the integrated samples will generally not
    contain the same number of samples.  The actual number of samples is
    counted, and for ``average=True``, the sums have been divided by these
    counts, with bins with no points set to ``NaN``.  For
    ``average=False``, the arrays returned by ``read`` are structured
    arrays with ``data`` and ``count`` fields.

    .. warning:: The format for ``average=False`` may change in the future.

    """

    def __init__(self, ih, n_phase, phase, *,
                 start=0, average=True, samples_per_frame=1, dtype=None):
        # Set up the integration in phase bins.
        phased = Integrate(ih, u.cycle/n_phase, phase,
                           start=start, average=average,
                           samples_per_frame=samples_per_frame*n_phase,
                           dtype=dtype)
        # And ensure we reshape it to cycles.
        shape = (phased.shape[0] // n_phase, n_phase) + phased.shape[1:]
        super().__init__(phased, shape=shape,
                         sample_rate=phased.sample_rate / n_phase,
                         samples_per_frame=samples_per_frame, dtype=dtype)
        self.n_phase = n_phase

    def _read_frame(self, frame_index):
        # Read the frame in phased directly, bypassing its ``read`` method.
        out = self.ih._read_frame(frame_index)
        # Remove a possible incomplete cycle for the last frame.
        if len(out) != self.ih.samples_per_frame:
            out = out[:(len(out) // self.n_phase) * self.n_phase]
        return out.reshape((-1,) + self.sample_shape)

    def _tell_time(self, offset):
        return self.ih._tell_time(offset * self.n_phase)
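

# A minimal usage sketch for ``Stack`` (same hypothetical ``fh`` and
# ``phase`` as in the ``Fold`` example above; for ``Stack`` the callable
# must return phases that include the cycle count):
#
#     from baseband_tasks.integration import Stack, Integrate
#
#     stacker = Stack(fh, n_phase=64, phase=phase)
#     pulses = stacker.read(10)   # 10 pulse stacks of 64 phase bins each
#
#     # As the Notes say, Integrate can then average over multiple pulses:
#     averaged = Integrate(stacker, step=100)   # 100 pulses per sample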