Source code for baseband_tasks.phases.phase

# -*- coding: utf-8 -*-
# Licensed under the GPLv3 - see LICENSE
"""Provide a Phase class with integer and fractional part."""

import numpy as np
from astropy import units as u
from astropy.coordinates import Angle, Longitude
from astropy.time.utils import day_frac


__all__ = ['Phase', 'FractionalPhase']


FRACTION_UFUNCS = {np.cos, np.sin, np.tan, np.spacing}

COMPARISON_UFUNCS = {
    np.equal, np.not_equal,
    np.less, np.less_equal, np.greater, np.greater_equal}


def _parse_string(s):
    """Parse a string into two doubles.

    Generally, first part before the decimal dot, second after,
    but takes account of possible exponents.
    """
    s = s.strip().lower().translate(s.maketrans('d', 'e'))
    if s[-1] == 'j':
        s = s[:-1]
        factor = 1j
    else:
        factor = 1
    if s[0] == '+':
        s = s[1:]
    elif s[0] == '-':
        s = s[1:]
        factor *= -1

    # Just test string is basically OK (and reference value below).
    test = float(s) * factor

    s_float, exp, s_exp = s.partition('e')
    s_count, sep, s_frac = s_float.rpartition('.')
    if exp:
        exponent = int(s_exp)
        if exponent < 0:
            n = min(len(s_count), -exponent)
            s_frac = s_count[-n:] + s_frac
            s_count = s_count[:-n]
            exponent += n
        elif exponent > 0:
            n = min(len(s_frac), exponent)
            s_count = s_count + s_frac[:n]
            s_frac = s_frac[n:]
            exponent -= n
        factor *= 10**exponent

    frac = float('0.' + s_frac) * factor
    count = float('0' + s_count) * factor

    assert count + frac == test

    return count, frac


_parse_strings = np.vectorize(_parse_string, otypes=[complex, complex])


