import os
import re
import sys
import json
from enum import Enum
from pathlib import Path
from collections import defaultdict
from collections.abc import Iterable
import dateutil.parser
import requests
from intervaltree import IntervalTree
from stixcore.config.config import CONFIG
from stixcore.data.test import test_data
from stixcore.util.logging import get_logger
from stixcore.util.singleton import Singleton
__all__ = [
    "SOOPManager",
    "SoopObservationType",
    "KeywordSet",
    "HeaderKeyword",
    "SoopObservation",
    "SOOP",
]

# module-level logger named after this module
logger = get_logger(__name__)
class KeywordSet:
    """Collection that merges `HeaderKeyword` objects which are equal to each other.

    Keywords are stored in a dict keyed by the keyword itself, so a keyword that
    compares/hashes equal to one already stored is combined into it with ``+=``.
    """

    def __init__(self, s=None):
        """Create a keyword collection, optionally pre-populated.

        Parameters
        ----------
        s : `Iterable` or `HeaderKeyword`, optional
            keyword(s) to add straight away, by default None (start empty)
        """
        self.dic = {}
        if s is None:
            return
        self.append(s)

    def add(self, element):
        """Insert one keyword, merging it into an equal keyword already present.

        Parameters
        ----------
        element : `HeaderKeyword`
            the keyword to insert
        """
        if element not in self.dic:
            self.dic[element] = element
            return
        # an equal keyword is already stored: fold the new one into it
        self.dic[element] += element

    def get(self, element):
        """Look up the stored keyword that is equal to ``element``.

        Parameters
        ----------
        element : `HeaderKeyword`
            a prototype keyword used only as the lookup key

        Returns
        -------
        `HeaderKeyword`
            the stored (possibly merged) keyword

        Raises
        ------
        KeyError
            if no equal keyword has been added
        """
        return self.dic[element]

    def append(self, elements):
        """Add one keyword or an iterable of keywords, merging duplicates.

        Parameters
        ----------
        elements : Iterable<`HeaderKeyword`> or `HeaderKeyword`
            the keyword(s) to add; a non-iterable value is added as a single keyword
        """
        batch = elements if isinstance(elements, Iterable) else (elements,)
        for item in batch:
            self.add(item)

    def to_list(self):
        """Return every stored keyword.

        Returns
        -------
        `list`
            the stored keywords, in the order they were first added
        """
        return list(self.dic.values())
class SoopObservationType(Enum):
    """Observation types defined in the LTP, used to filter observations.

    `ALL` acts as a wildcard: passing it as a filter selects every type.
    """

    # wildcard addressing all types
    ALL = ""
    # basic operation products like HK, QL ...
    STIX_BASIC = "STIX_BASIC"
    # bulk science data products: detector, aspect and calibration data
    STIX_ANALYSIS = "STIX_ANALYSIS"
    # download of data
    STIX_DOWNLOAD = "STIX_DOWNLOAD"
    # STIX generic
    STIX_GENERIC = "STIX_GENERIC"
class SOOP:
    """A single SOOP entry taken from an LTP plan."""

    def __init__(self, jsonobj):
        """Build a SOOP from an already parsed JSON object.

        see: https://issues.cosmos.esa.int/solarorbiterwiki/pages/viewpage.action?pageId=44991195

        Parameters
        ----------
        jsonobj : `Object`
            parsed SOOP entry; must provide the keys ``encodedSoopType``,
            ``soopInstanceId``, ``soopType``, ``startDate`` and ``endDate``
        """
        self.encodedSoopType = jsonobj["encodedSoopType"]
        self.soopInstanceId = jsonobj["soopInstanceId"]
        self.soopType = jsonobj["soopType"]
        # both time stamps are parsed with dateutil into datetime objects
        parse = dateutil.parser.parse
        self.startDate = parse(jsonobj["startDate"])
        self.endDate = parse(jsonobj["endDate"])

    def to_fits_keywords(self):
        """Return the FITS header keywords derived from this SOOP.

        Returns
        -------
        `list`
            `HeaderKeyword` objects for TARGET, SOOPTYPE and SOOPNAME
        """
        target = HeaderKeyword(name="TARGET", value="none", comment="Type of target from planning")
        soop_type = HeaderKeyword(
            name="SOOPTYPE", value=self.encodedSoopType, comment="Campaign ID(s) that the data belong to"
        )
        soop_name = HeaderKeyword(
            name="SOOPNAME", value=self.soopType, comment="Name of the SOOP Campaign that the data belong to"
        )
        return [target, soop_type, soop_name]

    def __repr__(self):
        shown = (self.encodedSoopType, self.soopInstanceId, self.soopType, self.startDate, self.endDate)
        body = ", ".join(f"{item}" for item in shown)
        return f"{__class__.__name__}<{body}>"
class SoopObservation:
    """An observation entry from the LTP plans."""

    def __init__(self, jsonobj):
        """Create a new SoopObservation object based on the generic json data.

        see: https://issues.cosmos.esa.int/solarorbiterwiki/pages/viewpage.action?pageId=44991195

        Parameters
        ----------
        jsonobj : `Object`
            the already parsed generic object; must provide the keys ``comment``,
            ``compositeId``, ``socIds``, ``name``, ``experiment``, ``startDate``
            and ``endDate``

        Raises
        ------
        KeyError
            if a required key is missing or ``name`` is not a member name of
            `SoopObservationType`
        """
        self.comment = jsonobj["comment"]
        self.compositeId = jsonobj["compositeId"]
        self.socIds = jsonobj["socIds"]
        self.name = jsonobj["name"]
        # the observation name doubles as the enum member name (lookup by name)
        self.type = SoopObservationType[jsonobj["name"]]
        self.experiment = jsonobj["experiment"]
        self.startDate = dateutil.parser.parse(jsonobj["startDate"])
        self.endDate = dateutil.parser.parse(jsonobj["endDate"])

    def to_fits_keywords(self):
        """Generate the fits header keywords derived from this observation.

        Returns
        -------
        `list`
            `HeaderKeyword` objects for OBS_ID (all socIds joined by ";"),
            OBS_TYPE and OBS_MODE
        """
        return [
            HeaderKeyword(
                name="OBS_ID", value=";".join(self.socIds), comment="Unique ID of the individual observation"
            ),
            HeaderKeyword(name="OBS_TYPE", value=self.compositeId["obsType"], comment="Encoded version of OBS_MODE"),
            HeaderKeyword(name="OBS_MODE", value=self.name, comment="Observation mode"),
        ]

    def __repr__(self):
        # every field is separated by ", " (the experiment/socIds separator was missing before)
        return (
            f"{__class__.__name__}<{self.name}, {self.type}, {self.experiment}, "
            f"{self.socIds}, {self.compositeId}, {self.startDate}, {self.endDate}>"
        )
