import os
from sqlalchemy import TEXT, Boolean, Column, Float, Index, Integer, SmallInteger, String
from sqlalchemy.dialects.mysql import DATETIME, MEDIUMTEXT
from sqlalchemy.orm import declarative_base
Base = declarative_base()
[docs]
class File(Base):
__tablename__ = "files"
file_id = Column(Integer, primary_key=True)
level = Column(String(1), nullable=False)
file_type = Column(String(2), nullable=False)
observatory = Column(String(1), nullable=False)
file_version = Column(String(16), nullable=False)
software_version = Column(String(35), nullable=False)
date_created = Column(DATETIME(fsp=6), nullable=True)
date_obs = Column(DATETIME(fsp=6), nullable=False)
date_beg = Column(DATETIME(fsp=6), nullable=True)
date_end = Column(DATETIME(fsp=6), nullable=True)
polarization = Column(String(2), nullable=True)
state = Column(String(64), nullable=False)
outlier = Column(SmallInteger, nullable=False, default=False)
bad_packets = Column(Boolean, nullable=False, default=False)
processing_flow = Column(Integer, nullable=True)
crota = Column(Float, nullable=True)
[docs]
def summary(self):
return (
f"File {self.file_id}, level {self.level}, type {self.file_type}{self.observatory}, state {self.state}\n"
f"File version {self.file_version}, from software {self.software_version}\n"
f"date_obs {self.date_obs}, created {self.date_created}\n"
)
[docs]
def __repr__(self):
return f"File(id={self.file_id!r})"
[docs]
def filename(self) -> str:
"""
Constructs the filename for this file
Returns
-------
str
properly formatted PUNCH filename
"""
return f'PUNCH_L{self.level}_{self.file_type}{self.observatory}_{self.date_obs.strftime("%Y%m%d%H%M%S")}_v{self.file_version}.fits'
[docs]
def directory(self, root: str):
"""
Constructs the directory the file should be stored in
Parameters
----------
root : str
the root directory where the top level PUNCH file hierarchy is
Returns
-------
str
the place to write the file
"""
return os.path.join(root, self.level, self.file_type + self.observatory, self.date_obs.strftime("%Y/%m/%d"))
Index("get_ready_files", File.state, File.level, File.date_obs.desc(), File.file_type, File.observatory)
Index("get_ready_files_alt", File.level, File.date_obs.desc(), File.file_type, File.state)
Index("construct_background", File.level, File.observatory, File.outlier, File.date_obs, File.state, File.file_type)
Index("get_cal_file", File.file_type, File.observatory, File.date_obs, File.state)
Index("CNN", File.file_type, File.observatory, File.level, File.state, File.outlier)
Index("processing_flow_index", File.processing_flow)
# This is really useful for a files dashboard "show me what we made today" query
Index("date_created_index", File.date_created, File.file_type, File.level)
Index("level0_uniqueness",
File.level, File.polarization, File.file_type,
File.observatory, File.file_version, File.date_obs,
unique=True)
[docs]
class Flow(Base):
__tablename__ = "flows"
flow_id = Column(Integer, primary_key=True)
flow_level = Column(String(1), nullable=False)
flow_type = Column(String(64), nullable=False)
flow_run_name = Column(String(64), nullable=True)
flow_run_id = Column(String(36), nullable=True)
state = Column(String(16), nullable=False)
creation_time = Column(DATETIME(fsp=6), nullable=False)
launch_time = Column(DATETIME(fsp=6), nullable=True)
start_time = Column(DATETIME(fsp=6), nullable=True)
end_time = Column(DATETIME(fsp=6), nullable=True)
priority = Column(Integer, nullable=False)
call_data = Column(MEDIUMTEXT, nullable=True)
is_backprocessing = Column(Boolean, nullable=False, default=False)
[docs]
def __repr__(self):
return f"Flow(id={self.flow_id!r})"
Index("flow_stats", Flow.end_time, Flow.flow_type, Flow.state)
Index("flow_cards", Flow.start_time, Flow.flow_level, Flow.flow_type)
Index("escalate_flows", Flow.state, Flow.flow_type, Flow.priority, Flow.creation_time)
[docs]
class FileRelationship(Base):
__tablename__ = "relationships"
relationship_id = Column(Integer, primary_key=True)
parent = Column(Integer, nullable=False)
child = Column(Integer, nullable=False)
Index("relationship_parent_index", FileRelationship.parent, mysql_using="hash", mariadb_using="hash")
Index("relationship_child_index", FileRelationship.child, mysql_using="hash", mariadb_using="hash")
[docs]
class TLMFiles(Base):
__tablename__ = "tlm_files"
tlm_id = Column(Integer, primary_key=True)
path = Column(String(128), nullable=False)
successful = Column(Boolean, nullable=False)
num_attempts = Column(Integer, nullable=False)
last_attempt = Column(DATETIME(fsp=6), nullable=True)
[docs]
class Health(Base):
__tablename__ = "health"
health_id = Column(Integer, primary_key=True)
datetime = Column(DATETIME(fsp=6), nullable=False)
cpu_usage = Column(Float, nullable=False)
memory_usage = Column(Float, nullable=False)
memory_percentage = Column(Float, nullable=False)
disk_usage = Column(Float, nullable=False)
disk_percentage = Column(Float, nullable=False)
num_pids = Column(Integer, nullable=False)
[docs]
class PacketHistory(Base):
__tablename__ = "packet_history"
id = Column(Integer, primary_key=True)
datetime = Column(DATETIME(fsp=6), nullable=False)
num_images_succeeded = Column(Integer, nullable=False)
num_images_failed = Column(Integer, nullable=False)
[docs]
class SCI_XFI(Base):
__tablename__ = "sci_xfi"
id = Column(Integer, primary_key=True)
tlm_id = Column(Integer)
spacecraft_id = Column(Integer, nullable=False)
packet_index = Column(Integer, nullable=False)
ccsds_sequence_count = Column(Integer, nullable=False)
ccsds_packet_length = Column(Integer, nullable=False)
timestamp = Column(DATETIME(fsp=6), nullable=False, index=True)
is_used = Column(Boolean, nullable=False)
num_attempts = Column(Integer, nullable=True)
last_attempt = Column(DATETIME(fsp=6), nullable=True)
last_skip_reason = Column(TEXT, nullable=True)
flash_block = Column(Integer, nullable=False)
compression_settings = Column(Integer, nullable=False)
acquisition_settings = Column(Integer, nullable=False)
packet_group = Column(Integer, nullable=False)
priority = Column(Integer, nullable=False, default=0)
Index("unused_packet_query", SCI_XFI.spacecraft_id, SCI_XFI.is_used)
[docs]
class ENG_CEB(Base):
__tablename__ = "eng_ceb"
id = Column(Integer, primary_key=True)
tlm_id = Column(Integer)
spacecraft_id = Column(Integer, nullable=False)
packet_index = Column(Integer, nullable=False)
ccsds_sequence_count = Column(Integer, nullable=False)
ccsds_packet_length = Column(Integer, nullable=False)
timestamp = Column(DATETIME(fsp=6), nullable=False, index=True)
Index("metadata_query", ENG_CEB.spacecraft_id, ENG_CEB.timestamp)
[docs]
class ENG_PFW(Base):
__tablename__ = "eng_pfw"
id = Column(Integer, primary_key=True)
tlm_id = Column(Integer)
spacecraft_id = Column(Integer, nullable=False)
packet_index = Column(Integer, nullable=False)
ccsds_sequence_count = Column(Integer, nullable=False)
ccsds_packet_length = Column(Integer, nullable=False)
timestamp = Column(DATETIME(fsp=6), nullable=False, index=True)
Index("metadata_query", ENG_PFW.spacecraft_id, ENG_PFW.timestamp)
[docs]
class ENG_XACT(Base):
__tablename__ = "eng_xact"
id = Column(Integer, primary_key=True)
tlm_id = Column(Integer)
spacecraft_id = Column(Integer, nullable=False)
packet_index = Column(Integer, nullable=False)
ccsds_sequence_count = Column(Integer, nullable=False)
ccsds_packet_length = Column(Integer, nullable=False)
timestamp = Column(DATETIME(fsp=6), nullable=False, index=True)
Index("metadata_query", ENG_XACT.spacecraft_id, ENG_XACT.timestamp)
[docs]
class ENG_LZ(Base):
__tablename__ = "eng_lz"
id = Column(Integer, primary_key=True)
tlm_id = Column(Integer)
spacecraft_id = Column(Integer, nullable=False)
packet_index = Column(Integer, nullable=False)
ccsds_sequence_count = Column(Integer, nullable=False)
ccsds_packet_length = Column(Integer, nullable=False)
timestamp = Column(DATETIME(fsp=6), nullable=False, index=True)
Index("metadata_query", ENG_LZ.spacecraft_id, ENG_LZ.timestamp)
[docs]
class ENG_LED(Base):
__tablename__ = "eng_led"
id = Column(Integer, primary_key=True)
tlm_id = Column(Integer)
spacecraft_id = Column(Integer, nullable=False)
packet_index = Column(Integer, nullable=False)
ccsds_sequence_count = Column(Integer, nullable=False)
ccsds_packet_length = Column(Integer, nullable=False)
timestamp = Column(DATETIME(fsp=6), nullable=False, index=True)
led_start_time = Column(DATETIME(fsp=6), nullable=False, index=True)
led_end_time = Column(DATETIME(fsp=6), nullable=False, index=True)
Index("metadata_query", ENG_LED.spacecraft_id, ENG_LED.timestamp)
PACKETNAME2SQL = {"SCI_XFI": SCI_XFI,
"ENG_CEB": ENG_CEB,
"ENG_PFW": ENG_PFW,
"ENG_XACT": ENG_XACT,
"ENG_LED": ENG_LED,
"ENG_LZ": ENG_LZ}
[docs]
def get_closest_file(f_target: File, f_others: list[File]) -> File:
return min(f_others, key=lambda o: abs((f_target.date_obs - o.date_obs).total_seconds()))
[docs]
def get_closest_before_file(f_target: File, f_others: list[File]) -> File:
return get_closest_file(f_target, [o for o in f_others if f_target.date_obs >= o.date_obs])
[docs]
def get_closest_after_file(f_target: File, f_others: list[File]) -> File:
return get_closest_file(f_target, [o for o in f_others if f_target.date_obs <= o.date_obs])