Skip to content

Transform patterns

This guide covers patterns for writing effective transform handlers that convert extracted records into FollowTheMoney entities.

Data handling

Field tracking

Track which source fields you've processed to catch unmapped data:

def handle(ctx, record, ix):
    """Map a record to an Organization and warn about source fields left unmapped."""
    # Pop from a copy so the caller's record is left untouched
    remaining = dict(record)

    org = ctx.make_entity("Organization")
    org.id = ctx.make_slug(remaining.pop("id"))
    org.add("name", remaining.pop("name"))
    org.add("country", remaining.pop("country", None))

    # Anything still in the copy was never mapped to a property
    leftover = list(remaining)
    if leftover:
        ctx.log.warning("Unhandled fields", fields=leftover, record=ix)

    yield org

Address composition

Build addresses from individual fields:

def handle(ctx, record, ix):
    """Map a record to a Person, folding the address parts into one ``address`` value.

    ``city``, ``street`` and ``postalCode`` are properties of the FollowTheMoney
    ``Address`` schema rather than of ``Person``, so the individual parts are
    joined into the person's free-text ``address`` property. ``country`` is
    kept as a property of its own.
    """
    entity = ctx.make_entity("Person")
    entity.id = ctx.make_slug(record["id"])
    entity.add("name", record["name"])
    entity.add("country", record.get("country"))

    # Compose address from the parts that are actually present
    parts = []
    for key in ("street", "postal_code", "city"):
        value = str(record.get(key) or "").strip()
        if value:
            parts.append(value)
    if parts:
        entity.add("address", ", ".join(parts))

    yield entity

Multiple entities per record

Emit multiple related entities from a single record:

def handle(ctx, record, ix):
    """Emit a Company, its director (a Person) and the Directorship linking them."""
    # Company first, then director; each is yielded as soon as it is built
    parties = []
    for schema, id_key, name_key in (
        ("Company", "company_id", "company_name"),
        ("Person", "director_id", "director_name"),
    ):
        party = ctx.make_entity(schema)
        party.id = ctx.make_slug(record[id_key])
        party.add("name", record[name_key])
        parties.append(party)
        yield party

    firm, officer = parties

    # Relationship entity whose ID is derived from both endpoints
    link = ctx.make_entity("Directorship")
    link.id = ctx.make_id(officer.id, "director", firm.id)
    link.add("director", officer)
    link.add("organization", firm)
    link.add("role", record.get("director_role"))
    yield link

Using task context

For complex transformations, helper functions can emit additional entities through a task context:

def make_person(task_ctx, data):
    """Build a Person from ``data``, emitting a linked Passport when one is given.

    The passport (if any) is emitted through ``task_ctx``; the person itself is
    returned to the caller without being emitted here.
    """
    holder = task_ctx.make_entity("Person")
    holder.id = task_ctx.make_slug(data["id"])
    holder.add("name", data["name"])

    # Nothing more to do when the record carries no passport number
    if not data.get("passport"):
        return holder

    document = task_ctx.make_entity("Passport")
    document.id = task_ctx.make_id(holder.id, "passport", data["passport"])
    document.add("holder", holder)
    document.add("number", data["passport"])
    task_ctx.emit(document)  # emit via context

    return holder


def handle(ctx, record, ix):
    """Build a person via the helper and yield everything emitted on the task context."""
    task_ctx = ctx.task()

    # The helper emits its side entities itself; the person it returns is emitted here
    task_ctx.emit(make_person(task_ctx, record))

    # Hand every emitted entity back to the caller
    for emitted in task_ctx:
        yield emitted

Data validation

Runtime assertions

Use assertions to catch data issues early (keep in mind that Python skips `assert` statements when run with the `-O` flag):

def handle(ctx, record, ix):
    """Map a record to an Organization, asserting the fields it relies on.

    Raises:
        AssertionError: if ``id`` is missing or empty, or if ``country`` is not
            a two-character string.
    """
    # NOTE: `assert` statements are skipped when Python runs with -O.
    entity = ctx.make_entity("Organization")

    # Ensure required fields exist
    assert record.get("id"), f"Missing ID at row {ix}"
    entity.id = ctx.make_slug(record["id"])

    # Validate data format. The value is read with .get() so that a missing or
    # null country fails this assertion (with its message) instead of raising
    # an unrelated KeyError/TypeError from the check itself.
    country = record.get("country")
    assert isinstance(country, str) and len(country) == 2, (
        f"Invalid country code: {country}"
    )
    entity.add("country", country)

    yield entity

