Source code for stixcore.tmtc.parser

from collections import defaultdict

import numpy as np
from bitstring import ConstBitStream

from stixcore.tmtc.parameter import Parameter
from stixcore.util.logging import get_logger

logger = get_logger(__name__)

__all__ = ["PacketData"]

SUBPACKET_NIX = "NIX00403"


class PacketData:
    """Generic class for organizing all parameters of the TM packet data.

    Parameters are stored as plain instance attributes keyed by their NIX
    name, so the set of attributes is the set of parameters in the packet.
    """

    @classmethod
    def parameter_list_2_PacketData(cls, parameters):
        """Convert the list of nested parameters into a PacketData object.

        Unpacks/decodes the NIXD0159 counts from TM(21,6,20) by eliminating
        this repeater level.

        Parameters
        ----------
        parameters : `list`
            The parse result of the TM data block.

        Returns
        -------
        `PacketData`
            Generic data structure. Anything that is not a `list` is returned
            unchanged.
        """
        # Non-list input is passed straight through to the caller.
        if not isinstance(parameters, list):
            return parameters

        obj = PacketData()
        for parameter in parameters:
            # Each top level parameter is stored under its own name after its
            # children have been merged; a repeated name overrides the
            # previous entry.
            obj.__dict__[parameter.name] = parameter.merge_children()
        return obj

    def merge(self):
        """Merge sub structures.

        Calls ``flatten(root=self)`` on every child of every parameter that
        has children.

        Returns
        -------
        `PacketData`
            This instance, to allow call chaining.
        """
        # Iterate over a snapshot of the current values: `flatten` receives
        # this object as root and presumably adds attributes to it while the
        # loop is running - TODO confirm against Parameter.flatten.
        for param in list(self.__dict__.values()):
            if param.children:
                for child in param.children:
                    child.flatten(root=self)
                    # TODO nicky reactivate? getattr(self, child.name).children = None
        return self

    def set(self, nix, value):
        """Set the parameter value by the NIX name.

        Parameters
        ----------
        nix : `str`
            The NIX name of the parameter.
        value : 'any'
            The new value.
        """
        setattr(self, nix, value)

    def get(self, nix, aslist=False):
        """Get the parameter value for a given NIX name.

        Parameters
        ----------
        nix : `str`
            The NIX name of the parameter.
        aslist : `bool`, optional
            If True the value is wrapped into a one element list and a
            missing (or None) value yields an empty list. By default False.

        Returns
        -------
        'any'
            The found value, or None if there is no such parameter.
        """
        attr = getattr(self, nix, None)
        if aslist:
            return [attr] if attr is not None else []
        return attr

    def apply(self, nix, callback, args, addnix=None):
        """Apply a processing method to a parameter.

        Overrides the value after processing or creates a new one.

        Parameters
        ----------
        nix : `str`
            The NIX name of the parameter.
        callback : `callable`
            The callback to be applied to each data entry of the parameter.
            It is invoked as ``callback(value, args)``.
        args : `any`
            Will be passed on to the callback.
        addnix : `str`, optional
            A NIX name where to override or create a new parameter.
            By default None, which writes back to `nix`.

        Returns
        -------
        'int'
            How many times the callback was invoked.
        """
        write_nix = addnix if addnix else nix
        counter = 0
        val = self.get(nix)
        if val is not None:
            if isinstance(val, list):
                w_val = [callback(v, args) for v in val]
                # `w_val` is a plain list here, so count its entries directly
                # (a list has no `.value` attribute).
                counter = len(w_val)
            else:
                w_val = callback(val, args)
                counter += 1
            self.set(write_nix, w_val)

        # if self.has_subpackets():
        #     for subpacket in self.get_subpackets():
        #         counter += subpacket.apply(nix, callback, args, addnix)
        return counter

    def __repr__(self):
        return f"{self.__class__.__name__}({self.__dict__.keys()})"

    def has_subpackets(self):
        """Return True if the sub packet parameter (`SUBPACKET_NIX`) is present."""
        return hasattr(self, SUBPACKET_NIX)

    def get_subpackets(self):
        """Return the value stored under `SUBPACKET_NIX`, or an empty list if absent."""
        return getattr(self, SUBPACKET_NIX, [])
def parse_binary(binary, structure):
    """
    Parse binary data using given structure.

    Parameters
    ----------
    binary : bytes or hexstring
        Input binary data
    structure : dict
        Name and data type mapping e.g. `{'myparam': uint:8}`

    Returns
    -------
    dict
        The parsed data fields and bitstream
    """
    bitstream = ConstBitStream(binary)
    return parse_bitstream(bitstream, structure)


