
openaleph_procrastinate.model

DatasetJob

Bases: Job

A job with an arbitrary payload bound to a dataset. The payload always contains an iterable of serialized EntityProxy objects in the entities key and may contain additional context data in the context key.

There are helpers for accessing archive files or loading entities.
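
A minimal usage sketch (the dataset, queue, and task names below are placeholders, and app stands for a configured procrastinate App):

```python
from followthemoney import model

from openaleph_procrastinate.model import DatasetJob

# build an entity proxy the usual followthemoney way
proxy = model.get_proxy({"id": "doc-1", "schema": "Document"})

job = DatasetJob.from_entities(
    dataset="my_dataset",             # placeholder dataset name
    queue="default",                  # placeholder queue name
    task="my_package.tasks.analyze",  # placeholder task module path
    entities=[proxy],
)
# job.defer(app)  # `app` would be your configured procrastinate App
```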

Source code in openaleph_procrastinate/model.py
class DatasetJob(Job):
    """
    A job with an arbitrary payload bound to a `dataset`.
    The payload always contains an iterable of serialized `EntityProxy` objects
    in the `entities` key and may contain additional context data in the
    `context` key.

    There are helpers for accessing archive files or loading entities.
    """

    dataset: str

    @property
    def log(self) -> BoundLogger:
        return get_logger(
            name=f"openaleph.job.{self.dataset}",
            dataset=self.dataset,
            queue=self.queue,
            task=self.task,
            job_id=self.job_id,
        )

    def get_writer(self: Self) -> ContextManager[BulkLoader]:
        """Get the writer for the dataset of the current job"""
        return helpers.entity_writer(self.dataset)

    def get_entities(self) -> Generator[EntityProxy, None, None]:
        """
        Get the entities from the payload
        """
        assert "entities" in self.payload, "No entities in payload"
        for data in self.payload["entities"]:
            yield model.get_proxy(data)

    def load_entities(self: Self) -> Generator[EntityProxy, None, None]:
        """Load the entities from the store to refresh it to the latest data"""
        assert "entities" in self.payload, "No entities in payload"
        for data in self.payload["entities"]:
            yield helpers.load_entity(self.dataset, data["id"])

    # Helpers for file jobs that access the servicelayer archive

    def get_file_references(self) -> Generator[EntityFileReference, None, None]:
        """
        Get file references per entity from this job

        Example:
            ```python
            # process temporary file paths
            for reference in job.get_file_references():
                with reference.get_localpath() as path:
                    subprocess.run(["command", "-i", str(path)])
                # temporary path will be cleaned up when leaving context

            # process temporary file handlers
            for reference in job.get_file_references():
                with reference.open() as handler:
                    do_something(handler.read())
                # temporary path will be cleaned up when leaving context
            ```

        Yields:
            The file references
        """
        for entity in self.get_entities():
            for content_hash in entity.get("contentHash", quiet=True):
                yield EntityFileReference(
                    dataset=self.dataset, entity=entity, content_hash=content_hash
                )

    @classmethod
    def from_entities(
        cls,
        dataset: str,
        queue: str,
        task: str,
        entities: Iterable[EntityProxy],
        dehydrate: bool | None = False,
        **context: Any,
    ) -> Self:
        """
        Make a job to process entities for a dataset

        Args:
            dataset: Name of the dataset
            queue: Name of the queue
            task: Python module path of the task
            entities: The entities to process
            dehydrate: Reduce entity payload to only a reference (tasks should
                re-fetch these entities from the store if they need more data)
            context: Job context
        """
        if dehydrate:
            entities = (make_checksum_entity(e, quiet=True) for e in entities)
        return cls(
            dataset=dataset,
            queue=queue,
            task=task,
            payload={
                "entities": [e.to_dict() for e in entities],
                "context": ensure_dict(context),
            },
        )

from_entities(dataset, queue, task, entities, dehydrate=False, **context) classmethod

Make a job to process entities for a dataset

Parameters:

Name        Type                    Description                                        Default
dataset     str                     Name of the dataset                                required
queue       str                     Name of the queue                                  required
task        str                     Python module path of the task                     required
entities    Iterable[EntityProxy]   The entities to process                            required
dehydrate   bool | None             Reduce entity payload to only a reference          False
                                    (tasks should re-fetch these entities from
                                    the store if they need more data)
context     Any                     Job context                                        {}
Source code in openaleph_procrastinate/model.py
@classmethod
def from_entities(
    cls,
    dataset: str,
    queue: str,
    task: str,
    entities: Iterable[EntityProxy],
    dehydrate: bool | None = False,
    **context: Any,
) -> Self:
    """
    Make a job to process entities for a dataset

    Args:
        dataset: Name of the dataset
        queue: Name of the queue
        task: Python module path of the task
        entities: The entities to process
        dehydrate: Reduce entity payload to only a reference (tasks should
            re-fetch these entities from the store if they need more data)
        context: Job context
    """
    if dehydrate:
        entities = (make_checksum_entity(e, quiet=True) for e in entities)
    return cls(
        dataset=dataset,
        queue=queue,
        task=task,
        payload={
            "entities": [e.to_dict() for e in entities],
            "context": ensure_dict(context),
        },
    )
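
With dehydrate=True the payload entities are reduced to checksum stubs (via make_checksum_entity), which keeps queued messages small; the receiving task is then expected to rehydrate them with load_entities(). A sketch with placeholder names:

```python
from followthemoney import model

from openaleph_procrastinate.model import DatasetJob

proxies = [model.get_proxy({"id": "doc-1", "schema": "Document"})]

# dehydrate=True strips each entity down to an id + checksum reference;
# workers re-fetch the full entities from the store via load_entities()
job = DatasetJob.from_entities(
    dataset="my_dataset",           # placeholder dataset name
    queue="default",
    task="my_package.tasks.index",  # placeholder task module path
    entities=proxies,
    dehydrate=True,
)
```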

get_entities()

Get the entities from the payload

Source code in openaleph_procrastinate/model.py
def get_entities(self) -> Generator[EntityProxy, None, None]:
    """
    Get the entities from the payload
    """
    assert "entities" in self.payload, "No entities in payload"
    for data in self.payload["entities"]:
        yield model.get_proxy(data)
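
Inside a task this is typically just iterated; a short sketch (assuming job is the DatasetJob handed to the task):

```python
for proxy in job.get_entities():
    # each item is a deserialized followthemoney EntityProxy
    job.log.info("Processing entity", entity_id=proxy.id, schema=proxy.schema.name)
```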

get_file_references()

Get file references per entity from this job

Example
# process temporary file paths
for reference in job.get_file_references():
    with reference.get_localpath() as path:
        subprocess.run(["command", "-i", str(path)])
    # temporary path will be cleaned up when leaving context

# process temporary file handlers
for reference in job.get_file_references():
    with reference.open() as handler:
        do_something(handler.read())
    # temporary path will be cleaned up when leaving context

Yields:

Type                 Description
EntityFileReference  The file references

Source code in openaleph_procrastinate/model.py
def get_file_references(self) -> Generator[EntityFileReference, None, None]:
    """
    Get file references per entity from this job

    Example:
        ```python
        # process temporary file paths
        for reference in job.get_file_references():
            with reference.get_localpath() as path:
                subprocess.run(["command", "-i", str(path)])
            # temporary path will be cleaned up when leaving context

        # process temporary file handlers
        for reference in job.get_file_references():
            with reference.open() as handler:
                do_something(handler.read())
            # temporary path will be cleaned up when leaving context
        ```

    Yields:
        The file references
    """
    for entity in self.get_entities():
        for content_hash in entity.get("contentHash", quiet=True):
            yield EntityFileReference(
                dataset=self.dataset, entity=entity, content_hash=content_hash
            )

get_writer()

Get the writer for the dataset of the current job

Source code in openaleph_procrastinate/model.py
def get_writer(self: Self) -> ContextManager[BulkLoader]:
    """Get the writer for the dataset of the current job"""
    return helpers.entity_writer(self.dataset)
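
The returned context manager yields a BulkLoader bound to the job's dataset. A sketch of combining it with load_entities; the writer method name here is an assumption, so check the BulkLoader API for the actual call:

```python
with job.get_writer() as writer:
    for entity in job.load_entities():
        writer.put(entity)  # assumed method name, not confirmed by this module
```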

load_entities()

Load the entities from the store to refresh them to the latest data

Source code in openaleph_procrastinate/model.py
def load_entities(self: Self) -> Generator[EntityProxy, None, None]:
    """Load the entities from the store to refresh it to the latest data"""
    assert "entities" in self.payload, "No entities in payload"
    for data in self.payload["entities"]:
        yield helpers.load_entity(self.dataset, data["id"])
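
This is the counterpart to dehydrated payloads: each entity is re-fetched by id from the store rather than read from the payload, so the task always sees the current version. A sketch (do_something is a placeholder for the task's own logic):

```python
for entity in job.load_entities():
    # fetched fresh from the store by id, even if the payload
    # only carried a dehydrated stub for this entity
    do_something(entity)
```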

EntityFileReference

Bases: BaseModel

A reference (via content_hash) from an entity to a file in the servicelayer archive

Source code in openaleph_procrastinate/model.py
class EntityFileReference(BaseModel):
    """
    A reference (via `content_hash`) from an entity to a file in the servicelayer archive
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    dataset: str
    content_hash: str
    entity: EntityProxy

    def open(self: Self) -> ContextManager[VirtualIO]:
        """
        Open the file referenced by this entity
        """
        return helpers.open_file(self.dataset, self.content_hash)

    def get_localpath(self: Self) -> ContextManager[Path]:
        """
        Get a temporary path for the file referenced by this entity
        """
        return helpers.get_localpath(self.dataset, self.content_hash)
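
Constructing a reference by hand mirrors what DatasetJob.get_file_references does internally: one reference per contentHash value on an entity. A sketch, assuming proxy is a followthemoney EntityProxy and the dataset name is a placeholder:

```python
for content_hash in proxy.get("contentHash", quiet=True):
    ref = EntityFileReference(
        dataset="my_dataset",  # placeholder dataset name
        entity=proxy,
        content_hash=content_hash,
    )
    with ref.open() as fh:
        header = fh.read(1024)  # e.g. sniff the file type from the first bytes
```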

get_localpath()

Get a temporary path for the file referenced by this entity

Source code in openaleph_procrastinate/model.py
def get_localpath(self: Self) -> ContextManager[Path]:
    """
    Get a temporary path for the file referenced by this entity
    """
    return helpers.get_localpath(self.dataset, self.content_hash)

open()

Open the file referenced by this entity

Source code in openaleph_procrastinate/model.py
def open(self: Self) -> ContextManager[VirtualIO]:
    """
    Open the file referenced by this entity
    """
    return helpers.open_file(self.dataset, self.content_hash)

Job

Bases: BaseModel

A job with an arbitrary payload

Source code in openaleph_procrastinate/model.py
class Job(BaseModel):
    """
    A job with an arbitrary payload
    """

    queue: str
    task: str
    payload: dict[str, Any]

    @property
    def context(self) -> dict[str, Any]:
        """Get the context from the payload if any"""
        return ensure_dict(self.payload.get("context")) or {}

    @property
    def job_id(self) -> str | None:
        """Get the job_id if it is stored in the context"""
        return self.context.get("job_id")

    @property
    def log(self) -> BoundLogger:
        return get_logger(
            name="openaleph.job", queue=self.queue, task=self.task, job_id=self.job_id
        )

    def defer(self: Self, app: App) -> None:
        """Defer this job"""
        self.log.debug("Deferring ...", payload=self.payload)
        data = self.model_dump(mode="json")
        app.configure_task(name=self.task, queue=self.queue).defer(**data)

context property

Get the context from the payload if any

job_id property

Get the job_id if it is stored in the context

defer(app)

Defer this job

Source code in openaleph_procrastinate/model.py
def defer(self: Self, app: App) -> None:
    """Defer this job"""
    self.log.debug("Deferring ...", payload=self.payload)
    data = self.model_dump(mode="json")
    app.configure_task(name=self.task, queue=self.queue).defer(**data)
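
A minimal sketch of the base class (queue and task names are placeholders; app stands for a configured procrastinate App):

```python
from openaleph_procrastinate.model import Job

job = Job(
    queue="default",
    task="my_package.tasks.cleanup",  # placeholder task module path
    payload={"context": {"job_id": "abc123"}},
)
assert job.job_id == "abc123"  # read back from the payload context
# job.defer(app)  # dumps the model to JSON and defers it via procrastinate
```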