Skip to content

Layer 2: Storage

Single-purpose storage interfaces. Each store does one thing.

JournalStore

SQL statement buffer for write-ahead logging.

ftm_lakehouse.storage.JournalStore = SqlJournalStore module-attribute

ParquetStore

Delta Lake parquet storage for statements. Uses a translog metadata table for tracking timestamps and soft deletes.

ftm_lakehouse.storage.ParquetStore

Bases: LakehouseApiMixin

Delta Lake parquet storage for entity statements.

Wraps ftmq's LakeStore to provide statement storage with:

- Partitioned parquet files (by bucket, origin)
- Delta Lake transaction log for versioning
- Translog metadata table for timestamps and soft deletes
- Change data capture (CDC) support
- Efficient querying via DuckDB

Layout: statements/bucket={bucket}/origin={origin}/{auto-identifier}.parquet

Source code in ftm_lakehouse/storage/parquet.py
class ParquetStore(LakehouseApiMixin):
    """
    Delta Lake parquet storage for entity statements.

    Wraps ftmq's LakeStore to provide statement storage with:
    - Partitioned parquet files (by bucket, origin)
    - Delta Lake transaction log for versioning
    - Translog metadata table for timestamps and soft deletes
    - Change data capture (CDC) support
    - Efficient querying via DuckDB

    Layout: statements/bucket={bucket}/origin={origin}/{auto-identifier}.parquet
    """

    # Delta table identifier; value comes from the module-level _TABLE constant.
    TABLE = _TABLE

    def __init__(self, uri: Uri, dataset: str) -> None:
        """Initialize the store rooted at `uri` for the given dataset name."""
        # The main statement table lives under the STATEMENTS sub-path.
        self.uri = join_uri(uri, path.STATEMENTS)
        super().__init__(self.uri)
        self.dataset = dataset
        # The translog is a sibling table rooted at the *original* uri (not
        # self.uri); it holds mutable per-statement metadata.
        self._translog = TranslogStore(uri, dataset)
        self._store = TranslogAwareLakeStore(
            uri=self.uri,
            dataset=dataset,
            partition_by=PARTITIONS,
            translog=self._translog,
        )
        self.log = get_logger(
            f"{self.dataset}.{self.__class__.__name__}",
            dataset=self.dataset,
            uri=mask_uri(self.uri),
        )
        # NOTE(review): presumably configures DuckDB for the storage backend
        # (credentials/extensions) — confirm in its definition.
        setup_duckdb_storage()

    @property
    def version(self) -> int | None:
        """Current version of the main Delta table."""
        if self._store.exists:
            return self._store.deltatable.version()
        # Implicitly returns None when the Delta table does not exist yet.

    @property
    def translog_version(self) -> int | None:
        """Current version of the translog Delta table."""
        if not self._translog.exists:
            return None
        return self._translog.deltatable.version()

    @property
    def exists(self) -> bool:
        """Check existence of deltatable"""
        return self._store.exists

    @no_api
    def writer(self, origin: str | None = None) -> LakeWriter:
        """Get a writer for adding statements."""
        return self._store.writer(origin)

    @no_api
    def view(self) -> LakeQueryView:
        """Get a view for querying statements."""
        return self._store.default_view()

    @no_api
    def query(self, q: Query | None = None) -> StatementEntities:
        """
        Query Entities from the store.

        Args:
            q: Optional Query object with filters

        Yields:
            StatementEntity objects matching the query
        """
        view = self.view()
        yield from view.query(q or Query())

    @no_api
    def query_statements(self, q: Select | None = None) -> Statements:
        """
        Query ordered Statements from the store.

        Args:
            q: Optional SQLAlchemy query (default: Query().sql.statements)

        Yields:
            Statement objects matching the query
        """
        view = self.view()
        # NOTE(review): reaches into the view's private _iterate_stmts API —
        # keep in sync with ftmq's LakeStore internals.
        yield from view.store._iterate_stmts(
            q if q is not None else Query().sql.statements
        )

    @no_api
    def stats(self) -> DatasetStats:
        """Compute statistics from the statement store."""
        return self.view().stats()

    @no_api
    def export_csv(self, output_uri: str) -> None:
        """
        Export statements to a sorted, de-duplicated CSV file.

        Args:
            output_uri: Destination URI for the CSV file
        """
        self._store._backend.ensure_parent(output_uri)
        dt = self._store.deltatable
        q = Query().sql.statements
        if self._translog.exists:
            # The returned connection stays referenced in `_` until write_csv
            # below has run (cf. the explicit `con` handling in query_raw()).
            db, _ = query_duckdb_translog(q, dt, self._translog.deltatable)
        else:
            db = query_duckdb(q, dt)
        db.write_csv(output_uri)

    @no_api
    def query_raw(self, q: Select | None = None) -> Iterator[dict[str, Any]]:
        """
        Query entity dicts via aggregate_unsafe(), bypassing FtM object construction.

        Args:
            q: Optional SQLAlchemy select (default: Query().sql.statements)

        Yields:
            Entity dicts (id, schema, properties, caption, ...)
        """
        if not self.exists:
            return
        dt = self._store.deltatable
        if q is None:
            q = Query().sql.statements

        if self._translog.exists:
            rel, con = query_duckdb_translog(q, dt, self._translog.deltatable)
        else:
            rel = query_duckdb(q, dt)
            con = None  # noqa: F841 — prevent GC of translog connection

        columns = rel.columns
        # Stream rows in large batches; iter() with the empty-list sentinel
        # stops when fetchmany() is exhausted.
        yield from aggregate_unsafe(
            dict(zip(columns, row))
            for batch in iter(lambda: rel.fetchmany(100_000), [])
            for row in batch
        )

    @no_api
    def compact(self) -> None:
        """Apply translog to main table: remove deleted rows, update timestamps.

        After compact the main table is self-contained (accurate first_seen/
        last_seen, no deleted rows) and the translog only contains live entries.
        Caller should call optimize() afterwards for file compaction.
        """
        if not self._translog.exists:
            return

        live = compact_with_translog(self._store.deltatable, self._translog.deltatable)

        # Full overwrite of the main table with the merged "live" data; CDF is
        # re-enabled so get_changes() keeps working after compaction.
        write_deltalake(
            str(self.uri),
            live,
            partition_by=PARTITIONS,
            mode="overwrite",
            schema_mode="overwrite",
            storage_options=storage_options(),
            configuration={"delta.enableChangeDataFeed": "true"},
        )

        self._translog.compact()

    @no_api
    def get_deleted_entity_ids(self) -> set[str]:
        """Get entity IDs that have been soft-deleted via translog."""
        if not self._translog.exists:
            return set()

        return _get_deleted_entity_ids(
            self._store.deltatable, self._translog.deltatable
        )

    @no_api
    def get_changes(
        self,
        start_version: int | None = None,
        end_version: int | None = None,
    ) -> Generator[tuple[datetime, str, dict], None, None]:
        """
        Get statement changes for a version range using change data capture.

        Args:
            start_version: Starting version number (default: 0)
            end_version: Ending version number (default: latest)

        Yields:
            Tuples of (commit_timestamp, change_type, row_dict)
        """
        reader = self._store.deltatable.load_cdf(
            starting_version=start_version or 0,
            ending_version=end_version,
        )
        try:
            # read_next_batch() raises StopIteration when the CDF stream is
            # exhausted; a falsy (empty) batch also ends the walrus loop.
            while batch := reader.read_next_batch():
                for row in batch.to_struct_array().to_pylist():
                    yield (
                        row["_commit_timestamp"],
                        row["_change_type"],
                        row,
                    )
        except StopIteration:
            return

    @no_api
    def optimize(
        self,
        vacuum: bool = False,
        vacuum_keep_hours: int = 0,
        bucket: str | None = None,
        origin: str | None = None,
    ) -> None:
        """
        Optimize the store by compacting small files.

        Args:
            vacuum: Also delete old file versions
            vacuum_keep_hours: Hours of history to retain when vacuuming
            bucket: Filter optimization to specific bucket partition
            origin: Filter optimization to specific origin partition
        """
        writer = self._store.writer()
        writer.optimize(vacuum, vacuum_keep_hours, bucket=bucket, origin=origin)

    @no_api
    def destroy(self) -> None:
        """
        Destroy the deltalake by removing the transaction log in "_delta_log"
        directory. This is soft deleting, as the parquet files remain (but will
        be cleaned up on optimize --vacuum)
        """
        with Took() as t:
            # NOTE(review): `warn` — presumably a structlog-style alias for
            # `warning`; stdlib logging deprecates `warn`. Confirm get_logger.
            self.log.warn("🔥 Destroying deltalake store ...")
            for key in self._store._backend.iterate_keys("_delta_log"):
                self._store._backend.delete(key)
        self.log.info("Deleted statement store.", took=t.took)

