Skip to content

Layer 2: Storage

Single-purpose storage interfaces. Each store does one thing.

JournalStore

SQL statement buffer for write-ahead logging.

ftm_lakehouse.storage.JournalStore = SqlJournalStore module-attribute

ParquetStore

Delta Lake parquet storage for statements, partitioned by (shard, bucket, origin). Writes are append-only; deduplication, first_seen folding, and tombstone reaping are deferred to merge, while compact bin-packs small files and vacuum deletes files the Delta log no longer references. These three independent async ops (compact / merge / vacuum) are all coordinated by a dataset-wide write fence.

ftm_lakehouse.storage.ParquetStore

Bases: LakehouseApiMixin

Single Delta Lake table (per dataset) partitioned by (shard, bucket, origin).

Writes are append-only: append sorts a per-partition batch in memory and writes one parquet file. Reads delegate to an ftmq LakeStore with view_filter=deleted_at IS NULL (filters tombstones at query time; merge drops them physically once past grace).

Source code in ftm_lakehouse/storage/parquet.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
class ParquetStore(LakehouseApiMixin):
    """Single Delta Lake table (per dataset) partitioned by ``(shard, bucket,
    origin)``.

    Writes are append-only: ``append`` sorts a per-partition batch in memory
    and writes one parquet file. Reads delegate to an ftmq ``LakeStore`` with
    ``view_filter=deleted_at IS NULL`` (filters tombstones at query time;
    merge drops them physically once past grace).
    """

    def __init__(self, uri: Uri, dataset: str, shards: int | None = None) -> None:
        """Set up the statement store for one dataset.

        Args:
            uri: Dataset root. The Delta table lives at ``path.STATEMENTS``
                joined onto it.
            dataset: Dataset name, passed to the lake store and the logger.
            shards: Shard count handed to ``path.entity_shard``. Falls back
                to ``settings.entity_shards`` when ``None``.
        """
        self.uri = join_uri(uri, path.STATEMENTS)
        super().__init__(self.uri)
        self.settings = Settings()
        self.dataset = dataset
        self.shards = shards if shards is not None else self.settings.entity_shards
        # Rooted at the dataset ``uri`` (not the statements sub-path): this is
        # the store used for the write lock and for CSV exports.
        self._store = get_store(uri)
        self._lake = LakeStore(
            uri=str(self.uri),
            dataset=self.dataset,
            partition_by=PARTITIONS,
            view_filter=VIEW_FILTER,
        )
        self.log = get_logger(
            f"{self.dataset}.{self.__class__.__name__}",
            dataset=self.dataset,
            uri=mask_uri(self.uri),
        )
        setup_duckdb_storage()

    @property
    def deltatable(self) -> DeltaTable:
        """The Delta table handle exposed by the underlying ``LakeStore``."""
        return self._lake.deltatable

    @cached_property
    def _duckdb(self) -> duckdb.DuckDBPyConnection:
        """DuckDB connection with the Delta table registered as a view.

        ``register_view`` uses ``delta_scan`` so the view resolves the current
        Delta log on every query – registering once per store is enough; the
        view stays in sync with subsequent ``write_deltalake`` commits.

        DuckDB's :class:`~duckdb.DuckDBPyConnection` is *not* thread-safe;
        callers must not query this connection directly. Use
        :meth:`_cursor` to get a thread-isolated child connection that
        shares the catalog, the loaded Delta extension, and the
        registered view.
        """
        con = make_duckdb()
        register_view(con, self.deltatable)
        return con

    @contextmanager
    def _cursor(self) -> Iterator[duckdb.DuckDBPyConnection]:
        """Yield a thread-isolated DuckDB cursor.

        Per the DuckDB Python docs, ``DuckDBPyConnection.cursor()`` returns
        a separate connection sharing the underlying database (catalog,
        loaded extensions, registered views), which is the supported way
        to run concurrent queries from multiple threads against one
        ``ParquetStore``.

        Use as ``with self._cursor() as cur:`` for any synchronous query
        on the cached connection. Generators that need the cursor alive
        for streaming results should pin it in their closure so it isn't
        closed before consumption finishes.
        """
        cur = self._duckdb.cursor()
        try:
            yield cur
        finally:
            # Always release the child connection, even if the caller raised.
            cur.close()

    @property
    def version(self) -> int | None:
        """Current version of the main Delta table, ``None`` if it is missing."""
        if not self._lake.exists:
            return None
        return self._lake.deltatable.version()

    @property
    def exists(self) -> bool:
        """Whether the Delta table exists, as reported by the lake store."""
        return self._lake.exists

    @no_api
    def view(self) -> LakeQueryView:
        """Return the lake store's default view for querying statements."""
        lake = self._lake
        return lake.default_view()

    @no_api
    def get(self, entity_id: str) -> StatementEntity | None:
        """Look up an entity by its ID.

        Args:
            entity_id: ID of the entity to assemble.

        Returns:
            The entity built from its statements, or ``None`` when no
            statements are found for ``entity_id``.
        """
        statements = list(self.get_statements(entity_id))
        if not statements:
            return None
        return StatementEntity.from_statements(make_dataset(self.dataset), statements)

    @no_api
    def query(self, q: Query | None = None) -> StatementEntities:
        """
        Query entities from the store.

        Args:
            q: Optional Query object with filters; a falsy value falls back
                to an empty ``Query()``.

        Yields:
            StatementEntity objects matching the query
        """
        effective = q if q else Query()
        statements_sql = effective.sql.statements
        yield from (
            payload.to_entity() for payload in self._query_data(statements_sql)
        )

    @no_api
    def query_statements(self, q: Select | None = None) -> Statements:
        """
        Query statements from the store, one shard at a time.

        Args:
            q: Optional SQLAlchemy query (default: Query().sql.statements)

        Yields:
            Statement objects matching the query
        """
        yield from map(Statement.from_dict, self._query_statement_data(q))

    @no_api
    def get_statements(self, entity_id: str) -> Statements:
        """Query all live statements for a single entity.

        The query is pinned to the entity's shard partition and executed
        exactly once, with the tombstone view filter applied.

        Args:
            entity_id: ID of the entity whose statements are wanted.

        Yields:
            Statement objects belonging to ``entity_id``.
        """
        if not self.exists:
            return
        shard = path.entity_shard(entity_id, self.shards)
        q = select(TABLE).where(TABLE.c.shard == shard, TABLE.c.entity_id == entity_id)
        # The shard is already part of the WHERE clause, so there is no need
        # to enumerate every shard and re-run the query per shard (which is
        # what the generic statement query path does).
        for row in stream_duckdb(q, self.deltatable, VIEW_FILTER):
            yield Statement.from_dict(StatementDict(**vars(row)))

    @no_api
    def stats(self) -> DatasetStats:
        """Compute statistics via the default statement view."""
        statement_view = self.view()
        return statement_view.stats()

    def _write_lock(self) -> Lock:
        """Dataset-wide write fence.

        All Delta writers (``append``, ``merge``, ``compact``, ``vacuum``)
        acquire this lock so they can't race on the same partition. The lock
        lives at ``{dataset_root}/.LOCK`` per ``path.LOCK``.
        """
        return Lock(self._store, key=path.LOCK)

    @no_api
    def unlock(self) -> bool:
        """Forcibly release the dataset write fence.

        Operator escape hatch for the case where a writer process died
        with the lock held (or an attacker held it on purpose). The lock
        is just a file at ``{dataset_root}/.LOCK``; this method deletes
        it.

        **Use sparingly** – breaking a lock that's still held by a live
        writer can corrupt a write in flight. Confirm no process is
        actively writing before running.

        Returns:
            ``True`` if a lock was released, ``False`` if no lock was
            held.
        """
        if self._store.exists(path.LOCK):
            self._store.delete(path.LOCK)
            return True
        return False

    @no_api
    def append(self, batch: pa.Table) -> None:
        """Append a sorted batch of statements.

        The batch should be scoped to a single ``shard`` for write efficiency
        (one parquet file per ``(shard, bucket, origin)`` partition). The
        method sorts by ``(bucket, origin, entity_id, id, last_seen DESC)``
        then splits by ``bucket`` so each ``write_deltalake`` call uses the
        bucket-appropriate ``writer_properties`` (small vs. large profile).
        Duplicates land as separate rows and are reaped by :meth:`merge`.

        Held under the dataset write fence so concurrent :meth:`merge` /
        :meth:`compact` / :meth:`vacuum` can't tombstone an in-flight append.

        Args:
            batch: PyArrow table with the columns of
                :data:`ftm_lakehouse.model.statement.SHARDED_SCHEMA`. Rows
                should already be scoped to a single shard. An empty table
                is a no-op.
        """
        if len(batch) == 0:
            return

        sort_keys = [
            ("bucket", "ascending"),
            ("origin", "ascending"),
            ("entity_id", "ascending"),
            ("id", "ascending"),
            ("last_seen", "descending"),
        ]
        ordered = batch.sort_by(sort_keys)
        with self._write_lock():
            # Only the very first write may have to create the table; every
            # later sub-batch appends to it.
            first_mode = "append" if self.exists else "overwrite"
            buckets = pc.unique(ordered["bucket"]).to_pylist()
            for position, bucket in enumerate(buckets):
                bucket_rows = ordered.filter(pc.equal(ordered["bucket"], bucket))
                write_deltalake(
                    str(self.uri),
                    bucket_rows,
                    partition_by=PARTITIONS,
                    mode=first_mode if position == 0 else "append",
                    writer_properties=writer_for_bucket(bucket),
                    storage_options=storage_options(),
                )

    @no_api
    def merge(self, grace_period_days: int | None = None) -> None:
        """Collapse duplicates and reap expired tombstones, partition by partition.

        For each ``(shard, bucket, origin)`` partition, runs the merge query
        (keep latest row per ``id`` by ``last_seen DESC``; fold ``first_seen``
        to the min; drop tombstones older than the grace cutoff) and
        overwrites that partition via a partition ``predicate``. Held under
        the dataset write fence (``path.LOCK``).

        Args:
            grace_period_days: Override ``settings.grace_period_days``. Pass
                ``0`` to drop tombstones immediately.
        """
        if not self.exists:
            return

        def sql_literal(value: object) -> str:
            # Double embedded single quotes so a partition value such as
            # ``o'brien`` cannot terminate the string literal early and
            # corrupt the overwrite predicate.
            return "'" + str(value).replace("'", "''") + "'"

        days = (
            grace_period_days
            if grace_period_days is not None
            else self.settings.grace_period_days
        )
        grace_cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        with self._write_lock():
            for shard, bucket, origin in self._list_partitions():
                merge_select = build_merge_query(shard, bucket, origin, grace_cutoff)
                sql = str(merge_select.compile(compile_kwargs={"literal_binds": True}))
                partition_predicate = (
                    f"shard = {sql_literal(shard)} "
                    f"AND bucket = {sql_literal(bucket)} "
                    f"AND origin = {sql_literal(origin)}"
                )
                with self._cursor() as cur:
                    # ``to_arrow_reader`` yields a pyarrow RecordBatchReader
                    # that DuckDB streams lazily from its execution
                    # pipeline; ``write_deltalake`` consumes the reader
                    # batch by batch, so the merge never materialises the
                    # full partition in Python memory.
                    reader = cur.execute(sql).to_arrow_reader()
                    write_deltalake(
                        str(self.uri),
                        reader,
                        mode="overwrite",
                        partition_by=PARTITIONS,
                        predicate=partition_predicate,
                        writer_properties=writer_for_bucket(bucket),
                        storage_options=storage_options(),
                    )

    @no_api
    def compact(self) -> None:
        """Bin-pack small parquet files within each partition.

        Cheap maintenance – Delta's ``OPTIMIZE compact`` only rewrites small
        files into larger ones; it does not collapse duplicate rows or drop
        tombstones (use :meth:`merge` for that). Held under the dataset write
        fence (``path.LOCK``). Does nothing when the table does not exist.
        """
        if not self.exists:
            return
        with self._write_lock():
            for shard, bucket, origin in self._list_partitions():
                only_this_partition = [
                    ("shard", "=", shard),
                    ("bucket", "=", bucket),
                    ("origin", "=", origin),
                ]
                self.deltatable.optimize.compact(
                    partition_filters=only_this_partition,
                    writer_properties=writer_for_bucket(bucket),
                )

    @no_api
    def vacuum(self, retention_hours: int = 0) -> None:
        """Delete obsolete parquet files no longer referenced by the Delta log.

        Tombstoned files (replaced by :meth:`merge` / :meth:`compact`) become
        orphans on disk; vacuum prunes them once they're past
        ``retention_hours``. Held under the dataset write fence
        (``path.LOCK``).

        Args:
            retention_hours: Keep files newer than this many hours. ``0``
                drops every file the Delta log no longer references.
        """
        if not self.exists:
            return
        with self._write_lock():
            # Retention enforcement is switched off so that values below
            # Delta's configured minimum (including the default ``0``) are
            # accepted.
            self.deltatable.vacuum(
                retention_hours=retention_hours,
                dry_run=False,
                enforce_retention_duration=False,
            )

    @no_api
    def export_csv(self, key: str, q: Select | None = None) -> None:
        """Export statements to a CSV file in the dataset store.

        Args:
            key: Target key, opened for writing on the dataset-root store.
            q: Optional SQLAlchemy select (default: Query().sql.statements).
                Rows are written in the order the query yields them.
        """
        if not self.exists:
            # No table: do not create an empty export file.
            return
        with self._store.open(key, "w") as handle:
            smart_write_csv(handle, self._query_statement_data(q))

    @no_api
    def get_changed_entity_ids(
        self,
        since: datetime,
        schemata: list[str] | None = None,
        prop: str | None = None,
    ) -> Iterator[str]:
        """Get entity IDs touched since a timestamp.

        Catches both *new* / *modified* statements (``first_seen >= since``)
        and *deleted* ones (``deleted_at >= since``) – the latter so the diff
        consumer can emit DEL ops for entities whose tombstone landed after
        the last diff state.

        Args:
            since: Lower bound; its microseconds are dropped before comparing.
            schemata: Optional schema names to restrict the result to.
            prop: Optional property name to restrict the result to.

        Yields:
            Entity IDs, queried shard by shard.
        """
        if not self.exists:
            return
        from sqlalchemy import or_

        cutoff = since.replace(microsecond=0)
        touched = or_(
            TABLE.c.first_seen >= cutoff,
            TABLE.c.deleted_at >= cutoff,
        )
        stmt = select(TABLE).distinct(TABLE.c.entity_id).where(touched)
        if schemata:
            stmt = stmt.where(TABLE.c.schema.in_(schemata))
        if prop:
            stmt = stmt.where(TABLE.c.prop == prop)
        # No view filter is passed here, so tombstoned rows stay visible and
        # the ``deleted_at`` branch can match.
        for shard_filter in self._iter_shards():
            for row in stream_duckdb(stmt.where(shard_filter), self.deltatable):
                yield row.entity_id

    @no_api
    def destroy(self) -> None:
        """
        Destroy the deltalake by removing the transaction log in "_delta_log"
        directory. This is soft deleting, as the parquet files remain (but will
        be cleaned up on optimize --vacuum)
        """
        with Took() as timer:
            self.log.warn("🔥 Destroying deltalake store ...")
            backend = self._lake._backend
            for log_key in backend.iterate_keys("_delta_log"):
                backend.delete(log_key)
        self.log.info("Deleted statement store.", took=timer.took)

    def _iter_shards(self) -> Iterator[ColumnElement]:
        """Get existing shard keys as Sqlalchemy predicates.

        Returns free-column predicates (not bound to ``TABLE``) so they can be
        added to queries built on ftmq's table object without dragging in a
        second same-named ``Table`` reference (which would yield
        ``FROM x, x`` in the rendered SQL).
        """
        shard_col = column("shard")
        q = select(shard_col).select_from(TABLE).distinct()
        for row in stream_duckdb(q, self.deltatable):
            yield shard_col == row.shard

    def _list_partitions(self) -> list[tuple[str, str, str]]:
        """List all ``(shard, bucket, origin)`` triples currently in the table.

        Returns an empty list when the table does not exist.
        """
        if not self.exists:
            return []
        with self._cursor() as cur:
            rows = cur.execute(
                f"SELECT DISTINCT shard, bucket, origin FROM {TABLE.name} "
                "ORDER BY shard, bucket, origin"
            ).fetchall()
        return [(shard, bucket, origin) for shard, bucket, origin in rows]

    def _query_statement_data(self, q: Select | None = None) -> Iterator[StatementDict]:
        """
        Query statement dicts, bypassing FtM object construction.

        The query is executed once per existing shard with the tombstone view
        filter applied. Yields nothing when the table does not exist, matching
        the other read paths of this store.

        Args:
            q: Optional SQLAlchemy select (default: Query().sql.statements)

        Yields:
            StatementDict instances
        """
        if not self.exists:
            return
        if q is None:
            q = Query().sql.statements
        for shard in self._iter_shards():
            for row in stream_duckdb(q.where(shard), self.deltatable, VIEW_FILTER):
                yield StatementDict(**vars(row))

    def _query_data(self, q: Select | None = None) -> Iterator[EntityPayload]:
        """
        Query entity dicts via aggregate_unsafe(), bypassing FtM object construction.

        Args:
            q: Optional SQLAlchemy select (default: Query().sql.statements)

        Yields:
            EntityPayload instances
        """
        if not self.exists:
            return
        yield from aggregate_unsafe(self._query_statement_data(q), self.dataset)

