Skip to content

Layer 4: Operation

Multi-step workflow operations that coordinate across repositories.

Base Classes

ftm_lakehouse.operation.base.DatasetJobOperation

Bases: LakehouseApiMixin, Generic[DJ]

A (long-running) operation for a specific dataset that updates tags and checks dependencies for freshness to be able to skip this operation. The job result is stored after successful run.

Subclasses can either set class attributes target and dependencies, or override get_target() and get_dependencies() for dynamic values.

Source code in ftm_lakehouse/operation/base.py
class DatasetJobOperation(LakehouseApiMixin, Generic[DJ]):
    """
    A (long-running) operation for a specific dataset that updates tags and
    checks dependencies for freshness to be able to skip this operation. The job
    result is stored after successful run.

    Subclasses can either set class attributes `target` and `dependencies`,
    or override `get_target()` and `get_dependencies()` for dynamic values.
    """

    # NOTE: class-level list default is shared by subclasses that don't
    # override it; safe as long as it is never mutated in place.
    target: str = ""  # tag that gets touched after successful run
    dependencies: list[str] = []  # dependencies for freshness check
    _dataset: Dataset  # only populated when constructed via `from_job()`

    def __init__(
        self,
        job: DJ,
        archive: ArchiveRepository | None = None,
        entities: EntityRepository | None = None,
        documents: DocumentRepository | None = None,
        jobs: JobRepository | None = None,
        tags: TagStore | None = None,
        versions: VersionStore | None = None,
        uri: Uri | None = None,
    ) -> None:
        """Wire up all repositories, deriving defaults from the job's dataset
        (and the optional base `uri`) for any not explicitly given.

        Args:
            job: The job model this operation executes
            archive: File archive repository (default: derived from job.dataset)
            entities: Entity repository (default: derived from job.dataset)
            documents: Document repository (default: derived from job.dataset)
            jobs: Job repository scoped to the job's class (default: derived)
            tags: Tag store used for freshness checks (default: derived)
            versions: Version store (default: derived)
            uri: Optional base uri used when deriving default repositories
        """
        self.job = job
        self.log = job.log
        self.archive = archive or get_archive(job.dataset, uri)
        self.entities = entities or get_entities(job.dataset, uri)
        self.documents = documents or get_documents(job.dataset, uri)
        self.jobs = jobs or get_jobs(job.dataset, job.__class__, uri)
        self.tags = tags or get_tags(job.dataset, uri)
        self.versions = versions or get_versions(job.dataset, uri)
        super().__init__(uri or self.archive.uri)

    @classmethod
    def from_job(cls, job: DJ, dataset: Dataset) -> Self:
        """Create an operation instance from a job and Dataset.

        Args:
            job: The job model instance
            dataset: The Dataset providing repositories and storage

        Returns:
            Configured operation instance
        """
        instance = cls(
            job=job,
            archive=dataset.archive,
            entities=dataset.entities,
            documents=dataset.documents,
            jobs=get_jobs(dataset.name, job.__class__, dataset.uri),
            tags=dataset._tags,
            versions=dataset._versions,
        )
        # keep a reference so subclasses (e.g. ones that coordinate several
        # factories) can access the full dataset via `self._dataset`
        instance._dataset = dataset
        return instance

    def get_target(self) -> str:
        """Return the target tag. Override for dynamic values."""
        return self.target

    def get_dependencies(self) -> list[str]:
        """Return the dependencies. Override for dynamic values."""
        return self.dependencies

    def handle(self, run: JobRun, *args, **kwargs) -> None:
        # subclasses implement the actual work here; called by `run()`
        raise NotImplementedError

    @api_delegate("_api_run")
    def run(self, force: bool | None = False, *args, **kwargs) -> DJ:
        """Execute the handle function, force to run it regardless of freshness
        dependencies"""
        target = self.get_target()
        dependencies = self.get_dependencies()

        # Skip entirely when the target tag is still fresher than all
        # dependencies (unless forced)
        if not force:
            if target and dependencies:
                if self.tags.is_latest(target, dependencies):
                    self.job.log.info(
                        f"Already up-to-date: `{target}`, skipping ...",
                        target=target,
                        dependencies=dependencies,
                    )
                    self.job.stop()
                    return self.job

        # Execute: Store target tag and job result on successful context leave
        with self.jobs.run(self.job) as run, self.tags.touch(target) as now:
            self.job.log.info(
                f"Start `{target}` ...",
                target=target,
                dependencies=dependencies,
                started=now,
            )
            _ = self.handle(run, *args, force=force, **kwargs)
        # `run` and `now` remain bound after the `with` block (Python scoping)
        self.log.info(
            f"Done `{target}`.",
            target=target,
            dependencies=dependencies,
            started=now,
            took=run.job.took,
            errors=run.job.errors,
        )
        # reload the persisted result; the jobs context stored it on success
        result = self.jobs.latest()
        if result is not None:
            return result
        raise RuntimeError("Result is `None`")

    @require_api
    def _api_run(self, force: bool | None = False, *args, **kwargs) -> DJ:
        """Delegate run to remote api"""
        url = self._api.make_url("_api/operations")
        res = self._api.make_request(
            url,
            "POST",
            params={"force": force},
            json=self.job.model_dump(mode="json"),
        )
        # rebuild the concrete job model from the api response
        return self.job.__class__(**res.json())

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}({self.job.dataset})>"