Schema validation

Validate entities before yielding:

def handle(ctx, record, ix):
    """Map a record to a Person, skipping (with a warning) records that have no name.

    The name is read with ``.get()`` so that a record lacking the ``name`` key
    reaches the validity check below instead of failing with ``KeyError``.
    """
    entity = ctx.make_entity("Person")
    entity.id = ctx.make_slug(record["id"])
    entity.add("name", record.get("name"))

    # Check if entity is valid
    if not entity.has("name"):
        ctx.log.warning("Person without name", id=entity.id, row=ix)
        return

    yield entity

Text processing

Normalization

Clean text data before adding to entities:

from normality import slugify, collapse_spaces

def handle(ctx, record, ix):
    """Map a record to an Organization using a cleaned name and a slugified ID."""
    org = ctx.make_entity("Organization")

    # Remove extra whitespace from the name before storing it
    org.add("name", collapse_spaces(record["name"]))

    # Generate consistent slugs: slugify the raw ID, then build the entity ID from it
    slug = slugify(record["id"])
    org.id = ctx.make_slug(slug)

    yield org

Language handling

Preserve original language when available:

# FollowTheMoney supports language-tagged values
for column, lang in (("name_en", "eng"), ("name_de", "deu")):
    entity.add("name", record[column], lang=lang)

Creating relationships

Direct relationships

Link entities with relationship schemas:

def handle(ctx, record, ix):
    """Emit a Person, a Company and the Ownership that links owner to asset."""
    # Owner
    owner = ctx.make_entity("Person")
    owner.id = ctx.make_slug("person", record["person_id"])
    owner.add("name", record["person_name"])

    # Owned asset
    asset = ctx.make_entity("Company")
    asset.id = ctx.make_slug("company", record["company_id"])
    asset.add("name", record["company_name"])

    # Relationship entity; its ID is derived from both endpoint IDs
    stake = ctx.make_entity("Ownership")
    stake.id = ctx.make_id(owner.id, "owns", asset.id)
    stake.add("owner", owner)
    stake.add("asset", asset)
    stake.add("startDate", record.get("acquired_date"))
    stake.add("percentage", record.get("stake"))

    yield from (owner, asset, stake)

Family relationships

def handle(ctx, record, ix):
    """Emit two Persons and the Family relationship that connects them."""
    relatives = []
    for id_key, name_key in (
        ("person1_id", "person1_name"),
        ("person2_id", "person2_name"),
    ):
        individual = ctx.make_entity("Person")
        individual.id = ctx.make_slug("person", record[id_key])
        individual.add("name", record[name_key])
        relatives.append(individual)

    first, second = relatives

    # Relationship entity; its ID is derived from both person IDs
    bond = ctx.make_entity("Family")
    bond.id = ctx.make_id(first.id, "family", second.id)
    bond.add("person", first)
    bond.add("relative", second)
    bond.add("relationship", record["relationship_type"])

    yield from (first, second, bond)

Conditional entity creation

Skip records based on conditions

def handle(ctx, record, ix):
    """Map a record to an Organization, silently dropping test rows and
    logging a warning for rows without a name or an ID."""
    # Test records are dropped without any log output
    is_test = record.get("status") == "TEST"
    if is_test:
        return

    # Both name and ID are required; warn and drop the row otherwise
    if not (record.get("name") and record.get("id")):
        ctx.log.warning("Skipping incomplete record", row=ix)
        return

    org = ctx.make_entity("Organization")
    org.id = ctx.make_slug(record["id"])
    org.add("name", record["name"])

    yield org

Different schemas based on data

