Skip to content

Layer 3: Repository

Domain-specific combinations of multiple stores. Each repository owns one domain concept.

ArchiveRepository

Content-addressed file archive with metadata and extracted text storage.

dataset.archive.put(uri)
dataset.archive.get(checksum)
dataset.archive.stream(file)

ftm_lakehouse.repository.ArchiveRepository

Bases: BaseRepository

Repository for file archive operations.

Combines content-addressed blob storage (raw bytes) and model-based metadata storage (JSON) to provide file archiving.

Blobs are stored once per checksum, but each unique source path creates its own metadata file (keyed by File.id).

Optionally, extracted text (by different origins) can be stored and retrieved. Additionally, other programs can write arbitrary extra data to the archive (such as PDF page thumbnails).

Example
archive = ArchiveRepository(dataset="my_data", uri="s3://bucket/dataset")

# Archive a file
file = archive.store("path/to/file.pdf")

# Retrieve file info
file = archive.get_file(checksum)

# Stream file contents by checksum
for chunk in archive.stream(file.checksum):
    process(chunk)
Source code in ftm_lakehouse/repository/archive.py
class ArchiveRepository(BaseRepository):
    """
    Repository for file archive operations.

    Combines content-addressed blob storage (raw bytes) and model-based
    metadata storage (JSON) to provide file archiving.

    Blobs are stored once per checksum, but each unique source path
    creates its own metadata file (keyed by File.id).

    Optionally, extracted text (by different origins) can be stored and
    retrieved. Additionally, other programs can write arbitrary extra
    data to the archive (such as pdf page thumbnails).

    Example:
        ```python
        archive = ArchiveRepository(dataset="my_data", uri="s3://bucket/dataset")

        # Archive a file
        file = archive.store("path/to/file.pdf")

        # Retrieve file info
        file = archive.get_file(checksum)

        # Stream file contents by checksum
        for chunk in archive.stream(file.checksum):
            process(chunk)
        ```
    """

    def __init__(self, dataset: str, uri: Uri) -> None:
        super().__init__(dataset, uri)
        # Raw byte blobs, keyed by content checksum
        self._store = get_store(
            self._store_uri, serialization_mode="raw", raise_on_nonexist=True
        )
        # JSON metadata records, one per (checksum, File.id)
        self._files = get_store(self._store_uri, model=File, raise_on_nonexist=True)
        # Extracted text; missing keys resolve without raising
        self._txts = get_store(
            self._store_uri, serialization_mode="auto", raise_on_nonexist=False
        )

    def exists(self, checksum: str) -> bool:
        """Check if blob exists for the given checksum."""
        return self._store.exists(path.archive_blob(checksum))

    def get_file(self, checksum: str, file_id: str | None = None) -> File:
        """
        Get file metadata for the given checksum.

        Args:
            checksum: SHA256 checksum of file
            file_id: Optional File.id to get specific metadata

        Raises:
            FileNotFoundError: When no metadata file exists
        """
        if file_id is not None:
            key = path.archive_meta(checksum, file_id)
            return self._files.get(key)

        # Return first found metadata (no guaranteed order)
        for file in self.get_all_files(checksum):
            return file
        raise FileNotFoundError(checksum)

    def get_all_files(self, checksum: str) -> Files:
        """
        Iterate all metadata files for the given checksum.

        Multiple crawlers may have archived the same file content from
        different source paths, each creating their own metadata file.
        """
        prefix = path.archive_prefix(checksum)
        yield from self._files.iterate_values(prefix, glob="*.json")

    def iterate_files(self) -> Files:
        """Iterate all file metadata in the archive."""
        yield from self._files.iterate_values(path.ARCHIVE, glob="**/*.json")

    def put_file(self, file: File) -> File:
        """Store file metadata object, recording store uri and dataset."""
        file.store = str(self.uri)
        file.dataset = self.dataset
        self._files.put(file.meta_path, file)
        return file

    def stream(self, checksum: str) -> BytesGenerator:
        """Stream blob contents as bytes."""
        yield from self._store.stream(path.archive_blob(checksum))

    def open(self, checksum: str) -> ContextManager[IO[bytes]]:
        """Get a file-like handle for reading."""
        return self._store.open(path.archive_blob(checksum), mode=DEFAULT_MODE)

    def to_uri(self, checksum: str) -> str:
        """Return the full storage URI for the blob of the given checksum."""
        return self._store.to_uri(path.archive_blob(checksum))

    def local_path(self, checksum: str) -> ContextManager[Path]:
        """
        Get the local path to the blob.

        If storage is local, returns actual path. Otherwise, creates
        a temporary local copy that is cleaned up after context exit.
        """
        return self._store.local_path(path.archive_blob(checksum))

    def store(
        self,
        uri: Uri,
        file: File | None = None,
        checksum: str | None = None,
        **metadata: Any,
    ) -> File:
        """
        Archive a file from a local or remote URI.

        The blob is stored once per checksum, but each unique source path
        creates its own metadata file (keyed by File.id).

        Args:
            uri: Local or remote URI to the file
            file: Optional metadata file object to patch
            checksum: Content hash (skip computation if provided)
            **metadata: Additional data to store in file's extra field, including
                FollowTheMoney properties for the `Document` schema

        Returns:
            File metadata object
        """
        resource = UriResource(uri)

        # store bytes blob (skipped if already exists)
        checksum = self.store_blob(uri, checksum)

        # file metadata
        if file is None:
            info = resource.info()
            file = File.from_info(info, checksum)

        file.checksum = checksum

        # Known model fields are set directly; everything else goes to `extra`
        for key in list(metadata.keys()):
            if key in file.__class__.model_fields:
                setattr(file, key, metadata.pop(key))
        file.extra = clean_dict(metadata)
        file.origin = file.origin or ARCHIVE_ORIGIN

        # Store metadata via put_file so that `file.store` and `file.dataset`
        # are recorded consistently (a plain `_files.put` left `file.store`
        # unset, unlike metadata written through `put_file`)
        self.put_file(file)
        # Notify archive was updated
        self._tags.set(tag.ARCHIVE_UPDATED)

        self.log.info(
            f"Archived `{file.key} ({file.checksum})`",
            checksum=file.checksum,
        )

        return file

    def store_blob(self, uri: Uri, checksum: str | None = None) -> str:
        """
        Store bytes blob from given uri if it doesn't exist yet.

        Args:
            uri: Local or remote URI to the file
            checksum: Content hash (skip computation if provided)

        Returns:
            checksum
        """
        if checksum:
            validate_checksum(checksum)
        if checksum and self.exists(checksum):
            self.log.debug("Blob already exists, skipping", checksum=checksum)
            return checksum

        # open_virtual computes the content hash while reading; the
        # provided checksum is not trusted beyond the existence check above
        with open_virtual(uri, algorithm=CHECKSUM_ALGORITHM) as fh:
            if self.exists(fh.checksum):
                self.log.debug("Blob already exists, skipping", checksum=fh.checksum)
                return fh.checksum

            self.log.info(f"Storing blob `{fh.checksum}` ...", checksum=fh.checksum)
            self.write_blob(fh, fh.checksum)
            return fh.checksum

    def write_blob(self, fh: BinaryIO, checksum: str | None = None) -> str:
        """Write a blob from the given open file-handler"""
        if checksum and self.exists(checksum):
            self.log.debug("Blob already exists, skipping", checksum=checksum)
            return checksum
        if not checksum:
            checksum = make_checksum(fh)
            if self.exists(checksum):
                self.log.debug("Blob already exists, skipping", checksum=checksum)
                return checksum
            # rewind after hashing so the full content gets written
            fh.seek(0)
        with self._store.open(path.archive_blob(checksum), "wb") as out:
            stream(fh, out, CHUNK_SIZE_LARGE)
        return checksum

    def delete(self, file: File) -> None:
        """
        Delete a file's metadata from the archive.

        The blob is never deleted. (FIXME)
        """
        self.log.warning(
            "Deleting file metadata",
            checksum=file.checksum,
            file_id=file.id,
        )
        self._files.delete(file.meta_path)

    def put_txt(self, checksum: str, text: str, origin: str = DEFAULT_ORIGIN) -> None:
        """Store extracted text for a file."""
        origin = origin or DEFAULT_ORIGIN
        key = path.archive_txt(checksum, origin)
        self._txts.put(key, text)

    def get_txt(self, checksum: str, origin: str | None = None) -> str | None:
        """Get extracted text for a file. If `origin`, get by this specific
        extraction, otherwise get the first txt value (no guaranteed order)"""
        if origin:
            key = path.archive_txt(checksum, origin)
            return self._txts.get(key)
        for value in self._txts.iterate_values(
            prefix=path.archive_prefix(checksum), glob="*.txt"
        ):
            return value
        return None

    def put_data(self, checksum: str, path: str, data: bytes) -> None:
        """Store raw data at the given path"""
        key = join_relpaths(make_checksum_key(checksum), path)
        self._store.put(key, data)

    def get_data(self, checksum: str, path: str) -> bytes:
        """Get raw data at the given path"""
        key = join_relpaths(make_checksum_key(checksum), path)
        return self._store.get(key)