from_job(job, dataset) classmethod

Create an operation instance from a job and Dataset.

Parameters:

Name Type Description Default
job DJ

The job model instance

required
dataset Dataset

The Dataset providing repositories and storage

required

Returns:

Type Description
Self

Configured operation instance

Source code in ftm_lakehouse/operation/base.py
@classmethod
def from_job(cls, job: DJ, dataset: Dataset) -> Self:
    """Create an operation instance from a job and Dataset.

    Args:
        job: The job model instance
        dataset: The Dataset providing repositories and storage

    Returns:
        Configured operation instance
    """
    instance = cls(
        job=job,
        archive=dataset.archive,
        entities=dataset.entities,
        documents=dataset.documents,
        jobs=get_jobs(dataset.name, job.__class__, dataset.uri),
        tags=dataset._tags,
        versions=dataset._versions,
    )
    # keep a reference so the operation can access the full dataset later
    instance._dataset = dataset
    return instance

get_dependencies()

Return the dependencies. Override for dynamic values.

Source code in ftm_lakehouse/operation/base.py
def get_dependencies(self) -> list[str]:
    """Return the freshness-check dependencies; override for dynamic values."""
    deps = self.dependencies
    return deps

get_target()

Return the target tag. Override for dynamic values.

Source code in ftm_lakehouse/operation/base.py
def get_target(self) -> str:
    """Return the target tag; override for dynamic values."""
    target_tag = self.target
    return target_tag

run(force=False, *args, **kwargs)

Execute the handle function, force to run it regardless of freshness dependencies

Source code in ftm_lakehouse/operation/base.py
@api_delegate("_api_run")
def run(self, force: bool | None = False, *args, **kwargs) -> DJ:
    """Execute the handle function, force to run it regardless of freshness
    dependencies"""
    target = self.get_target()
    dependencies = self.get_dependencies()

    # Skip entirely when the target tag is still fresher than all
    # dependencies (unless forced)
    if not force:
        if target and dependencies:
            if self.tags.is_latest(target, dependencies):
                self.job.log.info(
                    f"Already up-to-date: `{target}`, skipping ...",
                    target=target,
                    dependencies=dependencies,
                )
                self.job.stop()
                return self.job

    # Execute: Store target tag and job result on successful context leave
    with self.jobs.run(self.job) as run, self.tags.touch(target) as now:
        self.job.log.info(
            f"Start `{target}` ...",
            target=target,
            dependencies=dependencies,
            started=now,
        )
        _ = self.handle(run, *args, force=force, **kwargs)
    # `run` and `now` remain bound after the `with` block (Python scoping)
    self.log.info(
        f"Done `{target}`.",
        target=target,
        dependencies=dependencies,
        started=now,
        took=run.job.took,
        errors=run.job.errors,
    )
    # reload the persisted result; the jobs context stored it on success
    result = self.jobs.latest()
    if result is not None:
        return result
    raise RuntimeError("Result is `None`")

CrawlOperation

Batch file ingestion from a source location.

ftm_lakehouse.operation.crawl.CrawlJob

Bases: DatasetJobModel

Job model for crawl operations.

Tracks the state and configuration of a crawl job.

Attributes:

Name Type Description
uri Uri

Source location URI to crawl

prefix str | None

Include only keys with this prefix

exclude_prefix str | None

Exclude keys with this prefix

glob str | None

Include only keys matching this glob pattern

exclude_glob str | None

Exclude keys matching this glob pattern

Source code in ftm_lakehouse/operation/crawl.py
class CrawlJob(DatasetJobModel):
    """
    Job model for crawl operations.

    Tracks the state and configuration of a crawl job.

    Attributes:
        uri: Source location URI to crawl
        prefix: Include only keys with this prefix
        exclude_prefix: Exclude keys with this prefix
        glob: Include only keys matching this glob pattern
        exclude_glob: Exclude keys matching this glob pattern
        make_entities: Also create entities for archived files
        existing: How to handle files already present in the archive
            (default: skip when the same checksum exists under the same path)
    """

    uri: Uri
    prefix: str | None = None
    exclude_prefix: str | None = None
    glob: str | None = None
    exclude_glob: str | None = None
    make_entities: bool = False
    existing: HandleExistingMode | None = HandleExistingMode.skip_path

ftm_lakehouse.operation.CrawlOperation

Bases: DatasetJobOperation[CrawlJob]

Crawl workflow that archives files and creates entities.

Iterates through files in a source store, archives them to the file repository, and creates corresponding entities in the entities repository.

Example
from ftm_lakehouse.operation import CrawlOperation, CrawlJob

