"""Processing module for converting raw to engineering values."""
import re
from collections.abc import Iterable
import numpy as np
from astropy.table.table import QTable
from stixcore.time import SCETime
from stixcore.tmtc.parameter import EngineeringParameter, Parameter
from stixcore.util.logging import get_logger
CCN = "__converted_column__"
__all__ = ["EngineeringParameter", "raw_to_engineering"]
logger = get_logger(__name__)
# TODO decide on raw as array input
def apply_raw_to_engineering(raw, args):
"""Wrap the raw to engineering converting algorithm into a callback.
Parameters
----------
raw : `stixcore.tmtc.parser.Parameter
The the original parameter value
args : `tuple`
(IDBCalibrationParameter, IDB)
Returns
-------
`EngineeringParameter`
The raw and engineering value
"""
param, idb = args
en = None
if param.PCF_CATEG == "S":
if isinstance(raw.value, Iterable):
if isinstance(raw.value, list):
raw.value = np.array(raw.value)
en = np.array([idb.textual_interpret(param.PCF_CURTX, val.item()) for val in np.ravel(raw.value)]).reshape(
raw.value.shape
)
else:
en = idb.textual_interpret(param.PCF_CURTX, raw.value)
elif param.PCF_CATEG == "N":
prefix = re.split(r"\d+", param.PCF_CURTX)[0]
if prefix == "CIXP":
curve = idb.get_calibration_curve(param)
en = curve(raw.value)
if en is None:
logger.error(
f"Failed curve calibrate {param.PCF_NAME} / \
{param.PCF_CURTX} due to bad coefficients {curve}"
)
elif prefix == "CIX":
poly = idb.get_calibration_polynomial(param.PCF_CURTX)
en = poly(raw.value)
if en is None:
logger.error(
f"Failed polynomial calibrate {param.PCF_NAME} / \
{param.PCF_CURTX} due to bad coefficients {poly}"
)
else:
er = f"Unsupported calibration method: {param.PCF_CATEG} for " + f"{param.PCF_NAME} / {param.PCF_CURTX}"
logger.error(er)
raise ValueError(er)
# hardcoding RCR override do not pass back "State_0" ...
if raw.name in ["NIX00276", "NIX00401"]:
en = raw.value
return EngineeringParameter(
name=raw.name, value=raw.value, idb_info=raw.idb_info, engineering=en, unit=param.PCF_UNIT, order=raw.order
)
[docs]
def raw_to_engineering(packet):
"""Apply parameter raw to engineering conversion for the entire packet.
Parameters
----------
packet : `GenericTMPacket`
The TM packet
Returns
-------
`int`
How many times the raw to engineering algorithm was called.
"""
idb, calib_parameters = packet.get_calibration_params()
if not calib_parameters:
return 0
c = 0
for param in calib_parameters:
c += packet.data.apply(param.PCF_NAME, apply_raw_to_engineering, (param, idb))
return c
def raw_to_engineering_product(product, idbm):
"""Apply parameter raw to engineering conversion for the entire product.
Parameters
----------
product : `BaseProduct`
The TM product as level 0
Returns
-------
`int`
How many columns where calibrated.
"""
col_n = 0
idb_ranges = QTable(
rows=[
(version, range.start.as_float(), range.end.as_float()) for version, range in product.idb_versions.items()
],
names=["version", "obt_start", "obt_end"],
)
idb_ranges.sort("obt_start")
idb_ranges["obt_start"][0] = SCETime.min_time().as_float()
for i in range(0, len(idb_ranges) - 1):
idb_ranges["obt_end"][i] = idb_ranges["obt_start"][i + 1]
idb_ranges["obt_end"][-1] = SCETime.max_time().as_float()
for table, timecol in [(product.data, "time"), (product.control, "scet_coarse")]:
if timecol == "scet_coarse":
if "scet_coarse" in table.colnames:
timevector = SCETime(coarse=table["scet_coarse"], fine=table["scet_fine"]).as_float()
else:
# product per request (xray: no 'scet_coarse' in control)
# do not have engineering values in control
continue
else: # time
timevector = table["time"].as_float()
for col in table.colnames:
if not (
hasattr(table[col], "meta")
# and table[col].meta.get("PCF_CURTX", None) is not None
and not isinstance(table[col].meta.get("PCF_CURTX", None), (type(None), list))
and table[col].meta["NIXS"] is not None
):
continue
col_n += 1
c = 0
# clone the current column into a new column as the content might be replaced chunk wise
table[CCN] = table[col]
for idbversion, starttime, endtime in idb_ranges.iterrows():
idb = idbm.get_idb(idbversion)
idb_time_period = np.where((starttime <= timevector) & (timevector < endtime))[0]
if len(idb_time_period) < 1:
continue
c += len(idb_time_period)
calib_param = idb.get_params_for_calibration(
product.service_type,
product.service_subtype,
(product.ssid if hasattr(product, "ssid") else None),
table[col].meta["NIXS"],
table[col].meta["PCF_CURTX"],
)
if len(calib_param) == 0:
# for this idb period no conversion with the same ID (PCF_CURTX) is defined
# so look it up again
calib_param = idb.get_params_for_calibration(
product.service_type,
product.service_subtype,
(product.ssid if hasattr(product, "ssid") else None),
table[col].meta["NIXS"],
)
calib_param = calib_param[0]
raw = Parameter(table[col].meta["NIXS"], table[idb_time_period][col], None)
eng = apply_raw_to_engineering(raw, (calib_param, idb))
# cast the type of the column if needed
if table[CCN].dtype != eng.engineering.dtype:
table[CCN] = table[CCN].astype(eng.engineering.dtype)
# set the unit if needed
if hasattr(eng.engineering, "unit") and table[CCN].unit != eng.engineering.unit:
if table[CCN].unit is None:
meta = table[col].meta
table[CCN].unit = eng.engineering.unit
# restore the meta info
setattr(table[CCN], "meta", meta)
else:
# convert the values to the already existing unit in the column
# (e.g. ms to s) if this fails (e.g. s to K) something is very
# off and should raise an error anyway
eng.engineering = eng.engineering.to(table[CCN].unit)
logger.warning(
f"Automated unit conversion triggered: "
f"{eng.engineering.unit} to {table[CCN].unit}"
f" for {col} / {table[col].meta['NIXS']}"
)
# override the data into the new column
table[CCN][idb_time_period] = eng.engineering
# replace the old column with the converted
table[col] = table[CCN]
table[col].meta = table[CCN].meta
# delete the generic column for conversion
del table[CCN]
# delete the calibration key from meta as it is now processed
del table[col].meta["PCF_CURTX"]
if c != len(table):
logger.warning(
"Not all time bins got converted to engineering"
+ "values due to bad idb periods."
+ f"\n Converted bins: {c}\ntotal bins {len(table)}"
)
return col_n