Usage
This guide covers using ftm-lakehouse from a tenant (application) perspective.
Getting Started
from ftm_lakehouse import lake
# Get or create a dataset
dataset = lake.ensure_dataset("my_dataset", title="My Dataset")
Working with Datasets
Get a Dataset
Check if Dataset Exists
Update Dataset Config
Access Dataset Metadata
Working with Entities
Entities are stored as statements in a journal, then flushed to a parquet store.
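The journal-then-flush flow can be pictured with a small toy model. This is a conceptual sketch only: `Statement`, `ToyStore`, and their fields are hypothetical names made up for illustration, and the real journal and parquet store may work differently in detail. It assumes that a statement is one property value of one entity, that writes accumulate in a pending journal, and that a flush writes them out as one deduplicated, sorted batch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Statement:
    """Hypothetical statement: one property value of one entity."""
    entity_id: str
    prop: str
    value: str
    origin: str


class ToyStore:
    """Toy stand-in for a journal plus an append-only batch store."""

    def __init__(self):
        self.journal = []  # pending statements, cheap to append to
        self.store = []    # flushed batches, one list per flush

    def add(self, entity_id, properties, origin):
        # Break the entity down into one statement per property value.
        for prop, values in properties.items():
            for value in values:
                self.journal.append(Statement(entity_id, prop, value, origin))

    def flush(self):
        # Move pending statements into the store as one sorted, deduplicated batch.
        if not self.journal:
            return 0
        batch = sorted(
            set(self.journal),
            key=lambda s: (s.entity_id, s.prop, s.value, s.origin),
        )
        self.store.append(batch)
        self.journal.clear()
        return len(batch)


toy = ToyStore()
toy.add("john-doe", {"name": ["John Doe"], "nationality": ["us"]}, origin="import")
flushed = toy.flush()
```

In this toy model, many small writes stay cheap because they only append to the journal, and the more expensive sorted batch is produced once per flush.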
Add a Single Entity
from followthemoney import model
entity = model.make_entity("Person")
entity.make_identifier("john-doe")
entity.set("name", "John Doe")
dataset.entities.add(entity, origin="import")
Bulk Write Entities
with dataset.entities.writer(origin="import") as writer:
    for entity in source_entities:
        writer.add_entity(entity)
Flush to Parquet Store
Query Entities
# Query with filters
for entity in dataset.entities.query(origin="import"):
print(entity.caption)
# Get by ID
entity = dataset.entities.get("entity-id")
Stream from Exported JSON
Working with Files
The archive stores files with content-addressed storage.
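As a rough illustration of what content-addressed storage means, the sketch below keys each blob by a hash of its bytes, so identical content is stored only once and can be looked up by checksum. `ToyArchive` is a hypothetical in-memory class, not the ftm-lakehouse archive, and the choice of SHA-1 is an assumption made for the example.

```python
import hashlib


class ToyArchive:
    """Hypothetical in-memory archive keyed by content checksum."""

    def __init__(self):
        self._blobs = {}

    def put(self, data: bytes) -> str:
        # The key is derived from the content itself (SHA-1 is an assumption here).
        checksum = hashlib.sha1(data).hexdigest()
        # Storing the same bytes twice keeps a single copy.
        self._blobs.setdefault(checksum, data)
        return checksum

    def exists(self, checksum: str) -> bool:
        return checksum in self._blobs

    def get(self, checksum: str) -> bytes:
        return self._blobs[checksum]


archive = ToyArchive()
first = archive.put(b"hello")
second = archive.put(b"hello")  # same content, same key
```

Because the key depends only on the bytes, the checksum serves both as the lookup handle and as an integrity check for a file.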
Archive a File
Check if File Exists
Get File Metadata
file = dataset.archive.get(checksum)
print(f"Size: {file.size}")
print(f"Mimetype: {file.mimetype}")
Open a File
Stream File Content
Get Local Path (for external tools)
Custom Dataset Models
Tenants can extend DatasetModel with custom fields:
from ftm_lakehouse import lake
from ftm_lakehouse.model import DatasetModel
class MyDatasetModel(DatasetModel):
    project_id: str | None = None
    owner_email: str | None = None
    sensitivity: str = "public"
# Use custom model
dataset = lake.get_dataset("my_data", model_class=MyDatasetModel)
# Access typed fields
model: MyDatasetModel = dataset.model
print(model.project_id)
# Update custom fields
dataset.update_model(
    project_id="proj-123",
    owner_email="alice@example.com",
)
Working with Catalogs
For managing multiple datasets:
from ftm_lakehouse import get_lakehouse
catalog = get_lakehouse()
# List all datasets
for dataset in catalog.list_datasets():
    print(dataset.name)
# Create a new dataset
dataset = catalog.create_dataset("new_dataset", title="New Dataset")
Maintenance
The parquet statement store is append-only on the write path. Deduplication, first_seen folding, and tombstone reaping happen in three independent async operations that all run under a single dataset-wide write fence (.LOCK):
# Bin-pack small parquet files (cheap, can be run often)
dataset.entities.merge() # via repo.merge()
# Three primitives exposed on the lower-level ParquetStore:
dataset.entities._statements.compact() # cheap file bin-pack
dataset.entities._statements.merge(grace_period_days=7) # dedup + reap tombstones
dataset.entities._statements.vacuum(retention_hours=0) # delete obsolete files
CLI equivalents:
ftm-lakehouse -d my_dataset operations compact
ftm-lakehouse -d my_dataset operations merge
ftm-lakehouse -d my_dataset operations vacuum
Tombstones (from delete_entity / delete_statement) are kept for LAKEHOUSE_GRACE_PERIOD_DAYS (default 30) before merge drops them.
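The grace-period rule can be sketched in plain Python. This is a conceptual illustration under stated assumptions, not the store's implementation: the row layout (a dict with an optional `deleted_at` timestamp) and the function name `reap_tombstones` are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def reap_tombstones(rows, now, grace_period_days=30):
    """Drop tombstones older than the grace period; keep everything else."""
    cutoff = now - timedelta(days=grace_period_days)
    kept = []
    for row in rows:
        deleted_at = row.get("deleted_at")
        # A row is a tombstone if it carries a deletion timestamp (assumed layout).
        if deleted_at is not None and deleted_at < cutoff:
            continue  # past the grace period: reap it
        kept.append(row)
    return kept


now = datetime(2024, 3, 1, tzinfo=timezone.utc)
rows = [
    {"id": "s1", "deleted_at": None},                      # live statement
    {"id": "s2", "deleted_at": now - timedelta(days=5)},   # recent tombstone
    {"id": "s3", "deleted_at": now - timedelta(days=45)},  # expired tombstone
]
remaining = [row["id"] for row in reap_tombstones(rows, now)]
```

With the default of 30 days, only the 45-day-old tombstone is dropped. A shorter grace period reclaims space sooner but keeps deletions visible in the store for less time.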
Bulk Import (bypassing the journal)
For one-shot loads of large entities.ftm.json files where the journal's write-amplification is wasteful, you can stream entities through an in-memory shard buffer and write straight to parquet:
from datetime import datetime, timezone
from ftmq.io import smart_read_proxies
from ftm_lakehouse import lake
from ftm_lakehouse.logic.entities.buffer import EntityBuffer
dataset = lake.ensure_dataset("my_dataset")
repo = dataset.entities
buffer = EntityBuffer(dataset.name, dataset.model.shards, origin="bulk")
# Use one timestamp for every batch of this import
now = datetime.now(timezone.utc)
for proxy in smart_read_proxies("entities.ftm.json"):
    buffer.add_entity(proxy)
    # Write a batch once len(buffer) reaches one million
    if len(buffer) >= 1_000_000:
        repo.write_statements(buffer.flush_buffer(), now=now)
# Write whatever is left after the last full batch
if buffer:
    repo.write_statements(buffer.flush_buffer(), now=now)
This is exactly what ftm-lakehouse entities import does.
Configuration
Configure via environment variables:
| Variable | Default | Description |
|---|---|---|
| LAKEHOUSE_URI | data | Base storage path |
| LAKEHOUSE_JOURNAL_URI | sqlite:///:memory: | Journal database URI |
| LAKEHOUSE_ENTITY_SHARDS | 8 | Uniform shard count per new dataset |
| LAKEHOUSE_GRACE_PERIOD_DAYS | 30 | Tombstone grace period used by merge |
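To check which values would apply in a given environment, a small helper can overlay the environment on the defaults from the table. This is a minimal sketch: `effective_settings` is a hypothetical helper, not part of ftm-lakehouse, and it only reads variables without configuring the library.

```python
import os

# Defaults copied from the table above.
DEFAULTS = {
    "LAKEHOUSE_URI": "data",
    "LAKEHOUSE_JOURNAL_URI": "sqlite:///:memory:",
    "LAKEHOUSE_ENTITY_SHARDS": "8",
    "LAKEHOUSE_GRACE_PERIOD_DAYS": "30",
}


def effective_settings(environ=None):
    """Return each variable's value from the environment, or its default."""
    env = os.environ if environ is None else environ
    return {name: env.get(name, default) for name, default in DEFAULTS.items()}


# Example: override only the grace period, keep the other defaults.
settings = effective_settings({"LAKEHOUSE_GRACE_PERIOD_DAYS": "7"})
```

Passing a plain dict instead of `os.environ` keeps the helper easy to test without touching the real environment.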
Or use S3-compatible storage: