
openaleph_procrastinate.helpers

Helper functions for accessing Archive and FollowTheMoney data within jobs.

entity_writer(dataset, origin=OPAL_ORIGIN)

Get the ftmq.store.fragments.BulkLoader for the given dataset. The writer is flushed when leaving the context.

Source code in openaleph_procrastinate/helpers.py
@contextmanager
def entity_writer(
    dataset: str, origin: str = OPAL_ORIGIN
) -> Generator[BulkLoader, None, None]:
    """
    Get the `ftmq.store.fragments.BulkLoader` for the given `dataset`. The
    writer is flushed when leaving the context.
    """
    store = get_fragments(
        dataset,
        origin=origin,
        database_uri=settings.fragments_uri,
        **sqlalchemy_pool,
    )
    loader = store.bulk()
    try:
        yield loader
    finally:
        loader.flush()
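
The try/finally shape above means the flush also runs when the body of the `with` block raises. A minimal sketch of that behaviour, assuming a hypothetical `StubLoader` with `put` and `flush` methods in place of the real `BulkLoader` (neither the stub nor `stub_entity_writer` is part of the library):

```python
from contextlib import contextmanager
from typing import Generator


class StubLoader:
    """Hypothetical stand-in for the bulk loader: buffers until flushed."""

    def __init__(self) -> None:
        self.buffer: list[dict] = []
        self.written: list[dict] = []

    def put(self, entity: dict) -> None:
        self.buffer.append(entity)

    def flush(self) -> None:
        self.written.extend(self.buffer)
        self.buffer.clear()


@contextmanager
def stub_entity_writer(loader: StubLoader) -> Generator[StubLoader, None, None]:
    # Same try/finally shape as entity_writer: flush runs on normal exit
    # and when the body raises.
    try:
        yield loader
    finally:
        loader.flush()


loader = StubLoader()
try:
    with stub_entity_writer(loader) as bulk:
        bulk.put({"id": "a"})
        raise RuntimeError("job failed halfway")
except RuntimeError:
    pass
```

In this sketch the entity written before the error still ends up in `loader.written`, so a job that needs all-or-nothing writes would have to handle that itself.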

get_localpath(dataset, content_hash)

Load a file from the archive into a local temporary path for further processing. The file is cleaned up after leaving the context. Reference: openaleph_procrastinate.model.DatasetJob.get_file_references

Source code in openaleph_procrastinate/helpers.py
def get_localpath(dataset: str, content_hash: str) -> ContextManager[Path]:
    """
    Load a file from the archive and store it in a local temporary path for
    further processing. The file is cleaned up after leaving the context.
    [Reference][openaleph_procrastinate.model.DatasetJob.get_file_references]
    """
    archive = get_archive()
    key = lookup_key(content_hash)
    return archive.local_path(key)
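
Because the temporary file is removed on exit, any work on the path has to happen inside the `with` block. A rough sketch of that usage pattern, with a hypothetical `stub_localpath` standing in for `get_localpath` (it takes raw bytes instead of a dataset and content hash so that it runs without an archive):

```python
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Generator


@contextmanager
def stub_localpath(payload: bytes) -> Generator[Path, None, None]:
    # Hypothetical stand-in: writes the payload to a temporary directory
    # that is deleted when the context exits.
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "data"
        path.write_bytes(payload)
        yield path


with stub_localpath(b"hello archive") as path:
    kept_path = path
    size = path.stat().st_size       # work with the file here
    existed_inside = path.exists()

exists_after = kept_path.exists()    # the path is gone by now
```

Anything that must outlive the context (extracted text, a checksum, a copy of the file) should be produced before the block ends.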

load_entities(dataset, entity_ids)

Batch retrieve entities from the fragment store.

Source code in openaleph_procrastinate/helpers.py
def load_entities(
    dataset: str, entity_ids: Iterable[str]
) -> Generator[EntityProxy, None, None]:
    """
    Batch retrieve entities from the fragment store.
    """
    store = get_fragments(
        dataset, database_uri=settings.fragments_uri, **sqlalchemy_pool
    )
    yield from store.iterate(entity_ids)
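
Since the function body uses `yield from`, it is a generator: calling it returns immediately, and the body only runs once the result is iterated. A small sketch of that laziness, using a hypothetical `stub_load_entities` that yields plain dicts rather than `EntityProxy` objects:

```python
from typing import Generator, Iterable

calls: list[str] = []


def stub_load_entities(entity_ids: Iterable[str]) -> Generator[dict, None, None]:
    # Hypothetical stand-in: records when the "store" would be opened.
    calls.append("store opened")
    yield from ({"id": entity_id} for entity_id in entity_ids)


entities = stub_load_entities(["a", "b"])
opened_before = list(calls)   # nothing has run yet
loaded = list(entities)       # iteration triggers the lookup
opened_after = list(calls)
```

The same applies to the real helper as shown in the source: an unconsumed return value performs no work, so wrap the call in `list(...)` or a `for` loop.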

open_file(dataset, content_hash)

Load a file from the archive into a local temporary path for further processing and return an open file handle. The file is closed and cleaned up after leaving the context. Reference: openaleph_procrastinate.model.DatasetJob.get_file_references

Source code in openaleph_procrastinate/helpers.py
def open_file(dataset: str, content_hash: str) -> ContextManager[VirtualIO]:
    """
    Load a file from the archive and store it in a local temporary path for
    further processing. Returns an open file handler. The file is closed and
    cleaned up after leaving the context.
    [Reference][openaleph_procrastinate.model.DatasetJob.get_file_references]
    """
    archive = get_archive()
    key = lookup_key(content_hash)
    return archive.local_open(key, algorithm=ARCHIVE_CHECKSUM_ALGORITHM)
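
One way to consume such a handle is to read it in chunks inside the context, for example to compute a digest. The following is only a sketch: `stub_open_file` and `sha1_of` are hypothetical names, the stub wraps an in-memory buffer, and it assumes the handle is opened in binary mode. SHA-1 is used purely for illustration and is not necessarily the value of `ARCHIVE_CHECKSUM_ALGORITHM`.

```python
import hashlib
import io
from contextlib import contextmanager
from typing import BinaryIO, Generator


@contextmanager
def stub_open_file(payload: bytes) -> Generator[BinaryIO, None, None]:
    # Hypothetical stand-in: yields a binary handle and closes it on exit.
    handle = io.BytesIO(payload)
    try:
        yield handle
    finally:
        handle.close()


def sha1_of(handle: BinaryIO, chunk_size: int = 65536) -> str:
    # Read in fixed-size chunks so large files are never held in memory.
    digest = hashlib.sha1()
    for chunk in iter(lambda: handle.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


payload = b"hello archive"
with stub_open_file(payload) as fh:
    checksum = sha1_of(fh)
    kept = fh
```

After the block the handle is closed, so further reads on `kept` would fail; keep derived values such as `checksum` instead.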