How to create a service
This section assumes you are familiar with the architecture and conventions.
This example is based on ftm-geocode.
Prerequisites
Add openaleph-procrastinate as a dependency to your project.
pip:
pip install "openaleph-procrastinate @ git+https://github.com/openaleph/openaleph-procrastinate@main"
poetry:
poetry add "openaleph-procrastinate @ git+https://github.com/openaleph/openaleph-procrastinate@main"
Configure database connection
Your service needs to access the procrastinate task queues in the postgresql database.
- Use the environment variable OPENALEPH_PROCRASTINATE_DB_URI, which falls back to OPENALEPH_DB_URI (default: postgresql:///openaleph).
- If your tasks write entities to the followthemoney-store and the store differs from the main database, configure it via OPENALEPH_FTM_STORE_URI, which falls back to FTM_STORE_URI. If neither is set, the main database URI is used.
- If your tasks need access to the servicelayer Archive, configure it properly via the ARCHIVE_* env vars.
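The fallback order described above can be sketched in a few lines of Python. This is only an illustration of the documented precedence, not the library's actual settings code. The function names are hypothetical, and "main database" is assumed here to mean the value of OPENALEPH_DB_URI.

```python
import os

# Default taken from the text above.
DEFAULT_DB_URI = "postgresql:///openaleph"


def resolve_main_db_uri(env=None):
    """Main database URI: OPENALEPH_DB_URI, else the documented default."""
    env = os.environ if env is None else env
    return env.get("OPENALEPH_DB_URI") or DEFAULT_DB_URI


def resolve_procrastinate_db_uri(env=None):
    """Task queue URI: OPENALEPH_PROCRASTINATE_DB_URI, else the main database."""
    env = os.environ if env is None else env
    return env.get("OPENALEPH_PROCRASTINATE_DB_URI") or resolve_main_db_uri(env)


def resolve_ftm_store_uri(env=None):
    """Store URI: OPENALEPH_FTM_STORE_URI, then FTM_STORE_URI, else the main database."""
    env = os.environ if env is None else env
    return (
        env.get("OPENALEPH_FTM_STORE_URI")
        or env.get("FTM_STORE_URI")
        or resolve_main_db_uri(env)  # assumption: "main database" is OPENALEPH_DB_URI
    )
```

Passing a plain dict instead of the real environment makes it easy to check which variable wins in a given deployment.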
Creating a task
Within your application, create a Python file tasks.py in the project root. By convention, this gives other programs a standardized way to refer to tasks via a string like <library_name>.tasks.<task_name>.
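To illustrate why a fixed module name is useful, here is a minimal sketch of how a dotted identifier of this shape can be built and turned back into a Python object with the standard library. Both helper names are hypothetical, and this is not necessarily how procrastinate itself looks up tasks.

```python
from importlib import import_module


def task_identifier(library_name, task_name):
    """Build the conventional '<library_name>.tasks.<task_name>' string."""
    return f"{library_name}.tasks.{task_name}"


def resolve_dotted_path(identifier):
    """Import the module part of a dotted path and return the named attribute."""
    module_path, _, attr_name = identifier.rpartition(".")
    module = import_module(module_path)
    return getattr(module, attr_name)
```

For example, task_identifier("ftm_geocode", "geocode") yields the string ftm_geocode.tasks.geocode used later in this section.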
tasks.py
- Import the openaleph_procrastinate dependencies
- Create the app that is able to import the tasks
- Register actual tasks that handle Jobs
This file within ftm-geocode allows other workers to defer tasks via the identifier ftm_geocode.tasks.geocode.
from openaleph_procrastinate.app import make_app
from openaleph_procrastinate.model import DatasetJob
from openaleph_procrastinate.tasks import task

from ftm_geocode.geocode import geocode_proxy
from ftm_geocode.settings import Settings

settings = Settings()
app = make_app(__loader__.name)


@task(app=app)
def geocode(job: DatasetJob):
    with job.get_writer() as bulk:
        for proxy in geocode_proxy(settings.geocoders, job.get_entities()):
            bulk.put(proxy)
Run the workers for this service
Use the built-in procrastinate cli. The app needs to be configured for the environment of this service.
This worker would subscribe to the ftm-geocode queue:
Defer tasks from another service
Another service that has access to the postgresql database can defer tasks to geocode.
See the Job model and make sure to properly use the context manager to connect to procrastinate.
from openaleph_procrastinate.app import make_app
from openaleph_procrastinate.model import DatasetJob

app = make_app()


def defer_job(entity):
    with app.open():
        job = DatasetJob.from_entity(
            dataset="my_dataset",
            queue="ftm-geocode",
            task="ftm_geocode.tasks.geocode",
            entity=entity,
        )
        job.defer()
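When many entities need to be deferred, it may be preferable to open the connection once and defer all jobs inside that single context. The helper below is a hedged sketch of that pattern. The name defer_all is hypothetical, and it only assumes what the example above shows: that app.open() is a context manager and that each job has a defer() method.

```python
def defer_all(app, jobs):
    """Defer every job from `jobs` inside one `app.open()` context.

    `app` is assumed to expose `open()` as a context manager and each job
    is assumed to expose `defer()`, as in the example above.
    Returns the number of deferred jobs.
    """
    count = 0
    with app.open():
        # `jobs` may be a lazy generator; it is consumed while connected.
        for job in jobs:
            job.defer()
            count += 1
    return count
```

With the example above, jobs could be a generator that yields one DatasetJob.from_entity(...) per entity, so the jobs are created and deferred while the connection is open.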
Defer tasks using the cli
openaleph_procrastinate has a command line interface to defer tasks to any queues and services (that usually live somewhere else). This can be used for local debugging and development, but it could also serve as an interface for production deployments.
To defer the Address entities of the dataset de_lobbyregister to the geocoding service (using ftmq for remote loading and filtering):