from pathlib import Path
import numpy as np
import astropy.units as u
from stixcore.config.reader import get_sci_channels, read_energy_channels
from stixcore.util.logging import get_logger
__all__ = [
"ENERGY_CHANNELS",
"_get_compression_scheme",
"_get_energy_bins",
"_get_detector_mask",
"_get_pixel_mask",
"_get_num_energies",
"_get_unique",
"_get_sub_spectrum_mask",
"_get_energies_from_mask",
"rebin_proportional",
]
logger = get_logger(__name__)
# TODO get file from config
ENERGY_CHANNELS = read_energy_channels(
Path(__file__).parent.parent / "config" / "data" / "common" / "detector" / "ScienceEnergyChannels_1000.csv"
)
[docs]
def _get_compression_scheme(packets, nix):
"""
Get the compression scheme parameters.
Parameters
----------
packets : dict
Packets
nix : `str`
The parameter to look up the compression scheme values for
Returns
-------
np.ndarray
S,K,M compression scheme parameters and names
"""
param = packets.get(nix)
skms = [p.skm for p in param]
comp_scheme = np.array([[skm[0].value, skm[1].value, skm[2].value] for skm in skms], dtype=np.ubyte)
return comp_scheme, {
"NIXS": [skms[0][0].name, skms[0][1].name, skms[0][2].name],
"PCF_CURTX": [p.idb_info.PCF_CURTX for p in skms[0]],
}
[docs]
def _get_energy_bins(packets, nixlower, nixuppper):
"""
Get energy bin mask from packets
Parameters
----------
packets : dict
Packets
nixlower : str
Parameter name of lower 32 bins
nixuppper : str
Parameters name of the upper bin
Returns
-------
np.ndarray
Full energy mask of len 33
"""
energy_bin_mask = np.array(packets.get_value(nixlower), np.uint32)
energy_bin_mask_upper = np.array(packets.get_value(nixuppper), np.bool_)
full_energy_mask = [
format(mask, "b").zfill(32)[::-1] + format(upper, "b")
for mask, upper in zip(energy_bin_mask, energy_bin_mask_upper)
]
full_energy_mask = [list(map(int, m)) for m in full_energy_mask]
full_energy_mask = np.array(full_energy_mask).astype(np.ubyte)
meta = {
"NIXS": [nixlower, nixuppper],
"PCF_CURTX": [packets.get(n)[0].idb_info.PCF_CURTX for n in [nixlower, nixuppper]],
}
return full_energy_mask, meta
[docs]
def _get_detector_mask(packets):
"""
Get the detector mask.
Parameters
----------
packets : dict
Packets
Returns
-------
np.ndarray
Detector mask
"""
detector_masks = np.array([list(format(dm, "032b"))[::-1] for dm in packets.get_value("NIX00407")]).astype(np.ubyte)
param = packets.get("NIX00407")[0]
meta = {"NIXS": "NIX00407", "PCF_CURTX": param.idb_info.PCF_CURTX}
return detector_masks, meta
[docs]
def _get_pixel_mask(packets, param_name="NIXD0407"):
"""
Get pixel mask.
Parameters
----------
packets : dict
Packets
Returns
-------
np.ndarray
Pixel mask
"""
pixel_masks_ints = packets.get_value(param_name)
pixel_masks = np.array([list(format(pm, "012b"))[::-1] for pm in pixel_masks_ints]).astype(np.ubyte)
param = packets.get(param_name)[0]
meta = {"NIXS": param_name, "PCF_CURTX": param.idb_info.PCF_CURTX}
return pixel_masks, meta
[docs]
def _get_num_energies(packets):
"""
Get number of energies.
Parameters
----------
packets : dict
Packets
Returns
-------
int
Number of energies
"""
return packets.get_value("NIX00270")
[docs]
def _get_unique(packets, param_name, dtype):
"""
Get a unique parameter raise warning if not unique.
Parameters
----------
param_name : str
STIX parameter name eg NIX00001
dtype : np.dtype
Dtype to cast to eg. np.uint16/np.uint32
Returns
-------
np.ndarray
First value even if not unique
"""
param = np.array(packets.get_value(param_name), dtype)
if not np.all(param == param[0]):
logger.warning("%s has changed in complete packet sequence", param_name)
return param[0]
[docs]
def _get_sub_spectrum_mask(packets):
"""
Get subspectrum mask as bool array
Parameters
----------
packets : dict
Merged packets
Returns
-------
numpy.ndarray
Bool array of mask
"""
nix = "NIX00160"
sub_spectrum_masks = np.array(
[
[bool(int(x)) for x in format(packets.get_value(nix)[i], "08b")][::-1]
for i in range(len(packets.get_value(nix)))
],
np.ubyte,
)
param = packets.get(nix)[0]
meta = {"NIXS": nix, "PCF_CURTX": param.idb_info.PCF_CURTX}
return sub_spectrum_masks, meta
[docs]
def _get_energies_from_mask(date, mask=None):
"""
Return energy channels for
Parameters
----------
mask : list or array
Energy bin mask
Returns
-------
tuple
Lower and high energy edges
"""
sci_energy_channels = get_sci_channels(date.replace(tzinfo=None))
e_lower = sci_energy_channels["Elower"].filled(np.nan).value
e_upper = sci_energy_channels["Eupper"].filled(np.nan).value
if mask is None:
low = [e_lower[edge] for edge in range(32)]
high = [e_upper[edge] for edge in range(32)]
elif len(mask) == 33:
edges = np.where(np.array(mask) == 1)[0]
channel_edges = [edges[i : i + 2].tolist() for i in range(len(edges) - 1)]
low = []
high = []
for edge in channel_edges:
l, h = edge # noqa: E741
low.append(e_lower[l])
high.append(e_upper[h - 1])
elif len(mask) == 32:
edges = np.where(np.array(mask) == 1)
low_ind = np.min(edges)
high_ind = np.max(edges)
low = [e_lower[i] for i in range(low_ind, high_ind + 1)]
high = [e_upper[i] for i in range(low_ind, high_ind + 1)]
else:
raise ValueError(f"Energy mask or edges must have a length of 32 or 33 not {len(mask)}")
return low, high
[docs]
def rebin_proportional(y1, x1, x2):
x1 = np.asarray(x1)
y1 = np.asarray(y1)
x2 = np.asarray(x2)
# the fractional bin locations of the new bins in the old bins
i_place = np.interp(x2, x1, np.arange(len(x1)))
cum_sum = np.r_[[0], np.cumsum(y1)]
# calculate bins where lower and upper bin edges span
# greater than or equal to one original bin.
# This is the contribution from the 'intact' bins (not including the
# fractional start and end parts.
whole_bins = np.floor(i_place[1:]) - np.ceil(i_place[:-1]) >= 1.0
start = cum_sum[np.ceil(i_place[:-1]).astype(int)]
finish = cum_sum[np.floor(i_place[1:]).astype(int)]
y2 = np.where(whole_bins, finish - start, 0.0)
bin_loc = np.clip(np.floor(i_place).astype(int), 0, len(y1) - 1)
# fractional contribution for bins where the new bin edges are in the same
# original bin.
same_cell = np.floor(i_place[1:]) == np.floor(i_place[:-1])
frac = i_place[1:] - i_place[:-1]
contrib = frac * y1[bin_loc[:-1]]
y2 += np.where(same_cell, contrib, 0.0)
# fractional contribution for bins where the left and right bin edges are in
# different original bins.
different_cell = np.floor(i_place[1:]) > np.floor(i_place[:-1])
frac_left = np.ceil(i_place[:-1]) - i_place[:-1]
contrib = frac_left * y1[bin_loc[:-1]]
frac_right = i_place[1:] - np.floor(i_place[1:])
contrib += frac_right * y1[bin_loc[1:]]
y2 += np.where(different_cell, contrib, 0.0)
return y2
def unscale_triggers(scaled_triggers, *, integration, detector_masks, ssid, factor=30):
r"""
Unscale scaled trigger values.
Trigger values are scaled on board in compression mode 0,0,7 via the following relation
T_s = T / (factor * n_int * n_trig_groups)
where `factor` is a configured parameter, `n_int` is the duration in units of 0.1s and
`n_trig_groups` number of active trigger groups being summed, which depends on the data
product given by the SSID.
Parameters
----------
scaled_triggers : int
Scaled trigger
integration : `astropy.units.Quantity`
Number of integrations (integration time / 0.1s)
detector_masks : `array-like`
Number of trigger groups summed
ssid : `int`
Science Structure ID
factor : int optional
Scaling factor
Returns
-------
tuple
Unscaled triggers, Scaling Variance
"""
# TODO extract this to a separate function and test
detector_to_trigger_group_map = np.array(
[
[1, 6, 5, 12, 14, 10, 8, 3, 31, 26, 22, 20, 18, 17, 24, 29],
[2, 7, 11, 13, 15, 16, 9, 4, 32, 27, 28, 21, 19, 23, 25, 30],
]
).T
trigger_groups = detector_masks[:, [np.array(detector_to_trigger_group_map) - 1]]
active_detectors_per_trigger_group = trigger_groups.squeeze().sum(axis=-1)
active_trigger_groups = active_detectors_per_trigger_group >= 1
# QL are summed to total trigger (1 trigger value)
# BSD SPEC data are summed to total trigger (1 trigger value)
if ssid in {30, 31, 32, 24}:
n_group = active_trigger_groups.astype(int).sum()
n_int = integration.as_float().to_value(u.ds).flatten() # units of 0.1s
# BSD pixel/vis data not summed (16 trigger values)
elif ssid in {21, 22, 23}:
n_group = active_trigger_groups.astype(int)
n_int = integration.as_float().to_value(u.ds) # units of 0.1s
else:
raise ValueError(f"Unscaling not support for SSID {ssid}")
# Scaled to ints onboard, bins have scaled width of 1, so error is 0.5 times the total factor
scaling_error = np.full_like(scaled_triggers, 0.5, dtype=float) * n_group.T * n_int * factor
# The FSW essential floors the value so add 0.5 so trigger is the centre of range +/- error
unscaled_triggers = (scaled_triggers * n_group.T * n_int * factor) + scaling_error
return unscaled_triggers, scaling_error**2