Skip to content

Layer 1: Model

Pure data structures with no dependencies. Pydantic models for serialization.

Dataset Models

ftm_lakehouse.model.DatasetModel

Bases: Dataset

Source code in ftm_lakehouse/model/dataset.py
class DatasetModel(Dataset):
    storage: StoreModel | None = None
    """Set storage for external lakehouse"""
    public_url_prefix: HttpUrlStr | None = None
    """Public url prefix for resources"""

    def get_public_prefix(self) -> str | None:
        """Return the public url prefix for this dataset.

        Prefers the prefix configured on the model itself; otherwise falls
        back to rendering the global settings template with the dataset name.
        Returns `None` when neither is configured.
        """
        explicit = self.public_url_prefix
        if explicit:
            return explicit
        template = settings.public_url_prefix
        if template:
            return render(template, {"dataset": self.name})
        return None

public_url_prefix = None (class attribute, instance attribute)

Public url prefix for resources

storage = None (class attribute, instance attribute)

Set storage for external lakehouse

ftm_lakehouse.model.CatalogModel

Bases: Catalog

Source code in ftm_lakehouse/model/dataset.py
class CatalogModel(Catalog):
    """Catalog extended with an optional lakehouse storage configuration."""

    storage: StoreModel | None = None
    """Lakehouse storage base path"""

storage = None (class attribute, instance attribute)

Lakehouse storage base path

File Model

ftm_lakehouse.model.File

Bases: Stats

File metadata model. Arbitrary data can be stored in extra, including ftm properties that should be added to the generated Entity

Source code in ftm_lakehouse/model/file.py
class File(Stats):
    """File metadata model. Arbitrary data can be stored in `extra`, including
    ftm properties that should be added to the generated Entity"""

    # Allow unknown input keys so `collect_extra_fields` can sweep them
    # into the `extra` dict instead of rejecting them.
    model_config = ConfigDict(extra="allow")

    dataset: str
    """Dataset name"""
    checksum: str
    """SHA256 checksum (often referred to as `content_hash`)"""
    extra: dict[str, Any] = {}
    """Arbitrary extra data"""
    origin: str | None = None
    """Origin stage of this file"""

    @field_validator("checksum")
    @classmethod
    def check_checksum(cls, v: str) -> str:
        # Delegate to the shared checksum validation helper
        return validate_checksum(v)

    @field_validator("store")
    @classmethod
    def hide_store(cls, v: str) -> str:
        # Never expose the original store uri in serialized output.
        # NOTE(review): `store` is presumably a field on the `Stats` base
        # class (not visible here) — confirm upstream.
        return "lakehouse://"

    @model_validator(mode="before")
    @classmethod
    def collect_extra_fields(cls, data: Any) -> Any:
        # Move any unknown input keys into `extra` so they round-trip
        # without polluting the model's declared fields.
        if not isinstance(data, dict):
            return data
        known_fields = set(cls.model_fields.keys())
        known_fields.update(["id", "path", "parent"])  # computed_field
        extra = data.get("extra", {})
        extra_fields = {k: v for k, v in data.items() if k not in known_fields}
        if extra_fields:
            # Rebuild the payload with only known fields; unknown keys are
            # merged into `extra` (explicit `extra` entries win on conflict
            # only if not shadowed by a top-level key of the same name).
            data = {k: v for k, v in data.items() if k in known_fields}
            data["extra"] = {**extra, **extra_fields}
        return data

    def to_entity(self) -> StatementEntity:
        """Make an entity for this File"""
        # Derive the ftm schema from the mimetype, then populate the
        # standard file properties plus any schema properties found in
        # `extra`.
        schema = mime_to_schema(self.mimetype)
        entity = make_entity(
            {"id": self.id, "schema": schema},
            entity_type=StatementEntity,
            default_dataset=self.dataset,
        )
        entity.add("contentHash", self.checksum)
        entity.add("fileName", self.name)
        entity.add("fileSize", self.size)
        entity.add("mimeType", self.mimetype)
        entity.add("parent", self.parent)
        for prop in schema.properties:
            if prop in self.extra:
                entity.add(prop, self.extra[prop])
        return entity

    def make_parents(self) -> StatementEntities:
        """Make parent `Folder` entities"""
        # A file at the dataset root has no parent folder (empty name).
        parent = Path(self.key).parent
        if parent.name:
            yield from make_folders(parent, dataset=self.dataset)

    def make_entities(self) -> StatementEntities:
        # Emit the folder hierarchy first, then the file entity itself.
        yield from self.make_parents()
        yield self.to_entity()

    @computed_field
    @property
    def id(self) -> str:
        """The entity id is generated by a hash of the file path and the
        checksum. Uses just the checksum as id if that's the key"""
        if self.key == self.checksum:
            return self.checksum
        return make_file_id(self.key, self.checksum)

    @computed_field
    @property
    def path(self) -> str:
        # "key" can be misleading in the codebase so this is an alias
        return self.key

    @computed_field
    @property
    def parent(self) -> str | None:
        # Returns the id of the last-reversed (i.e. innermost) folder, or
        # None for root-level files.
        # NOTE(review): assumes `make_folders` yields folders root-first so
        # that reversing puts the closest ancestor first — confirm.
        for parent in reversed(list(self.make_parents())):
            return parent.id

    @property
    def blob_path(self) -> str:
        """Relative path to blob in dataset archive"""
        return path.archive_blob(self.checksum)

    @property
    def meta_path(self) -> str:
        """Relative path for this file's metadata json in dataset archive"""
        return path.archive_meta(self.checksum, self.id)

    @classmethod
    def from_info(cls, info: Stats, checksum: str, **data) -> Self:
        """Build a `File` from `Stats` info plus its checksum; extra keyword
        data overrides values taken from `info`."""
        data["dataset"] = data.get("dataset", DEFAULT_DATASET)
        data["checksum"] = checksum
        return cls(**{**info.model_dump(), **data})

    def to_document(self) -> Document:
        """Build a `Document` from this file's metadata."""
        return Document(
            id=self.id,
            checksum=self.checksum,
            name=self.name,
            path=self.path,
            size=self.size,
            mimetype=self.mimetype,
            updated_at=self.updated_at,
        )

