Transform
As outlined, investigraph tries to automate everything around this stage. That's because transforming any arbitrary source data into ftm entities is very dependent on the actual dataset.
Still, for simple use cases you don't need to write any Python code at all: just define a mapping. For more complex scenarios, write your own transform function.
Mapping
Plug a standardized ftm mapping (as described here) into your pipeline configuration under the key transform.queries:
transform:
  queries:
    - entities:
        org:
          schema: Organization
          keys:
            - Id
          properties:
            name:
              column: Name
              # ...
Because it follows the mapping specification from Follow The Money, any existing mapping can be copied over as-is, and a mapping can easily be tested independently of investigraph with the ftm command line:
ftm map-csv ./<dataset>/config.yml -i ./data.csv
Please refer to the Aleph documentation for more details about mappings.
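To make the effect of such a mapping concrete, here is a rough, library-free sketch of what happens to a single record. It is not how followthemoney implements mappings: the function `apply_mapping`, the sample record, and the ID scheme (a SHA-1 hash over the key columns) are assumptions made purely for illustration.

```python
import hashlib

# The `org` entity definition from the mapping above, as a plain dict.
ORG_MAPPING = {
    "schema": "Organization",
    "keys": ["Id"],
    "properties": {"name": {"column": "Name"}},
}


def apply_mapping(record: dict[str, str], mapping: dict) -> dict:
    """Illustrative only: build an entity-like dict from one record."""
    # Derive a stable ID from the key columns (assumed scheme, not ftm's own).
    key = "|".join(str(record[k]) for k in mapping["keys"])
    entity_id = hashlib.sha1(key.encode("utf-8")).hexdigest()
    # Copy each mapped column into its property; properties hold lists of values.
    properties = {}
    for prop, spec in mapping["properties"].items():
        value = record.get(spec["column"])
        if value:
            properties[prop] = [value]
    return {"id": entity_id, "schema": mapping["schema"], "properties": properties}


entity = apply_mapping({"Id": "42", "Name": "Example Aid Org"}, ORG_MAPPING)
print(entity["schema"], entity["properties"])
```

The point of the sketch is that the keys decide the identity of the entity, while the properties section only decides which columns end up as which values.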
Bring your own code
For more complex transformations, write your own code. As described, one of the main values of investigraph is that you only have to write this one Python file for a dataset; everything else is handled automatically.
Convention
In the <stage>.handler key, you can refer to a Python function either via its module path or via the file path of a Python script containing the function. In the latter case, by convention the Python files should be named after their stages (seed.py, extract.py, transform.py, load.py, export.py) and live in the same directory as the dataset's config.yml. The main entrypoint function should be called handle().
Refer to a function from a module
The module must be within the current PYTHONPATH at runtime.
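As a rough idea of what referring to a function by module path involves, the sketch below resolves such a reference with `importlib`. The `package.module:function` notation and the helper name `resolve_handler` are assumptions for illustration, not investigraph's actual loader; the example resolves a standard-library function so that it runs anywhere.

```python
import importlib
from typing import Callable


def resolve_handler(reference: str) -> Callable:
    """Resolve a reference of the assumed form "package.module:function"."""
    module_path, _, func_name = reference.partition(":")
    # import_module only finds modules reachable via sys.path / PYTHONPATH;
    # otherwise it raises ModuleNotFoundError.
    module = importlib.import_module(module_path)
    return getattr(module, func_name)


handler = resolve_handler("json:dumps")
print(handler({"ok": True}))
```

This also shows why the PYTHONPATH requirement matters: a module that cannot be imported at runtime cannot provide a handler.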
Refer to a function from a local Python script file
The entrypoint function for the transform stage has the following signature:
def handle(ctx: investigraph.model.SourceContext, data: dict[str, typing.Any], ix: int) -> typing.Generator[nomenklatura.entity.CE, None, None]:
    # transform `data` into one or more entities ...
    yield proxy
Ok. Let's break this down.
ctx contains the actual flow run context with some helpful information like:
- ctx.dataset: the current dataset name
- ctx.source: the source from which the current data record comes
data is the current extracted record.
ix is the integer index of the current record.
An actual transform.py for the gdho dataset could look like this:
from ftmq.types import CEGenerator
from investigraph.types import Record
from investigraph.model import SourceContext


def handle(ctx: SourceContext, record: Record, ix: int) -> CEGenerator:
    # create the entity from its schema and id
    proxy = ctx.make_entity("Organization", record.pop("Id"))
    proxy.add("name", record.pop("Name"))
    # add more property data ...
    yield proxy
The utility function make_entity creates an entity with the schema "Organization" and the given ID; entities are implemented in nomenklatura.entity.CompositeEntity.
Then, following the ftm Python API, properties can be added via proxy.add(<prop>, <value>).
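To try out a handler like this without running a full pipeline, it can be called with a stand-in context. The sketch below is self-contained: `StubEntity` and `StubContext` are hypothetical stand-ins that only mimic the two calls used above (`make_entity` and `add`). They are not investigraph or nomenklatura classes, and the sample records are made up.

```python
from dataclasses import dataclass, field
from typing import Any, Iterator


@dataclass
class StubEntity:
    """Hypothetical stand-in for an entity proxy."""

    schema: str
    id: str
    properties: dict[str, list[str]] = field(default_factory=dict)

    def add(self, prop: str, value: Any) -> None:
        # Skip empty values and collect everything else as strings.
        if value not in (None, ""):
            self.properties.setdefault(prop, []).append(str(value))


class StubContext:
    """Hypothetical stand-in exposing only make_entity()."""

    def make_entity(self, schema: str, entity_id: str) -> StubEntity:
        return StubEntity(schema, str(entity_id))


def handle(ctx: StubContext, record: dict[str, Any], ix: int) -> Iterator[StubEntity]:
    proxy = ctx.make_entity("Organization", record.pop("Id"))
    proxy.add("name", record.pop("Name"))
    yield proxy


records = [{"Id": "1", "Name": "Org A"}, {"Id": "2", "Name": "Org B"}]
ctx = StubContext()
# Records are numbered from 1 here; that numbering is an assumption of this sketch.
entities = [e for ix, rec in enumerate(records, 1) for e in handle(ctx, rec, ix)]
for entity in entities:
    print(entity.id, entity.schema, entity.properties)
```

Note that the handler consumes its input with `record.pop(...)`, so each record dict is emptied of the popped columns once it has been processed.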
Transformation depending on source
The SourceContext object contains information about the current extracted Source, so transformation logic can depend on that:
def handle(ctx: SourceContext, record: Record, ix: int):
    if ctx.source.name == "persons":
        yield from handle_person_record(ctx, record)
    else:
        yield from handle_org_record(ctx, record)
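The two helpers in the snippet above are not defined in this document. A minimal sketch of how they might look follows. `handle_person_record`, `handle_org_record`, the column names, and the stand-in context built with `SimpleNamespace` are all hypothetical, and entities are represented as plain dicts only to keep the example runnable without investigraph.

```python
from types import SimpleNamespace
from typing import Any, Iterator

Record = dict[str, Any]


def make_entity(schema: str, entity_id: str) -> dict:
    # Stand-in for ctx.make_entity: a plain dict instead of a real entity proxy.
    return {"id": entity_id, "schema": schema, "properties": {}}


def handle_person_record(ctx, record: Record) -> Iterator[dict]:
    person = ctx.make_entity("Person", record["Id"])
    person["properties"]["name"] = [record["Name"]]
    yield person


def handle_org_record(ctx, record: Record) -> Iterator[dict]:
    org = ctx.make_entity("Organization", record["Id"])
    org["properties"]["name"] = [record["Name"]]
    yield org


def handle(ctx, record: Record, ix: int) -> Iterator[dict]:
    # Pick the transformation based on the name of the current source.
    if ctx.source.name == "persons":
        yield from handle_person_record(ctx, record)
    else:
        yield from handle_org_record(ctx, record)


def stub_context(source_name: str) -> SimpleNamespace:
    # Mimics only the attributes used here: ctx.source.name and ctx.make_entity.
    return SimpleNamespace(
        source=SimpleNamespace(name=source_name), make_entity=make_entity
    )


person = next(handle(stub_context("persons"), {"Id": "p1", "Name": "Jane Doe"}, 1))
org = next(handle(stub_context("organizations"), {"Id": "o1", "Name": "Org A"}, 1))
print(person["schema"], org["schema"])
```

Keeping one small function per source keeps the entrypoint itself a plain dispatcher, which tends to be easier to extend when another source is added to the dataset.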
Reference
map_ftm(ctx, record, ix)
The default handler for the transform stage. It takes a Mapping and executes it on each incoming record.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | SourceContext | Instance of the current SourceContext | required |
| record | Record | The record to transform, an arbitrary dict[str, Any] | required |
| ix | int | The 1-based index of this record (e.g. line number of the extracted source) | required |
Yields:
| Type | Description |
|---|---|
| StatementEntities | Generator of entities |
Source code in investigraph/logic/transform.py
Inspecting transform stage
To iteratively test your configuration, you can use investigraph transform to see what output the transform stage is producing from incoming records.
We make use of bash piping here to feed in the first 10 records of the previous extract stage:
investigraph extract -c path/to/config.yml -l 10 | investigraph transform -c path/to/config.yml
This will output the first few mapped entities.