job = CrawlJob.make(
    uri="s3://bucket/documents",
    dataset="my_dataset",
    glob="*.pdf"
)
op = CrawlOperation(job=job)
result = op.run()
print(f"Crawled {result.done} files")
Source code in ftm_lakehouse/operation/crawl.py
class CrawlOperation(DatasetJobOperation[CrawlJob]):
    """
    Crawl workflow that archives files and creates entities.

    Iterates through files in a source store, archives them to the
    file repository, and creates corresponding entities in the
    entities repository.

    Example:
        ```python
        from ftm_lakehouse.operation import CrawlOperation, CrawlJob

        job = CrawlJob.make(
            uri="s3://bucket/documents",
            dataset="my_dataset",
            glob="*.pdf"
        )
        op = CrawlOperation(job=job)
        result = op.run()
        print(f"Crawled {result.done} files")
        ```
    """

    target = tag.OP_CRAWL

    def __init__(self, *args, **kwargs) -> None:
        """Set up the source store; for http sources, raise the client
        timeout to 24 hours to survive long downloads."""
        super().__init__(*args, **kwargs)
        self.source = get_store(self.job.uri)
        if self.source.is_http:
            # merge a generous timeout into any existing client kwargs
            backend_config = ensure_dict(self.source.backend_config)
            backend_config["client_kwargs"] = {
                **ensure_dict(backend_config.get("client_kwargs")),
                "timeout": aiohttp.ClientTimeout(total=3600 * 24),
            }
            self.source.backend_config = backend_config

    def get_uris(self) -> Generator[str, None, None]:
        """
        Generate file uris to crawl.

        Applies prefix, glob, and exclude filters to the source store.

        Yields:
            File uris to be crawled
        """
        self.log.info(f"Crawling `{mask_uri(self.job.uri)}` ...")
        for key in self.source.iterate_keys(
            prefix=self.job.prefix,
            exclude_prefix=self.job.exclude_prefix,
            glob=self.job.glob,
        ):
            # client-side filter: keys matching exclude_glob are skipped
            if self.job.exclude_glob and fnmatch(key, self.job.exclude_glob):
                continue
            # track discovery progress on the job model
            self.job.pending += 1
            self.job.touch()
            yield key

    def handle_crawl(self, uri: str, run: JobRun[CrawlJob]) -> datetime:
        """
        Handle a single crawl task.

        Archives the file and creates a corresponding entity.

        Args:
            uri: File uri to crawl
            run: Current job run context

        Returns:
            Timestamp when the task was processed
        """
        now = datetime.now()

        self.log.info(f"Crawling `{uri}` ...", source=mask_uri(self.source.uri))
        checksum = None
        # checksums are only precomputed for local sources; for remote ones
        # `_should_skip` cannot compare and will never skip
        if self.source.is_local:
            checksum = self.source.checksum(uri, algorithm=CHECKSUM_ALGORITHM)
        if not self._should_skip(uri, checksum):
            file = self.archive.store(
                self.source.to_uri(uri),
                checksum=checksum,
                key=uri,
                origin=tag.CRAWL_ORIGIN,
            )
            if self.job.make_entities:
                self.entities.add_many(file.make_entities(), tag.CRAWL_ORIGIN)
            run.job.done += 1
        return now

    def handle(self, run: JobRun, *args, **kwargs) -> None:
        """Crawl every matching uri, persisting progress every 1000 tasks."""
        for ix, task in enumerate(self.get_uris(), 1):
            if ix % 1000 == 0:
                # periodic progress log + checkpoint for long-running crawls
                self.log.info(
                    f"Handling task {ix} ...",
                    pending=self.job.pending,
                    done=self.job.done,
                )
                run.save()
            self.handle_crawl(task, run)
            run.job.pending -= 1
            run.job.touch()
        if self.job.make_entities:
            self.entities.flush()

    def _should_skip(self, uri: Uri, checksum: str | None) -> bool:
        """Decide whether to skip archiving, based on the job's `existing`
        mode. Never skips when no mode is set, mode is `overwrite`, or no
        checksum is available. `skip_path` additionally requires the same
        key to be registered for the checksum."""
        if self.job.existing is None:
            return False
        if self.job.existing == HandleExistingMode.overwrite:
            return False
        if checksum is None:
            return False
        if self.job.existing == HandleExistingMode.skip_checksum:
            return self.archive.exists(checksum)
        if self.job.existing == HandleExistingMode.skip_path:
            if self.archive.exists(checksum):
                for file in self.archive.get_all_files(checksum):
                    if file.key == str(uri):
                        return True
        return False

get_uris()

Generate file uris to crawl.

Applies prefix, glob, and exclude filters to the source store.

Yields:

Type Description
str

File uris to be crawled

Source code in ftm_lakehouse/operation/crawl.py
def get_uris(self) -> Generator[str, None, None]:
    """
    Generate file uris to crawl.

    Applies prefix, glob, and exclude filters to the source store.

    Yields:
        File uris to be crawled
    """
    self.log.info(f"Crawling `{mask_uri(self.job.uri)}` ...")
    for key in self.source.iterate_keys(
        prefix=self.job.prefix,
        exclude_prefix=self.job.exclude_prefix,
        glob=self.job.glob,
    ):
        # client-side filter: keys matching exclude_glob are skipped
        if self.job.exclude_glob and fnmatch(key, self.job.exclude_glob):
            continue
        # track discovery progress on the job model
        self.job.pending += 1
        self.job.touch()
        yield key

handle_crawl(uri, run)

Handle a single crawl task.

Archives the file and creates a corresponding entity.

Parameters:

Name Type Description Default
uri str

File uri to crawl

required
run JobRun[CrawlJob]

Current job run context

required

Returns:

Type Description
datetime

Timestamp when the task was processed

