
Core

Cross-cutting concerns used by all layers.

Settings

Configuration from environment variables.

from ftm_lakehouse.core.settings import Settings

settings = Settings()
print(settings.uri)          # LAKEHOUSE_URI
print(settings.journal_uri)  # LAKEHOUSE_JOURNAL_URI

ftm_lakehouse.core.settings.Settings

Bases: BaseSettings

Source code in ftm_lakehouse/core/settings.py
class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_prefix="lakehouse_",
        env_nested_delimiter="__",
        env_file=".env",
        secrets_dir="/run/secrets",
        nested_model_default_partial_update=True,
        extra="ignore",
    )

    uri: str = "data"
    journal_uri: str = "sqlite:///data/journal.db"
    api_key: str | None = None
    on_zfs: bool = False
    zfs_pool: str | None = None
    zfs_socket: str | None = None
    zfs_owner: str | None = None

    public_url_prefix: str | None = None
    archive_url_expire: int = 900  # seconds (15 minutes)

    @property
    def api_mode(self) -> bool:
        return self.uri.startswith("http")

    @property
    def resolved_journal_uri(self) -> str:
        if self.api_mode:
            # force journal uri to use api as well
            return self.uri
        return self.journal_uri
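
The two properties interact: once `uri` points at an HTTP(S) endpoint, the configured `journal_uri` is no longer used. The sketch below reproduces only that logic with the standard library, so it runs without `ftm_lakehouse` or pydantic installed. `SettingsSketch` and `from_env` are hypothetical stand-ins, not part of the package, and only the two fields involved are modelled.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass
class SettingsSketch:
    """Hypothetical stand-in mirroring two fields of Settings."""

    uri: str = "data"
    journal_uri: str = "sqlite:///data/journal.db"

    @classmethod
    def from_env(cls, env: Mapping[str, str]) -> "SettingsSketch":
        # Mimics the "lakehouse_" prefix: LAKEHOUSE_URI -> uri, and so on.
        return cls(
            uri=env.get("LAKEHOUSE_URI", cls.uri),
            journal_uri=env.get("LAKEHOUSE_JOURNAL_URI", cls.journal_uri),
        )

    @property
    def api_mode(self) -> bool:
        # Same check as in the source above: any http(s) URI means API mode.
        return self.uri.startswith("http")

    @property
    def resolved_journal_uri(self) -> str:
        # In API mode the journal is reached through the API as well.
        return self.uri if self.api_mode else self.journal_uri


local = SettingsSketch.from_env({})
remote = SettingsSketch.from_env({"LAKEHOUSE_URI": "https://lake.example.org"})
print(local.api_mode, local.resolved_journal_uri)
print(remote.api_mode, remote.resolved_journal_uri)
```

With no variables set, the defaults apply and the journal stays on the local SQLite file; with an `https://` value for `LAKEHOUSE_URI`, both URIs resolve to the API endpoint. The host name `lake.example.org` is a placeholder.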

Path Conventions

Standard paths within the lakehouse.

Path conventions for the FollowTheMoney data lakehouse.

The lakehouse relies on a convention-based file system layout: well-known paths hold metadata and carry information between processing stages.

All paths are dataset-relative unless otherwise noted.

Dataset Layout


lakehouse/
    index.json                          # catalog index
    config.yml                          # catalog configuration
    versions/                           # versioned snapshots
        YYYY/MM/YYYY-MM-DDTHH:MM:SS/
            index.json
            config.yml

    [dataset]/
        index.json                      # dataset index
        config.yml                      # dataset configuration

        versions/                       # versioned snapshots
            YYYY/MM/...

        .LOCK                           # dataset-wide lock
        locks/{tenant}/                 # operation-specific locks
        tags/{tenant}/                  # workflow state / cache
        queue/{tenant}/                 # task queues

        archive/                        # content-addressed file storage
            ab/cd/ef/{checksum}/        # SHA1 split into segments
                blob                    # file blob (stored once)
                {file_id}.json          # metadata (one per source path)
                {origin}.txt            # extracted text (one per engine)

        mappings/
            {content_hash}/
                mapping.yml             # current CSV mapping configuration
                versions/               # versioned snapshots
                    YYYY/MM/...

        entities/
            statements/                 # statement store (partitioned)
                origin={origin}/
                    *.parquet
            translog/                   # translog metadata table (mutable)

        entities.ftm.json               # aggregated entities export

        exports/
            statistics.json             # entity counts, facets
            statements.csv              # sorted statements
            documents.csv               # document metadata
            graph.cypher                # neo4j export (optional)

        diffs/
            entities.ftm.json/
                v10_20240116T103000Z.delta.json  # entities delta
            exports/
                documents.csv/
                    v10_20240116T103000Z.documents.diff.csv  # documents delta

        jobs/
            runs/
                {job_type}/
                    {timestamp}.json    # job run results
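
As an illustration of the content-addressed archive and the partitioned statement store, the sketch below derives dataset-relative paths from a file's SHA1 checksum and from an origin name. The helper names (`archive_dir`, `blob_path`, `statements_dir`) are hypothetical, and the segment rule (the first three two-character pairs of the hex digest) is an assumption read off the `ab/cd/ef/{checksum}/` pattern above, not confirmed by the package source.

```python
import hashlib


def archive_dir(checksum: str) -> str:
    """Dataset-relative archive directory for a hex checksum."""
    # Assumption: the directory segments are the first three
    # two-character pairs of the digest, followed by the full digest.
    return f"archive/{checksum[:2]}/{checksum[2:4]}/{checksum[4:6]}/{checksum}"


def blob_path(data: bytes) -> str:
    """Dataset-relative path of the blob that would store `data`."""
    checksum = hashlib.sha1(data).hexdigest()
    return f"{archive_dir(checksum)}/blob"


def statements_dir(origin: str) -> str:
    """Dataset-relative partition directory for one statement origin."""
    return f"entities/statements/origin={origin}"


print(blob_path(b""))
print(statements_dir("crawl"))
```

Because the path depends only on the content, identical files map to the same `blob`, which matches the "stored once" note in the layout.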

CONFIG = 'config.yml' module-attribute

User-editable config filename

INDEX = 'index.json' module-attribute

Generated index filename

ARCHIVE = 'archive' module-attribute

Base path for the archive

STATEMENTS = f'{ENTITIES}/statements' module-attribute

Base path for storing statement data

MAPPINGS = 'mappings' module-attribute

Base path for storing mappings

EXPORTS = 'exports' module-attribute

Base path for exports
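
Since the constants are dataset-relative, a full key is obtained by joining them onto a dataset name. The following is a minimal sketch that redefines the documented values locally so it runs on its own; `dataset_path` is a hypothetical helper, and `ENTITIES = "entities"` is an assumption inferred from the layout above, since its value is not listed on this page.

```python
import posixpath

# Values as documented above, redefined locally for a self-contained example.
ENTITIES = "entities"  # assumption: inferred from the dataset layout
CONFIG = "config.yml"
INDEX = "index.json"
ARCHIVE = "archive"
STATEMENTS = f"{ENTITIES}/statements"
MAPPINGS = "mappings"
EXPORTS = "exports"


def dataset_path(dataset: str, *parts: str) -> str:
    """Join a dataset name and path parts with forward slashes."""
    # posixpath keeps "/" separators on every platform, which suits
    # storage keys that are not necessarily local file paths.
    return posixpath.join(dataset, *parts)


print(dataset_path("my_dataset", INDEX))
print(dataset_path("my_dataset", STATEMENTS))
print(dataset_path("my_dataset", EXPORTS, "statistics.json"))
```

The dataset name `my_dataset` is a placeholder.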

Tag Conventions

Standard tags for freshness tracking.

Global tags that identify actions. They serve, for example, as cache keys for workflow runs.

JOURNAL_UPDATED = 'journal/last_updated' module-attribute

Statement journal was updated

STATEMENTS_UPDATED = 'statements/last_updated' module-attribute

Statement store was updated

ARCHIVE_UPDATED = 'archive/last_updated' module-attribute

Archive last updated (file added or removed)

EXPORTS_STATEMENTS = 'exports/statements' module-attribute

Statements CSV export last updated

ENTITIES_JSON = 'exports/entities_json' module-attribute

Entities JSON export last updated

STATISTICS = 'exports/statistics' module-attribute

Statistics export last updated
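
One way to use such tags for freshness tracking is to compare the timestamp recorded for an export against the timestamp of the data it depends on. The sketch below is only an illustration: it keeps tags in a plain dictionary, and `is_stale` is a hypothetical helper, not the package's tag store API, which is not shown on this page.

```python
from datetime import datetime, timezone

# Tag keys as documented above.
STATEMENTS_UPDATED = "statements/last_updated"
EXPORTS_STATEMENTS = "exports/statements"


def is_stale(tags: dict[str, datetime], target: str, dependency: str) -> bool:
    """Return True if `target` should be rebuilt from `dependency`."""
    built = tags.get(target)
    changed = tags.get(dependency)
    if built is None:
        return True  # the target was never built
    if changed is None:
        return False  # no recorded change, so nothing to rebuild
    return changed > built


tags = {
    STATEMENTS_UPDATED: datetime(2024, 1, 16, 10, 30, tzinfo=timezone.utc),
    EXPORTS_STATEMENTS: datetime(2024, 1, 15, 8, 0, tzinfo=timezone.utc),
}
print(is_stale(tags, EXPORTS_STATEMENTS, STATEMENTS_UPDATED))
```

Here the statement store changed after the CSV export was last written, so the export is reported as stale. The timestamps are made up for the example.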