logic

The logic module contains pure stateless transformation functions with no infrastructure dependencies. Functions here take inputs and produce outputs without side effects.

Entity Aggregation

Aggregate statement streams into FollowTheMoney entities:

from ftm_lakehouse.logic import aggregate_statements
from followthemoney.statement.serialize import read_csv_statements

with open("statements.csv") as f:
    statements = read_csv_statements(f)
    for entity in aggregate_statements(statements, "my_dataset"):
        print(f"{entity.id}: {entity.caption}")

ftm_lakehouse.logic.aggregate_statements(stmts, dataset)

Aggregate sorted statements into entities.

Takes a stream of statements sorted by canonical_id and yields StatementEntity objects by grouping consecutive statements with the same canonical_id.

This function is the core entity assembly logic used when exporting entities from the statement store. It expects statements to be pre-sorted by canonical_id for correct grouping.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `stmts` | `Statements` | Iterable of statements, must be sorted by `canonical_id` | required |
| `dataset` | `str` | Dataset name for the resulting entities | required |

Yields:

| Type | Description |
|------|-------------|
| `StatementEntities` | A `StatementEntity` for each unique `canonical_id` |

Example
from ftm_lakehouse.logic import aggregate_statements
from followthemoney.statement.serialize import read_csv_statements

# Read sorted statements from CSV
with open("statements.csv") as f:
    statements = read_csv_statements(f)
    for entity in aggregate_statements(statements, "my_dataset"):
        print(f"{entity.id}: {entity.caption}")
Source code in ftm_lakehouse/logic/entities.py
def aggregate_statements(stmts: Statements, dataset: str) -> StatementEntities:
    """
    Aggregate sorted statements into entities.

    Takes a stream of statements sorted by canonical_id and yields
    StatementEntity objects by grouping consecutive statements with
    the same canonical_id.

    This function is the core entity assembly logic used when exporting
    entities from the statement store. It expects statements to be pre-sorted
    by canonical_id for correct grouping.

    Args:
        stmts: Iterable of statements, must be sorted by canonical_id
        dataset: Dataset name for the resulting entities

    Yields:
        StatementEntity for each unique canonical_id

    Example:
        ```python
        from ftm_lakehouse.logic import aggregate_statements
        from followthemoney.statement.serialize import read_csv_statements

        # Read sorted statements from CSV
        with open("statements.csv") as f:
            statements = read_csv_statements(f)
            for entity in aggregate_statements(statements, "my_dataset"):
                print(f"{entity.id}: {entity.caption}")
        ```
    """
    ds = make_dataset(dataset)
    statements: list[Statement] = []
    for s in stmts:
        if len(statements) and statements[0].canonical_id != s.canonical_id:
            yield StatementEntity.from_statements(ds, statements)
            statements = []
        statements.append(s)
    if len(statements):
        yield StatementEntity.from_statements(ds, statements)
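
The consecutive-grouping behavior can be sketched without FollowTheMoney objects. This illustration uses plain `(canonical_id, value)` tuples and `itertools.groupby` as a stand-in for `StatementEntity.from_statements`:

```python
from itertools import groupby

# Stand-in for a sorted statement stream: (canonical_id, prop_value) pairs.
sorted_stmts = [
    ("e1", "Alice"),
    ("e1", "Berlin"),
    ("e2", "Bob"),
]

# Group consecutive items sharing a canonical_id, as aggregate_statements does.
entities = [
    (canonical_id, [value for _, value in group])
    for canonical_id, group in groupby(sorted_stmts, key=lambda s: s[0])
]
print(entities)
# → [('e1', ['Alice', 'Berlin']), ('e2', ['Bob'])]
```

Like `aggregate_statements`, `groupby` only merges *consecutive* runs, which is why the input must be pre-sorted by `canonical_id`: an unsorted stream would yield the same ID more than once.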

Mapping Processing

Generate entities from FollowTheMoney mapping configurations:

from ftm_lakehouse.logic import map_entities
from ftm_lakehouse.model.mapping import DatasetMapping

mapping = DatasetMapping(
    dataset="my_dataset",
    content_hash="abc123...",
    queries=[...]
)

for entity in map_entities(mapping, csv_path):
    print(f"{entity.schema.name}: {entity.caption}")

ftm_lakehouse.logic.map_entities(mapping, csv_path)

Generate entities from a mapping configuration and source file.

Applies a FollowTheMoney mapping configuration to a CSV/tabular file and yields the resulting entities. Each entity is annotated with:

  • A proof property linking to the source file's content hash
  • An origin context identifying the mapping source

This function is the core transformation logic used by DatasetMappings.process(). It handles the iteration over mapping queries and record processing.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `mapping` | `DatasetMapping` | The mapping configuration containing query definitions | required |
| `csv_path` | `Path` | Local path to the source CSV/tabular file | required |

Yields:

| Type | Description |
|------|-------------|
| `Entities` | `EntityProxy` objects generated from the mapping |

Example
from ftm_lakehouse.logic import map_entities
from ftm_lakehouse.model.mapping import DatasetMapping

mapping = DatasetMapping(
    dataset="my_dataset",
    content_hash="abc123...",
    queries=[...]  # FollowTheMoney mapping queries
)

for entity in map_entities(mapping, csv_path):
    print(f"{entity.schema.name}: {entity.caption}")

See Also:

- [FollowTheMoney Mappings](https://followthemoney.tech/docs/mappings/)
- `operations.MappingOperation` for high-level mapping workflow
Source code in ftm_lakehouse/logic/mappings.py
def map_entities(mapping: DatasetMapping, csv_path: Path) -> Entities:
    """
    Generate entities from a mapping configuration and source file.

    Applies a FollowTheMoney mapping configuration to a CSV/tabular file
    and yields the resulting entities. Each entity is annotated with:

    - A `proof` property linking to the source file's content hash
    - An `origin` context identifying the mapping source

    This function is the core transformation logic used by DatasetMappings.process().
    It handles the iteration over mapping queries and record processing.

    Args:
        mapping: The mapping configuration containing query definitions
        csv_path: Local path to the source CSV/tabular file

    Yields:
        EntityProxy objects generated from the mapping

    Example:
        ```python
        from ftm_lakehouse.logic import map_entities
        from ftm_lakehouse.model.mapping import DatasetMapping

        mapping = DatasetMapping(
            dataset="my_dataset",
            content_hash="abc123...",
            queries=[...]  # FollowTheMoney mapping queries
        )

        for entity in map_entities(mapping, csv_path):
            print(f"{entity.schema.name}: {entity.caption}")
        ```

    See Also:

        - [FollowTheMoney Mappings](https://followthemoney.tech/docs/mappings/)
        - `operations.MappingOperation` for high-level mapping workflow
    """
    origin = mapping_origin(mapping.content_hash)
    for query in mapping.queries:
        mapper = query.make_mapping(csv_path.as_posix(), mapping.dataset)
        for record in logged_items(mapper.source.records, "Map", 1000, "Row", log):
            for entity in mapper.map(record).values():
                entity.add("proof", mapping.content_hash)
                entity.context["origin"] = [origin]
                yield entity

Translog-Aware Parquet Operations

Pure DuckDB/PyArrow functions that operate on the main statement table and translog metadata table. Used internally by ParquetStore, TranslogStore, and TranslogAwareLakeStore.

ftm_lakehouse.logic.parquet.translog_aware_sql(compiled_query, dt)

Wrap a compiled SQL query with a CTE that joins the translog.

The CTE joins the main arrow table with the translog to:

  • Use the translog's first_seen/last_seen instead of the main table's
  • Filter out rows where translog.deleted_at IS NOT NULL

Source code in ftm_lakehouse/logic/parquet.py
def translog_aware_sql(compiled_query: str, dt: DeltaTable) -> str:
    """Wrap a compiled SQL query with a CTE that joins the translog.

    The CTE joins the main arrow table with the translog to:
    - Use translog's first_seen/last_seen instead of main table's
    - Filter out rows where translog.deleted_at IS NOT NULL
    """
    all_cols = [f.name for f in dt.schema().to_arrow()]
    # Use translog's first_seen/last_seen instead of main table's
    main_cols = [f"arrow.{c}" for c in all_cols if c not in ("first_seen", "last_seen")]
    select_cols = ", ".join(main_cols + ["sc.first_seen", "sc.last_seen"])

    cte = f"""WITH __live AS (
    SELECT {select_cols}
    FROM arrow
    JOIN translog sc ON arrow.id = sc.id
    WHERE sc.deleted_at IS NULL
)
"""
    rewritten = compiled_query.replace(
        "FROM arrow as statement", "FROM __live as statement"
    )
    return cte + rewritten
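
The rewrite itself is plain string manipulation, independent of DuckDB or Delta Lake. It can be reproduced with a hardcoded column list (a hypothetical four-column schema standing in for `dt.schema().to_arrow()`, not the real statement schema):

```python
# Hypothetical column list standing in for the Delta table's arrow schema.
all_cols = ["id", "entity_id", "first_seen", "last_seen"]
main_cols = [f"arrow.{c}" for c in all_cols if c not in ("first_seen", "last_seen")]
select_cols = ", ".join(main_cols + ["sc.first_seen", "sc.last_seen"])

# Build the CTE exactly as translog_aware_sql does.
cte = f"""WITH __live AS (
    SELECT {select_cols}
    FROM arrow
    JOIN translog sc ON arrow.id = sc.id
    WHERE sc.deleted_at IS NULL
)
"""
compiled_query = "SELECT * FROM arrow as statement"
rewritten = cte + compiled_query.replace(
    "FROM arrow as statement", "FROM __live as statement"
)
print(rewritten)
```

The compiled query now reads from the `__live` CTE, so soft-deleted rows are filtered out and the translog's timestamps shadow the main table's.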

ftm_lakehouse.logic.parquet.stream_duckdb_translog(q, dt, translog_dt)

Like stream_duckdb but joins with translog for accurate timestamps and soft deletes.

Source code in ftm_lakehouse/logic/parquet.py
def stream_duckdb_translog(
    q, dt: DeltaTable, translog_dt: DeltaTable
) -> Generator[Row, None, None]:
    """Like stream_duckdb but joins with translog for accurate timestamps and soft deletes."""
    con = duckdb.connect()
    con.register("arrow", dt.to_pyarrow_dataset())
    con.register("translog", translog_dt.to_pyarrow_dataset())
    compiled = compile_query(q)
    sql = translog_aware_sql(compiled, dt)
    rel = con.sql(sql)
    columns = rel.columns
    while rows := rel.fetchmany(100_000):
        for row in rows:
            yield Row(dict(zip(columns, row)))

ftm_lakehouse.logic.parquet.query_duckdb_translog(q, dt, translog_dt)

Like query_duckdb but joins with translog for accurate timestamps and soft deletes.

Returns (relation, connection) tuple. Caller must hold the connection reference to prevent GC from closing it while the relation is still in use.

Source code in ftm_lakehouse/logic/parquet.py
def query_duckdb_translog(
    q, dt: DeltaTable, translog_dt: DeltaTable
) -> tuple[duckdb.DuckDBPyRelation, duckdb.DuckDBPyConnection]:
    """Like query_duckdb but joins with translog for accurate timestamps and soft deletes.

    Returns (relation, connection) tuple. Caller must hold the connection reference
    to prevent GC from closing it while the relation is still in use.
    """
    con = duckdb.connect()
    con.register("arrow", dt.to_pyarrow_dataset())
    con.register("translog", translog_dt.to_pyarrow_dataset())
    compiled = compile_query(q)
    sql = translog_aware_sql(compiled, dt)
    return con.sql(sql), con
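
The lifetime requirement can be illustrated generically, without DuckDB: in CPython, dropping the last reference to an object runs its finalizer immediately, so discarding the connection can close it while the relation is still in use. A minimal sketch with a stand-in resource class:

```python
import gc

closed = []

class Connection:
    """Stand-in for a DuckDB connection whose finalizer closes it."""
    def __del__(self):
        closed.append(True)

def query():
    con = Connection()
    rel = object()  # stand-in relation; it does NOT keep the connection alive
    return rel, con

# Dropping the connection reference lets the finalizer run early:
rel = query()[0]
gc.collect()
assert closed == [True]  # connection closed while rel is still in use

# Holding both references, as the docstring requires, keeps it open:
closed.clear()
rel, con = query()
gc.collect()
assert closed == []
```

This is why `query_duckdb_translog` returns the connection alongside the relation instead of hiding it.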

ftm_lakehouse.logic.parquet.compact_with_translog(dt, translog_dt)

Join main table with translog, returning only live rows with accurate timestamps.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `dt` | `DeltaTable` | Main statement DeltaTable | required |
| `translog_dt` | `DeltaTable` | Translog metadata DeltaTable | required |

Returns:

| Type | Description |
|------|-------------|
| `RecordBatchReader` | RecordBatchReader of live rows with translog timestamps applied |

Source code in ftm_lakehouse/logic/parquet.py
def compact_with_translog(
    dt: DeltaTable, translog_dt: DeltaTable
) -> pa.RecordBatchReader:
    """Join main table with translog, returning only live rows with accurate timestamps.

    Args:
        dt: Main statement DeltaTable
        translog_dt: Translog metadata DeltaTable

    Returns:
        RecordBatchReader of live rows with translog timestamps applied
    """
    all_cols = [f.name for f in dt.schema().to_arrow()]
    main_cols = [f"arrow.{c}" for c in all_cols if c not in ("first_seen", "last_seen")]
    select_cols = ", ".join(main_cols + ["sc.first_seen", "sc.last_seen"])

    con = duckdb.connect()
    con.register("arrow", dt.to_pyarrow_dataset())
    con.register("translog", translog_dt.to_pyarrow_dataset())
    return con.sql(
        f"SELECT {select_cols} FROM arrow "
        "JOIN translog sc ON arrow.id = sc.id "
        "WHERE sc.deleted_at IS NULL"
    ).fetch_arrow_reader()
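
The join semantics can be illustrated with `sqlite3` from the standard library as a stand-in for DuckDB over the Delta tables (table and column names mirror the SQL above; the rows are made up for the example):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE arrow (id TEXT, entity_id TEXT, first_seen TEXT, last_seen TEXT);
CREATE TABLE translog (id TEXT, first_seen TEXT, last_seen TEXT, deleted_at TEXT);
INSERT INTO arrow VALUES ('s1', 'e1', '2020', '2020'), ('s2', 'e2', '2020', '2020');
INSERT INTO translog VALUES ('s1', '2021', '2024', NULL), ('s2', '2021', '2024', '2025');
""")

# Live rows only, with the translog's timestamps taking precedence.
rows = con.execute(
    "SELECT arrow.id, arrow.entity_id, sc.first_seen, sc.last_seen "
    "FROM arrow JOIN translog sc ON arrow.id = sc.id "
    "WHERE sc.deleted_at IS NULL"
).fetchall()
print(rows)
# → [('s1', 'e1', '2021', '2024')]  -- s2 is soft-deleted
```

`s2` is dropped because its translog row has a non-NULL `deleted_at`, and `s1` carries the translog's `first_seen`/`last_seen` rather than the main table's.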

ftm_lakehouse.logic.parquet.get_deleted_entity_ids(dt, translog_dt)

Get entity IDs that have been soft-deleted via translog.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `dt` | `DeltaTable` | Main statement DeltaTable | required |
| `translog_dt` | `DeltaTable` | Translog metadata DeltaTable | required |

Returns:

| Type | Description |
|------|-------------|
| `set[str]` | Set of entity_id strings with at least one deleted statement |

Source code in ftm_lakehouse/logic/parquet.py
def get_deleted_entity_ids(dt: DeltaTable, translog_dt: DeltaTable) -> set[str]:
    """Get entity IDs that have been soft-deleted via translog.

    Args:
        dt: Main statement DeltaTable
        translog_dt: Translog metadata DeltaTable

    Returns:
        Set of entity_id strings with at least one deleted statement
    """
    con = duckdb.connect()
    con.register("arrow", dt.to_pyarrow_dataset())
    con.register("translog", translog_dt.to_pyarrow_dataset())
    result = con.execute(
        "SELECT DISTINCT arrow.entity_id FROM arrow "
        "JOIN translog sc ON arrow.id = sc.id "
        "WHERE sc.deleted_at IS NOT NULL"
    )
    return {r[0] for r in result.fetchall()}

ftm_lakehouse.logic.parquet.filter_live_translog(translog_dt)

Return only live (non-deleted) translog rows.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `translog_dt` | `DeltaTable` | Translog metadata DeltaTable | required |

Returns:

| Type | Description |
|------|-------------|
| `RecordBatchReader` | RecordBatchReader of translog rows where deleted_at IS NULL |

Source code in ftm_lakehouse/logic/parquet.py
def filter_live_translog(translog_dt: DeltaTable) -> pa.RecordBatchReader:
    """Return only live (non-deleted) translog rows.

    Args:
        translog_dt: Translog metadata DeltaTable

    Returns:
        RecordBatchReader of translog rows where deleted_at IS NULL
    """
    rel = duckdb.arrow(translog_dt.to_pyarrow_dataset())
    return rel.query(
        "translog", "SELECT * FROM translog WHERE deleted_at IS NULL"
    ).fetch_arrow_reader()

Statement Serialization

Pack and unpack statements for efficient storage:

from ftm_lakehouse.logic import pack_statement, unpack_statement
from followthemoney import Statement

# Pack a statement to string
packed = pack_statement(stmt)

# Unpack back to Statement
stmt = unpack_statement(packed)

pack_statement

def pack_statement(stmt: Statement) -> str

Pack a Statement into a null-byte joined string for compact storage.

Args:

  • stmt: A FollowTheMoney Statement object

Returns: Serialized string representation

unpack_statement

def unpack_statement(data: str) -> Statement

Unpack a null-byte joined string back into a Statement.

Args:

  • data: Serialized statement string from pack_statement

Returns: Reconstructed Statement object
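
The round trip can be sketched on a plain field list (a hypothetical field order for illustration, not the library's actual statement layout):

```python
# Hypothetical statement fields; the real order is defined by the library.
fields = ["stmt-id", "entity-1", "name", "Alice", "my_dataset"]

# Null bytes work as a separator because they cannot appear in the values.
packed = "\x00".join(fields)
unpacked = packed.split("\x00")

assert unpacked == fields
```

Because the separator is a single byte that never occurs in the payload, no escaping is needed and the packed form stays compact.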