delete(file)

Delete a file's metadata from the archive.

The blob is never deleted. (FIXME)

Source code in ftm_lakehouse/repository/archive.py
def delete(self, file: File) -> None:
    """
    Remove a file's metadata record from the archive.

    The blob itself is never deleted. (FIXME)
    """
    meta_key = file.meta_path
    self.log.warning(
        "Deleting file metadata",
        checksum=file.checksum,
        file_id=file.id,
    )
    self._files.delete(meta_key)

exists(checksum)

Check if blob exists for the given checksum.

Source code in ftm_lakehouse/repository/archive.py
def exists(self, checksum: str) -> bool:
    """Return True if a blob is stored under the given checksum."""
    blob_key = path.archive_blob(checksum)
    return self._store.exists(blob_key)

get_all_files(checksum)

Iterate all metadata files for the given checksum.

Multiple crawlers may have archived the same file content from different source paths, each creating their own metadata file.

Source code in ftm_lakehouse/repository/archive.py
def get_all_files(self, checksum: str) -> Files:
    """
    Yield every metadata file stored for the given checksum.

    The same blob may have been archived from several source paths
    (e.g. by different crawlers), each with its own metadata file.
    """
    yield from self._files.iterate_values(
        path.archive_prefix(checksum), glob="*.json"
    )

get_data(checksum, path)

Get raw data at the given path

Source code in ftm_lakehouse/repository/archive.py
def get_data(self, checksum: str, path: str) -> bytes:
    """Read raw bytes previously stored under *path* for this checksum."""
    prefix = make_checksum_key(checksum)
    return self._store.get(join_relpaths(prefix, path))

get_file(checksum, file_id=None)

Get file metadata for the given checksum.

Parameters:

Name Type Description Default
checksum str

SHA256 checksum of file

required
file_id str | None

Optional File.id to get specific metadata

None

Raises:

Type Description
FileNotFoundError

When no metadata file exists

Source code in ftm_lakehouse/repository/archive.py
def get_file(self, checksum: str, file_id: str | None = None) -> File:
    """
    Get file metadata for the given checksum.

    Args:
        checksum: SHA256 checksum of file
        file_id: Optional File.id to get specific metadata

    Raises:
        FileNotFoundError: When no metadata file exists
    """
    if file_id is not None:
        return self._files.get(path.archive_meta(checksum, file_id))

    # Take the first metadata record found (no guaranteed order)
    found = next(iter(self.get_all_files(checksum)), None)
    if found is None:
        raise FileNotFoundError(checksum)
    return found

get_txt(checksum, origin=None)

Get extracted text for a file. If origin, get by this specific extraction, otherwise get the first txt value (no guaranteed order)

Source code in ftm_lakehouse/repository/archive.py
def get_txt(self, checksum: str, origin: str | None = None) -> str | None:
    """Return extracted text for a file.

    With `origin`, fetch that specific extraction; otherwise return the
    first txt value found (no guaranteed order), or None if there is none.
    """
    if origin:
        return self._txts.get(path.archive_txt(checksum, origin))
    texts = self._txts.iterate_values(
        prefix=path.archive_prefix(checksum), glob="*.txt"
    )
    return next(iter(texts), None)

iterate_files()

Iterate all file metadata in the archive.

Source code in ftm_lakehouse/repository/archive.py
def iterate_files(self) -> Files:
    """Yield metadata for every file in the archive."""
    all_meta = self._files.iterate_values(path.ARCHIVE, glob="**/*.json")
    yield from all_meta

local_path(checksum)

Get the local path to the blob.

If storage is local, returns actual path. Otherwise, creates a temporary local copy that is cleaned up after context exit.

Source code in ftm_lakehouse/repository/archive.py
def local_path(self, checksum: str) -> ContextManager[Path]:
    """
    Context manager yielding a local filesystem path to the blob.

    For local storage this is the real path; for remote storage a
    temporary copy is made and removed when the context exits.
    """
    blob_key = path.archive_blob(checksum)
    return self._store.local_path(blob_key)

open(checksum)

Get a file-like handle for reading.

Source code in ftm_lakehouse/repository/archive.py
def open(self, checksum: str) -> ContextManager[IO[bytes]]:
    """Open the blob for reading and return a file-like handle."""
    blob_key = path.archive_blob(checksum)
    return self._store.open(blob_key, mode=DEFAULT_MODE)

put_data(checksum, path, data)

Store raw data at the given path

Source code in ftm_lakehouse/repository/archive.py
def put_data(self, checksum: str, path: str, data: bytes) -> None:
    """Write raw bytes under *path* within this checksum's prefix."""
    prefix = make_checksum_key(checksum)
    self._store.put(join_relpaths(prefix, path), data)

put_file(file)

Store file metadata object.

Source code in ftm_lakehouse/repository/archive.py
def put_file(self, file: File) -> File:
    """Persist a file metadata object, tagging it with this repository's
    store uri and dataset, and return it."""
    file.dataset = self.dataset
    file.store = str(self.uri)
    self._files.put(file.meta_path, file)
    return file

put_txt(checksum, text, origin=DEFAULT_ORIGIN)

Store extracted text for a file.

