logic
The logic module contains pure, stateless transformation functions with no infrastructure dependencies. Functions here take inputs and produce outputs without side effects.
Entity Aggregation
Aggregate a stream of statement dicts into FollowTheMoney entity dicts:
from ftm_lakehouse.logic import aggregate_unsafe
for entity in aggregate_unsafe(statement_dicts, "my_dataset"):
    print(f"{entity['id']}: {entity['caption']}")
aggregate_unsafe assumes the input is pre-sorted by canonical_id – the parquet store guarantees this for its queries.
ftm_lakehouse.logic.aggregate_unsafe(data, dataset=None)
Aggregate statement dicts (e.g. from DuckDB rows) to entity payloads.
Bypasses the dict -> Statement -> StatementEntity -> dict Python path entirely and therefore performs no validation. Input must be sorted by canonical_id.
Source code in ftm_lakehouse/logic/entities/aggregate.py
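To illustrate why the sort order matters, here is a minimal sketch of single-pass aggregation over sorted statement dicts. It is not the code in `aggregate.py`: the function name `aggregate_sorted`, the row keys (`canonical_id`, `schema`, `prop`, `value`) and the shape of the output dict are assumptions made for illustration.

```python
from itertools import groupby
from operator import itemgetter
from typing import Any, Iterable, Iterator, Optional


def aggregate_sorted(
    rows: Iterable[dict], dataset: Optional[str] = None
) -> Iterator[dict]:
    """Fold statement dicts into entity dicts (hypothetical sketch).

    groupby only merges *adjacent* rows with the same key, which is why
    the input has to arrive sorted by canonical_id.
    """
    for canonical_id, group in groupby(rows, key=itemgetter("canonical_id")):
        schema = None
        properties: dict = {}
        for row in group:
            # Take the schema from the first statement of the entity.
            schema = schema or row["schema"]
            values = properties.setdefault(row["prop"], [])
            if row["value"] not in values:
                values.append(row["value"])
        entity: dict = {"id": canonical_id, "schema": schema, "properties": properties}
        if dataset is not None:
            entity["datasets"] = [dataset]
        yield entity
```

Because nothing is buffered beyond the current group, memory use stays flat regardless of input size; the cost is that unsorted input silently yields the same id more than once.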
Mapping Processing
Generate entities from FollowTheMoney mapping configurations:
from ftm_lakehouse.logic import map_entities
from ftm_lakehouse.model.mapping import DatasetMapping
mapping = DatasetMapping(
    dataset="my_dataset",
    content_hash="abc123...",
    queries=[...]
)
for entity in map_entities(mapping, csv_path):
    print(f"{entity.schema.name}: {entity.caption}")
ftm_lakehouse.logic.map_entities(mapping, csv_path)
Generate entities from a mapping configuration and source file.
Applies a FollowTheMoney mapping configuration to a CSV/tabular file and yields the resulting entities. Each entity is annotated with:
- A `proof` property linking to the source file's content hash
- An `origin` context identifying the mapping source
This function is the core transformation logic used by DatasetMappings.process(). It handles the iteration over mapping queries and record processing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `mapping` | `DatasetMapping` | The mapping configuration containing query definitions | required |
| `csv_path` | `Path` | Local path to the source CSV/tabular file | required |
Yields:
| Type | Description |
|---|---|
| `Entities` | EntityProxy objects generated from the mapping |
Example
from ftm_lakehouse.logic import map_entities
from ftm_lakehouse.model.mapping import DatasetMapping
mapping = DatasetMapping(
    dataset="my_dataset",
    content_hash="abc123...",
    queries=[...]  # FollowTheMoney mapping queries
)
for entity in map_entities(mapping, csv_path):
    print(f"{entity.schema.name}: {entity.caption}")
See Also:
- [FollowTheMoney Mappings](https://followthemoney.tech/docs/mappings/)
- `operations.MappingOperation` for high-level mapping workflow
Source code in ftm_lakehouse/logic/mappings.py
Parquet helpers
DuckDB connection / view registration and the merge-query builder used by ParquetStore.
ftm_lakehouse.logic.parquet.make_duckdb()
Create a DuckDB connection with the configured memory ceiling, spill directory, and auto-loaded extensions.
Per-query memory is bounded by `Settings.duckdb_memory_limit` (env: `LAKEHOUSE_DUCKDB_MEMORY_LIMIT`, default `4GB`); queries exceeding the limit spill to `Settings.duckdb_temp_directory` (env: `LAKEHOUSE_DUCKDB_TEMP_DIRECTORY`) when set, otherwise to the OS temp directory DuckDB picks by default.
`autoinstall_known_extensions` + `autoload_known_extensions` let DuckDB lazy-resolve the Delta extension on the first `delta_scan` call instead of every `register_view` running an explicit `INSTALL delta` / `LOAD delta`. `INSTALL` is idempotent and cached on disk by DuckDB; `LOAD` runs at most once per connection.
Source code in ftm_lakehouse/logic/parquet.py
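The settings described above could be assembled roughly as follows. This is a sketch under assumptions, not the code in `parquet.py`: `duckdb_config` and `connect` are hypothetical names, and the environment variables are read directly here instead of through `Settings`. The option names (`memory_limit`, `temp_directory`, `autoinstall_known_extensions`, `autoload_known_extensions`) are regular DuckDB configuration options.

```python
import os
from typing import Mapping


def duckdb_config(env: Mapping[str, str] = os.environ) -> dict:
    """Build a DuckDB config dict from the environment (hypothetical helper)."""
    config = {
        # Memory ceiling; falls back to the documented default of 4GB.
        "memory_limit": env.get("LAKEHOUSE_DUCKDB_MEMORY_LIMIT", "4GB"),
        # Let DuckDB resolve known extensions (such as delta) on first use.
        "autoinstall_known_extensions": "true",
        "autoload_known_extensions": "true",
    }
    temp_dir = env.get("LAKEHOUSE_DUCKDB_TEMP_DIRECTORY")
    if temp_dir:
        # Only override the spill location when one is configured;
        # otherwise DuckDB keeps its own default.
        config["temp_directory"] = temp_dir
    return config


def connect():
    """Open an in-memory connection with the config above."""
    import duckdb  # third-party; imported lazily so duckdb_config has no dependencies

    return duckdb.connect(":memory:", config=duckdb_config())
```

Keeping the config construction separate from the connection call makes the environment handling testable without DuckDB installed.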
ftm_lakehouse.logic.parquet.register_view(con, dt, name=TABLE.name)
Register a DeltaTable as a DuckDB view via delta_scan.
The Delta extension exposes partition values as columns and uses Delta's column statistics for file skipping on filtered queries, so per-partition queries (`WHERE shard = ? AND bucket = ? AND origin = ?`) prune to one partition's files automatically.
The Delta extension is auto-installed and auto-loaded on first `delta_scan` use thanks to the connection-level flags set in `make_duckdb`, so this function does not run an explicit `INSTALL` / `LOAD`.
DuckDB's `delta_scan` does not accept prepared parameters for its URI argument, so the URI is interpolated as a SQL string literal. Single quotes are doubled to prevent injection if a future code path lets a dataset name (and thus the URI) carry a quote; primary validation is in `ftm_lakehouse.util.validate_dataset_name`.
Source code in ftm_lakehouse/logic/parquet.py
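The quote-doubling step can be sketched as below. The helper name `delta_view_sql`, the default view name `statements` and the exact `CREATE OR REPLACE VIEW` statement are assumptions for illustration; only the escaping rule (double every single quote before inlining the URI) comes from the description above.

```python
def delta_view_sql(uri: str, name: str = "statements") -> str:
    """Build a view definition over delta_scan with the URI inlined (hypothetical helper)."""
    # Standard SQL string-literal escaping: a single quote inside a
    # literal is written as two single quotes.
    literal = uri.replace("'", "''")
    return f"CREATE OR REPLACE VIEW {name} AS SELECT * FROM delta_scan('{literal}')"


# A URI containing a quote cannot terminate the literal early.
print(delta_view_sql("s3://lake/o'brien/statements"))
```

Escaping here is defense in depth: the view name is still interpolated unescaped in this sketch, so it must come from trusted code and never from user input.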
register_view uses delta_scan so the registered view resolves the current Delta log on every query – registering once per connection is enough; subsequent write_deltalake commits are picked up automatically.
ftm_lakehouse.logic.parquet.build_merge_query(shard, bucket, origin, grace_cutoff)
SQLAlchemy Select that collapses one partition.
The returned query:
- filters the source view to one `(shard, bucket, origin)` partition;
- computes `MIN(first_seen) OVER (PARTITION BY id)` so the surviving row carries the earliest `first_seen` for that statement id;
- keeps the row with the latest `last_seen` per id via `ROW_NUMBER() OVER (PARTITION BY id ORDER BY last_seen DESC) = 1`;
- drops tombstones whose `deleted_at` is older than `grace_cutoff`;
- orders by `(entity_id, id, last_seen DESC)` so the rewritten parquet file is ready for future merges without re-sort.
Consumers can compose further filters via `.where(...)` on the returned `Select` (e.g. `query.where(query.selected_columns.entity_id == entity_id)` for a single-entity merge). Compile to executable DuckDB SQL with `str(query.compile(compile_kwargs={"literal_binds": True}))`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `shard` | `str` | Target shard value (hex-padded). | required |
| `bucket` | `str` | Target bucket ( | required |
| `origin` | `str` | Target origin tag. | required |
| `grace_cutoff` | `datetime` | Tombstones with | required |
Returns:
| Type | Description |
|---|---|
| `Select` | A SQLAlchemy `Select`; compiles to DuckDB SQL. |
Source code in ftm_lakehouse/logic/parquet.py
Returns a SQLAlchemy Select that consumers can compose with additional .where(...) clauses before compiling to DuckDB SQL via literal_binds=True.
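The merge semantics listed for `build_merge_query` can be tried out with plain SQL. The sketch below is a stand-in, not the real query: it uses the standard-library `sqlite3` module instead of DuckDB, hand-written SQL instead of a SQLAlchemy `Select`, and a hypothetical `statements` table with made-up sample rows. It also assumes the tombstone filter is applied after the latest row per id has been picked, which the description above does not specify.

```python
import sqlite3

MERGE_SQL = """
WITH ranked AS (
    SELECT *,
           MIN(first_seen) OVER (PARTITION BY id) AS earliest_first_seen,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY last_seen DESC) AS rn
    FROM statements
    WHERE shard = ? AND bucket = ? AND origin = ?
)
SELECT id, entity_id, earliest_first_seen AS first_seen, last_seen, deleted_at
FROM ranked
WHERE rn = 1                                        -- latest row per statement id
  AND (deleted_at IS NULL OR deleted_at >= ?)       -- drop expired tombstones
ORDER BY entity_id, id, last_seen DESC
"""

SAMPLE_ROWS = [
    # id, entity_id, shard, bucket, origin, first_seen, last_seen, deleted_at
    ("s1", "e1", "00", "thing", "import", "2024-01-01", "2024-01-01", None),
    ("s1", "e1", "00", "thing", "import", "2024-02-01", "2024-03-01", None),
    ("s2", "e1", "00", "thing", "import", "2024-01-01", "2024-01-05", "2024-01-05"),
    ("s3", "e2", "01", "thing", "import", "2024-01-01", "2024-01-01", None),
]


def demo(grace_cutoff: str = "2024-02-01") -> list:
    """Collapse partition ('00', 'thing', 'import') of the sample rows."""
    con = sqlite3.connect(":memory:")
    con.execute(
        "CREATE TABLE statements (id TEXT, entity_id TEXT, shard TEXT, bucket TEXT,"
        " origin TEXT, first_seen TEXT, last_seen TEXT, deleted_at TEXT)"
    )
    con.executemany("INSERT INTO statements VALUES (?, ?, ?, ?, ?, ?, ?, ?)", SAMPLE_ROWS)
    # ISO-8601 date strings compare correctly as text, so no date type is needed.
    return con.execute(MERGE_SQL, ("00", "thing", "import", grace_cutoff)).fetchall()


print(demo())
```

With the sample data, the two `s1` rows collapse into one that keeps the earliest `first_seen` and the latest `last_seen`, the `s2` tombstone falls before the cutoff and is dropped, and `s3` is excluded because it lives in another shard.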
Statement Serialization
Pack and unpack statements for compact storage in the journal data column:
from ftm_lakehouse.logic import pack_statement, unpack_statement
packed = pack_statement(stmt) # unit-separator delimited string
stmt = unpack_statement(packed) # back to Statement
ftm_lakehouse.helpers.statements.pack_statement(stmt)
Pack a Statement into a unit-separator delimited string.
id, entity_id, canonical_id, prop, schema, value, dataset,
lang, original_value, external, first_seen, last_seen, origin, prop_type
Source code in ftm_lakehouse/helpers/statements.py
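A unit-separator encoding of this kind can be sketched as follows. This is an illustration on plain dicts, not the library's implementation: the names `pack` and `unpack`, the use of `ValueError`, the treatment of `None` as an empty field, and the assumption that the field list above gives the serialization order are all hypothetical.

```python
UNIT_SEPARATOR = "\x1f"  # ASCII 31, unlikely to occur in ordinary text values

# Assumed order, taken from the field list above.
FIELDS = (
    "id", "entity_id", "canonical_id", "prop", "schema", "value", "dataset",
    "lang", "original_value", "external", "first_seen", "last_seen", "origin",
    "prop_type",
)


def pack(stmt: dict) -> str:
    """Join the statement fields into one delimited string."""
    return UNIT_SEPARATOR.join(
        "" if stmt.get(field) is None else str(stmt[field]) for field in FIELDS
    )


def unpack(data: str) -> dict:
    """Split a packed string back into a field dict; empty fields become None."""
    parts = data.split(UNIT_SEPARATOR)
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(parts)}")
    return {field: (part or None) for field, part in zip(FIELDS, parts)}
```

Note that this sketch round-trips every value as a string, so typed fields such as `external` or the timestamps would need their own conversion on unpack.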
ftm_lakehouse.helpers.statements.unpack_statement(data)
Unpack a unit-separator delimited string back into a Statement.
Raises:
| Type | Description |
|---|---|
| `MalformedStatementError` | If |