
openaleph_procrastinate.helpers

Helper functions to access Archive and FollowTheMoney data within Jobs

entity_writer(dataset)

Get the ftmstore.dataset.BulkLoader for the given dataset. The writer is flushed when leaving the context.

Source code in openaleph_procrastinate/helpers.py
@contextmanager
def entity_writer(dataset: str) -> Generator[BulkLoader, None, None]:
    """
    Get the `ftmstore.dataset.BulkLoader` for the given `dataset`. The writer is
    flushed when leaving the context.
    """
    store = get_dataset(
        dataset, origin=OPAL_ORIGIN, database_uri=settings.ftm_store_uri
    )
    loader = store.bulk()
    try:
        yield loader
    finally:
        loader.flush()
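
Because the flush happens in a `finally` clause, entities buffered before an error are still written when the block exits. The sketch below illustrates that contract without a database. `FakeLoader` and `fake_entity_writer` are hypothetical stand-ins, not part of `ftmstore` or this package; real usage would presumably look like `with entity_writer("my_dataset") as bulk: bulk.put(entity)`.

```python
from contextlib import contextmanager


class FakeLoader:
    """Hypothetical stand-in for ftmstore's BulkLoader (not the real class)."""

    def __init__(self):
        self.buffer = []   # entities waiting to be written
        self.written = []  # entities that have been flushed

    def put(self, entity):
        self.buffer.append(entity)

    def flush(self):
        self.written.extend(self.buffer)
        self.buffer.clear()


@contextmanager
def fake_entity_writer(loader):
    """Mirrors the try/finally shape of entity_writer shown above."""
    try:
        yield loader
    finally:
        loader.flush()


loader = FakeLoader()
try:
    with fake_entity_writer(loader) as bulk:
        bulk.put({"id": "a"})
        raise RuntimeError("job failed midway")
except RuntimeError:
    pass  # the buffered entity was flushed before the error propagated
```

One consequence worth keeping in mind: a job that fails halfway may leave partial results in the store, so downstream steps should tolerate that.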

get_localpath(dataset, content_hash)

Load a file from the archive and store it in a local temporary path for further processing. The file is cleaned up after leaving the context. Reference: openaleph_procrastinate.model.DatasetJob.get_file_references

Source code in openaleph_procrastinate/helpers.py
def get_localpath(dataset: str, content_hash: str) -> ContextManager[Path]:
    """
    Load a file from the archive and store it in a local temporary path for
    further processing. The file is cleaned up after leaving the context.
    [Reference][openaleph_procrastinate.model.DatasetJob.get_file_references]
    """
    archive = get_archive()
    key = lookup_key(content_hash)
    return get_virtual_path(key, archive)
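
Since the temporary file is removed on exit, anything needed later has to be read or copied inside the `with` block. The following is a minimal sketch of that lifetime, assuming a hypothetical `fake_localpath` helper that stands in for `get_localpath` and takes raw bytes instead of a dataset and content hash.

```python
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def fake_localpath(data):
    """Hypothetical stand-in for get_localpath: yields a temporary Path."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "blob"
        path.write_bytes(data)
        yield path
    # The temporary directory and the file in it are deleted here.


with fake_localpath(b"hello archive") as path:
    seen_path = path
    existed_inside = path.exists()
    content = path.read_bytes()  # read while the file still exists

existed_after = seen_path.exists()  # the path is stale outside the block
```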

load_entity(dataset, entity_id)

Retrieve a single entity from the store.

Source code in openaleph_procrastinate/helpers.py
def load_entity(dataset: str, entity_id: str) -> EntityProxy:
    """
    Retrieve a single entity from the store.
    """
    store = get_dataset(dataset, database_uri=settings.ftm_store_uri)
    entity = store.get(entity_id)
    if entity is None:
        raise EntityNotFound(f"Entity `{entity_id}` not found in dataset `{dataset}`")
    return entity
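
A missing entity is reported by raising `EntityNotFound` rather than by returning `None`, so callers that can tolerate a missing entity need to catch the exception. The sketch below shows one way to do that; the `EntityNotFound` class, the dict-backed `fake_load_entity`, and `caption_or_none` are all hypothetical stand-ins defined locally for illustration.

```python
class EntityNotFound(Exception):
    """Local stand-in for the exception raised by load_entity."""


def fake_load_entity(store, dataset, entity_id):
    """Hypothetical stand-in for load_entity backed by a plain dict."""
    entity = store.get(entity_id)
    if entity is None:
        raise EntityNotFound(
            f"Entity `{entity_id}` not found in dataset `{dataset}`"
        )
    return entity


def caption_or_none(store, dataset, entity_id):
    """Return the entity caption, or None when the entity is missing."""
    try:
        return fake_load_entity(store, dataset, entity_id)["caption"]
    except EntityNotFound:
        return None


store = {"e1": {"caption": "Jane Doe"}}
found = caption_or_none(store, "my_dataset", "e1")
missing = caption_or_none(store, "my_dataset", "e2")
```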

open_file(dataset, content_hash)

Load a file from the archive and store it in a local temporary path for further processing. Returns an open file handle. The file is closed and cleaned up after leaving the context. Reference: openaleph_procrastinate.model.DatasetJob.get_file_references

Source code in openaleph_procrastinate/helpers.py
def open_file(dataset: str, content_hash: str) -> ContextManager[VirtualIO]:
    """
    Load a file from the archive and store it in a local temporary path for
    further processing. Returns an open file handler. The file is closed and
    cleaned up after leaving the context.
    [Reference][openaleph_procrastinate.model.DatasetJob.get_file_references]
    """
    archive = get_archive()
    key = lookup_key(content_hash)
    return open_virtual(key, archive)
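
An open handle suits streaming work where the file does not need to be read into memory at once. As a sketch under stated assumptions, the example below computes a SHA-1 digest in chunks. `fake_open_file` is a hypothetical stand-in for `open_file` that wraps in-memory bytes, and `sha1_of` is an illustrative helper, not part of this package.

```python
import hashlib
import io
from contextlib import contextmanager


@contextmanager
def fake_open_file(data):
    """Hypothetical stand-in for open_file: yields a binary file handle."""
    handle = io.BytesIO(data)
    try:
        yield handle
    finally:
        handle.close()  # the handle is closed when the context exits


def sha1_of(handle, chunk_size=8192):
    """Stream the handle in fixed-size chunks and return the hex digest."""
    digest = hashlib.sha1()
    for chunk in iter(lambda: handle.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


data = b"x" * 20000
with fake_open_file(data) as fh:
    seen_handle = fh
    checksum = sha1_of(fh)

closed_after = seen_handle.closed  # reading from it now would fail
```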