
Transform

As outlined, investigraph tries to automate everything around this stage. That is because transforming arbitrary source data into ftm entities depends heavily on the actual dataset.

Still, for simple use cases you don't need to write any Python code at all: just define a mapping. For more complex scenarios, write your own transform function.

Mapping

Simply plug a standardized ftm mapping (as described here) into your pipeline configuration under the root key transform.queries:

transform:
  queries:
    - entities:
        org:
          schema: Organization
          keys:
            - Id
          properties:
            name:
              column: Name
            # ...

As it follows the mapping specification from Follow The Money, any existing mapping can be copied over here, and a mapping can easily be tested (independently of investigraph) with the ftm command line:

ftm map-csv ./<dataset>/config.yml -i ./data.csv

Please refer to the Aleph documentation for more details about mappings.

Bring your own code

For more complex transform operations, just write your own code. As described, one of the main values of investigraph is that you only have to write this one Python file per dataset; everything else is handled automatically.

Convention

In the <stage>.handler key, you can either refer to a Python function via its module path, or to the file path of a Python script containing the function. In the latter case, by convention the Python files should be named after their stages (seed.py, extract.py, transform.py, load.py, export.py) and live in the same directory as the dataset's config.yml. The main entrypoint function should be called handle().

Refer to a function from a module

The module must be within the current PYTHONPATH at runtime.

transform:
    handler: my_library.transformers:wrangle
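
The referenced function receives the same arguments as any transform handler. A minimal sketch of what a hypothetical `my_library/transformers.py` could contain; the plain-dict "entity" is a simplification so the sketch runs standalone, while a real handler would create entities via `ctx.make_entity(...)`:

```python
# my_library/transformers.py -- hypothetical module matching the
# `my_library.transformers:wrangle` handler reference above
from typing import Any, Generator


def wrangle(ctx: Any, record: dict[str, Any], ix: int) -> Generator[dict, None, None]:
    # a real handler would create entities via `ctx.make_entity(...)`;
    # plain dicts stand in here so the sketch runs without investigraph
    entity = {"schema": "Organization", "id": str(record["Id"]), "name": record["Name"]}
    yield entity


# quick local check without running the full pipeline
result = list(wrangle(None, {"Id": 1, "Name": "Example Org"}, 0))
```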

Refer to a function from a local Python script file

transform:
    handler: ./transform.py:handle

The entrypoint function for the transform stage has the following signature:

def handle(ctx: investigraph.model.SourceContext, data: dict[str, typing.Any], ix: int) -> typing.Generator[nomenklatura.entity.CE, None, None]:
    # transform `data` into one or more entities ...
    yield proxy

Ok. Let's break this down.

ctx contains the actual flow run context with some helpful information like:

  • ctx.dataset: the current dataset name
  • ctx.source: the current source the data record comes from

data is the current extracted record.

ix is the integer index of the current record.
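
Conceptually, the runner calls the handler once per extracted record and collects everything it yields. A stdlib-only sketch of that contract (the record contents and the plain-dict entities are made up for illustration; a real handler also receives `ctx` and yields proper ftm entities):

```python
from typing import Any, Generator


def handle(data: dict[str, Any], ix: int) -> Generator[dict, None, None]:
    # a real handler receives `ctx` as well and yields proper ftm entities
    yield {"schema": "Organization", "id": str(data["Id"]), "name": data["Name"]}


records = [{"Id": 1, "Name": "Org A"}, {"Id": 2, "Name": "Org B"}]
# the runner enumerates the extracted records and chains the yielded entities
entities = [e for ix, data in enumerate(records) for e in handle(data, ix)]
```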

An actual transform.py for the gdho dataset could look like this:

from ftmq.types import CEGenerator
from investigraph.types import Record
from investigraph.model import SourceContext

def parse(ctx: SourceContext, record: Record, ix: int) -> CEGenerator:
    proxy = ctx.make_entity("Organization", record.pop("Id"))  # schema, id
    proxy.add("name", record.pop("Name"))
    # add more property data ...
    yield proxy

The utility function make_entity creates an entity (implemented as nomenklatura.entity.CompositeEntity) with the schema "Organization" and the given ID.

Then, following the ftm Python API, properties can be added via proxy.add(<prop>, <value>).

Transformation depending on source

The SourceContext object contains information about the current extracted Source, so transformation logic can depend on that:

def parse(ctx: SourceContext, record: Record, ix: int):
    if ctx.source.name == "persons":
        yield from handle_person_record(ctx, record)
    else:
        yield from handle_org_record(ctx, record)
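
The helper functions above are not part of investigraph. A self-contained sketch of such a dispatch, with the hypothetical helpers filled in and plain dicts standing in for entities:

```python
from typing import Any, Generator

Record = dict[str, Any]


def handle_person_record(record: Record) -> Generator[dict, None, None]:
    # hypothetical helper: map a person record
    yield {"schema": "Person", "name": record["Name"]}


def handle_org_record(record: Record) -> Generator[dict, None, None]:
    # hypothetical helper: map an organization record
    yield {"schema": "Organization", "name": record["Name"]}


def handle(source_name: str, record: Record) -> Generator[dict, None, None]:
    # dispatch on the name of the source the record was extracted from
    if source_name == "persons":
        yield from handle_person_record(record)
    else:
        yield from handle_org_record(record)


persons = list(handle("persons", {"Name": "Jane Doe"}))
orgs = list(handle("organizations", {"Name": "Example Org"}))
```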

Reference

map_ftm(ctx, record, ix)

The default handler for the transform stage. It takes a Mapping and executes it on each incoming record.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `ctx` | `SourceContext` | Instance of the current `SourceContext` | *required* |
| `record` | `Record` | The record to transform, an arbitrary `dict[str, Any]` | *required* |
| `ix` | `int` | The 1-based index of this record (e.g. the line number of the extracted source) | *required* |

Yields:

| Type | Description |
| --- | --- |
| `StatementEntities` | Generator of `StatementEntity` instances |

Source code in investigraph/logic/transform.py
def map_ftm(ctx: "SourceContext", record: Record, ix: int) -> StatementEntities:
    """
    The default handler for the transform stage. It takes a
    [Mapping](https://followthemoney.tech/docs/mappings/) and executes it on
    each incoming record.

    Args:
        ctx: instance of the current `SourceContext`
        record: The record to transform, it is an arbitrary `dict[str, Any]`
        ix: The 1-based index of this record (e.g. line number of the extracted
            source)

    Yields:
        Generator of `StatementEntity` instances
    """
    for mapping in ctx.config.transform.queries:
        yield from map_record(record, mapping, ctx.config.dataset.name)

Inspecting the transform stage

To iteratively test your configuration, you can use investigraph transform to see what output the transform stage is producing from incoming records.

We make use of bash piping here to feed in the first 10 records of the previous extract stage:

investigraph extract -c path/to/config.yml -l 10 | investigraph transform -c path/to/config.yml

This will output the first few mapped entities.