Source code in ftm_lakehouse/operation/crawl.py
def handle_crawl(self, uri: str, run: JobRun[CrawlJob]) -> datetime:
    """
    Handle a single crawl task.

    Archives the file and creates a corresponding entity.

    Args:
        uri: File uri to crawl
        run: Current job run context

    Returns:
        Timestamp when the task was processed
    """
    now = datetime.now()

    self.log.info(f"Crawling `{uri}` ...", source=mask_uri(self.source.uri))
    checksum = None
    # checksums are only precomputed for local sources; for remote ones
    # `_should_skip` cannot compare and will never skip
    if self.source.is_local:
        checksum = self.source.checksum(uri, algorithm=CHECKSUM_ALGORITHM)
    if not self._should_skip(uri, checksum):
        file = self.archive.store(
            self.source.to_uri(uri),
            checksum=checksum,
            key=uri,
            origin=tag.CRAWL_ORIGIN,
        )
        if self.job.make_entities:
            self.entities.add_many(file.make_entities(), tag.CRAWL_ORIGIN)
        run.job.done += 1
    return now

Export Operations

ExportStatementsOperation

Export parquet store to exports/statements.csv.

ftm_lakehouse.operation.export.ExportStatementsJob

Bases: BaseExportJob

Source code in ftm_lakehouse/operation/export.py
class ExportStatementsJob(BaseExportJob):
    """Job model for the statements csv export."""

    target: str = path.EXPORTS_STATEMENTS

ftm_lakehouse.operation.ExportStatementsOperation

Bases: BaseExportOperation[ExportStatementsJob]

Export parquet store to statements.csv. Checks if journal needs to be flushed first. Skips if the last export is newer than the last statements update.

Source code in ftm_lakehouse/operation/export.py
class ExportStatementsOperation(BaseExportOperation[ExportStatementsJob]):
    """Export parquet store to statements.csv. Checks if journal needs to be
    flushed first. Skips if the last export is newer than the last statements
    update."""

    def handle(self, run: JobRun, *args, **kwargs) -> None:
        """Flush the journal if needed, then write the statements csv export."""
        # ensure_flush() gates the export — semantics defined in the base class
        if self.ensure_flush():
            output_uri = self.entities._store.to_uri(path.EXPORTS_STATEMENTS)
            self.entities._store.ensure_parent(path.EXPORTS_STATEMENTS)
            self.entities._statements.export_csv(output_uri)
            run.job.done = 1

ExportEntitiesOperation

Export parquet store to entities.ftm.json.

ftm_lakehouse.operation.export.ExportEntitiesJob

Bases: BaseExportJob

Source code in ftm_lakehouse/operation/export.py
class ExportEntitiesJob(BaseExportJob):
    """Job model for the entities json export."""

    target: str = path.ENTITIES_JSON
    make_diff: bool = True  # also export a diff against the previous export

ftm_lakehouse.operation.ExportEntitiesOperation

Bases: BaseExportOperation[ExportEntitiesJob]

Export parquet store to entities.ftm.json. Checks if journal needs to be flushed first. Skips if the last export is newer than the last statements update.

Source code in ftm_lakehouse/operation/export.py
class ExportEntitiesOperation(BaseExportOperation[ExportEntitiesJob]):
    """Export parquet store to entities.ftm.json. Checks if journal needs to be
    flushed first. Skips if the last export is newer than the last statements
    update."""

    def handle(self, run: JobRun[ExportEntitiesJob], *args, **kwargs) -> None:
        """Export the entities json and, optionally, a diff export."""
        if self.ensure_flush():
            output_uri = self.entities._store.to_uri(path.ENTITIES_JSON)
            self.entities.export_entities(output_uri)
            if run.job.make_diff:
                self.entities.export_diff()
            run.job.done = 1

ExportStatisticsOperation

Export statistics to exports/statistics.json.

ftm_lakehouse.operation.export.ExportStatisticsJob

Bases: BaseExportJob

Source code in ftm_lakehouse/operation/export.py
class ExportStatisticsJob(BaseExportJob):
    """Job model for the statistics json export."""

    target: str = path.EXPORTS_STATISTICS

ftm_lakehouse.operation.ExportStatisticsOperation

Bases: BaseExportOperation[ExportStatisticsJob]

Export parquet store statistics to statistics.json. Checks if journal needs to be flushed first. Skips if the last export is newer than the last statements update.

Source code in ftm_lakehouse/operation/export.py
class ExportStatisticsOperation(BaseExportOperation[ExportStatisticsJob]):
    """Export parquet store statistics to statistics.json. Checks if journal
    needs to be flushed first. Skips if the last export is newer than the last
    statements update."""

    def handle(self, run: JobRun, *args, **kwargs) -> None:
        """Compute statistics and store them as a new statistics.json version."""
        if self.ensure_flush():
            stats = self.entities.get_statistics()
            self.versions.make(path.EXPORTS_STATISTICS, stats)
            run.job.done = 1

ExportDocumentsOperation

Export document metadata to exports/documents.csv.

ftm_lakehouse.operation.export.ExportDocumentsJob

Bases: BaseExportJob

