Skip to content

openaleph_procrastinate.settings

DeferSettings

Bases: BaseSettings

Adjust the worker queues and tasks for different stages.

This is useful e.g. for launching a priority queuing setup for a specific dataset:

Example
# ingest service
export OPENALEPH_INGEST_QUEUE=ingest-prio-dataset
export OPENALEPH_ANALYZE_QUEUE=analyze-prio-dataset
ingestors ingest -d prio_dataset ./documents
procrastinate worker -q ingest-prio-dataset --one-shot  # stop worker after complete

# analyze service
procrastinate worker -q analyze-prio-dataset --one-shot  # stop worker after complete
Source code in openaleph_procrastinate/settings.py
class DeferSettings(BaseSettings):
    """
    Adjust the worker queues and tasks for different stages.

    Each attribute holds the `ServiceSettings` (queue name, task path and
    priority bounds) used when deferring jobs to that stage.

    This is useful e.g. for launching a priority queuing setup for a specific dataset:

    Example:
        ```bash
        # ingest service
        export OPENALEPH_INGEST_QUEUE=ingest-prio-dataset
        export OPENALEPH_ANALYZE_QUEUE=analyze-prio-dataset
        ingestors ingest -d prio_dataset ./documents
        procrastinate worker -q ingest-prio-dataset --one-shot  # stop worker after complete

        # analyze service
        procrastinate worker -q analyze-prio-dataset --one-shot  # stop worker after complete
        ```
    """

    model_config = SettingsConfigDict(
        env_prefix="openaleph_",
        env_nested_delimiter="_",
        env_file=".env",
        nested_model_default_partial_update=True,
        extra="ignore",  # other envs in .env file
    )

    # Processing stages: each one has its own dedicated queue.

    ingest: ServiceSettings = ServiceSettings(
        queue="ingest", task="ingestors.tasks.ingest"
    )
    """ingest-file"""

    analyze: ServiceSettings = ServiceSettings(
        queue="analyze", task="ftm_analyze.tasks.analyze"
    )
    """ftm-analyze"""

    transcribe: ServiceSettings = ServiceSettings(
        queue="transcribe", task="ftm_transcribe.tasks.transcribe"
    )
    """ftm-transcribe"""

    geocode: ServiceSettings = ServiceSettings(
        queue="geocode", task="ftm_geocode.tasks.geocode"
    )
    """ftm-geocode"""

    assets: ServiceSettings = ServiceSettings(
        queue="assets", task="ftm_assets.tasks.resolve"
    )
    """ftm-assets"""

    # OpenAleph: these tasks all share the "openaleph" queue and differ only
    # in their task path and priority bounds.

    index: ServiceSettings = ServiceSettings(
        queue="openaleph",
        task="aleph.procrastinate.tasks.index_entities",
        min_priority=60,
    )
    """openaleph indexer"""

    reindex: ServiceSettings = ServiceSettings(
        queue="openaleph",
        task="aleph.procrastinate.tasks.reindex_collection",
        min_priority=50,
    )
    """openaleph reindexer"""

    xref: ServiceSettings = ServiceSettings(
        queue="openaleph",
        task="aleph.procrastinate.tasks.xref_collection",
        min_priority=50,
    )
    """openaleph xref"""

    load_mapping: ServiceSettings = ServiceSettings(
        queue="openaleph",
        task="aleph.procrastinate.tasks.load_mapping",
        min_priority=70,
    )
    """openaleph load_mapping"""

    flush_mapping: ServiceSettings = ServiceSettings(
        queue="openaleph",
        task="aleph.procrastinate.tasks.flush_mapping",
        min_priority=40,
    )
    """openaleph flush_mapping"""

    # Exports cap the priority (max_priority) instead of raising the minimum.

    export_search: ServiceSettings = ServiceSettings(
        queue="openaleph",
        task="aleph.procrastinate.tasks.export_search",
        max_priority=50,
    )
    """openaleph export_search"""

    export_xref: ServiceSettings = ServiceSettings(
        queue="openaleph",
        task="aleph.procrastinate.tasks.export_xref",
        max_priority=50,
    )
    """openaleph export_xref"""

    update_entity: ServiceSettings = ServiceSettings(
        queue="openaleph",
        task="aleph.procrastinate.tasks.update_entity",
        min_priority=80,
    )
    """openaleph update_entity"""

    prune_entity: ServiceSettings = ServiceSettings(
        queue="openaleph",
        task="aleph.procrastinate.tasks.prune_entity",
        min_priority=80,
    )
    """openaleph prune_entity"""

analyze = ServiceSettings(queue='analyze', task='ftm_analyze.tasks.analyze') class-attribute instance-attribute

ftm-analyze

assets = ServiceSettings(queue='assets', task='ftm_assets.tasks.resolve') class-attribute instance-attribute

ftm-assets

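export_search = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.export_search', max_priority=50) class-attribute instance-attribute

openaleph export_search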

export_xref = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.export_xref', max_priority=50) class-attribute instance-attribute

openaleph export_xref

flush_mapping = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.flush_mapping', min_priority=40) class-attribute instance-attribute

openaleph flush_mapping

geocode = ServiceSettings(queue='geocode', task='ftm_geocode.tasks.geocode') class-attribute instance-attribute

ftm-geocode

index = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.index_entities', min_priority=60) class-attribute instance-attribute

openaleph indexer

ingest = ServiceSettings(queue='ingest', task='ingestors.tasks.ingest') class-attribute instance-attribute

ingest-file

load_mapping = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.load_mapping', min_priority=70) class-attribute instance-attribute

openaleph load_mapping

prune_entity = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.prune_entity', min_priority=80) class-attribute instance-attribute

openaleph prune_entity

reindex = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.reindex_collection', min_priority=50) class-attribute instance-attribute

openaleph reindexer

transcribe = ServiceSettings(queue='transcribe', task='ftm_transcribe.tasks.transcribe') class-attribute instance-attribute

ftm-transcribe

update_entity = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.update_entity', min_priority=80) class-attribute instance-attribute

openaleph update_entity

xref = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.xref_collection', min_priority=50) class-attribute instance-attribute

openaleph xref

OpenAlephSettings

Bases: BaseSettings

openaleph_procrastinate settings management using pydantic-settings

Note

All settings can be set via environment variables, prepending OPENALEPH_ (except for those with another alias) via runtime or in a .env file.

Source code in openaleph_procrastinate/settings.py
class OpenAlephSettings(BaseSettings):
    """
    Runtime configuration for `openaleph_procrastinate`, managed with
    [pydantic-settings](https://docs.pydantic.dev/latest/concepts/pydantic_settings/)

    Note:
        Every setting can be supplied as an environment variable prefixed
        with `OPENALEPH_` (unless the field declares another alias), either
        at runtime or through a `.env` file.
    """

    model_config = SettingsConfigDict(
        env_prefix="openaleph_",
        env_nested_delimiter="_",
        env_file=".env",
        nested_model_default_partial_update=True,
        extra="ignore",  # the .env file may contain unrelated variables
    )

    instance: str = Field(default="openaleph")
    """Instance identifier"""

    debug: bool = Field(default=False, alias="debug")
    """Debug mode"""

    procrastinate_sync: bool = Field(default=False, alias="procrastinate_sync")
    """Run sync workers (during testing)"""

    db_uri: str = Field(
        default=DEFAULT_DB_URI,
        validation_alias=AliasChoices("openaleph_db_uri", "aleph_database_uri"),
    )
    """OpenAleph database uri"""

    db_pool_size: int = 5
    """Max psql pool size per thread"""

    # The alias choices are tried in order, so the dedicated procrastinate
    # uri wins over the OpenAleph database uris.
    procrastinate_db_uri: str = Field(
        default=DEFAULT_DB_URI,
        validation_alias=AliasChoices(
            "procrastinate_db_uri", "openaleph_db_uri", "aleph_database_uri"
        ),
    )
    """Procrastinate database uri, falls back to OpenAleph database uri"""

    fragments_uri: str = Field(
        default=DEFAULT_DB_URI,
        validation_alias=AliasChoices("ftm_fragments_uri", "ftm_store_uri"),
    )
    """FollowTheMoney Fragments store uri"""

    @property
    def in_memory_db(self) -> bool:
        """Whether the procrastinate database uri starts with `memory:`."""
        uri = self.procrastinate_db_uri
        return uri.startswith("memory:")

db_pool_size = 5 class-attribute instance-attribute

Max psql pool size per thread

db_uri = Field(default=DEFAULT_DB_URI, validation_alias=(AliasChoices('openaleph_db_uri', 'aleph_database_uri'))) class-attribute instance-attribute

OpenAleph database uri

debug = Field(default=False, alias='debug') class-attribute instance-attribute

Debug mode

fragments_uri = Field(default=DEFAULT_DB_URI, validation_alias=(AliasChoices('ftm_fragments_uri', 'ftm_store_uri'))) class-attribute instance-attribute

FollowTheMoney Fragments store uri

instance = Field(default='openaleph') class-attribute instance-attribute

Instance identifier

procrastinate_db_uri = Field(default=DEFAULT_DB_URI, validation_alias=(AliasChoices('procrastinate_db_uri', 'openaleph_db_uri', 'aleph_database_uri'))) class-attribute instance-attribute

Procrastinate database uri, falls back to OpenAleph database uri

procrastinate_sync = Field(default=False, alias='procrastinate_sync') class-attribute instance-attribute

Run sync workers (during testing)

ServiceSettings

Bases: BaseSettings

Settings for a specific service, like ingest-file or ftm-analyze

Source code in openaleph_procrastinate/settings.py
class ServiceSettings(BaseSettings):
    """
    Per-service configuration, e.g. for `ingest-file` or `ftm-analyze`:
    the queue and task to use, plus retry and priority bounds.
    """

    queue: str
    """queue name"""
    task: str
    """task module path"""
    defer: bool = True
    """enable deferring"""
    max_retries: int = 5
    """Max retries, set to "-1" to enable infinity"""
    min_priority: int = MIN_PRIORITY
    """Minimum priority"""
    max_priority: int = MAX_PRIORITY
    """Maximum priority"""

    @property
    def retries(self) -> int | bool:
        """Retry value derived from `max_retries`: `True` when it is -1,
        otherwise the configured number, never below 0"""
        configured = self.max_retries
        if configured != -1:
            # any other negative value is floored at zero
            return configured if configured > 0 else 0
        return True

    def get_priority(self, priority: int | None = None) -> int:
        """Pick a random priority within the bounds of this service.

        The lower bound is the larger of `min_priority` and the given
        `priority` (or `MIN_PRIORITY` when none is given); the upper bound is
        `max_priority`, unless the lower bound already exceeds it."""
        requested = priority if priority else MIN_PRIORITY
        lower = max(requested, self.min_priority)
        # never let the upper bound drop below the lower one
        upper = max(lower, self.max_priority)
        return random.randint(lower, upper)

defer = True class-attribute instance-attribute

enable deferring

max_priority = MAX_PRIORITY class-attribute instance-attribute

Maximum priority

max_retries = 5 class-attribute instance-attribute

Max retries; set to -1 to retry indefinitely

min_priority = MIN_PRIORITY class-attribute instance-attribute

Minimum priority

queue instance-attribute

queue name

task instance-attribute

task module path

get_priority(priority=None)

Calculate a random priority between min_priority and max_priority. If a priority is given and it is higher than min_priority, it is used as the lower bound instead.

Source code in openaleph_procrastinate/settings.py
def get_priority(self, priority: int | None = None) -> int:
    """Calculate a random priority between `min_priority` and
    `max_priority`"""
    min_priority = max(priority or MIN_PRIORITY, self.min_priority)
    max_priority = max(min_priority, self.max_priority)
    return random.randint(min_priority, max_priority)