Source code in ftm_lakehouse/repository/archive.py
def put_txt(self, checksum: str, text: str, origin: str = DEFAULT_ORIGIN) -> None:
    """Store extracted text for a file under the given origin."""
    # falsy origin (e.g. empty string) falls back to the default
    key = path.archive_txt(checksum, origin or DEFAULT_ORIGIN)
    self._txts.put(key, text)

store(uri, file=None, checksum=None, **metadata)

Archive a file from a local or remote URI.

The blob is stored once per checksum, but each unique source path creates its own metadata file (keyed by File.id).

Parameters:

Name Type Description Default
uri Uri

Local or remote URI to the file

required
file File | None

Optional metadata file object to patch

None
checksum str | None

Content hash (skip computation if provided)

None
**metadata Any

Additional data to store in file's extra field, including FollowTheMoney properties for the Document schema

{}

Returns:

Type Description
File

File metadata object

Source code in ftm_lakehouse/repository/archive.py
def store(
    self,
    uri: Uri,
    file: File | None = None,
    checksum: str | None = None,
    **metadata: Any,
) -> File:
    """
    Archive a file from a local or remote URI.

    The blob is stored once per checksum, but each unique source path
    creates its own metadata file (keyed by File.id).

    Args:
        uri: Local or remote URI to the file
        file: Optional metadata file object to patch
        checksum: Content hash (skip computation if provided)
        **metadata: Additional data to store in file's extra field, including
            FollowTheMoney properties for the `Document` schema

    Returns:
        File metadata object
    """
    resource = UriResource(uri)

    # store bytes blob (skipped if already exists)
    checksum = self.store_blob(uri, checksum)

    # file metadata
    if file is None:
        info = resource.info()
        file = File.from_info(info, checksum)

    file.checksum = checksum

    # Known model fields are set directly; everything else goes to `extra`
    for key in list(metadata.keys()):
        if key in file.__class__.model_fields:
            setattr(file, key, metadata.pop(key))
    file.extra = clean_dict(metadata)
    file.origin = file.origin or ARCHIVE_ORIGIN

    # Store metadata via put_file so that `file.store` and `file.dataset`
    # are recorded consistently (a plain `_files.put` left `file.store`
    # unset, unlike metadata written through `put_file`)
    self.put_file(file)
    # Notify archive was updated
    self._tags.set(tag.ARCHIVE_UPDATED)

    self.log.info(
        f"Archived `{file.key} ({file.checksum})`",
        checksum=file.checksum,
    )

    return file

store_blob(uri, checksum=None)

Store bytes blob from given uri if it doesn't exist yet.

Parameters:

Name Type Description Default
uri Uri

Local or remote URI to the file

required
checksum str | None

Content hash (skip computation if provided)

None

Returns:

Type Description
str

checksum

Source code in ftm_lakehouse/repository/archive.py
def store_blob(self, uri: Uri, checksum: str | None = None) -> str:
    """
    Store bytes blob from given uri if it doesn't exist yet.

    Args:
        uri: Local or remote URI to the file
        checksum: Content hash (skip computation if provided)

    Returns:
        checksum
    """
    if checksum:
        validate_checksum(checksum)
        if self.exists(checksum):
            self.log.debug("Blob already exists, skipping", checksum=checksum)
            return checksum

    # open_virtual computes the actual content hash while reading
    with open_virtual(uri, algorithm=CHECKSUM_ALGORITHM) as fh:
        if self.exists(fh.checksum):
            self.log.debug("Blob already exists, skipping", checksum=fh.checksum)
            return fh.checksum
        self.log.info(f"Storing blob `{fh.checksum}` ...", checksum=fh.checksum)
        self.write_blob(fh, fh.checksum)
        return fh.checksum

stream(checksum)

Stream blob contents as bytes.

Source code in ftm_lakehouse/repository/archive.py
def stream(self, checksum: str) -> BytesGenerator:
    """Yield the blob's contents as byte chunks."""
    blob_key = path.archive_blob(checksum)
    yield from self._store.stream(blob_key)

write_blob(fh, checksum=None)

Write a blob from the given open file-handler

Source code in ftm_lakehouse/repository/archive.py
def write_blob(self, fh: BinaryIO, checksum: str | None = None) -> str:
    """Write a blob from the given open file handle and return its checksum.

    Skips the write entirely when a blob for the checksum already exists.
    """
    if not checksum:
        checksum = make_checksum(fh)
        if self.exists(checksum):
            self.log.debug("Blob already exists, skipping", checksum=checksum)
            return checksum
        # rewind after hashing so the full content gets written
        fh.seek(0)
    elif self.exists(checksum):
        self.log.debug("Blob already exists, skipping", checksum=checksum)
        return checksum
    with self._store.open(path.archive_blob(checksum), "wb") as out:
        stream(fh, out, CHUNK_SIZE_LARGE)
    return checksum

EntityRepository

Entity/statement operations combining JournalStore and ParquetStore.

dataset.entities.add(entity, origin="import")
dataset.entities.writer(origin="import")
dataset.entities.flush()
dataset.entities.query(origin="import")

ftm_lakehouse.repository.EntityRepository

Bases: ParquetDiffMixin, BaseRepository, ApiEntityRepository

Repository for entity/statement operations.

Combines JournalStore (write-ahead buffer) and ParquetStore (Delta Lake) to provide buffered statement storage with efficient querying.

Writes go to the journal first, then are flushed to the parquet store. Reads query the parquet store (optionally flushing first).

Example
repo = EntityRepository(uri="s3://bucket/dataset", dataset="my_data")

# Write entities
with repo.writer(origin="import") as writer:
    writer.add_entity(entity)

# Flush to parquet
repo.flush()

# Query entities
for entity in repo.query(origin="import"):
    process(entity)
