FollowTheMoney Data Lake Spec
This specification defines a data lake structure for use with OpenAleph, OpenSanctions and related systems. The idea is to produce a long-term storage mechanism for investigative graph data, in source, intermediate and processed form.
Core concepts
Datasets
are logical units of source data, often representing a data origin, such as an official register or publication, or a leak of documents.
Data catalogs
are index files that help make individual datasets more easily discoverable.
Entity files
are data files in which individual FollowTheMoney entities are stored in a ready-to-index form, i.e. they have been aggregated from fragments or statements. An indexer may need to add authorization information and apply denormalisations as needed.
Archive objects
are files that represent source or intermediate document formats used in document forensics. They're referenced from FtM entities via their SHA1 content checksum.
Function
The idea of a FtM data lake is to provide a platform for multi-stage processing of data into FtM format. Keeping this data at rest (rather than, for example, in an Aleph operational database and in `followthemoney-store`) promises modularity, a simpler architecture, and cost effectiveness.
The fundamental idea is to have a convention-based file system layout with well-known paths for metadata, and for information interchange between different processing stages.
Basic layout
A data lake file system may need to be able to hold metadata headers (e.g. `Content-Disposition`, `Content-Type`), so it's better to think of this as object storage (S3, GCS, MinIO) rather than a plain operating system FS.
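For example, a hedged sketch of writing an object together with its metadata headers, assuming an S3-compatible backend and boto3 (endpoint, bucket name and object key are placeholders, not part of this spec):

```python
# Minimal sketch: store a blob together with its HTTP-style metadata headers.
# Endpoint, bucket and key are placeholders, not defined by this spec.
import boto3

s3 = boto3.client("s3", endpoint_url="http://localhost:9000")  # e.g. a MinIO instance
with open("report.pdf", "rb") as fh:
    s3.put_object(
        Bucket="lake",
        Key="datasets/my_dataset/archive/00/de/ad/beef123456789012345678901234567890",
        Body=fh,
        ContentType="application/pdf",
        ContentDisposition='attachment; filename="report.pdf"',
    )
```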
datasets/
  catalog.json
  [name]/
    index.json
    versions/
      index-[ts]-[uuid].json
    .LOCK
      # If present, a crucial write operation is currently being
      # performed by one of the daemons. Contains the timestamp of
      # the start, the anticipated end, and a tolerance.
    archive/
      # the SHA1 is the full file name, not `data` as in `servicelayer`:
      00/de/ad/beef123456789012345678901234567890
      # optional: file metadata (e.g. scraped headers)
      00/de/ad/beef123456789012345678901234567890.json
      # text-only representation (e.g. from an OCR process); convenient
      # for non-Aleph analytics on the file system
      00/de/ad/beef123456789012345678901234567890.txt
    mappings/
      [uuid-1]/
        mapping.yml
        sheet1.csv
        sheet2.csv
    entities/
      # idea for storing UI-generated entities:
      crud/
        [entity_id]/
          # sortable IDs:
          [uuid-1].json
          # missing `current.json` implies entity is deleted
          current.json
      # append-only file fragments from ingest-file, bulk upload, etc:
      statements/
        [origin-1]/
          [uuid-1].csv
          [uuid-2].csv
        [origin-2]/
          [uuid-3].csv
        # Instead of `origin`, should these be called `phase` or `stage`?
    # generated by an aggregator batch job on `statements/`:
    entities.ftm.json
    # generated by a vectorizing service
    entities.vectors.json # {"id": <entity_id>, "data": <...>}
    # for delta generation:
    entities.hash
    # Alternative:
    aggregates/
      [run_id]/
        .BEGIN # timestamp as content
        entities.ftm.json
        .DONE # timestamp as content
    xref/
      # Theoretical, but an inverted index for xref entity
      # blocking:
      xref.idx
    # generated by an aggregator batch job on entities and archives;
    # some of these are optional and subject to specific use cases/applications
    exports/
      statistics.json # entity counts, pre-computed facets
      graph.cypher # Neo4j
      statements.csv # complete sorted statements
      documents.csv # document metadata
      inverted.idx # what entity IDs point to entity X
      # for UI rendering in apps (e.g. OpenAleph)
      files.json # nested file graph (folder -> subfolder -> file)
      emails.json # nested/resolved email entity graph
      archive.zstd
    # diff exports
    replay/ # ??
      statements.[from-date]-[to-date].csv.diff
      entities.[from-date]-[to-date].ftm.json.diff
Some thoughts on this:
- The entity data is not versioned here. In OpenSanctions, we're actually using a subfolder called `artifacts/[run_id]` to identify different ETL runs. This may not apply as well to Aleph, since it has no strong segregation of individual ETL runs.
- This still doesn't have a nice way to do garbage collection on the archive without reference counting on entities.
- We may want the entity object structure in the lake to be a new format, e.g. with a `dataset` field and `statements` lists on each entity (instead of `properties`); a sketch follows this list.
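A hedged sketch of what such an entity object could look like; the statement fields loosely follow nomenklatura's statement model, and all field names beyond `id`, `schema`, `dataset` and `statements` are illustrative:

```python
# Illustrative only - not a finalized schema. Statement fields loosely
# follow the nomenklatura statement model listed in the references.
entity = {
    "id": "abc123",
    "schema": "Person",
    "dataset": "my_dataset",
    "statements": [
        {
            "prop": "name",
            "value": "Jane Doe",
            "origin": "crud",          # cf. the `origin` folders in the layout above
            "first_seen": "2024-05-01",
        },
    ],
}
```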
Meet the daemons
Entity aggregator
A service that would traverse all individual statement files in the `entities/statements/` folder, sort them into a combined order and then emit aggregated FtM entities.
Ideas: DuckDB doing a big fat UNION on the CSV files right from the bucket, or some monstrous Java MapReduce/Spark thing that is good at sorting a terabyte without breaking a sweat. (The output does not have to be FtM entities - a combined & sorted `statements.csv` has the same effect of making the data indexable.)
See also: the `ftm4 aggregate-statements` command, which reads sorted statements to emit entities.
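A hedged sketch of the DuckDB variant, assuming the statement CSV fragments share one column layout and the bucket is reachable via DuckDB's httpfs extension; paths, credentials handling and sort columns are illustrative:

```python
# Sketch: union all statement fragments straight from the bucket, sort them,
# and write a combined statements.csv back to the lake. Column names and
# paths are assumptions, not part of this spec.
import duckdb

con = duckdb.connect()
con.execute("INSTALL httpfs; LOAD httpfs;")  # S3 credential setup omitted
con.execute("""
    COPY (
        SELECT *
        FROM read_csv_auto('s3://lake/datasets/my_dataset/entities/statements/*/*.csv')
        ORDER BY entity_id, prop, value
    )
    TO 's3://lake/datasets/my_dataset/exports/statements.csv' (HEADER, DELIMITER ',');
""")
```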
Entity analyzer
A service that would read `entities.ftm.json` and do analysis on (a filtered subset of) the entities, e.g. NER, language detection, translations, vectorization. New statements are chunked and written back to the lake.
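A hedged sketch of such an analyzer pass, using language detection as the example; the statement columns, the `analyzer` origin name and the `detect_language()` helper are assumptions:

```python
# Sketch: read line-delimited FtM entities, analyze a filtered subset and
# write the resulting statements back as chunked CSV fragments.
import csv
import json
import uuid
from pathlib import Path

def analyze(entities_path: str, statements_dir: str, chunk_size: int = 10_000) -> None:
    rows = []
    with open(entities_path) as fh:
        for line in fh:                    # entities.ftm.json: one entity per line
            entity = json.loads(line)
            if entity.get("schema") not in ("PlainText", "Pages"):
                continue                   # only analyze a filtered subset
            lang = detect_language(entity) # hypothetical helper
            if lang:
                rows.append({"entity_id": entity["id"], "prop": "detectedLanguage", "value": lang})
            if len(rows) >= chunk_size:
                flush(rows, statements_dir)
                rows = []
    if rows:
        flush(rows, statements_dir)

def flush(rows: list, statements_dir: str) -> None:
    # One new append-only fragment under statements/analyzer/:
    path = Path(statements_dir) / "analyzer" / f"{uuid.uuid4()}.csv"
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=["entity_id", "prop", "value"])
        writer.writeheader()
        writer.writerows(rows)

def detect_language(entity: dict) -> str | None:
    return None  # placeholder: plug in e.g. fastText or langdetect here
```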
These microservices are already built with this lake concept in mind:
Entity indexer
logstash -j128 -i s3://lake/[dataset]/entities.ftm.json
File ingestor
Reads uploaded documents from `entities/crud` (?) and then drops statement files into the statements folder every 60 MB (or after each document?).
If the backend supports notifications (e.g. via SQS, PubSub), then the act of dropping a file into one `origin`/`phase` folder could trigger the subsequent layer of processing.
- cf. https://min.io/docs/minio/linux/administration/monitoring/bucket-notifications.html
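A hedged sketch of this coupling, assuming the bucket publishes `s3:ObjectCreated` events to an SQS queue; the queue URL and the `trigger_aggregation()` hook are placeholders:

```python
# Sketch: poll bucket notifications and kick off the next processing stage
# whenever a new statement fragment lands in the lake.
import json
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.example.com/lake-events"   # placeholder

def trigger_aggregation(key: str) -> None:
    print(f"new statement fragment: {key}")          # placeholder next-stage hook

def poll_events() -> None:
    while True:
        resp = sqs.receive_message(
            QueueUrl=QUEUE_URL, WaitTimeSeconds=20, MaxNumberOfMessages=10
        )
        for msg in resp.get("Messages", []):
            body = json.loads(msg["Body"])
            for record in body.get("Records", []):
                key = record["s3"]["object"]["key"]
                if "/entities/statements/" in key:
                    trigger_aggregation(key)
            sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])
```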
Catalog collector
Goes through each dataset folder and brings a reduced version of the dataset metadata into a big overview `catalog.json`. This then pretty directly travels into Aleph's `collections` database table.
- Example: https://data.opensanctions.org/datasets/latest/index.json
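A hedged sketch of the collector, assuming each dataset's `index.json` can be reduced to a handful of summary fields; the field selection and the `collect_catalog()` name are illustrative:

```python
# Sketch: reduce each dataset's index.json into one overview catalog.json.
import json
from pathlib import Path

def collect_catalog(lake_root: Path) -> None:
    datasets = []
    for index_path in sorted(lake_root.glob("datasets/*/index.json")):
        index = json.loads(index_path.read_text())
        datasets.append({
            "name": index.get("name", index_path.parent.name),
            "title": index.get("title"),
            "entity_count": index.get("entity_count"),
            "updated_at": index.get("updated_at"),
        })
    out = lake_root / "datasets" / "catalog.json"
    out.write_text(json.dumps({"datasets": datasets}, indent=2))
```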
Concept for user edits (3rd party apps)
An app, e.g. for Network Diagrams, would fetch the complete `entities.ftm.json`, load it into a temporary store (e.g. DuckDB) and do read/write operations on it. After an edit session, the resulting store is exported back to the lake.
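A hedged sketch of that round trip, assuming `entities.ftm.json` is newline-delimited JSON and has already been fetched to the working directory; the edit itself and the upload back to the lake are elided:

```python
# Sketch: load the dataset's entities into DuckDB for an edit session and
# export the result back to a line-delimited JSON file for upload to the lake.
import duckdb

con = duckdb.connect("session.duckdb")
con.execute("""
    CREATE TABLE entities AS
    SELECT * FROM read_ndjson_auto('entities.ftm.json');
""")
# ... interactive read/write operations on the `entities` table ...
con.execute("""
    COPY (SELECT * FROM entities)
    TO 'entities.edited.ftm.json' (FORMAT JSON);
""")
```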
Implementation stages
- FtM 4.0 with dataset and catalog metadata specs
- Make sure lake FS change notifications can be used for stage coupling
- Build a `followthemoney-store` dumper
- Find a migration path for the `servicelayer` archive
Long-term implications
- This creates a flat-file alternative to `followthemoney-store`, using an external sorting mechanism to aggregate entity fragments.
- The `entities/crud/` section implicitly unifies the `document` and `entity` tables currently used in Aleph.
- Introduces versioning - nice in WYSIWYG scenarios.
- No need to have a `collection` table and index alongside `catalog.json`.
- While we're at it, mappings become flat files (as is right and proper) and can be run by a daemon.
- Lake folders can be copied between Aleph instances, making repeat processing (e.g. of leaks) unnecessary.
- Re-indexing gains a huge performance boost (no sorting of `followthemoney-store` tables, efficient bulk indexing).
- Option for file-based inverted index building that can allow high-performance cross-referencing (xref).
- Improved infrastructure cost-efficiency, as the `followthemoney-store` PostgreSQL database (and therefore SSD storage) becomes obsolete.
References
The concept outlined above is the result of a decade of open source tooling around this problem. A lot of experimental and production work has already been done within the FollowTheMoney/Aleph open source ecosystem:
- servicelayer as the core document storage layer for Aleph/OpenAleph
- nomenklatura by OpenSanctions for the statement-based entity model
- anystore by DARC, a generic key/value store that can act as both a blob (file) storage backend and a caching backend
- leakrfc by DARC, a first experiment in a standardized way of storing distributed file archives that Aleph, OpenAleph, memorious and other clients can read from and write to.