# Working with Entities
The entities repository is the primary way to work with FollowTheMoney data in ftm-lakehouse. It provides a unified API for reading, writing, and querying entities.
## Overview
Entities in ftm-lakehouse are stored as statements: granular, property-level records. This design enables:

- **Versioning**: Track changes over time
- **Provenance**: Know where each piece of data came from (via `origin`, `last_seen`, `original_value` and other metadata from the Statement model)
- **Incremental updates**: Add new data without reprocessing everything
- **Deduplication**: Merge entities from multiple sources
The underlying storage is implemented as Parquet files managed by the `deltalake` library.
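To make the statement model concrete, here is a minimal pure-Python sketch (not the actual Statement class) of how one entity decomposes into property-level records:

```python
# Illustrative only: field names are simplified, and the real Statement
# model carries extra metadata (origin, last_seen, original_value, ...).
entity = {
    "id": "jane-doe",
    "schema": "Person",
    "properties": {"name": ["Jane Doe"], "nationality": ["us"]},
}

# One statement per (property, value) pair
statements = [
    {"entity_id": entity["id"], "schema": entity["schema"], "prop": prop, "value": value}
    for prop, values in entity["properties"].items()
    for value in values
]

for stmt in statements:
    print(stmt["entity_id"], stmt["prop"], stmt["value"])
# jane-doe name Jane Doe
# jane-doe nationality us
```

Because every property value is its own row, two sources can contribute to the same entity without overwriting each other.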
## Quick Start
```python
from ftm_lakehouse import ensure_dataset

dataset = ensure_dataset("my_dataset")

# Write entities
with dataset.entities.writer(origin="import") as writer:
    for entity in entities:
        writer.add_entity(entity)

# Read a specific entity
entity = dataset.entities.get("entity-id-123")

# Query entities
for entity in dataset.entities.query():
    process(entity)
```
Alternatively, use the shortcut to get the repository directly:
```python
from ftm_lakehouse import lake

entities = lake.get_entities("my_dataset")
entities.add(entity, origin="import")
```
## Writing Entities

### Single Entity
```python
from ftm_lakehouse import ensure_dataset
from followthemoney import model

dataset = ensure_dataset("my_dataset")

# Create an entity
entity = model.make_entity("Person")
entity.id = "jane-doe"
entity.add("name", "Jane Doe")
entity.add("nationality", "us")

# Write to the lakehouse
dataset.entities.add(entity, origin="manual")
```
### Bulk Writing
For large imports, use the bulk writer for better performance:
```python
from ftm_lakehouse import ensure_dataset

dataset = ensure_dataset("my_dataset")

# Using the context manager
with dataset.entities.writer(origin="bulk_import") as writer:
    for entity in large_entity_source():
        writer.add_entity(entity)
```
### Flush to Storage

Writes are buffered in a SQL journal; call `flush()` to persist them to Delta Lake (see the Maintenance section below for an example).
## Reading Entities

### Get by ID
```python
from ftm_lakehouse import get_dataset

dataset = get_dataset("my_dataset")

entity = dataset.entities.get("jane-doe")
if entity:
    print(entity.caption)
```
### Query with Filters
```python
from ftm_lakehouse import get_dataset

dataset = get_dataset("my_dataset")

# Query by origin
for entity in dataset.entities.query(origin="import"):
    print(entity.id)

# Query specific entity IDs
ids = ["jane-doe", "john-smith"]
for entity in dataset.entities.query(entity_ids=ids):
    print(entity.caption)

# Query by schema bucket (thing, interval, address)
for entity in dataset.entities.query(bucket="thing"):
    print(entity.schema.name)
```
### Stream from Exported File
For full dataset iteration, streaming from the pre-exported JSON file is faster:
```python
from ftm_lakehouse import get_dataset

dataset = get_dataset("my_dataset")

# Stream from entities.ftm.json (requires prior export)
for entity in dataset.entities.stream():
    process(entity)
```
> **Note:** `stream()` reads from the exported `entities.ftm.json` file. Use `query()` to query the live statement store.
## The Origin Field
The `origin` field tracks where data came from. This is useful for:

- **Filtering**: Query only entities from a specific source
- **Auditing**: Know the provenance of each piece of data
- **Updates**: Replace data from one source without affecting others
```python
from ftm_lakehouse import ensure_dataset

dataset = ensure_dataset("my_dataset")

# Import from different sources
with dataset.entities.writer(origin="source_a") as writer:
    for entity in source_a_entities:
        writer.add_entity(entity)

with dataset.entities.writer(origin="source_b") as writer:
    for entity in source_b_entities:
        writer.add_entity(entity)

# Query only source_a entities
for entity in dataset.entities.query(origin="source_a"):
    print(entity.id)
```
## Deleting Entities

Entities can be soft-deleted. Deletes go through the journal like writes and take effect on the next flush.
### Delete an Entity
```python
from ftm_lakehouse import get_dataset

dataset = get_dataset("my_dataset")

# Delete all statements for an entity
count = dataset.entities.delete_entity("jane-doe")
print(f"Wrote {count} tombstones")

# Flush to apply the deletion
dataset.entities.flush()
```
After the flush, the entity is immediately excluded from all queries, statistics, and exports; no compaction is required.
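The mechanics can be illustrated with a small pure-Python sketch (not the real storage code): deletes only record tombstones, and reads filter through them.

```python
# Conceptual sketch of soft deletion. Deleting writes tombstones for an
# entity's statements; queries check the tombstone set on every read,
# so deleted entities disappear without rewriting the main table.
main_table = [
    ("stmt-1", "jane-doe", "name", "Jane Doe"),
    ("stmt-2", "jane-doe", "nationality", "us"),
    ("stmt-3", "john-smith", "name", "John Smith"),
]
tombstones = set()

def delete_entity(entity_id):
    # Mark every statement of the entity as deleted; return the count.
    count = 0
    for stmt_id, eid, _, _ in main_table:
        if eid == entity_id:
            tombstones.add(stmt_id)
            count += 1
    return count

def query():
    # Every read filters through the tombstone set; no compaction needed.
    return [row for row in main_table if row[0] not in tombstones]

print(delete_entity("jane-doe"))  # 2 tombstones written
print([eid for _, eid, _, _ in query()])  # ['john-smith']
```

Compaction (covered under Maintenance) later rewrites the main table so the tombstoned rows are physically removed.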
### Delete a Single Statement
```python
# Get the statement to delete
stmts = list(dataset.entities._statements.query_statements())
target = stmts[0]

dataset.entities.delete_statement(target)
dataset.entities.flush()
```
### Re-adding After Delete
A deleted entity can be re-added. The new data takes precedence:
```python
dataset.entities.delete_entity("jane-doe")
dataset.entities.flush()

# Re-add with new data
dataset.entities.add(updated_jane, origin="correction")
dataset.entities.flush()

# jane-doe is alive again with the new data
```
## Deduplication

When flushing, ftm-lakehouse automatically deduplicates statements. If the same statement (same id) is written again, only the translog metadata (the `last_seen` timestamp) is updated; no duplicate rows are added to the main table.
```python
dataset.entities.add(entity)
dataset.entities.flush()  # writes to main table + translog

dataset.entities.add(entity)  # same entity again
dataset.entities.flush()  # returns 0; only updates translog last_seen
```
This means repeated imports of the same data are cheap: the main parquet table doesn't grow, and only the lightweight translog is updated.
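As a rough pure-Python sketch of these semantics (illustrative only; the real id scheme and storage code differ):

```python
# Statements are keyed by a content-derived id, so re-writing an
# identical statement only bumps its last_seen timestamp in the
# translog instead of adding a new row to the main table.
import hashlib

main_table = {}   # statement id -> row
translog = {}     # statement id -> last_seen

def statement_id(entity_id, prop, value):
    # Illustrative key; the actual implementation derives ids differently.
    return hashlib.sha1(f"{entity_id}|{prop}|{value}".encode()).hexdigest()

def flush(statements, seen_at):
    new_rows = 0
    for entity_id, prop, value in statements:
        sid = statement_id(entity_id, prop, value)
        if sid not in main_table:
            main_table[sid] = (entity_id, prop, value)
            new_rows += 1
        translog[sid] = seen_at  # always refreshed, even for duplicates
    return new_rows

batch = [("jane-doe", "name", "Jane Doe"), ("jane-doe", "nationality", "us")]
print(flush(batch, "2024-01-01"))  # 2 new statements
print(flush(batch, "2024-02-01"))  # 0; only last_seen changes
```

The content-derived key is what makes repeated imports idempotent: identical input always maps to the same statement id.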
## Maintenance

### Flush the Journal
Writes are buffered in a SQL journal before being flushed to Delta Lake storage:
```python
from ftm_lakehouse import get_dataset

dataset = get_dataset("my_dataset")

count = dataset.entities.flush()
print(f"Flushed {count} new statements")
```
### Compact
Compaction applies the translog to the main table: deleted rows are removed and timestamps are updated. After compaction, the main table is self-contained.
```python
from ftm_lakehouse import get_dataset

dataset = get_dataset("my_dataset")

# Apply translog to main table
dataset.entities.compact()

# Follow up with optimize to compact parquet files
dataset.entities.optimize()
```
> **Note:** Compaction is optional. Queries always join with the translog, so deleted entities are filtered automatically even without compaction. Compact when you want to reclaim space or produce a clean, standalone main table.
### Optimize Storage
Compact Delta Lake files for better read performance:
```python
from ftm_lakehouse import get_dataset

dataset = get_dataset("my_dataset")

# Optimize (compact files)
dataset.entities.optimize()

# Optimize and vacuum (remove old files)
dataset.entities.optimize(vacuum=True)
```
## Complete Example
Here's a complete example of an entity import pipeline:
```python
from ftm_lakehouse import ensure_dataset
from followthemoney import model
from followthemoney.proxy import EntityProxy


def create_person(name: str, nationality: str) -> EntityProxy:
    """Create a Person entity."""
    entity = model.make_entity("Person")
    entity.make_id(name)
    entity.add("name", name)
    entity.add("nationality", nationality)
    return entity


def main():
    # Ensure dataset exists
    dataset = ensure_dataset("people_dataset")

    # Create some entities
    people = [
        create_person("Jane Doe", "us"),
        create_person("John Smith", "gb"),
        create_person("Maria Garcia", "es"),
    ]

    # Write entities
    with dataset.entities.writer(origin="manual") as writer:
        for person in people:
            writer.add_entity(person)

    # Flush to storage
    count = dataset.entities.flush()
    print(f"Flushed {count} statements")

    # Query back
    jane = dataset.entities.get(people[0].id)
    print(f"Found: {jane.caption}")

    # Query all
    print("All entities:")
    for entity in dataset.entities.query():
        print(f"  - {entity.caption}")


if __name__ == "__main__":
    main()
```