Skip to content

openaleph_procrastinate.model

DatasetJob

Bases: Job

A job with arbitrary payload bound to a dataset. The payload always contains an iterable of serialized EntityProxy objects in the entities key. It may contain other payload context data in the context key.

There are helpers for accessing archive files or loading entities.

Source code in openaleph_procrastinate/model.py
class DatasetJob(Job):
    """
    A job with arbitrary payload bound to a `dataset`.
    The payload always contains an iterable of serialized `EntityProxy` objects
    in the `entities` key. It may contain other payload context data in the
    `context` key.

    There are helpers for accessing archive files or loading entities.
    """

    dataset: str

    def get_writer(self: Self) -> ContextManager[BulkLoader]:
        """Get the writer for the dataset of the current entity"""
        return helpers.entity_writer(self.dataset)

    def get_entities(self) -> Generator[EntityProxy, None, None]:
        """
        Get the entities from the payload
        """
        assert "entities" in self.payload, "No entities in payload"
        for data in self.payload["entities"]:
            yield model.get_proxy(data)

    def load_entities(self: Self) -> Generator[EntityProxy, None, None]:
        """Load the entities from the store to refresh it to the latest data"""
        assert "entities" in self.payload, "No entities in payload"
        for data in self.payload["entities"]:
            yield helpers.load_entity(self.dataset, data["id"])

    # Helpers for file jobs that access the servicelayer archive

    def get_file_references(self) -> Generator[EntityFileReference, None, None]:
        """Get file references per entity from this job"""
        for entity in self.get_entities():
            for content_hash in entity.get("contentHash"):
                yield EntityFileReference(
                    dataset=self.dataset, entity=entity, content_hash=content_hash
                )

    # Helpers for creating entity jobs

    @classmethod
    def from_entities(
        cls, dataset: str, queue: str, task: str, entities: Iterable[EntityProxy]
    ) -> Self:
        """Make a job to process entities for a dataset"""
        return cls(
            dataset=dataset,
            queue=queue,
            task=task,
            payload={"entities": [e.to_full_dict() for e in entities]},
        )

    @classmethod
    def from_entity(
        cls, dataset: str, queue: str, task: str, entity: EntityProxy
    ) -> Self:
        """Make a job to process an entity for a dataset"""
        return cls.from_entities(
            dataset=dataset, queue=queue, task=task, entities=[entity]
        )

from_entities(dataset, queue, task, entities) classmethod

Make a job to process entities for a dataset

Source code in openaleph_procrastinate/model.py
@classmethod
def from_entities(
    cls, dataset: str, queue: str, task: str, entities: Iterable[EntityProxy]
) -> Self:
    """Make a job to process entities for a dataset"""
    return cls(
        dataset=dataset,
        queue=queue,
        task=task,
        payload={"entities": [e.to_full_dict() for e in entities]},
    )

from_entity(dataset, queue, task, entity) classmethod

Make a job to process an entity for a dataset

Source code in openaleph_procrastinate/model.py
@classmethod
def from_entity(
    cls, dataset: str, queue: str, task: str, entity: EntityProxy
) -> Self:
    """Make a job to process an entity for a dataset"""
    return cls.from_entities(
        dataset=dataset, queue=queue, task=task, entities=[entity]
    )

get_entities()

Get the entities from the payload

Source code in openaleph_procrastinate/model.py
def get_entities(self) -> Generator[EntityProxy, None, None]:
    """
    Get the entities from the payload
    """
    assert "entities" in self.payload, "No entities in payload"
    for data in self.payload["entities"]:
        yield model.get_proxy(data)

get_file_references()

Get file references per entity from this job

Source code in openaleph_procrastinate/model.py
def get_file_references(self) -> Generator[EntityFileReference, None, None]:
    """Get file references per entity from this job"""
    for entity in self.get_entities():
        for content_hash in entity.get("contentHash"):
            yield EntityFileReference(
                dataset=self.dataset, entity=entity, content_hash=content_hash
            )

get_writer()

Get the writer for the dataset of the current entity

Source code in openaleph_procrastinate/model.py
def get_writer(self: Self) -> ContextManager[BulkLoader]:
    """Get the writer for the dataset of the current entity"""
    return helpers.entity_writer(self.dataset)

load_entities()

Load the entities from the store to refresh it to the latest data

Source code in openaleph_procrastinate/model.py
def load_entities(self: Self) -> Generator[EntityProxy, None, None]:
    """Load the entities from the store to refresh it to the latest data"""
    assert "entities" in self.payload, "No entities in payload"
    for data in self.payload["entities"]:
        yield helpers.load_entity(self.dataset, data["id"])

EntityFileReference

Bases: BaseModel

A file reference (via content_hash) to a servicelayer file from an entity

Source code in openaleph_procrastinate/model.py
class EntityFileReference(BaseModel):
    """
    A file reference (via `content_hash`) to a servicelayer file from an entity
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    dataset: str
    content_hash: str
    entity: EntityProxy

    def open(self: Self) -> ContextManager[BinaryIO]:
        """
        Open the file attached to this job

        !!! danger
            This is not tested.
        """
        return helpers.open_file(self.dataset, self.content_hash)

    def get_localpath(self: Self) -> ContextManager[Path]:
        """
        Get a temporary path for the file attached to this job

        !!! danger
            This is not tested.
        """
        return helpers.get_localpath(self.dataset, self.content_hash)

get_localpath()

Get a temporary path for the file attached to this job

Danger

This is not tested.

Source code in openaleph_procrastinate/model.py
def get_localpath(self: Self) -> ContextManager[Path]:
    """
    Get a temporary path for the file attached to this job

    !!! danger
        This is not tested.
    """
    return helpers.get_localpath(self.dataset, self.content_hash)

open()

Open the file attached to this job

Danger

This is not tested.

Source code in openaleph_procrastinate/model.py
def open(self: Self) -> ContextManager[BinaryIO]:
    """
    Open the file attached to this job

    !!! danger
        This is not tested.
    """
    return helpers.open_file(self.dataset, self.content_hash)

Job

Bases: Stage

A job with arbitrary payload

Source code in openaleph_procrastinate/model.py
class Job(Stage):
    """
    A job with arbitrary payload
    """

    payload: dict[str, Any]
    stages: list[Stage] = Field(default=[])

    @property
    def context(self) -> dict[str, Any]:
        """Get the context from the payload if any"""
        return self.payload.get("context") or {}

    def defer(self: Self, app: App) -> None:
        """Defer this job"""
        data = self.model_dump(mode="json")
        app.configure_task(name=self.task, queue=self.queue).defer(**data)

context property

Get the context from the payload if any

defer(app)

Defer this job

Source code in openaleph_procrastinate/model.py
def defer(self: Self, app: App) -> None:
    """Defer this job"""
    data = self.model_dump(mode="json")
    app.configure_task(name=self.task, queue=self.queue).defer(**data)

Stage

Bases: BaseModel

Define an arbitrary (next) stage for a job

Source code in openaleph_procrastinate/model.py
class Stage(BaseModel):
    """Define an arbitrary (next) stage for a job"""

    # model: str
    queue: str
    task: str

    def make_job(self: Self, job: "AnyJob") -> "AnyJob":
        """Create a new job for this stage from a previous one"""
        job.queue = self.queue
        job.task = self.task
        return job

make_job(job)

Create a new job for this stage from a previous one

Source code in openaleph_procrastinate/model.py
def make_job(self: Self, job: "AnyJob") -> "AnyJob":
    """Create a new job for this stage from a previous one"""
    job.queue = self.queue
    job.task = self.task
    return job