extract.py
Bring your own code to the extraction stage.
The entrypoint function must yield dictionaries that will be passed as records to the next stage to transform.
let investigraph fetch your sources
config.yml
extract.py
import csv
from io import StringIO
from typing import Any, Generator
from investigraph.model import Context, Resolver
def handle(ctx: Context, res: Resolver) -> Generator[dict[str, Any], None, None]:
content = res.get_content()
yield from csv.DictReader(StringIO(content))