# ftm_lakehouse.lake

Public convenience functions for the lakehouse.

from ftm_lakehouse import lake

dataset = lake.get_dataset("my_data")
dataset = lake.ensure_dataset("my_data", title="My Dataset")
catalog = lake.get_lakehouse()

# Repository shortcuts
entities = lake.get_entities("my_data")
archive = lake.get_archive("my_data")
mappings = lake.get_mappings("my_data")

Get a lakehouse catalog.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `uri` | `Uri \| None` | Storage URI (default from the `LAKEHOUSE_URI` setting) | `None` |
| `model_class` | `type[DM]` | Custom `DatasetModel` subclass | `DatasetModel` |

Returns:

| Type | Description |
|------|-------------|
| `Catalog[DM]` | Catalog instance |

Source code in ftm_lakehouse/lake.py
@cache
def get_lakehouse(
    uri: Uri | None = None,
    model_class: type[DM] = DatasetModel,
) -> Catalog[DM]:
    """
    Return a lakehouse catalog (memoized per argument combination).

    Args:
        uri: Storage URI (default from LAKEHOUSE_URI setting)
        model_class: Custom DatasetModel subclass

    Returns:
        Catalog instance
    """
    fallback = Settings().uri
    resolved = ensure_uri(uri or fallback)
    log.info("Loading catalog", uri=resolved)
    return Catalog(uri=resolved, model_class=model_class)

Get a dataset by name.

Parameters:

Name Type Description Default
name str

Dataset name

required
model_class type[DM]

Custom DatasetModel subclass

DatasetModel
**data Any

Additional config data (auto-saved if dataset exists)

{}

Returns:

Type Description
Dataset[DM]

Dataset instance

Source code in ftm_lakehouse/lake.py
def get_dataset(
    name: str,
    model_class: type[DM] = DatasetModel,
    **data: Any,
) -> Dataset[DM]:
    """
    Look up a dataset by name in the default lakehouse catalog.

    Args:
        name: Dataset name
        model_class: Custom DatasetModel subclass
        **data: Additional config data (auto-saved if dataset exists)

    Returns:
        Dataset instance
    """
    return get_lakehouse(model_class=model_class).get_dataset(name, **data)

Get a dataset and ensure it exists.

Creates config.yml if the dataset doesn't exist.

Parameters:

Name Type Description Default
name str

Dataset name

required
model_class type[DM]

Custom DatasetModel subclass

DatasetModel
**data Any

Config data for creation

{}

Returns:

Type Description
Dataset[DM]

Dataset instance (created if needed)

Source code in ftm_lakehouse/lake.py
def ensure_dataset(
    name: str,
    model_class: type[DM] = DatasetModel,
    **data: Any,
) -> Dataset[DM]:
    """
    Fetch a dataset, creating its config.yml if it does not exist yet.

    Args:
        name: Dataset name
        model_class: Custom DatasetModel subclass
        **data: Config data for creation

    Returns:
        Dataset instance (created if needed)
    """
    ds = get_dataset(name, model_class=model_class, **data)
    ds.ensure()
    return ds

Repository Shortcuts

Get the entity repository for a dataset.

Parameters:

Name Type Description Default
dataset str

Dataset name

required
uri Uri | None

Dataset URI override (default: {LAKEHOUSE_URI}/{dataset})

None

Returns:

Type Description
EntityRepository

EntityRepository instance (cached)

Source code in ftm_lakehouse/repository/factories.py
@cache
def get_entities(dataset: str, uri: Uri | None = None) -> EntityRepository:
    """
    Return the (cached) entity repository for a dataset.

    Args:
        dataset: Dataset name
        uri: Dataset URI override (default: {LAKEHOUSE_URI}/{dataset})

    Returns:
        EntityRepository instance (cached)
    """
    target = uri if uri else f"{Settings().uri}/{dataset}"
    return EntityRepository(dataset, target)

Get the archive repository for a dataset.

Parameters:

Name Type Description Default
dataset str

Dataset name

required
uri Uri | None

Dataset URI override (default: {LAKEHOUSE_URI}/{dataset})

None

Returns:

Type Description
ArchiveRepository

ArchiveRepository instance (cached)

Source code in ftm_lakehouse/repository/factories.py
@cache
def get_archive(dataset: str, uri: Uri | None = None) -> ArchiveRepository:
    """
    Return the (cached) archive repository for a dataset.

    Args:
        dataset: Dataset name
        uri: Dataset URI override (default: {LAKEHOUSE_URI}/{dataset})

    Returns:
        ArchiveRepository instance (cached)
    """
    target = uri if uri else f"{Settings().uri}/{dataset}"
    return ArchiveRepository(dataset, target)

