logic

The logic module contains pure, stateless transformation functions with no infrastructure dependencies. Functions here take inputs and produce outputs without side effects.

Entity Aggregation

Aggregate a stream of statement dicts into FollowTheMoney entity dicts:

from ftm_lakehouse.logic import aggregate_unsafe

for entity in aggregate_unsafe(statement_dicts, "my_dataset"):
    print(f"{entity['id']}: {entity['caption']}")

aggregate_unsafe assumes the input is pre-sorted by canonical_id – the parquet store guarantees this for its queries.
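
To see why the sort order matters, here is a minimal stdlib sketch of the same streaming idea. It is not the library's implementation: `aggregate_sorted` and the simplified `properties` dict are hypothetical, and the statement dicts carry only the keys needed for the illustration.

```python
from itertools import groupby
from operator import itemgetter


def aggregate_sorted(statements):
    """Group *consecutive* statements that share a canonical_id."""
    for canonical_id, group in groupby(statements, key=itemgetter("canonical_id")):
        properties = {}
        for stmt in group:
            properties.setdefault(stmt["prop"], []).append(stmt["value"])
        yield {"id": canonical_id, "properties": properties}


sorted_rows = [
    {"canonical_id": "a", "prop": "name", "value": "Alice"},
    {"canonical_id": "a", "prop": "country", "value": "de"},
    {"canonical_id": "b", "prop": "name", "value": "Bob"},
]
# Same rows, but the two statements of entity "a" are no longer adjacent.
unsorted_rows = [sorted_rows[0], sorted_rows[2], sorted_rows[1]]

sorted_entities = list(aggregate_sorted(sorted_rows))
unsorted_entities = list(aggregate_sorted(unsorted_rows))

print([e["id"] for e in sorted_entities])
print([e["id"] for e in unsorted_entities])
```

With unsorted input, entity "a" is emitted twice as two partial entities, which is the failure mode the pre-sorting requirement avoids.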

ftm_lakehouse.logic.aggregate_unsafe(data, dataset=None)

Aggregate statement dicts (e.g. from DuckDB rows) to entity payloads.

Bypasses the dict -> Statement -> StatementEntity -> dict Python path entirely, and therefore performs no validation. Input must be sorted by canonical_id.

Source code in ftm_lakehouse/logic/entities/aggregate.py
def aggregate_unsafe(
    data: Iterator[StatementDict], dataset: str | None = None
) -> Iterator[EntityPayload]:
    """
    Aggregate statement dicts (e.g. from DuckDB rows) to entity payloads.

    Completely circumvents the dict -> Statement -> StatementEntity -> dict
    Python path, but therefore has no validation checks. Input must be sorted
    by canonical_id.
    """
    current: EntityPayload | None = None
    for statement in data:
        if current is None or statement["canonical_id"] != current.id:
            if current is not None:
                yield current
            current = EntityPayload(id=statement["canonical_id"], dataset=dataset)
        current.add(statement)
    if current is not None:
        yield current

Mapping Processing

Generate entities from FollowTheMoney mapping configurations:

from ftm_lakehouse.logic import map_entities
from ftm_lakehouse.model.mapping import DatasetMapping

mapping = DatasetMapping(
    dataset="my_dataset",
    content_hash="abc123...",
    queries=[...]
)

for entity in map_entities(mapping, csv_path):
    print(f"{entity.schema.name}: {entity.caption}")

ftm_lakehouse.logic.map_entities(mapping, csv_path)

Generate entities from a mapping configuration and source file.

Applies a FollowTheMoney mapping configuration to a CSV/tabular file and yields the resulting entities. Each entity is annotated with:

  • A proof property linking to the source file's content hash
  • An origin context identifying the mapping source

This function is the core transformation logic used by DatasetMappings.process(). It handles the iteration over mapping queries and record processing.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| mapping | DatasetMapping | The mapping configuration containing query definitions | required |
| csv_path | Path | Local path to the source CSV/tabular file | required |

Yields:

| Type | Description |
|------|-------------|
| Entities | EntityProxy objects generated from the mapping |

Example
from ftm_lakehouse.logic import map_entities
from ftm_lakehouse.model.mapping import DatasetMapping

mapping = DatasetMapping(
    dataset="my_dataset",
    content_hash="abc123...",
    queries=[...]  # FollowTheMoney mapping queries
)

for entity in map_entities(mapping, csv_path):
    print(f"{entity.schema.name}: {entity.caption}")

See Also:

  • FollowTheMoney Mappings (https://followthemoney.tech/docs/mappings/)
  • operations.MappingOperation for the high-level mapping workflow

Source code in ftm_lakehouse/logic/mappings.py
def map_entities(mapping: DatasetMapping, csv_path: Path) -> Entities:
    """
    Generate entities from a mapping configuration and source file.

    Applies a FollowTheMoney mapping configuration to a CSV/tabular file
    and yields the resulting entities. Each entity is annotated with:

    - A `proof` property linking to the source file's content hash
    - An `origin` context identifying the mapping source

    This function is the core transformation logic used by DatasetMappings.process().
    It handles the iteration over mapping queries and record processing.

    Args:
        mapping: The mapping configuration containing query definitions
        csv_path: Local path to the source CSV/tabular file

    Yields:
        EntityProxy objects generated from the mapping

    Example:
        ```python
        from ftm_lakehouse.logic import map_entities
        from ftm_lakehouse.model.mapping import DatasetMapping

        mapping = DatasetMapping(
            dataset="my_dataset",
            content_hash="abc123...",
            queries=[...]  # FollowTheMoney mapping queries
        )

        for entity in map_entities(mapping, csv_path):
            print(f"{entity.schema.name}: {entity.caption}")
        ```

    See Also:

        - [FollowTheMoney Mappings](https://followthemoney.tech/docs/mappings/)
        - `operations.MappingOperation` for high-level mapping workflow
    """
    origin = mapping_origin(mapping.content_hash)
    for query in mapping.queries:
        mapper = query.make_mapping(csv_path.as_posix(), mapping.dataset)
        for record in logged_items(mapper.source.records, "Map", 1000, "Row", log):
            for entity in mapper.map(record).values():
                entity.add("proof", mapping.content_hash)
                entity.context["origin"] = [origin]
                yield entity
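
The annotation step at the end of map_entities can be tried in isolation. The sketch below is an assumption-laden stand-in: `SketchEntity` and `annotate` are hypothetical names, not the followthemoney entity proxy, and the origin label format `mapping:<hash>` is invented for illustration because the output of `mapping_origin` is not shown here.

```python
from dataclasses import dataclass, field


@dataclass
class SketchEntity:
    """Hypothetical stand-in for an entity proxy (not the real class)."""

    id: str
    properties: dict[str, list[str]] = field(default_factory=dict)
    context: dict[str, list[str]] = field(default_factory=dict)

    def add(self, prop: str, value: str) -> None:
        # Multi-valued properties; skip values that are already present.
        values = self.properties.setdefault(prop, [])
        if value not in values:
            values.append(value)


def annotate(entities, content_hash, origin):
    """Attach the proof property and origin context to each entity."""
    for entity in entities:
        entity.add("proof", content_hash)
        entity.context["origin"] = [origin]
        yield entity


# "mapping:abc123" is a made-up origin label for this sketch.
annotated = list(
    annotate([SketchEntity("a"), SketchEntity("b")], "abc123", "mapping:abc123")
)
for entity in annotated:
    print(entity.id, entity.properties, entity.context)
```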

Parquet helpers

Helpers for creating DuckDB connections, registering Delta tables as views, and building the merge query used by ParquetStore.

ftm_lakehouse.logic.parquet.make_duckdb()

Create a DuckDB connection with the configured memory ceiling, spill directory, and auto-loaded extensions.

Per-query memory is bounded by Settings.duckdb_memory_limit (env: LAKEHOUSE_DUCKDB_MEMORY_LIMIT, default 4GB). Queries exceeding the limit spill to Settings.duckdb_temp_directory (env: LAKEHOUSE_DUCKDB_TEMP_DIRECTORY) when it is set, otherwise to the OS temp directory DuckDB picks by default.

autoinstall_known_extensions and autoload_known_extensions let DuckDB lazily resolve the Delta extension on the first delta_scan call, so register_view does not have to run an explicit INSTALL delta / LOAD delta on every call. INSTALL is idempotent and cached on disk by DuckDB; LOAD runs at most once per connection.

Source code in ftm_lakehouse/logic/parquet.py
def make_duckdb() -> duckdb.DuckDBPyConnection:
    """Create a DuckDB connection with the configured memory ceiling,
    spill directory, and auto-loaded extensions.

    Per-query memory is bounded by :attr:`Settings.duckdb_memory_limit`
    (env: ``LAKEHOUSE_DUCKDB_MEMORY_LIMIT``, default ``4GB``); queries
    exceeding the limit spill to
    :attr:`Settings.duckdb_temp_directory`
    (env: ``LAKEHOUSE_DUCKDB_TEMP_DIRECTORY``) when set, otherwise to
    the OS temp directory DuckDB picks by default.

    ``autoinstall_known_extensions`` + ``autoload_known_extensions`` let
    DuckDB lazy-resolve the Delta extension on the first ``delta_scan``
    call instead of every :func:`register_view` running an explicit
    ``INSTALL delta`` / ``LOAD delta``. ``INSTALL`` is idempotent and
    cached on disk by DuckDB; ``LOAD`` runs at most once per connection.
    """
    settings = Settings()
    config: dict[str, str] = {
        "memory_limit": settings.duckdb_memory_limit,
        "autoinstall_known_extensions": "true",
        "autoload_known_extensions": "true",
    }
    if settings.duckdb_temp_directory:
        config["temp_directory"] = settings.duckdb_temp_directory
    return duckdb.connect(":memory:", config=config)
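
As a rough illustration of how the two environment variables end up in the connection config, the sketch below rebuilds the same dict from plain environment lookups. The helper name `duckdb_config_from_env` is hypothetical, and the real Settings class may parse or validate values differently; only the variable names, the 4GB default, and the config keys are taken from the function above.

```python
import os


def duckdb_config_from_env(env=None):
    """Build a DuckDB config dict from LAKEHOUSE_* variables (sketch only)."""
    env = os.environ if env is None else env
    config = {
        "memory_limit": env.get("LAKEHOUSE_DUCKDB_MEMORY_LIMIT", "4GB"),
        "autoinstall_known_extensions": "true",
        "autoload_known_extensions": "true",
    }
    temp_dir = env.get("LAKEHOUSE_DUCKDB_TEMP_DIRECTORY")
    if temp_dir:
        # Only set when configured; otherwise DuckDB picks its own default.
        config["temp_directory"] = temp_dir
    return config


default_config = duckdb_config_from_env({})
custom_config = duckdb_config_from_env(
    {
        "LAKEHOUSE_DUCKDB_MEMORY_LIMIT": "1GB",
        "LAKEHOUSE_DUCKDB_TEMP_DIRECTORY": "/mnt/spill",
    }
)
print(default_config)
print(custom_config)
```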

ftm_lakehouse.logic.parquet.register_view(con, dt, name=TABLE.name)

Register a DeltaTable as a DuckDB view via delta_scan.

The Delta extension exposes partition values as columns and uses Delta's column statistics for file skipping on filtered queries, so per-partition queries (WHERE shard = ? AND bucket = ? AND origin = ?) prune to one partition's files automatically.

The Delta extension is auto-installed and auto-loaded on first delta_scan use thanks to the connection-level flags set in make_duckdb, so this function does not run an explicit INSTALL / LOAD.

DuckDB's delta_scan does not accept prepared parameters for its URI argument, so the URI is interpolated as a SQL string literal. Single quotes are doubled to prevent injection if a future code path lets a dataset name (and thus the URI) carry a quote – primary validation is in ftm_lakehouse.util.validate_dataset_name.

Source code in ftm_lakehouse/logic/parquet.py
def register_view(
    con: duckdb.DuckDBPyConnection,
    dt: DeltaTable,
    name: str = TABLE.name,
) -> None:
    """Register a DeltaTable as a DuckDB view via ``delta_scan``.

    The Delta extension exposes partition values as columns and uses Delta's
    column statistics for file skipping on filtered queries, so per-partition
    queries (``WHERE shard = ? AND bucket = ? AND origin = ?``) prune to one
    partition's files automatically.

    The Delta extension is auto-installed and auto-loaded on first
    ``delta_scan`` use thanks to the connection-level flags set in
    :func:`make_duckdb`, so this function does not run an explicit
    ``INSTALL`` / ``LOAD``.

    DuckDB's ``delta_scan`` does not accept prepared parameters for its URI
    argument, so the URI is interpolated as a SQL string literal. Single
    quotes are doubled to prevent injection if a future code path lets a
    dataset name (and thus the URI) carry a quote – primary validation is
    in :func:`ftm_lakehouse.util.validate_dataset_name`.
    """
    table_uri = dt.table_uri.replace("'", "''")
    con.sql(f"CREATE OR REPLACE VIEW {name} AS SELECT * FROM delta_scan('{table_uri}')")

register_view uses delta_scan so the registered view resolves the current Delta log on every query – registering once per connection is enough; subsequent write_deltalake commits are picked up automatically.
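
The quote-doubling used for the URI can be checked without DuckDB. The sketch below swaps in the stdlib sqlite3 module, which follows the same standard SQL rule that `''` inside a string literal denotes one single quote; `sql_string_literal` is a hypothetical helper name and the URI is made up.

```python
import sqlite3


def sql_string_literal(value: str) -> str:
    """Render a Python string as a SQL string literal by doubling quotes."""
    return "'" + value.replace("'", "''") + "'"


# Made-up URI containing a single quote, the case the escaping guards against.
uri = "s3://bucket/it's-a-dataset/statements"
literal = sql_string_literal(uri)

con = sqlite3.connect(":memory:")
# Interpolating the escaped literal yields the original string back.
round_tripped = con.execute(f"SELECT {literal}").fetchone()[0]
con.close()

print(literal)
print(round_tripped == uri)
```

Escaping of this kind is a second line of defence; where an API accepts bound parameters, those remain preferable to string interpolation.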

ftm_lakehouse.logic.parquet.build_merge_query(shard, bucket, origin, grace_cutoff)

SQLAlchemy Select that collapses one partition.

The returned query:

  • filters the source view to one (shard, bucket, origin) partition;
  • computes MIN(first_seen) OVER (PARTITION BY id) so the surviving row carries the earliest first_seen for that statement id;
  • keeps the row with the latest last_seen per id via ROW_NUMBER() OVER (PARTITION BY id ORDER BY last_seen DESC) = 1;
  • drops tombstones whose deleted_at is at or before grace_cutoff;
  • orders by (entity_id, id, last_seen DESC) so the rewritten parquet file is ready for future merges without re-sort.

Consumers can compose further filters via .where(...) on the returned Select (e.g. query.where(query.selected_columns.entity_id == entity_id) for a single-entity merge). Compile to executable DuckDB SQL with str(query.compile(compile_kwargs={"literal_binds": True})).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| shard | str | Target shard value (hex-padded). | required |
| bucket | str | Target bucket (thing / interval / document / page / pages / mention). | required |
| origin | str | Target origin tag. | required |
| grace_cutoff | datetime | Tombstones with deleted_at <= grace_cutoff are dropped. Typically now - LAKEHOUSE_GRACE_PERIOD_DAYS. | required |

Returns:

| Type | Description |
|------|-------------|
| Select | A SQLAlchemy Select (sqlalchemy.sql.expression.Select) that compiles to DuckDB SQL. |

Source code in ftm_lakehouse/logic/parquet.py
def build_merge_query(
    shard: str,
    bucket: str,
    origin: str,
    grace_cutoff: datetime,
) -> Select:
    """SQLAlchemy ``Select`` that collapses one partition.

    The returned query:

    - filters the source view to one ``(shard, bucket, origin)`` partition;
    - computes ``MIN(first_seen) OVER (PARTITION BY id)`` so the surviving
      row carries the earliest ``first_seen`` for that statement id;
    - keeps the row with the latest ``last_seen`` per id via
      ``ROW_NUMBER() OVER (PARTITION BY id ORDER BY last_seen DESC) = 1``;
    - drops tombstones whose ``deleted_at`` is older than ``grace_cutoff``;
    - orders by ``(entity_id, id, last_seen DESC)`` so the rewritten parquet
      file is ready for future merges without re-sort.

    Consumers can compose further filters via ``.where(...)`` on the
    returned Select (e.g.
    ``query.where(query.selected_columns.entity_id == entity_id)`` for a
    single-entity merge). Compile to executable DuckDB SQL with
    ``str(query.compile(compile_kwargs={"literal_binds": True}))``.

    Args:
        shard: Target shard value (hex-padded).
        bucket: Target bucket (``thing`` / ``interval`` / ``document`` /
            ``page`` / ``pages`` / ``mention``).
        origin: Target origin tag.
        grace_cutoff: Tombstones with ``deleted_at <= grace_cutoff`` are
            dropped. Typically ``now - LAKEHOUSE_GRACE_PERIOD_DAYS``.

    Returns:
        A SQLAlchemy :class:`~sqlalchemy.sql.expression.Select` that
        compiles to DuckDB SQL.
    """
    inner_cols = [c for c in TABLE.columns if c.name != "first_seen"]
    inner = (
        select(
            *inner_cols,
            func.min(TABLE.c.first_seen)
            .over(partition_by=TABLE.c.id)
            .label("first_seen"),
            func.row_number()
            .over(partition_by=TABLE.c.id, order_by=TABLE.c.last_seen.desc())
            .label("rn"),
        )
        .where(
            TABLE.c.shard == shard,
            TABLE.c.bucket == bucket,
            TABLE.c.origin == origin,
        )
        .subquery("merge_src")
    )

    return (
        select(*[c for c in inner.c if c.name != "rn"])
        .where(
            inner.c.rn == 1,
            or_(
                inner.c.deleted_at.is_(None),
                inner.c.deleted_at > grace_cutoff,
            ),
        )
        .order_by(inner.c.entity_id, inner.c.id, inner.c.last_seen.desc())
    )

build_merge_query returns a SQLAlchemy Select, so consumers can add further .where(...) clauses before compiling it to DuckDB SQL with literal_binds=True.
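
The merge semantics are easier to follow on a handful of rows. The sketch below re-implements them in plain Python rather than SQL, under the assumption that rows are dicts already filtered to one partition; `merge_partition` is a hypothetical name and the sample data is invented.

```python
from datetime import datetime


def merge_partition(rows, grace_cutoff):
    """Collapse rows per statement id, mirroring the query's steps."""
    by_id = {}
    for row in rows:
        by_id.setdefault(row["id"], []).append(row)

    merged = []
    for versions in by_id.values():
        # Keep the row with the latest last_seen (the rn = 1 row) ...
        latest = max(versions, key=lambda r: r["last_seen"])
        # ... but carry the earliest first_seen across all versions.
        survivor = dict(latest, first_seen=min(r["first_seen"] for r in versions))
        # Drop tombstones at or before the grace cutoff.
        deleted_at = survivor["deleted_at"]
        if deleted_at is None or deleted_at > grace_cutoff:
            merged.append(survivor)
    return sorted(merged, key=lambda r: (r["entity_id"], r["id"]))


def day(month, dom=1):
    return datetime(2024, month, dom)


rows = [
    {"id": "s1", "entity_id": "e1", "first_seen": day(1), "last_seen": day(1), "deleted_at": None},
    {"id": "s1", "entity_id": "e1", "first_seen": day(3), "last_seen": day(3), "deleted_at": None},
    {"id": "s2", "entity_id": "e1", "first_seen": day(1), "last_seen": day(2), "deleted_at": day(2)},
    {"id": "s3", "entity_id": "e2", "first_seen": day(1), "last_seen": day(6), "deleted_at": day(6)},
]
merged = merge_partition(rows, grace_cutoff=day(5))
for row in merged:
    print(row["entity_id"], row["id"], row["first_seen"].date(), row["last_seen"].date())
```

Here s1 collapses to one row that keeps the January first_seen and the March last_seen, s2 is dropped because its tombstone predates the cutoff, and s3 survives because its tombstone is still inside the grace period.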

Statement Serialization

Pack and unpack statements for compact storage in the journal data column:

from ftm_lakehouse.logic import pack_statement, unpack_statement

packed = pack_statement(stmt)     # unit-separator delimited string
stmt   = unpack_statement(packed) # back to Statement

ftm_lakehouse.helpers.statements.pack_statement(stmt)

Pack a Statement into a unit-separator delimited string.

Format: id, entity_id, canonical_id, prop, schema, value, dataset, lang, original_value, external, first_seen, last_seen, origin, prop_type

Source code in ftm_lakehouse/helpers/statements.py
def pack_statement(stmt: Statement) -> str:
    """
    Pack a Statement into a unit-separator delimited string.

    Format: id, entity_id, canonical_id, prop, schema, value, dataset,
            lang, original_value, external, first_seen, last_seen, origin, prop_type
    """
    parts = [
        stmt.id or "",
        stmt.entity_id,
        stmt.canonical_id or stmt.entity_id,
        stmt.prop,
        stmt.schema,
        stmt.value,
        stmt.dataset,
        stmt.lang or "",
        stmt.original_value or "",
        "1" if stmt.external else "0",
        _to_iso(stmt.first_seen),
        _to_iso(stmt.last_seen),
        stmt.origin or DEFAULT_ORIGIN,
        stmt.prop_type or "",
    ]
    return UNIT_SEP.join(parts)

ftm_lakehouse.helpers.statements.unpack_statement(data)

Unpack a unit-separator delimited string back into a Statement.

Raises:

| Type | Description |
|------|-------------|
| MalformedStatementError | If data has fewer than UNPACK_MIN_FIELDS separator-delimited fields. The journal flush loop catches this and logs and skips the row, so one bad row can't abort an entire flush. |

Source code in ftm_lakehouse/helpers/statements.py
def unpack_statement(data: str) -> Statement:
    """Unpack a unit-separator delimited string back into a Statement.

    Raises:
        MalformedStatementError: If ``data`` has fewer than
            :data:`UNPACK_MIN_FIELDS` separator-delimited fields. The
            journal flush loop catches this and logs+skips the row so
            one bad row can't abort an entire flush.
    """
    parts = data.split(UNIT_SEP)
    if len(parts) < UNPACK_MIN_FIELDS:
        raise MalformedStatementError(
            f"Packed statement has {len(parts)} fields; "
            f"expected at least {UNPACK_MIN_FIELDS}"
        )
    return Statement(
        id=parts[0] or None,
        entity_id=parts[1],  # required
        canonical_id=parts[2] or None,
        prop=parts[3],  # required
        schema=parts[4],  # required
        value=parts[5],  # required
        dataset=parts[6],  # required
        lang=parts[7] or None,
        original_value=parts[8] or None,
        external=parts[9] == "1",
        first_seen=parts[10] or None,
        last_seen=parts[11] or None,
        origin=parts[12] or None,
    )
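
The packing scheme and the log-and-skip handling can be sketched with the stdlib alone. Everything named here is an assumption for illustration: the separator is taken to be the ASCII unit separator (0x1F), the field list is shortened, and `pack`, `unpack`, `MalformedRecordError`, and `unpack_all` are hypothetical names rather than the library's API.

```python
UNIT_SEP = "\x1f"  # ASCII unit separator; assumed value of the library constant
FIELDS = ("id", "entity_id", "canonical_id", "prop", "schema", "value", "dataset")


class MalformedRecordError(ValueError):
    """Raised when a packed string has too few fields."""


def pack(record: dict) -> str:
    # Missing or None values are stored as empty strings.
    return UNIT_SEP.join(record.get(name) or "" for name in FIELDS)


def unpack(data: str) -> dict:
    parts = data.split(UNIT_SEP)
    if len(parts) < len(FIELDS):
        raise MalformedRecordError(
            f"Packed record has {len(parts)} fields; expected at least {len(FIELDS)}"
        )
    # Empty strings come back as None.
    return {name: (part or None) for name, part in zip(FIELDS, parts)}


def unpack_all(packed_rows):
    """Skip malformed rows instead of aborting the whole batch."""
    good, skipped = [], 0
    for data in packed_rows:
        try:
            good.append(unpack(data))
        except MalformedRecordError:
            skipped += 1
    return good, skipped


record = {
    "id": "stmt-1", "entity_id": "e1", "canonical_id": None, "prop": "name",
    "schema": "Person", "value": "Alice", "dataset": "my_dataset",
}
packed = pack(record)
good, skipped = unpack_all([packed, "truncated" + UNIT_SEP + "row"])
print(good, skipped)
```

A delimiter-based format like this only round-trips cleanly when field values never contain the separator itself, which is why a control character is a natural choice.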