The Job model
The module openaleph_procrastinate.model provides two models for defining a Job payload. The models are powered by pydantic, which allows services to import them and make sure payloads are valid.
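For example, constructing a job with missing required fields fails early. This is a minimal sketch of standard pydantic validation behaviour, assuming the task field has no default:
from pydantic import ValidationError
from openaleph_procrastinate.model import Job

try:
    Job(queue="index")  # missing required fields such as `task`
except ValidationError as e:
    print(e)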
Jobs have some helper methods attached, including .defer() for queuing; these should be preferred over the functions in the helpers module.
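For example, a service could build a job and queue it directly. This is a minimal sketch; the task reference is hypothetical, and the exact arguments of .defer() (such as whether the procrastinate app must be passed in) are an assumption:
from openaleph_procrastinate.model import Job

job = Job(
    queue="index",                    # hypothetical queue name
    task="my_service.tasks.analyze",  # hypothetical task reference
    payload={},
)
job.defer(app=app)  # enqueue the job; signature assumed, `app` is the procrastinate app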
Tasks always receive the instantiated Job object as their argument:
from openaleph_procrastinate.model import AnyJob  # a type alias for Job | DatasetJob

@task(app=app)
def my_task(job: AnyJob) -> AnyJob:
    # process things
    return job
Job data payload
All jobs require the following data:
job = Job(
    queue="<queue-name>",
    task="<service>.tasks.<task_name>",
    payload={},  # actual job payload
)
A DatasetJob (see below) has the additional property dataset.
Arbitrary Job
openaleph_procrastinate.model.Job
A generic Job not bound to a dataset. It can be used for management purposes and other tasks not related to a specific dataset (Aleph wording: "Collection").
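For illustration, a maintenance job not tied to any dataset might look like this (queue, task and payload are hypothetical):
job = Job(
    queue="maintenance",              # hypothetical queue name
    task="my_service.tasks.cleanup",  # hypothetical task reference
    payload={"older_than_days": 30},
)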
Dataset Job
openaleph_procrastinate.model.DatasetJob
A concrete Job bound to a dataset via its dataset property (Aleph wording: "Collection"). This is the most commonly used Job model for ingesting files and for analyzing and indexing entities.
Entities in the job payload are always stored as an array under the entities key, even if the job targets only a single entity.
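For example, a payload for a single entity is still wrapped in a list (illustrative, assuming entities are serialized with EntityProxy.to_dict()):
payload = {"entities": [entity.to_dict()]}  # one entity, still a list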
DatasetJobs have some helper methods to access file objects or entities:
A job for processing one or more entities
job = DatasetJob(
    dataset="my_dataset",
    queue="index",
    task="aleph.tasks.index_proxy",
    payload={"entities": [...], "context": {...}},
)
There is a classmethod to create a job for a single entity:
job = DatasetJob.from_entity(
    dataset="my_dataset",
    queue="index",
    task="aleph.tasks.index_proxy",
    entity=entity,  # instance of `followthemoney.model.EntityProxy`
)
Multiple entities (for batch processing):
job = DatasetJob.from_entities(
    dataset="my_dataset",
    queue="index",
    task="aleph.tasks.index_proxy",
    entities=[...],  # instances of `followthemoney.model.EntityProxy`
)
Get entities
Get the entities from the payload:
@task(app=app)
def process_entities(job: DatasetJob):
    for entity in job.get_entities():
        do_something(entity)
To load the entities from the followthemoney-store instead (this returns the most recent version, as entities might have been patched by other tasks in the meantime):
@task(app=app)
def process_entities(job: DatasetJob):
    for entity in job.load_entities():
        do_something(entity)
Write entities
Write entities or fragments to the store. This writes to the same dataset the original entities belong to.
Danger
This currently does not work for writing entities to OpenAleph. Its FollowTheMoney store uses ftm_collection_<id> as the table name instead of the dataset property as an identifier. To put entities into OpenAleph, defer a follow-up job with aleph.procrastinate.put_entities as the task.
@task(app=app)
def process_entities(job: DatasetJob):
    with job.get_writer() as bulk:
        for entity in job.load_entities():
            fragment = extract_something(entity)
            bulk.add(fragment)
The bulk writer is flushed when leaving the context.
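As the warning above notes, getting entities into OpenAleph itself instead requires deferring a follow-up job. A hedged sketch of what that could look like, where only the task name is taken from the warning and the queue name, payload shape and .defer() call are assumptions:
next_job = DatasetJob(
    dataset=job.dataset,
    queue="aleph",                            # hypothetical queue name
    task="aleph.procrastinate.put_entities",  # task named in the warning above
    payload={"entities": [e.to_dict() for e in job.load_entities()]},
)
next_job.defer(app=app)  # assumed signature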
A job for processing file entities from the servicelayer
The entities must have contentHash properties.
job = DatasetJob(
    dataset="my_dataset",
    queue="ingest",
    task="ingestors.tasks.ingest",
    payload={"entities": [...]},
)
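Since entities without a contentHash cannot be processed this way, a service might filter them up front. A small sketch using followthemoney's EntityProxy API:
# keep only entities that actually carry a contentHash value
file_entities = [e for e in entities if e.first("contentHash")]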
Get file-handlers within a task
For each entity and its contentHash property, get a BinaryIO context manager that behaves like a file-like .open():
@task(app=app)
def process_file(job: DatasetJob):
    for entity, handler in job.get_file_references():
        with handler.open_file() as fh:
            some_function(fh, entity=entity)
Under the hood, the file is retrieved from the servicelayer Archive and stored in a local temporary folder. After leaving the context, the file is cleaned up (deleted) locally.
Get temporary file paths within a task
Some procedures require a path instead of a file handler. The returned path is a pathlib.Path object:
import subprocess

@task(app=app)
def process_file(job: DatasetJob):
    for entity, handler in job.get_file_references():
        with handler.get_localpath() as path:
            subprocess.run(["some-program", "-f", str(path)])
Under the hood, the file is retrieved from the servicelayer Archive and stored in a local temporary folder. After leaving the context, the file is cleaned up (deleted) locally.