exists property

Check existence of deltatable

translog_version property

Current version of the translog Delta table.

version property

Current version of the main Delta table.

compact()

Apply translog to main table: remove deleted rows, update timestamps.

After compact the main table is self-contained (accurate first_seen/ last_seen, no deleted rows) and the translog only contains live entries. Caller should call optimize() afterwards for file compaction.

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def compact(self) -> None:
    """Apply translog to main table: remove deleted rows, update timestamps.

    After compact the main table is self-contained (accurate first_seen/
    last_seen, no deleted rows) and the translog only contains live entries.
    Caller should call optimize() afterwards for file compaction.
    """
    if not self._translog.exists:
        return

    live = compact_with_translog(self._store.deltatable, self._translog.deltatable)

    write_deltalake(
        str(self.uri),
        live,
        partition_by=PARTITIONS,
        mode="overwrite",
        schema_mode="overwrite",
        storage_options=storage_options(),
        configuration={"delta.enableChangeDataFeed": "true"},
    )

    self._translog.compact()

destroy()

Destroy the deltalake by removing the transaction log in "_delta_log" directory. This is soft deleting, as the parquet files remain (but will be cleaned up on optimize --vacuum)

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def destroy(self) -> None:
    """
    Destroy the deltalake by removing the transaction log in "_delta_log"
    directory. This is soft deleting, as the parquet files remain (but will
    be cleaned up on optimize --vacuum)
    """
    with Took() as t:
        self.log.warn("🔥 Destroying deltalake store ...")
        for key in self._store._backend.iterate_keys("_delta_log"):
            self._store._backend.delete(key)
    self.log.info("Deleted statement store.", took=t.took)