Source code in ftm_lakehouse/repository/entities/main.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
class EntityRepository(ParquetDiffMixin, BaseRepository, ApiEntityRepository):
    """
    Repository for entity/statement operations.

    Combines JournalStore (write-ahead buffer) and ParquetStore (Delta Lake)
    to provide buffered statement storage with efficient querying.

    Writes go to the journal first, then are flushed to the parquet store.
    Reads query the parquet store (optionally flushing first).

    Example:
        ```python
        repo = EntityRepository(uri="s3://bucket/dataset", dataset="my_data")

        # Write entities
        with repo.writer(origin="import") as writer:
            writer.add_entity(entity)

        # Flush to parquet
        repo.flush()

        # Query entities
        for entity in repo.query(origin="import"):
            process(entity)
        ```
    """

    def __init__(
        self,
        dataset: str,
        uri: Uri,
    ) -> None:
        super().__init__(dataset, uri)
        # Write-ahead buffer for incoming statements
        self._journal = get_journal(dataset)
        # Delta Lake parquet storage for flushed statements
        self._statements = ParquetStore(uri, dataset)
        # Generic key/value store under the dataset uri (e.g. exported JSON)
        self._store = get_store(self._store_uri)

    @contextmanager
    def writer(
        self, origin: str | None = None
    ) -> Generator[BaseJournalWriter, None, None]:
        """
        Get a bulk writer for adding entities/statements.

        Usage:
            with repo.writer(origin="import") as writer:
                writer.add_entity(entity)
        """
        # Mark the journal as updated for the duration of the write
        with self._tags.touch(tag.JOURNAL_UPDATED):
            writer = self._journal.writer(origin)
            try:
                yield writer
            except BaseException:
                # Any error raised in the caller's block discards the
                # buffered writes before re-raising
                writer.rollback()
                raise
            else:
                writer.flush()
            finally:
                writer.close()

    def add(self, entity: EntityProxy, origin: str | None = None) -> None:
        """Write one entity to the journal via the bulk path."""
        self.add_many((entity,), origin)

    def add_many(
        self, entities: Iterable[EntityProxy], origin: str | None = None
    ) -> None:
        """Write an iterable of entities to the journal."""
        with self.writer(origin) as bulk:
            for proxy in entities:
                bulk.add_entity(proxy)
        # Auto-flush once the journal grows past a million statements
        if self._journal.count() >= 1_000_000:
            self.flush()

    @api_delegate("_api_flush")
    def flush(self) -> int:
        """
        Flush statements from journal to parquet store.

        Uses dedup logic:
        - New statements (not in main table): append to main + insert into translog
        - Duplicate statements (already in main): update translog last_seen only
        - Tombstones (deleted_at set): update translog deleted_at only

        Returns:
            Number of new statements flushed to the main table
        """
        if self._journal.count() == 0:
            self.log.debug("Journal is empty", journal=mask_uri(self._journal.uri))
            # set tags for the initial run
            if not self._tags.exists(tag.JOURNAL_FLUSHED):
                self._tags.set(tag.JOURNAL_FLUSHED)
            if not self._tags.exists(tag.STATEMENTS_UPDATED):
                self._tags.set(tag.STATEMENTS_UPDATED)
            return 0

        with (
            self._tags.touch(tag.JOURNAL_FLUSHED),
            self._tags.touch(tag.STATEMENTS_UPDATED),
            Took() as t,
        ):
            self.log.info("Flushing journal ...", journal=mask_uri(self._journal.uri))

            # Dedup connection over existing statement IDs; only available
            # once the parquet store has been written at least once
            con = None
            if self._statements.exists:
                con = make_dedup_connection(self._statements._store.deltatable)

            total_new = 0
            current_bucket: str | None = None
            current_origin: str | None = None
            current_batch: list[dict] = []

            # Batch rows per (bucket, origin) partition.
            # NOTE(review): assumes journal rows arrive grouped by partition;
            # otherwise batches fragment (still correct, just smaller writes)
            for _, bucket, origin, _, data, deleted_at in self._journal.flush():
                # Write batch when partition changes
                if bucket != current_bucket or origin != current_origin:
                    if current_batch:
                        total_new += self._write_flush_batch(current_batch, con)
                    current_batch = []
                    current_bucket = bucket
                    current_origin = origin

                if deleted_at is not None:
                    # Tombstone row: carries the deletion timestamp, handled
                    # separately in _write_flush_batch
                    row = unpack_tombstone_row(data)
                    row["_deleted_at"] = deleted_at
                else:
                    stmt = unpack_statement(data)
                    row = lake_pack_statement(stmt)
                current_batch.append(row)

            # Write final batch
            if current_batch:
                total_new += self._write_flush_batch(current_batch, con)

            self.log.info(
                "Flushed statements from journal to lake",
                count=total_new,
                took=t.took,
                journal=mask_uri(self._journal.uri),
            )

            return total_new

    @no_api
    def _write_flush_batch(
        self, batch: list[dict], con: duckdb.DuckDBPyConnection | None
    ) -> int:
        """Write a partition batch with three-way split.

        1. Tombstones → translog mark_deleted only
        2. New rows (anti-join with existing IDs) → write main table + translog upsert
        3. Duplicate rows (semi-join) → translog upsert only (updates last_seen)

        Args:
            batch: Packed statement rows; tombstones carry ``_deleted_at``
            con: Dedup connection exposing an ``existing_ids`` table, or
                None on the very first flush (no main table yet)

        Returns:
            Number of new rows written to main table
        """
        now = datetime.now(timezone.utc)

        # Split tombstones from live rows
        tombstones = [r for r in batch if r.get("_deleted_at") is not None]
        live = [r for r in batch if r.get("_deleted_at") is None]

        # Handle tombstones → translog only (main table rows stay in place)
        if tombstones:
            tomb_ids = [r["id"] for r in tombstones]
            tomb_deleted_at = [r["_deleted_at"] for r in tombstones]
            tomb_table = pa.table(
                {
                    "id": pa.array(tomb_ids, type=pa.string()),
                    "deleted_at": pa.array(tomb_deleted_at, type=pa.timestamp("us")),
                }
            )
            self._statements._translog.mark_deleted(tomb_table)

        if not live:
            return 0

        # Build translog rows for all live statements; missing timestamps
        # default to "now"
        live_ids = [r["id"] for r in live]
        live_first_seen = [r.get("first_seen") or now for r in live]
        live_last_seen = [r.get("last_seen") or now for r in live]

        if con is not None:
            # Determine which are new vs duplicates via anti-join against
            # the IDs already present in the main table
            batch_ids_table = pa.table({"id": pa.array(live_ids, type=pa.string())})
            con.register("batch_ids", batch_ids_table)

            new_ids_result = con.execute(
                "SELECT b.id FROM batch_ids b "
                "LEFT JOIN existing_ids e ON b.id = e.id "
                "WHERE e.id IS NULL"
            ).fetchall()
            new_id_set = {r[0] for r in new_ids_result}
            con.unregister("batch_ids")

            new_rows = [r for r in live if r["id"] in new_id_set]
        else:
            # First flush — all rows are new
            new_rows = live

        # Upsert all live rows into translog (new + dupes); dupes only
        # advance last_seen
        translog_table = pa.table(
            {
                "id": pa.array(live_ids, type=pa.string()),
                "first_seen": pa.array(live_first_seen, type=pa.timestamp("us")),
                "last_seen": pa.array(live_last_seen, type=pa.timestamp("us")),
                "deleted_at": pa.array([None] * len(live_ids), type=pa.timestamp("us")),
            },
            schema=TRANSLOG_SCHEMA,
        )
        self._statements._translog.upsert(translog_table)

        # Write only new rows to main table
        if new_rows:
            table = pa.Table.from_pylist(new_rows, schema=ARROW_SCHEMA)
            write_deltalake(
                str(self._statements.uri),
                table,
                partition_by=PARTITIONS,
                mode="append",
                schema_mode="merge",
                writer_properties=WRITER,
                target_file_size=TARGET_SIZE,
                storage_options=storage_options(),
                configuration={"delta.enableChangeDataFeed": "true"},
            )

            # Update dedup connection for subsequent batches in the same flush
            if con is not None:
                new_ids_arr = pa.table(
                    {"id": pa.array([r["id"] for r in new_rows], type=pa.string())}
                )
                con.register("_new_batch", new_ids_arr)
                con.execute("INSERT INTO existing_ids SELECT id FROM _new_batch")
                con.unregister("_new_batch")

        return len(new_rows)

    @api_delegate("_api_query")
    def query(
        self,
        entity_ids: Iterable[str] | None = None,
        flush_first: bool = False,
        **filters,
    ) -> StatementEntities:
        """
        Query entities from the parquet store.

        Additional filter kwargs are passed to ftmq Query.

        Args:
            entity_ids: Filter by entity IDs
            flush_first: Flush journal before querying (default False)

        Yields:
            StatementEntity objects matching the query
        """
        if flush_first:
            self.flush()

        if entity_ids:
            filters["entity_id__in"] = list(entity_ids)

        yield from self._statements.query(Query().where(**filters))

    @api_delegate("_api_query_statements")
    def query_statements(self, q: Query | None = None) -> Statements:
        """Yield raw statements matching the given ftmq query (all
        statements when no query is given)."""
        q = q or Query()
        # Render the query into a statement-level SQL string
        sql = q.sql.statements
        yield from self._statements.query_statements(sql)

    def get(
        self,
        entity_id: str,
        origin: str | None = None,
        flush_first: bool = False,
    ) -> StatementEntity | None:
        """Return the entity with the given ID, or None when absent."""
        results = self.query([entity_id], flush_first, origin=origin)
        return next(iter(results), None)

    def stream(self) -> ValueEntities:
        """
        Stream entities from the pre-exported entities.ftm.json file.

        Reads from the exported JSON file, not directly from the parquet
        store; yields nothing when no export exists.
        """
        if not self._store.exists(path.ENTITIES_JSON):
            return
        yield from smart_read_proxies(self._store.to_uri(path.ENTITIES_JSON))

    @no_api
    def export_entities(self, output_uri: str) -> None:
        """
        Export entities to a JSON lines file without FtM object construction.

        Uses query_raw() / aggregate_unsafe() to bypass
        Statement/StatementEntity/to_dict() and writes directly to orjson output.

        Args:
            output_uri: Destination URI for the entities.ftm.json file
        """
        # NOTE(review): the parent dir is ensured for the canonical
        # ENTITIES_JSON path, not for output_uri — confirm callers always
        # pass that path (or a URI whose parent already exists).
        self._store.ensure_parent(path.ENTITIES_JSON)
        with smart_open(output_uri, mode="wb") as fh:
            write = fh.write
            for raw in self._statements.query_raw():
                write(orjson.dumps(raw, option=orjson.OPT_APPEND_NEWLINE))

    @api_delegate("_api_delete_entity")
    def delete_entity(self, entity_id: str) -> int:
        """Delete all statements for an entity via journal tombstones.

        Collects the entity's statements from both parquet and journal,
        then writes tombstone rows (with deleted_at set) back into the
        journal via UPSERT.

        Args:
            entity_id: The entity ID to delete

        Returns:
            Number of tombstone statements written
        """
        deleted_at = datetime.now(timezone.utc)
        statements = self._collect_entity_statements(entity_id)
        if not statements:
            return 0
        with self._journal.writer() as w:
            for statement in statements:
                w.add_statement(statement, deleted_at=deleted_at)
        self._tags.set(tag.JOURNAL_UPDATED)
        return len(statements)

    def delete_statement(self, stmt: Statement) -> None:
        """Write a tombstone for a single statement into the journal.

        Args:
            stmt: The Statement to delete
        """
        with self._tags.touch(tag.JOURNAL_UPDATED):
            deleted_at = datetime.now(timezone.utc)
            with self._journal.writer() as w:
                w.add_statement(stmt, deleted_at=deleted_at)

    @no_api
    def _collect_entity_statements(self, entity_id: str) -> list[Statement]:
        """Read all statements for an entity from parquet + journal.

        Uses a translog join when available to filter already-deleted
        statements. Journal statements override parquet entries sharing
        the same statement id.

        Args:
            entity_id: The entity ID to collect statements for

        Returns:
            De-duplicated list of Statement objects
        """
        stmts_by_id: dict[str, Statement] = {}
        journal = cast(SqlJournalStore, self._journal)

        # Read from parquet (with translog filtering if available)
        if self._statements.exists:
            dt = self._statements._store.deltatable
            translog = self._statements._translog

            con = duckdb.connect()
            try:
                con.register("arrow", dt.to_pyarrow_dataset())
                # Bind entity_id as a parameter instead of f-string
                # interpolation — the previous code was injectable via a
                # quote character in entity_id.
                if translog.exists:
                    con.register(
                        "translog", translog.deltatable.to_pyarrow_dataset()
                    )
                    cur = con.execute(
                        "SELECT arrow.* FROM arrow "
                        "JOIN translog sc ON arrow.id = sc.id "
                        "WHERE sc.deleted_at IS NULL "
                        "AND arrow.entity_id = ?",
                        [entity_id],
                    )
                else:
                    cur = con.execute(
                        "SELECT * FROM arrow WHERE entity_id = ?", [entity_id]
                    )
                columns = [d[0] for d in cur.description]
                for row in cur.fetchall():
                    stmt = Statement.from_dict(dict(zip(columns, row)))
                    if stmt.id:
                        stmts_by_id[stmt.id] = stmt
            finally:
                # The connection was previously leaked; always close it.
                con.close()

        # Read from journal (may override parquet entries)
        q = (
            select(journal.table)
            .where(journal.table.c.canonical_id == entity_id)
            .where(journal.table.c.deleted_at.is_(None))
        )
        with journal.engine.connect() as conn:
            for row in conn.execute(q):
                stmt = unpack_statement(row.data)
                if stmt.id:
                    stmts_by_id[stmt.id] = stmt

        return list(stmts_by_id.values())

    @api_delegate("_api_stats")
    def get_statistics(self) -> DatasetStats:
        """Return dataset statistics computed from the parquet store."""
        return self._statements.stats()

    @property
    def version(self) -> int | None:
        """Current version of the main Delta table."""
        # API-backed instances delegate; local instances read the store.
        return self._api_version() if self._is_api else self._statements.version

    # DiffMixin implementation

    # Base path for entity diff output — presumably consumed by the
    # DiffMixin machinery; confirm against the mixin's contract.
    _diff_base_path = path.DIFFS_ENTITIES

    @no_api
    def _filter_changes(
        self,
        changes: Generator[tuple[datetime, str, dict], None, None],
    ) -> set[str]:
        """Collect entity IDs touched by any insert/update_postimage change."""
        return {
            row["entity_id"]
            for _, change_type, row in changes
            if change_type in ("insert", "update_postimage")
        }

    @no_api
    def _write_diff(self, entity_ids: set[str], v: int, ts: datetime, **kwargs) -> str:
        """Write changed entities as line-based JSON with operation envelopes.

        Returns the URI of the written diff file.
        """
        key = path.entities_diff(v, ts)
        with self._store.open(key, "wb") as out:
            smart_write_json(out, self._get_delta_entities(entity_ids))
        return self._store.to_uri(key)

    @no_api
    def _get_delta_entities(self, entity_ids: set[str]) -> Generator[SDict, None, None]:
        """Yield envelopes for changed entities, DEL envelopes for missing IDs."""
        found: set[str] = set()
        pending = iter(entity_ids)
        while True:
            batch = list(islice(pending, QUERY_IN_BATCH_SIZE))
            if not batch:
                break
            for entity in self.query(entity_ids=batch, flush_first=False):
                if entity.id:
                    found.add(entity.id)
                yield make_envelope(entity.to_dict())
        # Queried IDs with no surviving entity become deletions.
        for missing_id in entity_ids - found:
            yield make_envelope({"id": missing_id}, op="DEL")

    @no_api
    def _write_initial_diff(self, version: int, ts: datetime, **kwargs) -> None:
        """Seed the initial diff version from the exported entities.ftm.json."""
        with self._store.open(path.entities_diff(version, ts), "wb") as out:
            for data in self._store.stream(path.ENTITIES_JSON):
                out.write(
                    orjson.dumps(
                        make_envelope(data), option=orjson.OPT_APPEND_NEWLINE
                    )
                )