exists property

Check existence of deltatable

version property

Current version of the main Delta table.

append(batch)

Append a sorted batch of statements.

The batch should be scoped to a single shard for write efficiency (one parquet file per (shard, bucket, origin) partition). The method sorts by (bucket, origin, entity_id, id, last_seen DESC) then splits by bucket so each write_deltalake call uses the bucket-appropriate writer_properties (small vs. large profile). Duplicates land as separate rows and are reaped by merge().

Held under the dataset write fence so concurrent merge() / compact() / vacuum() can't tombstone an in-flight append.

Parameters:

Name Type Description Default
batch Table

PyArrow table with the columns of ftm_lakehouse.model.statement.SHARDED_SCHEMA. Rows should already be scoped to a single shard.

required
Source code in ftm_lakehouse/storage/parquet.py
@no_api
def append(self, batch: pa.Table) -> None:
    """Sort a batch of statements and append it to the Delta table.

    Rows are ordered by ``(bucket, origin, entity_id, id, last_seen DESC)``
    and then written bucket by bucket, so that every ``write_deltalake``
    call gets the ``writer_properties`` matching its bucket. Nothing is
    deduplicated here: repeated statements are stored as separate rows and
    cleaned up later by :meth:`merge`.

    The whole write runs under the dataset write fence, which keeps
    :meth:`merge`, :meth:`compact` and :meth:`vacuum` from interfering with
    an append in progress.

    Args:
        batch: PyArrow table with the columns of
            :data:`ftm_lakehouse.model.statement.SHARDED_SCHEMA`. For write
            efficiency the rows should belong to a single shard. An empty
            table is a no-op.
    """
    if len(batch) == 0:
        return

    sort_keys = [
        ("bucket", "ascending"),
        ("origin", "ascending"),
        ("entity_id", "ascending"),
        ("id", "ascending"),
        ("last_seen", "descending"),
    ]
    ordered = batch.sort_by(sort_keys)
    with self._write_lock():
        # Only the very first write may have to create the table; every
        # later sub-batch appends to it.
        first_mode = "append" if self.exists else "overwrite"
        buckets = pc.unique(ordered["bucket"]).to_pylist()
        for position, bucket in enumerate(buckets):
            bucket_rows = ordered.filter(pc.equal(ordered["bucket"], bucket))
            write_deltalake(
                str(self.uri),
                bucket_rows,
                partition_by=PARTITIONS,
                mode=first_mode if position == 0 else "append",
                writer_properties=writer_for_bucket(bucket),
                storage_options=storage_options(),
            )