blob_path property

Relative path to blob in dataset archive

checksum instance-attribute

SHA256 checksum (often referred to as content_hash)

dataset instance-attribute

Dataset name

extra = {} (class attribute, instance attribute)

Arbitrary extra data

id property

The entity id is generated by a hash of the file path and the checksum. Uses just the checksum as id if that's the key

meta_path property

Relative path for this file's metadata json in dataset archive

origin = None (class attribute, instance attribute)

Origin stage of this file

make_parents()

Make parent Folder entities

Source code in ftm_lakehouse/model/file.py
def make_parents(self) -> StatementEntities:
    """Make parent `Folder` entities"""
    parent = Path(self.key).parent
    if parent.name:
        yield from make_folders(parent, dataset=self.dataset)

to_entity()

Make an entity for this File

Source code in ftm_lakehouse/model/file.py
def to_entity(self) -> StatementEntity:
    """Make an entity for this File"""
    schema = mime_to_schema(self.mimetype)
    entity = make_entity(
        {"id": self.id, "schema": schema},
        entity_type=StatementEntity,
        default_dataset=self.dataset,
    )
    entity.add("contentHash", self.checksum)
    entity.add("fileName", self.name)
    entity.add("fileSize", self.size)
    entity.add("mimeType", self.mimetype)
    entity.add("parent", self.parent)
    for prop in schema.properties:
        if prop in self.extra:
            entity.add(prop, self.extra[prop])
    return entity

Mapping Models

ftm_lakehouse.model.DatasetMapping

Bases: BaseModel

A complete mapping configuration for a dataset file.

Source code in ftm_lakehouse/model/mapping.py
class DatasetMapping(BaseModel):
    """A complete mapping configuration for a dataset file."""

    # `Mapping` is an external (non-pydantic) type, hence arbitrary types
    model_config = ConfigDict(arbitrary_types_allowed=True)

    dataset: str  # name of the dataset the mapping belongs to
    content_hash: str  # checksum of the source file — presumably the SHA256 used elsewhere as `checksum`; confirm
    queries: list[Mapping]  # the mapping queries to apply

Job Models

ftm_lakehouse.model.JobModel

Bases: BaseModel

Status model for a (probably long running) job