Source code in ftm_lakehouse/operation/export.py
class ExportDocumentsJob(BaseExportJob):
    """Job model for the documents csv export.

    Attributes:
        target: Export path for documents.csv
        make_diff: Also export a diff against the previous export
        public_url_prefix: Explicit public url prefix for document links
    """

    target: str = path.EXPORTS_DOCUMENTS
    make_diff: bool = True
    public_url_prefix: HttpUrlStr | None = None

    def get_public_prefix(self) -> str | None:
        """Return the public url prefix for exported document links.

        The job-level `public_url_prefix` wins over the global settings
        template. Returns None when neither is configured.
        """
        if self.public_url_prefix:
            return self.public_url_prefix
        if settings.public_url_prefix:
            return render(settings.public_url_prefix, {"dataset": self.dataset})
        # explicit: no prefix configured (was an implicit None fall-through)
        return None

ftm_lakehouse.operation.ExportDocumentsOperation

Bases: BaseExportOperation[ExportDocumentsJob]

Export file metadata to documents.csv. Checks if journal needs to be flushed first. Skips if the last export is newer than the last statements update.

Source code in ftm_lakehouse/operation/export.py
class ExportDocumentsOperation(BaseExportOperation[ExportDocumentsJob]):
    """Export file metadata to documents.csv. Checks if journal needs to be
    flushed first. Skips if the last export is newer than the last statements
    update."""

    def handle(self, run: JobRun[ExportDocumentsJob], *args, **kwargs) -> None:
        """Export documents csv (and optional diff), applying the public
        url prefix resolved from job or settings."""
        if self.ensure_flush():
            public_prefix = run.job.get_public_prefix()
            self.documents.export_csv(public_prefix)
            if run.job.make_diff:
                self.documents.export_diff(public_url_prefix=public_prefix)
            run.job.done = 1

ExportIndexOperation

Export index.json with optional resources.

ftm_lakehouse.operation.export.ExportIndexJob

Bases: BaseExportJob

Source code in ftm_lakehouse/operation/export.py
class ExportIndexJob(BaseExportJob):
    """Job model for the index.json export.

    Depends on the other export artifacts, so the index is only considered
    fresh relative to them.
    """

    target: str = path.INDEX
    dependencies: list[str] = [
        path.CONFIG,
        path.EXPORTS_STATISTICS,
        path.ENTITIES_JSON,
        path.EXPORTS_DOCUMENTS,
    ]

ftm_lakehouse.operation.ExportIndexOperation

Bases: BaseExportOperation[ExportIndexJob]

Export index.json, optionally including resources; these export targets must therefore already exist.

Source code in ftm_lakehouse/operation/export.py
class ExportIndexOperation(BaseExportOperation[ExportIndexJob]):
    """Export index.json, optionally including resources; these export
    targets must therefore already exist."""

    def handle(
        self,
        run: JobRun[ExportIndexJob],
        dataset: DatasetModel | None = None,
        *args,
        **kwargs,
    ) -> None:
        """Build and store index.json for the dataset.

        Appends a resource entry for every export artifact that exists in
        the dataset store, and applies dataset statistics when available.

        Args:
            run: Current job run context
            dataset: Dataset model to patch; a stub is created when omitted
        """
        self.ensure_flush()

        if dataset is None:
            # we need a stub dataset to patch
            dataset = make_dataset(run.job.dataset, DatasetModel, uri=self.versions.uri)
        public_prefix = dataset.get_public_prefix()

        if public_prefix:
            store = get_store(dataset.uri)
            # (artifact path, resource factory) pairs, in the order the
            # resources should appear on the dataset
            exports = [
                (path.EXPORTS_STATEMENTS, make_statements_resource),
                (path.ENTITIES_JSON, make_entities_resource),
                (path.EXPORTS_DOCUMENTS, make_documents_resource),
                (path.EXPORTS_STATISTICS, make_statistics_resource),
            ]
            for artifact, make_resource in exports:
                if store.exists(artifact):
                    uri = join_uri(dataset.uri, artifact)
                    public_url = join_uri(public_prefix, artifact)
                    dataset.resources.append(make_resource(uri, public_url))
                    if artifact == path.EXPORTS_STATISTICS:
                        # statistics additionally patch dataset-level stats
                        dataset.apply_stats(
                            store.get(path.EXPORTS_STATISTICS, model=DatasetStats)
                        )

        self.versions.make(path.INDEX, dataset)

        run.job.done = 1

MappingOperation

Process CSV-to-entity mapping configurations.

ftm_lakehouse.operation.mapping.MappingJob

Bases: DatasetJobModel

Source code in ftm_lakehouse/operation/mapping.py
class MappingJob(DatasetJobModel):
    """Job model for a csv-to-entities mapping operation."""

    content_hash: str  # checksum of the archived csv file to process
    # NOTE(review): presumably the generated-entity count; not updated in
    # the code visible here — confirm against callers
    entities: int = 0

ftm_lakehouse.operation.MappingOperation

Bases: DatasetJobOperation[MappingJob]

Mapping workflow that transforms a CSV file into entities.

Processes a single archived CSV file (identified by content_hash) using its mapping configuration to generate FollowTheMoney entities, which are written to the entity repository.

Example
from ftm_lakehouse.operation import MappingOperation, MappingJob

