Extract

In the first step of a pipeline, we fetch one or more data sources and extract data records from them; these records are then passed on to the transform stage.

This stage is configured via the extract key within the config.yml.

config.yml

extract.sources

See sources config below

extract.handler

Reference to the Python function that handles this stage.

Default: investigraph.logic.extract:handle
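
Spelled out explicitly, the default configuration is equivalent to:

extract:
  handler: investigraph.logic.extract:handle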

When using your own extractor, you can disable source fetching by investigraph and instead fetch (and extract) your sources within your own code:

extract:
  handler: ./extract.py:handle

Bring your own code (below)

extract.pandas

Pandas operations to apply to the data (see runpandarun), including datapatch

extract:
  pandas:
    read:
      options:
        skiprows: 2
    operations:
      - handler: DataFrame.fillna
        options:
          value: ""
    patch:
      countries:
        - match: "Greet Britain"
          value: "Great Britain"

This can also be applied per source:

extract:
  sources:
    - uri: test.csv
      options:
        pandas:
          # ...

Source

A data source is defined by a uri. Since investigraph uses fsspec under the hood, this uri can be anything from a local file path to a remote s3 resource.

Examples for source uris:

s3://my_bucket/data.csv
gs://my_bucket/data.csv
azure://my_bucket/data.csv
hdfs:///path/data.csv
hdfs://path/data.csv
webhdfs://host:port/path/data.csv
./local/path/data.csv
~/local/path/data.csv
local/path/data.csv
./local/path/data.csv.gz
file:///home/user/file.csv
file:///home/user/file.csv.bz2
[ssh|scp|sftp]://username@host//path/file.csv
[ssh|scp|sftp]://username@host/path/file.csv
[ssh|scp|sftp]://username:password@host/path/file.csv

And, of course, just http[s]://...
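
For reference, this is roughly what happens under the hood when a source uri is opened; a minimal sketch using fsspec directly, not investigraph's actual code (remote protocols may need extra packages, e.g. s3fs for s3://):

import fsspec

# fsspec resolves the scheme of the uri and returns a file-like object
with fsspec.open("s3://my_bucket/data.csv", "r") as fh:
    print(fh.read())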

A pipeline can have more than one source; sources are defined in the config.yml within the extract.sources[] key. This can be either a plain list of one or more uris or a list of more complex source objects, as shown below.
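
For example, the shortest form is a plain list of uris (the paths are illustrative):

extract:
  sources:
    - ./data1.csv
    - https://example.com/data.csv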

Simple source

extract:
  sources:
    - uri: https://www.humanitarianoutcomes.org/gdho/search/results?format=csv

This tells the pipeline to fetch the output from the given URL without any further logic.

As seen in the tutorial, this source actually has encoding problems, and we want to skip the first line. So we need to give investigraph a bit more information on how to extract this source (see the tutorial and the options below).

Named source

You can give a source a name (or identifier) so that your code can identify which source the generated records come from, e.g. to adjust a parsing function based on the source file.

extract:
  sources:
    - name: ec_juncker
      uri: https://ec.europa.eu/transparencyinitiative/meetings/dataxlsx.do?name=meetingscommissionrepresentatives1419
    - name: ec_leyen
      uri: https://ec.europa.eu/transparencyinitiative/meetings/dataxlsx.do?name=meetingscommissionrepresentatives1924

This helps us in the transform stage to distinguish between the different sources and adjust our parsing code accordingly.
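
For illustration, a parsing helper could branch on the source name like this (a hypothetical sketch: the column names and helper are made up, and how the name reaches your code depends on your handler):

from typing import Any

def parse_meeting(record: dict[str, Any], source_name: str) -> dict[str, Any]:
    # hypothetical: the two exports label their date column differently
    date_key = "Date" if source_name == "ec_juncker" else "Date of meeting"
    return {**record, "date": record.get(date_key)}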

More configurable source

For extracting most kinds of sources, investigraph uses runpandarun under the hood. This is a wrapper around pandas that allows specifying a pandas workflow as a yaml playbook. Pandas has a lot of options for how to read in data, and within our config.yml we can just pass any arbitrary arguments to pandas.read_csv or pandas.read_excel. (runpandarun picks the right function based on the source's mimetype.)

Just put the required arguments under the config key extract.sources[].pandas, in this case (see the tutorial) like this:

extract:
  sources:
    - uri: https://www.humanitarianoutcomes.org/gdho/search/results?format=csv
      pandas:
        read:
          options:
            encoding: latin
            skiprows: 1

Under the hood, this calls

pandas.read_csv(uri, encoding="latin", skiprows=1)

If runpandarun is not able to detect the handler to read in the source, as can happen with misconfigured web headers or wrong file extensions, you can manually specify the read.handler:

extract:
  sources:
    - uri: https://www.humanitarianoutcomes.org/gdho/search/results?format=csv
      pandas:
        read:
          handler: read_csv
          options:
            encoding: latin
            skiprows: 1

Prepare your data with pandas

In case you want to use the built-in support for followthemoney mappings, you might need to adjust the incoming data a bit more, as, by design, followthemoney expects an already fairly clean tabular source.

With the help of runpandarun we can basically do anything we need with the source data:

extract:
  sources:
    - uri: ./data.csv
      pandas:
        read:
          options:
            skiprows: 3
        operations:
          - handler: DataFrame.rename
            options:
              columns:
                value: amount
                "First name": first_name
          - handler: DataFrame.fillna
            options:
              value: ""
          - handler: Series.map
            column: slug
            options:
          func: "lambda x: normality.slugify(x) if isinstance(x, str) else 'NO DATA'"

This "pandas playbook" translates into these python calls that investigraph will run:

import pandas as pd
import normality

df = pd.read_csv("./data.csv", skiprows=3)
df = df.rename(columns={"value": "amount", "First name": "first_name"})
df = df.fillna("")
df["slug"] = df["slug"].map(lambda x: normality.slugify(x) if isinstance(x) else 'NO DATA')

Refer to the runpandarun documentation for more.

To apply the same pandas transformations to all sources, use the extract.pandas key instead:

extract:
  sources:
    - uri: ./data1.csv
    - uri: ./data2.csv
  pandas:
    read:
      options:
        skiprows: 3

Apply data patches

runpandarun ships with datapatch integrated, so you can apply data patches after the pandas operations have run:

extract:
  sources:
    - uri: ./data.csv
      pandas:
        read:
          options:
            skiprows: 3
        operations:
          - handler: DataFrame.fillna
            options:
              value: ""
        patch:
          countries:
            - match: "Greet Britain"
              value: "Great Britain"

Inspecting records

To iteratively test your configuration, you can use investigraph extract to see what output the extract stage is producing:

investigraph extract -c path/to/config.yml

Limit the output to the first 10 records:

investigraph extract -c path/to/config.yml -l 10

To output the first records as csv:

investigraph extract -c path/to/config.yml -l 10 --output-format csv

Archiving remote sources

By default, investigraph stores remote sources in a local archive. To disable this behaviour, set extract.archive to false:

extract:
  archive: false
  sources:
    - # ...

Bring your own code

Bring your own code to the extraction stage.

The entrypoint function must yield dictionaries that will be passed as records to the next stage, transform.

config.yml

Convention

Custom handlers should be one Python file per stage (extract.py) in the dataset folder (next to config.yml) that contains a main handle function.

extract:
  sources:
    - uri: https://example.com/data.csv
  handler: ./extract.py:handle

extract.py

import csv
from io import StringIO
from typing import Any, Generator

from investigraph.model import SourceContext

def handle(ctx: SourceContext, *args, **kwargs) -> Generator[dict[str, Any], None, None]:
    # download and open the source:
    with ctx.open("r") as fh:
        yield from csv.DictReader(fh)
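
Since the handler is a plain generator, you can also filter or enrich records inline; a small variation of the handler above (the Country column is made up for illustration):

import csv

from investigraph.model import SourceContext

def handle(ctx: SourceContext, *args, **kwargs):
    with ctx.open("r") as fh:
        for row in csv.DictReader(fh):
            # hypothetical: skip rows without a country value
            if row.get("Country"):
                yield row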

Reference

Extract sources to iterate objects to dict records

handle(ctx, *args, **kwargs)

The default handler for the extract stage. It handles tabular sources with pandas. Custom extract handlers must follow this function signature.

Parameters:

ctx (SourceContext, required): instance of the current SourceContext

Yields:

RecordGenerator: Generator of dictionaries dict[str, Any] that are the extracted records.

Source code in investigraph/logic/extract.py
def handle(ctx: SourceContext, *args, **kwargs) -> RecordGenerator:
    """
    The default handler for the extract stage. It handles tabular sources with
    `pandas`. Custom extract handlers must follow this function signature.

    Args:
        ctx: instance of the current `SourceContext`

    Yields:
        Generator of dictionaries `dict[str, Any]` that are the extracted records.
    """
    if ctx.source.pandas is None:
        ctx.source.pandas = Playbook()
    yield from extract_pandas(ctx, *args, **kwargs)