Skip to content

How to create a service

This section assumes you are familiar with the architecture and conventions.

This example is based on ftm-geocode.

Prerequisites

Add openaleph-procrastinate as a dependency to your project.

pip:

pip install "openaleph-procrastinate @ git+https://github.com/openaleph/openaleph-procrastinate@main"

poetry:

poetry add "openaleph-procrastinate @ git+https://github.com/openaleph/openaleph-procrastinate@main"

Configure database connection

Your service needs to access the procrastinate task queues in the postgresql database.

  • Use the environment variable OPENALEPH_PROCRASTINATE_DB_URI which falls back to OPENALEPH_DB_URI (default: postgresql:///openaleph).
  • If your tasks write entities to the followthemoney-store, its store needs to be configured if it differs from the main database: OPENALEPH_FTM_STORE_URI which falls back to FTM_STORE_URI. If it's not set, the main database uri will be used.
  • If your tasks need access to the servicelayer Archive, configure it properly via the ARCHIVE_* env vars.

Creating a task

Within your application, create a python file tasks.py within the project root. This is by convention to have a standardized way of referring to tasks from another program via a string like <library_name>.tasks.<task_name>.

tasks.py

  • Import the openaleph_procrastinate dependencies
  • Create the app that is able to import the tasks
  • Register actual tasks that handle Jobs

This file within ftm-geocode allows other workers to defer tasks via the identifier ftm_geocode.tasks.geocode.

from openaleph_procrastinate.app import make_app
from openaleph_procrastinate.model import DatasetJob
from openaleph_procrastinate.tasks import task

from ftm_geocode.geocode import geocode_proxy
from ftm_geocode.settings import Settings

settings = Settings()
app = make_app(__loader__.name)

@task(app=app)
def geocode(job: DatasetJob):
    with job.get_writer() as bulk:
        for proxy in geocode_proxy(settings.geocoders, job.get_entities()):
            bulk.put(proxy)

Run the workers for this service

Use the built-in procrastinate cli. The app needs to be configured for the environment of this service.

export PROCRASTINATE_APP=ftm_geocode.tasks.app

This worker would subscribe to the ftm-geocode queue:

procrastinate worker -q ftm-geocode

Defer tasks from another service

Another service that has access to the postgresql database can defer tasks to geocode.

See the Job model and make sure to properly use the context manager to connect to procrastinate.

from openaleph_procrastinate.app import make_app
from openaleph_procrastinate.model import DatasetJob

app = make_app()

def defer_job(entity):
    with app.open():
        job = DatasetJob.from_entity(
            dataset="my_dataset",
            queue="ftm-geocode",
            task="ftm_geocode.tasks.geocode",
            entity=entity
        )
        job.defer()

Defer tasks using the cli

openaleph_procrastinate has a command line interface to defer tasks to any queues and services (that usually live somewhere else). This can be used for local debugging / development but as well could serve as an interface for production deployments.

To defer the Address entities of the dataset de_lobbyregister to the geocoding (using ftmq for remote loading and filtering):

ftmq -i https://data.ftm.store/de_lobbyregister/entities.ftm.json -s Address | opal-procrastinate defer-entities -q ftm-geocode -t ftm_geocode.tasks.geocode -d de_lobbyregister