Overview

investigraph is a framework to orchestrate data processing workflows that transform source data into entities.

This framework tries to automate as much functionality as possible (scheduling and executing workflows, monitoring, extracting from and storing to various local or remote sources, configuration, ...) with the help of prefect.io.

As investigraph can be considered an ETL process for Follow The Money data, the structure (of the codebase and this overview documentation) roughly follows the three steps of such a pipeline: extract, transform, load.

The following documentation assumes you have already worked through the tutorial. This page covers the whole pipeline process in more depth. For a complete (technical) reference, check out the reference documentation for the core building blocks of investigraph.

Most of the runtime behaviour of a specific pipeline is configured on a per-dataset basis and/or via arguments passed to a specific run of the pipeline, either via the prefect ui or via the command line.

Extract

In the first step of a pipeline, we focus on getting one or more data sources and extracting data records from them that will eventually be passed to the next stage.

This stage is configured via the extract key within the config.yml.

Source

A data source is defined by a uri. As investigraph is using fsspec under the hood, this uri can be anything from a local file path to a remote s3 resource.

Examples for source uris:

s3://my_bucket/data.csv
gs://my_bucket/data.csv
azure://my_bucket/data.csv
hdfs:///path/data.csv
hdfs://path/data.csv
webhdfs://host:port/path/data.csv
./local/path/data.csv
~/local/path/data.csv
local/path/data.csv
./local/path/data.csv.gz
file:///home/user/file.csv
file:///home/user/file.csv.bz2
[ssh|scp|sftp]://username@host//path/file.csv
[ssh|scp|sftp]://username@host/path/file.csv
[ssh|scp|sftp]://username:password@host/path/file.csv

And, of course, just http[s]://...

A pipeline can have more than one source; sources are defined in the config.yml within the extract.sources key. This can either be a simple list of one or more uris or a list of more complex source objects.

Simple source

extract:
  sources:
    - uri: https://www.humanitarianoutcomes.org/gdho/search/results?format=csv

This tells the pipeline to fetch the output from the given url without any more logic.

As seen in the tutorial, this source actually has encoding problems and we want to skip the first line, so we need to give investigraph a bit more information on how to extract this source (see "More configurable source" below).

Named source

You can give a name (or identifier) to a source to be able to identify in your code which source the generated records are coming from, e.g. to adjust a parsing function based on the source file.

extract:
  sources:
    - name: ec_juncker
      uri: https://ec.europa.eu/transparencyinitiative/meetings/dataxlsx.do?name=meetingscommissionrepresentatives1419
    - name: ec_leyen
      uri: https://ec.europa.eu/transparencyinitiative/meetings/dataxlsx.do?name=meetingscommissionrepresentatives1924

This helps us in the next stage (see below) to distinguish between the sources and adjust our parsing code accordingly, as in the sketch below.
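
For example, a transform handler (see the transform stage below) could branch on the source name. This is only a sketch; it assumes the name configured in extract.sources[].name is exposed as ctx.source.name on the run context:

def handle(ctx, record, ix):
    # pick the right parsing logic depending on which source emitted the record
    # (assumption: the name from extract.sources[].name is available as ctx.source.name)
    if ctx.source.name == "ec_juncker":
        record["commission"] = "Juncker"
    else:
        record["commission"] = "von der Leyen"
    # ... build and yield entities as shown in the transform stage below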

More configurable source

For extracting most kinds of sources, investigraph uses runpandarun under the hood. This is a wrapper around pandas that allows specifying a pandas workflow as a yaml playbook. Pandas has a lot of options on how to read in data, and within our config.yml we can pass any arbitrary argument to pandas.read_csv or pandas.read_excel. (runpandarun picks the right function based on the source's mimetype.)

Just put the required arguments in the config key extract.sources[].pandas, in this case (see tutorial) like this:

extract:
  sources:
    - uri: https://www.humanitarianoutcomes.org/gdho/search/results?format=csv
      pandas:
        read:
          options:
            encoding: latin
            skiprows: 1

Under the hood, this calls

pandas.read_csv(uri, encoding="latin", skiprows=1)

If runpandarun is not able to detect the handler to read in the source, as can happen with misconfigured web headers or wrong file extensions, you can manually specify the read.handler:

extract:
  sources:
    - uri: https://www.humanitarianoutcomes.org/gdho/search/results?format=csv
      pandas:
        read:
          handler: read_csv
          options:
            encoding: latin
            skiprows: 1

Prepare your data with pandas

In case you want to use the built-in support for followthemoney mappings, you might need to adjust the incoming data a bit more, as, by design, followthemoney expects an already quite clean tabular source.

With the help of runpandarun we can basically do anything we need with the source data:

extract:
  sources:
    - uri: ./data.csv
      pandas:
        read:
          options:
            skiprows: 3
        operations:
          - handler: DataFrame.rename
            options:
              columns:
                value: amount
                "First name": first_name
          - handler: DataFrame.fillna
            options:
              value: ""
          - handler: Series.map
            column: slug
            options:
              func: "lambda x: normality.slugify(x) if isinstance(x) else 'NO DATA'"

This "pandas playbook" translates into these python calls that investigraph will run:

import pandas as pd
import normality

df = pd.read_csv("./data.csv", skiprows=3)
df = df.rename(columns={"value": "amount", "First name": "first_name"})
df = df.fillna("")
df["slug"] = df["slug"].map(lambda x: normality.slugify(x) if isinstance(x) else 'NO DATA')

Refer to the runpandarun documentation for more.

Apply data patches

runpandarun ships with datapatch integrated, so you can apply data patches after the pandas operations are applied:

extract:
  sources:
    - uri: ./data.csv
      pandas:
        read:
          options:
            skiprows: 3
        operations:
          - handler: DataFrame.fillna
            options:
              value: ""
        patch:
          countries:
            - match: "Greet Britain"
              value: "Great Britain"

Bring your own code

When using a custom handler for the fetch & extraction logic, disable investigraph's own fetch logic and specify the custom script (or module):

extract:
  fetch: false
  handler: ./myscript.py:extract

This function has to yield a dict[str, Any] for each record that should be passed to the next stage:

import csv
import requests
from io import StringIO
from typing import Any, Generator
from investigraph.model import Context

def extract(ctx: Context) -> Generator[dict[str, Any], None, None]:
    # fetch the source ourselves and yield one dict per csv row
    res = requests.get(ctx.source.uri)
    yield from csv.DictReader(StringIO(res.text))

For more information about how to include custom code, see the relevant section in the transform stage.

Inspecting sources

To iteratively test your configuration, you can use investigraph inspect to see what output the extract stage is producing:

investigraph inspect <path/to/config.yml> --extract

This will output the first few extracted data records in tabular format.

Or, to output the first few records as json:

investigraph inspect <path/to/config.yml> --extract --to-json

Transform

As outlined, investigraph tries to automate everything around this stage. That's because transforming arbitrary source data into ftm entities is very dependent on the actual dataset.

Still, for simple use cases, you don't need to write any python code here at all. Just define a mapping. For more complex scenarios, write your own transform function.

Mapping

Simply plug in a standardized ftm mapping (as described here) into your pipeline configuration under the root key transform.queries:

transform:
  queries:
    - entities:
        org:
          schema: Organization
          keys:
            - Id
          properties:
            name:
              column: Name
            # ...

As it follows the mapping specification from Follow The Money, any existing mapping can be copied over here, and a mapping can easily (and independently of investigraph) be tested with the ftm command line:

ftm map-csv ./<dataset>/config.yml -i ./data.csv
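
Since a mapping usually emits entity fragments, you can additionally pipe the output through ftm aggregate to preview merged entities (a sketch using the standard ftm command line):

ftm map-csv ./<dataset>/config.yml -i ./data.csv | ftm aggregate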

Please refer to the aleph documentation for more details about mappings.

Bring your own code

For more complex transform operations, just write your own code. As described, one of the main values of investigraph is that you only have to write this one python file for a dataset; everything else is handled automatically.

In the <stage>.handler key, you can either refer to a python function via its module path, or to a file path to a python script containing the function. In that case, by convention the python files should be named after their stages (extract.py, transform.py, load.py) and live in the same directory as the config.yml.

Refer to a function from a module

The module must be within the current PYTHONPATH at runtime.

transform:
    handler: my_library.transformers:wrangle

Refer to a function from a local python script file

This file must be locally accessible on the running host. This can be achieved via prefect blocks.

transform:
    handler: ./transform.py:handle

The entrypoint function for the transform stage has the following signature:

def handle(
    ctx: investigraph.model.Context,
    data: dict[str, typing.Any],
    ix: int,
) -> typing.Generator[nomenklatura.entity.CE, None, None]:
    # transform `data` into one or more entities ...
    yield proxy

Ok. Let's break this down.

ctx contains the actual flow run context with some helpful information like:

  • ctx.dataset the current dataset name
  • ctx.source the source the current data record comes from

data is the current extracted record.

ix is the integer index of the current record.

An actual transform.py for the gdho dataset could look like this:

def parse(ctx, record, ix):
    proxy = ctx.make_proxy("Organization", record.pop("Id"))  # schema, id
    proxy.add("name", record.pop("Name"))
    # add more property data ...
    yield proxy

The util function make_proxy creates an entity (implemented as nomenklatura.entity.CompositeEntity) with the schema "Organization".

Then, following the ftm python api, properties can be added via proxy.add(<prop>, <value>).
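
proxy.add accepts a single value as well as an iterable of values. A small sketch (the property values here are purely illustrative):

proxy.add("country", "gb")  # single value
proxy.add("alias", ["GDHO", "Global Database of Humanitarian Organisations"])  # several values at once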

Inspecting transform stage

To iteratively test your configuration, you can use investigraph inspect to see what output the transform stage is producing:

investigraph inspect <path/to/config.yml> --transform

This will output the first few mapped entities.

Load

The transformed metadata and entities can be written to various local or remote targets, including cloud storage and sql databases.

All outputs can be specified within the prefect ui, the config.yml or via command line arguments.

Metadata

Location for the resulting dataset metadata, typically called index.json. Again, as investigraph is using fsspec (see above), this can basically be anywhere:

config.yml

load:
  index_uri: s3://my_bucket/<dataset>/index.json

command line

investigraph run ... --index-uri sftp://username:password@host/<dataset>/index.json

Fragments

investigraph has to store the intermediate entity fragments somewhere before merging them into entities in the last step. By default, fragments are written to local files, but if you are using a decentralized setup where several agents emit fragments, you should specify a remote uri for them:

investigraph run ... --fragments-uri s3://my_bucket/<dataset>/fragments.json

This can also be defined in the dataset's config.yml:

load:
  fragments_uri: sftp://username:password@host/<dataset>/fragments.json

Entities

Entities can be written to any file uri or directly to a Follow The Money store specified via a sql connection string.

As a convention, entity json files should use the extension .ftm.json.

config.yml

load:
  entities_uri: s3://my_bucket/<dataset>/entities.ftm.json

write to a ftm store:

load:
  entities_uri: sqlite:///followthemoney.store

Sqlite is only suitable for development or small local deployments; better use a proper sql database that allows concurrent writes:

load:
  entities_uri: postgresql://user:password@host:port/database

command line

investigraph run ... --entities-uri ...

Aggregate

One essential feature of the underlying followthemoney toolkit is the so-called "entity fragmentation". This means pipelines can output partial data for a given entity and merge it together later. For example, if one data source has information about a Person's birth date and another has information about the nationality of this person, the two pipelines would produce two different fragments of the same entity that are aggregated at a later stage. Read more about the technical details here.
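
To illustrate the idea with the followthemoney python api (a sketch only; the ids and values are made up):

from followthemoney import model

# two fragments of the same entity, sharing the same id
frag1 = model.get_proxy(
    {"id": "person-1", "schema": "Person", "properties": {"birthDate": ["1970-01-01"]}}
)
frag2 = model.get_proxy(
    {"id": "person-1", "schema": "Person", "properties": {"nationality": ["de"]}}
)

# aggregation merges the fragments into one entity that holds both properties
merged = frag1.merge(frag2)
assert merged.get("birthDate") == ["1970-01-01"]
assert merged.get("nationality") == ["de"]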

Aggregation can happen in memory (the default) or by iterating through a sql database (if the complete data doesn't fit into the machine's memory).

To disable aggregation, set the flag in the prefect ui when starting a flow, or specify via command-line:

investigraph run ... --no-aggregate

Or in the yaml config:

aggregate: False

Use a database

By default, aggregation happens in memory. If the dataset is too big for that, specify the handler and a database uri instead:

aggregate:
  handler: db
  db_uri: postgresql:///ftm

Or, bring your own code
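
A custom aggregation handler can be plugged in the same way as in the other stages, via the handler key. The following is only a sketch that follows the handler convention shown above; the exact function signature for aggregation handlers is not covered here:

aggregate:
  handler: ./aggregate.py:handle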