Source code for baseband_tasks.generators

# Licensed under the GPLv3 - see LICENSE
"""Collection of source generator classes.

All these look like stream readers and thus are useful to test pipelines
with artificial data.
"""
import numpy as np

from .base import Base


__all__ = ['StreamGenerator', 'EmptyStreamGenerator',
           'Noise', 'NoiseGenerator']


class StreamGenerator(Base):
    """Stream-reader look-alike whose frames come from a user function.

    The supplied function has to understand the structure of the stream it
    is filling.  If that is inconvenient, create an empty stream with
    `~baseband_tasks.generators.EmptyStreamGenerator` instead and attach a
    `~baseband_tasks.base.Task` that fills the data arrays.

    Parameters
    ----------
    function : callable
        Called with a single argument, the stream instance itself, and
        expected to return one frame of data, i.e., ``samples_per_frame``
        samples, each of sample shape ``shape[1:]``.  When it is called,
        the instance is positioned at the start of the frame, so
        ``instance.tell()`` can be relied upon.
    shape : tuple
        Total number of samples in the fake file, followed by the sample
        shape.
    start_time : `~astropy.time.Time`
        Start time of the fake file.
    sample_rate : `~astropy.units.Quantity`
        Sample rate, in units of frequency.
    samples_per_frame : int
        Blocking factor.  A larger value reduces the overhead of calling
        ``function``.  Default: 1.
    dtype : `~numpy.dtype` or anything that initializes one, optional
        Type of data produced.  Default: ``complex64``.
    **kwargs
        Meta data for the stream, passed on to the base class.  These
        usually include:

        frequency : `~astropy.units.Quantity`, optional
            Frequencies for each channel.  Should be broadcastable to the
            sample shape.  Default: unknown.
        sideband : array, optional
            Whether frequencies are upper (+1) or lower (-1) sideband.
            Should be broadcastable to the sample shape.  Default: unknown.
        polarization : array or (nested) list of char, optional
            Polarization labels.  Should broadcast to the sample shape,
            i.e., the labels are in the correct axis.  For instance,
            ``['X', 'Y']``, or ``[['L'], ['R']]``.  Default: unknown.

    Examples
    --------
    Produce alternating ones and zeros.

    >>> from baseband_tasks.generators import StreamGenerator
    >>> import numpy as np
    >>> from astropy.time import Time
    >>> from astropy import units as u
    >>> def alternate(sh):
    ...     return np.full((1,) + sh.shape[1:], sh.tell() % 2 == 1, sh.dtype)
    ...
    >>> sh = StreamGenerator(alternate, (10, 6), Time('2010-11-12'), 10.*u.Hz)
    >>> sh.seek(5)
    5
    >>> sh.read()  # doctest: +FLOAT_CMP
    array([[1.+0.j, 1.+0.j, 1.+0.j, 1.+0.j, 1.+0.j, 1.+0.j],
           [0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j],
           [1.+0.j, 1.+0.j, 1.+0.j, 1.+0.j, 1.+0.j, 1.+0.j],
           [0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j],
           [1.+0.j, 1.+0.j, 1.+0.j, 1.+0.j, 1.+0.j, 1.+0.j]], dtype=complex64)
    """

    def __init__(self, function, shape, start_time, sample_rate,
                 samples_per_frame=1, dtype=np.complex64, **kwargs):
        # The base class handles all stream bookkeeping; only the frame
        # producer is specific to this class.
        super().__init__(
            shape=shape,
            start_time=start_time,
            sample_rate=sample_rate,
            samples_per_frame=samples_per_frame,
            dtype=dtype,
            **kwargs,
        )
        self._function = function

    def _read_frame(self, frame_index):
        # ``frame_index`` is not needed here: read() in the base class
        # makes sure the offset pointer is at the start of the requested
        # frame before calling us, so the user function can use tell().
        produce_frame = self._function
        return produce_frame(self)


class EmptyStreamGenerator(Base):
    """Stream-reader look-alike that yields uninitialized data frames.

    Intended as the underlying stream for a `~baseband_tasks.base.Task`
    that fills in the actual data.

    Parameters
    ----------
    shape : tuple
        Total number of samples in the fake file, followed by the sample
        shape.
    start_time : `~astropy.time.Time`
        Start time of the fake file.
    sample_rate : `~astropy.units.Quantity`
        Sample rate, in units of frequency.
    samples_per_frame : int
        Blocking factor.  Mostly useful to make the function task that
        uses the stream more efficient.
    dtype : `~numpy.dtype` or anything that initializes one, optional
        Type of data produced.  Default: ``complex64``.
    **kwargs
        Meta data for the stream, which usually include:

        frequency : `~astropy.units.Quantity`, optional
            Frequencies for each channel.  Should be broadcastable to the
            sample shape.  Default: unknown.
        sideband : array, optional
            Whether frequencies are upper (+1) or lower (-1) sideband.
            Should be broadcastable to the sample shape.  Default: unknown.
        polarization : array or (nested) list of char, optional
            Polarization labels.  Should broadcast to the sample shape,
            i.e., the labels are in the correct axis.  For instance,
            ``['X', 'Y']``, or ``[['L'], ['R']]``.  Default: unknown.

    Examples
    --------
    Produce alternating +/-1 in single-channel data with decent-sized blocks.

    >>> from baseband_tasks.generators import EmptyStreamGenerator
    >>> from baseband_tasks.base import Task
    >>> import numpy as np
    >>> from astropy import time as t, units as u
    >>> def alternate(data):
    ...     value = 2 * (np.arange(data.shape[0]) % 2) - 1
    ...     data[...] = value
    ...     return data
    ...
    >>> eh = EmptyStreamGenerator((1000,), t.Time('2010-11-12'),
    ...                           1.*u.kHz, samples_per_frame=100,
    ...                           dtype='f4')
    >>> sh = Task(eh, alternate)
    >>> sh.seek(995)
    995
    >>> sh.read()  # doctest: +FLOAT_CMP
    array([ 1., -1.,  1., -1.,  1.], dtype=float32)
    """

    def _read_frame(self, frame_index):
        # Every frame has the same shape, so ``frame_index`` is unused.
        # The array contents are deliberately left uninitialized; whatever
        # consumes the stream is expected to overwrite them.
        frame_shape = (self.samples_per_frame, *self.shape[1:])
        return np.empty(frame_shape, dtype=self.dtype)