job = MappingJob.make(
    dataset="my_dataset",
    content_hash="5a6acf229ba576d9a40b09292595658bbb74ef56",
)
op = MappingOperation(job=job)
result = op.run()
print(f"Generated {result.done} entities")
Source code in ftm_lakehouse/operation/mapping.py
class MappingOperation(DatasetJobOperation[MappingJob]):
    """
    Mapping workflow that transforms a CSV file into entities.

    Processes a single archived CSV file (identified by content_hash)
    using its mapping configuration to generate FollowTheMoney entities,
    which are written to the entity repository.

    Example:
        ```python
        from ftm_lakehouse.operation import MappingOperation, MappingJob

        job = MappingJob.make(
            dataset="my_dataset",
            content_hash="5a6acf229ba576d9a40b09292595658bbb74ef56",
        )
        op = MappingOperation(job=job)
        result = op.run()
        print(f"Generated {result.done} entities")
        ```
    """

    def __init__(self, *args, **kwargs) -> None:
        """Set up the mapping repository alongside the inherited ones."""
        super().__init__(*args, **kwargs)
        self.mappings = MappingRepository(self.job.dataset, self.archive.uri)

    def get_target(self) -> str:
        """Per-file target tag derived from the mapped file's checksum."""
        return tag.mapping_tag(self.job.content_hash)

    def get_dependencies(self) -> list[str]:
        """Freshness depends on the mapping config for this checksum."""
        return [path.mapping(self.job.content_hash)]

    def handle(self, run: JobRun[MappingJob], *args, **kwargs) -> None:
        """
        Process the mapping configuration and store generated entities.

        Note: freshness-based skipping happens in `run()` (via `get_target()`
        and `get_dependencies()`), before this method is called.
        """
        origin = mapping_origin(self.job.content_hash)
        mapping = self.mappings.get(self.job.content_hash)
        file = self.archive.get_file(self.job.content_hash)
        # stream the archived csv to a local path for mapping
        with self.archive.local_path(file.checksum) as csv_path:
            with self.entities.writer(origin=origin) as bulk:
                for entity in map_entities(mapping, csv_path):
                    bulk.add_entity(entity)
                    run.job.done += 1
        self.entities.flush()

handle(run, *args, **kwargs)

Process the mapping configuration and store generated entities.

Skips processing if the mapping output is already up-to-date relative to the mapping config.

Source code in ftm_lakehouse/operation/mapping.py
def handle(self, run: JobRun[MappingJob], *args, **kwargs) -> None:
    """
    Process the mapping configuration and store generated entities.

    Note: freshness-based skipping happens in `run()` (via `get_target()`
    and `get_dependencies()`), before this method is called.
    """
    origin = mapping_origin(self.job.content_hash)
    mapping = self.mappings.get(self.job.content_hash)
    file = self.archive.get_file(self.job.content_hash)
    # stream the archived csv to a local path for mapping
    with self.archive.local_path(file.checksum) as csv_path:
        with self.entities.writer(origin=origin) as bulk:
            for entity in map_entities(mapping, csv_path):
                bulk.add_entity(entity)
                run.job.done += 1
    self.entities.flush()

OptimizeOperation

Compact Delta Lake parquet files and optionally apply translog to main table.

ftm_lakehouse.operation.optimize.OptimizeJob

Bases: DatasetJobModel

Source code in ftm_lakehouse/operation/optimize.py
class OptimizeJob(DatasetJobModel):
    """Job model for parquet store optimization; attributes mirror the
    options passed through to the statements store optimizer."""

    bucket: str | None = None  # optional scope: only optimize this bucket
    origin: str | None = None  # optional scope: only optimize this origin
    vacuum: bool = False  # purge old files after optimizing
    vacuum_keep_hours: int = 0  # retention window for vacuum
    compact: bool = False  # run full compaction first (heavier operation)

ftm_lakehouse.operation.OptimizeOperation

Bases: DatasetJobOperation[OptimizeJob]

Optimize the parquet delta lake with optional vacuum (purge of old files). The optimization can be scoped to a bucket and/or an origin. For instance, after a crawl operation, only optimizing origin=crawl is feasible.

When compact=True, performs full compaction: dedup, remove tombstones, rewrite the table, Z_ORDER optimize, and vacuum. This is a heavier operation than standard optimize but produces a clean table.

Depending on the size of the dataset, this can be a very long running operation that may require some local memory and tmp disk storage.

Source code in ftm_lakehouse/operation/optimize.py
class OptimizeOperation(DatasetJobOperation[OptimizeJob]):
    """
    Optimize the parquet delta lake with optional vacuum (purge of old
    files). The optimization can be scoped to a bucket and/or an origin. For
    instance, after a crawl operation, only optimizing origin=crawl is
    feasible.

    When compact=True, performs full compaction: dedup, remove tombstones,
    rewrite the table, Z_ORDER optimize, and vacuum. This is a heavier
    operation than standard optimize but produces a clean table.

    Depending on the size of the dataset, this can be a very long running
    operation that may require some local memory and tmp disk storage.
    """

    target = tag.STORE_OPTIMIZED
    dependencies = [tag.STATEMENTS_UPDATED]

    def handle(self, run: JobRun[OptimizeJob], *args, **kwargs) -> None:
        """Run optional compaction, then optimize the statements store with
        the job's scoping and vacuum options."""
        if run.job.compact:
            self.entities._statements.compact()
        self.entities._statements.optimize(
            vacuum=run.job.vacuum,
            vacuum_keep_hours=run.job.vacuum_keep_hours,
            bucket=run.job.bucket,
            origin=run.job.origin,
        )
        run.job.done = 1

MakeOperation

Full workflow: flush journal + all exports.

ftm_lakehouse.operation.make.MakeJob