Source code in ftm_lakehouse/model/job.py
class JobModel(BaseModel):
    """Status model for a (probably long running) job"""

    run_id: str
    started: datetime | None = None
    stopped: datetime | None = None
    last_updated: datetime | None = None
    pending: int = 0
    done: int = 0
    errors: int = 0
    running: bool = False
    exc: str | None = None
    took: timedelta = timedelta()

    @computed_field
    @property
    def name(self) -> str:
        """Expose the concrete model class name in serialized output."""
        return self.__class__.__name__

    @field_validator("run_id", mode="before")
    @classmethod
    def ensure_run_id(cls, value: str | None = None) -> str:
        """Give a manual run id or create one"""
        return value or ensure_uuid()

    def touch(self) -> None:
        """Update the `last_updated` timestamp to now."""
        # NOTE(review): naive local time, while `Crud.created_at` uses
        # timezone-aware UTC — consider aligning.
        self.last_updated = datetime.now()

    def stop(self, exc: Exception | None = None) -> None:
        """Mark the job as stopped, recording an optional exception and the
        total runtime."""
        self.running = False
        self.stopped = datetime.now()
        # Only stringify an actual exception: the previous unconditional
        # `str(exc)` stored the literal string "None" on a clean stop,
        # making `exc` truthy even though no error occurred.
        self.exc = str(exc) if exc is not None else None
        if self.started and self.stopped:
            self.took = self.stopped - self.started

    @classmethod
    def make(cls, **kwargs) -> Self:
        """Create a job, generating a `run_id` if none was given."""
        kwargs["run_id"] = cls.ensure_run_id(kwargs.get("run_id"))
        return cls(**kwargs)

    @classmethod
    def start(cls, **kwargs) -> Self:
        """Create a job and mark it as started and running right away."""
        run = cls.make(**kwargs)
        run.started = datetime.now()
        run.running = True
        run.touch()
        return run

    @cached_property
    def log(self) -> BoundLogger:
        """Logger bound to this job's run id."""
        return get_logger(__name__, run_id=self.run_id)

ensure_run_id(value=None) (classmethod)

Give a manual run id or create one

Source code in ftm_lakehouse/model/job.py
@field_validator("run_id", mode="before")
@classmethod
def ensure_run_id(cls, value: str | None = None) -> str:
    """Give a manual run id or create one"""
    return value or ensure_uuid()

ftm_lakehouse.model.DatasetJobModel

Bases: JobModel

Status model for a (probably long running) job bound to a dataset

Source code in ftm_lakehouse/model/job.py
class DatasetJobModel(JobModel):
    """Status model for a (probably long running) job bound to a dataset"""

    dataset: str

    @cached_property
    def log(self) -> BoundLogger:
        """Logger named after the dataset and job, bound to run id and
        dataset."""
        logger_name = f"{self.dataset}.{self.name}"
        return get_logger(
            logger_name,
            run_id=self.run_id,
            dataset=self.dataset,
        )

CRUD Models

ftm_lakehouse.model.Crud

Bases: BaseModel

Payload model for CRUD queue operations.

All lakehouse mutations go through this single queue, ordered by UUID7. The queue key (UUID7) is managed by anystore.Queue, not stored in the model.

Source code in ftm_lakehouse/model/crud.py
class Crud(BaseModel):
    """
    Payload model for CRUD queue operations.

    All lakehouse mutations go through this single queue, ordered by UUID7.
    The queue key (UUID7) is managed by anystore.Queue, not stored in the model.
    """

    action: CrudAction  # what to do (upsert / delete)
    resource: CrudResource  # which lakehouse resource the action targets
    payload: Any = None  # arbitrary action-specific payload
    # Timezone-aware UTC timestamp assigned at creation time
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

ftm_lakehouse.model.CrudAction

Bases: str, Enum

Action type for CRUD operations.

Source code in ftm_lakehouse/model/crud.py
class CrudAction(str, Enum):
    """Action type for CRUD operations."""

    UPSERT = "upsert"  # create or update
    DELETE = "delete"  # remove

ftm_lakehouse.model.CrudResource

Bases: str, Enum

Target resource

Source code in ftm_lakehouse/model/crud.py
class CrudResource(str, Enum):
    """Target resource"""

    ARCHIVE = "archive"  # file blob archive
    STATEMENTS = "statements"  # ftm statements store
    ENTITIES = "entities"  # ftm entities store