Skip to content

Layer 4: Operation

Multi-step workflow operations that coordinate across repositories.

Base Classes

ftm_lakehouse.operation.base.DatasetJobOperation

Bases: LakehouseApiMixin, Generic[DJ]

A (long-running) operation for a specific dataset that updates tags and checks dependencies for freshness so that the operation can be skipped when nothing changed. The job result is stored after a successful run.

Subclasses can either set class attributes target and dependencies, or override get_target() and get_dependencies() for dynamic values.

Source code in ftm_lakehouse/operation/base.py
class DatasetJobOperation(LakehouseApiMixin, Generic[DJ]):
    """
    Base class for (long-running) operations bound to a single dataset.

    A run compares the operation's target tag against its dependency tags and
    is skipped when the target is already the latest. Otherwise `handle()` is
    executed inside a job-run context and a tag-touch context for the target.

    Subclasses either assign the class attributes `target` and `dependencies`
    or override `get_target()` / `get_dependencies()` to compute them.
    """

    target: str = ""  # tag touched by a run
    dependencies: list[str] = []  # tags the target is compared against
    _dataset: Dataset  # only assigned by `from_job()`, never by `__init__()`

    def __init__(
        self,
        job: DJ,
        archive: ArchiveRepository | None = None,
        entities: EntityRepository | None = None,
        documents: DocumentRepository | None = None,
        jobs: JobRepository | None = None,
        tags: TagStore | None = None,
        versions: VersionStore | None = None,
        uri: Uri | None = None,
    ) -> None:
        """Bind the operation to a job and its repositories.

        Every repository or store that is not passed in (or is falsy) is
        looked up for the job's dataset and the optional `uri`.

        Args:
            job: The job model instance this operation executes
            archive: Archive repository override
            entities: Entity repository override
            documents: Document repository override
            jobs: Job repository override
            tags: Tag store override
            versions: Version store override
            uri: Storage uri; falls back to the archive's uri for the mixin
        """
        dataset = job.dataset
        self.job = job
        self.log = job.log
        self.archive = archive or get_archive(dataset, uri)
        self.entities = entities or get_entities(dataset, uri)
        self.documents = documents or get_documents(dataset, uri)
        self.jobs = jobs or get_jobs(dataset, job.__class__, uri)
        self.tags = tags or get_tags(dataset, uri)
        self.versions = versions or get_versions(dataset, uri)
        super().__init__(uri or self.archive.uri)

    @classmethod
    def from_job(cls, job: DJ, dataset: Dataset) -> Self:
        """Build an operation that reuses the repositories of a Dataset.

        Args:
            job: The job model instance
            dataset: The Dataset whose repositories and stores are reused

        Returns:
            The operation, with `_dataset` set to the given Dataset
        """
        job_repository = get_jobs(dataset.name, job.__class__, dataset.uri)
        operation = cls(
            job=job,
            archive=dataset.archive,
            entities=dataset.entities,
            documents=dataset.documents,
            jobs=job_repository,
            tags=dataset._tags,
            versions=dataset._versions,
        )
        operation._dataset = dataset
        return operation

    def get_target(self) -> str:
        """Return the target tag (the class attribute unless overridden)."""
        return self.target

    def get_dependencies(self) -> list[str]:
        """Return the dependency tags (the class attribute unless overridden)."""
        return self.dependencies

    def handle(self, run: JobRun, *args, **kwargs) -> None:
        """Do the actual work of the operation; subclasses must override."""
        raise NotImplementedError

    def _run_local(self, force: bool | None = False, *args, **kwargs) -> DJ:
        """Run the operation in this process.

        Args:
            force: Run even if the target is up-to-date with its dependencies

        Returns:
            The unchanged (stopped) job when the run was skipped, otherwise
            the latest job result from the job repository

        Raises:
            RuntimeError: If no job result is available after the run
        """
        target = self.get_target()
        dependencies = self.get_dependencies()

        # The freshness check only applies when both a target and
        # dependencies are configured and the run is not forced.
        up_to_date = (
            not force
            and target
            and dependencies
            and self.tags.is_latest(target, dependencies)
        )
        if up_to_date:
            self.job.log.info(
                f"Already up-to-date: `{target}`, skipping ...",
                target=target,
                dependencies=dependencies,
            )
            self.job.stop()
            return self.job

        # Execute: target tag and job result are stored when the contexts
        # are left successfully
        with self.jobs.run(self.job) as run, self.tags.touch(target) as now:
            self.job.log.info(
                f"Start `{target}` ...",
                target=target,
                dependencies=dependencies,
                started=now,
            )
            self.handle(run, *args, force=force, **kwargs)
        self.log.info(
            f"Done `{target}`.",
            target=target,
            dependencies=dependencies,
            started=now,
            took=run.job.took,
            errors=run.job.errors,
        )

        latest = self.jobs.latest()
        if latest is None:
            raise RuntimeError("Result is `None`")
        return latest

    @api_delegate("_api_run")
    def run(self, force: bool | None = False, *args, **kwargs) -> DJ:
        """Run the operation; `force` bypasses the freshness check."""
        return self._run_local(force, *args, **kwargs)

    @require_api
    def _api_run(self, force: bool | None = False, *args, **kwargs) -> DJ:
        """Run the operation remotely by posting the job to the api.

        Returns:
            A job instance built from the api's json response
        """
        endpoint = self._api.make_url("_api/operations")
        response = self._api.make_request(
            endpoint,
            "POST",
            params={"force": force},
            json=self.job.model_dump(mode="json"),
        )
        job_class = self.job.__class__
        return job_class(**response.json())

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}({self.job.dataset})>"

from_job(job, dataset) classmethod

Create an operation instance from a job and Dataset.

Parameters:

Name Type Description Default
job DJ

The job model instance

required
dataset Dataset

The Dataset providing repositories and storage

required

Returns:

Type Description
Self

Configured operation instance

Source code in ftm_lakehouse/operation/base.py
@classmethod
def from_job(cls, job: DJ, dataset: Dataset) -> Self:
    """Build an operation that reuses the repositories of a Dataset.

    Args:
        job: The job model instance
        dataset: The Dataset whose repositories and stores are reused

    Returns:
        The operation, with `_dataset` set to the given Dataset
    """
    job_repository = get_jobs(dataset.name, job.__class__, dataset.uri)
    operation = cls(
        job=job,
        archive=dataset.archive,
        entities=dataset.entities,
        documents=dataset.documents,
        jobs=job_repository,
        tags=dataset._tags,
        versions=dataset._versions,
    )
    operation._dataset = dataset
    return operation

get_dependencies()

Return the dependencies. Override for dynamic values.

Source code in ftm_lakehouse/operation/base.py
def get_dependencies(self) -> list[str]:
    """Return the dependency tags used for the freshness check.

    The default hands back the `dependencies` attribute as-is; override this
    method when the dependencies have to be computed per job.
    """
    dependencies = self.dependencies
    return dependencies

get_target()

Return the target tag. Override for dynamic values.

Source code in ftm_lakehouse/operation/base.py
def get_target(self) -> str:
    """Return the tag that represents this operation's output.

    The default hands back the `target` attribute as-is; override this method
    when the tag has to be computed per job.
    """
    target = self.target
    return target

run(force=False, *args, **kwargs)

Execute the handle function. Pass force to run it regardless of whether the target is already up-to-date with its dependencies.

Source code in ftm_lakehouse/operation/base.py
@api_delegate("_api_run")
def run(self, force: bool | None = False, *args, **kwargs) -> DJ:
    """Execute the operation.

    The local path simply forwards to `_run_local()`. The `api_delegate`
    decorator names `_api_run` as its counterpart – presumably the call is
    routed there when a remote api is configured (decorator not shown here).

    Args:
        force: Run regardless of the freshness of target and dependencies

    Returns:
        The job returned by the run
    """
    return self._run_local(force, *args, **kwargs)

CrawlOperation

Batch file ingestion from a source location.

ftm_lakehouse.operation.crawl.CrawlJob

Bases: DatasetJobModel

Job model for crawl operations.

Tracks the state and configuration of a crawl job.

Attributes:

Name Type Description
uri Uri

Source location URI to crawl

prefix str | None

Include only keys with this prefix

exclude_prefix str | None

Exclude keys with this prefix

glob str | None

Include only keys matching this glob pattern

exclude_glob str | None

Exclude keys matching this glob pattern

Source code in ftm_lakehouse/operation/crawl.py
class CrawlJob(DatasetJobModel):
    """
    Job model for crawl operations.

    Tracks the state and configuration of a crawl job.

    Attributes:
        uri: Source location URI to crawl
        prefix: Include only keys with this prefix
        exclude_prefix: Exclude keys with this prefix
        glob: Include only keys matching this glob pattern
        exclude_glob: Exclude keys matching this glob pattern
        make_entities: Create entities for each archived file (off by default)
        existing: How files that are already archived are treated; defaults
            to `HandleExistingMode.skip_path`, `None` disables skipping
    """

    uri: Uri
    prefix: str | None = None
    exclude_prefix: str | None = None
    glob: str | None = None
    exclude_glob: str | None = None
    make_entities: bool = False
    existing: HandleExistingMode | None = HandleExistingMode.skip_path

ftm_lakehouse.operation.CrawlOperation

Bases: DatasetJobOperation[CrawlJob]

Crawl workflow that archives files and optionally creates entities.

Iterates through files in a source store and archives them to the file repository. When the job's make_entities option is enabled, corresponding entities are also created in the entities repository.

Example
from ftm_lakehouse.operation import CrawlOperation, CrawlJob

# Describe what to crawl: only keys matching `*.pdf` below the source uri
job = CrawlJob.make(
    uri="s3://bucket/documents",
    dataset="my_dataset",
    glob="*.pdf"
)
# Repositories are looked up from the job's dataset when not passed in
op = CrawlOperation(job=job)
result = op.run()
# `done` counts the files that were archived (skipped files are not counted)
print(f"Crawled {result.done} files")
Source code in ftm_lakehouse/operation/crawl.py
class CrawlOperation(DatasetJobOperation[CrawlJob]):
    """
    Crawl a source store and archive every matching file.

    Each key of the source store that passes the job's filters is stored in
    the archive. If the job has `make_entities` enabled, the entities of each
    archived file are added to the entity repository as well.

    Example:
        ```python
        from ftm_lakehouse.operation import CrawlOperation, CrawlJob

        job = CrawlJob.make(
            uri="s3://bucket/documents",
            dataset="my_dataset",
            glob="*.pdf"
        )
        op = CrawlOperation(job=job)
        result = op.run()
        print(f"Crawled {result.done} files")
        ```
    """

    target = tag.OP_CRAWL

    def __init__(self, *args, **kwargs) -> None:
        """Set up the operation and open the source store of the job."""
        super().__init__(*args, **kwargs)
        self.source = get_store(self.job.uri)
        if not self.source.is_http:
            return
        # http sources get a client timeout of 24 hours (in seconds), merged
        # into whatever client kwargs are already configured
        config = ensure_dict(self.source.backend_config)
        client_kwargs = ensure_dict(config.get("client_kwargs"))
        config["client_kwargs"] = {
            **client_kwargs,
            "timeout": aiohttp.ClientTimeout(total=3600 * 24),
        }
        self.source.backend_config = config

    def get_uris(self) -> Generator[str, None, None]:
        """
        Generate the keys of the source store that should be crawled.

        Prefix, exclude-prefix and glob filters are handed to the store,
        the exclude-glob is applied here. Each yielded key bumps the job's
        `pending` counter.

        Yields:
            File uris to be crawled
        """
        self.log.info(f"Crawling `{mask_uri(self.job.uri)}` ...")
        keys = self.source.iterate_keys(
            prefix=self.job.prefix,
            exclude_prefix=self.job.exclude_prefix,
            glob=self.job.glob,
        )
        for key in keys:
            excluded = self.job.exclude_glob and fnmatch(key, self.job.exclude_glob)
            if not excluded:
                self.job.pending += 1
                self.job.touch()
                yield key

    def handle_crawl(self, uri: str, run: JobRun[CrawlJob]) -> datetime:
        """
        Handle a single crawl task.

        Stores the file in the archive unless it should be skipped and, when
        `make_entities` is enabled, adds the file's entities.

        Args:
            uri: File uri to crawl
            run: Current job run context

        Returns:
            Timestamp taken when the task was started
        """
        started = datetime.now()

        self.log.info(f"Crawling `{uri}` ...", source=mask_uri(self.source.uri))
        # a checksum is only computed up front for local sources
        checksum = (
            self.source.checksum(uri, algorithm=CHECKSUM_ALGORITHM)
            if self.source.is_local
            else None
        )
        if self._should_skip(uri, checksum):
            return started

        file = self.archive.store(
            self.source.to_uri(uri),
            checksum=checksum,
            key=uri,
            origin=tag.CRAWL_ORIGIN,
        )
        if self.job.make_entities:
            self.entities.add_many(file.make_entities(), tag.CRAWL_ORIGIN)
        run.job.done += 1
        return started

    def handle(self, run: JobRun, *args, **kwargs) -> None:
        """Crawl all uris, saving the run state before every 1000th task."""
        for ix, task in enumerate(self.get_uris(), start=1):
            checkpoint = ix % 1000 == 0
            if checkpoint:
                self.log.info(
                    f"Handling task {ix} ...",
                    pending=self.job.pending,
                    done=self.job.done,
                )
                run.save()
            self.handle_crawl(task, run)
            run.job.pending -= 1
            run.job.touch()
        if self.job.make_entities:
            self.entities.flush()

    def _should_skip(self, uri: Uri, checksum: str | None) -> bool:
        """Decide whether a file is already archived and can be skipped.

        Without a checksum, or with mode `None` / `overwrite`, nothing is
        skipped. `skip_checksum` skips when the checksum is archived,
        `skip_path` additionally requires an archived file with the same key.
        """
        mode = self.job.existing
        never_skip = (
            mode is None
            or mode == HandleExistingMode.overwrite
            or checksum is None
        )
        if never_skip:
            return False
        if mode == HandleExistingMode.skip_checksum:
            return self.archive.exists(checksum)
        if mode == HandleExistingMode.skip_path and self.archive.exists(checksum):
            return any(
                file.key == str(uri)
                for file in self.archive.get_all_files(checksum)
            )
        return False

    def _api_run(self, force: bool | None = False, *args, **kwargs) -> CrawlJob:
        """Crawl always runs locally – source files aren't on the API server."""
        return self._run_local(force, *args, **kwargs)

get_uris()

Generate file uris to crawl.

Applies prefix, glob, and exclude filters to the source store.

Yields:

Type Description
str

File uris to be crawled

Source code in ftm_lakehouse/operation/crawl.py
def get_uris(self) -> Generator[str, None, None]:
    """
    Generate the keys of the source store that should be crawled.

    Prefix, exclude-prefix and glob filters are handed to the store, the
    exclude-glob is applied here. Each yielded key bumps the job's `pending`
    counter.

    Yields:
        File uris to be crawled
    """
    self.log.info(f"Crawling `{mask_uri(self.job.uri)}` ...")
    keys = self.source.iterate_keys(
        prefix=self.job.prefix,
        exclude_prefix=self.job.exclude_prefix,
        glob=self.job.glob,
    )
    for key in keys:
        excluded = self.job.exclude_glob and fnmatch(key, self.job.exclude_glob)
        if not excluded:
            self.job.pending += 1
            self.job.touch()
            yield key

handle_crawl(uri, run)

Handle a single crawl task.

Archives the file unless it is skipped as already existing and, when make_entities is enabled, creates the corresponding entities.

Parameters:

Name Type Description Default
uri str

File uri to crawl

required
run JobRun[CrawlJob]

Current job run context

required

Returns:

Type Description
datetime

Timestamp when the task was processed

Source code in ftm_lakehouse/operation/crawl.py
def handle_crawl(self, uri: str, run: JobRun[CrawlJob]) -> datetime:
    """
    Handle a single crawl task.

    Stores the file in the archive unless it should be skipped and, when
    `make_entities` is enabled, adds the file's entities.

    Args:
        uri: File uri to crawl
        run: Current job run context

    Returns:
        Timestamp taken when the task was started
    """
    started = datetime.now()

    self.log.info(f"Crawling `{uri}` ...", source=mask_uri(self.source.uri))
    # a checksum is only computed up front for local sources
    checksum = (
        self.source.checksum(uri, algorithm=CHECKSUM_ALGORITHM)
        if self.source.is_local
        else None
    )
    if self._should_skip(uri, checksum):
        return started

    file = self.archive.store(
        self.source.to_uri(uri),
        checksum=checksum,
        key=uri,
        origin=tag.CRAWL_ORIGIN,
    )
    if self.job.make_entities:
        self.entities.add_many(file.make_entities(), tag.CRAWL_ORIGIN)
    run.job.done += 1
    return started

Export Operations

ExportStatementsOperation

Export parquet store to exports/statements.csv.

ftm_lakehouse.operation.export.ExportStatementsJob

Bases: BaseExportJob

Source code in ftm_lakehouse/operation/export.py
class ExportStatementsJob(BaseExportJob):
    """Job model for the statements export."""

    # target of this export: the statements export path
    target: str = path.EXPORTS_STATEMENTS

ftm_lakehouse.operation.ExportStatementsOperation

Bases: BaseExportOperation[ExportStatementsJob]

Export parquet store to statements.csv. Checks if journal needs to be flushed first. Skips if the last export is newer than the last statements update.

Source code in ftm_lakehouse/operation/export.py
class ExportStatementsOperation(BaseExportOperation[ExportStatementsJob]):
    """Export parquet store to statements.csv. Checks if journal needs to be
    flushed first. Skips if the last export is newer than the last statements
    update."""

    def handle(self, run: JobRun, *args, **kwargs) -> None:
        """Write the statements csv, unless `ensure_flush()` is falsy."""
        if not self.ensure_flush():
            return
        export_path = path.EXPORTS_STATEMENTS
        self.entities._store.ensure_parent(export_path)
        self.entities._statements.export_csv(export_path)
        run.job.done = 1

ExportEntitiesOperation

Export parquet store to entities.ftm.json.

ftm_lakehouse.operation.export.ExportEntitiesJob

Bases: BaseExportJob

Source code in ftm_lakehouse/operation/export.py
class ExportEntitiesJob(BaseExportJob):
    """Job model for the entities export."""

    # target of this export: the entities json path
    target: str = path.ENTITIES_JSON
    # also export a diff after the entities export (on by default)
    make_diff: bool = True

ftm_lakehouse.operation.ExportEntitiesOperation

Bases: BaseExportOperation[ExportEntitiesJob]

Export parquet store to entities.ftm.json. Checks if journal needs to be flushed first. Skips if the last export is newer than the last statements update.

Source code in ftm_lakehouse/operation/export.py
class ExportEntitiesOperation(BaseExportOperation[ExportEntitiesJob]):
    """Export parquet store to entities.ftm.json. Checks if journal needs to be
    flushed first. Skips if the last export is newer than the last statements
    update."""

    def _get_fresh_statements_csv(self) -> str | None:
        """Return the statements.csv uri when the export exists and its tag
        is the latest relative to the statements-updated tag, else `None`."""
        store = self.entities._store
        is_fresh = store.exists(path.EXPORTS_STATEMENTS) and self.tags.is_latest(
            tag.EXPORTS_STATEMENTS, [tag.STATEMENTS_UPDATED]
        )
        if is_fresh:
            return store.to_uri(path.EXPORTS_STATEMENTS)
        return None

    def handle(self, run: JobRun[ExportEntitiesJob], *args, **kwargs) -> None:
        """Export the entities (and optionally a diff), unless
        `ensure_flush()` is falsy."""
        if not self.ensure_flush():
            return
        # a fresh statements.csv is handed to the entities export when available
        statements_csv = self._get_fresh_statements_csv()
        self.entities.export_entities(statements_csv_uri=statements_csv)
        if run.job.make_diff:
            self.entities.export_diff()
        run.job.done = 1

ExportStatisticsOperation

Export statistics to exports/statistics.json.

ftm_lakehouse.operation.export.ExportStatisticsJob

Bases: BaseExportJob

Source code in ftm_lakehouse/operation/export.py
class ExportStatisticsJob(BaseExportJob):
    """Job model for the statistics export."""

    # target of this export: the statistics export path
    target: str = path.EXPORTS_STATISTICS

ftm_lakehouse.operation.ExportStatisticsOperation

Bases: BaseExportOperation[ExportStatisticsJob]

Export parquet store statistics to statistics.json. Checks if journal needs to be flushed first. Skips if the last export is newer than the last statements update.

Source code in ftm_lakehouse/operation/export.py
class ExportStatisticsOperation(BaseExportOperation[ExportStatisticsJob]):
    """Export parquet store statistics to statistics.json. Checks if journal
    needs to be flushed first. Skips if the last export is newer than the last
    statements update."""

    def handle(self, run: JobRun, *args, **kwargs) -> None:
        """Compute the statistics and store them as a new version, unless
        `ensure_flush()` is falsy."""
        if not self.ensure_flush():
            return
        statistics = self.entities.get_statistics()
        self.versions.make(path.EXPORTS_STATISTICS, statistics)
        run.job.done = 1

ExportDocumentsOperation

Export document metadata to exports/documents.csv.

ftm_lakehouse.operation.export.ExportDocumentsJob

Bases: BaseExportJob

Source code in ftm_lakehouse/operation/export.py
class ExportDocumentsJob(BaseExportJob):
    """Job model for the documents export."""

    target: str = path.EXPORTS_DOCUMENTS
    make_diff: bool = True
    public_url_prefix: HttpUrlStr | None = None

    def get_public_prefix(self) -> str | None:
        """Return the public url prefix for this job.

        The job's own `public_url_prefix` wins; otherwise the prefix from the
        settings is rendered with the dataset name. `None` if neither is set.
        """
        if self.public_url_prefix:
            return self.public_url_prefix
        if not settings.public_url_prefix:
            return None
        return render(settings.public_url_prefix, {"dataset": self.dataset})

ftm_lakehouse.operation.ExportDocumentsOperation

Bases: BaseExportOperation[ExportDocumentsJob]

Export file metadata to documents.csv. Checks if journal needs to be flushed first. Skips if the last export is newer than the last statements update.

Source code in ftm_lakehouse/operation/export.py
class ExportDocumentsOperation(BaseExportOperation[ExportDocumentsJob]):
    """Export file metadata to documents.csv. Checks if journal needs to be
    flushed first. Skips if the last export is newer than the last statements
    update."""

    def handle(self, run: JobRun[ExportDocumentsJob], *args, **kwargs) -> None:
        """Export the documents csv (and optionally a diff), unless
        `ensure_flush()` is falsy."""
        if not self.ensure_flush():
            return
        prefix = run.job.get_public_prefix()
        self.documents.export_csv(prefix)
        if run.job.make_diff:
            self.documents.export_diff(public_url_prefix=prefix)
        run.job.done = 1

ExportIndexOperation

Export index.json with optional resources.

ftm_lakehouse.operation.export.ExportIndexJob

Bases: BaseExportJob

Source code in ftm_lakehouse/operation/export.py
class ExportIndexJob(BaseExportJob):
    """Job model for the index.json export."""

    # target of this export: the index path
    target: str = path.INDEX
    # The index is compared against the config and three of the exports.
    # NOTE(review): ExportIndexOperation also publishes
    # path.EXPORTS_STATEMENTS as a resource, but it is not listed here –
    # confirm this is intended.
    dependencies: list[str] = [
        path.CONFIG,
        path.EXPORTS_STATISTICS,
        path.ENTITIES_JSON,
        path.EXPORTS_DOCUMENTS,
    ]

ftm_lakehouse.operation.ExportIndexOperation

Bases: BaseExportOperation[ExportIndexJob]

Export index.json, optionally including resources. Only export targets that already exist are included as resources.

Source code in ftm_lakehouse/operation/export.py
class ExportIndexOperation(BaseExportOperation[ExportIndexJob]):
    """Export index.json, optionally including resources. Only export targets
    that already exist are included as resources."""

    def handle(
        self,
        run: JobRun[ExportIndexJob],
        dataset: DatasetModel | None = None,
        *args,
        **kwargs,
    ) -> None:
        """Assemble the dataset index and store it as a new version.

        Args:
            run: Current job run context
            dataset: Dataset model to patch; a stub is created when omitted
        """
        self.ensure_flush()

        if dataset is None:
            # we need a stub dataset to patch
            dataset = make_dataset(run.job.dataset, DatasetModel, uri=self.versions.uri)

        store = get_store(dataset.uri)
        public_prefix = dataset.get_public_prefix()

        if public_prefix:
            # resources are only published when a public prefix is known;
            # the order here is the order they are appended in
            resource_makers = (
                (path.EXPORTS_STATEMENTS, make_statements_resource),
                (path.ENTITIES_JSON, make_entities_resource),
                (path.EXPORTS_DOCUMENTS, make_documents_resource),
                (path.EXPORTS_STATISTICS, make_statistics_resource),
            )
            for export_path, make_resource in resource_makers:
                if not store.exists(export_path):
                    continue
                uri = join_uri(dataset.uri, export_path)
                public_url = join_uri(public_prefix, export_path)
                dataset.resources.append(make_resource(uri, public_url))

        # statistics are applied independently of the public prefix
        if store.exists(path.EXPORTS_STATISTICS):
            dataset.apply_stats(store.get(path.EXPORTS_STATISTICS, model=DatasetStats))

        self.versions.make(path.INDEX, dataset)

        run.job.done = 1

MappingOperation

Process CSV-to-entity mapping configurations.

ftm_lakehouse.operation.mapping.MappingJob

Bases: DatasetJobModel

Source code in ftm_lakehouse/operation/mapping.py
class MappingJob(DatasetJobModel):
    """Job model for mapping operations."""

    # identifies the archived file and its mapping configuration
    content_hash: str
    # NOTE(review): MappingOperation.handle counts generated entities in
    # `done` and does not write this field – confirm where it is used
    entities: int = 0

ftm_lakehouse.operation.MappingOperation

Bases: DatasetJobOperation[MappingJob]

Mapping workflow that transforms a CSV file into entities.

Processes a single archived CSV file (identified by content_hash) using its mapping configuration to generate FollowTheMoney entities, which are written to the entity repository.

Example
from ftm_lakehouse.operation import MappingOperation, MappingJob

# The content hash identifies the archived CSV file and its mapping config
job = MappingJob.make(
    dataset="my_dataset",
    content_hash="5a6acf229ba576d9a40b09292595658bbb74ef56",
)
op = MappingOperation(job=job)
result = op.run()
# `done` is incremented once per generated entity
print(f"Generated {result.done} entities")
Source code in ftm_lakehouse/operation/mapping.py
class MappingOperation(DatasetJobOperation[MappingJob]):
    """
    Mapping workflow that turns one archived CSV file into entities.

    The file is identified by the job's content_hash. Its mapping
    configuration is used to generate FollowTheMoney entities, which are
    written to the entity repository.

    Example:
        ```python
        from ftm_lakehouse.operation import MappingOperation, MappingJob

        job = MappingJob.make(
            dataset="my_dataset",
            content_hash="5a6acf229ba576d9a40b09292595658bbb74ef56",
        )
        op = MappingOperation(job=job)
        result = op.run()
        print(f"Generated {result.done} entities")
        ```
    """

    def __init__(self, *args, **kwargs) -> None:
        """Set up the operation and the mapping repository of the dataset."""
        super().__init__(*args, **kwargs)
        self.mappings = MappingRepository(self.job.dataset, self.archive.uri)

    def get_target(self) -> str:
        """Return the mapping tag for the job's content hash."""
        content_hash = self.job.content_hash
        return tag.mapping_tag(content_hash)

    def get_dependencies(self) -> list[str]:
        """Return the mapping path for the job's content hash as the only
        dependency."""
        content_hash = self.job.content_hash
        return [path.mapping(content_hash)]

    def handle(self, run: JobRun[MappingJob], *args, **kwargs) -> None:
        """
        Process the mapping configuration and store generated entities.

        Every generated entity is written through a bulk writer and counted
        in `run.job.done`; the entity repository is flushed afterwards.
        """
        content_hash = self.job.content_hash
        origin = mapping_origin(content_hash)
        mapping = self.mappings.get(content_hash)
        file = self.archive.get_file(content_hash)
        with self.archive.local_path(file.checksum) as csv_path:
            with self.entities.writer(origin=origin) as writer:
                for entity in map_entities(mapping, csv_path):
                    writer.add_entity(entity)
                    run.job.done += 1
        self.entities.flush()

handle(run, *args, **kwargs)

Process the mapping configuration and store generated entities.

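The check that skips a mapping whose output is already up-to-date relative to the mapping config is not part of this method; it happens in the run logic of the base operation before handle() is called.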

Source code in ftm_lakehouse/operation/mapping.py
def handle(self, run: JobRun[MappingJob], *args, **kwargs) -> None:
    """
    Process the mapping configuration and store generated entities.

    Every generated entity is written through a bulk writer and counted in
    `run.job.done`; the entity repository is flushed afterwards.
    """
    content_hash = self.job.content_hash
    origin = mapping_origin(content_hash)
    mapping = self.mappings.get(content_hash)
    file = self.archive.get_file(content_hash)
    with self.archive.local_path(file.checksum) as csv_path:
        with self.entities.writer(origin=origin) as writer:
            for entity in map_entities(mapping, csv_path):
                writer.add_entity(entity)
                run.job.done += 1
    self.entities.flush()

Maintenance Operations

Three independent async operations on the parquet statement store. All three acquire the dataset-wide write fence (.LOCK).

CompactOperation

Bin-pack small parquet files within each (shard, bucket, origin) partition (Delta OPTIMIZE compact). Cheap; does not change row contents.

ftm_lakehouse.operation.maintenance.CompactJob

Bases: DatasetJobModel

Source code in ftm_lakehouse/operation/maintenance.py
class CompactJob(DatasetJobModel):
    """Job model for compact operations; adds no fields of its own."""

    pass

ftm_lakehouse.operation.CompactOperation

Bases: DatasetJobOperation[CompactJob]

Bin-pack small parquet files (Delta OPTIMIZE compact).

Cheap maintenance – only rewrites small files into larger ones; does not dedupe rows or drop tombstones (use MergeOperation for that).

Source code in ftm_lakehouse/operation/maintenance.py
class CompactOperation(DatasetJobOperation[CompactJob]):
    """Bin-pack small parquet files (Delta OPTIMIZE compact).

    Cheap maintenance – small files are rewritten into larger ones; rows are
    not deduplicated and tombstones are not dropped (that is what
    ``MergeOperation`` is for).
    """

    target = tag.STATEMENTS_COMPACTED
    dependencies = [tag.STATEMENTS_UPDATED]

    def handle(self, run: JobRun[CompactJob], *args, **kwargs) -> None:
        """Compact the statement store and mark the job as done."""
        statements = self.entities._statements
        statements.compact()
        run.job.done = 1

MergeOperation

Per-partition rewrite that collapses duplicates (latest last_seen per id), folds first_seen to the min, and drops tombstones older than the grace cutoff (LAKEHOUSE_GRACE_PERIOD_DAYS).

ftm_lakehouse.operation.maintenance.MergeJob

Bases: DatasetJobModel

Source code in ftm_lakehouse/operation/maintenance.py
class MergeJob(DatasetJobModel):
    """Job model for merge operations; adds no fields of its own."""

    pass

ftm_lakehouse.operation.MergeOperation

Bases: DatasetJobOperation[MergeJob]

Collapse duplicates and reap expired tombstones, partition by partition.

For each (shard, bucket, origin) partition: keep the most-recent row per statement id, fold first_seen down to the minimum, drop tombstones older than LAKEHOUSE_GRACE_PERIOD_DAYS.

Source code in ftm_lakehouse/operation/maintenance.py
class MergeOperation(DatasetJobOperation[MergeJob]):
    """Collapse duplicates and reap expired tombstones, partition by partition.

    Within every ``(shard, bucket, origin)`` partition the most-recent row per
    statement id is kept, ``first_seen`` is folded down to the minimum and
    tombstones older than ``LAKEHOUSE_GRACE_PERIOD_DAYS`` are dropped.
    """

    target = tag.STATEMENTS_MERGED
    dependencies = [tag.STATEMENTS_UPDATED]

    def handle(self, run: JobRun[MergeJob], *args, **kwargs) -> None:
        """Merge the statement store and mark the job as done."""
        statements = self.entities._statements
        statements.merge()
        run.job.done = 1

VacuumOperation

Delete obsolete parquet files no longer referenced by the Delta log.

ftm_lakehouse.operation.maintenance.VacuumJob

Bases: DatasetJobModel

Source code in ftm_lakehouse/operation/maintenance.py
class VacuumJob(DatasetJobModel):
    """Job model for vacuum operations."""

    # handed to the statement store's vacuum() as `retention_hours`
    retention_hours: int = 0

ftm_lakehouse.operation.VacuumOperation

Bases: DatasetJobOperation[VacuumJob]

Delete obsolete parquet files no longer referenced by the Delta log.

Source code in ftm_lakehouse/operation/maintenance.py
class VacuumOperation(DatasetJobOperation[VacuumJob]):
    """Delete obsolete parquet files no longer referenced by the Delta log."""

    target = tag.STATEMENTS_VACUUMED
    dependencies = [tag.STATEMENTS_COMPACTED, tag.STATEMENTS_MERGED]

    def handle(self, run: JobRun[VacuumJob], *args, **kwargs) -> None:
        """Vacuum the statement store with the job's retention and mark the
        job as done."""
        retention = run.job.retention_hours
        self.entities._statements.vacuum(retention_hours=retention)
        run.job.done = 1

MakeOperation

Full workflow: flush journal + all exports.

ftm_lakehouse.operation.make.MakeJob

Bases: DatasetJobModel

Source code in ftm_lakehouse/operation/make.py
class MakeJob(DatasetJobModel):
    """Job model for make operations; adds no fields of its own."""

    pass

ftm_lakehouse.operation.MakeOperation

Bases: DatasetJobOperation[MakeJob]

Source code in ftm_lakehouse/operation/make.py
class MakeOperation(DatasetJobOperation[MakeJob]):
    """Full workflow: flush the journal, then run all exports.

    The exports run through the dataset-level factories, so this operation
    needs the Dataset it was created for. Create it with
    `MakeOperation.from_job(job, dataset)`; the plain constructor does not
    attach a Dataset.
    """

    target = tag.OP_MAKE
    dependencies = [tag.JOURNAL_UPDATED, tag.STATEMENTS_UPDATED]

    def handle(self, run: JobRun, *args, **kwargs) -> None:
        """Flush the entities and run every export in order.

        Args:
            run: Current job run context
            **kwargs: `force` is forwarded to every export

        Raises:
            AttributeError: If the operation has no Dataset attached
        """
        # `_dataset` is only assigned by `from_job()`. Fail with an actionable
        # message instead of a bare missing-attribute error.
        ds = getattr(self, "_dataset", None)
        if ds is None:
            name = self.__class__.__name__
            raise AttributeError(
                f"{name} has no Dataset attached; "
                f"create it with `{name}.from_job(job, dataset)`"
            )
        force = kwargs.get("force", False)
        ds.entities.flush()
        # the index export comes last because it reads the other exports
        factories.export_statements(ds, force=force)
        factories.export_entities(ds, force=force)
        factories.export_documents(ds, force=force)
        factories.export_statistics(ds, force=force)
        factories.export_index(ds, force=force)
        run.job.done = 1

DownloadArchiveOperation

Export archive files to their original paths.

ftm_lakehouse.operation.download.DownloadArchiveJob

Bases: DatasetJobModel

Source code in ftm_lakehouse/operation/download.py
class DownloadArchiveJob(DatasetJobModel):
    """Job model for archive downloads."""

    # uri of the store the files are downloaded to (not a tag, unlike the
    # `target` class attribute of the operation)
    target: Uri
    # number of documents skipped because they already exist at the target
    skipped: int = 0

ftm_lakehouse.operation.DownloadArchiveOperation

Bases: DatasetJobOperation[DownloadArchiveJob]

Download the archive files to a target store, writing each file to the human-readable relative path recorded in the exported documents.csv.

Source code in ftm_lakehouse/operation/download.py
class DownloadArchiveOperation(DatasetJobOperation[DownloadArchiveJob]):
    """
    Download the archive files to a target store, writing each file to the
    relative path of its document from the exported documents.csv
    """

    target = tag.OP_DOWNLOAD_ARCHIVE
    dependencies = [path.EXPORTS_DOCUMENTS]

    def handle(self, run: JobRun[DownloadArchiveJob], *args, **kwargs) -> None:
        """Copy every streamed document that is not yet at the target.

        Documents whose relative path already exists in the target store are
        counted in `skipped`; downloaded ones are counted in `done`.
        """
        destination = get_store(run.job.target)
        self.log.info(
            "Downloading archive ...",
            target=mask_uri(destination.uri),
            documents=mask_uri(self.documents.csv_uri),
        )
        for document in self.documents.stream():
            # both log lines carry the same context
            log_context = {
                "checksum": document.checksum,
                "source": mask_uri(self.archive.uri),
                "target": mask_uri(destination.uri),
            }
            if destination.exists(document.relative_path):
                self.log.debug(
                    f"Skipping `{document.relative_path}`, already exists.",
                    **log_context,
                )
                run.job.skipped += 1
                continue

            self.log.info(
                f"Downloading `{document.relative_path}` ...",
                **log_context,
            )
            with destination.open(document.relative_path, "wb") as o:
                with self.archive.open(document.checksum) as i:
                    stream(i, o, CHUNK_SIZE_LARGE)
            run.job.done += 1