Skip to content

Layer 3: Repository

Domain-specific combinations of multiple stores. Each repository owns one domain concept.

ArchiveRepository

Content-addressed file archive with metadata and extracted text storage.

dataset.archive.store(uri)
dataset.archive.get_file(checksum)
dataset.archive.stream(checksum)

ftm_lakehouse.repository.ArchiveRepository

Bases: BaseRepository

Repository for file archive operations.

Combines content-addressed blob storage (raw bytes) and model-based metadata storage (JSON) to provide file archiving.

Blobs are stored once per checksum, but each unique source path creates its own metadata file (keyed by File.id).

Optionally, extracted text (from different origins) can be stored and retrieved. In addition, other programs can write arbitrary additional data to the archive (such as pdf page thumbnails).

Example
archive = ArchiveRepository(dataset="my_data", uri="s3://bucket/dataset")

# Archive a file
file = archive.store("path/to/file.pdf")

# Retrieve file info
file = archive.get_file(checksum)

# Stream file contents by checksum
for chunk in archive.stream(file.checksum):
    process(chunk)
Source code in ftm_lakehouse/repository/archive.py
class ArchiveRepository(BaseRepository):
    """
    Repository for file archive operations.

    Combines content-addressed blob storage (raw bytes) and model-based
    metadata storage (JSON) to provide file archiving.

    Blobs are stored once per checksum, but each unique source path
    creates its own metadata file (keyed by File.id).

    Optionally, extracted text (from different origins) can be stored and
    retrieved. In addition, other programs can write arbitrary additional
    data to the archive (such as pdf page thumbnails).

    Example:
        ```python
        archive = ArchiveRepository(dataset="my_data", uri="s3://bucket/dataset")

        # Archive a file
        file = archive.store("path/to/file.pdf")

        # Retrieve file info
        file = archive.get_file(file.checksum)

        # Stream file contents by checksum
        for chunk in archive.stream(file.checksum):
            process(chunk)
        ```
    """

    def __init__(self, dataset: str, uri: Uri) -> None:
        super().__init__(dataset, uri)
        # Blob bytes, stored without (de)serialization.
        self._store = get_store(
            self._store_uri, serialization_mode="raw", raise_on_nonexist=True
        )
        # File metadata, (de)serialized through the `File` model.
        self._files = get_store(self._store_uri, model=File, raise_on_nonexist=True)
        # Extracted text. raise_on_nonexist=False, presumably so that
        # `get_txt` can return None for a missing extraction.
        self._txts = get_store(
            self._store_uri, serialization_mode="auto", raise_on_nonexist=False
        )

    def exists(self, checksum: str) -> bool:
        """Check if blob exists for the given checksum."""
        return self._store.exists(path.archive_blob(checksum))

    def get_file(self, checksum: str, file_id: str | None = None) -> File:
        """
        Get file metadata for the given checksum.

        Args:
            checksum: SHA256 checksum of file
            file_id: Optional File.id to get specific metadata

        Raises:
            FileNotFoundError: When no metadata file exists
        """
        if file_id is not None:
            key = path.archive_meta(checksum, file_id)
            return self._files.get(key)

        # Return first found metadata
        for file in self.get_all_files(checksum):
            return file
        raise FileNotFoundError(checksum)

    def get_all_files(self, checksum: str) -> Files:
        """
        Iterate all metadata files for the given checksum.

        Multiple crawlers may have archived the same file content from
        different source paths, each creating their own metadata file.
        """
        prefix = path.archive_prefix(checksum)
        yield from self._files.iterate_values(prefix, glob="*.json")

    def iterate_files(self) -> Files:
        """Iterate all file metadata in the archive."""
        yield from self._files.iterate_values(path.ARCHIVE, glob="**/*.json")

    def put_file(self, file: File) -> File:
        """
        Store file metadata object.

        Stamps ``file`` with this repository's store URI and dataset name
        before writing it, and returns the same (mutated) object.
        """
        file.store = str(self.uri)
        file.dataset = self.dataset
        self._files.put(file.meta_path, file)
        return file

    def stream(self, checksum: str) -> BytesGenerator:
        """Stream blob contents as bytes."""
        yield from self._store.stream(path.archive_blob(checksum))

    def open(self, checksum: str) -> ContextManager[IO[bytes]]:
        """Get a file-like handle for reading."""
        return self._store.open(path.archive_blob(checksum), mode=DEFAULT_MODE)

    def to_uri(self, checksum: str) -> str:
        """Return the underlying store's URI for the blob of ``checksum``."""
        return self._store.to_uri(path.archive_blob(checksum))

    def local_path(self, checksum: str) -> ContextManager[Path]:
        """
        Get the local path to the blob.

        If storage is local, returns actual path. Otherwise, creates
        a temporary local copy that is cleaned up after context exit.
        """
        return self._store.local_path(path.archive_blob(checksum))

    def store(
        self,
        uri: Uri,
        file: File | None = None,
        checksum: str | None = None,
        **metadata: Any,
    ) -> File:
        """
        Archive a file from a local or remote URI.

        The blob is stored once per checksum, but each unique source path
        creates its own metadata file (keyed by File.id).

        Args:
            uri: Local or remote URI to the file
            file: Optional metadata file object to patch
            checksum: Known content hash; if given and a blob with this hash
                already exists, the source is not read
            **metadata: Additional data. Keys that are fields of the `File`
                model are set on the model; the rest replace the file's
                `extra` field (may include FollowTheMoney properties for the
                `Document` schema)

        Returns:
            File metadata object
        """
        resource = UriResource(uri)

        # store bytes blob (skipped if already exists)
        checksum = self.store_blob(uri, checksum)

        # file metadata
        if file is None:
            info = resource.info()
            file = File.from_info(info, checksum)

        file.checksum = checksum

        for key in list(metadata.keys()):
            if key in file.__class__.model_fields:
                setattr(file, key, metadata.pop(key))
        file.extra = clean_dict(metadata)
        file.dataset = self.dataset
        file.origin = file.origin or ARCHIVE_ORIGIN

        # Store metadata
        self._files.put(file.meta_path, file)
        # Notify archive was updated
        self._tags.set(tag.ARCHIVE_UPDATED)

        self.log.info(
            f"Archived `{file.key} ({file.checksum})`",
            checksum=file.checksum,
        )

        return file

    def store_blob(self, uri: Uri, checksum: str | None = None) -> str:
        """
        Store bytes blob from given uri if it doesn't exist yet.

        Args:
            uri: Local or remote URI to the file
            checksum: Known content hash (validated). If a blob with this
                hash already exists, the source is not read at all.

        Returns:
            checksum of the stored (or already present) blob
        """
        if checksum:
            validate_checksum(checksum)
        if checksum and self.exists(checksum):
            self.log.debug("Blob already exists, skipping", checksum=checksum)
            return checksum

        with open_virtual(uri, algorithm=CHECKSUM_ALGORITHM) as fh:
            if self.exists(fh.checksum):
                self.log.debug("Blob already exists, skipping", checksum=fh.checksum)
                return fh.checksum

            self.log.info(f"Storing blob `{fh.checksum}` ...", checksum=fh.checksum)
            # Existence was checked just above; write directly instead of via
            # write_blob(), which would repeat the (possibly remote) exists()
            # call for the same checksum.
            self._write_blob(fh, fh.checksum)
            return fh.checksum

    def write_blob(self, fh: BinaryIO, checksum: str | None = None) -> str:
        """
        Write a blob from the given open file-handler.

        If ``checksum`` is not given it is computed from ``fh`` and the
        handle is rewound before copying. Nothing is written when a blob with
        the checksum already exists.

        Returns:
            checksum of the blob
        """
        if checksum and self.exists(checksum):
            self.log.debug("Blob already exists, skipping", checksum=checksum)
            return checksum
        if not checksum:
            checksum = make_checksum(fh)
            if self.exists(checksum):
                self.log.debug("Blob already exists, skipping", checksum=checksum)
                return checksum
            fh.seek(0)
        self._write_blob(fh, checksum)
        return checksum

    def _write_blob(self, fh: BinaryIO, checksum: str) -> None:
        """Copy ``fh`` into the blob slot of ``checksum`` (no existence check)."""
        # NOTE(review): this writes in place; if the copy is interrupted, a
        # partial blob may remain that exists() then reports as present.
        # Confirm whether the underlying store writes atomically.
        with self._store.open(path.archive_blob(checksum), "wb") as out:
            stream(fh, out, CHUNK_SIZE_LARGE)

    def delete(self, file: File) -> None:
        """
        Delete a file's metadata from the archive.

        The blob is never deleted. (FIXME)
        """
        self.log.warning(
            "Deleting file metadata",
            checksum=file.checksum,
            file_id=file.id,
        )
        self._files.delete(file.meta_path)

    def put_txt(self, checksum: str, text: str, origin: str = DEFAULT_ORIGIN) -> None:
        """Store extracted text for a file.

        A falsy ``origin`` (e.g. ``""``) falls back to ``DEFAULT_ORIGIN``.

        Raises:
            ValueError: If ``checksum`` is not a valid SHA256 hex digest or
                ``origin`` is not a safe path component
                (see :func:`ftm_lakehouse.util.validate_origin`).
        """
        origin = origin or DEFAULT_ORIGIN
        key = path.archive_txt(checksum, origin)
        self._txts.put(key, text)

    def get_txt(self, checksum: str, origin: str | None = None) -> str | None:
        """Get extracted text for a file.

        If ``origin`` is given, return the text of that specific extraction;
        otherwise return the first stored text (no guaranteed order), or
        ``None`` when there is none.
        """
        if origin:
            key = path.archive_txt(checksum, origin)
            return self._txts.get(key)
        for value in self._txts.iterate_values(
            prefix=path.archive_prefix(checksum), glob="*.txt"
        ):
            return value
        return None

    def put_data(self, checksum: str, path: str, data: bytes) -> None:
        """Store raw data at the given path below the checksum's key."""
        # The `path` parameter shadows the `path` helper module within this
        # method; it is a caller-visible keyword, so it keeps its name.
        key = join_relpaths(make_checksum_key(checksum), path)
        self._store.put(key, data)

    def get_data(self, checksum: str, path: str) -> bytes:
        """Get raw data at the given path below the checksum's key."""
        # `path` shadows the helper module here as well (see put_data).
        key = join_relpaths(make_checksum_key(checksum), path)
        return self._store.get(key)

delete(file)

Delete a file's metadata from the archive.

The blob is never deleted. (FIXME)

Source code in ftm_lakehouse/repository/archive.py
def delete(self, file: File) -> None:
    """
    Remove the metadata record of ``file`` from the archive.

    Only the metadata entry at ``file.meta_path`` is removed; the content
    blob itself is left in place. (FIXME)
    """
    self.log.warning(
        "Deleting file metadata",
        checksum=file.checksum,
        file_id=file.id,
    )
    meta_key = file.meta_path
    self._files.delete(meta_key)

exists(checksum)

Check if blob exists for the given checksum.

Source code in ftm_lakehouse/repository/archive.py
def exists(self, checksum: str) -> bool:
    """Return whether a content blob is stored under ``checksum``."""
    blob_key = path.archive_blob(checksum)
    return self._store.exists(blob_key)

get_all_files(checksum)

Iterate all metadata files for the given checksum.

Multiple crawlers may have archived the same file content from different source paths, each creating their own metadata file.

Source code in ftm_lakehouse/repository/archive.py
def get_all_files(self, checksum: str) -> Files:
    """
    Yield every metadata record stored for ``checksum``.

    The same content may have been archived from several source paths
    (e.g. by different crawlers), each writing its own JSON metadata file;
    all of them are yielded.
    """
    yield from self._files.iterate_values(
        path.archive_prefix(checksum), glob="*.json"
    )

get_data(checksum, path)

Get raw data at the given path

Source code in ftm_lakehouse/repository/archive.py
def get_data(self, checksum: str, path: str) -> bytes:
    """Read the raw bytes stored at ``path`` below the checksum's key."""
    base = make_checksum_key(checksum)
    return self._store.get(join_relpaths(base, path))

get_file(checksum, file_id=None)

Get file metadata for the given checksum.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `checksum` | `str` | SHA256 checksum of file | *required* |
| `file_id` | `str \| None` | Optional File.id to get specific metadata | `None` |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | When no metadata file exists |

Source code in ftm_lakehouse/repository/archive.py
def get_file(self, checksum: str, file_id: str | None = None) -> File:
    """
    Look up the metadata record for a blob.

    Args:
        checksum: SHA256 checksum of file
        file_id: Optional File.id selecting one specific metadata record

    Raises:
        FileNotFoundError: When no metadata record exists for ``checksum``
    """
    if file_id is None:
        # No specific record requested: take whichever record the store
        # yields first.
        for candidate in self.get_all_files(checksum):
            return candidate
        raise FileNotFoundError(checksum)
    return self._files.get(path.archive_meta(checksum, file_id))

get_txt(checksum, origin=None)

Get extracted text for a file. If `origin` is given, return the text from that specific extraction; otherwise return the first stored text (in no guaranteed order), or None if there is none.

Source code in ftm_lakehouse/repository/archive.py
def get_txt(self, checksum: str, origin: str | None = None) -> str | None:
    """Return extracted text for a blob.

    With a truthy ``origin`` the text of that extraction is looked up
    directly; otherwise the first stored text is returned (no guaranteed
    order), or ``None`` if there is none.
    """
    if not origin:
        prefix = path.archive_prefix(checksum)
        texts = self._txts.iterate_values(prefix=prefix, glob="*.txt")
        return next(iter(texts), None)
    return self._txts.get(path.archive_txt(checksum, origin))

iterate_files()

Iterate all file metadata in the archive.

Source code in ftm_lakehouse/repository/archive.py
def iterate_files(self) -> Files:
    """Yield the metadata record of every file below the archive root."""
    pattern = "**/*.json"
    yield from self._files.iterate_values(path.ARCHIVE, glob=pattern)

local_path(checksum)

Get the local path to the blob.

If storage is local, returns actual path. Otherwise, creates a temporary local copy that is cleaned up after context exit.

Source code in ftm_lakehouse/repository/archive.py
def local_path(self, checksum: str) -> ContextManager[Path]:
    """
    Context manager yielding a local filesystem path for the blob.

    Delegates to the store's ``local_path``: local storage hands back the
    real path, remote storage a temporary copy removed on context exit.
    """
    blob_key = path.archive_blob(checksum)
    return self._store.local_path(blob_key)

open(checksum)

Get a file-like handle for reading.

Source code in ftm_lakehouse/repository/archive.py
def open(self, checksum: str) -> ContextManager[IO[bytes]]:
    """Return a context manager giving a readable handle on the blob."""
    blob_key = path.archive_blob(checksum)
    return self._store.open(blob_key, mode=DEFAULT_MODE)

put_data(checksum, path, data)

Store raw data at the given path

Source code in ftm_lakehouse/repository/archive.py
def put_data(self, checksum: str, path: str, data: bytes) -> None:
    """Write raw ``data`` at ``path`` below the checksum's key."""
    # The `path` parameter shadows the `path` helper module used elsewhere;
    # it is a caller-visible keyword, so it keeps its name.
    target = join_relpaths(make_checksum_key(checksum), path)
    self._store.put(target, data)

put_file(file)

Store file metadata object.

Source code in ftm_lakehouse/repository/archive.py
def put_file(self, file: File) -> File:
    """
    Persist ``file``'s metadata after stamping it with this repository's
    store URI and dataset name.

    Returns the same (mutated) ``file`` object.
    """
    file.store, file.dataset = str(self.uri), self.dataset
    self._files.put(file.meta_path, file)
    return file

put_txt(checksum, text, origin=DEFAULT_ORIGIN)

Store extracted text for a file.

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If `checksum` is not a valid SHA256 hex digest or `origin` is not a safe path component (see `ftm_lakehouse.util.validate_origin`). |

Source code in ftm_lakehouse/repository/archive.py
def put_txt(self, checksum: str, text: str, origin: str = DEFAULT_ORIGIN) -> None:
    """Persist extracted ``text`` for a blob under the given extraction origin.

    A falsy ``origin`` (e.g. ``""`` or ``None``) falls back to
    ``DEFAULT_ORIGIN``.

    Raises:
        ValueError: If ``checksum`` is not a valid SHA256 hex digest or
            ``origin`` is not a safe path component
            (see :func:`ftm_lakehouse.util.validate_origin`).
    """
    if not origin:
        origin = DEFAULT_ORIGIN
    self._txts.put(path.archive_txt(checksum, origin), text)

store(uri, file=None, checksum=None, **metadata)

Archive a file from a local or remote URI.

The blob is stored once per checksum, but each unique source path creates its own metadata file (keyed by File.id).

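Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `uri` | `Uri` | Local or remote URI to the file | *required* |
| `file` | `File \| None` | Optional metadata file object to patch | `None` |
| `checksum` | `str \| None` | Known content hash; if given and a blob with this hash already exists, the source is not read | `None` |
| `**metadata` | `Any` | Additional data. Keys that are fields of the `File` model are set on the model; the rest are stored in the file's `extra` field (may include FollowTheMoney properties for the `Document` schema) | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `File` | File metadata object |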

Source code in ftm_lakehouse/repository/archive.py
def store(
    self,
    uri: Uri,
    file: File | None = None,
    checksum: str | None = None,
    **metadata: Any,
) -> File:
    """
    Archive the file at ``uri`` and record its metadata.

    The content blob is written at most once per checksum, while every
    distinct source path gets its own metadata record (keyed by File.id).

    Args:
        uri: Local or remote URI to the file
        file: Optional metadata file object to patch
        checksum: Known content hash; if given and already stored, the
            source is not read
        **metadata: Extra data. Keys that are fields of the `File` model are
            set on the model; the remainder replaces the file's `extra`
            field (may include FollowTheMoney `Document` properties)

    Returns:
        File metadata object
    """
    resource = UriResource(uri)

    # Persist the content blob first (a no-op if it already exists).
    checksum = self.store_blob(uri, checksum)

    if file is None:
        file = File.from_info(resource.info(), checksum)
    file.checksum = checksum

    # Route kwargs that name a model field onto the model itself; whatever
    # remains becomes the free-form `extra` payload.
    model_fields = file.__class__.model_fields
    for name in [k for k in metadata if k in model_fields]:
        setattr(file, name, metadata.pop(name))
    file.extra = clean_dict(metadata)
    file.dataset = self.dataset
    file.origin = file.origin or ARCHIVE_ORIGIN

    self._files.put(file.meta_path, file)
    # Signal that the archive has changed.
    self._tags.set(tag.ARCHIVE_UPDATED)

    self.log.info(f"Archived `{file.key} ({file.checksum})`", checksum=file.checksum)
    return file

store_blob(uri, checksum=None)

Store bytes blob from given uri if it doesn't exist yet.

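Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `uri` | `Uri` | Local or remote URI to the file | *required* |
| `checksum` | `str \| None` | Known content hash (validated); if a blob with this hash already exists, the source is not read | `None` |

Returns:

| Type | Description |
| --- | --- |
| `str` | Checksum of the stored (or already present) blob |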

Source code in ftm_lakehouse/repository/archive.py
def store_blob(self, uri: Uri, checksum: str | None = None) -> str:
    """
    Copy the bytes at ``uri`` into blob storage unless already present.

    Args:
        uri: Local or remote URI to the file
        checksum: Known content hash (validated). If a blob with it already
            exists, the source is never opened.

    Returns:
        checksum of the stored (or already present) blob
    """
    if checksum:
        validate_checksum(checksum)
        if self.exists(checksum):
            self.log.debug("Blob already exists, skipping", checksum=checksum)
            return checksum

    with open_virtual(uri, algorithm=CHECKSUM_ALGORITHM) as handle:
        if not self.exists(handle.checksum):
            self.log.info(
                f"Storing blob `{handle.checksum}` ...", checksum=handle.checksum
            )
            self.write_blob(handle, handle.checksum)
        else:
            self.log.debug("Blob already exists, skipping", checksum=handle.checksum)
        return handle.checksum

stream(checksum)

Stream blob contents as bytes.

Source code in ftm_lakehouse/repository/archive.py
def stream(self, checksum: str) -> BytesGenerator:
    """Yield the blob stored under ``checksum`` as a sequence of byte chunks."""
    blob_key = path.archive_blob(checksum)
    yield from self._store.stream(blob_key)

write_blob(fh, checksum=None)

Write a blob from the given open file-handler

Source code in ftm_lakehouse/repository/archive.py
def write_blob(self, fh: BinaryIO, checksum: str | None = None) -> str:
    """
    Copy an open binary handle into blob storage.

    When ``checksum`` is missing it is computed from ``fh`` and the handle
    is rewound before copying. Nothing is written if the blob already
    exists. Returns the blob's checksum.
    """
    if not checksum:
        checksum = make_checksum(fh)
        if self.exists(checksum):
            self.log.debug("Blob already exists, skipping", checksum=checksum)
            return checksum
        # Rewind to the start before copying.
        fh.seek(0)
    elif self.exists(checksum):
        self.log.debug("Blob already exists, skipping", checksum=checksum)
        return checksum
    with self._store.open(path.archive_blob(checksum), "wb") as out:
        stream(fh, out, CHUNK_SIZE_LARGE)
    return checksum

EntityRepository

Entity/statement operations combining JournalStore and ParquetStore.

dataset.entities.add(entity, origin="import")
dataset.entities.writer(origin="import")
dataset.entities.flush()
dataset.entities.query(origin="import")

ftm_lakehouse.repository.EntityRepository

Bases: ParquetDiffMixin, BaseRepository, ApiEntityRepository

Repository for entity/statement operations.

Combines JournalStore (write-ahead buffer) and ParquetStore (Delta Lake) to provide buffered statement storage with efficient querying.

Writes go to the journal first, then are flushed to the parquet store. Reads query the parquet store (optionally flushing first).

Example
repo = EntityRepository(uri="s3://bucket/dataset", dataset="my_data")

# Write entities
with repo.writer(origin="import") as writer:
    writer.add_entity(entity)

# Flush to parquet
repo.flush()

# Query entities
for entity in repo.query(origin="import"):
    process(entity)
Source code in ftm_lakehouse/repository/entities/main.py
class EntityRepository(ParquetDiffMixin, BaseRepository, ApiEntityRepository):
    """
    Repository for entity/statement operations.

    Combines JournalStore (write-ahead buffer) and ParquetStore (Delta Lake)
    to provide buffered statement storage with efficient querying.

    Writes go to the journal first, then are flushed to the parquet store.
    Reads query the parquet store (optionally flushing first).

    Example:
        ```python
        repo = EntityRepository(uri="s3://bucket/dataset", dataset="my_data")

        # Write entities
        with repo.writer(origin="import") as writer:
            writer.add_entity(entity)

        # Flush to parquet
        repo.flush()

        # Query entities
        for entity in repo.query(origin="import"):
            process(entity)
        ```
    """

    def __init__(
        self,
        dataset: str,
        uri: Uri,
        shards: int | None = None,
    ) -> None:
        """
        Set up the journal, parquet statement store and generic store.

        Args:
            dataset: Dataset name
            uri: Base URI of the dataset
            shards: Shard count; ``None`` falls back to
                ``settings.entity_shards`` (an explicit 0 is kept as-is)
        """
        super().__init__(dataset, uri)
        if shards is None:
            shards = settings.entity_shards
        self.shards = shards
        self._journal = get_journal(dataset)
        self._statements = ParquetStore(uri, dataset, self.shards)
        self._store = get_store(self._store_uri)

    @contextmanager
    def writer(
        self, origin: str | None = None
    ) -> Generator[BaseJournalWriter, None, None]:
        """
        Get a bulk writer for adding entities/statements to the journal.

        On a clean exit the writer is flushed. If the ``with`` body raises
        (any ``BaseException``, including ``KeyboardInterrupt``), the writer
        is rolled back and the exception propagates. The writer is closed in
        both cases.

        After a successful write, the journal is drained into the parquet
        store (via :meth:`flush`) once it holds 1,000,000 or more rows.

        Usage:
            with repo.writer(origin="import") as writer:
                writer.add_entity(entity)
        """
        with self._tags.touch(tag.JOURNAL_UPDATED):
            writer = self._journal.writer(self.shards, origin)
            try:
                yield writer
            except BaseException:
                writer.rollback()
                raise
            else:
                writer.flush()
            finally:
                writer.close()
            # Only reached on success. Running this from `finally` (as before)
            # would start a potentially long journal drain while an exception
            # such as Ctrl-C is propagating, and an error raised there would
            # take precedence over the original one.
            # Keep the journal from growing too large:
            if self._journal.count() >= 1_000_000:
                self.flush()

    def add(self, entity: EntityProxy, origin: str | None = None) -> None:
        """Write a single entity to the journal (shortcut for :meth:`add_many`)."""
        self.add_many((entity,), origin)

    def add_many(
        self, entities: Iterable[EntityProxy], origin: str | None = None
    ) -> None:
        """Write every entity from ``entities`` within a single writer session."""
        with self.writer(origin) as journal_writer:
            add_entity = journal_writer.add_entity
            for proxy in entities:
                add_entity(proxy)

    @api_delegate("_api_flush")
    def flush(self) -> int:
        """Move all journal rows into the parquet statement store.

        Rows are handed to :meth:`write_statements`, which appends one sorted
        parquet file per ``(shard, bucket, origin)`` partition. Duplicates and
        tombstones are appended as new rows; run :meth:`merge` afterwards to
        collapse them.

        With an empty journal nothing is written, but the ``JOURNAL_FLUSHED``
        and ``STATEMENTS_UPDATED`` tags are created if they do not exist yet.

        Returns:
            Number of statements appended.
        """
        if self._journal.count() == 0:
            self.log.debug("Journal is empty", journal=mask_uri(self._journal.uri))
            # Initial run: make sure both tags exist even though nothing moved.
            for marker in (tag.JOURNAL_FLUSHED, tag.STATEMENTS_UPDATED):
                if not self._tags.exists(marker):
                    self._tags.set(marker)
            return 0

        with (
            self._tags.touch(tag.JOURNAL_FLUSHED),
            self._tags.touch(tag.STATEMENTS_UPDATED),
            Took() as timer,
        ):
            self.log.info("Flushing journal ...", journal=mask_uri(self._journal.uri))

            flushed_at = datetime.now(timezone.utc)
            appended = self.write_statements(
                self._journal.flush_statements(), now=flushed_at
            )

            self.log.info(
                "Flushed statements from journal to lake",
                count=appended,
                took=timer.took,
                journal=mask_uri(self._journal.uri),
            )
            return appended

    @no_api
    def write_statements(
        self,
        statements: Iterable[StatementRow],
        now: datetime | None = None,
    ) -> int:
        """Pack and append a shard-sorted stream of statements to parquet.

        Input is an iterable of :class:`StatementRow` already ordered by
        shard – exactly what :meth:`EntityBuffer.flush_buffer` and
        :meth:`JournalStore.flush_statements` produce. Consecutive rows for
        the same shard accumulate into one per-shard batch;
        :meth:`ParquetStore.append` then splits each batch by bucket and
        writes one parquet file per partition.

        Unsorted input is still written, but every shard change emits a
        batch, so it produces more (smaller) appends.

        This is the shared core of:

        - :meth:`flush` (drains the journal),
        - bare bulk-import paths in the CLI that bypass the journal entirely.

        Tombstones (rows with ``deleted_at`` set) get their ``last_seen``
        bumped to ``deleted_at`` so they win the ``ROW_NUMBER() OVER (... ORDER
        BY last_seen DESC)`` tiebreak against the live row in
        :meth:`ParquetStore.merge`.

        Memory is bounded by :data:`WRITE_SHARD_BATCH`: within a single
        shard, the in-memory accumulator is emitted to parquet every
        ``WRITE_SHARD_BATCH`` rows so a pathologically large shard cannot
        OOM the writer.

        Args:
            statements: Shard-sorted stream of :class:`StatementRow`.
            now: Default timestamp for missing ``first_seen`` /
                ``last_seen``. Defaults to the current UTC time.

        Returns:
            Number of statements written.
        """
        if now is None:
            now = datetime.now(timezone.utc)

        # Running count of rows handed to the parquet store (updated by _emit).
        total = 0
        # Shard of the rows currently held in `buffer`.
        current_shard: str | None = None
        # Packed statement dicts awaiting conversion to an Arrow table.
        buffer: list[dict] = []

        def _emit() -> None:
            """Append the buffered rows as one Arrow table, count them, reset."""
            nonlocal total
            if not buffer:
                return
            batch = pa.Table.from_pylist(buffer, schema=SHARDED_SCHEMA)
            self._statements.append(batch)
            total += len(batch)
            buffer.clear()

        for row in statements:
            if current_shard is not None and current_shard != row.shard:
                # Shard boundary: emit the previous shard's rows first so a
                # batch never mixes shards.
                _emit()
            elif len(buffer) >= WRITE_SHARD_BATCH:
                # Same shard but the buffer is full: emit a partial batch to
                # bound memory.
                _emit()
            current_shard = row.shard

            data = pack_statement(row.stmt)
            data["first_seen"] = data.get("first_seen") or now
            data["deleted_at"] = row.deleted_at
            # Tombstones bump last_seen to the delete timestamp so they win
            # the ROW_NUMBER ORDER BY last_seen DESC tiebreak in merge().
            data["last_seen"] = row.deleted_at or data.get("last_seen") or now
            data["shard"] = row.shard
            buffer.append(data)

        # Emit the trailing (possibly partial) batch.
        _emit()
        return total

    @api_delegate("_api_merge")
    def merge(self, grace_period_days: int | None = None) -> None:
        """
        Compact the parquet statement store.

        Delegates to :meth:`ParquetStore.merge`, which collapses duplicate
        statements and reaps expired tombstones; ``grace_period_days`` is
        forwarded unchanged.
        """
        statement_store = self._statements
        statement_store.merge(grace_period_days)

    @no_api
    def unlock(self) -> bool:
        """Force-release the dataset write fence (operator escape hatch).

        Thin wrapper around :meth:`ParquetStore.unlock`. Intended for the
        case where a writer died while holding the lock; never call it while
        a legitimate writer is still active.

        Returns:
            ``True`` when a lock was actually released, else ``False``.
        """
        released = self._statements.unlock()
        return released

    @api_delegate("_api_query")
    def query(
        self,
        entity_ids: Iterable[str] | None = None,
        flush_first: bool = False,
        **filters,
    ) -> StatementEntities:
        """
        Yield entities from the parquet store.

        Any extra keyword arguments are forwarded to ftmq ``Query.where``.
        Being a generator, nothing (including the optional flush) happens
        until iteration starts.

        Args:
            entity_ids: Restrict results to these entity IDs
            flush_first: Drain the journal before querying (default False)

        Yields:
            StatementEntity objects matching the query
        """
        if flush_first:
            self.flush()

        if entity_ids:
            # NOTE(review): an empty list means "no ID filter", while an empty
            # generator is truthy and yields `entity_id__in=[]` — confirm the
            # intended semantics for empty ID collections.
            filters.update(entity_id__in=list(entity_ids))

        statement_query = Query().where(**filters)
        yield from self._statements.query(statement_query)

    @api_delegate("_api_query_statements")
    def query_statements(self, q: Query | None = None) -> Statements:
        """
        Yield raw statements from the parquet store.

        Args:
            q: Optional ftmq ``Query``; a falsy value is replaced by an empty
                ``Query()``. Its statement SQL (``q.sql.statements``) is what
                gets executed.
        """
        sql = (q or Query()).sql.statements
        yield from self._statements.query_statements(sql)

    def get(
        self,
        entity_id: str,
        origin: str | None = None,
        flush_first: bool = False,
    ) -> StatementEntity | None:
        """Return the entity with ``entity_id``, or ``None`` if it is unknown.

        ``origin`` is always passed to :meth:`query` as a filter (also when
        ``None``).
        """
        matches = self.query([entity_id], flush_first, origin=origin)
        return next(iter(matches), None)

    def stream(self) -> ValueEntities:
        """
        Yield entities from the exported ``entities.ftm.json`` file.

        This reads the file written by :meth:`export_entities` rather than
        querying the parquet store; if the export does not exist, nothing is
        yielded.
        """
        if not self._store.exists(path.ENTITIES_JSON):
            return
        with self._store.open(path.ENTITIES_JSON) as fh:
            yield from smart_read_proxies(fh)

    @no_api
    def export_entities(self, statements_csv_uri: str | None = None) -> None:
        """
        Write all entities as JSON lines without building FtM objects.

        Rows are aggregated with ``aggregate_unsafe()``, which skips the
        Statement/StatementEntity/to_dict() path, and written with
        ``smart_write_json``.

        If ``statements_csv_uri`` is given (e.g. right after ``make --full``
        exported statements.csv), that already-sorted CSV is read instead of
        re-scanning the parquet store.

        Args:
            statements_csv_uri: Optional path to a fresh, sorted statements.csv
        """
        self._store.ensure_parent(path.ENTITIES_JSON)

        rows = (
            self._statements._query_statement_data()
            if statements_csv_uri is None
            else smart_stream_csv(statements_csv_uri)
        )
        records = (proxy.to_dict() for proxy in aggregate_unsafe(rows, self.dataset))

        with self._store.open(path.ENTITIES_JSON, "wb") as fh:
            smart_write_json(fh, records)

    @api_delegate("_api_delete_entity")
    def delete_entity(self, entity_id: str) -> int:
        """Tombstone every known statement of an entity in the journal.

        Statements are gathered from both the parquet store and the journal,
        then re-written to the journal with ``deleted_at`` set to the current
        UTC time. The ``JOURNAL_UPDATED`` tag is set only when something was
        written.

        Args:
            entity_id: The entity ID to delete

        Returns:
            Number of tombstone statements written (0 if none were found)
        """
        deleted_at = datetime.now(timezone.utc)
        statements = self._collect_entity_statements(entity_id)
        if statements:
            with self._journal.writer(self.shards) as journal_writer:
                for statement in statements:
                    journal_writer.add_statement(statement, deleted_at=deleted_at)
            self._tags.set(tag.JOURNAL_UPDATED)
        return len(statements)

    def delete_statement(self, stmt: Statement) -> None:
        """Write a tombstone for ``stmt`` into the journal.

        The tombstone carries the current UTC time as ``deleted_at``; the
        write happens while the ``JOURNAL_UPDATED`` tag is being touched.

        Args:
            stmt: The Statement to delete
        """
        with self._tags.touch(tag.JOURNAL_UPDATED):
            deleted_at = datetime.now(timezone.utc)
            with self._journal.writer(self.shards) as journal_writer:
                journal_writer.add_statement(stmt, deleted_at=deleted_at)

    @no_api
    def _collect_entity_statements(self, entity_id: str) -> list[Statement]:
        """Read all statements for an entity from parquet + journal.

        Uses shard-partitioned query for efficient single-entity lookup.

        Statements are de-duplicated by ``Statement.id``. The journal is
        read after the parquet store, so a journal statement replaces a
        parquet statement with the same ID. Statements without an ID are
        dropped. Journal rows that already carry ``deleted_at`` are
        excluded; malformed journal rows are logged and skipped.

        NOTE(review): the journal access assumes ``self._journal`` is a
        ``SqlJournalStore`` — the ``cast`` below does not check this at
        runtime.

        Args:
            entity_id: The entity whose statements are collected.

        Returns:
            One statement per distinct statement ID.
        """
        stmts_by_id: dict[str, Statement] = {}
        journal = cast(SqlJournalStore, self._journal)

        # Read from parquet store (uses shard partition for pruning)
        for stmt in self._statements.get_statements(entity_id):
            if stmt.id:
                stmts_by_id[stmt.id] = stmt

        # Read from journal (may override parquet entries). Narrow the SQL
        # scan to the entity's shard and to non-tombstoned rows, then keep
        # only rows whose unpacked statement has a matching entity_id.
        shard = path.entity_shard(entity_id, self.shards)
        q = (
            select(journal.table)
            .where(journal.table.c.shard == shard)
            .where(journal.table.c.deleted_at.is_(None))
        )
        with journal.engine.connect() as conn:
            for row in conn.execute(q):
                try:
                    stmt = unpack_statement(row.data)
                except MalformedStatementError as exc:
                    self.log.warning(
                        "Skipping malformed journal row in entity collect",
                        row_id=row.id,
                        shard=row.shard,
                        error=str(exc),
                    )
                    continue
                # Other entities hash to the same shard; skip them.
                if stmt.entity_id != entity_id:
                    continue
                if stmt.id:
                    stmts_by_id[stmt.id] = stmt

        return list(stmts_by_id.values())

    @api_delegate("_api_stats")
    def get_statistics(self) -> DatasetStats:
        """Return dataset statistics as computed by the parquet statement store."""
        stats = self._statements.stats()
        return stats

    @property
    def version(self) -> int | None:
        """Current version of the main Delta table.

        Fetched through ``_api_version()`` when ``_is_api`` is set,
        otherwise read from the parquet statement store.
        """
        return self._api_version() if self._is_api else self._statements.version

    # DiffMixin implementation

    # Base storage path for entity diff files; presumably read by DiffMixin
    # (see the section comment above) — TODO confirm against the mixin.
    _diff_base_path = path.DIFFS_ENTITIES

    @no_api
    def _get_changed_ids(self, since: datetime) -> Iterator[str]:
        """Return IDs of entities with statements added since ``since``.

        Delegates directly to the parquet statement store.
        """
        changed = self._statements.get_changed_entity_ids(since)
        return changed

    @no_api
    def _write_diff(self, entity_ids: Iterator[str], ts: datetime, **kwargs) -> str:
        """Write the entity diff for ``ts`` and return its URI.

        The file is line-based JSON; each line is an operation envelope
        produced by :meth:`_get_delta_entities`. Extra kwargs are ignored.
        """
        diff_key = path.entities_diff(ts)
        with self._store.open(diff_key, "wb") as out:
            envelopes = self._get_delta_entities(entity_ids)
            smart_write_json(out, envelopes)
        return self._store.to_uri(diff_key)

    @no_api
    def _get_delta_entities(
        self, entity_ids: Iterator[str]
    ) -> Generator[SDict, None, None]:
        """Yield diff envelopes for ``entity_ids``.

        IDs are consumed in batches of ``QUERY_IN_BATCH_SIZE``. Each entity
        that :meth:`query` still returns is yielded as
        ``make_envelope(entity.to_dict())``. After all batches, every
        requested ID that produced no entity is yielded as a ``DEL``
        envelope.
        """
        requested: set[str] = set()
        found: set[str] = set()
        source = iter(entity_ids)
        while True:
            chunk = set(islice(source, QUERY_IN_BATCH_SIZE))
            if not chunk:
                break
            requested |= chunk
            for entity in self.query(entity_ids=chunk, flush_first=False):
                if entity.id:
                    found.add(entity.id)
                yield make_envelope(entity.to_dict())
        for missing_id in requested - found:
            yield make_envelope({"id": missing_id}, op="DEL")

    @no_api
    def _write_initial_diff(self, ts: datetime, **kwargs) -> None:
        """Seed the diff for ``ts`` from the exported entities.ftm.json.

        Every record streamed from ``path.ENTITIES_JSON`` is wrapped in an
        envelope and written as one newline-terminated orjson line.
        """
        newline = orjson.OPT_APPEND_NEWLINE
        with self._store.open(path.entities_diff(ts), "wb") as out:
            for record in self._store.stream(path.ENTITIES_JSON):
                out.write(orjson.dumps(make_envelope(record), option=newline))

version property

Current version of the main Delta table.

add(entity, origin=None)

Add a single entity to the journal.

Source code in ftm_lakehouse/repository/entities/main.py
def add(self, entity: EntityProxy, origin: str | None = None) -> None:
    """Add one entity to the journal (a single-item :meth:`add_many`)."""
    single = [entity]
    self.add_many(single, origin)

add_many(entities, origin=None)

Add an entity iterator to the journal.

Source code in ftm_lakehouse/repository/entities/main.py
def add_many(
    self, entities: Iterable[EntityProxy], origin: str | None = None
) -> None:
    """Write every entity from ``entities`` to the journal via one writer."""
    with self.writer(origin) as journal_writer:
        for proxy in entities:
            journal_writer.add_entity(proxy)

delete_entity(entity_id)

Delete all statements for an entity via journal tombstones.

Reads statements from both parquet and journal, then UPSERTs tombstone rows (with deleted_at set) into the journal.

Parameters:

Name Type Description Default
entity_id str

The entity ID to delete

required

Returns:

Type Description
int

Number of tombstone statements written

Source code in ftm_lakehouse/repository/entities/main.py
@api_delegate("_api_delete_entity")
def delete_entity(self, entity_id: str) -> int:
    """Tombstone every known statement of an entity in the journal.

    Statements are gathered from both the parquet store and the journal,
    then re-written to the journal with ``deleted_at`` set to the current
    UTC time. The ``JOURNAL_UPDATED`` tag is set only when something was
    written.

    Args:
        entity_id: The entity ID to delete

    Returns:
        Number of tombstone statements written (0 if none were found)
    """
    deleted_at = datetime.now(timezone.utc)
    statements = self._collect_entity_statements(entity_id)
    if statements:
        with self._journal.writer(self.shards) as journal_writer:
            for statement in statements:
                journal_writer.add_statement(statement, deleted_at=deleted_at)
        self._tags.set(tag.JOURNAL_UPDATED)
    return len(statements)

delete_statement(stmt)

Delete a single statement via journal tombstone.

Parameters:

Name Type Description Default
stmt Statement

The Statement to delete

required
Source code in ftm_lakehouse/repository/entities/main.py
def delete_statement(self, stmt: Statement) -> None:
    """Write a tombstone for ``stmt`` into the journal.

    The tombstone carries the current UTC time as ``deleted_at``; the
    write happens while the ``JOURNAL_UPDATED`` tag is being touched.

    Args:
        stmt: The Statement to delete
    """
    with self._tags.touch(tag.JOURNAL_UPDATED):
        deleted_at = datetime.now(timezone.utc)
        with self._journal.writer(self.shards) as journal_writer:
            journal_writer.add_statement(stmt, deleted_at=deleted_at)

export_entities(statements_csv_uri=None)

Export entities to a JSON lines file without FtM object construction.

Uses aggregate_unsafe() to bypass Statement/StatementEntity/to_dict() and writes directly to orjson output.

When statements_csv_uri is provided (e.g. from make --full where statements.csv was just exported), reads the already-sorted CSV instead of re-scanning the parquet store.

Parameters:

Name Type Description Default
statements_csv_uri str | None

Optional path to a fresh, sorted statements.csv

None
Source code in ftm_lakehouse/repository/entities/main.py
@no_api
def export_entities(self, statements_csv_uri: str | None = None) -> None:
    """
    Write all entities to ``path.ENTITIES_JSON`` as JSON lines.

    Statement rows are aggregated with ``aggregate_unsafe()`` (skipping
    full FtM object construction) and serialized via ``smart_write_json``.

    The rows come from ``statements_csv_uri`` when given (e.g. a
    statements.csv freshly exported by ``make --full``, already sorted),
    otherwise from a scan of the parquet statement store.

    Args:
        statements_csv_uri: Optional path to a fresh, sorted statements.csv
    """
    target = path.ENTITIES_JSON
    self._store.ensure_parent(target)

    rows = (
        self._statements._query_statement_data()
        if statements_csv_uri is None
        else smart_stream_csv(statements_csv_uri)
    )
    # The outermost iterable of a generator expression is evaluated
    # eagerly, so aggregate_unsafe() is called here, before opening.
    records = (item.to_dict() for item in aggregate_unsafe(rows, self.dataset))

    with self._store.open(target, "wb") as fh:
        smart_write_json(fh, records)

flush()

Drain the journal into the parquet statement store.

Groups journal rows by `(shard, bucket, origin)` and appends one sorted parquet file per partition via `write_statements`. Duplicates and tombstones land as new rows; call `merge` afterwards to collapse them.

Returns:

Type Description
int

Number of statements appended.

Source code in ftm_lakehouse/repository/entities/main.py
@api_delegate("_api_flush")
def flush(self) -> int:
    """Drain the journal into the parquet statement store.

    Journal statements are passed to :meth:`write_statements`, which
    appends sorted parquet files per partition. Duplicates and tombstones
    are appended as new rows; call :meth:`merge` afterwards to collapse
    them.

    With an empty journal nothing is written, but the ``JOURNAL_FLUSHED``
    and ``STATEMENTS_UPDATED`` tags are created if they do not exist yet.

    Returns:
        Number of statements appended.
    """
    if self._journal.count() == 0:
        self.log.debug("Journal is empty", journal=mask_uri(self._journal.uri))
        # Initial run: make sure both tags exist even though nothing moved.
        for initial_tag in (tag.JOURNAL_FLUSHED, tag.STATEMENTS_UPDATED):
            if not self._tags.exists(initial_tag):
                self._tags.set(initial_tag)
        return 0

    with (
        self._tags.touch(tag.JOURNAL_FLUSHED),
        self._tags.touch(tag.STATEMENTS_UPDATED),
        Took() as timer,
    ):
        self.log.info("Flushing journal ...", journal=mask_uri(self._journal.uri))

        flushed_at = datetime.now(timezone.utc)
        appended = self.write_statements(
            self._journal.flush_statements(), now=flushed_at
        )

        self.log.info(
            "Flushed statements from journal to lake",
            count=appended,
            took=timer.took,
            journal=mask_uri(self._journal.uri),
        )
        return appended

get(entity_id, origin=None, flush_first=False)

Get a single entity by ID.

Source code in ftm_lakehouse/repository/entities/main.py
def get(
    self,
    entity_id: str,
    origin: str | None = None,
    flush_first: bool = False,
) -> StatementEntity | None:
    """Return the first entity matching ``entity_id``, or ``None``.

    ``origin`` is forwarded to :meth:`query` as a filter and ``flush_first``
    controls whether the journal is flushed first.
    """
    matches = self.query([entity_id], flush_first, origin=origin)
    return next(iter(matches), None)

get_statistics()

Compute statistics from the parquet store.

Source code in ftm_lakehouse/repository/entities/main.py
@api_delegate("_api_stats")
def get_statistics(self) -> DatasetStats:
    """Return dataset statistics as computed by the parquet statement store."""
    stats = self._statements.stats()
    return stats

merge(grace_period_days=None)

Collapse duplicates and reap expired tombstones from parquet store

Source code in ftm_lakehouse/repository/entities/main.py
@api_delegate("_api_merge")
def merge(self, grace_period_days: int | None = None) -> None:
    """Compact the parquet statement store.

    Delegates to the store's ``merge``, which collapses duplicate rows and
    reaps expired tombstones. ``grace_period_days`` is passed through
    unchanged.
    """
    statements = self._statements
    statements.merge(grace_period_days)

query(entity_ids=None, flush_first=False, **filters)

Query entities from the parquet store.

Additional filter kwargs are passed to ftmq Query.

Parameters:

Name Type Description Default
entity_ids Iterable[str] | None

Filter by entity IDs

None
flush_first bool

Flush journal before querying (default False)

False

Yields:

Type Description
StatementEntities

StatementEntity objects matching the query

Source code in ftm_lakehouse/repository/entities/main.py
@api_delegate("_api_query")
def query(
    self,
    entity_ids: Iterable[str] | None = None,
    flush_first: bool = False,
    **filters,
) -> StatementEntities:
    """
    Query entities from the parquet store.

    Additional filter kwargs are passed to ftmq Query.

    Args:
        entity_ids: Filter by entity IDs. ``None`` (the default) applies no
            ID filter; an explicit empty iterable matches nothing.
        flush_first: Flush journal before querying (default False)

    Yields:
        StatementEntity objects matching the query
    """
    if flush_first:
        self.flush()

    if entity_ids is not None:
        ids = list(entity_ids)
        if not ids:
            # An explicit empty ID filter can match no entity; don't fall
            # through to an unfiltered query over the whole store.
            return
        filters["entity_id__in"] = ids
    q = Query().where(**filters)

    yield from self._statements.query(q)

stream()

Stream entities from the exported JSON file.

This reads from the pre-exported entities.ftm.json file, not directly from the parquet store.

Source code in ftm_lakehouse/repository/entities/main.py
def stream(self) -> ValueEntities:
    """
    Yield entities from the exported entities JSON file.

    Reads ``path.ENTITIES_JSON`` (the pre-exported entities.ftm.json)
    instead of querying the parquet store; yields nothing when that file
    does not exist.
    """
    if not self._store.exists(path.ENTITIES_JSON):
        return
    with self._store.open(path.ENTITIES_JSON) as handle:
        yield from smart_read_proxies(handle)

unlock()

Forcibly release the dataset write fence.

Delegates to `ParquetStore.unlock`. Use as an operator escape hatch when a writer died with the lock held; do not invoke while a legitimate writer is still running.

Returns:

Type Description
bool

True if a lock was released, False otherwise.

Source code in ftm_lakehouse/repository/entities/main.py
@no_api
def unlock(self) -> bool:
    """Forcibly release the dataset write fence.

    Thin delegate to ``ParquetStore.unlock``. Intended as an operator
    escape hatch after a writer died while holding the lock; never call it
    while a legitimate writer is still running.

    Returns:
        ``True`` if a lock was released, ``False`` otherwise.
    """
    released = self._statements.unlock()
    return released

write_statements(statements, now=None)

Pack and append a shard-sorted stream of statements to parquet.

Input is an iterable of `StatementRow` already ordered by shard – exactly what `EntityBuffer.flush_buffer` and `JournalStore.flush_statements` produce. Consecutive rows for the same shard accumulate into one per-shard batch; `ParquetStore.append` then splits each batch by bucket and writes one parquet file per partition.

This is the shared core of:

  • `flush` (drains the journal),
  • bare bulk-import paths in the CLI that bypass the journal entirely.

Tombstones (rows with `deleted_at` set) get their `last_seen` bumped to `deleted_at` so they win the `ROW_NUMBER() OVER (... ORDER BY last_seen DESC)` tiebreak against the live row in `ParquetStore.merge`.

Memory is bounded by `WRITE_SHARD_BATCH`: within a single shard, the in-memory accumulator is emitted to parquet every `WRITE_SHARD_BATCH` rows so a pathologically large shard cannot OOM the writer.

Parameters:

Name Type Description Default
statements Iterable[StatementRow]

Shard-sorted stream of `StatementRow`.

required
now datetime | None

Default timestamp for missing first_seen / last_seen. Defaults to the current UTC time.

None

Returns:

Type Description
int

Number of statements written.

Source code in ftm_lakehouse/repository/entities/main.py
@no_api
def write_statements(
    self,
    statements: Iterable[StatementRow],
    now: datetime | None = None,
) -> int:
    """Pack and append a shard-sorted stream of statements to parquet.

    ``statements`` must already be ordered by shard, as produced by
    ``EntityBuffer.flush_buffer`` and ``JournalStore.flush_statements``.
    Rows of one shard are buffered and handed to ``ParquetStore.append``
    as a single Arrow table, which then splits by bucket and writes one
    parquet file per partition. A new batch starts whenever the shard
    changes or the buffer reaches ``WRITE_SHARD_BATCH`` rows, which bounds
    memory for very large shards.

    Used by :meth:`flush` (journal drain) and by CLI bulk imports that
    bypass the journal.

    Tombstones (rows with ``deleted_at``) get ``last_seen`` set to
    ``deleted_at`` so they win the ``ROW_NUMBER() OVER (... ORDER BY
    last_seen DESC)`` tiebreak against the live row in
    ``ParquetStore.merge``.

    Args:
        statements: Shard-sorted stream of ``StatementRow``.
        now: Fallback for missing ``first_seen`` / ``last_seen``.
            Defaults to the current UTC time.

    Returns:
        Number of statements written.
    """
    default_ts = datetime.now(timezone.utc) if now is None else now

    written = 0
    shard: str | None = None
    pending: list[dict] = []

    def _flush_pending() -> None:
        # Turn the buffered rows into one Arrow table and append it.
        nonlocal written
        if not pending:
            return
        table = pa.Table.from_pylist(pending, schema=SHARDED_SCHEMA)
        self._statements.append(table)
        written += len(table)
        pending.clear()

    for row in statements:
        shard_changed = shard is not None and row.shard != shard
        if shard_changed or len(pending) >= WRITE_SHARD_BATCH:
            _flush_pending()
        shard = row.shard

        record = pack_statement(row.stmt)
        record["first_seen"] = record.get("first_seen") or default_ts
        record["deleted_at"] = row.deleted_at
        # Tombstones take their delete time as last_seen (see docstring).
        record["last_seen"] = row.deleted_at or record.get("last_seen") or default_ts
        record["shard"] = row.shard
        pending.append(record)

    _flush_pending()
    return written

writer(origin=None)

Get a bulk writer for adding entities/statements.

Usage

with repo.writer(origin="import") as writer:
    writer.add_entity(entity)

Source code in ftm_lakehouse/repository/entities/main.py
@contextmanager
def writer(
    self, origin: str | None = None
) -> Generator[BaseJournalWriter, None, None]:
    """
    Get a bulk writer for adding entities/statements.

    The journal writer is created for ``self.shards`` and ``origin``. On a
    normal exit ``writer.flush()`` is called; if the body raises,
    ``writer.rollback()`` is called and the exception propagates. The
    writer is always closed. After a successful write the journal is
    drained into the parquet store once it holds 1,000,000 rows or more.

    Usage:
        with repo.writer(origin="import") as writer:
            writer.add_entity(entity)

    Args:
        origin: Optional origin passed to the journal writer.

    Yields:
        The journal writer.
    """
    with self._tags.touch(tag.JOURNAL_UPDATED):
        writer = self._journal.writer(self.shards, origin)
        try:
            yield writer
        except BaseException:
            writer.rollback()
            raise
        else:
            writer.flush()
        finally:
            writer.close()
        # Only reached on success. Running a potentially long flush() while
        # an exception (incl. KeyboardInterrupt/GeneratorExit) propagates
        # would delay it, and a failing flush() would mask it.
        # Keep the journal from growing too large:
        if self._journal.count() >= 1_000_000:
            self.flush()

MappingRepository

Mapping configuration storage.

dataset.mappings.put(mapping)
dataset.mappings.get(content_hash)
dataset.mappings.list()

ftm_lakehouse.repository.MappingRepository

Bases: BaseRepository

Repository for mapping configuration storage.

Uses a VersionedModelStore of DatasetMapping models to provide versioned mapping configuration storage: each stored mapping gets both a current config and a versioned snapshot.

Each mapping is identified by a content_hash (SHA256 of the source CSV file).

Example
repo = MappingRepository(dataset="my_data", uri="s3://bucket/dataset")

# Store a mapping configuration
repo.put(mapping)

# Get a mapping configuration
# (content_hash is the SHA256 checksum of the source CSV file)
mapping = repo.get(content_hash)

# List all mappings
for content_hash in repo.list():
    print(content_hash)
Source code in ftm_lakehouse/repository/mapping.py
class MappingRepository(BaseRepository):
    """
    Repository for mapping configuration storage.

    Backed by a ``VersionedModelStore`` of ``DatasetMapping`` models: each
    put creates both a current config and a versioned snapshot.

    Mappings are keyed by ``content_hash`` (SHA256 of the source CSV file).

    Example:
        ```python
        repo = MappingRepository(dataset="my_data", uri="s3://bucket/dataset")

        # Store a mapping configuration
        repo.put(mapping)

        # Get a mapping configuration
        mapping = repo.get(content_hash)

        # List all mappings
        for content_hash in repo.list():
            print(content_hash)
        ```
    """

    def __init__(self, dataset: str, uri: Uri) -> None:
        super().__init__(dataset, uri)
        self._versions = VersionedModelStore(self._store_uri, DatasetMapping)

    def exists(self, content_hash: str) -> bool:
        """Whether a mapping config is stored for ``content_hash``."""
        return self._versions.exists(path.mapping(content_hash))

    def get(self, content_hash: str) -> DatasetMapping:
        """
        Load the mapping configuration for ``content_hash``.

        Args:
            content_hash: SHA256 checksum of the source CSV file

        Returns:
            The stored DatasetMapping. NOTE(review): the annotation is
            non-optional; what happens for a missing hash is decided by
            ``VersionedModelStore.get`` — confirm whether it returns None
            or raises.
        """
        return self._versions.get(path.mapping(content_hash))

    def put(self, mapping: DatasetMapping) -> str:
        """
        Persist a mapping configuration under its ``content_hash``.

        A current config and a versioned snapshot are both written.

        Args:
            mapping: The mapping configuration to store

        Returns:
            The key returned by the versioned store (logged as ``version``)
        """
        digest = mapping.content_hash
        mapping_path = path.mapping(digest)
        version_key = self._versions.make(mapping_path, mapping)

        self.log.info(
            f"Stored mapping `{mapping_path}`",
            content_hash=digest,
            version=version_key,
        )
        return version_key

    def delete(self, content_hash: str) -> None:
        """Remove the mapping config for ``content_hash`` and log a warning."""
        self._versions.delete(path.mapping(content_hash))
        self.log.warning("Deleted mapping", content_hash=content_hash)

    def list(self) -> Generator[str, None, None]:
        """
        Yield every content hash that has a mapping configuration.

        Yields:
            Content hash strings for files with mapping.yml configs
        """
        # The glob only matches the top-level mapping.yml of each hash,
        # never the versioned snapshot copies.
        keys = self._versions.iterate_keys(
            prefix=path.MAPPINGS, glob=f"*/{path.MAPPING}"
        )
        for key in keys:
            # Expected layout: mappings/<content_hash>/mapping.yml
            segments = key.split("/")
            if len(segments) == 3:
                yield segments[1]

    def iterate(self) -> Generator[DatasetMapping, None, None]:
        """Yield the mapping configuration of every listed content hash."""
        yield from map(self.get, self.list())

delete(content_hash)

Delete a mapping configuration.

Source code in ftm_lakehouse/repository/mapping.py
def delete(self, content_hash: str) -> None:
    """Remove the mapping config for ``content_hash`` and log a warning."""
    self._versions.delete(path.mapping(content_hash))
    self.log.warning("Deleted mapping", content_hash=content_hash)

exists(content_hash)

Check if a mapping configuration exists.

Source code in ftm_lakehouse/repository/mapping.py
def exists(self, content_hash: str) -> bool:
    """Whether a mapping config is stored for ``content_hash``."""
    return self._versions.exists(path.mapping(content_hash))

get(content_hash)

Get a mapping configuration by content hash.

Parameters:

Name Type Description Default
content_hash str

SHA256 checksum of the source CSV file

required

Returns:

Type Description
DatasetMapping

DatasetMapping if exists, None otherwise

Source code in ftm_lakehouse/repository/mapping.py
def get(self, content_hash: str) -> DatasetMapping:
    """
    Load the mapping configuration for ``content_hash``.

    Args:
        content_hash: SHA256 checksum of the source CSV file

    Returns:
        The stored DatasetMapping. NOTE(review): the annotation is
        non-optional; what happens for a missing hash is decided by
        ``VersionedModelStore.get`` — confirm whether it returns None or
        raises.
    """
    return self._versions.get(path.mapping(content_hash))

iterate()

Iterate all mapping configurations.

Source code in ftm_lakehouse/repository/mapping.py
def iterate(self) -> Generator[DatasetMapping, None, None]:
    """Yield the mapping configuration of every listed content hash."""
    yield from map(self.get, self.list())

list()

List all content hashes that have mapping configurations.

Yields:

Type Description
str

Content hash strings for files with mapping.yml configs

Source code in ftm_lakehouse/repository/mapping.py
def list(self) -> Generator[str, None, None]:
    """
    Yield every content hash that has a mapping configuration.

    Yields:
        Content hash strings for files with mapping.yml configs
    """
    # The glob only matches the top-level mapping.yml of each hash, never
    # the versioned snapshot copies.
    keys = self._versions.iterate_keys(
        prefix=path.MAPPINGS, glob=f"*/{path.MAPPING}"
    )
    for key in keys:
        # Expected layout: mappings/<content_hash>/mapping.yml
        segments = key.split("/")
        if len(segments) == 3:
            yield segments[1]

put(mapping)

Store a mapping configuration.

Creates both a current config and a versioned snapshot.

Parameters:

Name Type Description Default
mapping DatasetMapping

The mapping configuration to store

required

Returns:

Type Description
str

The current version path

Source code in ftm_lakehouse/repository/mapping.py
def put(self, mapping: DatasetMapping) -> str:
    """
    Persist a mapping configuration under its ``content_hash``.

    A current config and a versioned snapshot are both written.

    Args:
        mapping: The mapping configuration to store

    Returns:
        The key returned by the versioned store (logged as ``version``)
    """
    digest = mapping.content_hash
    mapping_path = path.mapping(digest)
    version_key = self._versions.make(mapping_path, mapping)

    self.log.info(
        f"Stored mapping `{mapping_path}`",
        content_hash=digest,
        version=version_key,
    )
    return version_key

JobRepository

Job tracking and status.

dataset.jobs.put(job)
dataset.jobs.get(run_id)

ftm_lakehouse.repository.JobRepository

Bases: BaseRepository, Generic[J]

Repository for job run storage.

Persists job run data as JSON files, organized by job type and run ID.

Example
repo = JobRepository(dataset="my_data", uri="s3://bucket/dataset", model=CrawlJob)

# Store a job run
repo.put(job)

# Get the latest run of the configured job type (CrawlJob)
job = repo.latest()

# Run a job with lifecycle management
with repo.run(job) as run:
    # Do work...
    run.save()  # Periodic save
# Job automatically stopped when context exits
Source code in ftm_lakehouse/repository/job.py
class JobRepository(BaseRepository, Generic[J]):
    """
    Repository for job run storage.

    Persists job run data as JSON files,
    organized by job type and run ID.

    Each repository is bound to one job model; the model's class name
    (``job_type``) namespaces the stored runs.

    Example:
        ```python
        repo = JobRepository(
            dataset="my_data", uri="s3://bucket/dataset", model=CrawlJob
        )

        # Store a job run
        repo.put(job)

        # Get the latest run of the configured job type
        job = repo.latest()

        # Run a job with lifecycle management
        with repo.run(job) as run:
            # Do work...
            run.save()  # Periodic save
        # Job automatically stopped when context exits
        ```
    """

    def __init__(self, dataset: str, uri: Uri, model: type[J]) -> None:
        super().__init__(dataset, uri)
        # Runs are namespaced by the model's class name.
        self.job_type = model.__name__
        self._store = get_store(self._store_uri, model=model)

    def put(self, job: JobModel) -> None:
        """Store a job run under the configured job type and its run ID."""
        self._store.put(path.job_run(self.job_type, job.run_id), job)

    def get(self, run_id: str) -> J:
        """Get a specific job run of the configured job type by run ID."""
        key = path.job_run(self.job_type, run_id)
        return self._store.get(key)

    def latest(self) -> J | None:
        """
        Get the latest run for the configured job type (``self.job_type``).

        Jobs are ordered by run ID (which contains a timestamp), so the
        latest is the greatest key in string order. Returns ``None`` when
        no run has been stored.
        """
        # max() finds the greatest key in one pass instead of sorting every
        # key only to take the first one.
        latest_key = max(
            self._store.iterate_keys(prefix=path.job_prefix(self.job_type)),
            default=None,
        )
        if latest_key is None:
            return None
        return self._store.get(latest_key)

    def iterate(self) -> Generator[J, None, None]:
        """Iterate all runs for the current job type."""
        yield from self._store.iterate_values(prefix=path.job_prefix(self.job_type))

    @contextlib.contextmanager
    def run(self, job: J) -> Generator[JobRun[J], None, None]:
        """
        Get a context manager for running a job.

        The job is started on entry. If the body raises an ``Exception``,
        ``stop(exc)`` is called with it and the exception is re-raised;
        otherwise the job is stopped on exit if it is still running.
        """
        run = JobRun(self, job)
        try:
            run.start()
            yield run
        except Exception as e:
            run.stop(e)
            raise
        finally:
            if job.running:  # Only stop if not already stopped
                run.stop()

    def delete(self, job: J) -> None:
        """Delete a job run."""
        key = path.job_run(self.job_type, job.run_id)
        self._store.delete(key)
        self.log.warning("Deleted job run", job=job.name, run_id=job.run_id)

delete(job)

Delete a job run.

Source code in ftm_lakehouse/repository/job.py
def delete(self, job: J) -> None:
    """Remove the stored run of ``job`` and log a warning."""
    self._store.delete(path.job_run(self.job_type, job.run_id))
    self.log.warning("Deleted job run", job=job.name, run_id=job.run_id)

get(run_id)

Get a specific job run of the configured job type by its run ID.

Source code in ftm_lakehouse/repository/job.py
def get(self, run_id: str) -> J:
    """Get a specific job run of the configured job type by run ID."""
    return self._store.get(path.job_run(self.job_type, run_id))

iterate()

Iterate all runs for the current job type.

Source code in ftm_lakehouse/repository/job.py
def iterate(self) -> Generator[J, None, None]:
    """Yield every stored run of the configured job type."""
    prefix = path.job_prefix(self.job_type)
    yield from self._store.iterate_values(prefix=prefix)

latest()

Get the latest run for the configured job type (self.job_type).

Jobs are sorted by run ID (which contains timestamp), so the latest is the last in alphabetical order.

Source code in ftm_lakehouse/repository/job.py
def latest(self) -> J | None:
    """
    Get the latest run for the configured job type (``self.job_type``).

    Jobs are ordered by run ID (which contains a timestamp), so the latest
    is the greatest key in string order. Returns ``None`` when no run has
    been stored.
    """
    # max() finds the greatest key in one pass instead of sorting every key
    # only to take the first one.
    latest_key = max(
        self._store.iterate_keys(prefix=path.job_prefix(self.job_type)),
        default=None,
    )
    if latest_key is None:
        return None
    return self._store.get(latest_key)

put(job)

Store a job run.

Source code in ftm_lakehouse/repository/job.py
def put(self, job: JobModel) -> None:
    """Store ``job`` under the configured job type and its run ID."""
    key = path.job_run(self.job_type, job.run_id)
    self._store.put(key, job)

run(job)

Get a context manager for running a job.

The job is automatically started on entry and stopped on exit. If an exception occurs, it's recorded in the job's exc field.

Source code in ftm_lakehouse/repository/job.py
@contextlib.contextmanager
def run(self, job: J) -> Generator[JobRun[J], None, None]:
    """
    Context manager that drives a job's lifecycle.

    The job is started on entry. If the body raises an ``Exception``,
    ``stop(exc)`` is called with it and the exception is re-raised; on
    any exit the job is stopped again only if it is still running.
    """
    job_run = JobRun(self, job)
    try:
        job_run.start()
        yield job_run
    except Exception as exc:
        job_run.stop(exc)
        raise
    finally:
        # stop() may already have run in the except branch.
        if job.running:
            job_run.stop()