Bases: DatasetJobModel

Source code in ftm_lakehouse/operation/make.py
class MakeJob(DatasetJobModel):
    """Job model for the full make workflow; no extra configuration."""

    pass

ftm_lakehouse.operation.MakeOperation

Bases: DatasetJobOperation[MakeJob]

Source code in ftm_lakehouse/operation/make.py
class MakeOperation(DatasetJobOperation[MakeJob]):
    """Full make workflow: flush the journal, then run all exports."""

    target = tag.OP_MAKE
    dependencies = [tag.JOURNAL_UPDATED, tag.STATEMENTS_UPDATED]

    def handle(self, run: JobRun, *args, **kwargs) -> None:
        """Flush entities and run each export factory in order.

        NOTE(review): relies on `self._dataset`, which is only populated
        when the operation is created via `from_job()` — confirm direct
        construction is unsupported for this operation.
        """
        force = kwargs.get("force", False)
        ds = self._dataset
        ds.entities.flush()
        factories.export_statements(ds, force=force)
        factories.export_entities(ds, force=force)
        factories.export_documents(ds, force=force)
        factories.export_statistics(ds, force=force)
        factories.export_index(ds, force=force)
        run.job.done = 1

RecreateOperation

Repair corrupted datasets from exported files.

ftm_lakehouse.operation.recreate.RecreateJob

Bases: DatasetJobModel

Job model for recreate operation.

Source code in ftm_lakehouse/operation/recreate.py
class RecreateJob(DatasetJobModel):
    """Job model for recreate operation."""

    source: RecreateSource = RecreateSource.AUTO  # export to rebuild from; AUTO picks by tag timestamps
    statements_imported: int = 0  # progress counter
    entities_imported: int = 0  # progress counter
    files_imported: int = 0  # progress counter

ftm_lakehouse.operation.recreate.RecreateOperation

Bases: DatasetJobOperation[RecreateJob]

Recreate a corrupted dataset by rebuilding the parquet store from exports.

This operation repairs corrupted lakehouse datasets by: 1. Clearing the statement store (parquet) and journal 2. Re-importing entities/statements from the most recent export

The source for re-import is selected based on tag timestamps: - If entities.ftm.json is newer, import entities - If statements.csv is newer, import statements - Can be forced to use a specific source via the job's source field

Warning: This operation is destructive - it will delete all existing statement data before re-importing from exports.

