
Load

The transformed entities from the transform stage need to be written to a store that aggregates and optionally exports them. Under the hood, this is a statement store provided by ftmq, which is based on nomenklatura.
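After a run, the same store can be read back programmatically. A minimal sketch, assuming ftmq exposes a get_store factory and the nomenklatura view API (not verified against any particular ftmq version):

from ftmq.store import get_store  # assumption: ftmq's store factory

# open the store a pipeline wrote to (same uri as in the load config below)
store = get_store(uri="sqlite:///nomenklatura.db")

# iterate over the aggregated entities via the nomenklatura view API
view = store.default_view()
for proxy in view.entities():
    print(proxy.id, proxy.schema.name)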

Fragments

One essential feature of the underlying followthemoney toolkit is the so-called "entity fragmentation". This means pipelines can output partial data for a given entity and merge it together later. For example, if one data source has information about a Person's birth date, and another has information about the nationality of this person, the two different pipelines would produce two different fragments of the same entity that are aggregated at a later stage. Read more about the technical details in the followthemoney documentation.
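As a minimal sketch of what this aggregation does, using the followthemoney API directly (entity id and property values are made up for illustration):

from followthemoney import model

# fragment from the first pipeline: only the birth date is known
a = model.make_entity("Person")
a.id = "person-jane-doe"
a.add("birthDate", "1975-01-01")

# fragment from the second pipeline: only the nationality is known
b = model.make_entity("Person")
b.id = "person-jane-doe"
b.add("nationality", "de")

# aggregation merges the fragments into one entity
merged = a.merge(b)
assert merged.get("birthDate") == ["1975-01-01"]
assert merged.get("nationality") == ["de"]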

Configure

If not configured otherwise, investigraph uses a simple MemoryStore for storing the entity fragments. This store only persists for the runtime of the pipeline, and in that case the export stage automatically exports the entities to a local JSON file, even if this is not explicitly configured in the export config. If you explicitly set a persistent store, this automatic export doesn't happen.
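That is, the default behaves as if the in-memory backend had been configured explicitly:

load:
  uri: memory://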

Warning

To avoid memory issues with bigger datasets, set the store to a backend other than memory://.

When using stores with a redis, postgresql or leveldb backend, refer to the install section for how to install investigraph with the additional dependencies these require.

Set the store uri

Redis or KVRocks store

Accepts any valid redis connection URL.

load:
  uri: redis://localhost
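Authentication and a database number can be included in the URL as usual (the credentials here are made up for illustration):

load:
  uri: redis://default:s3cret@localhost:6379/0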

LevelDB store

Accepts any valid local path; the database will be created there if it doesn't exist.

load:
  uri: leveldb:///data/followthemoney.ldb

SQL store

Accepts any valid SQL connection string (via sqlalchemy).

sqlite

load:
  uri: sqlite:///nomenklatura.db

postgresql

load:
  uri: postgresql://user:password@host/database

Bring your own code

Bring your own code to the loading stage.

The handler receives nomenklatura.entity.CompositeEntity proxies coming from the transform stage and is called once for each chunk of transformed proxies.

config.yml

load:
  handler: ./load.py:handle

load.py

import sys

import orjson

from nomenklatura.entity import CEGenerator
from investigraph.model import DatasetContext

def handle(ctx: DatasetContext, proxies: CEGenerator):
    # write each transformed proxy as one JSON line to stdout
    # (orjson.dumps returns bytes, so write to the binary buffer)
    for proxy in proxies:
        sys.stdout.buffer.write(orjson.dumps(proxy.to_dict()) + b"\n")

handle(ctx, proxies) -> int

The default handler for the load stage. It writes the given proxies to the configured store.

Parameters:

    ctx (DatasetContext): instance of the current runtime DatasetContext. Required.
    proxies (Iterable[StatementEntity]): iterable of StatementEntity. Required.

Returns:

    int: the number of entities written to the store.

Source code in investigraph/logic/load.py
def handle(ctx: "DatasetContext", proxies: Iterable[StatementEntity]) -> int:
    """
    The default handler for the load stage. It writes the given proxies to the
    configured store.

    Args:
        ctx: instance of the current runtime `DatasetContext`
        proxies: Iterable of `StatementEntity`

    Returns:
        The number of entities written to the store.
    """
    ix = 0
    with ctx.store.writer() as bulk:
        for proxy in proxies:
            bulk.add_entity(proxy)
            ix += 1
    return ix
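A custom handler can also delegate to this default, for example to tee proxies to stdout while still writing them to the configured store. A minimal sketch (the tee wrapper is invented for illustration):

import sys
from typing import Iterable

import orjson

from investigraph.logic.load import handle as default_handle
from investigraph.model import DatasetContext

def handle(ctx: DatasetContext, proxies: Iterable) -> int:
    # hypothetical tee: emit each proxy as a JSON line, then pass it on
    def tee(proxies):
        for proxy in proxies:
            sys.stdout.buffer.write(orjson.dumps(proxy.to_dict()) + b"\n")
            yield proxy

    # delegate to the default handler shown above, which writes to the store
    return default_handle(ctx, tee(proxies))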