# logic

The logic module contains pure, stateless transformation functions with no infrastructure dependencies. Functions here take inputs and produce outputs without side effects.
## Entity Aggregation

Aggregate statement streams into FollowTheMoney entities:

```python
from ftm_lakehouse.logic import aggregate_statements
from followthemoney.statement.serialize import read_csv_statements

with open("statements.csv") as f:
    statements = read_csv_statements(f)
    for entity in aggregate_statements(statements, "my_dataset"):
        print(f"{entity.id}: {entity.caption}")
```
### ftm_lakehouse.logic.aggregate_statements(stmts, dataset)

Aggregate sorted statements into entities.

Takes a stream of statements sorted by `canonical_id` and yields `StatementEntity` objects by grouping consecutive statements with the same `canonical_id`.

This function is the core entity-assembly logic used when exporting entities from the statement store. It expects statements to be pre-sorted by `canonical_id` for correct grouping.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `stmts` | `Statements` | Iterable of statements, must be sorted by `canonical_id` | required |
| `dataset` | `str` | Dataset name for the resulting entities | required |

Yields:

| Type | Description |
|---|---|
| `StatementEntities` | A `StatementEntity` for each unique `canonical_id` |
Example:

```python
from ftm_lakehouse.logic import aggregate_statements
from followthemoney.statement.serialize import read_csv_statements

# Read sorted statements from CSV
with open("statements.csv") as f:
    statements = read_csv_statements(f)
    for entity in aggregate_statements(statements, "my_dataset"):
        print(f"{entity.id}: {entity.caption}")
```

Source code in `ftm_lakehouse/logic/entities.py`
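The consecutive-grouping behavior described above can be sketched with `itertools.groupby`. This is an illustration only, using a minimal stand-in class instead of the real FollowTheMoney `Statement` type:

```python
from dataclasses import dataclass
from itertools import groupby

# Minimal stand-in for a statement (illustrative, not the real Statement class)
@dataclass
class Stmt:
    canonical_id: str
    prop: str
    value: str

stmts = [  # must already be sorted by canonical_id
    Stmt("ent-1", "name", "Alice"),
    Stmt("ent-1", "country", "de"),
    Stmt("ent-2", "name", "Bob"),
]

# groupby only merges *consecutive* runs of equal keys, which is why the
# input must be pre-sorted: an unsorted stream would yield fragmented groups.
entities = {}
for canonical_id, group in groupby(stmts, key=lambda s: s.canonical_id):
    entities[canonical_id] = [(s.prop, s.value) for s in group]

print(entities)
# {'ent-1': [('name', 'Alice'), ('country', 'de')], 'ent-2': [('name', 'Bob')]}
```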
## Mapping Processing

Generate entities from FollowTheMoney mapping configurations:

```python
from ftm_lakehouse.logic import map_entities
from ftm_lakehouse.model.mapping import DatasetMapping

mapping = DatasetMapping(
    dataset="my_dataset",
    content_hash="abc123...",
    queries=[...],
)

for entity in map_entities(mapping, csv_path):
    print(f"{entity.schema.name}: {entity.caption}")
```
### ftm_lakehouse.logic.map_entities(mapping, csv_path)

Generate entities from a mapping configuration and source file.

Applies a FollowTheMoney mapping configuration to a CSV/tabular file and yields the resulting entities. Each entity is annotated with:

- A `proof` property linking to the source file's content hash
- An `origin` context identifying the mapping source

This function is the core transformation logic used by `DatasetMapping.process()`. It handles the iteration over mapping queries and record processing.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `mapping` | `DatasetMapping` | The mapping configuration containing query definitions | required |
| `csv_path` | `Path` | Local path to the source CSV/tabular file | required |

Yields:

| Type | Description |
|---|---|
| `Entities` | `EntityProxy` objects generated from the mapping |
Example:

```python
from ftm_lakehouse.logic import map_entities
from ftm_lakehouse.model.mapping import DatasetMapping

mapping = DatasetMapping(
    dataset="my_dataset",
    content_hash="abc123...",
    queries=[...],  # FollowTheMoney mapping queries
)

for entity in map_entities(mapping, csv_path):
    print(f"{entity.schema.name}: {entity.caption}")
```

See Also:

- [FollowTheMoney Mappings](https://followthemoney.tech/docs/mappings/)
- `operations.MappingOperation` for the high-level mapping workflow

Source code in `ftm_lakehouse/logic/mappings.py`
## Translog-Aware Parquet Operations

Pure DuckDB/PyArrow functions that operate on the main statement table and the translog metadata table. Used internally by `ParquetStore`, `TranslogStore`, and `TranslogAwareLakeStore`.

### ftm_lakehouse.logic.parquet.translog_aware_sql(compiled_query, dt)

Wrap a compiled SQL query with a CTE that joins the translog.

The CTE joins the main arrow table with the translog to:

- Use the translog's `first_seen`/`last_seen` instead of the main table's
- Filter out rows where `translog.deleted_at IS NOT NULL`

Source code in `ftm_lakehouse/logic/parquet.py`
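The join semantics the CTE implements can be illustrated with `sqlite3` standing in for DuckDB. The table and column names below follow the docstring, but the actual SQL is generated by `translog_aware_sql`, so treat this as a sketch of the idea rather than the emitted query:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE main_table (id TEXT, first_seen TEXT, last_seen TEXT);
CREATE TABLE translog (id TEXT, first_seen TEXT, last_seen TEXT, deleted_at TEXT);
INSERT INTO main_table VALUES
    ('a', '2024-01-01', '2024-01-01'),
    ('b', '2024-01-01', '2024-01-01');
-- The translog carries the authoritative timestamps; 'b' is soft-deleted.
INSERT INTO translog VALUES
    ('a', '2023-12-01', '2024-02-01', NULL),
    ('b', '2024-01-01', '2024-01-01', '2024-03-01');
""")

rows = conn.execute("""
WITH live AS (
    SELECT m.id, t.first_seen, t.last_seen   -- translog timestamps win
    FROM main_table m
    JOIN translog t ON m.id = t.id
    WHERE t.deleted_at IS NULL               -- drop soft-deleted rows
)
SELECT * FROM live
""").fetchall()

print(rows)  # [('a', '2023-12-01', '2024-02-01')]
```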
### ftm_lakehouse.logic.parquet.stream_duckdb_translog(q, dt, translog_dt)

Like `stream_duckdb`, but joins with the translog for accurate timestamps and soft deletes.

Source code in `ftm_lakehouse/logic/parquet.py`

### ftm_lakehouse.logic.parquet.query_duckdb_translog(q, dt, translog_dt)

Like `query_duckdb`, but joins with the translog for accurate timestamps and soft deletes.

Returns a `(relation, connection)` tuple. The caller must hold the connection reference to prevent the GC from closing it while the relation is still in use.

Source code in `ftm_lakehouse/logic/parquet.py`
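Why the caller must hold the connection can be demonstrated with a toy model using `weakref`: a fake "connection" that stands in for the DuckDB connection, and a "relation" that does not keep it alive. This is an illustration of the garbage-collection hazard, not the real DuckDB object model:

```python
import gc
import weakref

class FakeConnection:
    """Stand-in for a DuckDB connection object (illustrative only)."""

def fake_query():
    conn = FakeConnection()
    # The "relation" only weakly references the connection, mimicking a
    # result object whose backing handle is owned elsewhere.
    relation = weakref.ref(conn)
    return relation, conn

# Safe pattern: hold both elements of the returned tuple.
live_rel, conn = fake_query()
gc.collect()
assert live_rel() is not None  # connection still alive

# Unsafe pattern: discarding the connection lets the GC reclaim (close) it.
dead_rel = fake_query()[0]
gc.collect()
assert dead_rel() is None  # on CPython the connection is already gone
```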
### ftm_lakehouse.logic.parquet.compact_with_translog(dt, translog_dt)

Join the main table with the translog, returning only live rows with accurate timestamps.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `dt` | `DeltaTable` | Main statement DeltaTable | required |
| `translog_dt` | `DeltaTable` | Translog metadata DeltaTable | required |

Returns:

| Type | Description |
|---|---|
| `RecordBatchReader` | `RecordBatchReader` of live rows with translog timestamps applied |

Source code in `ftm_lakehouse/logic/parquet.py`
### ftm_lakehouse.logic.parquet.get_deleted_entity_ids(dt, translog_dt)

Get entity IDs that have been soft-deleted via the translog.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `dt` | `DeltaTable` | Main statement DeltaTable | required |
| `translog_dt` | `DeltaTable` | Translog metadata DeltaTable | required |

Returns:

| Type | Description |
|---|---|
| `set[str]` | Set of `entity_id` strings with at least one deleted statement |

Source code in `ftm_lakehouse/logic/parquet.py`
### ftm_lakehouse.logic.parquet.filter_live_translog(translog_dt)

Return only live (non-deleted) translog rows.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `translog_dt` | `DeltaTable` | Translog metadata DeltaTable | required |

Returns:

| Type | Description |
|---|---|
| `RecordBatchReader` | `RecordBatchReader` of translog rows where `deleted_at IS NULL` |

Source code in `ftm_lakehouse/logic/parquet.py`
## Statement Serialization

Pack and unpack statements for efficient storage:

```python
from ftm_lakehouse.logic import pack_statement, unpack_statement
from followthemoney import Statement

# Pack a statement to a string
packed = pack_statement(stmt)

# Unpack back to a Statement
stmt = unpack_statement(packed)
```

### pack_statement

Pack a `Statement` into a null-byte-joined string for compact storage.

Args:

- `stmt`: A FollowTheMoney `Statement` object

Returns: Serialized string representation

### unpack_statement

Unpack a null-byte-joined string back into a `Statement`.

Args:

- `data`: Serialized statement string from `pack_statement`

Returns: Reconstructed `Statement` object
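The null-byte-joining idea can be sketched as follows. The exact field order `pack_statement` uses is internal to ftm_lakehouse, so the fields below are illustrative placeholders:

```python
SEP = "\x00"  # null byte: cannot occur inside the string values being joined

def pack(fields: list[str]) -> str:
    # Join the statement's fields into one compact string (illustrative
    # stand-in for pack_statement; real field order is library-internal)
    return SEP.join(fields)

def unpack(data: str) -> list[str]:
    # Split on the null byte to recover the original fields
    return data.split(SEP)

fields = ["ent-1", "Person", "name", "Alice", "my_dataset"]
packed = pack(fields)
assert unpack(packed) == fields  # lossless round trip
```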