Source code for baseband_tasks.convolution

# Licensed under the GPLv3 - see LICENSE
"""Convolution tasks."""
import numpy as np
from astropy.utils import lazyproperty

from .base import PaddedTaskBase, check_broadcast_to
from .fourier import fft_maker


__all__ = ['ConvolveSamples', 'Convolve']


[docs]class ConvolveSamples(PaddedTaskBase): """Convolve a time stream with a response, in the time domain. Parameters ---------- ih : task or `baseband` stream reader Input data stream, with time as the first axis. response : `~numpy.ndarray` Response to convolce the time stream with. If one-dimensional, assumed to apply to the sample axis of ``ih``. offset : int, optional Where samples should be considered to be taken from. For the default of 0, a given sample has the same time as the convolution of the filter with all preceding samples. samples_per_frame : int, optional Number of convolved samples which should be produced in one go. The number of input samples used will be larger to avoid wrapping. If not given, as produced by the larger of the input samples per frame minus padding or the minimum power of 2 of input samples that yields at least 75% efficiency. See Also -------- Convolve : convolution in the Fourier domain (usually faster) """ def __init__(self, ih, response, *, offset=0, samples_per_frame=None): if response.ndim == 1 and ih.ndim > 1: response = response.reshape(response.shape[:1] + (1,) * (ih.ndim - 1)) else: check_broadcast_to(response, response.shape[:1] + ih.sample_shape) pad = response.shape[0] - 1 super().__init__(ih, pad_start=pad-offset, pad_end=offset, samples_per_frame=samples_per_frame) self._response = response
[docs] def task(self, data): result = np.empty((self.samples_per_frame,) + self.sample_shape, dtype=self.dtype) response = np.broadcast_to(self._response, self._response.shape[:1]+self.sample_shape) for index in np.ndindex(self.sample_shape): index = (slice(None),) + index result[index] = np.convolve(data[index], response[index], mode='valid') return result
[docs]class Convolve(ConvolveSamples): """Convolve a time stream with a response, in the Fourier domain. The convolution is done via multiplication in the Fourier domain, which is faster than direct convolution for all but very simple responses. Parameters ---------- ih : task or `baseband` stream reader Input data stream, with time as the first axis. response : `~numpy.ndarray` Response to convolce the time stream with. If one-dimensional, assumed to apply to the sample axis of ``ih``. offset : int, optional Where samples should be considered to be taken from. For the default of 0, a given sample has the same time as the convolution of the filter with all preceding samples. samples_per_frame : int, optional Number of convolved samples which should be produced in one go. The number of input samples used will be larger to avoid wrapping. If not given, as produced by the larger of the input samples per frame minus padding or the minimum power of 2 of input samples that yields at least 75% efficiency. See Also -------- ConvolveSamples : convolution in the time domain (for simple responses) baseband_tasks.fourier.fft_maker : to select the FFT package used. """ def __init__(self, ih, response, *, offset=0, samples_per_frame=None): super().__init__(ih, response=response, offset=offset, samples_per_frame=samples_per_frame) # Initialize FFTs for fine channelization and the inverse. self._FFT = fft_maker.get() self._fft = self._FFT(shape=(self._ih_samples_per_frame,) + self.ih.sample_shape, dtype=self.ih.dtype, sample_rate=self.ih.sample_rate) self._ifft = self._fft.inverse() @lazyproperty def _ft_response(self): long_response = np.zeros((self._ih_samples_per_frame,) + self._response.shape[1:], self.dtype) long_response[:self._response.shape[0]] = self._response fft = self._FFT(shape=long_response.shape, dtype=self.dtype) return fft(long_response)
[docs] def task(self, data): ft = self._fft(data) ft *= self._ft_response result = self._ifft(ft) return result[self._pad_start + self._pad_end:]
[docs] def close(self): super().close() # Clear the caches of the lazyproperties to release memory. del self._ft_response del self._fft del self._ifft