Skip to content

The Job model

openaleph_procrastinate.model provides two models for defining a Job payload. The models are powered by pydantic.

This allows services to import these models and make sure the payloads are valid.

Jobs have some helper methods attached, including .defer() for queuing, these should the preferred usage instead of the methods in the helpers module.

Tasks always receive the instantiated Job object as their argument:

See further below for deferring a Job to a next processing stage.

from openaleph_procrastinate.model import AnyJob, Defers

@task(app=app)
def my_task(job: AnyJob) -> Defers:
    # process things
    # if defer to a next stage, return an updated job:
    yield next_job
    # or return None (implicit)

Job data payload

All jobs need the required data:

job = Job(
    queue="<queue-name>",
    task="<service>.tasks.<task_name>",
    payload={}  # actual job payload
)

A DatasetJob (see below) has the additional property dataset.

Arbitrary Job

openaleph_procrastinate.model.Job

A generic Job not bound to a dataset. They can be used for management purposes and other tasks not related to a specific dataset (Aleph wording: "Collection").

Dataset Job

openaleph_procrastinate.model.DatasetJob

A concrete Job bound to a dataset via its property dataset (Aleph wording: "Collection"). This will be the most common used Job model for ingesting files, analyzing and indexing entities.

Entities in the job payload are always an array in entities key even if it is a job only for 1 Entity.

They have some helper methods to access file objects or entities:

A job for processing one or more entities

job = DatasetJob(
    dataset="my_dataset",
    queue="index",
    task="aleph.tasks.index_proxy",
    payload={"entities": [...], "context": {ctx}}
)

There exists a @classmethod to create a job for an iterable of Entities:

job = DatasetJob.from_entities(
    dataset="my_dataset",
    queue="index",
    task="aleph.tasks.index_proxy",
    entities=[...]  # instances of `followthemoney.model.EntityProxy`
)

Get entities

Get the entities from the payload:

@task(app=app)
def process_entities(job: DatasetJob):
    for entity in job.get_entities():
        do_something(entity)

To receive the entities from the followthemoney-store (to have the most recent version, because it might be patched in between by other tasks):

@task(app=app)
def process_entities(job: DatasetJob):
    for entity in job.load_entities():
        do_something(entity)

Write entities

Write entities or fragments to the store. This writes to the same dataset the original entity(ies) are from.

@task(app=app)
def process_entities(job: DatasetJob):
    with job.get_writer() as bulk:
        for entity in job.load_entities():
            fragment = extract_something(entity)
            bulk.put(fragment)

The bulk writer is flushed when leaving the context.

A job for processing file entities from the servicelayer

The entities must have contentHash properties.

job = DatasetJob(
    dataset="my_dataset",
    queue="ingest",
    task="ingestors.tasks.ingest",
    payload={"entities": [...]}
)

Get file-handlers within a task

Get a BinaryIO context manager that behaves the same like file-like .open() for each entity and its contentHash properties:

@task(app=app)
def process_file(job: DatasetJob):
    for reference in job.get_file_references():
        with reference.open() as fh:
            some_function(fh, entity=reference.entity)

Under the hood, the file is retrieved from the servicelayer Archive and stored in a local temporary folder. After leaving the context, the file is cleaned up (deleted) locally.

Get temporary file paths within a task

Some procedures require a path instead of a file handler. The returned path is a pathlib.Path object:

@task(app=app)
def process_file(job: DatasetJob):
    for reference in job.get_file_references():
        with handler.get_localpath() as path:
            subprocess.run(f"some-program -f {path}")

Under the hood, the file is retrieved from the servicelayer Archive and stored in a local temporary folder. After leaving the context, the file is cleaned up (deleted) locally.

Defer to next stage

To defer (queue) a job to a next stage after processing, explicitly call the defer method either on the updated or newly created Job object itself or use one of the known defers.

@task(app=app)
def my_task(job: DatasetJob) -> None:
    entities = []
    for entity in job.load_entities():
        result = do_something(entity)
        entities.append(entity)

    # yield a new job to defer
    next_job = DatasetJob.from_entities(
        dataset=job.dataset,
        queue=job.queue,  # use the same queue or another one
        task="another_module.tasks.process",  # reference a task
        entities=entities
    )
    next_job.defer(app=app)

See the full reference