Get the mappings repository for a dataset.

Parameters:

Name Type Description Default
dataset str

Dataset name

required
uri Uri | None

Dataset URI override (default: {LAKEHOUSE_URI}/{dataset})

None

Returns:

Type Description
MappingRepository

MappingRepository instance (cached)

Source code in ftm_lakehouse/repository/factories.py
@cache
def get_mappings(dataset: str, uri: Uri | None = None) -> MappingRepository:
    """
    Return the (cached) mappings repository for a dataset.

    Args:
        dataset: Dataset name
        uri: Dataset URI override (default: {LAKEHOUSE_URI}/{dataset})

    Returns:
        MappingRepository instance (cached)
    """
    target = uri if uri else f"{Settings().uri}/{dataset}"
    return MappingRepository(dataset, target)

Classes

Bases: Generic[DM]

Multi-dataset lakehouse catalog.

The Catalog manages multiple datasets within a lakehouse storage location.

Example
from ftm_lakehouse import get_lakehouse

catalog = get_lakehouse()

# List datasets
for dataset in catalog.list_datasets():
    print(dataset.name)

# Get a specific dataset
dataset = catalog.get_dataset("my_dataset")
Source code in ftm_lakehouse/catalog.py
class Catalog(Generic[DM]):
    """
    Multi-dataset lakehouse catalog.

    A Catalog manages all datasets that live below one lakehouse storage
    location.

    Example:
        ```python
        from ftm_lakehouse import get_lakehouse

        catalog = get_lakehouse()

        # List datasets
        for dataset in catalog.list_datasets():
            print(dataset.name)

        # Get a specific dataset
        dataset = catalog.get_dataset("my_dataset")
        ```
    """

    def __init__(
        self,
        uri: Uri,
        model_class: type[DM] = DatasetModel,
    ) -> None:
        self.uri = uri
        self._model_class = model_class
        self._log = get_logger(__name__, catalog=mask_uri(uri))

    def __repr__(self) -> str:
        return "Catalog(%r)" % (self.uri,)

    # --- storage primitives ---------------------------------------------------

    @cached_property
    def _store(self):
        """Raw (unserialized) storage handle for the catalog root."""
        return get_store(uri=self.uri, serialization_mode="raw")

    @cached_property
    def _tags(self) -> TagStore:
        """Tag store used for freshness tracking."""
        return TagStore(self.uri)

    @cached_property
    def _versions(self) -> VersionStore:
        """Version store used for config snapshots."""
        return VersionStore(self.uri)

    # --- model access ---------------------------------------------------------

    def _load_model(self, **data: Any) -> CatalogModel:
        """Build the catalog model from config merged with *data*."""
        config = load_config(self._store, **data)
        return CatalogModel(**config)

    @property
    def model(self) -> CatalogModel:
        """The catalog model, freshly loaded from config.yml."""
        return self._load_model()

    def update_model(self, **data: Any) -> CatalogModel:
        """
        Merge *data* into config.yml and persist a versioned snapshot.

        Args:
            **data: Fields to update in the model

        Returns:
            Updated model
        """
        updated = self._load_model(**data)
        self._versions.make(path.CONFIG, updated)
        return updated

    # --- dataset management ---------------------------------------------------

    def get_dataset(self, name: str, **data: Any) -> "Dataset[DM]":
        """
        Get a Dataset instance by name.

        Args:
            name: Dataset name
            **data: Additional config data (auto-saved to config.yml if dataset exists)

        Returns:
            Dataset instance
        """
        # Deferred import: Dataset imports from this module as well.
        from ftm_lakehouse.dataset import Dataset

        dataset = Dataset(
            name=name,
            uri=join_uri(self.uri, name),
            model_class=self._model_class,
        )
        # Only persist extra config for datasets that already exist.
        if data and dataset.exists():
            dataset.update_model(**data)
        return dataset

    def list_datasets(self) -> Generator["Dataset[DM]", None, None]:
        """
        Iterate through all datasets in the catalog.

        Yields:
            Dataset instances that have a config.yml
        """
        for entry in self._store._fs.ls(self.uri):
            name = Path(entry).name
            # A child directory counts as a dataset only if it has a config.
            if not self._store.exists(f"{name}/{path.CONFIG}"):
                continue
            yield self.get_dataset(name)

    def create_dataset(self, name: str, **data: Any) -> "Dataset[DM]":
        """
        Create a new dataset.

        Args:
            name: Dataset name
            **data: Initial config data

        Returns:
            Created Dataset instance
        """
        created = self.get_dataset(name, **data)
        created.ensure()
        return created