version property

Current version of the main Delta table.

add(entity, origin=None)

Add a single entity to the journal.

Source code in ftm_lakehouse/repository/entities/main.py
def add(self, entity: EntityProxy, origin: str | None = None) -> None:
    """Add a single entity to the journal."""
    self.add_many([entity], origin)

add_many(entities, origin=None)

Add an entity iterator to the journal.

Source code in ftm_lakehouse/repository/entities/main.py
def add_many(
    self, entities: Iterable[EntityProxy], origin: str | None = None
) -> None:
    """Add an entity iterator to the journal."""
    with self.writer(origin) as writer:
        for entity in entities:
            writer.add_entity(entity)
    if self._journal.count() >= 1_000_000:
        self.flush()

delete_entity(entity_id)

Delete all statements for an entity via journal tombstones.

Reads statements from both parquet and journal, then UPSERTs tombstone rows (with deleted_at set) into the journal.

Parameters:

Name Type Description Default
entity_id str

The entity ID to delete

required

Returns:

Type Description
int

Number of tombstone statements written

Source code in ftm_lakehouse/repository/entities/main.py
@api_delegate("_api_delete_entity")
def delete_entity(self, entity_id: str) -> int:
    """Delete all statements for an entity via journal tombstones.

    Reads statements from both parquet and journal, then UPSERTs
    tombstone rows (with deleted_at set) into the journal.

    Args:
        entity_id: The entity ID to delete

    Returns:
        Number of tombstone statements written
    """
    now = datetime.now(timezone.utc)
    stmts = self._collect_entity_statements(entity_id)
    if not stmts:
        return 0
    with self._journal.writer() as w:
        for stmt in stmts:
            w.add_statement(stmt, deleted_at=now)
    self._tags.set(tag.JOURNAL_UPDATED)
    return len(stmts)