[docs]class FractionalPhase(Longitude): """Phase without the cycle count, i.e., with a range of 1 cycle. This subclass of `~astropy.coordinates.Longitude` differs from it mostly in being able to take the fractional part of any `~baseband_tasks.phases.Phase` input. Parameters ---------- angle : array, list, scalar, `~astropy.units.Quantity`, :class:`~astropy.coordinates.Angle` The angle value(s). If a tuple, will be interpreted as ``(h, m s)`` or ``(d, m, s)`` depending on ``unit``. If a string, it will be interpreted following the rules described for :class:`~astropy.coordinates.Angle`. unit : :class:`~astropy.units.UnitBase`, str, optional The unit of the value specified for the angle. This may be any string that `~astropy.units.Unit` understands. Must be an angular unit. Default is 'cycle'. wrap_angle : :class:`~astropy.coordinates.Angle` or equivalent, optional Angle at which to wrap back to ``wrap_angle - 1 cycle``. If ``None`` (default), it will be taken to be 0.5 cycle unless ``angle`` has a ``wrap_angle`` attribute. Raises ------ `~astropy.units.UnitsError` If a unit is not provided or it is not an angular unit. `TypeError` If the angle parameter is a :class:`~astropy.coordinates.Latitude`. """ _default_wrap_angle = Angle(0.5, u.cycle) _equivalent_unit = _default_unit = u.cycle def __new__(cls, angle, unit=None, wrap_angle=None, **kwargs): # TODO: ideally, the Longitude/Angle/Quantity initializer by # default tries to convert to float also for structured arrays, # maybe via astype. if isinstance(angle, Phase): angle = angle['frac'] return super().__new__(cls, angle, unit=unit, wrap_angle=wrap_angle, **kwargs)
def check_imaginary(a): """Check whether a value is purely imaginary or purely real. Parameters ---------- a : array_like or `~numpy.ndarray` subclass Returns ------- real, imaginary : array of float, bool A float array, either just the input array if not complex or the real or imaginary part if complex, with the bool indicating whether it is the imaginary part. Raises ------ ValueError If the input is complex and is not purely real or purely imaginary. """ if np.iscomplexobj(a): if np.all(a.real == 0): return a.imag, True elif np.all(a.imag == 0): return a.real, False else: raise ValueError("cannot have mixed real/imaginary Phase") else: return a, False
[docs]class Phase(Angle): """Represent two-part phase. With one part the integer cycle count and the other the fractional phase. The class is a high-precision version of `~astropy.coordinates.Angle` that aims to behave just like it, but use its precision in operations where possible -- decaying to a `~astropy.units.Quantity` otherwise. For instance, addition and subtraction preserve precision, as do multiplication and division with dimensionless quantities. Similarly, trigonometric functions use just the fractional phase. The phase can either be purely real or purely imaginary, not mixed. If imaginary, using it in `~numpy.exp` will again preserve precision. Parameters ---------- phase1, phase2 : array or `~astropy.units.Quantity` Two-part phase. If not quantities, the assumed units are cycles. copy : bool, optional Ensure a copy is made. Only relevant if ``phase1`` is a `Phase` and ``phase2`` is not given. subok : bool, optional If `False` (default), the returned array will be forced to be a `Phase`. Otherwise, `Phase` subclasses will be passed through. Only relevant if ``phase1`` or ``phase2`` is a `Phase` subclass. Notes ----- The machinery to keep precision is not complete; in particular, reductions such as summing along an axis will currently lose precision. Strings passed in to ``phase1`` or ``phase2`` are first converted to standard doubles, which may lead to loss of precision. For long strings, use the `~baseband_tasks.phases.Phase.from_string` class method instead. """ _equivalent_unit = _unit = _default_unit = u.cycle _phase_dtype = np.dtype({'names': ['int', 'frac'], 'formats': ['f8']*2}) def __new__(cls, phase1, phase2=None, copy=True, subok=False): if isinstance(phase1, Phase): if phase2 is not None: phase1 = phase1 + phase2 copy = False if not subok and type(phase1) is not cls: phase1 = phase1.view(cls) return phase1.copy() if copy else phase1 phase1 = Angle(phase1, cls._unit, copy=False) if phase2 is not None: if isinstance(phase2, Phase): phase2 = phase2 + phase1 if not subok and type(phase2) is not cls: phase2 = phase2.view(cls) return phase2 phase2 = Angle(phase2, cls._unit, copy=False) return cls.from_angles(phase1, phase2)
[docs] @classmethod def from_angles(cls, phase1, phase2=None, factor=None, divisor=None, out=None): """Create a Phase instance from two angles. The two angles will be added, and possibly multiplied by a factor or divided by a divisor, preserving precision using two doubles, one for the integer part and one for the remainder. Note that this class method is mostly meant for internal use. Parameters ---------- phase1 : `~astropy.units.Quantity` With angular units. phase2 : `~astropy.units.Quantity` or `None` With angular units. factor : float or complex Posisble factor to multiply the angles with. divisor : float or complex Posisble divisor to divide the angles by. Raises ------ ValueError If the result is not purely real or purely imaginary """ phase1, imaginary = check_imaginary(phase1) if phase2 is not None: phase2, im2 = check_imaginary(phase2) if im2 is not imaginary: raise ValueError("phase1 and phase2 must either be both " "real or both imaginary.") if factor is not None: factor, imf = check_imaginary(factor) imaginary ^= imf if divisor is not None: divisor, imd = check_imaginary(divisor) if imd and not imaginary: divisor = -divisor imaginary ^= imd # TODO: would be nice if day_frac had an out parameter. phase1_value = phase1.to_value(cls._unit) if phase2 is None: phase2_value = 0. else: phase2_value = phase2.to_value(cls._unit) count, fraction = day_frac(phase1_value, phase2_value, factor=factor, divisor=divisor) if out is None: value = np.empty(count.shape, cls._phase_dtype) out = value.view(cls) else: value = out.view(np.ndarray) value['int'] = count value['frac'] = fraction out.imaginary = imaginary return out
def __array_finalize__(self, obj): super().__array_finalize__(obj) self.imaginary = getattr(obj, 'imaginary', False) def __getitem__(self, item): result = super().__getitem__(item) if result.dtype is self.dtype: return result if item == 'frac': result = result.view(Angle if self.imaginary else FractionalPhase) else: assert item == 'int' result = result.view(Angle) if self.imaginary: result = result * 1j return result def __iter__(self): if self.isscalar: raise TypeError( "'{cls}' object with a scalar value is not iterable" .format(cls=self.__class__.__name__)) # Override Quantity.__iter__ since that iterates over self.value. def phase_iter(): for i in range(len(self)): yield self[i] return phase_iter() def _set_unit(self, unit): if unit is None or unit != self._unit: raise u.UnitTypeError( "{0} instances require units of '{1}'" .format(type(self).__name__, self._unit) + (", but no unit was given." if unit is None else ", so cannot set it to '{0}'.".format(unit))) super()._set_unit(unit) def __repr__(self): v = self.view(np.ndarray) return "{0}({1}{3} cycle, {2}{3} cycle)".format( self.__class__.__name__, v['int'], v['frac'], " * 1j" if self.imaginary else '') def __str__(self): # Override Angle, since one cannot override the formatter for # structured dtype in array2string. def formatter(x): # [...] and view as self.__class__ are needed for scalar case. return x[...].view(self.dtype, self.__class__).to_string() return np.array2string(self.view(f"V{self.dtype.itemsize}"), formatter={'all': formatter}) + self._unitstr def __format__(self, format_spec): """Format a phase, special-casing the float format. For the 'f' format, precision is kept, and the unit is suppressed. For everything else, the Quantity formatter is used. """ if format_spec.endswith('f'): # Check that formatting works at all... test = format(self.value, format_spec) pre, dot, post = test.partition('.') if post: precise = self.to_string(precision=len(post)) pre, _, post = precise.partition('.') # Just to ensure no bad rounding happened pre = format(float(pre), format_spec).partition('.')[0] return pre + dot + post return self.cycle.__format__(format_spec)
[docs] def to_string(self, unit=None, decimal=True, sep='fromunit', precision=None, alwayssign=False, pad=False, fields=3, format=None): """ A string representation of the Phase. By default, uses a decimal representation that is guaranteed to preserve precision to within 1e-16 cycles. Otherwise, uses `astropy.coordinates.Angle.to_string`. """ if not decimal or (unit is not None and unit != u.cycle): return self.cycle.to_string( unit=unit, decimal=decimal, sep=sep, precision=precision, alwayssign=alwayssign, pad=pad, fields=fields, format=format) if precision is None: func = str else: func = ("{0:1." + str(precision) + "f}").format def do_format(count, frac): neg = (count + frac) < 0 if neg: count = -count frac = -frac sign = '-' elif alwayssign: sign = '+' else: sign = '' if frac < 0: frac += 1 count -= 1 if frac < 0.25: # Ensure that we do not get 1e-16, etc., yet can use numpy's # guarantee that the right number of digits is shown. frac_str = func(frac+0.25) f24 = int(frac_str[2:4]) if func is str and (len(frac_str) == 3 or (len(frac_str) == 4 and frac_str[3] == '5')): if len(frac_str) == 3: f24 = '{:02d}'.format(f24 * 10 - 25) else: f24 = '{:1d}'.format((f24 - 5) // 10 - 2) else: f24 = '{:02d}'.format(f24 - 25) frac_str = frac_str[:2] + f24 + frac_str[4:] else: frac_str = func(frac) if frac_str[0] == '1': count += 1 s = sign + str(int(count)) + frac_str[1:] if self.imaginary: s += 'j' if format == 'latex': s = '$' + s + '$' return s format_ufunc = np.vectorize(do_format, otypes=['U']) if self.imaginary: count, frac = self['int'].value.imag, self['frac'].value.imag else: count, frac = self['int'].value, self['frac'].value result = format_ufunc(count, frac) if result.ndim == 0: result = result[()] return result
[docs] @classmethod def from_string(cls, string): """Create Phase instance from a long string. The string has to be a standard decimal string, i.e., no attempt is made to parse an angle. """ string = np.asanyarray(string) if string.dtype.kind not in 'SU': raise ValueError('require string input.') count, frac = _parse_strings(string) return cls(count, frac)
@property def int(self): """Rounded cycle count.""" return self['int'] @property def frac(self): """Fractional phase, between -0.5 and 0.5 cycles.""" return self['frac'] @property def cycle(self): """Full cycle, including phase, as a regular Angle. The result will use a standard double, and thus likely loose precision. """ return self['int'] + self['frac']
[docs] def to_value(self, unit=None, equivalencies=[]): """The numerical value, possibly in a different unit. The result will use a standard double, and thus likely loose precision. """ return self.cycle.to_value(unit, equivalencies)
value = property(to_value, doc="""The numerical value. The result will use a standard double, and thus likely loose precision. See also -------- to_value : Get the numerical value in a given unit. """)
[docs] def to(self, unit, equivalencies=[]): """The phase in a different unit. For any unit except "cycle", this will likely loose precision as an `~astropy.coordinates.Angle` or `~astropy.units.Quantity` is returned. """ if unit == u.cycle: return self.copy() else: return self.cycle.to(unit, equivalencies=equivalencies)
def _take_along_axis(self, indices, axis=None, keepdims=False): # More or less a straight copy from Time. if axis is None: return self[np.unravel_index(indices, self.shape)] if indices.ndim == self.ndim - 1: indices = np.expand_dims(indices, axis) result = np.take_along_axis(self, indices, axis) return result if keepdims else result.squeeze(axis)
[docs] def argmin(self, axis=None, out=None): """Return indices of the minimum values along the given axis.""" approx = np.min(self.cycle, axis, keepdims=True) dt = (self['int'] - approx) + self['frac'] return dt.argmin(axis, out)
[docs] def argmax(self, axis=None, out=None): """Return indices of the maximum values along the given axis.""" approx = np.max(self.cycle, axis, keepdims=True) dt = (self['int'] - approx) + self['frac'] return dt.argmax(axis, out)
[docs] def argsort(self, axis=-1): """Returns the indices that would sort the phase array.""" phase_approx = self.cycle phase_remainder = (self - phase_approx).cycle if axis is None: return np.lexsort((phase_remainder.ravel(), phase_approx.ravel())) else: return np.lexsort(keys=(phase_remainder, phase_approx), axis=axis)
# Below are basically straight copies from Time
[docs] def min(self, axis=None, out=None, keepdims=False): """Minimum along a given axis. This is similar to :meth:`~numpy.ndarray.min`, but adapted to ensure that the full precision is used. """ if out is not None: raise ValueError("An `out` argument is not yet supported.") return self._take_along_axis(self.argmin(axis), axis, keepdims)
[docs] def max(self, axis=None, out=None, keepdims=False): """Maximum along a given axis. This is similar to :meth:`~numpy.ndarray.max`, but adapted to ensure that the full precision is used. """ if out is not None: raise ValueError("An `out` argument is not yet supported.") return self._take_along_axis(self.argmax(axis), axis, keepdims)
[docs] def ptp(self, axis=None, out=None, keepdims=False): """Peak to peak (maximum - minimum) along a given axis. This is similar to :meth:`~numpy.ndarray.ptp`, but adapted to ensure that the full precision is used. """ if out is not None: raise ValueError("An `out` argument is not yet supported.") return (self.max(axis, keepdims=keepdims) - self.min(axis, keepdims=keepdims))
[docs] def sort(self, axis=-1): """Return a copy sorted along the specified axis. This is similar to :meth:`~numpy.ndarray.sort`, but internally uses indexing with :func:`~numpy.lexsort` to ensure that the full precision given by the two doubles is kept. Parameters ---------- axis : int or None Axis to be sorted. If ``None``, the flattened array is sorted. By default, sort over the last axis. """ return self._take_along_axis(self.argsort(axis), axis, keepdims=True)
# Quantity lets ndarray.__eq__, __ne__ deal with structured arrays like us. # Override this so we can deal with it in __array_ufunc__. def __eq__(self, other): try: return np.equal(self, other) except u.UnitsError: return False except Exception: return NotImplemented def __ne__(self, other): try: return np.not_equal(self, other) except u.UnitsError: return True except Exception: return NotImplemented def __array_ufunc__(self, function, method, *inputs, **kwargs): """Wrap numpy ufuncs, taking care of units. Parameters ---------- function : callable ufunc to wrap. method : str Ufunc method: ``__call__``, ``at``, ``reduce``, etc. inputs : tuple Input arrays. kwargs : keyword arguments As passed on, with ``out`` containing possible quantity output. Returns ------- result : array_like Results of the ufunc, with the unit set properly, `~baseband_tasks.phases.Phase` if possible (i.e., with units of cycles), otherwise `~astropyy.units.Quantity` or `~numpy.ndarray` as appropriate. """ # Do *not* use inputs.index(self) since that will use __eq__ for i_self, input_ in enumerate(inputs): if input_ is self: break else: i_self += 1 # Some bools for use in the if-statements below. # TODO: support reductions on add, minimum, maximum; others? basic = method == '__call__' and i_self < function.nin basic_real = basic and not self.imaginary basic_phase_out = basic out = kwargs.get('out', None) if out is not None: if len(out) == 1 and isinstance(out[0], Phase): phase_out = out[0] out = None else: phase_out = None basic_phase_out = False else: phase_out = None if function in {np.add, np.subtract} and basic and out is None: try: phases = [Phase(input_, copy=False, subok=True) for input_ in inputs] except Exception: return NotImplemented if phases[0].imaginary == phases[1].imaginary: return self.from_angles( function(phases[0]['int'], phases[1]['int']), function(phases[0]['frac'], phases[1]['frac']), out=phase_out) elif function in COMPARISON_UFUNCS and basic: phases = list(inputs) try: phases[1-i_self] = Phase(inputs[1-i_self], copy=False, subok=True) except Exception: return NotImplemented if phases[0].imaginary == phases[1].imaginary: diff = ((phases[0]['int'] - phases[1]['int']) + (phases[0]['frac'] - phases[1]['frac'])) return getattr(function, method)(diff, 0, **kwargs) elif ((function is np.multiply or function is np.divide and i_self == 0) and basic_phase_out): try: other = u.Quantity(inputs[1-i_self], u.dimensionless_unscaled, copy=False).value if function is np.multiply: return self.from_angles(self['int'], self['frac'], factor=other, out=phase_out) else: return self.from_angles(self['int'], self['frac'], divisor=other, out=phase_out) except Exception: # If not consistent with a dimensionless quantity, or mixed # real and complex, we follow the standard route of # downgrading ourself to a quantity and see if things work. pass elif (function in {np.floor_divide, np.remainder, np.divmod} and basic_real): fd_out = None if out is not None: if function is np.divmod: fd_out = out[0] phase_out = out[1] elif function is np.floor_divide: fd_out = out[0] elif phase_out is not None and function is np.floor_divide: return NotImplemented fd = np.floor_divide(self.cycle, inputs[1], out=fd_out) corr = Phase.from_angles(inputs[1], factor=fd, out=phase_out) remainder = np.subtract(self, corr, out=corr) fdx = np.floor_divide(remainder.cycle, inputs[1]) # This can likely be optimized... # Note: one cannot just loop, because rounding of exact 0.5. # TODO: check this method is really correct. if np.count_nonzero(fdx): fd += fdx corr = Phase.from_angles(inputs[1], factor=fd, out=corr) remainder = np.subtract(self, corr, out=corr) if function is np.floor_divide: return fd elif function is np.remainder: return remainder else: return fd, remainder elif function is np.positive and basic_phase_out: return self.from_angles(self['int'], self['frac'], out=phase_out) elif function is np.negative and basic_phase_out: return self.from_angles(-self['int'], -self['frac'], out=phase_out) elif function in {np.absolute, np.fabs} and basic_phase_out: # Go via view to avoid having to deal with imaginary. v = self.view(np.ndarray) return self.from_angles(u.Quantity(v['int'], u.cycle, copy=False), u.Quantity(v['frac'], u.cycle, copy=False), factor=np.sign(v['int'] + v['frac']), out=phase_out) elif function is np.rint and basic: return np.positive(self['int'], **kwargs) elif function in FRACTION_UFUNCS and basic_real: frac = self.frac.view(Angle) return function(frac, **kwargs) elif function is np.exp and basic and self.imaginary: # Avoid dimensionless_angles, but still get Quantity out. exponent = u.Quantity(self.frac.to_value(u.radian), copy=False) return function(exponent, **kwargs) # Fall-back: treat Phase as a simple Quantity. if basic: inputs = tuple((input_.cycle if isinstance(input_, Phase) else input_) for input_ in inputs) quantity = inputs[i_self] else: quantity = self.cycle if phase_out is None: return quantity.__array_ufunc__(function, method, *inputs, **kwargs) else: # We won't be able to store in a phase directly, but might # as well use one of its elements to store the angle. kwargs['out'] = (phase_out['int'],) result = quantity.__array_ufunc__(function, method, *inputs, **kwargs) return phase_out.from_angles(result, out=phase_out) def _new_view(self, obj=None, unit=None): # If the unit is not right, we should ensure we change our two-float # dtype to a single float. if unit is not None and unit != self._unit: if obj is None: obj = self.cycle elif isinstance(obj, Phase): obj = obj.cycle return super()._new_view(obj, unit)
[docs] def astype(self, dtype, order='K', casting='unsafe', subok=True, copy=True): """Copy of the array, cast to a specified type. As `numpy.ndarray.astype`, but using knowledge of format to cast to floats. """ dtype = np.dtype(dtype) if not dtype.fields and casting in {'same_kind', 'unsafe'}: parts = [self[part].astype(dtype, order=order, casting=casting, subok=subok, copy=copy) for part, copy in (('int', True), ('frac', False))] parts[0] += parts[1] return parts[0] else: return super().astype(dtype, order=order, casting=casting, subok=subok, copy=copy)