model property

Load and return the catalog model from config.yml.

get_dataset(name, **data)

Get a Dataset instance by name.

Parameters:

Name Type Description Default
name str

Dataset name

required
**data Any

Additional config data (auto-saved to config.yml if dataset exists)

{}

Returns:

Type Description
Dataset[DM]

Dataset instance

Source code in ftm_lakehouse/catalog.py
def get_dataset(self, name: str, **data: Any) -> "Dataset[DM]":
    """
    Get a Dataset instance by name.

    Args:
        name: Dataset name
        **data: Additional config data (auto-saved to config.yml if dataset exists)

    Returns:
        Dataset instance
    """
    # Deferred import: Dataset imports from this module as well.
    from ftm_lakehouse.dataset import Dataset

    dataset = Dataset(
        name=name,
        uri=join_uri(self.uri, name),
        model_class=self._model_class,
    )
    # Only persist extra config for datasets that already exist.
    if data and dataset.exists():
        dataset.update_model(**data)
    return dataset

list_datasets()

Iterate through all datasets in the catalog.

Yields:

Type Description
Dataset[DM]

Dataset instances that have a config.yml

Source code in ftm_lakehouse/catalog.py
def list_datasets(self) -> Generator["Dataset[DM]", None, None]:
    """
    Iterate through all datasets in the catalog.

    Yields:
        Dataset instances that have a config.yml
    """
    for entry in self._store._fs.ls(self.uri):
        name = Path(entry).name
        # A child directory counts as a dataset only if it has a config.
        if not self._store.exists(f"{name}/{path.CONFIG}"):
            continue
        yield self.get_dataset(name)

create_dataset(name, **data)

Create a new dataset.

Parameters:

Name Type Description Default
name str

Dataset name

required
**data Any

Initial config data

{}

Returns:

Type Description
Dataset[DM]

Created Dataset instance

Source code in ftm_lakehouse/catalog.py
def create_dataset(self, name: str, **data: Any) -> "Dataset[DM]":
    """
    Create a new dataset.

    Args:
        name: Dataset name
        **data: Initial config data

    Returns:
        Created Dataset instance
    """
    created = self.get_dataset(name, **data)
    created.ensure()
    return created

update_model(**data)

Update config.yml with new data.

Parameters:

Name Type Description Default
**data Any

Fields to update in the model

{}

Returns:

Type Description
CatalogModel

Updated model

Source code in ftm_lakehouse/catalog.py
def update_model(self, **data: Any) -> CatalogModel:
    """
    Merge *data* into config.yml and persist a versioned snapshot.

    Args:
        **data: Fields to update in the model

    Returns:
        Updated model
    """
    updated = self._load_model(**data)
    self._versions.make(path.CONFIG, updated)
    return updated

Bases: Generic[DM]

A single dataset within the lakehouse.

Provides access to repositories for domain operations:

  • archive: File storage (ArchiveRepository)
  • entities: Entity/statement operations (EntityRepository)
  • mappings: Mapping configurations (MappingRepository)
  • jobs: Job tracking (JobRepository)
Example
from ftm_lakehouse import get_dataset

dataset = get_dataset("my_dataset")
dataset.ensure()

# Add entities
dataset.entities.add(entity, origin="import")

# Archive files
dataset.archive.put(uri)