delete_statement(stmt)

Delete a single statement via journal tombstone.

Parameters:

Name Type Description Default
stmt Statement

The Statement to delete

required
Source code in ftm_lakehouse/repository/entities/main.py
def delete_statement(self, stmt: Statement) -> None:
    """Delete a single statement via journal tombstone.

    Args:
        stmt: The Statement to delete
    """
    with self._tags.touch(tag.JOURNAL_UPDATED):
        now = datetime.now(timezone.utc)
        with self._journal.writer() as w:
            w.add_statement(stmt, deleted_at=now)

export_entities(output_uri)

Export entities to a JSON lines file without FtM object construction.

Uses query_raw() / aggregate_unsafe() to bypass Statement/StatementEntity/to_dict() and writes directly to orjson output.

Parameters:

Name Type Description Default
output_uri str

Destination URI for the entities.ftm.json file

required
Source code in ftm_lakehouse/repository/entities/main.py
@no_api
def export_entities(self, output_uri: str) -> None:
    """
    Export entities to a JSON lines file without FtM object construction.

    Uses query_raw() / aggregate_unsafe() to bypass
    Statement/StatementEntity/to_dict() and writes directly to orjson output.

    Args:
        output_uri: Destination URI for the entities.ftm.json file
    """
    self._store.ensure_parent(path.ENTITIES_JSON)
    with smart_open(output_uri, mode="wb") as fh:
        for entity in self._statements.query_raw():
            fh.write(orjson.dumps(entity, option=orjson.OPT_APPEND_NEWLINE))

flush()

Flush statements from journal to parquet store.

Uses dedup logic:

- New statements (not in main table): append to main + insert into translog
- Duplicate statements (already in main): update translog last_seen only
- Tombstones (deleted_at set): update translog deleted_at only

Returns:

Type Description
int

Number of new statements flushed to the main table

Source code in ftm_lakehouse/repository/entities/main.py
@api_delegate("_api_flush")
def flush(self) -> int:
    """
    Flush statements from journal to parquet store.

    Uses dedup logic:
    - New statements (not in main table): append to main + insert into translog
    - Duplicate statements (already in main): update translog last_seen only
    - Tombstones (deleted_at set): update translog deleted_at only

    Returns:
        Number of new statements flushed to the main table
    """
    if self._journal.count() == 0:
        self.log.debug("Journal is empty", journal=mask_uri(self._journal.uri))
        # set tags for the initial run
        if not self._tags.exists(tag.JOURNAL_FLUSHED):
            self._tags.set(tag.JOURNAL_FLUSHED)
        if not self._tags.exists(tag.STATEMENTS_UPDATED):
            self._tags.set(tag.STATEMENTS_UPDATED)
        return 0

    with (
        self._tags.touch(tag.JOURNAL_FLUSHED),
        self._tags.touch(tag.STATEMENTS_UPDATED),
        Took() as t,
    ):
        self.log.info("Flushing journal ...", journal=mask_uri(self._journal.uri))

        con = None
        if self._statements.exists:
            con = make_dedup_connection(self._statements._store.deltatable)

        total_new = 0
        current_bucket: str | None = None
        current_origin: str | None = None
        current_batch: list[dict] = []

        for _, bucket, origin, _, data, deleted_at in self._journal.flush():
            # Write batch when partition changes
            if bucket != current_bucket or origin != current_origin:
                if current_batch:
                    total_new += self._write_flush_batch(current_batch, con)
                current_batch = []
                current_bucket = bucket
                current_origin = origin

            if deleted_at is not None:
                row = unpack_tombstone_row(data)
                row["_deleted_at"] = deleted_at
            else:
                stmt = unpack_statement(data)
                row = lake_pack_statement(stmt)
            current_batch.append(row)

        # Write final batch
        if current_batch:
            total_new += self._write_flush_batch(current_batch, con)

        self.log.info(
            "Flushed statements from journal to lake",
            count=total_new,
            took=t.took,
            journal=mask_uri(self._journal.uri),
        )

        return total_new

get(entity_id, origin=None, flush_first=False)

Get a single entity by ID.

Source code in ftm_lakehouse/repository/entities/main.py
def get(
    self,
    entity_id: str,
    origin: str | None = None,
    flush_first: bool = False,
) -> StatementEntity | None:
    """Get a single entity by ID."""
    for entity in self.query([entity_id], flush_first, origin=origin):
        return entity
    return None

get_statistics()

Compute statistics from the parquet store.

Source code in ftm_lakehouse/repository/entities/main.py
@api_delegate("_api_stats")
def get_statistics(self) -> DatasetStats:
    """Compute statistics from the parquet store."""
    return self._statements.stats()

