openaleph_procrastinate.settings
DeferSettings
Bases: BaseSettings
Adjust the worker queues and tasks for different stages.
This is useful, for example, to set up dedicated priority queues for a specific dataset:
Example
# ingest service
export OPENALEPH_INGEST_QUEUE=ingest-prio-dataset
export OPENALEPH_ANALYZE_QUEUE=analyze-prio-dataset
ingestors ingest -d prio_dataset ./documents
procrastinate worker -q ingest-prio-dataset --one-shot # exit once the queue is empty
# analyze service
procrastinate worker -q analyze-prio-dataset --one-shot # exit once the queue is empty
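The override in the example above can be pictured as a lookup from environment variable names to per-stage queue names. The following is a minimal standard-library sketch, not the package's implementation (which relies on pydantic-settings): `resolve_queue` and `DEFAULT_QUEUES` are hypothetical names, and the `OPENALEPH_<STAGE>_QUEUE` pattern is inferred only from the two variables shown in the example.

```python
import os
from typing import Mapping, Optional

# Default queue names for two stages, taken from the DeferSettings defaults.
DEFAULT_QUEUES = {"ingest": "ingest", "analyze": "analyze"}


def resolve_queue(stage: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Return the queue for a stage, preferring an OPENALEPH_<STAGE>_QUEUE override.

    Hypothetical helper for illustration only.
    """
    env = os.environ if env is None else env
    return env.get(f"OPENALEPH_{stage.upper()}_QUEUE", DEFAULT_QUEUES[stage])


# Only the ingest queue is overridden here; analyze keeps its default.
env = {"OPENALEPH_INGEST_QUEUE": "ingest-prio-dataset"}
print(resolve_queue("ingest", env))
print(resolve_queue("analyze", env))
```

Passing the environment as an argument keeps the sketch testable without touching the real process environment.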
Source code in openaleph_procrastinate/settings.py
analyze = ServiceSettings(queue='analyze', task='ftm_analyze.tasks.analyze')
class-attribute
instance-attribute
ftm-analyze
assets = ServiceSettings(queue='assets', task='ftm_assets.tasks.resolve')
class-attribute
instance-attribute
ftm-assets
export_search = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.export_search', max_priority=50)
class-attribute
instance-attribute
openaleph export_search
export_xref = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.export_xref', max_priority=50)
class-attribute
instance-attribute
openaleph export_xref
flush_mapping = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.flush_mapping', min_priority=40)
class-attribute
instance-attribute
openaleph flush_mapping
geocode = ServiceSettings(queue='geocode', task='ftm_geocode.tasks.geocode')
class-attribute
instance-attribute
ftm-geocode
index = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.index_entities', min_priority=60)
class-attribute
instance-attribute
openaleph indexer
ingest = ServiceSettings(queue='ingest', task='ingestors.tasks.ingest')
class-attribute
instance-attribute
ingest-file
load_mapping = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.load_mapping', min_priority=70)
class-attribute
instance-attribute
openaleph load_mapping
prune_entity = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.prune_entity', min_priority=80)
class-attribute
instance-attribute
openaleph prune_entity
reindex = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.reindex_collection', min_priority=50)
class-attribute
instance-attribute
openaleph reindexer
transcribe = ServiceSettings(queue='transcribe', task='ftm_transcribe.tasks.transcribe')
class-attribute
instance-attribute
ftm-transcribe
update_entity = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.update_entity', min_priority=80)
class-attribute
instance-attribute
openaleph update_entity
xref = ServiceSettings(queue='openaleph', task='aleph.procrastinate.tasks.xref_collection', min_priority=50)
class-attribute
instance-attribute
openaleph xref
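Nine of the services above share the `openaleph` queue and differ only in their priority bounds. As far as Procrastinate's documentation states, jobs with a higher priority are fetched first, so the bounds act as a rough ordering between task types. The sketch below only tabulates the values listed on this page; `OPENALEPH_TASKS` and `by_min_priority` are hypothetical names, and `None` marks a bound that is not overridden (the `ServiceSettings` default applies).

```python
# (service name, min_priority, max_priority) for services on the "openaleph" queue.
# None means the bound is not overridden and the ServiceSettings default is used.
OPENALEPH_TASKS = [
    ("export_search", None, 50),
    ("export_xref", None, 50),
    ("flush_mapping", 40, None),
    ("index", 60, None),
    ("load_mapping", 70, None),
    ("prune_entity", 80, None),
    ("reindex", 50, None),
    ("update_entity", 80, None),
    ("xref", 50, None),
]


def by_min_priority(tasks):
    """Sort by documented minimum priority, highest first.

    Services without an explicit minimum come last; ties are broken by name.
    """
    return sorted(tasks, key=lambda t: (t[1] is None, -(t[1] or 0), t[0]))


for name, low, high in by_min_priority(OPENALEPH_TASKS):
    print(f"{name:14} min={low!s:>4} max={high!s:>4}")
```

Read this way, entity updates and pruning sit at the top of the shared queue, while the two export tasks are capped at 50.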
OpenAlephSettings
Bases: BaseSettings
openaleph_procrastinate settings management using pydantic-settings
Note
All settings can be set via environment variables prefixed with OPENALEPH_ (except for those with a different alias), either at runtime or in a .env file.
Source code in openaleph_procrastinate/settings.py
db_pool_size = 5
class-attribute
instance-attribute
Maximum PostgreSQL connection pool size per thread
db_uri = Field(default=DEFAULT_DB_URI, validation_alias=(AliasChoices('openaleph_db_uri', 'aleph_database_uri')))
class-attribute
instance-attribute
OpenAleph database uri
debug = Field(default=False, alias='debug')
class-attribute
instance-attribute
Debug mode
fragments_uri = Field(default=DEFAULT_DB_URI, validation_alias=(AliasChoices('ftm_fragments_uri', 'ftm_store_uri')))
class-attribute
instance-attribute
FollowTheMoney Fragments store uri
instance = Field(default='openaleph')
class-attribute
instance-attribute
Instance identifier
procrastinate_db_uri = Field(default=DEFAULT_DB_URI, validation_alias=(AliasChoices('procrastinate_db_uri', 'openaleph_db_uri', 'aleph_database_uri')))
class-attribute
instance-attribute
Procrastinate database URI; falls back to the OpenAleph database URI
procrastinate_sync = Field(default=False, alias='procrastinate_sync')
class-attribute
instance-attribute
Run workers synchronously (intended for testing)
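The `AliasChoices` declarations above list several environment variable names per setting, tried in order, which is how `procrastinate_db_uri` falls back to the OpenAleph database URI. A minimal sketch of that first-match lookup, using only the standard library: `first_alias` and `PLACEHOLDER_DEFAULT` are hypothetical, the example URIs are made up, and the case-insensitive matching mirrors the default behaviour of pydantic-settings rather than anything stated on this page.

```python
from typing import Mapping, Sequence

# Stand-in for DEFAULT_DB_URI, whose real value is not shown on this page.
PLACEHOLDER_DEFAULT = "<DEFAULT_DB_URI>"

# Alias order as documented for procrastinate_db_uri.
PROCRASTINATE_DB_ALIASES = (
    "procrastinate_db_uri",
    "openaleph_db_uri",
    "aleph_database_uri",
)


def first_alias(
    env: Mapping[str, str],
    aliases: Sequence[str],
    default: str = PLACEHOLDER_DEFAULT,
) -> str:
    """Return the value of the first alias present in env, else the default."""
    lowered = {key.lower(): value for key, value in env.items()}
    for alias in aliases:
        if alias in lowered:
            return lowered[alias]
    return default


# Only the OpenAleph URI is set, so Procrastinate reuses it.
shared = {"OPENALEPH_DB_URI": "postgresql:///aleph"}
print(first_alias(shared, PROCRASTINATE_DB_ALIASES))

# A dedicated Procrastinate URI takes precedence when present.
split = {**shared, "PROCRASTINATE_DB_URI": "postgresql:///jobs"}
print(first_alias(split, PROCRASTINATE_DB_ALIASES))
```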
ServiceSettings
Bases: BaseSettings
Settings for a specific service, like ingest-file or ftm-analyze
Source code in openaleph_procrastinate/settings.py
defer = True
class-attribute
instance-attribute
enable deferring
max_priority = MAX_PRIORITY
class-attribute
instance-attribute
Maximum priority
max_retries = 5
class-attribute
instance-attribute
Maximum number of retries; set to -1 for unlimited retries
min_priority = MIN_PRIORITY
class-attribute
instance-attribute
Minimum priority
queue
instance-attribute
queue name
task
instance-attribute
task module path
get_priority(priority=None)
Calculate a random priority between min_priority and max_priority
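A rough sketch of what such a calculation could look like, under stated assumptions: the values of `MIN_PRIORITY` and `MAX_PRIORITY` are not given on this page, so 0 and 100 are placeholders; how an explicit `priority` argument is treated is also not documented here, and returning it unchanged is purely an assumption. `sketch_get_priority` is a hypothetical standalone function, not the method itself.

```python
import random
from typing import Optional

MIN_PRIORITY = 0    # assumed value; not stated on this page
MAX_PRIORITY = 100  # assumed value; not stated on this page


def sketch_get_priority(
    min_priority: int = MIN_PRIORITY,
    max_priority: int = MAX_PRIORITY,
    priority: Optional[int] = None,
    rng: Optional[random.Random] = None,
) -> int:
    """Pick a random integer priority within [min_priority, max_priority]."""
    if priority is not None:
        # Assumption: an explicit priority is passed through unchanged.
        return priority
    source = rng if rng is not None else random
    return source.randint(min_priority, max_priority)  # both bounds inclusive


# With the bounds documented for the index service (min_priority=60),
# every draw lands in the upper part of the range.
rng = random.Random(0)
draws = [sketch_get_priority(min_priority=60, rng=rng) for _ in range(5)]
print(all(60 <= value <= MAX_PRIORITY for value in draws))
```

Drawing a random value inside the window spreads jobs of the same type across the band instead of giving them all an identical priority.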