compact()

Bin-pack small parquet files within each partition.

Cheap maintenance – Delta's OPTIMIZE compact only rewrites small files into larger ones; it does not collapse duplicate rows or drop tombstones (use merge() for that). Held under the dataset write fence (path.LOCK).

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def compact(self) -> None:
    """Rewrite small parquet files into larger ones, one partition at a time.

    This is Delta's ``OPTIMIZE compact``: it only bin-packs files and leaves
    duplicate rows and tombstones untouched (that is :meth:`merge`'s job).
    Runs under the dataset write fence (``path.LOCK``) and does nothing when
    the table does not exist.
    """
    if not self.exists:
        return
    with self._write_lock():
        for shard, bucket, origin in self._list_partitions():
            only_this_partition = [
                ("shard", "=", shard),
                ("bucket", "=", bucket),
                ("origin", "=", origin),
            ]
            self.deltatable.optimize.compact(
                partition_filters=only_this_partition,
                writer_properties=writer_for_bucket(bucket),
            )

destroy()

Destroy the deltalake by removing the transaction log in "_delta_log" directory. This is soft deleting, as the parquet files remain (but will be cleaned up on optimize --vacuum)

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def destroy(self) -> None:
    """
    Soft-delete the deltalake: every key under "_delta_log" is removed from
    the lake backend, which discards the transaction log. The parquet data
    files themselves are left in place (to be cleaned up on
    optimize --vacuum).
    """
    with Took() as timer:
        self.log.warn("🔥 Destroying deltalake store ...")
        backend = self._lake._backend
        for log_key in backend.iterate_keys("_delta_log"):
            backend.delete(log_key)
    self.log.info("Deleted statement store.", took=timer.took)

