Source code for stixcore.time.datetime

"""
Array like time objects
"""

import logging
import operator

import numpy as np
from sunpy.time.timerange import TimeRange

import astropy.units as u
from astropy.time.core import Time
from astropy.utils import ShapedLikeNDArray
from astropy.utils.data_info import MixinInfo

from stixcore import get_logger
from stixcore.ephemeris.manager import Spice

__all__ = ["SCETBase", "SCETime", "SCETimeDelta", "SCETimeRange", "SEC_IN_DAY"]

# SOLO convention top bit is for time sync
MAX_COARSE = 2**32 - 1
MAX_FINE = 2**16 - 1
SEC_IN_DAY = 24 * 60 * 60


logger = get_logger(__name__)
logger.setLevel(logging.INFO)


class TimeInfo(MixinInfo):
    attr_names = MixinInfo.attr_names
    _supports_indexing = True
    _represent_as_dict_attrs = ("coarse", "fine")

    def get_sortable_arrays(self):
        pass

    #
    # @property
    # def unit(self):
    #     return None

    def new_like(self, cols, length, metadata_conflicts="warn", name=None):
        """
        Return a new instance of this class which is consistent with the
        input ``cols`` and has ``length`` rows.

        This is intended for creating an empty column object whose elements can
        be set in-place for table operations like join or vstack.

        Parameters
        ----------
        cols : list
            List of input columns
        length : int
            Length of the output column object
        metadata_conflicts : str ('warn'|'error'|'silent')
            How to handle metadata conflicts
        name : str
            Output column name

        Returns
        -------
        col : object
            New instance of this class consistent with ``cols``
        """
        # Get merged info attributes like shape, dtype, format, description, etc.
        attrs = self.merge_cols_attributes(cols, metadata_conflicts, name, ("meta", "description"))
        attrs.pop("dtype")  # Not relevant for Time

        # cols[0]
        # for col in cols[1:]:
        #     # This is the method used by __setitem__ to ensure that the right side
        #     # has a consistent location (and coerce data if necessary, but that does
        #     # not happen in this case since `col` is already a Time object).  If this
        #     # passes then any subsequent table operations via setitem will work.
        #     try:
        #         col0._make_value_equivalent(slice(None), col)
        #     except ValueError:
        #         raise ValueError('input columns have inconsistent locations')

        # Make a new Time object with the desired shape and attributes
        shape = (length,) + attrs.pop("shape")
        coarse = np.zeros(shape, dtype=np.int32)
        fine = np.zeros(shape, dtype=np.int32)
        out = self._parent_cls(coarse, fine)

        # Set remaining info attributes
        for attr, value in attrs.items():
            setattr(out.info, attr, value)

        return out


class TimeDetlaInfo(TimeInfo):
    pass
    # _represent_as_dict_attrs = ('seconds')
    # _represent_as_dict_primary_data = 'seconds'
    #
    # attrs_names = MixinInfo.attr_names | {'serialize_method'}
    #
    # def _represent_as_dict(self, attrs=None):
    #     out = super()._represent_as_dict()
    #
    #     col = self._parent
    #
    #     # If the serialize method for this context (e.g. 'fits' or 'ecsv') is
    #     # 'data_mask', that means to serialize using an explicit mask column.
    #     method = self.serialize_method[self._serialize_context]
    #
    # def __init__(self, bound=False):
    #     super().__init__(bound)
    #
    #     if bound:
    #         self.serialize_method = {'fits': 'seconds',
    #                                  None: 'seconds'}


