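punchbowl.auto.flows.level0
Attributes
FIXED_PACKETS | |
VARIABLE_PACKETS | |
PACKET_CADENCE | |
SC_TIME_EPOCH | |
NFI_PFW_POSITION_MAPPING | |
WFI_PFW_POSITION_MAPPING | |
credentials | |
engine | |
Classes
SpacecraftMapping | A base class for implementing a block that wraps an external service. |
TaiDatetimeConverter | Like the parent class, but takes an astropy Time object (which inherently encodes a timescale) instead of a datetime for the since initialization argument. |
TLMLoader | Interface for passing callable objects instead of file paths to be loaded. |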
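Functions
initializer | Ensure the parent proc's database connections are not touched in the new connection pool. |
unpack_compression_settings | Unpack image compression control register value. |
unpack_acquisition_settings | Unpack CEB image acquisition register value. |
read_tlm_defs | |
get_ccsds_data_type | |
create_packet_definitions | |
detect_new_tlm_files | |
ingest_tlm_file | |
unpack_n_bit_values | |
organize_lz_fits_keywords | |
organize_pfw_fits_keywords | |
organize_led_fits_keywords | |
organize_ceb_fits_keywords | |
organize_spacecraft_position_keywords | |
organize_compression_and_acquisition_settings | |
organize_gain_info | |
decode_image_packets | |
determine_file_type | |
get_metadata | |
eci_quaternion_to_ra_dec | Convert an ECI quaternion to RA and Dec. |
form_preliminary_wcs | Create the preliminary WCS for punchbowl. |
form_single_image_caller | |
form_single_image | |
level0_form_images | |
level0_core_flow | |
get_outlier_limits_paths | |
get_mask_paths | |
level0_construct_flow_info | |
level0_scheduler_flow | |
level0_process_flow | |
open_and_split_packet_file | |
parse_telemetry_file | |
short_hash | Generates a short hash of specified length using MD5 and base64 encoding. |
wrap_if_appropriate | |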
Module Contents
- punchbowl.auto.flows.level0.FIXED_PACKETS = ['ENG_XACT', 'ENG_LED', 'ENG_PFW', 'ENG_CEB', 'ENG_LZ']
- punchbowl.auto.flows.level0.VARIABLE_PACKETS = ['SCI_XFI']
- punchbowl.auto.flows.level0.PACKET_CADENCE
- punchbowl.auto.flows.level0.SC_TIME_EPOCH
- punchbowl.auto.flows.level0.NFI_PFW_POSITION_MAPPING = ['PM', 'DK', 'PZ', 'PP', 'CR']
- punchbowl.auto.flows.level0.WFI_PFW_POSITION_MAPPING = ['PP', 'DK', 'PZ', 'PM', 'CR']
- punchbowl.auto.flows.level0.credentials
- punchbowl.auto.flows.level0.engine
- punchbowl.auto.flows.level0.initializer()
Ensure the parent proc’s database connections are not touched in the new connection pool.
- class punchbowl.auto.flows.level0.SpacecraftMapping(*args: Any, **kwargs: Any)
Bases: prefect.blocks.core.Block
A base class for implementing a block that wraps an external service.
This class can be defined with an arbitrary set of fields and methods, and couples business logic with data contained in a block document. _block_document_name, _block_document_id, _block_schema_id, and _block_type_id are reserved by Prefect as Block metadata fields, but otherwise a Block can implement arbitrary logic. Blocks can be instantiated without populating these metadata fields, but such blocks can only be used interactively, not with the Prefect API.
Instead of the __init__ method, a block implementation allows the definition of a block_initialization method that is called after initialization.
- mapping: prefect.blocks.fields.SecretDict
- class punchbowl.auto.flows.level0.TaiDatetimeConverter(since: astropy.time.Time, units: str | tuple[str])
Bases: ccsdspy.converters.DatetimeConverter
Like the parent class, but takes an astropy Time object (which inherently encodes a timescale) instead of a datetime for the since initialization argument, and uses astropy TimeDelta objects for date math (instead of leap-second-naïve Python timedeltas). Values are treated as offsets on a TAI timescale.
- _since
- _units
- convert(*field_arrays)
Apply the datetime conversion.
- Parameters:
field_arrays (list of NumPy array) – list of decoded packet field values; each must have at least one dimension
- Returns:
converted – converted form of the decoded packet field values
- Return type:
NumPy array of object (holding datetimes)
- Raises:
ValueError – The number of units provided does not match the number of input field arrays.
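As a rough illustration of why the timescale matters, the following standard-library sketch treats decoded field values as offsets from a TAI epoch and shifts the result to UTC. It is not the class's implementation: the real converter relies on astropy Time and TimeDelta, whereas this sketch assumes a fixed TAI minus UTC difference of 37 s (correct for dates from 2017-01-01 until a further leap second is introduced). The function name, the unit table, and the epoch used in the usage lines are all hypothetical.

```python
from datetime import datetime, timedelta

# Microseconds per unit; a hypothetical subset of unit names for this sketch.
UNIT_MICROSECONDS = {"seconds": 1_000_000, "milliseconds": 1_000, "microseconds": 1}

# Assumption: TAI - UTC = 37 s, which holds from 2017-01-01 onward
# (until another leap second is announced).
TAI_MINUS_UTC = timedelta(seconds=37)


def tai_offsets_to_utc(since_tai, units, *field_arrays):
    """Combine per-unit integer field values into offsets from a TAI epoch, return UTC."""
    if isinstance(units, str):
        units = (units,)
    if len(units) != len(field_arrays):
        # Mirrors the documented ValueError for a unit/field-array count mismatch.
        raise ValueError("number of units must match number of field arrays")
    results = []
    for values in zip(*field_arrays):
        # Integer arithmetic keeps the offset exact to the microsecond.
        total_us = sum(int(v) * UNIT_MICROSECONDS[u] for v, u in zip(values, units))
        tai = since_tai + timedelta(microseconds=total_us)
        results.append(tai - TAI_MINUS_UTC)
    return results


# Hypothetical epoch and field values: coarse seconds plus fine microseconds.
epoch_tai = datetime(2020, 1, 1)
utc_times = tai_offsets_to_utc(
    epoch_tai, ("seconds", "microseconds"), [0, 100], [0, 500_000]
)
```

A leap-second-aware library such as astropy removes the need for the hard-coded offset, which is presumably why the class takes a Time object for since.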
- punchbowl.auto.flows.level0.unpack_compression_settings(com_set_val: bytes | int)
Unpack image compression control register value.
See SciPacket.COMPRESSION_REG for details.
- punchbowl.auto.flows.level0.unpack_acquisition_settings(acq_set_val: bytes | int)
Unpack CEB image acquisition register value.
See SciPacket.ACQUISITION_REG for details.
- punchbowl.auto.flows.level0.read_tlm_defs(path)
- punchbowl.auto.flows.level0.get_ccsds_data_type(sheet_type, data_size)
- punchbowl.auto.flows.level0.create_packet_definitions(tlm, parse_expanding_fields=True)
- punchbowl.auto.flows.level0.detect_new_tlm_files(pipeline_config: dict, session=None) → list[str]
- punchbowl.auto.flows.level0.ingest_tlm_file(path: str, defs: dict[str, ccsdspy.VariableLength | ccsdspy.FixedLength], apid_name2num: dict[str, int])
- punchbowl.auto.flows.level0.unpack_n_bit_values(packed: bytes, byteorder: str, n_bits=19) → numpy.ndarray
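The signature of unpack_n_bit_values suggests that it splits a byte string into consecutive n-bit integers. The sketch below shows one way to do that in pure Python. It rests on assumptions the page does not confirm: values are packed most-significant-bit first with no gaps, trailing pad bits are ignored, and the result is a plain list rather than a numpy.ndarray. The byteorder argument is not modelled, and both function names are hypothetical.

```python
def unpack_n_bit_values_sketch(packed: bytes, n_bits: int = 19) -> list[int]:
    """Split a big-endian bit stream into consecutive n_bits-wide unsigned integers."""
    total_bits = len(packed) * 8
    count = total_bits // n_bits          # leftover bits at the end are padding
    as_int = int.from_bytes(packed, "big")
    mask = (1 << n_bits) - 1
    values = []
    for i in range(count):
        # Shift so the i-th field sits in the low n_bits, then mask it out.
        shift = total_bits - (i + 1) * n_bits
        values.append((as_int >> shift) & mask)
    return values


def pack_n_bit_values_sketch(values: list[int], n_bits: int = 19) -> bytes:
    """Inverse helper for the example: pack values MSB first, zero-padded to whole bytes."""
    acc = 0
    for v in values:
        acc = (acc << n_bits) | (v & ((1 << n_bits) - 1))
    total_bits = len(values) * n_bits
    pad = (-total_bits) % 8               # bits needed to reach a byte boundary
    return (acc << pad).to_bytes((total_bits + pad) // 8, "big")


packed_example = pack_n_bit_values_sketch([1, 2, 3])
unpacked_example = unpack_n_bit_values_sketch(packed_example)
```

With three 19-bit values the packed stream is 57 bits, padded to 8 bytes, so the round trip recovers exactly the three inputs.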
- punchbowl.auto.flows.level0.organize_lz_fits_keywords(lz_packet_db, lz_packet)
- punchbowl.auto.flows.level0.organize_pfw_fits_keywords(pfw_packet_db, pfw_packet)
- punchbowl.auto.flows.level0.organize_led_fits_keywords(led_packet_db, led_packet)
- punchbowl.auto.flows.level0.organize_ceb_fits_keywords(ceb_packet_db, ceb_packet)
- punchbowl.auto.flows.level0.organize_spacecraft_position_keywords(observation_time, before_xact_db, before_xact)
- punchbowl.auto.flows.level0.organize_compression_and_acquisition_settings(compression_settings, acquisition_settings)
- punchbowl.auto.flows.level0.organize_gain_info(spacecraft_id)
- punchbowl.auto.flows.level0.decode_image_packets(img_packets, compression_settings)
- punchbowl.auto.flows.level0.determine_file_type(polarizer_packet, pfw_is_out_of_date, led_info, image_shape) → str
- punchbowl.auto.flows.level0.get_metadata(first_image_packet, image_shape, session, defs, apid_name2num, pfw_recency_requirement=3, xact_recency_requirement=3) → tuple[dict[str, Any], dict[str, Any]]
- punchbowl.auto.flows.level0.eci_quaternion_to_ra_dec(q, obstime)
Convert an ECI quaternion to RA and Dec.
- Parameters:
q – A numpy array representing the ECI quaternion (q0, q1, q2, q3).
- Returns:
ra – Right Ascension in degrees.
dec – Declination in degrees.
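The geometry behind a quaternion-to-pointing conversion can be sketched with the standard library alone. This is a simplified illustration, not the function's implementation. It assumes a scalar-first unit quaternion (q0 is the scalar part) that rotates body-frame vectors into the ECI frame, and a boresight along the body +X axis. It also ignores obstime and any frame correction the real function may apply. All names are hypothetical.

```python
import math


def rotate_vector(q, v):
    """Rotate 3-vector v by the unit quaternion q = (q0, q1, q2, q3), scalar first."""
    w, x, y, z = q
    vx, vy, vz = v
    # t = 2 * (u x v), where u = (x, y, z) is the vector part of q
    tx = 2.0 * (y * vz - z * vy)
    ty = 2.0 * (z * vx - x * vz)
    tz = 2.0 * (x * vy - y * vx)
    # v' = v + w * t + u x t
    return (
        vx + w * tx + (y * tz - z * ty),
        vy + w * ty + (z * tx - x * tz),
        vz + w * tz + (x * ty - y * tx),
    )


def quaternion_to_ra_dec_sketch(q, boresight=(1.0, 0.0, 0.0)):
    """Return (ra, dec) in degrees for the boresight direction rotated by q."""
    norm = math.sqrt(sum(c * c for c in q))
    q = tuple(c / norm for c in q)        # guard against a slightly non-unit input
    x, y, z = rotate_vector(q, boresight)
    ra = math.degrees(math.atan2(y, x)) % 360.0
    dec = math.degrees(math.asin(max(-1.0, min(1.0, z))))  # clamp rounding overshoot
    return ra, dec


# A 90 degree rotation about the ECI +Z axis moves the +X boresight to +Y.
half = math.radians(45.0)
ra_example, dec_example = quaternion_to_ra_dec_sketch(
    (math.cos(half), 0.0, 0.0, math.sin(half))
)
```

If the actual boresight axis or quaternion convention differs (scalar-last, or ECI-to-body), the boresight argument or the sign of the vector part would need to change accordingly.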
- punchbowl.auto.flows.level0.form_preliminary_wcs(soc_spacecraft_id, metadata, plate_scale)
Create the preliminary WCS for punchbowl.
- punchbowl.auto.flows.level0.form_single_image_caller(args)
- punchbowl.auto.flows.level0.form_single_image(spacecraft, t, defs, apid_name2num, pipeline_config, spacecraft_secrets, outlier_limits, masks, processing_flow_id)
- punchbowl.auto.flows.level0.level0_form_images(pipeline_config, defs, apid_name2num, outlier_limits, masks, session, logger, processing_flow_id)
- punchbowl.auto.flows.level0.level0_core_flow(pipeline_config: dict, skip_if_no_new_tlm: bool = True, limit_files: list[str] = None, mask_files: list[str] = None, processing_flow_id=None)
- punchbowl.auto.flows.level0.get_outlier_limits_paths(session, reference_time)
- punchbowl.auto.flows.level0.get_mask_paths(session, reference_time)
- punchbowl.auto.flows.level0.level0_construct_flow_info(pipeline_config: dict, session, skip_if_no_new_tlm: bool = True)
- punchbowl.auto.flows.level0.level0_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None)
- punchbowl.auto.flows.level0.level0_process_flow(flow_id: int, pipeline_config_path=None, session=None)
- punchbowl.auto.flows.level0.open_and_split_packet_file(path: str) → dict[int, io.BytesIO]
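The return type of open_and_split_packet_file, dict[int, io.BytesIO], suggests one byte stream per APID. The sketch below shows how such a split could work on raw bytes, using the standard 6-byte CCSDS space packet primary header: the APID is the low 11 bits of the first 16-bit word, and the packet data length field holds the data field length minus one. It assumes the input is a plain concatenation of space packets with no additional framing, takes bytes rather than a path, and uses a hypothetical function name.

```python
import io
import struct

PRIMARY_HEADER_LEN = 6  # bytes in a CCSDS space packet primary header


def split_packets_by_apid(raw: bytes) -> dict[int, io.BytesIO]:
    """Group whole CCSDS space packets from raw into one BytesIO stream per APID."""
    streams: dict[int, io.BytesIO] = {}
    pos = 0
    while pos + PRIMARY_HEADER_LEN <= len(raw):
        word0, _sequence, length_field = struct.unpack_from(">HHH", raw, pos)
        apid = word0 & 0x07FF                      # low 11 bits of the first word
        total = PRIMARY_HEADER_LEN + length_field + 1
        if pos + total > len(raw):
            break                                  # truncated trailing packet: stop
        streams.setdefault(apid, io.BytesIO()).write(raw[pos:pos + total])
        pos += total
    for stream in streams.values():
        stream.seek(0)                             # rewind so callers can read from the start
    return streams


# Two small hand-built packets with hypothetical APIDs 0x20 and 0x21.
packet_a = struct.pack(">HHH", 0x0020, 0xC000, 2) + b"abc"   # 3-byte data field
packet_b = struct.pack(">HHH", 0x0021, 0xC000, 0) + b"z"     # 1-byte data field
split_example = split_packets_by_apid(packet_a + packet_b + packet_a)
```

Keeping each APID in its own stream lets a fixed-length or variable-length packet definition be applied to a homogeneous sequence of packets.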
- punchbowl.auto.flows.level0.parse_telemetry_file(path, defs, apid_name2num)
- punchbowl.auto.flows.level0.short_hash(data, length=8)
Generates a short hash of specified length using MD5 and base64 encoding.
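A minimal sketch of the technique described for short_hash: hash with MD5, base64-encode the digest, and truncate. The details are assumptions, since the page does not state them: string input is encoded as UTF-8, the URL-safe base64 alphabet is used, and the result is truncated from the left. The function name is hypothetical.

```python
import base64
import hashlib


def short_hash_sketch(data, length: int = 8) -> str:
    """Return the first `length` characters of the base64-encoded MD5 digest of data."""
    if isinstance(data, str):
        data = data.encode("utf-8")       # assumption: text is hashed as UTF-8 bytes
    # MD5 is used here only as a compact fingerprint, not for security.
    digest = hashlib.md5(data, usedforsecurity=False).digest()
    encoded = base64.urlsafe_b64encode(digest).decode("ascii")
    return encoded[:length]


key_example = short_hash_sketch("L0_packets.tlm")
```

A 16-byte MD5 digest encodes to 22 significant base64 characters plus padding, so lengths above 22 would start to include the `=` padding characters.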
- class punchbowl.auto.flows.level0.TLMLoader(path: str, defs, apid_name2num)
Bases: punchbowl.auto.control.cache_layer.loader_base_class.LoaderABC[dict]
Interface for passing callable objects instead of file paths to be loaded.
- path
- defs
- apid_name2num
- gen_key() → str
Generate a cache key.
- src_repr() → str
Return a string representation of the source data (e.g. a file path).
- load_from_disk()
Load the object.
- __repr__()
Return a string representation of this loader.
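The loader methods listed above (gen_key, src_repr, load_from_disk) fit a common caching pattern: the cache asks the loader for a key and calls load_from_disk only on a miss. The sketch below illustrates that pattern with stand-in classes. LoaderSketch, TextFileLoader, and load_cached are hypothetical names and do not reproduce the actual LoaderABC or cache layer in punchbowl.

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

T = TypeVar("T")


class LoaderSketch(ABC, Generic[T]):
    """Hypothetical stand-in for a loader base class with the three documented methods."""

    @abstractmethod
    def gen_key(self) -> str:
        """Generate a cache key."""

    @abstractmethod
    def src_repr(self) -> str:
        """Return a string representation of the source data."""

    @abstractmethod
    def load_from_disk(self) -> T:
        """Load the object."""


class TextFileLoader(LoaderSketch[str]):
    """Example loader that reads a UTF-8 text file."""

    def __init__(self, path: str):
        self.path = path

    def gen_key(self) -> str:
        return f"text-{self.path}"

    def src_repr(self) -> str:
        return self.path

    def load_from_disk(self) -> str:
        with open(self.path, encoding="utf-8") as handle:
            return handle.read()

    def __repr__(self) -> str:
        return f"TextFileLoader({self.path!r})"


_cache: dict[str, object] = {}


def load_cached(loader: LoaderSketch):
    """Return the cached object for loader, loading from disk only on a cache miss."""
    key = loader.gen_key()
    if key not in _cache:
        _cache[key] = loader.load_from_disk()
    return _cache[key]
```

Passing a loader object rather than a bare path lets the cache handle sources that need extra context to load, which for TLMLoader appears to be the defs and apid_name2num arguments.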
- punchbowl.auto.flows.level0.wrap_if_appropriate(psf_path: str, defs, apid_name2num) → str | collections.abc.Callable