Layer 4: Operation
Multi-step workflow operations that coordinate across repositories.
Base Classes
ftm_lakehouse.operation.base.DatasetJobOperation
Bases: LakehouseApiMixin, Generic[DJ]
A long-running operation on a specific dataset. It updates tags and checks its dependencies for freshness, so the operation can be skipped when its target is already up to date. The job result is stored after a successful run.
Subclasses can either set class attributes target and dependencies,
or override get_target() and get_dependencies() for dynamic values.
Source code in ftm_lakehouse/operation/base.py
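The two configuration styles described above (static class attributes versus overriding get_target() and get_dependencies()) can be sketched with a minimal stand-in base class. OperationSketch, StaticExport and DynamicMapping are hypothetical names, and the target and dependency strings are illustrative only; this is not the ftm_lakehouse implementation.

```python
from typing import ClassVar


class OperationSketch:
    """Hypothetical stand-in for DatasetJobOperation (not the real base class)."""

    target: ClassVar[str] = ""
    dependencies: ClassVar[list[str]] = []

    def get_target(self) -> str:
        # Default behaviour: fall back to the static class attribute
        return self.target

    def get_dependencies(self) -> list[str]:
        # Return a copy so callers cannot mutate the shared class attribute
        return list(self.dependencies)


class StaticExport(OperationSketch):
    # Static configuration via class attributes (illustrative values)
    target = "exports/statements.csv"
    dependencies = ["statements"]


class DynamicMapping(OperationSketch):
    # Dynamic configuration computed per instance (illustrative paths)
    def __init__(self, content_hash: str) -> None:
        self.content_hash = content_hash

    def get_target(self) -> str:
        return f"mappings/{self.content_hash}/entities"

    def get_dependencies(self) -> list[str]:
        return [f"mappings/{self.content_hash}/mapping.yml"]
```

Static attributes suit operations whose output location never changes; overriding the getters suits operations parameterized by the job, such as a mapping keyed by a file's content hash.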
from_job(job, dataset)
classmethod
Create an operation instance from a job and Dataset.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job | DJ | The job model instance | required |
| dataset | Dataset | The Dataset providing repositories and storage | required |

Returns:

| Type | Description |
|---|---|
| Self | Configured operation instance |
get_dependencies()
get_target()
run(force=False, *args, **kwargs)
Execute the handle function. Pass force=True to run it even if the target is already fresh relative to its dependencies.
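The freshness check that run() performs before calling the handler can be sketched as a pure function. This is a hypothetical helper, assuming freshness means "the target was updated after every dependency"; the real method may compare tags or timestamps differently.

```python
from datetime import datetime
from typing import Optional, Sequence


def should_skip(
    target_updated: Optional[datetime],
    dependencies_updated: Sequence[Optional[datetime]],
    force: bool = False,
) -> bool:
    """Decide whether a run can be skipped (hypothetical helper, not the library API)."""
    if force:
        return False  # force=True always executes the handler
    if target_updated is None:
        return False  # the target has never been produced
    known = [ts for ts in dependencies_updated if ts is not None]
    # Fresh only if the target is newer than every dependency it was built from
    return all(target_updated > ts for ts in known)
```

For example, a target written on 2 January with a single dependency last updated on 1 January is skipped unless force=True is passed.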
CrawlOperation
Batch file ingestion from a source location.
ftm_lakehouse.operation.crawl.CrawlJob
Bases: DatasetJobModel
Job model for crawl operations.
Tracks the state and configuration of a crawl job.
Attributes:

| Name | Type | Description |
|---|---|---|
| uri | Uri | Source location URI to crawl |
| prefix | str \| None | Include only keys with this prefix |
| exclude_prefix | str \| None | Exclude keys with this prefix |
| glob | str \| None | Include only keys matching this glob pattern |
| exclude_glob | str \| None | Exclude keys matching this glob pattern |
Source code in ftm_lakehouse/operation/crawl.py
ftm_lakehouse.operation.CrawlOperation
Bases: DatasetJobOperation[CrawlJob]
Crawl workflow that archives files and creates entities.
Iterates through files in a source store, archives them to the file repository, and creates corresponding entities in the entities repository.
Example
Source code in ftm_lakehouse/operation/crawl.py
get_uris()
Generate file uris to crawl.
Applies prefix, glob, and exclude filters to the source store.
Yields:

| Type | Description |
|---|---|
| str | File uris to be crawled |
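How the four CrawlJob filters might combine when get_uris() walks the source store can be sketched as a generator over plain key strings. filter_keys is a hypothetical name, and the use of fnmatch-style matching is an assumption: with fnmatchcase, `*` also matches across `/`, which may differ from the store's actual glob semantics.

```python
from fnmatch import fnmatchcase
from typing import Iterable, Iterator, Optional


def filter_keys(
    keys: Iterable[str],
    prefix: Optional[str] = None,
    exclude_prefix: Optional[str] = None,
    glob: Optional[str] = None,
    exclude_glob: Optional[str] = None,
) -> Iterator[str]:
    """Yield keys that pass all include/exclude filters (hypothetical sketch)."""
    for key in keys:
        if prefix and not key.startswith(prefix):
            continue  # outside the included prefix
        if exclude_prefix and key.startswith(exclude_prefix):
            continue  # inside an excluded prefix
        if glob and not fnmatchcase(key, glob):
            continue  # does not match the include pattern
        if exclude_glob and fnmatchcase(key, exclude_glob):
            continue  # matches the exclude pattern
        yield key
```

Because it is a generator, keys are filtered lazily, which matters when a source bucket holds millions of objects.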
handle_crawl(uri, run)
Handle a single crawl task.
Archives the file and creates a corresponding entity.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| uri | str | File uri to crawl | required |
| run | JobRun[CrawlJob] | Current job run context | required |

Returns:

| Type | Description |
|---|---|
| datetime | Timestamp when the task was processed |
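The two steps of handle_crawl (archive the file, then create a matching entity) can be sketched with in-memory stand-ins for the file and entity repositories. Everything here is an assumption for illustration: handle_crawl_sketch is hypothetical, SHA-1 is assumed as the content hash, the id scheme is invented, and entities are plain dicts using the FollowTheMoney Document properties contentHash and fileName.

```python
import hashlib
from datetime import datetime, timezone


def handle_crawl_sketch(
    uri: str,
    data: bytes,
    archive: dict[str, bytes],
    entities: list[dict],
) -> datetime:
    """Archive one file and record a Document entity for it (hypothetical sketch)."""
    # Content addressing: identical bytes map to the same key and are stored once
    content_hash = hashlib.sha1(data).hexdigest()
    archive.setdefault(content_hash, data)
    entities.append(
        {
            "id": f"file-{content_hash}",  # illustrative id scheme, not the library's
            "schema": "Document",
            "properties": {
                "contentHash": [content_hash],
                "fileName": [uri.rsplit("/", 1)[-1]],
            },
        }
    )
    return datetime.now(timezone.utc)  # when the task was processed
```

Crawling the same bytes from two different uris stores the content once but still records one entity per crawled file.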
Export Operations
ExportStatementsOperation
Export parquet store to exports/statements.csv.
ftm_lakehouse.operation.export.ExportStatementsJob
ftm_lakehouse.operation.ExportStatementsOperation
Bases: BaseExportOperation[ExportStatementsJob]
Export the parquet store to statements.csv. Checks whether the journal needs to be flushed first. Skips if the last export is newer than the last statements update.
Source code in ftm_lakehouse/operation/export.py
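The flush-then-skip rule described for the export operations can be sketched as follows. export_if_stale and its callback parameters are hypothetical; the sketch only assumes that flushing the journal advances the statements' last-update time and that an export is skipped when it is strictly newer than that time.

```python
from datetime import datetime
from typing import Callable, Optional


def export_if_stale(
    last_export: Optional[datetime],
    last_statements_update: Optional[datetime],
    journal_pending: bool,
    flush: Callable[[], datetime],
    export: Callable[[], datetime],
) -> Optional[datetime]:
    """Run an export unless it is already newer than the statement data (hypothetical)."""
    if journal_pending:
        # Flushing writes pending journal rows to the parquet store,
        # so the store's last-update time moves forward
        last_statements_update = flush()
    if (
        last_export is not None
        and last_statements_update is not None
        and last_export > last_statements_update
    ):
        return None  # skip: the existing export already reflects the data
    return export()  # returns the new export timestamp
```

Flushing before the comparison matters: otherwise an export could be judged fresh while unflushed journal rows are still missing from it.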
ExportEntitiesOperation
Export parquet store to entities.ftm.json.
ftm_lakehouse.operation.export.ExportEntitiesJob
ftm_lakehouse.operation.ExportEntitiesOperation
Bases: BaseExportOperation[ExportEntitiesJob]
Export the parquet store to entities.ftm.json. Checks whether the journal needs to be flushed first. Skips if the last export is newer than the last statements update.
Source code in ftm_lakehouse/operation/export.py
ExportStatisticsOperation
Export statistics to exports/statistics.json.
ftm_lakehouse.operation.export.ExportStatisticsJob
ftm_lakehouse.operation.ExportStatisticsOperation
Bases: BaseExportOperation[ExportStatisticsJob]
Export the parquet store statistics to statistics.json. Checks whether the journal needs to be flushed first. Skips if the last export is newer than the last statements update.
Source code in ftm_lakehouse/operation/export.py
ExportDocumentsOperation
Export document metadata to exports/documents.csv.
ftm_lakehouse.operation.export.ExportDocumentsJob
Bases: BaseExportJob
Source code in ftm_lakehouse/operation/export.py
ftm_lakehouse.operation.ExportDocumentsOperation
Bases: BaseExportOperation[ExportDocumentsJob]
Export file metadata to documents.csv. Checks whether the journal needs to be flushed first. Skips if the last export is newer than the last statements update.
Source code in ftm_lakehouse/operation/export.py
ExportIndexOperation
Export index.json with optional resources.
ftm_lakehouse.operation.export.ExportIndexJob
ftm_lakehouse.operation.ExportIndexOperation
Bases: BaseExportOperation[ExportIndexJob]
Export index.json, optionally including resources. Any resources to be included must already have been exported, because their targets have to exist.
Source code in ftm_lakehouse/operation/export.py
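The requirement that included resources already exist can be sketched as a check before serializing the index. build_index is a hypothetical helper, and the index.json structure (name plus a list of resource paths under exports/) is an illustrative assumption, not the real schema.

```python
import json
from typing import Iterable, Optional


def build_index(
    name: str,
    existing_exports: Iterable[str],
    resources: Optional[list[str]] = None,
) -> str:
    """Serialize a minimal index.json (hypothetical structure, not the real schema)."""
    index: dict = {"name": name}
    if resources:
        # Resources can only be listed once their export targets have been written
        missing = sorted(set(resources) - set(existing_exports))
        if missing:
            raise FileNotFoundError(f"export targets missing: {missing}")
        index["resources"] = [{"path": f"exports/{r}"} for r in resources]
    return json.dumps(index, sort_keys=True)
```

Failing loudly on a missing target avoids publishing an index that points at files which do not exist yet.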
MappingOperation
Process CSV-to-entity mapping configurations.
ftm_lakehouse.operation.mapping.MappingJob
ftm_lakehouse.operation.MappingOperation
Bases: DatasetJobOperation[MappingJob]
Mapping workflow that transforms a CSV file into entities.
Processes a single archived CSV file (identified by content_hash) using its mapping configuration to generate FollowTheMoney entities, which are written to the entity repository.
Example
Source code in ftm_lakehouse/operation/mapping.py
handle(run, *args, **kwargs)
Process the mapping configuration and store generated entities.
Skips processing if the mapping output is already up-to-date relative to the mapping config.
Maintenance Operations
Three independent async operations on the parquet statement store. All three acquire the dataset-wide write fence (.LOCK).
CompactOperation
Bin-pack small parquet files within each (shard, bucket, origin) partition (Delta OPTIMIZE compact). Cheap; does not change row contents.
ftm_lakehouse.operation.maintenance.CompactJob
Bases: DatasetJobModel
ftm_lakehouse.operation.CompactOperation
Bases: DatasetJobOperation[CompactJob]
Bin-pack small parquet files (Delta OPTIMIZE compact).
Cheap maintenance – only rewrites small files into larger ones; does not
dedupe rows or drop tombstones (use MergeOperation for that).
Source code in ftm_lakehouse/operation/maintenance.py
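Bin-packing small files within one partition can be sketched as a planning step that groups file names into rewrite batches. This uses first-fit decreasing as a stand-in technique; plan_compaction is hypothetical, and Delta's own OPTIMIZE bin-packing may choose batches differently.

```python
def plan_compaction(file_sizes: dict[str, int], target_size: int) -> list[list[str]]:
    """Group small files into batches whose total size stays <= target_size.

    First-fit decreasing; files already >= target_size are left untouched.
    """
    small = sorted(
        ((name, size) for name, size in file_sizes.items() if size < target_size),
        key=lambda item: item[1],
        reverse=True,
    )
    bins: list[tuple[int, list[str]]] = []  # (current total size, file names)
    for name, size in small:
        for i, (total, names) in enumerate(bins):
            if total + size <= target_size:
                names.append(name)
                bins[i] = (total + size, names)
                break
        else:
            bins.append((size, [name]))  # no existing batch fits: open a new one
    # A batch containing a single file would be rewritten unchanged, so drop it
    return [names for _, names in bins if len(names) > 1]
```

Each returned batch would be read and written back as one larger file, which is why compaction leaves row contents unchanged.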
MergeOperation
Per-partition rewrite that collapses duplicates (latest last_seen per id), folds first_seen to the min, and drops tombstones older than the grace cutoff (LAKEHOUSE_GRACE_PERIOD_DAYS).
ftm_lakehouse.operation.maintenance.MergeJob
Bases: DatasetJobModel
ftm_lakehouse.operation.MergeOperation
Bases: DatasetJobOperation[MergeJob]
Collapse duplicates and reap expired tombstones, partition by partition.
For each (shard, bucket, origin) partition: keep the most-recent row
per statement id, fold first_seen down to the minimum, drop tombstones
older than LAKEHOUSE_GRACE_PERIOD_DAYS.
Source code in ftm_lakehouse/operation/maintenance.py
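The per-partition merge rules can be sketched over plain row dicts. merge_partition is hypothetical; the row layout (id, first_seen, last_seen) follows the description above, while marking tombstones with a deleted flag and measuring their age by last_seen are assumptions. grace_days stands in for LAKEHOUSE_GRACE_PERIOD_DAYS.

```python
from datetime import datetime, timedelta
from typing import Iterable


def merge_partition(rows: Iterable[dict], now: datetime, grace_days: int) -> list[dict]:
    """Collapse duplicates and reap expired tombstones in one partition (hypothetical)."""
    cutoff = now - timedelta(days=grace_days)
    latest: dict[str, dict] = {}
    first_seen: dict[str, datetime] = {}
    for row in rows:
        sid = row["id"]
        # Fold first_seen down to the minimum across all copies of the statement
        first_seen[sid] = min(first_seen.get(sid, row["first_seen"]), row["first_seen"])
        # Keep the most recent copy by last_seen
        if sid not in latest or row["last_seen"] > latest[sid]["last_seen"]:
            latest[sid] = row
    merged = []
    for sid, row in latest.items():
        if row.get("deleted") and row["last_seen"] < cutoff:
            continue  # tombstone older than the grace period: drop it
        merged.append({**row, "first_seen": first_seen[sid]})
    return merged
```

Tombstones younger than the grace period are kept so that readers of older snapshots still see the deletion.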
VacuumOperation
Delete obsolete parquet files no longer referenced by the Delta log.
ftm_lakehouse.operation.maintenance.VacuumJob
ftm_lakehouse.operation.VacuumOperation
Bases: DatasetJobOperation[VacuumJob]
Delete obsolete parquet files no longer referenced by the Delta log.
Source code in ftm_lakehouse/operation/maintenance.py
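Selecting files to vacuum amounts to a set difference between the parquet files on storage and those still referenced by the Delta log. files_to_vacuum is a hypothetical helper; the retention window is included because Delta's own VACUUM honours one (7 days by default), but whether VacuumOperation applies the same window is an assumption.

```python
from datetime import datetime, timedelta
from typing import Iterable, Mapping


def files_to_vacuum(
    files: Mapping[str, datetime],
    referenced: Iterable[str],
    now: datetime,
    retention: timedelta = timedelta(days=7),
) -> list[str]:
    """Return unreferenced parquet files older than the retention window (hypothetical)."""
    live = set(referenced)
    cutoff = now - retention
    return sorted(
        path
        for path, modified in files.items()
        # Only data files are candidates; the _delta_log JSON entries are never deleted
        if path.endswith(".parquet") and path not in live and modified < cutoff
    )
```

Keeping recently orphaned files for a while protects readers that are still scanning an older table version.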
MakeOperation
Full workflow: flush journal + all exports.
ftm_lakehouse.operation.make.MakeJob
Bases: DatasetJobModel
ftm_lakehouse.operation.MakeOperation
Bases: DatasetJobOperation[MakeJob]
Source code in ftm_lakehouse/operation/make.py
DownloadArchiveOperation
Export archive files to their original paths.
ftm_lakehouse.operation.download.DownloadArchiveJob
ftm_lakehouse.operation.DownloadArchiveOperation
Bases: DatasetJobOperation[DownloadArchiveJob]
Download the archived files to a target location, restoring their original paths as recorded in the exported documents.csv.
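Restoring original paths from documents.csv can be sketched as building a copy plan from destination path to content hash. plan_downloads is hypothetical, and the column names content_hash and path are assumptions about the documents.csv layout; the same archived file may be planned for several destinations.

```python
import csv
import io
from pathlib import PurePosixPath


def plan_downloads(documents_csv: str, target: str) -> dict[str, str]:
    """Map each destination path to the content hash to copy there (hypothetical)."""
    plan: dict[str, str] = {}
    for row in csv.DictReader(io.StringIO(documents_csv)):
        relative = PurePosixPath(row["path"].lstrip("/"))
        # Refuse paths that would escape the target directory
        if ".." in relative.parts:
            raise ValueError(f"unsafe path: {row['path']}")
        plan[str(PurePosixPath(target) / relative)] = row["content_hash"]
    return plan
```

Keying the plan by destination rather than by hash keeps duplicates: one archived blob can be written to every path it originally occupied.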