query(entity_ids=None, flush_first=False, **filters)

Query entities from the parquet store.

Additional filter kwargs are passed to ftmq Query.

Parameters:

Name Type Description Default
entity_ids Iterable[str] | None

Filter by entity IDs

None
flush_first bool

Flush journal before querying (default False)

False

Yields:

Type Description
StatementEntities

StatementEntity objects matching the query

Source code in ftm_lakehouse/repository/entities/main.py
@api_delegate("_api_query")
def query(
    self,
    entity_ids: Iterable[str] | None = None,
    flush_first: bool = False,
    **filters,
) -> StatementEntities:
    """
    Query entities from the parquet store.

    Additional filter kwargs are passed to ftmq Query.

    Args:
        entity_ids: Filter by entity IDs
        flush_first: Flush journal before querying (default False)

    Yields:
        StatementEntity objects matching the query
    """
    if flush_first:
        self.flush()

    if entity_ids:
        filters["entity_id__in"] = list(entity_ids)
    q = Query().where(**filters)

    yield from self._statements.query(q)

stream()

Stream entities from the exported JSON file.

This reads from the pre-exported entities.ftm.json file, not directly from the parquet store.

Source code in ftm_lakehouse/repository/entities/main.py
def stream(self) -> ValueEntities:
    """
    Stream entities from the exported JSON file.

    This reads from the pre-exported entities.ftm.json file,
    not directly from the parquet store.
    """
    if self._store.exists(path.ENTITIES_JSON):
        uri = self._store.to_uri(path.ENTITIES_JSON)
        yield from smart_read_proxies(uri)

writer(origin=None)

Get a bulk writer for adding entities/statements.

Usage

with repo.writer(origin="import") as writer:
    writer.add_entity(entity)

Source code in ftm_lakehouse/repository/entities/main.py
@contextmanager
def writer(
    self, origin: str | None = None
) -> Generator[BaseJournalWriter, None, None]:
    """
    Get a bulk writer for adding entities/statements.

    Usage:
        with repo.writer(origin="import") as writer:
            writer.add_entity(entity)
    """
    with self._tags.touch(tag.JOURNAL_UPDATED):
        writer = self._journal.writer(origin)
        try:
            yield writer
        except BaseException:
            writer.rollback()
            raise
        else:
            writer.flush()
        finally:
            writer.close()

MappingRepository

Mapping configuration storage.

dataset.mappings.put(mapping)
dataset.mappings.get(content_hash)
dataset.mappings.list()

ftm_lakehouse.repository.MappingRepository

Bases: BaseRepository

Repository for mapping configuration storage.

Combines FileStore (current configs) and VersionStore (snapshots) to provide versioned mapping configuration storage.

Each mapping is identified by a content_hash (SHA1 of the source CSV file).

Example
repo = MappingRepository(dataset="my_data", uri="s3://bucket/dataset")

# Store a mapping configuration
repo.put(mapping)

# Get a mapping configuration
mapping = repo.get(content_hash)

# List all mappings
for content_hash in repo.list():
    print(content_hash)
Source code in ftm_lakehouse/repository/mapping.py
class MappingRepository(BaseRepository):
    """
    Repository for mapping configuration storage.

    Combines FileStore (current configs) and VersionStore (snapshots)
    to provide versioned mapping configuration storage.

    Each mapping is identified by a content_hash (SHA1 of the source
    CSV file).

    Example:
        ```python
        repo = MappingRepository(dataset="my_data", uri="s3://bucket/dataset")

        # Store a mapping configuration
        repo.put(mapping)

        # Get a mapping configuration
        mapping = repo.get(content_hash)

        # List all mappings
        for content_hash in repo.list():
            print(content_hash)
        ```
    """

    def __init__(self, dataset: str, uri: Uri) -> None:
        super().__init__(dataset, uri)
        self._versions = VersionedModelStore(self._store_uri, DatasetMapping)

    def exists(self, content_hash: str) -> bool:
        """Check whether a mapping configuration is stored for this hash."""
        return self._versions.exists(path.mapping(content_hash))

    def get(self, content_hash: str) -> DatasetMapping:
        """
        Get a mapping configuration by content hash.

        Args:
            content_hash: SHA1 checksum of the source CSV file

        Returns:
            The stored DatasetMapping
        """
        return self._versions.get(path.mapping(content_hash))

    def put(self, mapping: DatasetMapping) -> str:
        """
        Store a mapping configuration.

        Creates both a current config and a versioned snapshot.

        Args:
            mapping: The mapping configuration to store

        Returns:
            The current version path
        """
        content_hash = mapping.content_hash
        mapping_path = path.mapping(content_hash)
        key = self._versions.make(mapping_path, mapping)

        self.log.info(
            f"Stored mapping `{mapping_path}`",
            content_hash=content_hash,
            version=key,
        )
        return key

    def delete(self, content_hash: str) -> None:
        """Delete a mapping configuration."""
        self._versions.delete(path.mapping(content_hash))
        self.log.warning("Deleted mapping", content_hash=content_hash)

    def list(self) -> Generator[str, None, None]:
        """
        List all content hashes that have mapping configurations.

        Yields:
            Content hash strings for files with mapping.yml configs
        """
        # Glob matches only the direct mapping.yml, not versioned copies
        for key in self._versions.iterate_keys(
            prefix=path.MAPPINGS, glob=f"*/{path.MAPPING}"
        ):
            # Keys look like: mappings/<content_hash>/mapping.yml
            segments = key.split("/")
            if len(segments) == 3:
                yield segments[1]  # the content_hash segment

    def iterate(self) -> Generator[DatasetMapping, None, None]:
        """Iterate all stored mapping configurations."""
        for content_hash in self.list():
            yield self.get(content_hash)

delete(content_hash)

Delete a mapping configuration.

Source code in ftm_lakehouse/repository/mapping.py
def delete(self, content_hash: str) -> None:
    """Delete a mapping configuration."""
    mapping_path = path.mapping(content_hash)
    self._versions.delete(mapping_path)
    self.log.warning("Deleted mapping", content_hash=content_hash)

exists(content_hash)

Check if a mapping configuration exists.

Source code in ftm_lakehouse/repository/mapping.py
def exists(self, content_hash: str) -> bool:
    """Check if a mapping configuration exists."""
    mapping_path = path.mapping(content_hash)
    return self._versions.exists(mapping_path)

get(content_hash)

Get a mapping configuration by content hash.

Parameters:

Name Type Description Default
content_hash str

SHA1 checksum of the source CSV file

required

Returns:

Type Description
DatasetMapping

DatasetMapping if exists, None otherwise

Source code in ftm_lakehouse/repository/mapping.py
def get(self, content_hash: str) -> DatasetMapping:
    """
    Get a mapping configuration by content hash.

    Args:
        content_hash: SHA1 checksum of the source CSV file

    Returns:
        DatasetMapping if exists, None otherwise
    """
    mapping_path = path.mapping(content_hash)
    return self._versions.get(mapping_path)

iterate()

Iterate all mapping configurations.