export_csv(key, q=None)

Export statements to a sorted CSV file.

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def export_csv(self, key: str, q: Select | None = None) -> None:
    """Write the statements selected by ``q`` to a CSV file at ``key``.

    Args:
        key: Target key, opened for writing on the dataset store.
        q: Optional SQLAlchemy select (default: Query().sql.statements).
    """
    if not self.exists:
        # No table: do not create an empty export file.
        return
    with self._store.open(key, "w") as handle:
        smart_write_csv(handle, self._query_statement_data(q))

get(entity_id)

Lookup an Entity by its ID

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def get(self, entity_id: str) -> StatementEntity | None:
    """Assemble the entity with the given ID from its statements.

    Returns ``None`` when no statements are found for ``entity_id``.
    """
    statements = list(self.get_statements(entity_id))
    if not statements:
        return None
    return StatementEntity.from_statements(make_dataset(self.dataset), statements)

get_changed_entity_ids(since, schemata=None, prop=None)

Get entity IDs touched since a timestamp.

Catches both new / modified statements (first_seen >= since) and deleted ones (deleted_at >= since) – the latter so the diff consumer can emit DEL ops for entities whose tombstone landed after the last diff state.

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def get_changed_entity_ids(
    self,
    since: datetime,
    schemata: list[str] | None = None,
    prop: str | None = None,
) -> Iterator[str]:
    """Yield the IDs of entities touched at or after ``since``.

    A statement counts as touched when ``first_seen >= since`` (new or
    modified) or ``deleted_at >= since`` (tombstoned). Deletions are
    included so the diff consumer can emit DEL ops for entities whose
    tombstone landed after the last diff state.

    Args:
        since: Lower bound; its microseconds are dropped before comparing.
        schemata: Optional schema names to restrict the result to.
        prop: Optional property name to restrict the result to.

    Yields:
        Entity IDs, queried shard by shard.
    """
    if not self.exists:
        return
    from sqlalchemy import or_

    cutoff = since.replace(microsecond=0)
    touched = or_(
        TABLE.c.first_seen >= cutoff,
        TABLE.c.deleted_at >= cutoff,
    )
    stmt = select(TABLE).distinct(TABLE.c.entity_id).where(touched)
    if schemata:
        stmt = stmt.where(TABLE.c.schema.in_(schemata))
    if prop:
        stmt = stmt.where(TABLE.c.prop == prop)
    # No view filter is passed here, so tombstoned rows stay visible and
    # the ``deleted_at`` branch can match.
    for shard_filter in self._iter_shards():
        for row in stream_duckdb(stmt.where(shard_filter), self.deltatable):
            yield row.entity_id

