Source code for punchbowl.auto.control.cache_layer.manager
import contextlib
from glob import glob
from multiprocessing.shared_memory import SharedMemory
from prefect import get_run_logger
from prefect.exceptions import MissingContextError
from prefect.variables import Variable
CACHE_KEY_PREFIX = "punchpipe-cache-"
[docs]
def caching_is_enabled() -> bool:
return Variable.get("use_shm_cache", False)
[docs]
class ExportableWrapper:
"""
Allows shared memory buffers to be yielded without "leaking" a reference
If try_read_from_key just does `yield shm.buf`, then the calling function will still have a reference to our
shared memory buffer when we enter our `finally` block and try to close the memory. By instead yielding this
wrapper, we can invalidate its reference to the shared memory and close it.
"""
def __init__(self, buffer):
self.data = buffer
[docs]
@contextlib.contextmanager
def try_read_from_key(key) -> ExportableWrapper | None:
shm = None
wrapper = None
try:
shm = SharedMemory(CACHE_KEY_PREFIX + key, track=False)
if shm.buf[0] == 1:
wrapper = ExportableWrapper(shm.buf[1:])
yield wrapper
else:
yield None
except (FileNotFoundError, ValueError):
yield None
finally:
if wrapper is not None:
# This allows `shm` to be closed---otherwise it complains than an "exported buffer" exists
wrapper.data = None
if shm is not None:
shm.close()
[docs]
def try_write_to_key(key, data):
shm = None
try:
shm = SharedMemory(CACHE_KEY_PREFIX + key, create=True, size=len(data) + 1, track=False)
# buf[0] will be a sentinel value to indicate that the rest of the data is in place, in case another process
# opens this shared memory while we're still copying
shm.buf[0] = 0
shm.buf[1:len(data)+1] = data
shm.buf[0] = 1
try:
get_run_logger().info(f"Saved to cache key {key}")
except MissingContextError:
pass # we're not in a flow so we don't log
except FileExistsError:
pass
finally:
if shm is not None:
shm.close()
[docs]
def get_existing_cache_files():
return glob(f"/dev/shm/{CACHE_KEY_PREFIX}*")