class Noise:
    """Callable producing reproducible normally distributed frame data.

    Meant to be used as the frame function of
    `~baseband_tasks.generators.NoiseGenerator`.  Each call returns one
    frame worth of normally distributed data.  The
    `~numpy.random.Philox` bit generator is reset for every call, with
    the frame's sample offset placed in its counter, so that reading the
    same frame again yields the same random data.

    Parameters
    ----------
    seed : int
        Initial seed for `~numpy.random.Philox`.

    Notes
    -----
    Data is identical between invocations only if seeded identically.
    """

    def __init__(self, seed=None):
        self.seed = seed
        philox = np.random.Philox(seed)
        self.rng = np.random.Generator(philox)
        # Keep a pristine copy of the state (no buffered values, counter
        # untouched); it serves as the template from which the generator
        # is quickly re-initialized for any given frame.
        self.bg_state = philox.state

    def __call__(self, sh):
        """Return one frame of noise for stream ``sh`` at its current offset.

        Parameters
        ----------
        sh : stream reader
            Must provide ``tell()``, ``samples_per_frame``,
            ``sample_shape``, ``complex_data`` and ``dtype``.  It is
            expected to be positioned at the start of a frame.

        Returns
        -------
        data : `~numpy.ndarray`
            Array of shape ``(samples_per_frame,) + sample_shape`` and
            dtype ``sh.dtype``.
        """
        # Put the sample offset in the second uint64 of the counter, so
        # every frame gets its own independent yet reproducible stream.
        template = self.bg_state
        template['state']['counter'][1] = sh.tell()
        self.rng.bit_generator.state = template

        frame_shape = (sh.samples_per_frame,) + sh.sample_shape
        if not sh.complex_data:
            values = self.rng.normal(size=frame_shape)
        else:
            # Draw twice as many reals along the last axis and reinterpret
            # consecutive pairs as (real, imaginary) parts.
            doubled = frame_shape[:-1] + (2 * frame_shape[-1],)
            values = self.rng.normal(size=doubled).view(np.complex128)

        return values.astype(sh.dtype, copy=False)


class NoiseGenerator(StreamGenerator):
    """Generator of a stream of normally distributed noise.

    To mimic proper streams, data is guaranteed to be identical if read
    multiple times from a given instance.  Frames are produced by a
    `~baseband_tasks.generators.Noise` instance, which resets its random
    number generator for every "data frame" using the frame's sample
    offset.  Given this per-frame work, it is important to choose
    ``samples_per_frame`` wisely, such that frame sizes are at least of
    order millions of samples.

    Parameters
    ----------
    shape : tuple
        Total number of samples in the fake file, followed by the sample
        shape.
    start_time : `~astropy.time.Time`
        Start time of the fake file.
    sample_rate : `~astropy.units.Quantity`
        Sample rate, in units of frequency.
    samples_per_frame : int
        Blocking factor, setting the size of the fake data frames.
        No default, since it should typically be large (see above).
    dtype : `~numpy.dtype` or anything that initializes one, optional
        Type of data produced.  Default: ``complex64``.
    seed : int, optional
        Possible seed to initialize the random number generator.
    **kwargs
        Meta data for the stream, which usually include:

        frequency : `~astropy.units.Quantity`, optional
            Frequencies for each channel.  Should be broadcastable to the
            sample shape.  Default: unknown.
        sideband : array, optional
            Whether frequencies are upper (+1) or lower (-1) sideband.
            Should be broadcastable to the sample shape.  Default: unknown.
        polarization : array or (nested) list of char, optional
            Polarization labels.  Should broadcast to the sample shape,
            i.e., the labels are in the correct axis.  For instance,
            ``['X', 'Y']``, or ``[['L'], ['R']]``.  Default: unknown.

    Notes
    -----
    Between instances, data is identical only if seeded identically *and*
    read with the same number of samples per frame.  Since
    `~baseband_tasks.generators.Noise` re-initializes the generator from
    the frame offset on every call, the order in which frames are read
    does not influence the data.
    """

    def __init__(self, shape, start_time, sample_rate, samples_per_frame,
                 dtype=np.complex64, seed=None, **kwargs):
        # The seeded Noise callable acts as the frame-producing function.
        super().__init__(
            Noise(seed),
            shape,
            start_time,
            sample_rate,
            samples_per_frame=samples_per_frame,
            dtype=dtype,
            **kwargs,
        )