Source code for stixcore.tmtc.packet_factory
from binascii import unhexlify
from stixcore.tmtc.packets import GenericPacket, GenericTMPacket, SourcePacketHeader
# Names exported by ``from stixcore.tmtc.packet_factory import *``: the
# ready-made ``Packet`` factory instance, the factory classes and the
# exception types defined in this module.
__all__ = [
"Packet",
"BaseFactory",
"TMTCPacketFactory",
"TMPacketFactory",
"MultipleMatchError",
"NoMatchError",
"ValidationFunctionError",
]
[docs]
class BaseFactory:
    """
    An abstract base factory.

    The factory keeps a registry that maps each candidate type to a validation
    function. Calling the factory runs every validation function against the
    positional arguments and instantiates the single type whose validation
    function returned a truthy value.
    """

    def __init__(self, registry=None):
        """
        Create the factory.

        Parameters
        ----------
        registry : mapping, optional
            Mapping of candidate type -> validation function. The object is
            stored as given (not copied). A new empty dict is used when
            omitted.
        """
        self.registry = {} if registry is None else registry

    def __call__(self, *args, **kwargs):
        """
        Run the factory.

        Positional arguments are passed to the validation functions and to the
        constructor of the matching type; keyword arguments are passed to the
        constructor only.
        """
        return self._check_registered(*args, **kwargs)

    def _check_registered(self, *args, **kwargs):
        """
        Find the single registered type matching ``args`` and instantiate it.

        Parameters
        ----------
        *args
            Passed to every registered validation function and to the
            constructor of the matching type.
        **kwargs
            Passed to the constructor of the matching type only.

        Returns
        -------
        An instance of the one registered type whose validation function
        accepted ``args``.

        Raises
        ------
        NoMatchError
            If no validation function accepted the arguments.
        MultipleMatchError
            If more than one validation function accepted the arguments.
        """
        # Validation functions only ever see the positional arguments.
        candidates = [
            packet_type
            for packet_type, validation_function in self.registry.items()
            if validation_function(*args)
        ]

        if not candidates:
            raise NoMatchError("No types match specified arguments")
        if len(candidates) > 1:
            raise MultipleMatchError("Too many candidate types identified ")

        # Exactly one candidate is left
        PacketType = candidates[0]  # noqa
        return PacketType(*args, **kwargs)

    def register(self, PacketType, validation_function):  # noqa
        """
        Register a type together with its validation function.

        Parameters
        ----------
        PacketType
            The type to instantiate when ``validation_function`` matches.
        validation_function : callable
            Called with the factory's positional arguments; a truthy result
            marks ``PacketType`` as a candidate.

        Raises
        ------
        AttributeError
            If ``validation_function`` is not callable, including ``None``.
        """
        # ``None`` is rejected too: skipping it silently would leave the type
        # unregistered and only surface much later as a NoMatchError.
        if not callable(validation_function):
            raise AttributeError("Keyword argument 'validation_function' must be callable.")
        self.registry[PacketType] = validation_function

    def unregister(self, PacketType):  # noqa
        """
        Remove a type from the registry.

        Raises
        ------
        KeyError
            If ``PacketType`` is not registered (behaviour of ``dict.pop``
            without a default).
        """
        self.registry.pop(PacketType)
[docs]
class TMTCPacketFactory(BaseFactory):
    """
    Factory for TM/TC packets.

    The input is first matched against this factory's own registry using its
    source packet header; the resulting packet is then handed to an internal
    ``TMPacketFactory`` that works on ``GenericTMPacket._registry``.

    NOTE(review): the code visible here always delegates to the TM packet
    factory; no separate TC path is present - confirm whether that is intended.
    """

    def __init__(self, registry=None):
        """
        Set up the factory and its nested TM packet factory.

        Parameters
        ----------
        registry : optional
            Registry forwarded unchanged to ``BaseFactory.__init__``.
        """
        super().__init__(registry=registry)
        self.tm_packet_factory = TMPacketFactory(registry=GenericTMPacket._registry)  # noqa

    def __call__(self, data, **kwargs):
        """
        Build a packet from ``data``.

        Parameters
        ----------
        data
            Packet data. A ``str`` is treated as hexadecimal text and decoded
            with ``unhexlify``; anything else is used as given.
        **kwargs
            Forwarded to the TM packet factory.

        Returns
        -------
        Whatever the TM packet factory returns for the matched packet.
        """
        raw = unhexlify(data) if isinstance(data, str) else data
        header = SourcePacketHeader(raw)
        # First stage: match on the header only (no keyword arguments).
        matched_packet = self._check_registered(header)
        # Second stage: let the TM factory resolve the final packet type.
        return self.tm_packet_factory(matched_packet, **kwargs)
[docs]
class TMPacketFactory(BaseFactory):
    """
    Factory for TM packets.

    Returns an instance of the registered type whose validation function
    matches the given packet data.
    """

    def __init__(self, registry=None):
        """
        Set up the factory.

        Parameters
        ----------
        registry : optional
            Registry forwarded unchanged to ``BaseFactory.__init__``.
        """
        super().__init__(registry=registry)

    def __call__(self, data, **kwargs):
        """
        Resolve ``data`` against the registry and build the matching type.

        Parameters
        ----------
        data
            Passed to the validation functions and to the constructor of the
            matching type.
        **kwargs
            Passed to the constructor of the matching type.
        """
        matched = self._check_registered(data, **kwargs)
        return matched
[docs]
class NoMatchError(Exception):
    """Raised when none of the registered candidate classes matches."""
[docs]
class MultipleMatchError(Exception):
    """Raised when more than one registered candidate class matches."""
[docs]
class ValidationFunctionError(AttributeError):
    """
    Error type for problems with a validation function.

    It derives from ``AttributeError``, so handlers for ``AttributeError``
    also catch it.

    NOTE(review): nothing in the code visible here raises this exception;
    presumably it is meant for a missing or unusable validation function -
    confirm the intended use.
    """
# Module-level factory instance created at import time; it uses
# ``GenericPacket._registry`` as its registry.
Packet = TMTCPacketFactory(registry=GenericPacket._registry)