Layer 4: Operation
Multi-step workflow operations that coordinate across repositories.
Base Classes
ftm_lakehouse.operation.base.DatasetJobOperation
Bases: LakehouseApiMixin, Generic[DJ]
A (long-running) operation for a specific dataset. It updates tags and checks its dependencies for freshness so that the operation can be skipped when nothing has changed. The job result is stored after a successful run.
Subclasses can either set class attributes target and dependencies,
or override get_target() and get_dependencies() for dynamic values.
Source code in ftm_lakehouse/operation/base.py
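The two ways of declaring targets and dependencies can be sketched as follows. This is an illustrative stand-in only: the `DatasetJobOperation` base is stubbed here so the pattern runs standalone, and the tag names are hypothetical.

```python
# Minimal stub of the documented base-class surface (the real class lives
# in ftm_lakehouse.operation.base and does much more).
class DatasetJobOperation:
    target = None
    dependencies = ()

    def get_target(self):
        return self.target

    def get_dependencies(self):
        return self.dependencies


# Variant 1: static declaration via class attributes.
class StaticOp(DatasetJobOperation):
    target = "exports/statements.csv"      # hypothetical tag
    dependencies = ("journal",)            # hypothetical tag


# Variant 2: dynamic values via overriding the getters.
class DynamicOp(DatasetJobOperation):
    def get_target(self):
        return "exports/statements.csv"

    def get_dependencies(self):
        return ("journal",)


assert StaticOp().get_target() == DynamicOp().get_target()
assert StaticOp().get_dependencies() == DynamicOp().get_dependencies()
```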
from_job(job, dataset)
classmethod
Create an operation instance from a job and Dataset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job` | `DJ` | The job model instance | required |
| `dataset` | `Dataset` | The Dataset providing repositories and storage | required |
Returns:
| Type | Description |
|---|---|
| `Self` | Configured operation instance |
Source code in ftm_lakehouse/operation/base.py
get_dependencies()
Return the freshness dependencies for this operation. Override for dynamic values.
get_target()
Return the target for this operation. Override for dynamic values.
run(force=False, *args, **kwargs)
Execute the handle function. Pass force=True to run it regardless of freshness dependencies.
Source code in ftm_lakehouse/operation/base.py
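The freshness check implied by `run(force=False)` can be sketched like this. This is a simplified model, not the library's implementation: the timestamps stand in for the tag timestamps of the target and its dependencies.

```python
from datetime import datetime, timezone

def should_run(target_ts, dependency_ts, force=False):
    """Sketch: run when forced, when no target exists yet,
    or when any dependency is newer than the target."""
    if force:
        return True
    if target_ts is None:
        return True
    return any(dep > target_ts for dep in dependency_ts)

older = datetime(2024, 1, 1, tzinfo=timezone.utc)
newer = datetime(2024, 6, 1, tzinfo=timezone.utc)

assert should_run(newer, [older]) is False        # target is fresh: skip
assert should_run(older, [newer]) is True         # dependency changed: run
assert should_run(newer, [older], force=True)     # force overrides freshness
```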
CrawlOperation
Batch file ingestion from a source location.
ftm_lakehouse.operation.crawl.CrawlJob
Bases: DatasetJobModel
Job model for crawl operations.
Tracks the state and configuration of a crawl job.
Attributes:
| Name | Type | Description |
|---|---|---|
| `uri` | `Uri` | Source location URI to crawl |
| `prefix` | `str \| None` | Include only keys with this prefix |
| `exclude_prefix` | `str \| None` | Exclude keys with this prefix |
| `glob` | `str \| None` | Include only keys matching this glob pattern |
| `exclude_glob` | `str \| None` | Exclude keys matching this glob pattern |
Source code in ftm_lakehouse/operation/crawl.py
ftm_lakehouse.operation.CrawlOperation
Bases: DatasetJobOperation[CrawlJob]
Crawl workflow that archives files and creates entities.
Iterates through files in a source store, archives them to the file repository, and creates corresponding entities in the entities repository.
Example
Source code in ftm_lakehouse/operation/crawl.py
get_uris()
Generate file uris to crawl.
Applies prefix, glob, and exclude filters to the source store.
Yields:
| Type | Description |
|---|---|
| `str` | File uris to be crawled |
Source code in ftm_lakehouse/operation/crawl.py
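The filter semantics described for `get_uris()` can be sketched with the standard library's `fnmatch`. This is an assumption about how the documented prefix/glob/exclude fields combine, not the library's actual code.

```python
from fnmatch import fnmatch

def matches(key, prefix=None, exclude_prefix=None, glob=None, exclude_glob=None):
    """Sketch: a key passes when it satisfies every configured filter."""
    if prefix and not key.startswith(prefix):
        return False
    if exclude_prefix and key.startswith(exclude_prefix):
        return False
    if glob and not fnmatch(key, glob):
        return False
    if exclude_glob and fnmatch(key, exclude_glob):
        return False
    return True

keys = ["data/a.csv", "data/b.json", "tmp/c.csv"]
selected = [k for k in keys if matches(k, prefix="data/", glob="*.csv")]
assert selected == ["data/a.csv"]
```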
handle_crawl(uri, run)
Handle a single crawl task.
Archives the file and creates a corresponding entity.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `uri` | `str` | File uri to crawl | required |
| `run` | `JobRun[CrawlJob]` | Current job run context | required |
Returns:
| Type | Description |
|---|---|
| `datetime` | Timestamp when the task was processed |
Source code in ftm_lakehouse/operation/crawl.py
Export Operations
ExportStatementsOperation
Export parquet store to exports/statements.csv.
ftm_lakehouse.operation.export.ExportStatementsJob
ftm_lakehouse.operation.ExportStatementsOperation
Bases: BaseExportOperation[ExportStatementsJob]
Export parquet store to statements.csv. Checks if the journal needs to be flushed first. Skips if the last export is newer than the last statements update.
Source code in ftm_lakehouse/operation/export.py
ExportEntitiesOperation
Export parquet store to entities.ftm.json.
ftm_lakehouse.operation.export.ExportEntitiesJob
ftm_lakehouse.operation.ExportEntitiesOperation
Bases: BaseExportOperation[ExportEntitiesJob]
Export parquet store to entities.ftm.json. Checks if the journal needs to be flushed first. Skips if the last export is newer than the last statements update.
Source code in ftm_lakehouse/operation/export.py
ExportStatisticsOperation
Export statistics to exports/statistics.json.
ftm_lakehouse.operation.export.ExportStatisticsJob
ftm_lakehouse.operation.ExportStatisticsOperation
Bases: BaseExportOperation[ExportStatisticsJob]
Export parquet store statistics to statistics.json. Checks if the journal needs to be flushed first. Skips if the last export is newer than the last statements update.
Source code in ftm_lakehouse/operation/export.py
ExportDocumentsOperation
Export document metadata to exports/documents.csv.
ftm_lakehouse.operation.export.ExportDocumentsJob
Bases: BaseExportJob
Source code in ftm_lakehouse/operation/export.py
ftm_lakehouse.operation.ExportDocumentsOperation
Bases: BaseExportOperation[ExportDocumentsJob]
Export file metadata to documents.csv. Checks if the journal needs to be flushed first. Skips if the last export is newer than the last statements update.
Source code in ftm_lakehouse/operation/export.py
ExportIndexOperation
Export index.json with optional resources.
ftm_lakehouse.operation.export.ExportIndexJob
ftm_lakehouse.operation.ExportIndexOperation
Bases: BaseExportOperation[ExportIndexJob]
Export index.json, optionally including resources; those resource targets must already exist.
Source code in ftm_lakehouse/operation/export.py
MappingOperation
Process CSV-to-entity mapping configurations.
ftm_lakehouse.operation.mapping.MappingJob
ftm_lakehouse.operation.MappingOperation
Bases: DatasetJobOperation[MappingJob]
Mapping workflow that transforms a CSV file into entities.
Processes a single archived CSV file (identified by content_hash) using its mapping configuration to generate FollowTheMoney entities, which are written to the entity repository.
Example
Source code in ftm_lakehouse/operation/mapping.py
handle(run, *args, **kwargs)
Process the mapping configuration and store generated entities.
Skips processing if the mapping output is already up-to-date relative to the mapping config.
Source code in ftm_lakehouse/operation/mapping.py
OptimizeOperation
Compact Delta Lake parquet files and optionally apply translog to main table.
ftm_lakehouse.operation.optimize.OptimizeJob
ftm_lakehouse.operation.OptimizeOperation
Bases: DatasetJobOperation[OptimizeJob]
Optimize the parquet Delta Lake table, with optional vacuum (purge of old files). The optimization can be scoped to a bucket and/or an origin. For instance, after a crawl operation it is sufficient to optimize only origin=crawl.
When compact=True, performs full compaction: dedup, remove tombstones, rewrite the table, Z_ORDER optimize, and vacuum. This is a heavier operation than standard optimize but produces a clean table.
Depending on the size of the dataset, this can be a very long running operation that may require some local memory and tmp disk storage.
Source code in ftm_lakehouse/operation/optimize.py
MakeOperation
Full workflow: flush journal + all exports.
ftm_lakehouse.operation.make.MakeJob
Bases: DatasetJobModel
ftm_lakehouse.operation.MakeOperation
Bases: DatasetJobOperation[MakeJob]
Source code in ftm_lakehouse/operation/make.py
RecreateOperation
Repair corrupted datasets from exported files.
ftm_lakehouse.operation.recreate.RecreateJob
Bases: DatasetJobModel
Job model for recreate operation.
Source code in ftm_lakehouse/operation/recreate.py
ftm_lakehouse.operation.recreate.RecreateOperation
Bases: DatasetJobOperation[RecreateJob]
Recreate a corrupted dataset by rebuilding the parquet store from exports.
This operation repairs corrupted lakehouse datasets by:
1. Clearing the statement store (parquet) and journal
2. Re-importing entities/statements from the most recent export
The source for re-import is selected based on tag timestamps:
- If entities.ftm.json is newer, import entities
- If statements.csv is newer, import statements
- Can be forced to use a specific source via the job's source field
Warning: This operation is destructive - it will delete all existing statement data before re-importing from exports.
Source code in ftm_lakehouse/operation/recreate.py
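The source-selection rule above can be sketched as a small function: pick whichever export carries the newer tag timestamp, unless the job's source field forces a choice. A simplified model, not the library's code.

```python
from datetime import datetime, timezone

def pick_source(entities_ts, statements_ts, forced=None):
    """Sketch: a forced source wins; otherwise the newer export is used."""
    if forced:
        return forced
    return "entities" if entities_ts >= statements_ts else "statements"

older = datetime(2024, 1, 1, tzinfo=timezone.utc)
newer = datetime(2024, 6, 1, tzinfo=timezone.utc)

assert pick_source(newer, older) == "entities"    # entities.ftm.json is newer
assert pick_source(older, newer) == "statements"  # statements.csv is newer
assert pick_source(older, newer, forced="entities") == "entities"
```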
DownloadArchiveOperation
Export archive files to their original paths.
ftm_lakehouse.operation.download.DownloadArchiveJob
ftm_lakehouse.operation.DownloadArchiveOperation
Bases: DatasetJobOperation[DownloadArchiveJob]
Download the archive files to a target location, transforming them into human-readable paths based on the exported documents.csv.
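The path transformation can be sketched as follows: archived files are keyed by content hash, and documents.csv maps each hash to a readable path. The column names below are assumptions for illustration only.

```python
import csv
import io
from pathlib import PurePosixPath

# Hypothetical documents.csv excerpt (column names are assumptions).
documents_csv = io.StringIO(
    "content_hash,path\n"
    "abc123,reports/2024/annual.pdf\n"
)

# Build the hash -> target-path mapping used to lay out downloads.
targets = {
    row["content_hash"]: PurePosixPath(row["path"])
    for row in csv.DictReader(documents_csv)
}

assert str(targets["abc123"]) == "reports/2024/annual.pdf"
```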