
Export

Aggregate fragments to export.

handle(ctx, *args, **kwargs)

The default handler of the export stage. It iterates through the entities store, calculates dataset statistics and writes the entities and dataset index to json files.

If neither entities_uri nor index_uri is set, no stats for the Dataset are computed. Statistics are applied to the Dataset and written out only when index_uri is set.

Parameters:

Name  Type            Description                         Default
ctx   DatasetContext  The current runtime DatasetContext  required

Returns:

Type     Description
Dataset  The Dataset object with calculated statistics.

Source code in investigraph/logic/export.py
def handle(ctx: "DatasetContext", *args, **kwargs) -> Dataset:
    """
    The default handler of the export stage. It iterates through the entities
    store, calculates dataset statistics and writes the entities and dataset
    index to json files.

    If neither `entities_uri` nor `index_uri` is set, no stats for the `Dataset`
    are computed.

    Args:
        ctx: The current runtime `DatasetContext`

    Returns:
        The `Dataset` object with calculated statistics.
    """
    collector = Collector()
    proxies = ctx.store.iterate(dataset=ctx.dataset)
    iterator = get_iterator(proxies, collector)
    if ctx.config.export.entities_uri:
        smart_write_proxies(ctx.config.export.entities_uri, iterator)
    elif ctx.config.export.index_uri:
        # still compute statistics by iterating through the proxy iterator
        _ = [p for p in iterator]

    if ctx.config.export.index_uri:
        stats = collector.export()
        ctx.config.dataset.apply_stats(stats)
        smart_write(
            ctx.config.export.index_uri, ctx.config.dataset.model_dump_json().encode()
        )

    return ctx.config.dataset
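
The control flow above (statistics gathered as a side effect of iterating, with the iterator drained manually when only the index is requested) can be sketched in plain Python. This is a minimal illustration, not investigraph's implementation: `SchemaCounter`, `counting_iterator` and `export_sketch` are hypothetical names, entities are plain dicts, and "writing" is simulated by collecting into a list.

```python
from collections import Counter
from typing import Iterable, Iterator, Optional


class SchemaCounter:
    """Hypothetical stand-in for a statistics collector."""

    def __init__(self) -> None:
        self.schemata = Counter()

    def collect(self, entity: dict) -> None:
        self.schemata[entity["schema"]] += 1

    def export(self) -> dict:
        return {
            "entity_count": sum(self.schemata.values()),
            "schemata": dict(self.schemata),
        }


def counting_iterator(
    entities: Iterable[dict], collector: SchemaCounter
) -> Iterator[dict]:
    """Yield entities unchanged while feeding each one to the collector."""
    for entity in entities:
        collector.collect(entity)
        yield entity


def export_sketch(
    entities: Iterable[dict], write_entities: bool, write_index: bool
) -> tuple[list[dict], Optional[dict]]:
    collector = SchemaCounter()
    iterator = counting_iterator(entities, collector)
    written: list[dict] = []
    if write_entities:
        # Stand-in for writing entities: consuming the iterator also
        # fills the collector.
        written = list(iterator)
    elif write_index:
        # Nothing is written, but the iterator must still be drained,
        # otherwise the collector never sees any entity.
        for _ in iterator:
            pass
    stats = collector.export() if write_index else None
    return written, stats
```

Because the generator is lazy, the collector only sees entities that are actually consumed, which is why the index-only branch has to exhaust the iterator explicitly.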

proxy_merge(self, other)

Used to override EntityProxy.merge in investigraph.__init__.py

Source code in investigraph/logic/export.py
def proxy_merge(self: E, other: E) -> E:
    """
    Used to override `EntityProxy.merge` in `investigraph.__init__.py`
    """
    return merge(
        make_entity(self.to_dict(), StatementEntity),
        make_entity(other.to_dict(), StatementEntity),
        downgrade=True,
    )
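
The override mechanism mentioned in the docstring amounts to assigning a replacement function to a class attribute. The sketch below shows only that mechanism with a hypothetical `Proxy` class and a hypothetical `lenient_merge` function; it does not reproduce the behaviour of `merge(..., downgrade=True)` or of `EntityProxy`.

```python
class Proxy:
    """Hypothetical stand-in for an entity proxy class."""

    def __init__(self, id: str, properties: dict) -> None:
        self.id = id
        self.properties = {k: set(v) for k, v in properties.items()}

    def merge(self, other: "Proxy") -> "Proxy":
        # Placeholder for the original behaviour that gets replaced.
        raise ValueError("original merge refuses to combine proxies")


def lenient_merge(self: Proxy, other: Proxy) -> Proxy:
    """Replacement merge: union the property values of both proxies."""
    merged = {k: set(v) for k, v in self.properties.items()}
    for key, values in other.properties.items():
        merged.setdefault(key, set()).update(values)
    return Proxy(self.id, merged)


# Assigning the function to the class replaces the method for all
# instances, including ones created before the assignment.
Proxy.merge = lenient_merge
```

After the assignment, `a.merge(b)` dispatches to `lenient_merge` with `a` bound as `self`, so calling code does not need to change.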

Metadata

Location for the resulting dataset metadata, typically called index.json. Because investigraph uses fsspec, this can be any location that fsspec can write to:

config.yml

export:
  index_uri: s3://my_bucket/<dataset>/index.json

command line

investigraph run ... --index-uri sftp://username:password@host/<dataset>/index.json

command line

investigraph run ... --entities-uri ...

export.index_uri

URI to output dataset metadata. Can be anything that fsspec understands.

Example: s3://<bucket-name>/<dataset-name>/index.json

Default: ./data/<dataset-name>/index.json

export.entities_uri

URI to output transformed entities. Can be anything that fsspec understands, plus a SQL endpoint (for use with followthemoney-store).

Example:

  • s3://<bucket-name>/<dataset-name>/entities.ftm.json
  • postgresql://user:password@host:port/database

Default: ./data/<dataset-name>/entities.ftm.json
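
To make the two settings concrete, here is a small sketch that builds the documented default locations for a dataset and distinguishes a SQL endpoint from a file-like URI by its scheme. The helper names and the `SQL_SCHEMES` set are assumptions for illustration only; investigraph's own resolution logic may differ.

```python
from urllib.parse import urlparse

# Assumption: schemes treated as SQL endpoints in this sketch.
SQL_SCHEMES = {"postgresql", "mysql", "sqlite"}


def default_export_uris(dataset: str) -> dict:
    """Return the documented default locations for a dataset name."""
    return {
        "index_uri": f"./data/{dataset}/index.json",
        "entities_uri": f"./data/{dataset}/entities.ftm.json",
    }


def classify_uri(uri: str) -> str:
    """Return "sql" for SQL endpoints, "file-like" for everything else."""
    scheme = urlparse(uri).scheme
    # Strip an optional driver suffix such as "postgresql+psycopg2".
    if scheme.split("+")[0] in SQL_SCHEMES:
        return "sql"
    return "file-like"
```

Relative paths have an empty scheme and fall through to "file-like", as do remote schemes such as `s3` or `sftp`.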