export_csv(output_uri)

Export statements to a sorted, de-duplicated CSV file.

Parameters:

Name Type Description Default
output_uri str

Destination URI for the CSV file

required
Source code in ftm_lakehouse/storage/parquet.py
@no_api
def export_csv(self, output_uri: str) -> None:
    """
    Export statements to a sorted, de-duplicated CSV file.

    Args:
        output_uri: Destination URI for the CSV file
    """
    self._store._backend.ensure_parent(output_uri)
    dt = self._store.deltatable
    q = Query().sql.statements
    if self._translog.exists:
        db, _ = query_duckdb_translog(q, dt, self._translog.deltatable)
    else:
        db = query_duckdb(q, dt)
    db.write_csv(output_uri)

get_changes(start_version=None, end_version=None)

Get statement changes for a version range using change data capture.

Parameters:

Name Type Description Default
start_version int | None

Starting version number (default: 0)

None
end_version int | None

Ending version number (default: latest)

None

Yields:

Type Description
tuple[datetime, str, dict]

Tuples of (commit_timestamp, change_type, row_dict)

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def get_changes(
    self,
    start_version: int | None = None,
    end_version: int | None = None,
) -> Generator[tuple[datetime, str, dict], None, None]:
    """
    Get statement changes for a version range using change data capture.

    Args:
        start_version: Starting version number (default: 0)
        end_version: Ending version number (default: latest)

    Yields:
        Tuples of (commit_timestamp, change_type, row_dict)
    """
    reader = self._store.deltatable.load_cdf(
        starting_version=start_version or 0,
        ending_version=end_version,
    )
    try:
        while batch := reader.read_next_batch():
            for row in batch.to_struct_array().to_pylist():
                yield (
                    row["_commit_timestamp"],
                    row["_change_type"],
                    row,
                )
    except StopIteration:
        return

get_deleted_entity_ids()

Get entity IDs that have been soft-deleted via translog.

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def get_deleted_entity_ids(self) -> set[str]:
    """Get entity IDs that have been soft-deleted via translog."""
    if not self._translog.exists:
        return set()

    return _get_deleted_entity_ids(
        self._store.deltatable, self._translog.deltatable
    )

optimize(vacuum=False, vacuum_keep_hours=0, bucket=None, origin=None)

Optimize the store by compacting small files.

Parameters:

Name Type Description Default
vacuum bool

Also delete old file versions

False
vacuum_keep_hours int

Hours of history to retain when vacuuming

0
bucket str | None

Filter optimization to specific bucket partition

None
origin str | None

Filter optimization to specific origin partition

