Skip to content

openaleph_procrastinate.defer

Known stages to defer jobs to within the OpenAleph stack.

See Settings for configuring queue names and tasks.

Example
from openaleph_procrastinate import defer

@task(app=app)
def analyze(job: DatasetJob) -> Defers:
    result = analyze_entities(job.load_entities())
    # defer to index stage
    yield defer.index(job.dataset, result)

To disable deferring for a service, set an environment variable:

For example, to disable indexing entities after ingestion, start the ingest-file worker with this config: OPENALEPH_INDEX_DEFER=0

analyze(dataset, entities, **context)

Make a new job for ftm-analyze

Parameters:

Name Type Description Default
dataset str

The ftm dataset or collection

required
entities Iterable[EntityProxy]

The entities to analyze

required
context Any

Additional job context

{}
Source code in openaleph_procrastinate/defer.py
@check_defer(enabled=settings.analyze.defer)
def analyze(
    dataset: str, entities: Iterable[EntityProxy], **context: Any
) -> DatasetJob:
    """
    Create a deferred job that sends entities to the `ftm-analyze` stage.

    Args:
        dataset: The ftm dataset or collection
        entities: The entities to analyze
        context: Additional job context
    """
    # Collect the fixed job parameters; extra context is splatted alongside.
    job_kwargs = {
        "dataset": dataset,
        "queue": settings.analyze.queue,
        "task": settings.analyze.task,
        "entities": entities,
        "dehydrate": True,
    }
    return DatasetJob.from_entities(**job_kwargs, **context)

geocode(dataset, entities, **context)

Make a new job for ftm-geocode

Parameters:

Name Type Description Default
dataset str

The ftm dataset or collection

required
entities Iterable[EntityProxy]

The entities to geocode

required
context Any

Additional job context

{}
Source code in openaleph_procrastinate/defer.py
@check_defer(enabled=settings.geocode.defer)
def geocode(
    dataset: str, entities: Iterable[EntityProxy], **context: Any
) -> DatasetJob:
    """
    Create a deferred job that sends entities to the `ftm-geocode` stage.

    Args:
        dataset: The ftm dataset or collection
        entities: The entities to geocode
        context: Additional job context
    """
    # Resolve queue/task from the geocode settings section once.
    stage = settings.geocode
    return DatasetJob.from_entities(
        dataset=dataset,
        queue=stage.queue,
        task=stage.task,
        entities=entities,
        **context,
    )

index(dataset, entities, **context)

Make a new job to index into OpenAleph

Parameters:

Name Type Description Default
dataset str

The ftm dataset or collection

required
entities Iterable[EntityProxy]

The entities to index

required
context Any

Additional job context

{}
Source code in openaleph_procrastinate/defer.py
@check_defer(enabled=settings.index.defer)
def index(dataset: str, entities: Iterable[EntityProxy], **context: Any) -> DatasetJob:
    """
    Create a deferred job that indexes entities into OpenAleph.

    Args:
        dataset: The ftm dataset or collection
        entities: The entities to index
        context: Additional job context
    """
    stage = settings.index
    # Entities are dehydrated before indexing, mirroring the analyze stage.
    job = DatasetJob.from_entities(
        entities=entities,
        dataset=dataset,
        queue=stage.queue,
        task=stage.task,
        dehydrate=True,
        **context,
    )
    return job

ingest(dataset, entities, **context)

Make a new job for ingest-file

Parameters:

Name Type Description Default
dataset str

The ftm dataset or collection

required
entities Iterable[EntityProxy]

The file or directory entities to ingest

required
context Any

Additional job context

{}
Source code in openaleph_procrastinate/defer.py
@check_defer(enabled=settings.ingest.defer)
def ingest(dataset: str, entities: Iterable[EntityProxy], **context: Any) -> DatasetJob:
    """
    Create a deferred job that sends file or directory entities to `ingest-file`.

    Args:
        dataset: The ftm dataset or collection
        entities: The file or directory entities to ingest
        context: Additional job context
    """
    # Queue/task are taken from the ingest settings section.
    ingest_settings = settings.ingest
    return DatasetJob.from_entities(
        entities=entities,
        dataset=dataset,
        queue=ingest_settings.queue,
        task=ingest_settings.task,
        **context,
    )

resolve_assets(dataset, entities, **context)

Make a new job for ftm-assets

Parameters:

Name Type Description Default
dataset str

The ftm dataset or collection

required
entities Iterable[EntityProxy]

The entities to resolve assets for

required
context Any

Additional job context

{}
Source code in openaleph_procrastinate/defer.py
@check_defer(enabled=settings.assets.defer)
def resolve_assets(
    dataset: str, entities: Iterable[EntityProxy], **context: Any
) -> DatasetJob:
    """
    Create a deferred job that resolves assets for entities via `ftm-assets`.

    Args:
        dataset: The ftm dataset or collection
        entities: The entities to resolve assets for
        context: Additional job context
    """
    # Fixed parameters for the assets stage; caller context is splatted in.
    job_kwargs = {
        "dataset": dataset,
        "queue": settings.assets.queue,
        "task": settings.assets.task,
        "entities": entities,
    }
    return DatasetJob.from_entities(**job_kwargs, **context)

transcribe(dataset, entities, **context)

Make a new job for ftm-transcribe

Parameters:

Name Type Description Default
dataset str

The ftm dataset or collection

required
entities Iterable[EntityProxy]

The file entities to transcribe

required
context Any

Additional job context

{}
Source code in openaleph_procrastinate/defer.py
@check_defer(enabled=settings.transcribe.defer)
def transcribe(
    dataset: str, entities: Iterable[EntityProxy], **context: Any
) -> DatasetJob:
    """
    Make a new job for `ftm-transcribe`

    Args:
        dataset: The ftm dataset or collection
        entities: The file entities to transcribe
        context: Additional job context
    """
    # Fix: docstring previously documented a non-existent `entity` parameter
    # and described the action as "ingest"; the parameter is `entities` and
    # the stage transcribes audio/video file entities.
    return DatasetJob.from_entities(
        dataset=dataset,
        queue=settings.transcribe.queue,
        task=settings.transcribe.task,
        entities=entities,
        **context,
    )