[docs] class SCETBase(ShapedLikeNDArray): """Base time class from which SCETime and SCETimeDelta inherit.""" _astropy_column_attrs = None # Make sure reverse (radd, rsub, ...) magic methods are called over others __array_priority__ = 20000 def __new__(cls, *args, **kwargs): if len(args) == 1 and isinstance(args[0], cls): self = args[0].copy() else: self = super().__new__(cls) return self def __init__(self, coarse, fine): self.coarse = coarse self.fine = fine self.btime = (coarse.astype("int64") << 16) + fine @property def shape(self): return self.coarse.shape def _apply(self, method, *args, **kwargs): if callable(method): apply_method = lambda array: method(array, *args, **kwargs) # noqa: E731 else: if method == "replicate": apply_method = None else: apply_method = operator.methodcaller(method, *args, **kwargs) coarse, fine = self.coarse, self.fine if apply_method: coarse = apply_method(coarse) fine = apply_method(fine) out = self.__class__(coarse, fine) if "info" in self.__dict__: out.info = self.info return out
[docs] def get_scedays(self, *, timestamp=False): days, frac = np.divmod(self.coarse, SEC_IN_DAY) return days * SEC_IN_DAY if timestamp else days
[docs] def as_bintime(self): """ Return a uint64 representation of the SCET and time e.g. coarse << 16 + fine Returns ------- """ return self.btime
[docs] def as_float(self): """ Return a float representation of the SCET and time e.g. coarse+fine*(1/MAX_FINE) Returns ------- """ return (self.coarse + (self.fine / MAX_FINE)) * u.s
[docs] def min(self, axis=None, out=None, keepdims=False): """ Return the minimum of time or minimum time along an axis Parameters ---------- axis : out : keepdims : `boolean` Keep the dimension of the original array Returns ------- """ return self[self._advanced_index(self.argmin(axis), axis, keepdims)]
[docs] def max(self, axis=None, out=None, keepdims=False): """ Return the maximum time or maximum time along an axis Parameters ---------- axis : out : keepdims : `boolean` Keep the dimension of the original array Returns ------- """ return self[self._advanced_index(self.argmax(axis), axis, keepdims)]
[docs] def argmin(self, axis=None, out=None): """ Return indices of the minimum values along the given axis Parameters ---------- axis out Returns ------- """ return self.btime.argmin(axis, out)
[docs] def argmax(self, axis=None, out=None): """ Return indices of the maximum values along the given axis Parameters ---------- axis : out : Returns ------- """ return self.btime.argmax(axis, out)
def _advanced_index(self, indices, axis=None, keepdims=False): """Turn argmin, argmax output into an advanced index. Argmin, argmax output contains indices along a given axis in an array shaped like the other dimensions. To use this to get values at the correct location, a list is constructed in which the other axes are indexed sequentially. For ``keepdims`` is ``True``, the net result is the same as constructing an index grid with ``np.ogrid`` and then replacing the ``axis`` item with ``indices`` with its shaped expanded at ``axis``. For ``keepdims`` is ``False``, the result is the same but with the ``axis`` dimension removed from all list entries. For ``axis`` is ``None``, this calls :func:`~numpy.unravel_index`. Parameters ---------- indices : array Output of argmin or argmax. axis : int or None axis along which argmin or argmax was used. keepdims : bool Whether to construct indices that keep or remove the axis along which argmin or argmax was used. Default: ``False``. Returns ------- advanced_index : list of arrays Suitable for use as an advanced index. """ if axis is None: return np.unravel_index(indices, self.shape) ndim = self.ndim if axis < 0: axis = axis + ndim if keepdims and indices.ndim < self.ndim: indices = np.expand_dims(indices, axis) index = [ indices if i == axis else np.arange(s).reshape( (1,) * (i if keepdims or i < axis else i - 1) + (s,) + (1,) * (ndim - i - (1 if keepdims or i > axis else 2)) ) for i, s in enumerate(self.shape) ] return tuple(index) def __setitem__(self, item, value): self.coarse[item] = value.coarse self.fine[item] = value.fine self.btime[item] = (value.coarse.astype("uint64") << 16) + value.fine def __repr__(self): return f"{self.__class__.__name__}(coarse={self.coarse}, fine={self.fine})" def __str__(self): return f"{self.__class__.__name__}(coarse={self.coarse}, fine={self.fine}, {self.btime})"
[docs] class SCETime(SCETBase): """ SolarOrbiter Spacecraft Elapse Time (SCET) or Onboard Time (OBT). The mission clock time is compose of a coarse time in seconds in 32bit field and fine time in 16bit field fractions of second 1s/(2**16 -1) can be represented as a single 48bit field or a float. The top most bit is used to indicate time sync issues. Attributes ---------- time_sync : bool Time synchronisation status coarse : int Coarse time stamp (seconds) fine : int Fine time stamp fraction of seconds 1/2**31 -1 Examples -------- SCETimes can be created in a number of ways from scaler values, >>> SCETime(123, 456) SCETime(coarse=123, fine=456) combinations of scalers and array-like, >>> SCETime(123, [1,2,3,4,5]) SCETime(coarse=[123 123 123 123 123], fine=[1 2 3 4 5]) or from seconds with the understanding this are from the epoch of the start or the SCET >>> SCETime.from_float(123.345*u.s) SCETime(coarse=123, fine=22610) """ info = TimeInfo() def __init__(self, coarse, fine=0): """ Create a new datetime using the given coarse and fine values. Parameters ---------- coarse : `int` or `SCETime` Coarse time stamp (seconds) or eixting `SCETime` fine : `int` Fine time stamp fraction of seconds 1/2**16 -1 """ if not isinstance(coarse, SCETime): coarse, fine = np.broadcast_arrays(coarse, fine) if not np.issubdtype(coarse.dtype, np.integer) or not np.issubdtype(fine.dtype, np.integer): raise ValueError("Coarse and fine times must be integers") # Convention if top bit is set means times are not synchronised time_sync = (coarse >> np.uint16(31)) != 1 # coarse = np.where(time_sync, coarse, coarse ^ 2 ** 31) # Check limits if np.any(np.logical_or(coarse < 0, coarse > MAX_COARSE)): raise ValueError(f"Coarse time must be in range (0 to {MAX_COARSE})") if np.any(np.logical_or(fine < 0, fine > MAX_FINE)): raise ValueError(f"Fine time must be in range (0 to {MAX_FINE})") # Can store as uints coarse = coarse.astype(np.uint32) fine = fine.astype(np.uint16) super().__init__(coarse, fine) self.time_sync = time_sync @property def fits(self): if self.coarse.size == 1: return f"{self.coarse:010d}:{self.fine:05d}" else: return [f"{c:010d}:{f:05d}" for c, f in zip(self.coarse, self.fine)]
[docs] @classmethod def from_float(cls, scet_float): """ Create an SCETime from a float representation of seconds since epoch Parameters ---------- scet_float : `astropy.units.Quantity` The scet float representation Returns ------- `SCETime` The SCETime object """ sub_seconds, seconds = np.modf(scet_float.to_value("s")) coarse = seconds.astype(np.uint32) fine = np.round(MAX_FINE * sub_seconds).astype(np.uint16) return SCETime(coarse, fine)
[docs] @classmethod def from_btime(cls, btime): coarse = btime >> 16 fine = btime - (coarse << 16) return SCETime(coarse=coarse, fine=fine)
[docs] @classmethod def from_string(cls, scet_str, sep=":"): """ Create an SCETime for a string representation e.g. `'123456:789'` Parameters ---------- scet_str : `array_like` Time/s in SCET string format Returns ------- `SCETime` The SCETime object """ if isinstance(scet_str, str): scet_str = [scet_str] coarse, fine = zip(*[list(map(int, ts.split(sep))) for ts in scet_str]) return SCETime(coarse=coarse, fine=fine)
[docs] def to_datetime(self, raise_error=False): """ Return a python datetime object. Returns ------- `datetime.datetime` The corresponding UTC datetime object. """ try: utc = [Spice.instance.scet_to_datetime(t.to_string()) for t in self] except TypeError: utc = Spice.instance.scet_to_datetime(self.to_string()) kernel_date = Spice.instance.get_mk_date(meta_kernel_type="flown") bad = [t.replace(tzinfo=None) > kernel_date for t in (utc if isinstance(utc, list) else [utc])] if any(bad): if raise_error is True: raise ValueError(f"Converting OBT to UTC after kernel issue date: {kernel_date}.") logger.warning(f"Converting OBT to UTC after kernel issue date: {kernel_date}.") return utc
[docs] def to_time(self): return Time(self.to_datetime())
[docs] def to_string(self, full=True, sep=":"): if self.size == 1: if full: return f"{self.coarse:010d}{sep}{self.fine:05d}" return f"{self.coarse:010d}"
[docs] @staticmethod def min_time(): """ The minimum possible time value """ return SCETime(0, 0)
[docs] @staticmethod def max_time(): """ The maximum possible time value """ return SCETime(MAX_COARSE, MAX_FINE)
def __add__(self, other): """ Can only add a SCETimeDeltas or Quantities that can be converted to seconds It doesn't make sense to add two 'times' """ if not isinstance(other, (u.Quantity, SCETimeDelta)): raise TypeError("Only Quantities and SCETimeDeltas can be added to SCETimes") if isinstance(other, u.Quantity): other = SCETimeDelta.from_float(other.to(u.s)) return SCETime.from_btime(self.btime + other.btime) def __radd__(self, other): return self.__add__(other) def __sub__(self, other): """ Can subtract one time from another, can subtract a time delta or Quantity from an SCETime """ if not isinstance(other, (SCETime, SCETimeDelta, u.Quantity)): raise TypeError("Only quantities, SCETime and SCETimeDelta objects can be subtracted from SCETimes") if isinstance(other, SCETime): return SCETimeDelta.from_btime(self.btime - other.btime) if isinstance(other, u.Quantity): other = SCETimeDelta.from_float(other) # case SCETimeDelta return SCETime.from_btime(self.btime - other.btime) def __str__(self): return f"{self.coarse}:{self.fine}" def _comparison_operator(self, other, op): if other.__class__ is not self.__class__: return NotImplemented return op(self.btime - other.btime, 0) def __gt__(self, other): return self._comparison_operator(other, operator.gt) def __ge__(self, other): return self._comparison_operator(other, operator.ge) def __lt__(self, other): return self._comparison_operator(other, operator.lt) def __le__(self, other): return self._comparison_operator(other, operator.le) def __eq__(self, other): return self._comparison_operator(other, operator.eq) def __ne__(self, other): return self._comparison_operator(other, operator.ne)
[docs] class SCETimeDelta(SCETBase): """ SCET time delta objects which can be created. Attributes ---------- coarse : int Coarse time stamp (seconds) fine : int Fine time stamp fraction of seconds 1/(2**16-1) Examples -------- SCETimeDeltas can be created from directly >>> SCETimeDelta(1, 2) SCETimeDelta(coarse=1, fine=2) or as result of subtracting two times >>> SCETime(9, 8) - SCETime(5, 10) SCETimeDelta(coarse=3, fine=65534) """ info = TimeDetlaInfo() def __init__(self, coarse, fine=0): """ Create a new delta time using the given coarse and fine values. Parameters ---------- coarse : `int` Coarse time stamp (seconds) fine : `int` Fine time stamp fraction of seconds 1/2**16 -1 """ if not isinstance(coarse, SCETimeDelta): if isinstance(coarse, u.Quantity): coarse, fine = self._convert_float(coarse) coarse, fine = np.broadcast_arrays(coarse, fine) if not np.issubdtype(coarse.dtype, np.integer) or not np.issubdtype(fine.dtype, np.integer): raise ValueError("Coarse and fine times must be integers") if np.any(np.abs(coarse) > MAX_COARSE): raise ValueError("Course time must be in the range -2**31-1 to 2**31-1") if np.any(np.abs(fine) > MAX_FINE): raise ValueError("Fine time must be in the range -2**16-1 to 2**16-1") # Fine needs to be 32bit as due to SO convention coarse has max of 2**31 which works # for a signed 32 bit in but but fine uses all 16bit as a uint as has to be 32 as a int super().__init__(coarse.astype(np.int32), fine.astype(np.int32))
[docs] @classmethod def from_btime(cls, btime): btime = np.atleast_1d(btime) neg_idx = np.where(btime < 0) btime[neg_idx] = abs(btime[neg_idx]) coarse = btime >> 16 fine = btime - (coarse << 16) td = cls(coarse, fine) td[neg_idx] = -td[neg_idx] return td[0] if btime.size == 1 else td
[docs] @classmethod def from_float(cls, scet_float): """ Create a Parameters ---------- scet_float Returns ------- """ coarse, fine = cls._convert_float(scet_float) return cls(coarse, fine)
@staticmethod def _convert_float(scet_float): scet_float = scet_float.to_value("s") sub_seconds, seconds = np.modf(scet_float) fine = np.round(MAX_FINE * sub_seconds).astype(int) return seconds.astype(int), fine def __add__(self, other): # If other is a Time then use SCETime.__add__ to do the calculation. if isinstance(other, SCETime): return other.__add__(self) if not isinstance(other, SCETimeDelta): other = SCETimeDelta(other) return SCETimeDelta.from_btime(self.btime + other.btime) def __radd__(self, other): return self.__add__(other) def __sub__(self, other): if isinstance(other, (u.Quantity, SCETimeDelta)): try: other = SCETimeDelta(other) except Exception: raise TypeError(f"{other.__class__.__name__} could not be converted to {self.__class__.__name__}") return SCETimeDelta.from_btime(self.btime - other.btime) else: raise TypeError(f"Unsupported operation for types {self.__class__.__name__} and {other.__class__.__name__}") def __rsub__(self, other): out = self.__sub__(other) return -out def __neg__(self): new = self.copy() new.coarse = -new.coarse new.fine = -new.fine new.btime = -new.btime return new def __truediv__(self, other): res = np.floor_divide(self.btime, other) return SCETimeDelta.from_btime(res) def __mul__(self, other): res = (self.btime * other).astype("int64") return SCETimeDelta.from_btime(res) def __str__(self): return f"{self.coarse}, {self.fine}, {self.btime}" def __eq__(self, other): if not isinstance(other, (SCETimeDelta, u.Quantity)): return False if isinstance(other, u.Quantity): other = SCETimeDelta(other) return self.btime == other.btime
[docs] class SCETimeRange: """ SolarOrbiter Spacecraft Elapse Time (SCET) Range with start and end time. Attributes ---------- start : `SCETime` start time of the range end : `SCETime` end time of the range """ def __init__(self, *, start=SCETime.max_time(), end=SCETime.min_time()): if not isinstance(start, SCETime) or not isinstance(end, SCETime): raise TypeError("Must be SCETime") self.start = start.min() self.end = end.max()
[docs] def expand(self, time): """ Enlarge the time range to include the given time. Parameters ---------- time : `SCETime` or `SCETimeRange` The new time the range should include or an other time range. Raises ------ ValueError if the given time is from a other class. """ if isinstance(time, SCETime): self.start = min(self.start, time.min()) self.end = max(self.end, time.max()) elif isinstance(time, SCETimeRange): self.start = min(self.start, time.start) self.end = max(self.end, time.end) else: raise ValueError("time must be 'SCETime' or 'SCETimeRange'")
[docs] def to_timerange(self): return TimeRange(self.start.to_time(), self.end.to_time())
[docs] def duration(self): return (self.end - self.start).as_float()
@property def avg(self): return self.start + (self.end - self.start) / 2 def __repr__(self): return f"{self.__class__.__name__}(start={str(self.start)}, end={str(self.end)})" def __str__(self): return f"{str(self.start)} to " + f"{str(self.end)}" def __eq__(self, other): return np.all(self.start == other.start) and np.all(self.end == other.end) def __contains__(self, item): if isinstance(item, SCETime): return self.start <= item <= self.end elif isinstance(item, SCETimeRange): return self.start <= item.start and self.end >= item.end else: raise ValueError("time must be 'SCETime' or 'SCETimeRange'")