None
Source code in ftm_lakehouse/storage/parquet.py
@no_api
def optimize(
    self,
    vacuum: bool = False,
    vacuum_keep_hours: int = 0,
    bucket: str | None = None,
    origin: str | None = None,
) -> None:
    """
    Optimize the store by compacting small files.

    Args:
        vacuum: Also delete old file versions
        vacuum_keep_hours: Hours of history to retain when vacuuming
        bucket: Filter optimization to specific bucket partition
        origin: Filter optimization to specific origin partition
    """
    writer = self._store.writer()
    writer.optimize(vacuum, vacuum_keep_hours, bucket=bucket, origin=origin)

query(q=None)

Query Entities from the store.

Parameters:

Name Type Description Default
q Query | None

Optional Query object with filters

None

Yields:

Type Description
StatementEntities

StatementEntity objects matching the query

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def query(self, q: Query | None = None) -> StatementEntities:
    """
    Query Entities from the store.

    Args:
        q: Optional Query object with filters

    Yields:
        StatementEntity objects matching the query
    """
    view = self.view()
    yield from view.query(q or Query())

query_raw(q=None)

Query entity dicts via aggregate_unsafe(), bypassing FtM object construction.

Parameters:

Name Type Description Default
q Select | None

Optional SQLAlchemy select (default: Query().sql.statements)

None

Yields:

Type Description
dict[str, Any]

Entity dicts (id, schema, properties, caption, ...)

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def query_raw(self, q: Select | None = None) -> Iterator[dict[str, Any]]:
    """
    Query entity dicts via aggregate_unsafe(), bypassing FtM object construction.

    Args:
        q: Optional SQLAlchemy select (default: Query().sql.statements)

    Yields:
        Entity dicts (id, schema, properties, caption, ...)
    """
    if not self.exists:
        return
    dt = self._store.deltatable
    if q is None:
        q = Query().sql.statements

    if self._translog.exists:
        rel, con = query_duckdb_translog(q, dt, self._translog.deltatable)
    else:
        rel = query_duckdb(q, dt)
        con = None  # noqa: F841 — prevent GC of translog connection

    columns = rel.columns
    yield from aggregate_unsafe(
        dict(zip(columns, row))
        for batch in iter(lambda: rel.fetchmany(100_000), [])
        for row in batch
    )

query_statements(q=None)

Query ordered Statements from the store.

Parameters:

Name Type Description Default
q Select | None

Optional SQLAlchemy query (default: Query().sql.statements)

None

Yields:

Type Description
Statements

Statement objects matching the query

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def query_statements(self, q: Select | None = None) -> Statements:
    """
    Query ordered Statements from the store.

    Args:
        q: Optional SQLAlchemy query (default: Query().sql.statements)

    Yields:
        Statement objects matching the query
    """
    view = self.view()
    yield from view.store._iterate_stmts(
        q if q is not None else Query().sql.statements
    )

stats()

Compute statistics from the statement store.

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def stats(self) -> DatasetStats:
    """Compute statistics from the statement store."""
    return self.view().stats()

view()

Get a view for querying statements.

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def view(self) -> LakeQueryView:
    """Get a view for querying statements."""
    return self._store.default_view()

writer(origin=None)

Get a writer for adding statements.

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def writer(self, origin: str | None = None) -> LakeWriter:
    """Get a writer for adding statements."""
    return self._store.writer(origin)

TranslogStore

Lightweight Delta table for per-statement metadata (first_seen, last_seen, deleted_at). Used internally by ParquetStore.

ftm_lakehouse.storage.parquet.TranslogStore

Bases: LakehouseApiMixin

Manages a lightweight translog Delta table for per-statement metadata.

Tracks first_seen, last_seen, and deleted_at per statement ID. The main parquet table stores immutable FtM statements; the translog provides mutable metadata via Delta Lake MERGE operations.