get_statements(entity_id)

Query all live statements for a single entity.

Uses the shard partition for efficient pruning.

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def get_statements(self, entity_id: str) -> Statements:
    """Query all live statements for a single entity.

    The query is pinned to the entity's shard partition and executed
    exactly once, with the tombstone view filter applied.

    Args:
        entity_id: ID of the entity whose statements are wanted.

    Yields:
        Statement objects belonging to ``entity_id``.
    """
    if not self.exists:
        return
    shard = path.entity_shard(entity_id, self.shards)
    q = select(TABLE).where(TABLE.c.shard == shard, TABLE.c.entity_id == entity_id)
    # The shard is already part of the WHERE clause, so there is no need to
    # enumerate every shard and re-run the query per shard (which is what
    # routing through ``query_statements`` would do).
    for row in stream_duckdb(q, self.deltatable, VIEW_FILTER):
        yield Statement.from_dict(StatementDict(**vars(row)))

merge(grace_period_days=None)

Collapse duplicates and reap expired tombstones, partition by partition.

For each (shard, bucket, origin) partition, runs the merge query (keep latest row per id by last_seen DESC; fold first_seen to the min; drop tombstones older than the grace cutoff) and atomically overwrites that partition via a partition predicate. Held under the dataset write fence (path.LOCK).

