openaleph_procrastinate.defer

Known stages to defer jobs to within the OpenAleph stack.

See Settings for configuring queue names and tasks.

Conventions / common pattern: Tasks are responsible for explicitly deferring their follow-up tasks. The defer call itself is unconditional and always happens in the task code; whether a job is actually deferred is decided in this module, based on runtime settings (see below).

Example
from openaleph_procrastinate import defer

@task(app=app)
def analyze(job: DatasetJob) -> None:
    result = analyze_entities(job.load_entities())
    # defer to index stage
    defer.index(app, job.dataset, result)

To disable deferring for a service, set the corresponding environment variable.

For example, to disable indexing entities after ingestion, start the ingest-file worker with this config: OPENALEPH_INDEX_DEFER=0

analyze(app, dataset, entities, **context)

Defer a new job for ftm-analyze. It will only be deferred if OPENALEPH_ANALYZE_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    dataset (str): The ftm dataset or collection. Required.
    entities (Iterable[EntityProxy]): The entities to analyze. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def analyze(
    app: App, dataset: str, entities: Iterable[EntityProxy], **context: Any
) -> None:
    """
    Defer a new job for `ftm-analyze`
    It will only be deferred if `OPENALEPH_ANALYZE_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        dataset: The ftm dataset or collection
        entities: The entities to analyze
        context: Additional job context
    """
    if tasks.analyze.defer:
        job = DatasetJob.from_entities(
            dataset=dataset,
            queue=tasks.analyze.queue,
            task=tasks.analyze.task,
            entities=entities,
            dehydrate=True,
            **context,
        )
        job.defer(app, tasks.analyze.get_priority())
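
Usage sketch (illustrative, not part of the module itself): deferring a batch of entities to the analyze stage. app is assumed to be the service's configured procrastinate app instance, and "my_dataset" and the entities iterable are placeholders.

from openaleph_procrastinate import defer

# Defer analysis for a batch of EntityProxy objects. The call is always made;
# whether a job is actually enqueued depends on OPENALEPH_ANALYZE_DEFER at runtime.
defer.analyze(app, "my_dataset", entities)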

export_search(app, **context)

Defer a new job to export_search into OpenAleph. It will only be deferred if OPENALEPH_EXPORT_SEARCH_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def export_search(app: App, **context: Any) -> None:
    """
    Defer a new job to export_search into OpenAleph
    It will only be deferred if `OPENALEPH_EXPORT_SEARCH_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        context: Additional job context
    """
    if tasks.export_search.defer:
        job = Job(
            queue=tasks.export_search.queue,
            task=tasks.export_search.task,
            payload={"context": ensure_dict(context)},
        )
        job.defer(app, tasks.export_search.get_priority())
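
Usage sketch (illustrative): export_search takes no dataset argument; any keyword arguments are stored as job context. The export_id key below is a made-up example, not a documented context key.

from openaleph_procrastinate import defer

# Defer a search export job; this becomes a no-op when
# OPENALEPH_EXPORT_SEARCH_DEFER=0 is set for the deferring worker.
defer.export_search(app, export_id="abc123")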

export_xref(app, dataset, **context)

Defer a new job to export_xref into OpenAleph. It will only be deferred if OPENALEPH_EXPORT_XREF_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    dataset (str): The ftm dataset or collection. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def export_xref(app: App, dataset: str, **context: Any) -> None:
    """
    Defer a new job to export_xref into OpenAleph
    It will only be deferred if `OPENALEPH_EXPORT_XREF_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        context: Additional job context
    """
    if tasks.export_xref.defer:
        job = DatasetJob(
            dataset=dataset,
            queue=tasks.export_xref.queue,
            task=tasks.export_xref.task,
            payload={"context": ensure_dict(context)},
        )
        job.defer(app, tasks.export_xref.get_priority())

flush_mapping(app, dataset, **context)

Defer a new job to flush_mapping into OpenAleph. It will only be deferred if OPENALEPH_FLUSH_MAPPING_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    dataset (str): The ftm dataset or collection. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def flush_mapping(app: App, dataset: str, **context: Any) -> None:
    """
    Defer a new job to flush_mapping into OpenAleph
    It will only be deferred if `OPENALEPH_FLUSH_MAPPING_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        dataset: The ftm dataset or collection
        context: Additional job context
    """
    if tasks.flush_mapping.defer:
        job = DatasetJob(
            dataset=dataset,
            queue=tasks.flush_mapping.queue,
            task=tasks.flush_mapping.task,
            payload={"context": ensure_dict(context)},
        )
        job.defer(app, tasks.flush_mapping.get_priority())

geocode(app, dataset, entities, **context)

Defer a new job for ftm-geocode. It will only be deferred if OPENALEPH_GEOCODE_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    dataset (str): The ftm dataset or collection. Required.
    entities (Iterable[EntityProxy]): The entities to geocode. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def geocode(
    app: App, dataset: str, entities: Iterable[EntityProxy], **context: Any
) -> None:
    """
    Defer a new job for `ftm-geocode`
    It will only be deferred if `OPENALEPH_GEOCODE_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        dataset: The ftm dataset or collection
        entities: The entities to geocode
        context: Additional job context
    """
    if tasks.geocode.defer:
        job = DatasetJob.from_entities(
            dataset=dataset,
            queue=tasks.geocode.queue,
            task=tasks.geocode.task,
            entities=entities,
            **context,
        )
        job.defer(app, tasks.geocode.get_priority())

index(app, dataset, entities, **context)

Defer a new job to index into OpenAleph. It will only be deferred if OPENALEPH_INDEX_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    dataset (str): The ftm dataset or collection. Required.
    entities (Iterable[EntityProxy]): The entities to index. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def index(
    app: App, dataset: str, entities: Iterable[EntityProxy], **context: Any
) -> None:
    """
    Defer a new job to index into OpenAleph
    It will only be deferred if `OPENALEPH_INDEX_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        dataset: The ftm dataset or collection
        entities: The entities to index
        context: Additional job context
    """
    if tasks.index.defer:
        job = DatasetJob.from_entities(
            dataset=dataset,
            queue=tasks.index.queue,
            task=tasks.index.task,
            entities=entities,
            dehydrate=True,
            **context,
        )
        job.defer(app, tasks.index.get_priority())

ingest(app, dataset, entities, **context)

Defer a new job for ingest-file. It will only be deferred if OPENALEPH_INGEST_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    dataset (str): The ftm dataset or collection. Required.
    entities (Iterable[EntityProxy]): The file or directory entities to ingest. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def ingest(
    app: App, dataset: str, entities: Iterable[EntityProxy], **context: Any
) -> None:
    """
    Defer a new job for `ingest-file`.
    It will only be deferred if `OPENALEPH_INGEST_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        dataset: The ftm dataset or collection
        entities: The file or directory entities to ingest
        context: Additional job context
    """
    if tasks.ingest.defer:
        job = DatasetJob.from_entities(
            dataset=dataset,
            queue=tasks.ingest.queue,
            task=tasks.ingest.task,
            entities=entities,
            **context,
        )
        job.defer(app, tasks.ingest.get_priority())
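
Usage sketch (illustrative, assuming followthemoney is available to build the file entity; the entity id, file name and checksum are placeholders):

from followthemoney import model

from openaleph_procrastinate import defer

# Build a Document entity that points at an archived file, then defer ingestion.
entity = model.make_entity("Document")
entity.make_id("example-file")
entity.add("fileName", "report.pdf")
entity.add("contentHash", "0000000000000000000000000000000000000000")  # placeholder checksum

defer.ingest(app, "my_dataset", [entity])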

load_mapping(app, dataset, **context)

Defer a new job to load_mapping into OpenAleph. It will only be deferred if OPENALEPH_LOAD_MAPPING_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    dataset (str): The ftm dataset or collection. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def load_mapping(app: App, dataset: str, **context: Any) -> None:
    """
    Defer a new job to load_mapping into OpenAleph
    It will only be deferred if `OPENALEPH_LOAD_MAPPING_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        dataset: The ftm dataset or collection
        context: Additional job context
    """
    if tasks.load_mapping.defer:
        job = DatasetJob(
            dataset=dataset,
            queue=tasks.load_mapping.queue,
            task=tasks.load_mapping.task,
            payload={"context": ensure_dict(context)},
        )
        job.defer(app, tasks.load_mapping.get_priority())

prune_entity(app, dataset, **context)

Defer a new job to prune_entity into OpenAleph. It will only be deferred if OPENALEPH_PRUNE_ENTITY_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    dataset (str): The ftm dataset or collection. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def prune_entity(app: App, dataset: str, **context: Any) -> None:
    """
    Defer a new job to prune_entity into OpenAleph
    It will only be deferred if `OPENALEPH_PRUNE_ENTITY_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        dataset: The ftm dataset or collection
        context: Additional job context
    """
    if tasks.prune_entity.defer:
        job = DatasetJob(
            dataset=dataset,
            queue=tasks.prune_entity.queue,
            task=tasks.prune_entity.task,
            payload={"context": ensure_dict(context)},
        )
        job.defer(app, tasks.prune_entity.get_priority())

reindex(app, dataset, **context)

Defer a new job to reindex into OpenAleph. It will only be deferred if OPENALEPH_REINDEX_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    dataset (str): The ftm dataset or collection. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def reindex(app: App, dataset: str, **context: Any) -> None:
    """
    Defer a new job to reindex into OpenAleph
    It will only be deferred if `OPENALEPH_REINDEX_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        dataset: The ftm dataset or collection
        context: Additional job context
    """
    if tasks.reindex.defer:
        job = DatasetJob(
            dataset=dataset,
            queue=tasks.reindex.queue,
            task=tasks.reindex.task,
            payload={"context": ensure_dict(context)},
        )
        job.defer(app, tasks.reindex.get_priority())
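
Usage sketch (illustrative): reindex is a dataset-level job, so only the dataset name is needed; app and the dataset name are placeholders.

from openaleph_procrastinate import defer

# Defer a reindex of one dataset; this is a no-op when OPENALEPH_REINDEX_DEFER=0
# is set for the worker that executes this code.
defer.reindex(app, "my_dataset")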

resolve_assets(app, dataset, entities, **context)

Defer a new job for ftm-assets. It will only be deferred if OPENALEPH_ASSETS_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    dataset (str): The ftm dataset or collection. Required.
    entities (Iterable[EntityProxy]): The entities to resolve assets for. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def resolve_assets(
    app: App, dataset: str, entities: Iterable[EntityProxy], **context: Any
) -> None:
    """
    Defer a new job for `ftm-assets`
    It will only be deferred if `OPENALEPH_ASSETS_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        dataset: The ftm dataset or collection
        entities: The entities to resolve assets for
        context: Additional job context
    """
    if tasks.assets.defer:
        job = DatasetJob.from_entities(
            dataset=dataset,
            queue=tasks.assets.queue,
            task=tasks.assets.task,
            entities=entities,
            **context,
        )
        job.defer(app, tasks.assets.get_priority())

transcribe(app, dataset, entities, **context)

Defer a new job for ftm-transcribe. It will only be deferred if OPENALEPH_TRANSCRIBE_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    dataset (str): The ftm dataset or collection. Required.
    entities (Iterable[EntityProxy]): The file entities to transcribe. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def transcribe(
    app: App, dataset: str, entities: Iterable[EntityProxy], **context: Any
) -> None:
    """
    Defer a new job for `ftm-transcribe`
    It will only be deferred if `OPENALEPH_TRANSCRIBE_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        dataset: The ftm dataset or collection
        entities: The file entities to ingest
        context: Additional job context
    """
    if tasks.transcribe.defer:
        job = DatasetJob.from_entities(
            dataset=dataset,
            queue=tasks.transcribe.queue,
            task=tasks.transcribe.task,
            entities=entities,
            dehydrate=True,
            **context,
        )
        job.defer(app, tasks.transcribe.get_priority())

update_entity(app, dataset, **context)

Defer a new job to update_entity into OpenAleph. It will only be deferred if OPENALEPH_UPDATE_ENTITY_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    dataset (str): The ftm dataset or collection. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def update_entity(app: App, dataset: str, **context: Any) -> None:
    """
    Defer a new job to update_entity into OpenAleph
    It will only be deferred if `OPENALEPH_UPDATE_ENTITY_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        dataset: The ftm dataset or collection
        context: Additional job context
    """
    if tasks.update_entity.defer:
        job = DatasetJob(
            dataset=dataset,
            queue=tasks.update_entity.queue,
            task=tasks.update_entity.task,
            payload={"context": ensure_dict(context)},
        )
        job.defer(app, tasks.update_entity.get_priority())

xref(app, dataset, **context)

Defer a new job to xref into OpenAleph. It will only be deferred if OPENALEPH_XREF_DEFER=1 (the default).

Parameters:

    app (App): The procrastinate app instance. Required.
    dataset (str): The ftm dataset or collection. Required.
    context (Any): Additional job context. Default: {}
Source code in openaleph_procrastinate/defer.py
def xref(app: App, dataset: str, **context: Any) -> None:
    """
    Defer a new job to xref into OpenAleph
    It will only be deferred if `OPENALEPH_XREF_DEFER=1` (the default)

    Args:
        app: The procrastinate app instance
        dataset: The ftm dataset or collection
        context: Additional job context
    """
    if tasks.xref.defer:
        job = DatasetJob(
            dataset=dataset,
            queue=tasks.xref.queue,
            task=tasks.xref.task,
            payload={"context": ensure_dict(context)},
        )
        job.defer(app, tasks.xref.get_priority())
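
Usage sketch (illustrative): like the other dataset-level jobs, xref only needs the dataset; any extra keyword arguments end up in the job context.

from openaleph_procrastinate import defer

# Defer cross-referencing for a dataset, e.g. after new entities were indexed.
defer.xref(app, "my_dataset")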