Source code in ftm_lakehouse/storage/parquet.py
class TranslogStore(LakehouseApiMixin):
    """Manages a lightweight translog Delta table for per-statement metadata.

    Tracks first_seen, last_seen, and deleted_at per statement ID.
    The main parquet table stores immutable FtM statements; the translog
    provides mutable metadata via Delta Lake MERGE operations.
    """

    def __init__(self, uri: Uri, dataset: str) -> None:
        """Initialize the translog table under the TRANSLOG sub-path of `uri`."""
        self.uri = join_uri(uri, path.TRANSLOG)
        super().__init__(self.uri)
        self.dataset = dataset
        # NOTE(review): presumably configures DuckDB for the storage backend —
        # confirm in its definition.
        setup_duckdb_storage()

    @property
    def deltatable(self) -> DeltaTable:
        # Constructed fresh on every access so callers always see the latest
        # table version.
        return DeltaTable(str(self.uri), storage_options=storage_options())

    @property
    def exists(self) -> bool:
        # EAFP: probing the version is the simplest existence check.
        try:
            self.deltatable.version()
            return True
        except TableNotFoundError:
            return False

    @no_api
    def upsert(self, table: pa.Table) -> None:
        """Insert or update translog rows. Updates last_seen on conflict."""
        if not self.exists:
            # First write: create the table outright.
            write_deltalake(
                str(self.uri),
                table,
                mode="overwrite",
                schema_mode="overwrite",
                storage_options=storage_options(),
            )
            return
        # MERGE on id: matched rows only get a new last_seen (first_seen is
        # preserved by not listing it); unknown ids are inserted as-is.
        (
            self.deltatable.merge(
                source=table,
                predicate="target.id = source.id",
                source_alias="source",
                target_alias="target",
            )
            .when_matched_update(
                {
                    "last_seen": "source.last_seen",
                }
            )
            .when_not_matched_insert_all()
            .execute()
        )

    @no_api
    def mark_deleted(self, table: pa.Table) -> None:
        """Set deleted_at on existing translog rows.

        Args:
            table: PyArrow table with columns (id, deleted_at)
        """
        # Nothing to mark if the translog has never been written.
        if not self.exists:
            return
        # MERGE on id with no insert clause: ids unknown to the translog are
        # silently ignored; matched rows get deleted_at set (soft delete).
        (
            self.deltatable.merge(
                source=table,
                predicate="target.id = source.id",
                source_alias="source",
                target_alias="target",
            )
            .when_matched_update(
                {
                    "deleted_at": "source.deleted_at",
                }
            )
            .execute()
        )

    @no_api
    def compact(self) -> None:
        """Remove deleted entries from translog."""
        if not self.exists:
            return

        # Rewrite the table so it contains only live (not-deleted) entries.
        live = filter_live_translog(self.deltatable)
        write_deltalake(
            str(self.uri),
            live,
            mode="overwrite",
            schema_mode="overwrite",
            storage_options=storage_options(),
        )
compact()

Remove deleted entries from translog.

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def compact(self) -> None:
    """Remove deleted entries from translog."""
    if not self.exists:
        return

    live = filter_live_translog(self.deltatable)
    write_deltalake(
        str(self.uri),
        live,
        mode="overwrite",
        schema_mode="overwrite",
        storage_options=storage_options(),
    )

mark_deleted(table)

Set deleted_at on existing translog rows.

Parameters:

Name Type Description Default
table Table

PyArrow table with columns (id, deleted_at)

required
Source code in ftm_lakehouse/storage/parquet.py
@no_api
def mark_deleted(self, table: pa.Table) -> None:
    """Set deleted_at on existing translog rows.

    Args:
        table: PyArrow table with columns (id, deleted_at)
    """
    if not self.exists:
        return
    (
        self.deltatable.merge(
            source=table,
            predicate="target.id = source.id",
            source_alias="source",
            target_alias="target",
        )
        .when_matched_update(
            {
                "deleted_at": "source.deleted_at",
            }
        )
        .execute()
    )

upsert(table)

Insert or update translog rows. Updates last_seen on conflict.

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def upsert(self, table: pa.Table) -> None:
    """Insert or update translog rows. Updates last_seen on conflict."""
    if not self.exists:
        write_deltalake(
            str(self.uri),
            table,
            mode="overwrite",
            schema_mode="overwrite",
            storage_options=storage_options(),
        )
        return
    (
        self.deltatable.merge(
            source=table,
            predicate="target.id = source.id",
            source_alias="source",
            target_alias="target",
        )
        .when_matched_update(
            {
                "last_seen": "source.last_seen",
            }
        )
        .when_not_matched_insert_all()
        .execute()
    )

TagStore

Key-value freshness tracking.

ftm_lakehouse.storage.TagStore

Bases: Tags

Key-value store for freshness tracking.

Tags are timestamps stored as key-value pairs, used to track when resources were last updated and determine if processing is needed.

Layout: tags/{tenant}/{key}

This store has the "tags/{tenant}" key prefix set, so clients must use relative paths from there.

