# Licensed under the GPLv3 - see LICENSE
import numpy as np
from astropy.utils import lazyproperty
from .base import PaddedTaskBase
from .channelize import Channelize, Dechannelize
__all__ = ['sinc_hamming', 'PolyphaseFilterBankSamples',
'PolyphaseFilterBank', 'InversePolyphaseFilterBank']
[docs]def sinc_hamming(n_tap, n_sample, sinc_scale=1.):
r"""Construct a sinc-hamming polyphase filter.
The sinc-hamming filter is defined by
.. math:: F(n_{\rm tap}, n_{\rm samle}, s) &=
\left(\frac{\sin(\pi x)}{\pi x}\right)
\left(0.54 - 0.46\cos\left(\frac{2\pi k}{N-1}\right)\right),\\
{\rm with~~}
x &= n_{\rm tap} s \left(\frac{k}{N} - 0.5\right),\\
N &= n_{\rm tap} n_{\rm sample}, \quad 0 \leq k \leq N-1.
Parameters
----------
n_tap : int
Number of taps of the polyphase filter.
n_sample : int
Number of samples to pass on to the FFT stage.
sinc_scale : float
Possible scaling for the sinc factor, to widen or narrow the channels.
Examples
--------
Construct the CHIME and GUPPI baseband polyphase filter responses::
>>> from baseband_tasks.pfb import sinc_hamming
>>> chime_ppf = sinc_hamming(4, 2048)
>>> guppi_ppf = sinc_hamming(12, 64, sinc_scale=0.95)
"""
n = n_tap * n_sample
x = n_tap * sinc_scale * np.linspace(-0.5, 0.5, n, endpoint=False)
return (np.sinc(x) * np.hamming(n)).reshape(n_tap, n_sample)
[docs]class PolyphaseFilterBankSamples(Channelize):
"""Channelize using a polyphase filter bank, in the time domain.
Parameters
----------
ih : task or `baseband` stream reader
Input data stream, with time as the first axis.
response : `~numpy.ndarray`
Polyphase filter. The first dimension is taken to be the
number of taps, and the second the number of channels.
samples_per_frame : int, optional
Number of complete output samples per frame. Default: inferred from
padding, ensuring an efficiency of at least 75%.
frequency : `~astropy.units.Quantity`, optional
Frequencies for each channel in ``ih`` (channelized frequencies will
be calculated). Default: taken from ``ih`` (if available).
sideband : array, optional
Whether frequencies in ``ih`` are upper (+1) or lower (-1) sideband.
Default: taken from ``ih`` (if available).
See Also
--------
PolyphaseFilterBank : apply filter in the Fourier domain (usually faster)
"""
def __init__(self, ih, response, samples_per_frame=None,
frequency=None, sideband=None):
n_tap, n = response.shape
pad = (n_tap - 1) * n
if samples_per_frame is not None:
samples_per_frame = samples_per_frame * n
assert pad % 2 == 0
# Note: cannot easily use Convolve since this doesn't convolve
# every sample, but rather each n_chan samples. In principle,
# it could be done via a ReshapeSamples followed by a Convolve.
self.padded = PaddedTaskBase(ih, pad_start=pad//2, pad_end=pad//2,
samples_per_frame=samples_per_frame)
self.padded.task = self.ppf
self._response = response
super().__init__(self.padded, n, self.padded.samples_per_frame // n,
frequency=frequency, sideband=sideband)
self._reshape = ((self.padded._ih_samples_per_frame // n, n)
+ self.ih.sample_shape)
[docs] def ppf(self, data):
"""Apply the PolyPhase Filter, in the time domain."""
data = data.reshape(self._reshape)
result = np.empty((self.samples_per_frame,) + data.shape[1:],
data.dtype)
# TODO: use stride tricks to do this in one go.
n_tap = len(self._response)
for i in range(data.shape[0] + 1 - n_tap):
result[i] = (data[i:i+n_tap] * self._response).sum(0)
return result.reshape((-1,) + result.shape[2:])
[docs]class PolyphaseFilterBank(PolyphaseFilterBankSamples):
"""Channelize using a polyphase filter bank, in the frequency domain.
Parameters
----------
ih : task or `baseband` stream reader
Input data stream, with time as the first axis.
response : `~numpy.ndarray`
Polyphase filter. The first dimension is taken to be the
number of taps, and the second the number of channels.
samples_per_frame : int, optional
Number of complete output samples per frame (see Notes). Default: 1.
frequency : `~astropy.units.Quantity`, optional
Frequencies for each channel in ``ih`` (channelized frequencies will
be calculated). Default: taken from ``ih`` (if available).
sideband : array, optional
Whether frequencies in ``ih`` are upper (+1) or lower (-1) sideband.
Default: taken from ``ih`` (if available).
See Also
--------
PolyphaseFilterBankSamples : filter in the time domain (usually slower).
InversePolyphaseFilterBank : undo the effect of a polyphase filter.
baseband_tasks.fourier.fft_maker : to select the FFT package used.
"""
def __init__(self, ih, response, samples_per_frame=None,
frequency=None, sideband=None):
super().__init__(ih, response=response,
samples_per_frame=samples_per_frame,
frequency=frequency, sideband=sideband)
self._ppf_fft = self._FFT(shape=self._reshape, dtype=self.ih.dtype)
self._ppf_ifft = self._ppf_fft.inverse()
@lazyproperty
def _ft_response_conj(self):
long_response = np.zeros(self._reshape[:2], self.ih.dtype)
long_response[:self._response.shape[0]] = self._response
long_response.shape = (long_response.shape
+ (1,) * len(self.ih.sample_shape))
fft = self._FFT(shape=long_response.shape, dtype=self.ih.dtype)
return fft(long_response).conj()
[docs] def ppf(self, data):
"""Apply the PolyPhase Filter, in the frequency domain."""
data = data.reshape(self._reshape)
ft = self._ppf_fft(data)
ft *= self._ft_response_conj
result = self._ppf_ifft(ft)
# Remove padding, which has turned around.
result = result[:result.shape[0]+1-self._response.shape[0]]
# Reshape as timestream (channelizer will immediately undo this...).
return result.reshape((-1,) + result.shape[2:])
[docs]class InversePolyphaseFilterBank(PaddedTaskBase):
"""Dechannelize a stream produced by a polyphase filter bank.
A polyphase filterbank attempts to make channel responses squarer, by
convolving an input timestream with sinc-like function before doing a
Fourier transform. This class attempts to deconvolve the signal, given the
polyphase filter response. Like in most convolutions, a polyphase filter
generally destroys some information, especially for frequencies near the
edges of the channels. To optimize the process, Wiener deconvolution is
used.
The signal-to-noise ratio required for Wiener deconvolution is a
combination of the response-dependent quality with which any signal can be
recovered and the quality with which the signal was sampled. For CHIME,
where channels are digitized with 4 bits, simulations show that if 3
levels were covering the standard deviation of the input signal, ``sn=10``
works fairly well. For GUPPI, which has 8 bit digitization but a response
that strongly suppresses band edges, ``sn=30`` seems a good number.
The deconvolution necessarily works poorly near edges in time, so should
be done in overlapping blocks. Required padding is set with ``pad_start``
and ``pad_end`` (which are in addition to the minimum padding required by
the response). Adequate padding depend on response; from simulations, for
CHIME a padding of 32 on both sides seems to suffice, while for GUPPI 128
is needed.
Parameters
----------
ih : task or `baseband` stream reader
Input data stream, with time as the first axis, and Fourier channel
as the second.
response : `~numpy.ndarray`
Polyphase filter. The first dimension is taken to be the
number of taps, and the second the number of channels.
sn : float
Effective signal-to-noise ratio; see note above.
pad_start, pad_end : int, optional
Extra samples at the start and end to pad the frame being deconvolved;
see note above. Default: 128.
samples_per_frame : int, optional
Number of complete output samples per frame. Default: inferred from
padding such that a minimum efficiency of 75% is reached.
frequency : `~astropy.units.Quantity`, optional
Frequencies for each output channel. Default: inferred from ``ih``
(if available).
sideband : array, optional
Whether frequencies are upper (+1) or lower (-1) sideband.
Default: taken from ``ih`` (if available).
See Also
--------
PolyphaseFilterBank : apply polyphase filter.
baseband_tasks.fourier.fft_maker : to select the FFT package used.
"""
def __init__(self, ih, response, sn, pad_start=128, pad_end=128,
samples_per_frame=None, frequency=None, sideband=None,
dtype=None):
n_tap, n = response.shape
self.dechannelized = Dechannelize(
ih, n=n, samples_per_frame=None, frequency=frequency,
sideband=sideband, dtype=dtype)
self._FFT = self.dechannelized._FFT
pad_minimum = (n_tap - 1) * n
assert pad_minimum % 2 == 0
pad_start = pad_start * n + pad_minimum // 2
pad_end = pad_end * n + pad_minimum // 2
super().__init__(self.dechannelized,
pad_start=pad_start, pad_end=pad_end,
samples_per_frame=samples_per_frame)
self._response = response
self._reshape = ((self._ih_samples_per_frame // n, n)
+ self.ih.sample_shape)
self._ppf_fft = self._FFT(
shape=self._reshape, dtype=self.dtype)
self._ppf_ifft = self._ppf_fft.inverse()
self._inv_sn2 = 1. / (sn * sn)
@lazyproperty
def _ft_inverse_response(self):
"""Wiener deconvolution filter based on the PFB response."""
long_response = np.zeros(self._reshape[:2], self.dtype)
long_response[:self._response.shape[0]] = self._response
long_response.shape = (long_response.shape
+ (1,) * len(self.ih.sample_shape))
fft = self._FFT(shape=long_response.shape, dtype=self.ih.dtype)
ft_response = fft(long_response).conj()
inverse = (ft_response.conj()
/ (ft_response.real ** 2 + ft_response.imag ** 2
+ self._inv_sn2)) * (1 + self._inv_sn2)
return inverse
[docs] def task(self, data):
"""Apply the inverse polyphase filter to a frame.
Padding is removed from the result.
"""
data = data.reshape(self._reshape)
ft = self._ppf_fft(data)
ft *= self._ft_inverse_response
result = self._ppf_ifft(ft)
# Reshape as timestream
result = result.reshape((-1,) + result.shape[2:])
# Remove padding.
return result[self._pad_start:result.shape[0]-self._pad_end]