Source code for stixcore.processing.decompression

"""Processing module for applying the skm decompression for configured parameters."""

from stixpy.calibration.compression import decompress as algo_decompress

from stixcore.tmtc.parameter import CompressedParameter

__all__ = ["decompress"]


def apply_decompress(raw, skm):
    """Wrap the decompression algorithm into a callback.

    Parameters
    ----------
    raw : `stixcore.tmtc.parser.Parameter`
        will be the old parameter value (input)
    skmp : `list[stixcore.tmtc.parser.Parameter]`
        list of compression settings [s, k, m]

    Returns
    -------
    CompressedParameter
        A uncompressed version of the parameter
    """
    try:
        decompressed, error = algo_decompress(
            raw.value, s=skm[0].value, k=skm[1].value, m=skm[2].value, return_variance=True
        )
    except AttributeError:
        # If the compression scheme parameters are overridden they will be int no parameters
        decompressed, error = algo_decompress(raw.value, s=skm[0], k=skm[1], m=skm[2], return_variance=True)
    return CompressedParameter(
        name=raw.name,
        idb_info=raw.idb_info,
        value=raw.value,
        decompressed=decompressed,
        error=error,
        skm=skm,
        order=raw.order,
    )


[docs] def decompress(packet): """Apply parameter decompression for the entire packet. Gets all parameters to decompress from configuration. Parameters ---------- packet : `GenericTMPacket` The TM packet Returns ------- `int` How many times the decompression algorithm was called. """ decompression_parameter = packet.get_decompression_parameter() if not decompression_parameter: return 0 c = 0 for param_name, (sn, kn, mn) in decompression_parameter.items(): skm = ( sn if isinstance(sn, int) else packet.data.get(sn), kn if isinstance(kn, int) else packet.data.get(kn), mn if isinstance(mn, int) else packet.data.get(mn), ) c += packet.data.apply(param_name, apply_decompress, skm) return c