Source code in ftm_lakehouse/storage/tags.py
class TagStore(AnyTags):
    """
    Key-value store for freshness tracking.

    Tags are timestamps stored as key-value pairs, used to track
    when resources were last updated and determine if processing
    is needed.

    Layout: tags/{tenant}/{key}

    This store has the "tags/{tenant}" key prefix set, so clients must use
    relative paths from there.
    """

    # PEP 526 annotation only: the backing store is created in __init__ and
    # handed to the parent class. (Previously written as
    # `store = Store[datetime, Literal[False]]`, which bound the generic alias
    # itself as a class attribute instead of declaring the attribute's type.)
    store: Store[datetime, Literal[False]]

    def __init__(self, uri: Uri, tenant: str | None = None) -> None:
        """Open the tag store under the tags/{tenant} prefix of `uri`."""
        uri = join_uri(uri, path.tag(tenant=tenant))
        # raise_on_nonexist=False: a missing tag reads as None instead of raising.
        store = get_store(uri, raise_on_nonexist=False)
        super().__init__(store)

    def is_latest(self, key: str, dependencies: Iterable[str]) -> bool:
        """
        Check if the tag is more recent than all dependencies.

        Args:
            key: Tag key to check
            dependencies: Tag keys that this key depends on

        Returns:
            True if key is newer than all dependencies, False otherwise
        """
        last_updated = self.get(key)
        if last_updated is None:
            return False
        # Only dependencies that actually have a timestamp count; if none do,
        # freshness cannot be established and we report stale.
        updated_dependencies = [i for i in map(self.get, dependencies) if i]
        if not updated_dependencies:
            return False
        return all(last_updated > i for i in updated_dependencies)

    def set(self, key: str, timestamp: datetime | None = None) -> datetime:
        """Set a tag to the given timestamp (or now if not provided)."""
        # NOTE(review): datetime.now() is naive local time — confirm callers
        # never compare these against timezone-aware timestamps.
        ts = timestamp or datetime.now()
        self.put(key, ts)
        return ts

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}({self.store.uri})>"

is_latest(key, dependencies)

Check if the tag is more recent than all dependencies.

Parameters:

Name Type Description Default
key str

Tag key to check

required
dependencies Iterable[str]

Tag keys that this key depends on

required

Returns:

Type Description
bool

True if key is newer than all dependencies, False otherwise

Source code in ftm_lakehouse/storage/tags.py
def is_latest(self, key: str, dependencies: Iterable[str]) -> bool:
    """
    Check if the tag is more recent than all dependencies.

    Args:
        key: Tag key to check
        dependencies: Tag keys that this key depends on

    Returns:
        True if key is newer than all dependencies, False otherwise
    """
    last_updated = self.get(key)
    if last_updated is None:
        return False
    updated_dependencies = [i for i in map(self.get, dependencies) if i]
    if not updated_dependencies:
        return False
    return all(last_updated > i for i in updated_dependencies)

set(key, timestamp=None)

Set a tag to the given timestamp (or now if not provided).

Source code in ftm_lakehouse/storage/tags.py
def set(self, key: str, timestamp: datetime | None = None) -> datetime:
    """Set a tag to the given timestamp (or now if not provided)."""
    ts = timestamp or datetime.now()
    self.put(key, ts)
    return ts

QueueStore

CRUD action queue for async processing.

ftm_lakehouse.storage.QueueStore

Bases: Queue

CRUD action queue for ordered mutation log.

All mutations (entity upsert/delete, file archive, mapping updates) go through this queue, ordered by UUID7 timestamp.

Layout: queue/{tenant}/{uuid7}.json

This store has the "queue/{tenant}" key prefix set, so clients must use relative paths from there.

Source code in ftm_lakehouse/storage/queue.py
class QueueStore(Queue):
    """
    CRUD action queue providing an ordered mutation log.

    Every mutation (entity upsert/delete, file archive, mapping updates)
    passes through this queue; entries are ordered by their UUID7 timestamp.

    Layout: queue/{tenant}/{uuid7}.json

    The key prefix "queue/{tenant}" is baked in, so clients address entries
    with paths relative to that prefix.
    """

    def __init__(self, uri: Uri, model: Type[M], tenant: str | None = None) -> None:
        """Open the queue under the queue/{tenant} prefix of `uri`."""
        queue_uri = join_uri(uri, path.queue(tenant))
        backing_store = get_store(queue_uri)
        super().__init__(backing_store, model)