def handle(ctx, record, ix):
    """Map a record to a Person or a Company depending on its ``type`` field.

    Records whose type is missing, null or unrecognised are logged with a
    warning and skipped.
    """
    # entity type -> (schema, date property, source column holding that date)
    layouts = {
        "person": ("Person", "birthDate", "birth_date"),
        "company": ("Company", "incorporationDate", "founded_date"),
    }

    # Choose schema based on entity type. `or ""` also covers a key that is
    # present but null, which `record.get("type", "")` would pass through as
    # None and then fail on `.lower()`.
    entity_type = str(record.get("type") or "").lower()

    layout = layouts.get(entity_type)
    if layout is None:
        ctx.log.warning("Unknown entity type", type=entity_type, row=ix)
        return

    schema, date_prop, date_column = layout
    entity = ctx.make_entity(schema)
    # The slug prefix is the lower-cased type itself ("person" / "company")
    entity.id = ctx.make_slug(entity_type, record["id"])
    entity.add("name", record["name"])
    entity.add(date_prop, record.get(date_column))

    yield entity

Caching within transforms

Cache computed values when processing related records:

def handle(ctx, record, ix):
    """Emit a person, their organization and the membership linking the two.

    The organization is only built and emitted when its ``org_id`` is not yet
    present in ``task_ctx.data``; afterwards the stored entity ID is reused.
    """
    task_ctx = ctx.task()
    org_key = record["org_id"]

    # Cache organization IDs to avoid recomputation
    if org_key not in task_ctx.data:
        org = make_organization(task_ctx, record)
        task_ctx.data[org_key] = org.id
        task_ctx.emit(org)

    # Use cached ID
    person = make_person(task_ctx, record)
    membership = make_membership(
        task_ctx,
        person.id,
        task_ctx.data[org_key],
        record,
    )

    # The helpers only build and return entities; without these two emits the
    # person and the membership would never be part of `yield from task_ctx`.
    task_ctx.emit(person)
    task_ctx.emit(membership)

    yield from task_ctx


def make_organization(task_ctx, record):
    """Build (without emitting) an Organization from the record's ``org_*`` fields."""
    entity = task_ctx.make_entity("Organization")
    entity.id = task_ctx.make_slug("org", record["org_id"])
    entity.add("name", record["org_name"])
    return entity


def make_person(task_ctx, record):
    """Build (without emitting) a Person from the record's ``person_*`` fields."""
    entity = task_ctx.make_entity("Person")
    entity.id = task_ctx.make_slug("person", record["person_id"])
    entity.add("name", record["person_name"])
    return entity


def make_membership(task_ctx, person_id, org_id, record):
    """Build (without emitting) a Membership linking ``person_id`` to ``org_id``.

    The role is taken from the record's optional ``role`` field.
    """
    link = task_ctx.make_entity("Membership")
    link.id = task_ctx.make_id(person_id, "member", org_id)
    for prop, value in (("member", person_id), ("organization", org_id)):
        link.add(prop, value)
    link.add("role", record.get("role"))
    return link

Performance optimization

Memory management

Yield entities as you create them rather than accumulating:

def handle(ctx, record, ix):
    """Yield one entity per parsed item as soon as it is built."""
    # Don't do this - accumulates in memory
    # entities = []
    # for item in large_list:
    #     entities.append(make_entity(ctx, item))
    # return entities

    # Do this - each entity is built lazily and handed on immediately
    yield from (make_entity(ctx, item) for item in parse_items(record))

Batch logging

For large datasets, log progress periodically:

def handle(ctx, record, ix):
    """Map a record to an Organization, logging progress on every 1000th index."""
    # Progress is reported whenever the record index is a multiple of 1000
    if not ix % 1000:
        ctx.log.info("Progress", records_processed=ix)

    org = ctx.make_entity("Organization")
    org.id = ctx.make_slug(record["id"])
    org.add("name", record["name"])

    yield org

Custom extract handlers

For sources beyond simple CSV/Excel, write a custom extract handler that yields one record (a dict) per item:

def handle(ctx):
    """Custom extract handler: parse pipe-delimited lines into record dicts.

    Comment lines (starting with ``#``) and blank lines are skipped. Every
    other line must provide at least three ``|``-separated fields.

    Yields:
        dict: ``{"id": ..., "name": ..., "type": ...}`` for each data line.

    Raises:
        IndexError: if a data line has fewer than three fields.
    """
    with ctx.open() as fh:
        # Parse custom format
        for raw_line in fh:
            line = raw_line.strip()
            # Skip comments and blank lines; a blank line (e.g. a trailing
            # newline at the end of the file) would otherwise split into a
            # single empty field and fail on parts[1].
            if not line or line.startswith("#"):
                continue

            parts = line.split("|")
            yield {
                "id": parts[0],
                "name": parts[1],
                "type": parts[2],
            }