Source code in ftm_lakehouse/operation/recreate.py
class RecreateOperation(DatasetJobOperation[RecreateJob]):
    """
    Rebuild a corrupted dataset's parquet store from its exported files.

    The repair proceeds in four steps:
    1. Clear the statement store (parquet) and journal
    2. Re-import entities or statements from the most recent export
    3. Re-derive document entities from the "crawl" archive origin
    4. Flush the journal back into parquet

    Which export feeds step 2 is decided by tag timestamps (the newer of
    entities.ftm.json / statements.csv wins), unless the job's `source`
    field forces one explicitly.

    Warning: this is destructive — every existing statement is deleted
    before anything is re-imported from the exports.
    """

    target = tag.OP_RECREATE
    dependencies = []  # no freshness short-circuit: always runs when invoked

    def _get_source(self) -> RecreateSource:
        """Pick the export to rebuild from: job override, else tag timestamps."""
        if self.job.source != RecreateSource.AUTO:
            return self.job.source

        ts_entities = self.tags.get(tag.ENTITIES_JSON)
        ts_statements = self.tags.get(tag.EXPORTS_STATEMENTS)
        has_entities = self.entities._store.exists(path.ENTITIES_JSON)
        has_statements = self.entities._store.exists(path.EXPORTS_STATEMENTS)

        if not (has_entities or has_statements):
            raise RuntimeError(
                "No export files found. Cannot recreate dataset without "
                f"`{path.ENTITIES_JSON}` or `{path.EXPORTS_STATEMENTS}`"
            )
        if not has_entities:
            return RecreateSource.STATEMENTS
        if not has_statements:
            return RecreateSource.ENTITIES

        # Both exports exist. An untagged entities export loses to
        # statements (this also covers "no tags at all", where statements
        # are preferred as the faster import); an untagged statements
        # export loses to entities.
        if ts_entities is None:
            return RecreateSource.STATEMENTS
        if ts_statements is None:
            return RecreateSource.ENTITIES

        # Otherwise the most recently tagged export wins; ties go to entities.
        if ts_entities >= ts_statements:
            return RecreateSource.ENTITIES
        return RecreateSource.STATEMENTS

    def _import_from_entities(self, run: JobRun[RecreateJob]) -> None:
        """Re-ingest the dataset from the entities.ftm.json export."""
        uri = self.entities._store.to_uri(path.ENTITIES_JSON)
        self.log.info(f"Importing from `{path.ENTITIES_JSON}` ...", uri=mask_uri(uri))

        with self.entities.writer() as writer:
            for entity in self.entities.stream():
                writer.add_entity(entity)
                run.job.entities_imported += 1
                count = run.job.entities_imported
                # Log + checkpoint progress every 10k entities.
                if count % 10_000 == 0:
                    self.log.info(
                        f"Importing Entity {count} ...",
                        entities=count,
                        uri=mask_uri(uri),
                    )
                    run.save()

        self.log.info(
            f"Importing from `{path.ENTITIES_JSON}` done.",
            entities=run.job.entities_imported,
            uri=mask_uri(uri),
        )
        run.save()

    def _import_from_statements(self, run: JobRun[RecreateJob]) -> None:
        """Re-ingest the dataset from the statements.csv export."""
        uri = self.entities._store.to_uri(path.EXPORTS_STATEMENTS)
        self.log.info(
            f"Importing from `{path.EXPORTS_STATEMENTS}` ...", uri=mask_uri(uri)
        )

        with self.entities.writer() as writer, smart_open(uri, "rb") as fh:
            for stmt in read_csv_statements(fh):  # type: ignore[arg-type]
                writer.add_statement(stmt)
                run.job.statements_imported += 1
                count = run.job.statements_imported
                # Log + checkpoint progress every 100k statements.
                if count % 100_000 == 0:
                    self.log.info(
                        f"Importing Statement {count} ...",
                        statements=count,
                        uri=mask_uri(uri),
                    )
                    run.save()

        self.log.info(
            f"Importing from `{path.EXPORTS_STATEMENTS}` done.",
            statements=run.job.statements_imported,
            uri=mask_uri(uri),
        )
        run.save()

    def _import_from_archive(self, run: JobRun[RecreateJob]) -> None:
        """Collect files metadata to add document entities"""
        masked = mask_uri(self.archive.uri)
        self.log.info("Importing from archive ...", uri=masked)

        with self.entities.writer(origin=tag.CRAWL_ORIGIN) as writer:
            for file in self.archive.iterate_files():
                # Only crawl-origin files are turned into entities; every
                # file still counts toward the progress counter.
                if file.origin == tag.CRAWL_ORIGIN:
                    for entity in file.make_entities():
                        writer.add_entity(entity)
                run.job.files_imported += 1
                count = run.job.files_imported
                if count % 1_000 == 0:
                    self.log.info(
                        f"Importing File {count} ...",
                        files=count,
                        uri=masked,
                    )
                    run.save()

        self.log.info(
            "Importing from archive done.",
            files=run.job.files_imported,
            uri=masked,
        )
        run.save()

    def handle(self, run: JobRun[RecreateJob], *args, **kwargs) -> None:
        source = self._get_source()
        self.log.info("Recreating dataset ...", source=source.value)

        # Step 1: wipe the parquet statement store before re-importing.
        self.entities._statements.destroy()

        # Step 2: replay the chosen export into the now-empty store.
        if source == RecreateSource.STATEMENTS:
            self._import_from_statements(run)
        else:
            self._import_from_entities(run)

        # Step 3: re-derive document entities from the "crawl" origin.
        self._import_from_archive(run)

        # Step 4: persist the journal into parquet.
        flushed = self.entities.flush()

        self.log.info(
            "Recreate complete",
            source=source.value,
            entities_imported=run.job.entities_imported,
            statements_imported=run.job.statements_imported,
            statements_flushed=flushed,
        )

        run.job.done = 1

DownloadArchiveOperation

Export archive files to their original paths.

ftm_lakehouse.operation.download.DownloadArchiveJob

Bases: DatasetJobModel

Source code in ftm_lakehouse/operation/download.py
class DownloadArchiveJob(DatasetJobModel):
    # Destination URI the archive is exported to (resolved with get_store()
    # in DownloadArchiveOperation.handle).
    target: Uri
    # Number of documents skipped because they already exist at the target.
    skipped: int = 0

ftm_lakehouse.operation.DownloadArchiveOperation

Bases: DatasetJobOperation[DownloadArchiveJob]

Download the archive files to a target, transforming them into nice paths based on the exported documents.csv

Source code in ftm_lakehouse/operation/download.py
class DownloadArchiveOperation(DatasetJobOperation[DownloadArchiveJob]):
    """
    Download the archive files to a target transforming into nice paths based on
    exported documents.csv

    Documents already present at the target are skipped and counted in
    `job.skipped`; each successfully streamed file increments `job.done`.
    """

    target = tag.OP_DOWNLOAD_ARCHIVE
    dependencies = [path.EXPORTS_DOCUMENTS]

    def handle(self, run: JobRun[DownloadArchiveJob], *args, **kwargs) -> None:
        # Resolve the destination store from the job's target URI.
        target = get_store(run.job.target)
        self.log.info(
            "Downloading archive ...",
            target=mask_uri(target.uri),
            documents=mask_uri(self.documents.csv_uri),
        )
        for document in self.documents.stream():
            if target.exists(document.relative_path):
                self.log.debug(
                    f"Skipping `{document.relative_path}`, already exists.",
                    checksum=document.checksum,
                    source=mask_uri(self.archive.uri),
                    target=mask_uri(target.uri),
                )
                run.job.skipped += 1
                continue

            self.log.info(
                f"Downloading `{document.relative_path}` ...",
                checksum=document.checksum,
                source=mask_uri(self.archive.uri),
                target=mask_uri(target.uri),
            )
            # Open the archive source *before* creating the target file.
            # The previous order opened the target first, so a failing
            # archive.open() left behind an empty target file that the
            # exists() check above would wrongly skip on every later run.
            with self.archive.open(document.checksum) as i:
                with target.open(document.relative_path, "wb") as o:
                    stream(i, o, CHUNK_SIZE_LARGE)
            run.job.done += 1