Source code in ftm_lakehouse/repository/mapping.py
def iterate(self) -> Generator[DatasetMapping, None, None]:
    """Iterate all mapping configurations."""
    for content_hash in self.list():
        yield self.get(content_hash)

list()

List all content hashes that have mapping configurations.

Yields:

Type Description
str

Content hash strings for files with mapping.yml configs

Source code in ftm_lakehouse/repository/mapping.py
def list(self) -> Generator[str, None, None]:
    """
    List all content hashes that have mapping configurations.

    Yields:
        Content hash strings for files with mapping.yml configs
    """
    # Glob matches only direct mapping.yml, not versioned copies
    for key in self._versions.iterate_keys(
        prefix=path.MAPPINGS, glob=f"*/{path.MAPPING}"
    ):
        # Keys look like: mappings/<content_hash>/mapping.yml
        parts = key.split("/")
        if len(parts) == 3:
            yield parts[1]  # content_hash

put(mapping)

Store a mapping configuration.

Creates both a current config and a versioned snapshot.

Parameters:

Name Type Description Default
mapping DatasetMapping

The mapping configuration to store

required

Returns:

Type Description
str

The current version path

Source code in ftm_lakehouse/repository/mapping.py
def put(self, mapping: DatasetMapping) -> str:
    """
    Store a mapping configuration.

    Creates both a current config and a versioned snapshot.

    Args:
        mapping: The mapping configuration to store

    Returns:
        The current version path
    """
    content_hash = mapping.content_hash
    mapping_path = path.mapping(content_hash)
    key = self._versions.make(mapping_path, mapping)

    self.log.info(
        f"Stored mapping `{mapping_path}`",
        content_hash=content_hash,
        version=key,
    )

    return key

JobRepository

Job tracking and status.

dataset.jobs.put(job)
dataset.jobs.get(run_id)

ftm_lakehouse.repository.JobRepository

Bases: BaseRepository, Generic[J]

Repository for job run storage.

Persists job run data as JSON files, organized by job type and run ID.

Example
repo = JobRepository(dataset="my_data", uri="s3://bucket/dataset")

# Store a job run
repo.put(job)

# Get latest run for a job type
job = repo.latest()

# Run a job with lifecycle management
with repo.run(job) as run:
    # Do work...
    run.save()  # Periodic save
# Job automatically stopped when context exits
Source code in ftm_lakehouse/repository/job.py
class JobRepository(BaseRepository, Generic[J]):
    """
    Repository for job run storage.

    Persists job run data as JSON files,
    organized by job type and run ID.

    Example:
        ```python
        repo = JobRepository(dataset="my_data", uri="s3://bucket/dataset", model=CrawlJob)

        # Store a job run
        repo.put(job)

        # Get latest run for the configured job type
        job = repo.latest()

        # Run a job with lifecycle management
        with repo.run(job) as run:
            # Do work...
            run.save()  # Periodic save
        # Job automatically stopped when context exits
        ```
    """

    def __init__(self, dataset: str, uri: Uri, model: type[J]) -> None:
        super().__init__(dataset, uri)
        # The model's class name doubles as the storage prefix for runs.
        self.job_type = model.__name__
        self._store = get_store(uri, model=model)

    def put(self, job: JobModel) -> None:
        """Store a job run under <job_type>/<run_id>."""
        self._store.put(path.job_run(self.job_type, job.run_id), job)

    def get(self, run_id: str) -> J:
        """Get a specific job run by run ID for the configured job type."""
        key = path.job_run(self.job_type, run_id)
        return self._store.get(key)

    def latest(self) -> J | None:
        """
        Get the latest run for the configured job type.

        Jobs are sorted by run ID (which contains timestamp),
        so the latest is the last in alphabetical order.
        """
        # reverse=True puts the newest run first; returning from inside the
        # loop fetches only that single key.
        for key in sorted(
            self._store.iterate_keys(prefix=path.job_prefix(self.job_type)),
            reverse=True,
        ):
            return self._store.get(key)
        return None

    def iterate(self) -> Generator[J, None, None]:
        """Iterate all runs for the current job type."""
        yield from self._store.iterate_values(prefix=path.job_prefix(self.job_type))

    @contextlib.contextmanager
    def run(self, job: J) -> Generator[JobRun[J], None, None]:
        """
        Get a context manager for running a job.

        The job is automatically started on entry and stopped on exit.
        If an exception occurs, it's recorded in the job's exc field.
        """
        run = JobRun(self, job)
        try:
            run.start()
            yield run
        except Exception as e:
            # Record the failure, then re-raise so callers still see it.
            run.stop(e)
            raise
        finally:
            if job.running:  # Only stop if not already stopped
                run.stop()

    def delete(self, job: J) -> None:
        """Delete a job run."""
        key = path.job_run(self.job_type, job.run_id)
        self._store.delete(key)
        self.log.warning("Deleted job run", job=job.name, run_id=job.run_id)

delete(job)

Delete a job run.

Source code in ftm_lakehouse/repository/job.py
def delete(self, job: J) -> None:
    """Delete a job run."""
    key = path.job_run(self.job_type, job.run_id)
    self._store.delete(key)
    self.log.warning("Deleted job run", job=job.name, run_id=job.run_id)

get(run_id)

Get a specific job run by type and run ID.

Source code in ftm_lakehouse/repository/job.py
def get(self, run_id: str) -> J:
    """Get a specific job run by type and run ID."""
    key = path.job_run(self.job_type, run_id)
    return self._store.get(key)

iterate()

Iterate all runs for the current job type.

Source code in ftm_lakehouse/repository/job.py
def iterate(self) -> Generator[J, None, None]:
    """Iterate all runs for the current job type."""
    yield from self._store.iterate_values(prefix=path.job_prefix(self.job_type))

latest()

Get the latest run for the configured job type (self.model).

Jobs are sorted by run ID (which contains timestamp), so the latest is the last in alphabetical order.

Source code in ftm_lakehouse/repository/job.py
def latest(self) -> J | None:
    """
    Get the latest run for the configured job type (self.model).

    Jobs are sorted by run ID (which contains timestamp),
    so the latest is the last in alphabetical order.
    """
    for key in sorted(
        self._store.iterate_keys(prefix=path.job_prefix(self.job_type)),
        reverse=True,
    ):
        return self._store.get(key)
    return None

put(job)

Store a job run.

Source code in ftm_lakehouse/repository/job.py
def put(self, job: JobModel) -> None:
    """Store a job run."""
    self._store.put(path.job_run(self.job_type, job.run_id), job)

run(job)

Get a context manager for running a job.

The job is automatically started on entry and stopped on exit. If an exception occurs, it's recorded in the job's exc field.

Source code in ftm_lakehouse/repository/job.py
@contextlib.contextmanager
def run(self, job: J) -> Generator[JobRun[J], None, None]:
    """
    Get a context manager for running a job.

    The job is automatically started on entry and stopped on exit.
    If an exception occurs, it's recorded in the job's exc field.
    """
    run = JobRun(self, job)
    try:
        run.start()
        yield run
    except Exception as e:
        run.stop(e)
        raise
    finally:
        if job.running:  # Only stop if not already stopped
            run.stop()