API pagination

Handle paginated APIs in extract stage:

def handle(ctx):
    """Extract from paginated API.

    Requests page after page (100 items each) from ``ctx.source.uri`` and
    yields every item of each page's ``results`` list. Stops at the first
    page whose ``results`` list is empty.

    Raises:
        requests.HTTPError: if a page responds with an error status.
        requests.Timeout: if the server does not answer within the timeout.
    """
    import requests

    page = 1
    while True:
        resp = requests.get(
            ctx.source.uri,
            params={"page": page, "limit": 100},
            # requests never times out on its own; without this a stalled
            # server would block the extraction forever.
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()

        if not data["results"]:
            break

        yield from data["results"]

        page += 1

Complex entity composition

Building composite entities

Create complex entities from multiple record types:

from ftmq.util import make_fingerprint
from followthemoney.util import make_entity_id

def parse_record(ctx, data: dict, body: "StatementEntity"):
    """Parse record and create related entities.

    Yields one Person per (name, role) pair taken from ``data["names"]`` and
    ``data["roles"]``, followed by one Membership per person that links the
    person to ``body``. Both keys are popped from ``data``.

    ``StatementEntity`` is written as a string annotation because this snippet
    does not import it; an unquoted name would raise ``NameError`` when the
    function is defined.
    """
    # Create persons from comma-separated lists
    # NOTE(review): `zip_things` is not imported in this snippet - confirm
    # which module provides it.
    involved = []
    for name, role in zip_things(
        data.pop("names"),
        data.pop("roles", ""),
        scream=True
    ):
        person = ctx.make_entity("Person")
        # Person IDs are scoped to the body: the same name under another body
        # gets a different ID.
        person.id = ctx.make_slug("person", make_entity_id(body.id, make_fingerprint(name)))
        person.add("name", name)
        person.add("description", role)
        involved.append(person)
        yield person

    # Create memberships
    for member in involved:
        membership = ctx.make_entity("Membership")
        membership.id = ctx.make_slug("membership", make_entity_id(body.id, member.id))
        membership.add("organization", body)
        membership.add("member", member)
        # The role was stored on the person as its description above
        membership.add("role", member.get("description"))
        yield membership


def handle(ctx, record, ix):
    """Create the main PublicBody for a record, then yield its related entities.

    ``name`` is popped from ``record`` (the record is modified in place) and
    the remaining fields are passed on to ``parse_record``.
    """
    # Create main organization
    body = ctx.make_entity("PublicBody")
    # Pop the name once and keep it: reading record["name"] again after the
    # pop would raise KeyError.
    name = record.pop("name")
    body.id = ctx.make_slug(make_fingerprint(name))
    body.add("name", name)
    body.add("jurisdiction", "eu")

    yield body
    yield from parse_record(ctx, record, body)

Conditional parsing

Route records to different parsers based on source:

from ftmq.util import make_fingerprint

def parse_record_ec(ctx, data: dict):
    """Parse EC representative meetings.

    Pops ``cabinet_name`` from ``data``, yields the cabinet as a PublicBody and
    then everything ``parse_common`` produces for the remaining fields.
    """
    cabinet = data.pop("cabinet_name")
    cabinet_id = ctx.make_slug(make_fingerprint(cabinet))

    public_body = ctx.make_entity("PublicBody", cabinet_id)
    for prop, value in (("name", cabinet), ("jurisdiction", "eu")):
        public_body.add(prop, value)

    yield public_body
    yield from parse_common(ctx, data, public_body)


def parse_record_dg(ctx, data: dict):
    """Parse Director-General meetings.

    Pops ``dg_acronym`` and ``dg_full_name`` from ``data``, yields the DG as a
    PublicBody and then everything ``parse_common`` produces for the remaining
    fields.
    """
    short_name = data.pop("dg_acronym")
    public_body = ctx.make_entity("PublicBody", ctx.make_slug("dg", short_name))

    public_body.add("name", data.pop("dg_full_name"))
    # The acronym is kept as a weak alias next to the full name
    public_body.add("weakAlias", short_name)
    public_body.add("jurisdiction", "eu")

    yield public_body
    yield from parse_common(ctx, data, public_body)


def handle(ctx, record, ix):
    """Dispatch the record to the EC or the DG parser, depending on the source name."""
    # Sources whose name starts with "ec" use the EC parser; all others the DG parser
    is_ec_source = ctx.source.name.startswith("ec")
    parser = parse_record_ec if is_ec_source else parse_record_dg

    yield from parser(ctx, record)

Using entity captions

Use entity captions for building descriptions:

from followthemoney.util import join_text

def handle(ctx, record, ix):
    """Emit the participants of an event and the Event that references them.

    ``participant_names`` is a comma-separated string. Empty fragments (from a
    trailing or doubled comma) are skipped so that no nameless Person is
    created. The event name is built from the date and the participants'
    captions.
    """
    # Create participants
    participants = []
    for raw_name in record["participant_names"].split(","):
        name = raw_name.strip()
        if not name:
            continue
        person = ctx.make_entity("Person")
        person.id = ctx.make_fingerprint_id(name)
        person.add("name", name)
        participants.append(person)

    # Create event with participant names in description
    event = ctx.make_entity("Event")
    event.id = ctx.make_slug("event", record["event_id"])

    # Use entity captions to build readable name
    label = join_text(*[p.caption for p in participants])
    event.add("name", f"{record['date']} - {label}")
    event.add("date", record["date"])

    # Add participants
    for p in participants:
        event.add("involved", p)

    yield from participants
    yield event

Configuration patterns

Organize transform queries logically

transform:
  queries:
    # `entity: <name>` references are resolved among the entities of the same
    # query, so the relationship and both of its endpoints live in one query.
    - entities:
        # Primary entities first
        person:
          schema: Person
          keys: [person_id]
          properties:
            name:
              column: person_name

        # Related entities second
        company:
          schema: Company
          keys: [company_id]
          properties:
            name:
              column: company_name

        # Relationships last
        ownership:
          schema: Ownership
          keys: [person_id, company_id]
          properties:
            owner:
              entity: person
            asset:
              entity: company

Real-world examples

EC Meetings dataset

Transform meeting records with organizations and representatives:

from ftmq.util import make_fingerprint
from followthemoney.util import join_text, make_entity_id

def handle(ctx, record, ix):
    """Turn one EC meeting record into a PublicBody, Organizations, Persons and an Event.

    Yields the organizing body first, then every organization, then every
    person and finally the meeting event that references all of them.
    """
    # Organizing body, identified by the fingerprint of the cabinet name
    cabinet = ctx.make_entity("PublicBody")
    cabinet.id = ctx.make_slug(make_fingerprint(record["cabinet_name"]))
    cabinet.add("name", record["cabinet_name"])

    # Organizations from the transparency register, keyed by their register ID
    organizations = []
    org_pairs = zip_things(record["org_names"], record["transparency_ids"])
    for org_name, register_id in org_pairs:
        organization = ctx.make_entity("Organization")
        organization.id = ctx.make_slug(register_id, prefix="eu-tr")
        # Only keep names that produce a non-empty fingerprint
        if make_fingerprint(org_name):
            organization.add("name", org_name)
        organization.add("idNumber", register_id)
        organizations.append(organization)

    # EC representatives; their IDs are scoped to the organizing body
    representatives = []
    rep_pairs = zip_things(record["ec_rep_names"], record["ec_rep_titles"])
    for rep_name, title in rep_pairs:
        representative = ctx.make_entity("Person")
        name_key = make_entity_id(cabinet.id, make_fingerprint(rep_name))
        representative.id = ctx.make_slug("person", name_key)
        representative.add("name", rep_name)
        representative.add("description", title)
        representatives.append(representative)

    # Meeting event: the ID combines the date with the body and the sorted
    # organization IDs
    meeting = ctx.make_entity("Event")
    org_label = join_text(*[o.first("name") for o in organizations])
    attendee_key = make_entity_id(cabinet.id, *sorted([o.id for o in organizations]))
    meeting.id = ctx.make_slug("meeting", record["date"], attendee_key)
    meeting.add("name", f"{record['date']} - {cabinet.caption} x {org_label}")
    meeting.add("date", record["date"])
    meeting.add("summary", record["subject"])
    meeting.add("organizer", cabinet)
    meeting.add("involved", representatives)
    meeting.add("involved", organizations)

    yield cabinet
    yield from organizations
    yield from representatives
    yield meeting

Further reading