def _parse_tree(bitstream, parent, fields):
    """Recursive parsing of TM data.

    Parameters
    ----------
    bitstream : `bitstream.ConstBitstream`
        Input binary data
    parent : `~stixcore/idb/idb/IDBPacketTree`
        the dynamic parse tree defined by the IDB
    fields : `list[stixcore.tmtc.parser.Parameter]`
        The parsed parameters - mutable out data.
    """
    if not parent:
        return

    # The repeat count is read once up front; it is set by the caller for
    # repeater nodes (see the `pnode.counter` assignment below).
    counter = parent.counter
    for _ in range(counter):
        for pnode in parent.children:
            # dynamic packets might jump back or forward
            if pnode.parameter.is_variable():
                if pnode.parameter.VPD_OFFSET != 0:
                    bitstream.pos += int(pnode.parameter.VPD_OFFSET)
            # static packets: each parameter describes its own absolute position
            else:
                bitstream.pos = pnode.parameter.PLF_OFFBY * 8 + pnode.parameter.PLF_OFFBI

            # Read errors propagate unchanged to the caller.
            raw_val = bitstream.read(pnode.parameter.bin_format)
            children = []

            if pnode.children:
                # A node with children is a repeater: its raw value is the
                # number of times the child structure follows in the stream.
                is_valid = isinstance(raw_val, int) and raw_val > 0
                if is_valid:
                    pnode.counter = raw_val
                    _parse_tree(bitstream, pnode, children)
                elif pnode.name != "NIXD0159":
                    # repeater NIXD0159 can be zero according to STIX ICD-0812-ESC Table 93 P123
                    logger.warning(f"Repeater {pnode.name} has an invalid value: {raw_val}")

            entry = Parameter(name=pnode.name, value=raw_val, idb_info=pnode.parameter, children=children)
            fields.append(entry)


def parse_variable(bitstream, tree):
    """Parse binary data using given structure.

    Parameters
    ----------
    bitstream : `bitstream.ConstBitstream`
        Input binary data
    tree : `~stixcore/idb/idb/IDBPacketTree`
        the dynamic parse tree defined by the IDB

    Returns
    -------
    `tuple`
        ``(merged, fields)``: the merged `PacketData` holding the parsed
        (nested) telemetry packet parameters, and the list of parsed top
        level parameters in stream order.
    """
    fields = []
    _parse_tree(bitstream, tree, fields)
    _keep_order(fields, 0)
    pd = PacketData.parameter_list_2_PacketData(fields)
    merged = pd.merge()
    return (merged, fields)


def _keep_order(fields, counter):
    """Number all parameters depth first by setting their `order` attribute.

    Parameters
    ----------
    fields : `list`
        The parameters to number; children are numbered right after their parent.
    counter : `int`
        The last order number that was already assigned.

    Returns
    -------
    `int`
        The last order number assigned within `fields` and all descendants.
    """
    for p in fields:
        counter += 1
        p.order = counter
        if p.children:
            counter = _keep_order(p.children, counter)
    return counter


def parse_bitstream(bitstream, structure):
    """
    Parse data from bitstream structure.

    Parameters
    ----------
    bitstream : `bitstream.ConstBitstream`
    structure : dict
        Name and data type mapping e.g. `{'myparam': uint:8}`

    Returns
    -------
    dict
        The parsed data fields and bitstream
    """
    # TODO check if faster to use ', '.join(format.values())
    parsed = {name: bitstream.read(fmt) for name, fmt in structure.items()}
    return {"fields": parsed, "bitstream": bitstream}


def parse_repeated(bitstream, structure, num_repeats):
    """
    Parse repeated structures from a bitstream.

    Parameters
    ----------
    bitstream : `bitstream.ConstBitstream`
        Bitstream to parse structures from
    structure : dict
        Name and data type mapping e.g. `{'myparam': uint:8}`
    num_repeats : int
        Number of repeats

    Returns
    -------
    dict
        The repeated fields are returned as a dictionary the field name as keys and values of the
        list of extracted data
    """
    out = defaultdict(list)
    # The whole structure is read once per repeat, in declaration order, so
    # the reads consume the stream in the same sequence as the data layout.
    for _ in range(num_repeats):
        for name, fmt in structure.items():
            out[name].append(bitstream.readlist(fmt))
    return {"fields": out, "bitstream": bitstream}


def split_into_length(ar, splits):
    """Split a list into n chunks with a given length each

    Parameters
    ----------
    ar : `list`
        List like input to split up
    splits : `list`
        list of n length; a single (Python or NumPy) integer is treated as a
        one element list

    Returns
    -------
    `list`
        a list of numpy.array chunks

    Raises
    ------
    ValueError
        if the sum of all length in splits under or overflows
    """
    if isinstance(splits, (int, np.integer)):
        splits = [splits]

    parts = []
    start = 0
    for split in splits:
        parts.append(np.array(ar[start : start + split]))
        start += split

    # The chunk lengths have to consume the input exactly.
    if start < len(ar):
        raise ValueError("Length does not match to short")
    elif start > len(ar):
        raise ValueError("Length does not match to long")

    return parts