Parameters:

Name Type Description Default
grace_period_days int | None

Override settings.grace_period_days. Pass 0 to drop tombstones immediately.

None
Source code in ftm_lakehouse/storage/parquet.py
@no_api
def merge(self, grace_period_days: int | None = None) -> None:
    """Collapse duplicates and reap expired tombstones, partition by partition.

    For each ``(shard, bucket, origin)`` partition, runs the merge query
    (keep latest row per ``id`` by ``last_seen DESC``; fold ``first_seen``
    to the min; drop tombstones older than the grace cutoff) and
    overwrites that partition via a partition ``predicate``. Held under
    the dataset write fence (``path.LOCK``).

    Args:
        grace_period_days: Override ``settings.grace_period_days``. Pass
            ``0`` to drop tombstones immediately.
    """
    if not self.exists:
        return

    def sql_literal(value: object) -> str:
        # Double embedded single quotes so a partition value such as
        # ``o'brien`` cannot terminate the string literal early and
        # corrupt the overwrite predicate.
        return "'" + str(value).replace("'", "''") + "'"

    days = (
        grace_period_days
        if grace_period_days is not None
        else self.settings.grace_period_days
    )
    grace_cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    with self._write_lock():
        for shard, bucket, origin in self._list_partitions():
            merge_select = build_merge_query(shard, bucket, origin, grace_cutoff)
            sql = str(merge_select.compile(compile_kwargs={"literal_binds": True}))
            partition_predicate = (
                f"shard = {sql_literal(shard)} "
                f"AND bucket = {sql_literal(bucket)} "
                f"AND origin = {sql_literal(origin)}"
            )
            with self._cursor() as cur:
                # ``to_arrow_reader`` yields a pyarrow RecordBatchReader
                # that DuckDB streams lazily from its execution
                # pipeline; ``write_deltalake`` consumes the reader
                # batch by batch, so the merge never materialises the
                # full partition in Python memory.
                reader = cur.execute(sql).to_arrow_reader()
                write_deltalake(
                    str(self.uri),
                    reader,
                    mode="overwrite",
                    partition_by=PARTITIONS,
                    predicate=partition_predicate,
                    writer_properties=writer_for_bucket(bucket),
                    storage_options=storage_options(),
                )

