from collections import defaultdict
import numpy as np
import astropy.units as u
from stixcore.time.datetime import SCETime
__all__ = ["Parameter", "EngineeringParameter", "CompressedParameter"]
[docs]
class Parameter:
"""Generic Parameter Class
Attributes
_________
name : `str`
Parameter name
value :
The value of the parameter
children : `list` optional
Children of this parameter
"""
def __init__(self, name, value, idb_info, *, children=None, order=-1):
self.name = name
self.value = value
self.idb_info = idb_info
self.children = children
self.order = order
[docs]
def export(self, packetdata, exportstate, *, descr=False):
val = ""
convparam = getattr(packetdata, self.name)
if convparam:
if isinstance(convparam, EngineeringParameter):
val = convparam.engineering
elif isinstance(convparam, CompressedParameter):
val = convparam.decompressed
if isinstance(val, np.ndarray) and val.size > 1:
# idx = self.order-convparam.order-1
idx = exportstate[self.name]
exportstate[self.name] += 1
val = val[0][idx] if len(val.shape) > 1 else val[idx]
if self.name == "NIX00445":
val = (
SCETime(
coarse=self.value, fine=getattr(packetdata, "NIX00446", Parameter("NIX00446", 0, None)).value
)
.to_datetime()
.isoformat(timespec="milliseconds")
)
elif self.name == "NIX00402":
if self.value > 2**32 - 1:
coarse = self.value >> 16
fine = self.value & (1 << 16) - 1
else:
coarse = self.value
fine = 0
val = SCETime(coarse, fine).to_datetime().isoformat(timespec="milliseconds")
elif self.name == "NIXD0003":
val = round(self.value / 2.5, 3)
if isinstance(convparam, EngineeringParameter):
# undo °C to K
if convparam.unit == "degC":
val = val.to("deg_C", equivalencies=u.temperature())
val = (
(val.value[0].round(3) if hasattr(val.value, "__len__") else round(val.value, 3))
if hasattr(val, "value")
else str(val)
)
if isinstance(val, np.ndarray):
val = val[0]
res = [self.name, self.value, val, [c.export(packetdata, exportstate, descr=descr) for c in self.children]]
if descr:
res.insert(1, self.idb_info.PCF_DESCR)
return res
def __repr__(self):
return (
f"{self.__class__.__name__}(name={self.name}, value={self.value}, "
f"children={len(self.children) if self.children else None}, "
f"order={self.order})"
)
[docs]
def merge_children(self):
"""
Merge children into single parameter if same NIX
Returns
-------
Parameter
A new parameter with where are children are merged.
"""
if self.name == "NIXD0159":
return self.unpack_NIX00065()
elif self.children:
names = (c.name for c in self.children)
if all([False if child.children else True for child in self.children]) and "NIXD0159" not in names:
params = {child.name: child for child in self.children}
values = defaultdict(list)
for param in self.children:
if isinstance(param.value, list):
values[param.name].extend(param.value)
else:
values[param.name].append(param.value)
children = [
Parameter(name, values[name], param.idb_info, order=self.order) for name, param in params.items()
]
return Parameter(self.name, self.value, self.idb_info, children=children, order=self.order)
else:
return Parameter(
self.name,
self.value,
self.idb_info,
children=[c.merge_children() for c in self.children],
order=self.order,
)
else:
return Parameter(self.name, self.value, self.idb_info, order=self.order)
[docs]
def flatten(self, root):
"""
Flatten repeated substructures into root object
Parameters
----------
root : `object`
Returns
-------
"""
if self.children:
for child in self.children:
if child.children:
child.flatten(root=root)
else:
if hasattr(root, child.name):
cur = getattr(root, child.name)
if isinstance(child.value, list):
cur.value.extend(child.value)
else:
cur.value.append(child.value)
else:
if not isinstance(child.value, list):
child.value = [child.value]
setattr(root, child.name, child)
self.children = None
if hasattr(root, self.name):
old = getattr(root, self.name)
if not isinstance(old.value, list):
old.value = [old.value]
if isinstance(self.value, list):
old.value.extend(self.value)
else:
old.value.append(self.value)
else:
self.value = [self.value]
setattr(root, self.name, self)
[docs]
def unpack_NIX00065(self):
"""Unpack the NIX00065 values.
Continuation bits (NIXD0159) define number of subsequent bytes used to define counts for
given Detector / Pixel / Energy combination, i.e. value 0 denotes no following bytes and
count equal to 1, value 1 denotes 1 byte for “Counts” parameter with value between 2-255
and continuation bits equal to 2 are used for 2 successive bytes for “Counts” parameter
with value between 256 and 65535.
Parameters
----------
param : ´Parameter´
The NIXD00159 parameter
Returns
-------
`Parameter`
The unpacked value
Raises
------
ValueError
if unpacking schema is not supported
"""
NIX00065 = None
if self.value == 0:
NIX00065 = 1
child_idb_info = None
elif self.value == 1:
NIX00065 = self.children[0].value
child_idb_info = self.children[0].idb_info
elif self.value == 2:
high_bit, low_bit = [c.value for c in self.children]
NIX00065 = (high_bit << 8) + low_bit
child_idb_info = self.children[0].idb_info
else:
raise ValueError(
f"Continuation bits value of {self.value} \
not allowed (0, 1, 2)"
)
param = Parameter(
name=self.name,
value=self.value,
idb_info=self.idb_info,
children=[Parameter(name="NIX00065", value=NIX00065, idb_info=child_idb_info)],
)
return param
[docs]
class EngineeringParameter(Parameter):
"""A class to combine the raw and engineering values and settings of a parameter.
Attributes
----------
value : `int` | `list`
The original or raw values before the calibration.
engineering : `int` | `list`
The Engineering values.
unit : `str`
The unit for the engineering values
"""
def __init__(self, *, name, value, idb_info, engineering, unit, order=-1):
"""Create a EngineeringParameter object.
Parameters
----------
value : `int` | `list`
The raw values.
engineering : `int` | `list`
The engineering values.
"""
super().__init__(name=name, value=value, idb_info=idb_info, order=order)
self.unit = unit
convert = False
# IDB 2.26.36 defines some times as "S" (Siemens) not as "s" (seconds) override that
if unit == "S":
unit = "s"
if unit == "degC":
unit = "deg_C"
convert = "K"
if unit is not None and unit != "" and engineering is not None:
try:
engineering = engineering * u.Unit(unit)
if convert == "K":
engineering = engineering.to(convert, equivalencies=u.temperature())
except ValueError:
raise NotImplementedError(f"Add unit support: for {unit}")
self.engineering = engineering
def __repr__(self):
return (
f"{self.__class__.__name__}(value={self.value}, engineering={self.engineering}, "
+ f"unit={self.unit}, order={self.order})"
)
def __str__(self):
return (
f"{self.__class__.__name__}(value: len({len(self.value)}), engineering: "
+ f"len({len(self.engineering)}), unit={self.unit}, order={self.order}))"
)
[docs]
class CompressedParameter(Parameter):
"""A class to combine the raw and decompressed values and settings of a parameter.
Attributes
----------
decompressed : `int` | `list`
The decompressed values.
error : `int` | `list`
The estimated error of the decompression.
skm : `tuple`
(s, k, m) settings for the decompression algorithm.
"""
def __init__(self, *, name, value, idb_info, decompressed, error, skm, order=-1):
"""Create a CompressedParameter object.
Parameters
----------
value : `int` | `list`
The compressed values.
decompressed : `int` | `list`
The decompressed values.
error : `int` | `list`
The estimated error of the decompression.
skm : `numpy.array`
[s, k, m] settings for the decompression algorithm.
"""
super().__init__(name=name, value=value, idb_info=idb_info, order=order)
self.decompressed = decompressed
self.skm = skm
self.error = error
def __repr__(self):
return f"{self.__class__.__name__}(raw={self.value}, decompressed={self.decompressed}, \
error={self.error}, skm={self.skm}, order={self.order})"
def __str__(self):
return f"{self.__class__.__name__}(raw: len({len(self.value)}), decompressed: \
len({len(self.decompressed)}), error: len({len(self.error)}), \
skm={self.skm}, order={self.order})"