# Load
The transformed entities from the transform stage need to be written to a store that aggregates and optionally exports them. Under the hood, this is a statement store provided via `ftmq`, which is based on `nomenklatura`.
## Fragments
One essential feature of the underlying followthemoney toolkit is so-called "entity fragmentation": pipelines can output partial data for a given entity and merge it together later. For example, if one data source has information about a Person's birth date and another about the same person's nationality, the two pipelines produce two different fragments of the same entity that are aggregated at a later stage. Read more about the technical details here.
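The aggregation idea can be sketched with plain dictionaries. This is a conceptual illustration only, not the real followthemoney API (which merges statements on `EntityProxy` objects), but the logic is analogous: fragments sharing an id are folded into one entity.

```python
# Conceptual sketch of entity fragmentation using plain dicts.

def merge_fragments(*fragments: dict) -> dict:
    merged: dict = {
        "id": fragments[0]["id"],
        "schema": fragments[0]["schema"],
        "properties": {},
    }
    for frag in fragments:
        # Fragments of the same entity must share the same id.
        assert frag["id"] == merged["id"]
        for prop, values in frag["properties"].items():
            existing = merged["properties"].setdefault(prop, [])
            existing.extend(v for v in values if v not in existing)
    return merged


# One pipeline knows the birth date, another the nationality:
frag_a = {"id": "person-1", "schema": "Person",
          "properties": {"birthDate": ["1980-01-01"]}}
frag_b = {"id": "person-1", "schema": "Person",
          "properties": {"nationality": ["de"]}}

entity = merge_fragments(frag_a, frag_b)
print(entity["properties"])
# → {'birthDate': ['1980-01-01'], 'nationality': ['de']}
```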
## Configure
If not configured otherwise, investigraph uses a simple `MemoryStore` for the entity fragments. This store only persists during the runtime of the pipeline; in that case, the export stage automatically writes the entities to a local JSON file, even if this is not explicitly configured in the export config. If you explicitly set a persistent store, this automatic export does not happen.
> **Warning:** To avoid memory issues with bigger datasets, set the store to a backend other than `memory://`.
When using stores with a redis, postgresql or leveldb backend, refer to the install section for how to install investigraph with the additional dependencies these require.
## Set the store uri
### Redis or KVRocks store

Accepts any valid redis connection URL.
### LevelDB store

Accepts any path to a local directory in which the database will be created if it doesn't exist.
### SQL store

Accepts any valid SQL connection string (via sqlalchemy), e.g. for the `sqlite` and `postgresql` dialects.
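Illustrative connection URIs for the backends above. Hostnames, ports and paths are placeholders, and the `leveldb://` scheme is an assumption based on `ftmq`'s store URI conventions:

```
redis://localhost:6379
leveldb:///var/lib/investigraph/store.ldb
sqlite:///data/store.db
postgresql://user:password@localhost/investigraph
```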
## Bring your own code
Bring your own code to the loading stage.
Your handler receives `nomenklatura.entity.CompositeEntity` proxies coming from the transform stage and is called once for each chunk of transformed proxies.
For example, a custom `load.py` referenced from the dataset's `config.yml`:
```python
import sys

import orjson
from nomenklatura.entity import CEGenerator

from investigraph.model import DatasetContext


def handle(ctx: DatasetContext, proxies: CEGenerator) -> None:
    for proxy in proxies:
        # orjson.dumps returns bytes, so decode before writing to stdout
        sys.stdout.write(orjson.dumps(proxy.to_dict()).decode() + "\n")
```
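The matching `config.yml` entry could then point the load stage at this handler. The `load.handler` key layout shown here is an assumption modeled on how investigraph stages reference custom code, so check your version's config reference:

```yaml
load:
  handler: ./load.py:handle
```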
The default handler for the load stage. It writes the given proxies to the configured store.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `ctx` | `DatasetContext` | Instance of the current runtime | *required* |
| `proxies` | `Iterable[StatementEntity]` | Iterable of `StatementEntity` proxies | *required* |

Returns:

| Type | Description |
|---|---|
| `int` | The number of entities written to the store. |