query(q=None)

Query Entities from the store.

Parameters:

Name Type Description Default
q Query | None

Optional Query object with filters

None

Yields:

Type Description
StatementEntities

StatementEntity objects matching the query

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def query(self, q: Query | None = None) -> StatementEntities:
    """
    Stream entities from the store.

    Args:
        q: Optional Query object with filters; a falsy value falls back to
            an empty ``Query()``.

    Yields:
        StatementEntity objects matching the query
    """
    effective = q if q else Query()
    statements_sql = effective.sql.statements
    yield from (
        payload.to_entity() for payload in self._query_data(statements_sql)
    )

query_statements(q=None)

Query ordered Statements from the store.

Parameters:

Name Type Description Default
q Select | None

Optional SQLAlchemy query (default: Query().sql.statements)

None

Yields:

Type Description
Statements

Statement objects matching the query

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def query_statements(self, q: Select | None = None) -> Statements:
    """
    Stream statements from the store as ``Statement`` objects.

    Args:
        q: Optional SQLAlchemy query (default: Query().sql.statements)

    Yields:
        Statement objects matching the query
    """
    yield from map(Statement.from_dict, self._query_statement_data(q))

stats()

Compute statistics from the statement store.

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def stats(self) -> DatasetStats:
    """Return the statistics computed by the default statement view."""
    statement_view = self.view()
    return statement_view.stats()

unlock()

Forcibly release the dataset write fence.

Operator escape hatch for the case where a writer process died with the lock held (or an attacker held it on purpose). The lock is just a file at {dataset_root}/.LOCK; this method deletes it.

Use sparingly – breaking a lock that's still held by a live writer can corrupt a write in flight. Confirm no process is actively writing before running.

Returns:

Type Description
bool

True if a lock was released, False if no lock was held.

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def unlock(self) -> bool:
    """Break the dataset write fence by deleting its lock file.

    Intended as an operator escape hatch when a writer process died while
    holding the lock (or the lock is being held maliciously). The fence is
    a plain file at ``{dataset_root}/.LOCK``.

    **Use sparingly** – removing a lock that a live writer still holds can
    corrupt the write in flight. Make sure nothing is writing first.

    Returns:
        ``True`` if a lock file existed and was deleted, ``False`` if there
        was no lock to release.
    """
    if self._store.exists(path.LOCK):
        self._store.delete(path.LOCK)
        return True
    return False

vacuum(retention_hours=0)

Delete obsolete parquet files no longer referenced by the Delta log.

Tombstoned files (replaced by merge() / compact()) become orphans on disk; vacuum prunes them once they're past retention_hours. Held under the dataset write fence (path.LOCK).

Parameters:

Name Type Description Default
retention_hours int

Keep files newer than this many hours. 0 drops every file the Delta log no longer references.

0
Source code in ftm_lakehouse/storage/parquet.py
@no_api
def vacuum(self, retention_hours: int = 0) -> None:
    """Remove parquet files that the Delta log no longer references.

    Files replaced by :meth:`merge` / :meth:`compact` stay on disk as
    orphans until a vacuum prunes those older than ``retention_hours``.
    Runs under the dataset write fence (``path.LOCK``) and does nothing
    when the table does not exist.

    Args:
        retention_hours: Keep files newer than this many hours. ``0``
            drops every file the Delta log no longer references.
    """
    if self.exists:
        with self._write_lock():
            # Retention enforcement is switched off so that values below
            # Delta's configured minimum (including the default ``0``) are
            # accepted.
            self.deltatable.vacuum(
                retention_hours=retention_hours,
                dry_run=False,
                enforce_retention_duration=False,
            )

view()

Get a view for querying statements.

Source code in ftm_lakehouse/storage/parquet.py
@no_api
def view(self) -> LakeQueryView:
    """Get a view for querying statements.

    Returns:
        The default view of the underlying lake store (``self._lake``).
    """
    lake = self._lake
    return lake.default_view()

TagStore

Key-value freshness tracking.

ftm_lakehouse.storage.TagStore

Bases: Tags

Key-value store for freshness tracking.

Tags are timestamps stored as key-value pairs, used to track when resources were last updated and determine if processing is needed.

Layout: tags/{tenant}/{key}

This store has the "tags/{tenant}" key prefix set, so clients must use relative paths from there.

