
openaleph_procrastinate.manage

This is temporary and should use the procrastinate Django models at some point in the future

Db

Get a db manager object for the current procrastinate database uri

Source code in openaleph_procrastinate/manage/db.py
class Db:
    """Get a db manager object for the current procrastinate database uri"""

    def __init__(self, uri: str | None = None) -> None:
        self.settings = OpenAlephSettings()
        if self.settings.in_memory_db:
            raise RuntimeError("Can't use in-memory database")
        self.uri = uri or self.settings.procrastinate_db_uri
        self.log = get_logger(__name__, uri=mask_uri(self.uri))

    def configure(self) -> None:
        """Create procrastinate tables and schema (if not exists) and add our
        index optimizations (if not exists)"""
        if self.settings.in_memory_db:
            return
        app = make_app(sync=True)
        with app.open():
            if not app.check_connection():
                self.log.info("Configuring procrastinate database schema ...")
                app.schema_manager.apply_schema()
        self.log.info("Configuring generated fields, indices, and optimizations ...")
        with Took() as t:
            self._execute(sql.GENERATED_FIELDS)
            self._execute(sql.REMOVE_FOREIGN_KEY)
            self._execute(sql.CUSTOM_PRUNE_STALLED_WORKERS)
            self._execute(sql.INDEXES)
            self._execute(sql.OPTIMIZED_FETCH_FUNCTION)
            self.log.info("Configuring done.", took=t.took)

    def iterate_status(
        self,
        dataset: str | None = None,
        batch: str | None = None,
        queue: str | None = None,
        task: str | None = None,
        status: Status | None = None,
        active_only: bool | None = True,
    ) -> Rows:
        """
        Iterate through aggregated job status summary

        Each row is an aggregation over
        `dataset,batch,queue_name,task_name,status` and includes jobs count,
        timestamp first event, timestamp last event

        Args:
            dataset: The dataset to filter for
            batch: The job batch to filter for
            queue: The queue name to filter for
            task: The task name to filter for
            status: The status to filter for
            active_only: Only include "active" datasets (at least 1 job in
                'todo' or 'doing')

        Yields:
            Rows a tuple with the fields in this order:
                dataset, batch, queue_name, task_name, status, jobs count,
                timestamp first event, timestamp last event
        """
        if active_only:
            query = sql.STATUS_SUMMARY_ACTIVE
        else:
            query = sql.STATUS_SUMMARY
        yield from self._execute_iter(
            query,
            dataset=dataset,
            batch=batch,
            queue=queue,
            task=task,
            status=status,
        )

    def iterate_jobs(
        self,
        dataset: str | None = None,
        batch: str | None = None,
        queue: str | None = None,
        task: str | None = None,
        status: Status | None = None,
        min_ts: datetime | None = None,
        max_ts: datetime | None = None,
        flatten_entities: bool | None = False,
    ) -> Jobs:
        """
        Iterate job objects from the database by given criteria.

        Args:
            dataset: The dataset to filter for
            batch: The job batch to filter for
            queue: The queue name to filter for
            task: The task name to filter for
            status: The status to filter for
            min_ts: Start timestamp (earliest event found in `procrastinate_events`)
            max_ts: End timestamp (latest event found in `procrastinate_events`)
            flatten_entities: If true, yield a job for each entity found in the source job

        Yields:
            Iterator of [Job][openaleph_procrastinate.model.Job]
        """

        min_ts = min_ts or datetime(1970, 1, 1)
        max_ts = max_ts or datetime.now()
        params = {
            "dataset": dataset,
            "min_ts": min_ts.isoformat(),
            "max_ts": max_ts.isoformat(),
            "batch": batch,
            "queue": queue,
            "task": task,
            "status": status,
        }
        for id, status_, data in self._execute_iter(sql.ALL_JOBS, **params):
            data["id"] = id
            data["status"] = status_
            job = unpack_job(data)
            if flatten_entities and isinstance(job, DatasetJob):
                has_entities = False
                for entity in job.get_entities():
                    if entity.id:
                        has_entities = True
                        yield EntityJob(**data, entity_id=entity.id)
                if not has_entities:
                    yield job
            else:
                yield job

    def cancel_jobs(
        self,
        dataset: str | None = None,
        batch: str | None = None,
        queue: str | None = None,
        task: str | None = None,
    ) -> None:
        """
        Cancel jobs by given criteria.

        Args:
            dataset: The dataset to filter for
            batch: The job batch to filter for
            queue: The queue name to filter for
            task: The task name to filter for
        """

        self._execute(
            sql.CANCEL_JOBS, dataset=dataset, batch=batch, queue=queue, task=task
        )

    def get_failed_jobs(
        self,
        dataset: str | None = None,
        batch: str | None = None,
        queue: str | None = None,
        task: str | None = None,
    ) -> Rows:
        """
        Get failed jobs by given criteria with fields needed for retrying.

        Args:
            dataset: The dataset to filter for
            batch: The job batch to filter for
            queue: The queue name to filter for
            task: The task name to filter for

        Yields:
            Rows with (id, queue_name, priority, lock) for each failed job
        """
        yield from self._execute_iter(
            sql.GET_FAILED_JOBS,
            dataset=dataset,
            batch=batch,
            queue=queue,
            task=task,
        )

    def get_orphaned_jobs(
        self,
        dataset: str | None = None,
        batch: str | None = None,
        queue: str | None = None,
        task: str | None = None,
    ) -> Rows:
        """
        Get orphaned jobs (status='doing' but worker no longer exists).

        These jobs are stuck because their worker was deleted before the job
        completed. This can happen when the FK constraint is dropped and workers
        are pruned before the custom prune function is installed.

        Args:
            dataset: The dataset to filter for
            batch: The job batch to filter for
            queue: The queue name to filter for
            task: The task name to filter for

        Yields:
            Rows with (id, queue_name, priority, lock) for each orphaned job
        """
        yield from self._execute_iter(
            sql.GET_ORPHANED_JOBS,
            dataset=dataset,
            batch=batch,
            queue=queue,
            task=task,
        )

    def _execute_iter(self, q: LiteralString, **params: str | None) -> Rows:
        with psycopg.connect(self.settings.procrastinate_db_uri) as connection:
            with connection.cursor() as cursor:
                cursor.execute(q, dict(params))
                while rows := cursor.fetchmany(100_000):
                    yield from rows

    def _execute(self, q: LiteralString, **params: str | None) -> None:
        # Use autocommit for DDL (no params) to support multiple statements
        # with $$-quoted strings. Use transactions for DML (with params)
        # to maintain atomicity
        with psycopg.connect(
            self.settings.procrastinate_db_uri, autocommit=not params
        ) as connection:
            with connection.cursor() as cursor:
                # With parameters, must split to avoid prepared statement error
                if params:
                    for query in q.split(";"):
                        query = query.strip()
                        if query:
                            cursor.execute(query, params)
                # Without parameters, can execute all at once (handles $$-quoted strings)
                else:
                    cursor.execute(q)

    def _destroy(self) -> None:
        """Destroy all data (used in tests)"""
        self.log.warning("🔥 Deleting all procrastinate data 🔥")
        for table in (
            "procrastinate_jobs",
            "procrastinate_workers",
            "procrastinate_events",
        ):
            try:
                self._execute(f"TRUNCATE {table} RESTART IDENTITY CASCADE")
            except UndefinedTable:
                pass
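
A minimal usage sketch, assuming the class is importable from the openaleph_procrastinate.manage.db module shown above and that a PostgreSQL procrastinate database is configured:

from openaleph_procrastinate.manage.db import Db

# Instantiate against the configured database; without an explicit uri this
# falls back to OpenAlephSettings().procrastinate_db_uri and raises
# RuntimeError when an in-memory database is configured.
db = Db()

# Create the procrastinate schema plus the custom fields, indexes and
# functions if they do not exist yet
db.configure()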

cancel_jobs(dataset=None, batch=None, queue=None, task=None)

Cancel jobs by given criteria.

Parameters:

    dataset (str | None): The dataset to filter for. Default: None
    batch (str | None): The job batch to filter for. Default: None
    queue (str | None): The queue name to filter for. Default: None
    task (str | None): The task name to filter for. Default: None
Source code in openaleph_procrastinate/manage/db.py
def cancel_jobs(
    self,
    dataset: str | None = None,
    batch: str | None = None,
    queue: str | None = None,
    task: str | None = None,
) -> None:
    """
    Cancel jobs by given criteria.

    Args:
        dataset: The dataset to filter for
        batch: The job batch to filter for
        queue: The queue name to filter for
        task: The task name to filter for
    """

    self._execute(
        sql.CANCEL_JOBS, dataset=dataset, batch=batch, queue=queue, task=task
    )
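
As an illustration, a hedged sketch of cancelling jobs by dataset; the dataset, queue and task names are hypothetical placeholders:

from openaleph_procrastinate.manage.db import Db

db = Db()

# Cancel every job belonging to the (hypothetical) dataset "my_dataset"
db.cancel_jobs(dataset="my_dataset")

# The filters can be combined, e.g. restricting to one queue and task
db.cancel_jobs(dataset="my_dataset", queue="ingest", task="ingest_file")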

configure()

Create procrastinate tables and schema (if not exists) and add our index optimizations (if not exists)

Source code in openaleph_procrastinate/manage/db.py
def configure(self) -> None:
    """Create procrastinate tables and schema (if not exists) and add our
    index optimizations (if not exists)"""
    if self.settings.in_memory_db:
        return
    app = make_app(sync=True)
    with app.open():
        if not app.check_connection():
            self.log.info("Configuring procrastinate database schema ...")
            app.schema_manager.apply_schema()
    self.log.info("Configuring generated fields, indices, and optimizations ...")
    with Took() as t:
        self._execute(sql.GENERATED_FIELDS)
        self._execute(sql.REMOVE_FOREIGN_KEY)
        self._execute(sql.CUSTOM_PRUNE_STALLED_WORKERS)
        self._execute(sql.INDEXES)
        self._execute(sql.OPTIMIZED_FETCH_FUNCTION)
        self.log.info("Configuring done.", took=t.took)
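
An illustrative sketch of running configure() as an initialisation step before starting workers, assuming the import path above; the docstring suggests every step is applied only "if not exists", so re-running it should leave an already configured database untouched:

from openaleph_procrastinate.manage.db import Db

# Apply schema, generated fields, indexes and functions if missing
Db().configure()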

get_failed_jobs(dataset=None, batch=None, queue=None, task=None)

Get failed jobs by given criteria with fields needed for retrying.

Parameters:

    dataset (str | None): The dataset to filter for. Default: None
    batch (str | None): The job batch to filter for. Default: None
    queue (str | None): The queue name to filter for. Default: None
    task (str | None): The task name to filter for. Default: None

Yields:

    Rows: Rows with (id, queue_name, priority, lock) for each failed job

Source code in openaleph_procrastinate/manage/db.py
def get_failed_jobs(
    self,
    dataset: str | None = None,
    batch: str | None = None,
    queue: str | None = None,
    task: str | None = None,
) -> Rows:
    """
    Get failed jobs by given criteria with fields needed for retrying.

    Args:
        dataset: The dataset to filter for
        batch: The job batch to filter for
        queue: The queue name to filter for
        task: The task name to filter for

    Yields:
        Rows with (id, queue_name, priority, lock) for each failed job
    """
    yield from self._execute_iter(
        sql.GET_FAILED_JOBS,
        dataset=dataset,
        batch=batch,
        queue=queue,
        task=task,
    )
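
A sketch of listing failed jobs for a (hypothetical) dataset, unpacking the row fields named in the docstring:

from openaleph_procrastinate.manage.db import Db

db = Db()

# Each row carries the fields needed to re-enqueue the job
for job_id, queue_name, priority, lock in db.get_failed_jobs(dataset="my_dataset"):
    print(job_id, queue_name, priority, lock)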

get_orphaned_jobs(dataset=None, batch=None, queue=None, task=None)

Get orphaned jobs (status='doing' but worker no longer exists).

These jobs are stuck because their worker was deleted before the job completed. This can happen when the FK constraint is dropped and workers are pruned before the custom prune function is installed.

Parameters:

    dataset (str | None): The dataset to filter for. Default: None
    batch (str | None): The job batch to filter for. Default: None
    queue (str | None): The queue name to filter for. Default: None
    task (str | None): The task name to filter for. Default: None

Yields:

    Rows: Rows with (id, queue_name, priority, lock) for each orphaned job

Source code in openaleph_procrastinate/manage/db.py
def get_orphaned_jobs(
    self,
    dataset: str | None = None,
    batch: str | None = None,
    queue: str | None = None,
    task: str | None = None,
) -> Rows:
    """
    Get orphaned jobs (status='doing' but worker no longer exists).

    These jobs are stuck because their worker was deleted before the job
    completed. This can happen when the FK constraint is dropped and workers
    are pruned before the custom prune function is installed.

    Args:
        dataset: The dataset to filter for
        batch: The job batch to filter for
        queue: The queue name to filter for
        task: The task name to filter for

    Yields:
        Rows with (id, queue_name, priority, lock) for each orphaned job
    """
    yield from self._execute_iter(
        sql.GET_ORPHANED_JOBS,
        dataset=dataset,
        batch=batch,
        queue=queue,
        task=task,
    )
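
Orphaned jobs can be inspected the same way; a brief sketch, assuming the import path above:

from openaleph_procrastinate.manage.db import Db

db = Db()

# Count jobs stuck in 'doing' whose worker no longer exists
orphaned = list(db.get_orphaned_jobs())
print(f"{len(orphaned)} orphaned jobs")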

iterate_jobs(dataset=None, batch=None, queue=None, task=None, status=None, min_ts=None, max_ts=None, flatten_entities=False)

Iterate job objects from the database by given criteria.

Parameters:

    dataset (str | None): The dataset to filter for. Default: None
    batch (str | None): The job batch to filter for. Default: None
    queue (str | None): The queue name to filter for. Default: None
    task (str | None): The task name to filter for. Default: None
    status (Status | None): The status to filter for. Default: None
    min_ts (datetime | None): Start timestamp (earliest event found in procrastinate_events). Default: None
    max_ts (datetime | None): End timestamp (latest event found in procrastinate_events). Default: None
    flatten_entities (bool | None): If true, yield a job for each entity found in the source job. Default: False

Yields:

    Jobs: Iterator of Job

Source code in openaleph_procrastinate/manage/db.py
def iterate_jobs(
    self,
    dataset: str | None = None,
    batch: str | None = None,
    queue: str | None = None,
    task: str | None = None,
    status: Status | None = None,
    min_ts: datetime | None = None,
    max_ts: datetime | None = None,
    flatten_entities: bool | None = False,
) -> Jobs:
    """
    Iterate job objects from the database by given criteria.

    Args:
        dataset: The dataset to filter for
        batch: The job batch to filter for
        queue: The queue name to filter for
        task: The task name to filter for
        status: The status to filter for
        min_ts: Start timestamp (earliest event found in `procrastinate_events`)
        max_ts: End timestamp (latest event found in `procrastinate_events`)
        flatten_entities: If true, yield a job for each entity found in the source job

    Yields:
        Iterator of [Job][openaleph_procrastinate.model.Job]
    """

    min_ts = min_ts or datetime(1970, 1, 1)
    max_ts = max_ts or datetime.now()
    params = {
        "dataset": dataset,
        "min_ts": min_ts.isoformat(),
        "max_ts": max_ts.isoformat(),
        "batch": batch,
        "queue": queue,
        "task": task,
        "status": status,
    }
    for id, status_, data in self._execute_iter(sql.ALL_JOBS, **params):
        data["id"] = id
        data["status"] = status_
        job = unpack_job(data)
        if flatten_entities and isinstance(job, DatasetJob):
            has_entities = False
            for entity in job.get_entities():
                if entity.id:
                    has_entities = True
                    yield EntityJob(**data, entity_id=entity.id)
            if not has_entities:
                yield job
        else:
            yield job
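
A hedged sketch iterating jobs from the last day for a (hypothetical) dataset with flatten_entities enabled, assuming the import path above:

from datetime import datetime, timedelta

from openaleph_procrastinate.manage.db import Db

db = Db()
since = datetime.now() - timedelta(days=1)

# With flatten_entities=True, DatasetJobs are expanded into one EntityJob per
# entity; jobs without entities are yielded unchanged
for job in db.iterate_jobs(dataset="my_dataset", min_ts=since, flatten_entities=True):
    print(job)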

iterate_status(dataset=None, batch=None, queue=None, task=None, status=None, active_only=True)

Iterate through the aggregated job status summary.

Each row is an aggregation over dataset, batch, queue_name, task_name and status and includes the jobs count as well as the timestamps of the first and last event.

Parameters:

    dataset (str | None): The dataset to filter for. Default: None
    batch (str | None): The job batch to filter for. Default: None
    queue (str | None): The queue name to filter for. Default: None
    task (str | None): The task name to filter for. Default: None
    status (Status | None): The status to filter for. Default: None
    active_only (bool | None): Only include "active" datasets (at least 1 job in 'todo' or 'doing'). Default: True

Yields:

    Rows: A tuple with the fields in this order: dataset, batch, queue_name, task_name, status, jobs count, timestamp first event, timestamp last event

Source code in openaleph_procrastinate/manage/db.py
def iterate_status(
    self,
    dataset: str | None = None,
    batch: str | None = None,
    queue: str | None = None,
    task: str | None = None,
    status: Status | None = None,
    active_only: bool | None = True,
) -> Rows:
    """
    Iterate through aggregated job status summary

    Each row is an aggregation over
    `dataset,batch,queue_name,task_name,status` and includes jobs count,
    timestamp first event, timestamp last event

    Args:
        dataset: The dataset to filter for
        batch: The job batch to filter for
        queue: The queue name to filter for
        task: The task name to filter for
        status: The status to filter for
        active_only: Only include "active" datasets (at least 1 job in
            'todo' or 'doing')

    Yields:
        Rows a tuple with the fields in this order:
            dataset, batch, queue_name, task_name, status, jobs count,
            timestamp first event, timestamp last event
    """
    if active_only:
        query = sql.STATUS_SUMMARY_ACTIVE
    else:
        query = sql.STATUS_SUMMARY
    yield from self._execute_iter(
        query,
        dataset=dataset,
        batch=batch,
        queue=queue,
        task=task,
        status=status,
    )
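
A sketch printing the summary, unpacking the row fields in the order given by the docstring:

from openaleph_procrastinate.manage.db import Db

db = Db()

# Field order per the docstring above
for dataset, batch, queue, task, status, count, first_event, last_event in db.iterate_status():
    print(f"{dataset} {queue}/{task} [{status}]: {count} jobs ({first_event} - {last_event})")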

get_db(uri=None)

Get a globally cached db instance

Source code in openaleph_procrastinate/manage/db.py
@cache
def get_db(uri: str | None = None) -> Db:
    """Get a globally cached `db` instance"""
    settings = OpenAlephSettings()
    uri = uri or settings.procrastinate_db_uri
    return Db(uri)
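
Since get_db is wrapped in a cache decorator (presumably functools.cache), repeated calls with the same arguments return the same instance; a brief sketch:

from openaleph_procrastinate.manage.db import get_db

db = get_db()          # cached instance for the configured database uri
assert db is get_db()  # repeated calls return the same object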