Skip to content

Context

Context for the current dataset pipeline

Bases: BaseModel

Source code in investigraph/model/context.py
class DatasetContext(BaseModel):
    """
    Runtime context for a dataset pipeline.

    Holds the pipeline `config` and exposes the stage entry points
    (`extract_all`, `load`, `export`) plus helpers to create entities and
    entity IDs that are scoped to the dataset.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # The pipeline configuration every stage and helper reads from
    config: Config

    @property
    def dataset(self) -> str:
        """The dataset name (identifier)"""
        return self.config.dataset.name

    @property
    def prefix(self) -> str:
        """The dataset id prefix (defaults to its name)"""
        return self.config.dataset.prefix or self.dataset

    @property
    def cache(self) -> BaseStore:
        """A shared cache instance"""
        return get_runtime_cache()

    @cached_property
    def store(self) -> Store:
        """The statement store instance to write fragments to"""
        return get_store(self.config.load.uri, dataset=self.config.dataset.name)

    @property
    def log(self) -> BoundLogger:
        """A structlog dataset logging instance for the runtime"""
        return get_logger(f"investigraph.datasets.{self.dataset}")

    def extract_all(self, limit: int | None = None) -> RecordGenerator:
        """
        Extract all records from all sources.

        Args:
            limit: Optionally only return this number of items per source (for
                debugging purposes)

        Yields:
            Generator of dictionaries `dict[str, Any]` that are the extracted records.
        """
        for ctx in self.get_sources():
            for ix, rec in enumerate(ctx.extract(), 1):
                if limit is not None and ix > limit:
                    # Only stop the current source, the limit is per source
                    break
                yield rec

    def load(self, proxies: StatementEntities, *args, **kwargs) -> int:
        """
        Load transformed records with the configured handler.
        Defaults to [`investigraph.logic.load:handle`][investigraph.logic.load]

        Args:
            proxies: Generator of StatementEntity instances

        Returns:
            Number of entities loaded to store
        """
        # Wrap the incoming iterable so that progress is logged while the
        # handler consumes it
        proxies = logged_items(
            proxies,
            "Load",
            item_name="Proxy",
            logger=self.log,
            dataset=self.dataset,
            store=self.config.load.uri,
        )
        return self.config.load.handle(self, proxies, *args, **kwargs)

    def export(self, *args, **kwargs) -> Dataset:
        """
        Execute the configured export handler.
        Defaults to [`investigraph.logic.export:handle`][investigraph.logic.export]

        Returns:
            Dataset model instance for the pipeline dataset, with computed stats
                if configured.
        """
        return self.config.export.handle(self, *args, **kwargs)

    def get_sources(
        self, limit: int | None = None
    ) -> Generator["SourceContext", None, None]:
        """
        Get all the instances of
        [`SourceContext`][investigraph.model.context.SourceContext] for the
        current pipeline. Sources returned by the seed handler come first,
        followed by the sources configured for the extract stage.

        Args:
            limit: Optionally only return this number of items (for debugging
                purposes). `None` returns all sources.

        Yields:
            One `SourceContext` per source, at most `limit` of them
        """
        sources = (*self.config.seed.handle(self), *self.config.extract.sources)
        for ix, source in enumerate(sources, 1):
            # Check before yielding so that exactly `limit` contexts are
            # produced (same pattern as in `extract_all`)
            if limit is not None and ix > limit:
                return
            yield SourceContext(config=self.config, source=source)

    # RUNTIME HELPERS

    def make_entity(self, schema: str, *args, **kwargs) -> StatementEntity:
        """
        Instantiate a new Entity with its schema and optional data.

        Example:
            ```python
            def transform(ctx, record, ix):
                proxy = ctx.make_entity("Company")
                proxy.id = f"c-{ix}"
                proxy.add("name", record["name"])
            ```

        Args:
            schema: [FollowTheMoney schema](https://followthemoney.tech/explorer/)

        Returns:
            instance of StatementEntity
        """
        return make_entity(schema, *args, dataset=self.dataset, **kwargs)

    def make_slug(self, *args, **kwargs) -> str:
        """
        Generate a slug (usable for an entity ID). This guarantees a valid slug
        or raises an error. It either uses the configured dataset prefix or a
        custom prefix given as `prefix` keyword argument.

        Returns:
            A slug

        Raises:
            ValueError: When the slug is invalid (e.g. empty string or `None`)
        """
        prefix = kwargs.pop("prefix", self.prefix)
        slug = join_slug(*args, prefix=prefix, **kwargs)
        if not slug:
            raise ValueError("Empty slug")
        return slug

    def make_id(self, *args, **kwargs) -> str:
        """
        Generate an ID (usable for an entity ID). This guarantees a valid slug
        or raises an error. It either uses the configured dataset prefix or a
        custom prefix given as `prefix` keyword argument. The ID is generated
        from the arguments as a SHA1 hash (same as
        `followthemoney.util.make_entity_id`)

        Returns:
            An ID

        Raises:
            ValueError: When the id is invalid (e.g. empty string or `None`)
        """
        # Only the `prefix` keyword is used here, other keywords are ignored
        prefix = kwargs.pop("prefix", self.prefix)
        id_ = join_slug(make_entity_id(*args), prefix=prefix)
        if not id_:
            raise ValueError("Empty id")
        return id_

    def make_fingerprint_id(self, *args, **kwargs) -> str:
        """
        Generate an ID (usable for an entity ID). This guarantees a valid slug
        or raises an error. It either uses the configured dataset prefix or a
        custom prefix given as `prefix` keyword argument. The ID is generated
        from the fingerprint (using `rigour.fingerprints`) of the arguments as a
        SHA1 hash (same as `followthemoney.util.make_entity_id`)

        Returns:
            An ID based on the fingerprints of the input values

        Raises:
            ValueError: When the id is invalid (e.g. empty string or `None`)
        """
        # Only the `prefix` keyword is used here, other keywords are ignored
        prefix = kwargs.pop("prefix", self.prefix)
        id_ = join_slug(make_fingerprint_id(*args), prefix=prefix)
        if not id_:
            raise ValueError("Empty id")
        return id_

cache property

A shared cache instance

dataset property

The dataset name (identifier)

log property

A structlog dataset logging instance for the runtime

prefix property

The dataset id prefix (defaults to its name)

store cached property

The statement store instance to write fragments to

export(*args, **kwargs)

Execute the configured export handler. Defaults to investigraph.logic.export:handle

Returns:

Type Description
Dataset

Dataset model instance for the pipeline dataset, with computed stats if configured.

Source code in investigraph/model/context.py
def export(self, *args, **kwargs) -> Dataset:
    """
    Run the export stage through the handler configured for this pipeline.
    The default handler is
    [`investigraph.logic.export:handle`][investigraph.logic.export]

    All positional and keyword arguments are forwarded to the handler, which
    receives this context as its first argument.

    Returns:
        Dataset model instance for the pipeline dataset, with computed stats
            if configured.
    """
    export_stage = self.config.export
    return export_stage.handle(self, *args, **kwargs)

extract_all(limit=None)

Extract all records from all sources.

Parameters:

Name Type Description Default
limit int | None

Optionally only return this number of items per source (for debugging purposes)

None

Yields:

Type Description
RecordGenerator

Generator of dictionaries dict[str, Any] that are the extracted records.

Source code in investigraph/model/context.py
def extract_all(self, limit: int | None = None) -> RecordGenerator:
    """
    Extract all records from all sources.

    Args:
        limit: Optionally only return this number of items per source (for
            debugging purposes)

    Yields:
        Generator of dictionaries `dict[str, Any]` that are the extracted records.
    """
    for ctx in self.get_sources():
        for ix, rec in enumerate(ctx.extract(), 1):
            if limit is not None and ix > limit:
                break
            yield rec

get_sources(limit=None)

Get all the instances of SourceContext for the current pipeline.

Parameters:

Name Type Description Default
limit int | None

Optionally only return this number of items (for debugging purposes)

None

Yields:

Type Description
SourceContext

Generator of SourceContext instances, one per source

Source code in investigraph/model/context.py
def get_sources(
    self, limit: int | None = None
) -> Generator["SourceContext", None, None]:
    """
    Get all the instances of
    [`SourceContext`][investigraph.model.context.SourceContext] for the
    current pipeline.

    Args:
        limit: Optionally only return this number of items (for debugging
            purposes)

    Yields:
        Generator for Source model instances
    """
    for ix, source in enumerate(
        (*self.config.seed.handle(self), *self.config.extract.sources), 1
    ):
        yield SourceContext(config=self.config, source=source)
        if limit is not None and limit > ix:
            return

load(proxies, *args, **kwargs)

Load transformed records with the configured handler. Defaults to investigraph.logic.load:handle

Parameters:

Name Type Description Default
proxies StatementEntities

Generator of StatementEntity instances

required

Returns:

Type Description
int

Number of entities loaded to store

Source code in investigraph/model/context.py
def load(self, proxies: StatementEntities, *args, **kwargs) -> int:
    """
    Hand the transformed entities to the load handler configured for this
    pipeline. The default handler is
    [`investigraph.logic.load:handle`][investigraph.logic.load]

    Args:
        proxies: Generator of StatementEntity instances

    Returns:
        Number of entities loaded to store
    """
    load_stage = self.config.load
    # The handler consumes the logging wrapper instead of the raw iterable
    logged_proxies = logged_items(
        proxies,
        "Load",
        item_name="Proxy",
        logger=self.log,
        dataset=self.dataset,
        store=load_stage.uri,
    )
    return load_stage.handle(self, logged_proxies, *args, **kwargs)

make_entity(schema, *args, **kwargs)

Instantiate a new Entity with its schema and optional data.

Example
def transform(ctx, record, ix):
    # Create a new "Company" entity through the context helper
    proxy = ctx.make_entity("Company")
    # Build the entity id from the index `ix` passed to the handler
    proxy.id = f"c-{ix}"
    # Add the record's "name" value to the entity's `name` property
    proxy.add("name", record["name"])

Parameters:

Name Type Description Default
schema str required

Returns:

Type Description
StatementEntity

instance of StatementEntity

Source code in investigraph/model/context.py
def make_entity(self, schema: str, *args, **kwargs) -> StatementEntity:
    """
    Create a new entity of the given schema that belongs to the dataset of
    this context. Additional arguments are passed through as optional data.

    Example:
        ```python
        def transform(ctx, record, ix):
            proxy = ctx.make_entity("Company")
            proxy.id = f"c-{ix}"
            proxy.add("name", record["name"])
        ```

    Args:
        schema: [FollowTheMoney schema](https://followthemoney.tech/explorer/)

    Returns:
        instance of StatementEntity
    """
    # The dataset is always taken from the context
    entity = make_entity(schema, *args, dataset=self.dataset, **kwargs)
    return entity

make_fingerprint_id(*args, **kwargs)

Generate an ID (usable for an entity ID). This guarantees a valid slug or raises an error. It either uses the configured dataset prefix or a custom prefix given as prefix keyword argument. The ID is generated from the fingerprint (using rigour.fingerprints) of the arguments as a SHA1 hash (same as followthemoney.util.make_entity_id)

Returns:

Type Description
str

An ID based on the fingerprints of the input values

Raises:

Type Description
ValueError

When the id is invalid (e.g. empty string or None)

Source code in investigraph/model/context.py
def make_fingerprint_id(self, *args, **kwargs) -> str:
    """
    Build a prefixed entity ID from the fingerprints of the given values.
    The fingerprints (using `rigour.fingerprints`) are hashed with SHA1 (same
    as `followthemoney.util.make_entity_id`) and joined with either the
    configured dataset prefix or a custom prefix given as `prefix` keyword
    argument. This guarantees a valid slug or raises an error.

    Returns:
        An ID based on the fingerprints of the input values

    Raises:
        ValueError: When the id is invalid (e.g. empty string or `None`)
    """
    # Of all keyword arguments only `prefix` is used
    id_prefix = kwargs.pop("prefix", self.prefix)
    fingerprint_hash = make_fingerprint_id(*args)
    entity_id = join_slug(fingerprint_hash, prefix=id_prefix)
    if entity_id:
        return entity_id
    raise ValueError("Empty id")

make_id(*args, **kwargs)

Generate an ID (usable for an entity ID). This guarantees a valid slug or raises an error. It either uses the configured dataset prefix or a custom prefix given as prefix keyword argument. The ID is generated from the arguments as a SHA1 hash (same as followthemoney.util.make_entity_id)

Returns:

Type Description
str

An ID

Raises:

Type Description
ValueError

When the id is invalid (e.g. empty string or None)

Source code in investigraph/model/context.py
def make_id(self, *args, **kwargs) -> str:
    """
    Build a prefixed entity ID from the given values. The values are hashed
    with SHA1 (same as `followthemoney.util.make_entity_id`) and joined with
    either the configured dataset prefix or a custom prefix given as `prefix`
    keyword argument. This guarantees a valid slug or raises an error.

    Returns:
        An ID

    Raises:
        ValueError: When the id is invalid (e.g. empty string or `None`)
    """
    # Of all keyword arguments only `prefix` is used
    id_prefix = kwargs.pop("prefix", self.prefix)
    hashed = make_entity_id(*args)
    entity_id = join_slug(hashed, prefix=id_prefix)
    if entity_id:
        return entity_id
    raise ValueError("Empty id")

make_slug(*args, **kwargs)

Generate a slug (usable for an entity ID). This guarantees a valid slug or raises an error. It either uses the configured dataset prefix or a custom prefix given as prefix keyword argument.

Returns:

Type Description
str

A slug

Raises:

Type Description
ValueError

When the slug is invalid (e.g. empty string or None)

Source code in investigraph/model/context.py
def make_slug(self, *args, **kwargs) -> str:
    """
    Join the given values into a prefixed slug (usable for an entity ID).
    The prefix is either the configured dataset prefix or a custom one given
    as `prefix` keyword argument; all other keyword arguments are forwarded
    to `join_slug`. This guarantees a valid slug or raises an error.

    Returns:
        A slug

    Raises:
        ValueError: When the slug is invalid (e.g. empty string or `None`)
    """
    slug_prefix = kwargs.pop("prefix", self.prefix)
    result = join_slug(*args, prefix=slug_prefix, **kwargs)
    if result:
        return result
    raise ValueError("Empty slug")

Context for the current source

Bases: DatasetContext

Source code in investigraph/model/context.py
class SourceContext(DatasetContext):
    # The source this context is bound to
    source: Source

    @cached_property
    def extract_key(self) -> str:
        """
        The computed cache key for extraction for the current source.
        See [Cache][investigraph.cache]

        Raises:
            ValueError: When no cache key could be computed for the source
        """
        checksum_key = make_cache_key(self.source.uri, use_checksum=True)
        if checksum_key:
            return f"extracted/{self.dataset}/{checksum_key}"
        raise ValueError(f"Empty cache key for source `{self.source.name}`")

    @cached_property
    def should_extract(self) -> bool:
        """
        Tell whether this source needs to be extracted. It is skipped only
        when the extract cache is enabled in the settings and the cache key
        of the source already exists in the archive cache.
        """
        if not Settings().extract_cache:
            return True
        archive_cache = get_archive_cache()
        if not archive_cache.exists(self.extract_key):
            return True
        self.log.info(
            "Skipping cached source",
            cache_key=self.extract_key,
            source=self.source.uri,
        )
        return False

    # STAGES

    def extract(self, limit: int | None = None) -> RecordGenerator:
        """
        Run the configured extract handler for the current source and yield
        its records. The default handler is
        [`investigraph.logic.extract:handle`][investigraph.logic.extract]

        Each record gets the source name stored under the `__source__` key.
        Nothing is yielded when `should_extract` is false.

        Args:
            limit: Optionally only return this number of items per source (for
                debugging purposes)

        Yields:
            Generator of dictionaries `dict[str, Any]` that are the extracted records.
        """

        if not self.should_extract:
            return

        def _tagged_records():
            seen = 0
            for rec in self.config.extract.handle(self):
                seen += 1
                if limit is not None and seen > limit:
                    return
                rec["__source__"] = self.source.name
                yield rec

        record_stream = logged_items(
            _tagged_records(),
            "Extract",
            item_name="Record",
            logger=self.log,
            dataset=self.dataset,
            source=self.source.uri,
        )
        yield from record_stream

        # Reached only after the consumer exhausted the stream.
        # NOTE(review): the key is also touched when `limit` cut the
        # extraction short - confirm this is intended.
        get_archive_cache().touch(self.extract_key)

    def transform(self, records: RecordGenerator) -> StatementEntities:
        """
        Turn extracted records from the current source into FollowTheMoney
        entities with the configured handler. The default handler is
        [`investigraph.logic.transform:map_ftm`][investigraph.logic.transform]

        The handler is called once per record with this context, the record
        and the 1-based index of the record.

        Args:
            records: Generator of record items as `dict[str, Any]`

        Yields:
            Generator of StatementEntity
        """

        def _entities():
            position = 0
            for rec in records:
                position += 1
                yield from self.config.transform.handle(self, rec, position)

        entity_stream = logged_items(
            _entities(),
            "Transform",
            item_name="Proxy",
            logger=self.log,
            dataset=self.dataset,
            source=self.source.uri,
        )
        yield from entity_stream

    def task(self) -> "TaskContext":
        """
        Create a runtime task context from the fields of this context, to
        pass on to helper functions within the transform stage.
        See [`TaskContext.emit`][investigraph.model.TaskContext.emit]

        Example:
            ```python
            def transform(ctx, record, ix):
                task_ctx = ctx.task()

                # do something, within this function use `task_ctx.emit()`
                handle_record(task_ctx, record)

                # pass on emitted entities to next stage
                yield from task_ctx
            ```

        Returns:
            The runtime task context
        """
        fields = self.model_dump()
        return TaskContext(**fields)

    def open(
        self, mode: str | None = DEFAULT_MODE, **kwargs
    ) -> ContextManager[IO[AnyStr]]:
        """
        Open the source of this context as a file-like handler. If
        `archive=True` is set via extract stage config, the source will be
        downloaded locally first.

        Example:
            ```python
            def extract(ctx, *args, **kwargs):
                with ctx.open() as h:
                    while line := h.readline():
                        yield line
            ```

        Args:
            mode: The mode to open, defaults `rb`

        Returns:
            A file-handler like context manager. The file gets closed when
                leaving the context.
        """
        uri = self.source.uri
        use_archive = self.config.extract.archive and not self.source.is_local
        if not use_archive:
            return smart_open(uri, mode=mode, **kwargs)
        # Archive the non-local source first, then read it from the archive
        archived_uri = archive_source(uri)
        return get_archive().open(archived_uri, mode=mode, **kwargs)

extract_key cached property

The computed cache key for extraction for the current source. See Cache

should_extract cached property

Check if the source with the same cache key was already extracted

extract(limit=None)

Extract the records for the current source with the configured handler. Defaults to investigraph.logic.extract:handle

Parameters:

Name Type Description Default
limit int | None

Optionally only return this number of items per source (for debugging purposes)

None

Yields:

Type Description
RecordGenerator

Generator of dictionaries dict[str, Any] that are the extracted records.

Source code in investigraph/model/context.py
def extract(self, limit: int | None = None) -> RecordGenerator:
    """
    Extract the records for the current source with the configured handler.
    Defaults to [`investigraph.logic.extract:handle`][investigraph.logic.extract]

    Args:
        limit: Optionally only return this number of items per source (for
            debugging purposes)

    Yields:
        Generator of dictionaries `dict[str, Any]` that are the extracted records.
    """

    if not self.should_extract:
        return

    def _records():
        for ix, record in enumerate(self.config.extract.handle(self), 1):
            if limit is not None and ix > limit:
                break
            record["__source__"] = self.source.name
            yield record

    yield from logged_items(
        _records(),
        "Extract",
        item_name="Record",
        logger=self.log,
        dataset=self.dataset,
        source=self.source.uri,
    )

    cache = get_archive_cache()
    cache.touch(self.extract_key)

open(mode=DEFAULT_MODE, **kwargs)

Open the context source as a file-like handler. If archive=True is set via extract stage config, the source will be downloaded locally first.

Example
def extract(ctx, *args, **kwargs):
    # Open the source of the context; the handle is closed on leaving `with`
    with ctx.open() as h:
        # Keep reading until `readline()` returns an empty (falsy) value
        while line := h.readline():
            yield line

Parameters:

Name Type Description Default
mode str | None

The mode to open the source with; defaults to rb

DEFAULT_MODE

Returns:

Type Description
ContextManager[IO[AnyStr]]

A file-handler like context manager. The file gets closed when leaving the context.

Source code in investigraph/model/context.py
def open(
    self, mode: str | None = DEFAULT_MODE, **kwargs
) -> ContextManager[IO[AnyStr]]:
    """
    Open the source of this context as a file-like handler. If
    `archive=True` is set via extract stage config, the source will be
    downloaded locally first.

    Example:
        ```python
        def extract(ctx, *args, **kwargs):
            with ctx.open() as h:
                while line := h.readline():
                    yield line
        ```

    Args:
        mode: The mode to open, defaults `rb`

    Returns:
        A file-handler like context manager. The file gets closed when
            leaving the context.
    """
    uri = self.source.uri
    use_archive = self.config.extract.archive and not self.source.is_local
    if not use_archive:
        return smart_open(uri, mode=mode, **kwargs)
    # Archive the non-local source first, then read it from the archive
    archived_uri = archive_source(uri)
    return get_archive().open(archived_uri, mode=mode, **kwargs)

task()

Get a runtime task context to pass on to helper functions within transform stage. See TaskContext.emit

Example
def transform(ctx, record, ix):
    # Get a task context from the source context
    task_ctx = ctx.task()

    # do something, within this function use `task_ctx.emit()`
    handle_record(task_ctx, record)

    # pass on emitted entities to next stage
    yield from task_ctx

Returns:

Type Description
TaskContext

The runtime task context

Source code in investigraph/model/context.py
def task(self) -> "TaskContext":
    """
    Create a runtime task context from the fields of this context, to pass
    on to helper functions within the transform stage.
    See [`TaskContext.emit`][investigraph.model.TaskContext.emit]

    Example:
        ```python
        def transform(ctx, record, ix):
            task_ctx = ctx.task()

            # do something, within this function use `task_ctx.emit()`
            handle_record(task_ctx, record)

            # pass on emitted entities to next stage
            yield from task_ctx
        ```

    Returns:
        The runtime task context
    """
    fields = self.model_dump()
    return TaskContext(**fields)

transform(records)

Transform extracted records from the current source into FollowTheMoney entities with the configured handler. Defaults to investigraph.logic.transform:map_ftm

Parameters:

Name Type Description Default
records RecordGenerator

Generator of record items as dict[str, Any]

required

Yields:

Type Description
StatementEntities

Generator of StatementEntity

Source code in investigraph/model/context.py
def transform(self, records: RecordGenerator) -> StatementEntities:
    """
    Turn extracted records from the current source into FollowTheMoney
    entities with the configured handler. The default handler is
    [`investigraph.logic.transform:map_ftm`][investigraph.logic.transform]

    The handler is called once per record with this context, the record and
    the 1-based index of the record.

    Args:
        records: Generator of record items as `dict[str, Any]`

    Yields:
        Generator of StatementEntity
    """

    def _entities():
        position = 0
        for rec in records:
            position += 1
            yield from self.config.transform.handle(self, rec, position)

    entity_stream = logged_items(
        _entities(),
        "Transform",
        item_name="Proxy",
        logger=self.log,
        dataset=self.dataset,
        source=self.source.uri,
    )
    yield from entity_stream

Runtime task context

Bases: SourceContext

Source code in investigraph/model/context.py
class TaskContext(SourceContext):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Entities emitted so far, keyed by their entity id
    proxies: dict[str, StatementEntity] = {}
    # Additional data field; not used by the methods of this class
    data: dict[str, Any] = {}

    def __iter__(self) -> StatementEntities:
        """Iterate over the emitted (and already merged) entities."""
        for emitted in self.proxies.values():
            yield emitted

    def emit(self, *proxies: StatementEntity | None) -> None:
        """
        Collect Entity instances during task runtime. Entities that share an
        id are merged right away, `None` values are skipped. This is useful
        for helper functions within transform logic that create multiple
        entities "on the fly"

        Example:
            ```python
            def make_person(ctx: TaskContext, record: dict[str, Any]) -> E:
                person = ctx.make_entity("Person", id=1, name="Jane Doe")
                note = ctx.make_entity("Note", id="note-1", entity=person)

                # make sure the note entity is emitted as we are only returning
                # the person entity:
                ctx.emit(note)

                return person
            ```

        Raises:
            DataError: When an emitted entity has no id

        """
        for proxy in proxies:
            if proxy is None:
                continue
            if not proxy.id:
                raise DataError("No Entity ID!")
            if proxy.id in self.proxies:
                # Same id was emitted before: keep the merged entity
                merged = merge(self.proxies[proxy.id], proxy)
                self.proxies[proxy.id] = merged
                continue
            self.proxies[proxy.id] = proxy

emit(*proxies)

Emit Entity instances during task runtime. The entities will already be merged. This is useful for helper functions within transform logic that create multiple entities "on the fly"

Example
def make_person(ctx: TaskContext, record: dict[str, Any]) -> E:
    # Create the person and a note that refers to it
    person = ctx.make_entity("Person", id=1, name="Jane Doe")
    note = ctx.make_entity("Note", id="note-1", entity=person)

    # make sure the note entity is emitted as we are only returning
    # the person entity:
    ctx.emit(note)

    return person
Source code in investigraph/model/context.py
def emit(self, *proxies: StatementEntity | None) -> None:
    """
    Emit Entity instances during task
    runtime. The entities will already be merged. This is useful for helper
    functions within transform logic that create multiple entities "on the
    fly"

    Example:
        ```python
        def make_person(ctx: TaskContext, record: dict[str, Any]) -> E:
            person = ctx.make_entity("Person", id=1, name="Jane Doe")
            note = ctx.make_entity("Note", id="note-1", entity=person)

            # make sure the note entity is emitted as we are only returning
            # the person entity:
            ctx.emit(note)

            return person
        ```

    """
    for proxy in proxies:
        if proxy is not None:
            if not proxy.id:
                raise DataError("No Entity ID!")
            # do merge already
            if proxy.id in self.proxies:
                self.proxies[proxy.id] = merge(self.proxies[proxy.id], proxy)
            else:
                self.proxies[proxy.id] = proxy