
openaleph_procrastinate.helpers

Helper functions for accessing Archive and FollowTheMoney data within jobs.

entity_writer(dataset)

Get the ftmstore.dataset.BulkLoader for the given dataset. The entities are flushed when leaving the context.

Source code in openaleph_procrastinate/helpers.py
@contextmanager
def entity_writer(dataset: str) -> Generator[BulkLoader, None, None]:
    """
    Get the `ftmstore.dataset.BulkLoader` for the given `dataset`. The entities
    are flushed when leaving the context.
    """
    store = get_dataset(
        dataset, origin=OPAL_ORIGIN, database_uri=settings.ftm_store_uri
    )
    loader = store.bulk()
    try:
        yield loader
    finally:
        loader.flush()
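
The flush-on-exit behaviour can be tried without a database. The following is a minimal sketch, not the library's code: `FakeLoader` and `fake_entity_writer` are hypothetical stand-ins, and the `put` method is assumed to behave like the bulk loader's buffering call.

```python
from contextlib import contextmanager


class FakeLoader:
    """Hypothetical stand-in for `ftmstore.dataset.BulkLoader`."""

    def __init__(self):
        self.pending = []  # buffered, not yet persisted
        self.written = []  # what a real store would hold after a flush

    def put(self, entity):
        # Buffer the entity; nothing is persisted yet.
        self.pending.append(entity)

    def flush(self):
        # Move everything buffered so far into the "store".
        self.written.extend(self.pending)
        self.pending.clear()


@contextmanager
def fake_entity_writer(loader):
    # Same shape as `entity_writer`: yield the loader, flush on exit.
    try:
        yield loader
    finally:
        loader.flush()


loader = FakeLoader()
with fake_entity_writer(loader) as bulk:
    bulk.put({"id": "a", "schema": "Person"})
    bulk.put({"id": "b", "schema": "Company"})
```

Because the flush sits in a `finally` clause, entities buffered before an exception inside the `with` block are still written. Jobs that need all-or-nothing semantics would have to handle that themselves.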

get_localpath(dataset, content_hash)

Load a file from the archive and store it in a local temporary path for further processing. The file is cleaned up after leaving the context.

Danger

This is not tested.

Source code in openaleph_procrastinate/helpers.py
@contextmanager
def get_localpath(dataset: str, content_hash: str) -> Generator[Path, None, None]:
    """
    Load a file from the archive and store it in a local temporary path for
    further processing. The file is cleaned up after leaving the context.

    !!! danger
        This is not tested.
    """
    archive = get_archive()
    key = lookup_key(content_hash)
    store = get_virtual()
    path = store.download(key, archive)
    try:
        yield Path(path)
    finally:
        store.cleanup(path)
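
The lifetime of the yielded path is the main thing to keep in mind when calling this helper. The sketch below reproduces the same yield-then-clean-up pattern with the standard library only. `fake_localpath` is a hypothetical stand-in that writes bytes to a temporary file instead of downloading from an archive.

```python
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Generator


@contextmanager
def fake_localpath(content: bytes) -> Generator[Path, None, None]:
    # Hypothetical stand-in: a temp file replaces the archive download.
    tmp = tempfile.NamedTemporaryFile(delete=False)
    try:
        tmp.write(content)
        tmp.close()  # close so callers can reopen the path on any platform
        yield Path(tmp.name)
    finally:
        tmp.close()  # no-op if already closed
        Path(tmp.name).unlink(missing_ok=True)


with fake_localpath(b"hello") as path:
    size = path.stat().st_size  # the file exists inside the block
    seen_path = path
```

After the `with` block the file is gone, so anything derived from it (sizes, extracted text, copies) has to be produced before leaving the context.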

load_entity(dataset, entity_id)

Retrieve a single entity from the store.

Source code in openaleph_procrastinate/helpers.py
def load_entity(dataset: str, entity_id: str) -> EntityProxy:
    """
    Retrieve a single entity from the store.
    """
    store = get_dataset(dataset, database_uri=settings.ftm_store_uri)
    entity = store.get(entity_id)
    if entity is None:
        raise EntityNotFound(f"Entity `{entity_id}` not found in dataset `{dataset}`")
    return entity
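
Since a missing entity raises `EntityNotFound` rather than returning `None`, callers that treat absence as a normal case need to catch it. A minimal sketch follows, with a dictionary standing in for the store. `FAKE_STORE`, `fake_load_entity`, `load_or_none`, and the local `EntityNotFound` class are all hypothetical names used for illustration.

```python
class EntityNotFound(Exception):
    """Hypothetical stand-in for the library's exception of the same name."""


# Stand-in store: a dict keyed by entity ID instead of an ftmstore dataset.
FAKE_STORE = {"jane": {"id": "jane", "schema": "Person"}}


def fake_load_entity(dataset, entity_id):
    # Mirrors the shape of `load_entity`: look up, raise if absent.
    entity = FAKE_STORE.get(entity_id)
    if entity is None:
        raise EntityNotFound(
            f"Entity `{entity_id}` not found in dataset `{dataset}`"
        )
    return entity


def load_or_none(dataset, entity_id):
    # Convert the exception into None for callers that expect gaps.
    try:
        return fake_load_entity(dataset, entity_id)
    except EntityNotFound:
        return None


found = load_or_none("my_dataset", "jane")
missing = load_or_none("my_dataset", "john")
```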

open_file(dataset, content_hash)

Load a file from the archive and store it in a local temporary path for further processing. Returns an open binary file handle. The file is closed and cleaned up after leaving the context.

Danger

This is not tested.

Source code in openaleph_procrastinate/helpers.py
@contextmanager
def open_file(dataset: str, content_hash: str) -> Generator[BinaryIO, None, None]:
    """
    Load a file from the archive and store it in a local temporary path for
    further processing. Returns an open file handler. The file is closed and
    cleaned up after leaving the context.

    !!! danger
        This is not tested.
    """
    archive = get_archive()
    key = lookup_key(content_hash)
    with archive.open(key) as handler:
        try:
            yield handler
        finally:
            handler.close()
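
A typical consumer reads the yielded handle in chunks and relies on the context manager to close it. The sketch below shows that usage with an in-memory buffer. `fake_open_file` is a hypothetical stand-in that serves bytes from memory instead of an archive.

```python
import io
from contextlib import contextmanager
from typing import BinaryIO, Generator


@contextmanager
def fake_open_file(content: bytes) -> Generator[BinaryIO, None, None]:
    # Hypothetical stand-in: an in-memory buffer replaces the archive.
    handler = io.BytesIO(content)
    try:
        yield handler
    finally:
        handler.close()


chunks = []
with fake_open_file(b"abcdefgh") as fh:
    # Read fixed-size chunks so a large file is never fully in memory.
    while chunk := fh.read(3):
        chunks.append(chunk)
```

Once the block exits the handle is closed, so any further `read` call on it raises `ValueError`.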