Source code in ftm_lakehouse/storage/tags.py
class TagStore(AnyTags):
    """
    Key-value store for freshness tracking.

    Each tag is a timestamp stored under a key. Comparing tags tells
    when a resource was last updated and whether something derived
    from it needs to be processed again.

    Layout: tags/{tenant}/{key}

    The underlying store is opened with the "tags/{tenant}" key prefix
    already applied, so clients must use paths relative to that prefix.
    """

    # NOTE(review): this *assigns* the parametrized ``Store`` type as a
    # class attribute instead of annotating it (``store: Store[...]``).
    # Presumably the base initializer sets the real store on the
    # instance – confirm which was intended.
    store = Store[datetime, Literal[False]]

    def __init__(self, uri: Uri, tenant: str | None = None) -> None:
        """
        Open the tag store for ``tenant`` below ``uri``.

        Args:
            uri: Base location; the tenant's tag path is appended to it
            tenant: Tenant whose tag prefix is used (may be ``None``)
        """
        tag_uri = join_uri(uri, path.tag(tenant=tenant))
        super().__init__(get_store(tag_uri, raise_on_nonexist=False))

    def is_latest(self, key: str, dependencies: Iterable[str]) -> bool:
        """
        Check if the tag is more recent than all dependencies.

        Dependencies without a stored tag are ignored. If ``key`` itself
        has no tag, or none of the dependencies has one, the answer is
        ``False``.

        Args:
            key: Tag key to check
            dependencies: Tag keys that this key depends on

        Returns:
            True if key is strictly newer than every tagged dependency
        """
        own_stamp = self.get(key)
        if own_stamp is None:
            return False
        # Look up every dependency first; only the tagged ones are compared.
        dep_stamps = [
            stamp for stamp in (self.get(dep) for dep in dependencies) if stamp
        ]
        if not dep_stamps:
            return False
        for stamp in dep_stamps:
            if not own_stamp > stamp:
                return False
        return True

    def set(self, key: str, timestamp: datetime | None = None) -> datetime:
        """Set a tag to the given timestamp (or now if not provided)."""
        if not timestamp:
            # ``datetime.now()`` without a tz yields a naive local timestamp.
            timestamp = datetime.now()
        self.put(key, timestamp)
        return timestamp

    def __repr__(self) -> str:
        """Render as ``<ClassName(masked-store-uri)>``."""
        return f"<{self.__class__.__name__}({mask_uri(self.store.uri)})>"

is_latest(key, dependencies)

Check if the tag is more recent than all dependencies.

Parameters:

Name Type Description Default
key str

Tag key to check

required
dependencies Iterable[str]

Tag keys that this key depends on

required

Returns:

Type Description
bool

True if key is newer than all dependencies, False otherwise

Source code in ftm_lakehouse/storage/tags.py
def is_latest(self, key: str, dependencies: Iterable[str]) -> bool:
    """
    Check if the tag is more recent than all dependencies.

    Dependencies without a stored tag are ignored. If ``key`` itself
    has no tag, or none of the dependencies has one, the answer is
    ``False``.

    Args:
        key: Tag key to check
        dependencies: Tag keys that this key depends on

    Returns:
        True if key is strictly newer than every tagged dependency
    """
    own_stamp = self.get(key)
    if own_stamp is None:
        return False
    # Look up every dependency first; only the tagged ones are compared.
    dep_stamps = [
        stamp for stamp in (self.get(dep) for dep in dependencies) if stamp
    ]
    if not dep_stamps:
        return False
    for stamp in dep_stamps:
        if not own_stamp > stamp:
            return False
    return True

set(key, timestamp=None)

Set a tag to the given timestamp (or now if not provided).

Source code in ftm_lakehouse/storage/tags.py
def set(self, key: str, timestamp: datetime | None = None) -> datetime:
    """Set a tag to the given timestamp (or now if not provided)."""
    ts = timestamp or datetime.now()
    self.put(key, ts)
    return ts

QueueStore

CRUD action queue for async processing.

ftm_lakehouse.storage.QueueStore

Bases: Queue

CRUD action queue for ordered mutation log.

All mutations (entity upsert/delete, file archive, mapping updates) go through this queue, ordered by UUID7 timestamp.

Layout: queue/{tenant}/{uuid7}.json

This store has the "queue/{tenant}" key prefix set, so clients must use relative paths from there.

Source code in ftm_lakehouse/storage/queue.py
class QueueStore(Queue):
    """
    CRUD action queue for ordered mutation log.

    All mutations (entity upsert/delete, file archive, mapping updates)
    go through this queue, ordered by UUID7 timestamp.

    Layout: queue/{tenant}/{uuid7}.json

    This store has the "queue/{tenant}" key prefix set, so clients must use
    relative paths from there.
    """

    def __init__(self, uri: Uri, model: Type[M], tenant: str | None = None) -> None:
        uri = join_uri(uri, path.queue(tenant))
        store = get_store(uri)
        super().__init__(store, model)