class SOOPManager(metaclass=Singleton):
    """Manages LTP files provided by GFTS.

    Every LTP file found in the data root is read into ``self.data`` (one entry
    per plan name, holding the data of the latest accepted ``internalVersion``)
    and indexed by time in two `IntervalTree` objects, ``soops`` and
    ``observations``.
    """

    SOOP_FILE_FILTER = "SSTX_observation_timeline_export_*.json"
    # the dot before "json" is escaped so only a literal ".json" suffix matches
    SOOP_FILE_REGEX = re.compile(r".*SSTX_observation_timeline_export_.*\.json$")

    def __init__(self, data_root, *, mock_api=False):
        """Create the manager for a given data path root.

        All existing LTP files in the directory are read and indexed.

        Parameters
        ----------
        data_root : `str` | `pathlib.Path`
            Path to the directory with all LTP files.
        mock_api : `bool`, optional
            if True the SOOP API is not contacted and the bundled test response
            ``test_data.soop.API_RES`` is used instead, by default False
        """
        self.data = defaultdict(lambda: {"version": 0, "soops": [], "observations": []})
        self.soops = IntervalTree()
        self.observations = IntervalTree()
        self.filecounter = 0
        self.mock_api = mock_api
        # assigning the property reads and indexes every LTP file under data_root
        self.data_root = data_root

    @property
    def data_root(self):
        """Get the data path root directory.

        Returns
        -------
        `pathlib.Path`
            path of the root directory (None after un-pickling)
        """
        return self._data_root

    @data_root.setter
    def data_root(self, value):
        """Set the data path root and index every LTP file found in it.

        Parameters
        ----------
        value : `str` or `pathlib.Path`
            Path to the directory with all LTP files.

        Raises
        ------
        ValueError
            if the path does not exist or contains no LTP files.
        """
        path = Path(value)
        if not path.exists():
            raise ValueError(f"path not found: {value}")
        self._data_root = path
        # process files in reverse lexical order of their file name
        files = sorted(self._data_root.glob(SOOPManager.SOOP_FILE_FILTER), key=os.path.basename, reverse=True)
        if not files:
            raise ValueError(f"No current SOOP files found at: {self._data_root}")
        for sfile in files:
            self.add_soop_file_to_index(sfile, rebuild_index=False)
        self.rebuild_index()

    def __getstate__(self):
        # in case this gets pickled we only keep the index as in the parallel processes the
        # index is read only anyway
        return {"soops": self.soops, "observations": self.observations}

    def __setstate__(self, data):
        # we get un-pickled in a parallel process: restore just the index for reading;
        # data/filecounter/mock_api are intentionally not restored
        self._data_root = None
        self.soops = data["soops"]
        self.observations = data["observations"]

    def find_soops(self, *, start, end=None):
        """Search for all SOOPs in the index.

        Parameters
        ----------
        start : `datetime`
            start time to look for overlapping SOOPs in utc time
        end : `datetime`, optional
            end time to look for overlapping SOOPs in utc time; if None, SOOPs
            covering the single point ``start`` are returned, by default None

        Returns
        -------
        `list`
            list of found `SOOP` in all indexed LTP overlapping the given timeperiod/point
        """
        intervals = self.soops.at(start) if end is None else self.soops.overlap(start, end)
        return [interval.data for interval in intervals]

    def download_all_soops_from_api(self, ltpname, version, destination):
        """Download all SOOPs of one LTP plan version and save them as JSON.

        With ``mock_api`` the bundled test response is written instead. If the
        API download fails, the error is logged and an empty skeleton
        (``{"info": {}, "soops": [], "observations": [], "events": ""}``) is
        written.

        NOTE(review): a failed download is still persisted, and
        `add_soop_file_to_index` only downloads when the file is missing, so
        the failure is effectively cached until the file is deleted.

        Parameters
        ----------
        ltpname : `str`
            name of the LTP plan
        version : `int`
            internal version of the plan
        destination : `str` | `pathlib.Path`
            JSON file to write; missing parent directories are created
        """
        soops = {"info": {}, "soops": [], "observations": [], "events": ""}
        if self.mock_api:
            with open(test_data.soop.API_RES) as fp:
                soops = json.load(fp)
        else:
            fetched = self._fetch_soops_from_api(ltpname, version)
            if fetched is not None:
                soops = fetched
        destination = Path(destination)
        if not destination.parent.exists():
            logger.info(f"path not found for soop download: {destination.parent} creating dir")
            destination.parent.mkdir(parents=True, exist_ok=True)
        with open(destination, "w") as fp:
            json.dump(soops, fp, indent=4)

    def _fetch_soops_from_api(self, ltpname, version):
        """Log in to the SOOP API and fetch one plan; return the cleaned JSON or None on failure."""
        endpoint = CONFIG.get("SOOP", "endpoint")
        user = CONFIG.get("SOOP", "user", fallback="user")
        pw = CONFIG.get("SOOP", "password", fallback="pw")
        l_url = f"{endpoint}/authentication?j_username={user}&j_password={pw}&rememberme=true&submit=Login"  # noqa
        # the session is closed on exit, also when an exception propagates
        with requests.session() as request_client:
            # the initial GET sets the XSRF-TOKEN cookie
            request_client.get(l_url)
            xref_token = request_client.cookies["XSRF-TOKEN"]
            login_headers = {
                "Cookie": f"XSRF-TOKEN={xref_token};",
                "X-XSRF-TOKEN": f"{xref_token}",
                "Connection": "keep-alive",
            }
            # plain request (not the session): the cookie is passed explicitly in the headers
            post = requests.post(url=l_url, headers=login_headers)
            # check 400-403 first: `post.ok` is False for every status >= 400, so
            # testing it first would make the login branch unreachable
            if 400 <= post.status_code < 404:
                logger.error(f"SOOP API LOGIN ERROR: {post.status_code} > {post.reason}")
                return None
            if not post.ok:
                logger.error(f"SOOP API ERROR: {post.status_code} > {post.reason}")
                return None
            jsession_id = post.cookies["JSESSIONID"]
            p_url = f"{endpoint}/plan/ote/{ltpname}/{version}"
            logger.info(f"SOOP API download: {p_url}")
            je_headers = {
                "Cookie": f"XSRF-TOKEN={xref_token}; JSESSIONID={jsession_id}",
                "X-XSRF-TOKEN": f"{xref_token}",
            }
            try:
                api_soops = request_client.get(url=p_url, headers=je_headers).json()
                # clean unnecessary data
                api_soops["events"] = ""
                api_soops["observations"] = []
            except Exception as e:
                # best effort: log with traceback and let the caller write the empty skeleton
                logger.error(f"SOOP API ERROR: {e}", exc_info=True)
                return None
        return api_soops

    def find_observations(self, *, start, end=None, otype=SoopObservationType.ALL):
        """Search for all observations in the index.

        Parameters
        ----------
        start : `datetime`
            start time to look for overlapping observations in utc time
        end : `datetime`, optional
            end time to look for overlapping observations in utc time; if None,
            observations covering the single point ``start`` are returned, by default None
        otype : `SoopObservationType`, optional
            filter for specific type, by default SoopObservationType.ALL (no filtering)

        Returns
        -------
        `list`
            list of found `SoopObservation` in all indexed LTP overlapping the given
            timeperiod/point and matching the SoopObservationType.
        """
        intervals = self.observations.at(start) if end is None else self.observations.overlap(start, end)
        if otype != SoopObservationType.ALL:
            return [interval.data for interval in intervals if interval.data.type == otype]
        return [interval.data for interval in intervals]

    def get_keywords(self, *, start, end=None, otype=SoopObservationType.ALL):
        """Searches for corresponding entries (SOOPs and Observations) in the index LTPs.

        Based on all found entries for the filter parameters a list of
        HeaderKeyword is generated combining all available information.
        If no SOOPs or no observations are found this is only logged.

        Parameters
        ----------
        start : `datetime`
            start time to look for overlapping observations and SOOPs in utc time
        end : `datetime`, optional
            end time to look for overlapping observations and SOOPs in utc time, by default None
        otype : `SoopObservationType`, optional
            filter for specific type of observations, by default SoopObservationType.ALL

        Returns
        -------
        `list`
            A list of `HeaderKeyword`; empty if nothing was found.
        """
        kwset = KeywordSet()
        soops = self.find_soops(start=start, end=end)
        if not soops:
            logger.info(f"No soops found for time: {start} - {end}")
        for soop in soops:
            logger.info(f"Soop found: {soop}")
            kwset.append(soop.to_fits_keywords())
        obss = self.find_observations(start=start, end=end, otype=otype)
        if not obss:
            logger.info(f"No observations found for time: {start} - {end} : {otype}")
        for obs in obss:
            kwset.append(obs.to_fits_keywords())
        return kwset.to_list()

    def rebuild_index(self):
        """Rebuild both interval indexes from ``self.data``.

        Every stored SOOP and observation is parsed again and added to fresh
        `IntervalTree` objects spanning its start/end date.
        """
        self.soops = IntervalTree()
        self.observations = IntervalTree()
        for ltp in self.data.values():
            for jsond in ltp["soops"]:
                soop = SOOP(jsond)
                self.soops.addi(soop.startDate, soop.endDate, soop)
            for jsond in ltp["observations"]:
                obs = SoopObservation(jsond)
                self.observations.addi(obs.startDate, obs.endDate, obs)
        logger.info(f"SOOP Rebuild Index: soops: {len(self.soops)}, observations: {len(self.observations)}")

    def add_soop_file_to_index(self, path, *, rebuild_index=True, **args):
        """Read one LTP file and store its SOOPs and STIX observations.

        The matching "all SOOPs" file (``<plan>.<version>.all.json`` in the
        configured ``soop_files_download`` directory) is downloaded first if it
        does not exist yet. The plan data is replaced when the file's version is
        greater than or equal to the stored one.

        Parameters
        ----------
        path : `str` | `pathlib.Path`
            the LTP JSON file to read
        rebuild_index : `bool`, optional
            rebuild the interval indexes afterwards, by default True
        **args
            ignored; NOTE(review): presumably accepted so this can be used as a
            generic callback — confirm
        """
        logger.info(f"Read SOOP file: {path}")
        with open(path) as f:
            ltp_stix = json.load(f)
        plan = str(ltp_stix["info"]["name"])
        version = int(ltp_stix["info"]["internalVersion"])
        all_soop_file = Path(CONFIG.get("SOOP", "soop_files_download")) / f"{plan}.{version}.all.json"
        if not all_soop_file.exists():
            self.download_all_soops_from_api(plan, version, all_soop_file)
        with open(all_soop_file) as f_all:
            ltp_data_all = json.load(f_all)
        if self.data[plan]["version"] <= version:
            logger.info(f"newer version ({version}) found for {plan}: replacing")
            # replace to new version
            self.data[plan]["version"] = version
            # for soops we store all soops from all instruments
            self.data[plan]["soops"] = ltp_data_all["soops"]
            # for observations we only store STIX related
            self.data[plan]["observations"] = ltp_stix["observations"]
        else:
            logger.info(f"older version ({version}) found for {plan}: skipping")
        if rebuild_index:
            self.rebuild_index()
        self.filecounter += 1
if "pytest" in sys.modules:
    # only set the global in test scenario: when pytest has been imported, create
    # the shared instance from the bundled test data with the mocked SOOP API
    SOOPManager.instance = SOOPManager(test_data.soop.DIR, mock_api=True)