# Update config
dataset.update_model(title="New Title")
Source code in ftm_lakehouse/dataset.py
class Dataset(Generic[DM]):
    """
    A single dataset inside the lakehouse.

    Domain operations are grouped into repositories:

    - archive: File storage (ArchiveRepository)
    - entities: Entity/statement operations (EntityRepository)
    - mappings: Mapping configurations (MappingRepository)
    - jobs: Job tracking (JobRepository)

    Example:
        ```python
        from ftm_lakehouse import get_dataset

        dataset = get_dataset("my_dataset")
        dataset.ensure()

        # Add entities
        dataset.entities.add(entity, origin="import")

        # Archive files
        dataset.archive.put(uri)

        # Update config
        dataset.update_model(title="New Title")
        ```
    """

    def __init__(
        self,
        name: str,
        uri: Uri,
        model_class: type[DM] = DatasetModel,
    ) -> None:
        self.name = name
        self.uri = uri
        self._model_class = model_class
        self._settings = Settings()
        self._log = log.bind(dataset=name, uri=mask_uri(uri))

        # Local storage on ZFS gets a dedicated ZFS dataset per lakehouse dataset.
        if self._store.is_local and self._settings.on_zfs:
            pool = self._settings.zfs_pool
            if pool is None:
                raise RuntimeError("Configure LAKEHOUSE_ZFS_POOL for zfs integration!")
            ensure_zfs_dataset(pool, self.name)

    def __repr__(self) -> str:
        return "Dataset(%r)" % (self.name,)

    # --- storage primitives ---------------------------------------------------

    @cached_property
    def _store(self):
        """Raw (unserialized) storage handle for this dataset's URI."""
        return get_store(uri=ensure_api_uri(self.uri), serialization_mode="raw")

    @cached_property
    def _tags(self) -> TagStore:
        """Tag store used for freshness tracking."""
        return get_tags(self.name, self.uri)

    @cached_property
    def _versions(self) -> VersionStore:
        """Version store used for config/index snapshots."""
        return get_versions(self.name, self.uri)

    # --- model access (config.yml via VersionStore) ---------------------------

    def _load_model(self, **data: Any) -> DM:
        """Build the dataset model from config.yml merged with *data*."""
        data["name"] = self.name  # the dataset's own name always wins
        data.pop("storage", None)  # storage config is never taken from callers
        config = load_config(self._store, **data)
        return self._model_class(**config)

    @property
    def model(self) -> DM:
        """The dataset model, freshly loaded from config.yml."""
        return self._load_model()

    @property
    def index(self) -> DM:
        """The generated index.json, falling back to config.yml when absent."""
        snapshot = self._versions.get(path.INDEX, model=self._model_class)
        return snapshot or self.model

    def update_model(self, **data: Any) -> DM:
        """
        Merge *data* into config.yml and persist a versioned snapshot.

        Args:
            **data: Fields to update in the model

        Returns:
            Updated model
        """
        updated = self._load_model(**data)
        self._versions.make(path.CONFIG, updated)
        self._log.info("Updated dataset config")
        return updated

    # --- repositories (lazy, cached on first access) --------------------------

    @cached_property
    def archive(self) -> ArchiveRepository:
        """File archive operations."""
        return ArchiveRepository(self.name, self.uri)

    @cached_property
    def entities(self) -> EntityRepository:
        """Entity/statement operations."""
        return EntityRepository(self.name, self.uri)

    @cached_property
    def documents(self) -> DocumentRepository:
        """Document metadata operations."""
        return DocumentRepository(self.name, self.uri)

    @cached_property
    def mappings(self) -> MappingRepository:
        """Mapping configuration storage."""
        return MappingRepository(self.name, self.uri)

    @cached_property
    def jobs(self) -> JobRepository:
        """Job tracking."""
        return JobRepository(self.name, self.uri, DatasetJobModel)

    # --- lifecycle ------------------------------------------------------------

    def exists(self) -> bool:
        """Report whether the dataset is initialized (config.yml present)."""
        return self._store.exists(path.CONFIG)

    def ensure(self) -> None:
        """Create config.yml for the dataset unless it already exists."""
        if self.exists():
            return
        self.update_model()
        self._log.info("Created dataset")

    # --- public URL exposure --------------------------------------------------

    def get_blob_url(self, checksum: str) -> str:
        """Resolve a public URL for an archived blob identified by checksum."""
        from ftm_lakehouse.core.archive_url import resolve_archive_url

        return resolve_archive_url(
            store=self._store,
            dataset=self.name,
            checksum=checksum,
            public_prefix=self.model.get_public_prefix(),
        )

archive cached property

File archive operations.

entities cached property

Entity/statement operations.

mappings cached property

Mapping configuration storage.

jobs cached property

Job tracking.

model property

Load and return the dataset model from config.yml.

update_model(**data)

Update config.yml with new data.

Uses VersionStore to create versioned snapshots.

Parameters:

Name Type Description Default
**data Any

Fields to update in the model

{}

Returns:

Type Description
DM

Updated model

Source code in ftm_lakehouse/dataset.py
def update_model(self, **data: Any) -> DM:
    """
    Merge *data* into config.yml and persist a versioned snapshot.

    Args:
        **data: Fields to update in the model

    Returns:
        Updated model
    """
    updated = self._load_model(**data)
    self._versions.make(path.CONFIG, updated)
    self._log.info("Updated dataset config")
    return updated

exists()

Check if dataset exists (has config.yml).

Source code in ftm_lakehouse/dataset.py
def exists(self) -> bool:
    """Report whether the dataset is initialized (config.yml present)."""
    store = self._store
    return store.exists(path.CONFIG)

ensure()

Ensure dataset exists, create config.yml if needed.

Source code in ftm_lakehouse/dataset.py
def ensure(self) -> None:
    """Create config.yml for the dataset unless it already exists."""
    if self.exists():
        return
    self.update_model()
    self._log.info("Created dataset")