
Operations Reference

API documentation for all built-in operations.

Initializers

Operations for starting crawler pipelines.

init

Initialize crawler with params and optional proxy configuration.

Merges stage params into the data dict and configures HTTP proxy if MEMORIOUS_HTTP_PROXY is set and not in debug mode.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Initial data dict. | required |
Example
pipeline:
  init:
    method: init
    params:
      api_key: ${API_KEY}
      base_url: https://api.example.com
    handle:
      pass: fetch
Source code in memorious/operations/initializers.py
@register("init")
def init(context: Context, data: dict[str, Any]) -> None:
    """Initialize crawler with params and optional proxy configuration.

    Merges stage params into the data dict and configures HTTP proxy
    if MEMORIOUS_HTTP_PROXY is set and not in debug mode.

    Args:
        context: The crawler context.
        data: Initial data dict.

    Params:
        Any params are merged into the emitted data.

    Example:
        ```yaml
        pipeline:
          init:
            method: init
            params:
              api_key: ${API_KEY}
              base_url: https://api.example.com
            handle:
              pass: fetch
        ```
    """
    # Configure proxy if set
    proxy = context.settings.http_proxy
    if proxy and not context.settings.debug:
        context.http.client._mounts.clear()
        import httpx

        context.http.client._mounts[httpx.URL("http://")] = httpx.HTTPTransport(
            proxy=proxy
        )
        context.http.client._mounts[httpx.URL("https://")] = httpx.HTTPTransport(
            proxy=proxy
        )
        context.http.save()
        context.log.info("Configured HTTP proxy", proxy=proxy)

    # Merge params into data
    context.emit(data={**data, **ensure_dict(context.params)})
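Because `init` merges its stage params into the emitted data, later stages can reference those values. A minimal sketch, assuming a hypothetical `country` param that a `seed` stage interpolates into its URL:

```yaml
pipeline:
  init:
    method: init
    params:
      country: de
    handle:
      pass: seed

  seed:
    method: seed
    params:
      url: "https://example.com/%(country)s/documents"
    handle:
      pass: fetch
```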

seed

Initialize a crawler with seed URLs.

Emits data items for each URL provided in the configuration. URLs can contain format placeholders that are substituted with values from the incoming data dict.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Values available for URL formatting. | required |

Params:

| Name | Description |
| --- | --- |
| `url` | Single URL or list of URLs. |
| `urls` | List of URLs (alternative to `url`). |
Example
pipeline:
  init:
    method: seed
    params:
      urls:
        - https://example.com/page/1
        - https://example.com/page/2
    handle:
      pass: fetch

  # Or with dynamic URLs:
  seed_dynamic:
    method: seed
    params:
      url: "https://example.com/items/%(item_id)s"
    handle:
      pass: fetch
Source code in memorious/operations/initializers.py
@register("seed")
def seed(context: Context, data: dict[str, Any]) -> None:
    """Initialize a crawler with seed URLs.

    Emits data items for each URL provided in the configuration.
    URLs can contain format placeholders that are substituted with
    values from the incoming data dict.

    Args:
        context: The crawler context.
        data: Values available for URL formatting.

    Params:
        url: Single URL or list of URLs.
        urls: List of URLs (alternative to `url`).

    Example:
        ```yaml
        pipeline:
          init:
            method: seed
            params:
              urls:
                - https://example.com/page/1
                - https://example.com/page/2
            handle:
              pass: fetch

          # Or with dynamic URLs:
          seed_dynamic:
            method: seed
            params:
              url: "https://example.com/items/%(item_id)s"
            handle:
              pass: fetch
        ```
    """
    for key in ("url", "urls"):
        for url in ensure_list(context.params.get(key)):
            url = url % data
            context.emit(data={"url": url})

enumerate

Iterate through a set of items and emit each one.

Takes a list of items from configuration and emits a data item for each, with the item value available as data["item"].

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Base data dict to include in each emission. | required |

Params:

| Name | Description |
| --- | --- |
| `items` | List of items to enumerate. |
Example
pipeline:
  init:
    method: enumerate
    params:
      items:
        - category_a
        - category_b
        - category_c
    handle:
      pass: seed

  seed:
    method: seed
    params:
      url: "https://example.com/%(item)s"
    handle:
      pass: fetch
Source code in memorious/operations/initializers.py
@register("enumerate")
def enumerate(context: Context, data: dict[str, Any]) -> None:
    """Iterate through a set of items and emit each one.

    Takes a list of items from configuration and emits a data item
    for each, with the item value available as `data["item"]`.

    Args:
        context: The crawler context.
        data: Base data dict to include in each emission.

    Params:
        items: List of items to enumerate.

    Example:
        ```yaml
        pipeline:
          init:
            method: enumerate
            params:
              items:
                - category_a
                - category_b
                - category_c
            handle:
              pass: seed

          seed:
            method: seed
            params:
              url: "https://example.com/%(item)s"
            handle:
              pass: fetch
        ```
    """
    items = ensure_list(context.params.get("items"))
    for item in items:
        data["item"] = item
        context.emit(data=data)

tee

Trigger multiple subsequent stages in parallel.

Emits to all configured handlers, useful for splitting a pipeline into multiple parallel branches.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Data to pass to all branches. | required |
Example
pipeline:
  fetch:
    method: fetch
    handle:
      pass: tee

  tee:
    method: tee
    handle:
      pdf: store_pdf
      metadata: extract_meta
      archive: backup

  store_pdf:
    method: directory
    # ...
Source code in memorious/operations/initializers.py
@register("tee")
def tee(context: Context, data: dict[str, Any]) -> None:
    """Trigger multiple subsequent stages in parallel.

    Emits to all configured handlers, useful for splitting a pipeline
    into multiple parallel branches.

    Args:
        context: The crawler context.
        data: Data to pass to all branches.

    Example:
        ```yaml
        pipeline:
          fetch:
            method: fetch
            handle:
              pass: tee

          tee:
            method: tee
            handle:
              pdf: store_pdf
              metadata: extract_meta
              archive: backup

          store_pdf:
            method: directory
            # ...
        ```
    """
    for rule, _ in context.stage.handlers.items():
        context.emit(rule=rule, data=data)
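For example, `tee` can split a fetched document so that it is both stored and parsed for further links. A minimal sketch, assuming downstream `store` and `parse` stages (the rule names are arbitrary):

```yaml
pipeline:
  fetch:
    method: fetch
    handle:
      pass: tee

  tee:
    method: tee
    handle:
      store: store
      parse: parse
```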

sequence

Generate a sequence of numbers.

The memorious equivalent of Python's range(), accepting start, stop, and step parameters. It supports two modes:

- Immediate: generates all numbers in the range at once.
- Recursive: generates numbers one by one with an optional delay.

The recursive mode is useful for very large sequences to avoid overwhelming the job queue.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | May contain `number` to continue a recursive sequence. | required |

Params:

| Name | Description |
| --- | --- |
| `start` | Starting number (default: 1). |
| `stop` | Stop number (exclusive). |
| `step` | Step increment (default: 1, can be negative). |
| `delay` | If set, use recursive mode with this delay in seconds. |
| `tag` | If set, emit each number only once across crawler runs. |
Example
pipeline:
  pages:
    method: sequence
    params:
      start: 1
      stop: 100
      step: 1
    handle:
      pass: fetch

  # Recursive mode for large sequences:
  large_sequence:
    method: sequence
    params:
      start: 1
      stop: 10000
      delay: 5  # 5 second delay between emissions
      tag: page_sequence  # Incremental: skip already processed
    handle:
      pass: fetch
Source code in memorious/operations/initializers.py
@register("sequence")
def sequence(context: Context, data: dict[str, Any]) -> None:
    """Generate a sequence of numbers.

    The memorious equivalent of Python's range(), accepting start,
    stop, and step parameters. Supports two modes:
    - Immediate: generates all numbers in the range at once.
    - Recursive: generates numbers one by one with optional delay.

    The recursive mode is useful for very large sequences to avoid
    overwhelming the job queue.

    Args:
        context: The crawler context.
        data: May contain "number" to continue a recursive sequence.

    Params:
        start: Starting number (default: 1).
        stop: Stop number (exclusive).
        step: Step increment (default: 1, can be negative).
        delay: If set, use recursive mode with this delay in seconds.
        tag: If set, emit each number only once across crawler runs.

    Example:
        ```yaml
        pipeline:
          pages:
            method: sequence
            params:
              start: 1
              stop: 100
              step: 1
            handle:
              pass: fetch

          # Recursive mode for large sequences:
          large_sequence:
            method: sequence
            params:
              start: 1
              stop: 10000
              delay: 5  # 5 second delay between emissions
              tag: page_sequence  # Incremental: skip already processed
            handle:
              pass: fetch
        ```
    """
    number = data.get("number", context.params.get("start", 1))
    stop = context.params.get("stop")
    step = context.params.get("step", 1)
    delay = context.params.get("delay")
    prefix = context.params.get("tag")
    while True:
        tag = None if prefix is None else "%s:%s" % (prefix, number)

        if tag is None or not context.check_tag(tag):
            data["number"] = number
            context.emit(data=data)

        if tag is not None:
            context.set_tag(tag, True)

        number = number + step
        if step > 0 and number >= stop:
            break
        if step < 0 and number <= stop:
            break

        if delay is not None:
            data["number"] = number
            context.recurse(data=data, delay=delay)
            break
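Each emission carries the current value as `data["number"]`, so a downstream `seed` stage can interpolate it into a URL. A minimal sketch with a hypothetical URL:

```yaml
pipeline:
  pages:
    method: sequence
    params:
      start: 1
      stop: 50
    handle:
      pass: seed

  seed:
    method: seed
    params:
      url: "https://example.com/archive?page=%(number)s"
    handle:
      pass: fetch
```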

dates

Generate a sequence of dates.

Generates dates by iterating backwards from an end date with a specified interval. Useful for scraping date-based archives.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | May contain `current` to continue iteration. | required |

Params:

| Name | Description |
| --- | --- |
| `format` | Date format string (default: `%Y-%m-%d`). |
| `end` | End date string; uses the current date if not specified. |
| `begin` | Beginning date string. |
| `days` | Number of days per step (default: 0). |
| `weeks` | Number of weeks per step (default: 0). |
| `steps` | Number of steps if `begin` is not specified (default: 100). |
Example
pipeline:
  date_range:
    method: dates
    params:
      format: "%Y-%m-%d"
      begin: "2020-01-01"
      end: "2024-01-01"
      days: 1
    handle:
      pass: fetch

  # Or with weeks:
  weekly:
    method: dates
    params:
      weeks: 1
      steps: 52  # Last 52 weeks
    handle:
      pass: fetch
Note

Each emission includes both date (formatted string) and date_iso (ISO format) for flexibility.

Source code in memorious/operations/initializers.py
@register("dates")
def dates(context: Context, data: dict[str, Any]) -> None:
    """Generate a sequence of dates.

    Generates dates by iterating backwards from an end date with a
    specified interval. Useful for scraping date-based archives.

    Args:
        context: The crawler context.
        data: May contain "current" to continue iteration.

    Params:
        format: Date format string (default: "%Y-%m-%d").
        end: End date string, or uses current date if not specified.
        begin: Beginning date string.
        days: Number of days per step (default: 0).
        weeks: Number of weeks per step (default: 0).
        steps: Number of steps if begin not specified (default: 100).

    Example:
        ```yaml
        pipeline:
          date_range:
            method: dates
            params:
              format: "%Y-%m-%d"
              begin: "2020-01-01"
              end: "2024-01-01"
              days: 1
            handle:
              pass: fetch

          # Or with weeks:
          weekly:
            method: dates
            params:
              weeks: 1
              steps: 52  # Last 52 weeks
            handle:
              pass: fetch
        ```

    Note:
        Each emission includes both `date` (formatted string) and
        `date_iso` (ISO format) for flexibility.
    """
    format = context.params.get("format", "%Y-%m-%d")
    delta = timedelta(
        days=context.params.get("days", 0), weeks=context.params.get("weeks", 0)
    )
    if delta == timedelta():
        context.log.error("No interval given", params=context.params)
        return

    if "end" in context.params:
        current = context.params.get("end")
        current = datetime.strptime(current, format)
    else:
        current = datetime.utcnow()

    if "current" in data:
        current = datetime.strptime(data.get("current"), format)

    if "begin" in context.params:
        begin = context.params.get("begin")
        begin = datetime.strptime(begin, format)
    else:
        steps = context.params.get("steps", 100)
        begin = current - (delta * steps)

    context.emit(
        data={"date": current.strftime(format), "date_iso": current.isoformat()}
    )
    current = current - delta
    if current >= begin:
        data = {"current": current.strftime(format)}
        context.recurse(data=data)
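Each emission carries `date` in the configured format, so a `seed` stage can interpolate it into archive URLs. A minimal sketch with a hypothetical URL:

```yaml
pipeline:
  dates:
    method: dates
    params:
      format: "%Y-%m-%d"
      begin: "2023-01-01"
      days: 1
    handle:
      pass: seed

  seed:
    method: seed
    params:
      url: "https://example.com/archive/%(date)s"
    handle:
      pass: fetch
```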

Fetch

Operations for making HTTP requests.

fetch

Fetch a URL via HTTP GET request.

Performs an HTTP GET request on the URL specified in the data dict. Supports retry logic, URL rules filtering, incremental skipping, URL rewriting, pagination, and custom headers.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Must contain a `url` key with the URL to fetch. | required |

Params:

| Name | Description |
| --- | --- |
| `rules` | URL/content filtering rules (default: `match_all`). |
| `retry` | Number of retry attempts (default: 3). |
| `emit_errors` | If True, emit data even on HTTP errors (default: False). |
| `headers` | Extra HTTP headers to send. |
| `base_url` | Base URL for resolving relative URLs. |
| `rewrite` | URL rewriting configuration with `method` and `data` keys. Methods: `template` (Jinja2), `replace` (string replace). |
| `pagination` | Pagination config with a `param` key for the page number. |
Example
pipeline:
  # Simple fetch
  fetch:
    method: fetch
    params:
      rules:
        domain: example.com
      retry: 5
    handle:
      pass: parse

  # Fetch with URL rewriting and headers
  fetch_detail:
    method: fetch
    params:
      headers:
        Referer: https://example.com/search
      rewrite:
        method: template
        data: "https://example.com/doc/{{ doc_id }}"
    handle:
      pass: parse

  # Fetch with pagination
  fetch_list:
    method: fetch
    params:
      url: https://example.com/results
      pagination:
        param: page
    handle:
      pass: parse
Source code in memorious/operations/fetch.py
@register("fetch")
def fetch(context: Context, data: dict[str, Any]) -> None:
    """Fetch a URL via HTTP GET request.

    Performs an HTTP GET request on the URL specified in the data dict.
    Supports retry logic, URL rules filtering, incremental skipping,
    URL rewriting, pagination, and custom headers.

    Args:
        context: The crawler context.
        data: Must contain "url" key with the URL to fetch.

    Params:
        rules: URL/content filtering rules (default: match_all).
        retry: Number of retry attempts (default: 3).
        emit_errors: If True, emit data even on HTTP errors (default: False).
        headers: Extra HTTP headers to send.
        base_url: Base URL for resolving relative URLs.
        rewrite: URL rewriting configuration with "method" and "data" keys.
            Methods: "template" (Jinja2), "replace" (string replace).
        pagination: Pagination config with "param" key for page number.

    Example:
        ```yaml
        pipeline:
          # Simple fetch
          fetch:
            method: fetch
            params:
              rules:
                domain: example.com
              retry: 5
            handle:
              pass: parse

          # Fetch with URL rewriting and headers
          fetch_detail:
            method: fetch
            params:
              headers:
                Referer: https://example.com/search
              rewrite:
                method: template
                data: "https://example.com/doc/{{ doc_id }}"
            handle:
              pass: parse

          # Fetch with pagination
          fetch_list:
            method: fetch
            params:
              url: https://example.com/results
              pagination:
                param: page
            handle:
              pass: parse
        ```
    """
    # Apply extra headers
    _apply_headers(context)

    # Get URL from params or data
    url = _get_url(context, data)
    if url is None:
        context.log.warning("No URL for GET request")
        return

    # Apply pagination
    if "pagination" in context.params:
        pagination = ensure_dict(context.params["pagination"])
        if "param" in pagination:
            page = data.get("page", 1)
            f = furl(url)
            f.args[pagination["param"]] = page
            url = f.url

    # Apply URL rewriting
    if "rewrite" in context.params:
        rewrite = context.params["rewrite"]
        method = rewrite.get("method")
        method_data = rewrite.get("data")
        if method == "replace":
            url = url.replace(*method_data)
        elif method == "template":
            url = render_template(method_data, data)

    if url is None:
        context.log.error("No URL specified")
        return

    # Handle relative URLs
    f = furl(url)
    if f.scheme is None:
        base_url = context.params.get("base_url")
        if base_url:
            url = furl(base_url).join(f).url
        elif "url" in data:
            url = furl(data["url"]).join(f).url

    # Validate URL scheme
    if urlparse(url).scheme not in ("http", "https", ""):
        context.log.info("Fetch skipped, unsupported scheme", url=url)
        return

    attempt = data.pop("retry_attempt", 1)
    try:
        result = context.http.get(url, lazy=True)
        rules = context.get("rules", {"match_all": {}})
        if not parse_rule(rules).apply(result):
            context.log.info("Fetch skip (rule)", url=result.url)
            return

        if not result.ok:
            context.emit_warning(
                "Fetch fail", url=result.url, status=result.status_code
            )
            if not context.params.get("emit_errors", False):
                return
        else:
            context.log.info("Fetched", url=result.url, status=result.status_code)

        data.update(result.serialize())
        if url != result.url:
            tag = context.make_key(context.run_id, url)
            context.set_tag(tag, None)
        context.emit(data=data)
    except httpx.HTTPError as ce:
        retries = int(context.get("retry", 3))
        if retries >= attempt:
            context.log.warning("Retry", url=url, error=str(ce), attempt=attempt)
            data["retry_attempt"] = attempt + 1
            context.recurse(data=data, delay=2**attempt)
        else:
            context.emit_warning("Fetch fail", url=url, error=str(ce))
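The `replace` rewrite method is not covered by the example above. Since the source applies `url.replace(*method_data)`, the `data` value is expected to be a two-element list of old and new substrings; a hedged sketch:

```yaml
pipeline:
  fetch:
    method: fetch
    params:
      rewrite:
        method: replace
        data: ["/preview/", "/download/"]
    handle:
      pass: store
```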

session

Configure HTTP session parameters for subsequent requests.

Sets up authentication, user agent, referer, and proxy settings that will be used for all subsequent HTTP requests in this crawler run.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Passed through to the next stage. | required |

Params:

| Name | Description |
| --- | --- |
| `user` | Username for HTTP basic authentication. |
| `password` | Password for HTTP basic authentication. |
| `user_agent` | Custom User-Agent header. |
| `url` | URL to set as the Referer header. |
| `proxy` | Proxy URL for HTTP/HTTPS requests. |
Example
pipeline:
  setup_session:
    method: session
    params:
      user: "${HTTP_USER}"
      password: "${HTTP_PASSWORD}"
      user_agent: "MyBot/1.0"
    handle:
      pass: fetch
Source code in memorious/operations/fetch.py
@register("session")
def session(context: Context, data: dict[str, Any]) -> None:
    """Configure HTTP session parameters for subsequent requests.

    Sets up authentication, user agent, referer, and proxy settings
    that will be used for all subsequent HTTP requests in this crawler run.

    Args:
        context: The crawler context.
        data: Passed through to next stage.

    Params:
        user: Username for HTTP basic authentication.
        password: Password for HTTP basic authentication.
        user_agent: Custom User-Agent header.
        url: URL to set as Referer header.
        proxy: Proxy URL for HTTP/HTTPS requests.

    Example:
        ```yaml
        pipeline:
          setup_session:
            method: session
            params:
              user: "${HTTP_USER}"
              password: "${HTTP_PASSWORD}"
              user_agent: "MyBot/1.0"
            handle:
              pass: fetch
        ```
    """
    context.http.reset()

    user = context.get("user")
    password = context.get("password")

    if user is not None and password is not None:
        context.http.client.auth = (user, password)

    user_agent = context.get("user_agent")
    if user_agent is not None:
        context.http.client.headers["User-Agent"] = user_agent

    referer = context.get("url")
    if referer is not None:
        context.http.client.headers["Referer"] = referer

    proxy = context.get("proxy")
    if proxy is not None:
        context.http.client._mounts = {
            "http://": httpx.HTTPTransport(proxy=proxy),
            "https://": httpx.HTTPTransport(proxy=proxy),
        }

    # Explicitly save the session because no actual HTTP requests were made.
    context.http.save()
    context.emit(data=data)
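A sketch of the remaining `session` params, setting a Referer and routing requests through a proxy (the proxy URL is a placeholder):

```yaml
pipeline:
  setup_session:
    method: session
    params:
      url: https://example.com/search
      proxy: http://localhost:8118
    handle:
      pass: fetch
```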

post

Perform an HTTP POST request with form data.

Sends a POST request with form-urlencoded data to the specified URL.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Current stage data. | required |

Params:

| Name | Description |
| --- | --- |
| `url` | Target URL (or use `data["url"]`). |
| `data` | Dictionary of form fields to POST. |
| `use_data` | Map of `{post_field: data_key}` to include from the data dict. |
| `headers` | Extra HTTP headers. |
Example
pipeline:
  submit_form:
    method: post
    params:
      url: https://example.com/search
      data:
        query: "test"
        page: 1
      use_data:
        session_id: sid
      headers:
        X-Custom-Header: value
    handle:
      pass: parse
Source code in memorious/operations/fetch.py
@register("post")
def post(context: Context, data: dict[str, Any]) -> None:
    """Perform HTTP POST request with form data.

    Sends a POST request with form-urlencoded data to the specified URL.

    Args:
        context: The crawler context.
        data: Current stage data.

    Params:
        url: Target URL (or use data["url"]).
        data: Dictionary of form fields to POST.
        use_data: Map of {post_field: data_key} to include from data dict.
        headers: Extra HTTP headers.

    Example:
        ```yaml
        pipeline:
          submit_form:
            method: post
            params:
              url: https://example.com/search
              data:
                query: "test"
                page: 1
              use_data:
                session_id: sid
              headers:
                X-Custom-Header: value
            handle:
              pass: parse
        ```
    """
    _apply_headers(context)
    url = _get_url(context, data)
    if url is None:
        context.log.warning("No URL for POST request")
        return

    post_data = _get_post_data(context, data)
    context.log.debug("POST request", url=url)
    result = context.http.post(url, data=post_data)
    context.emit(data={**data, **result.serialize()})
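When the `url` param is omitted, `post` falls back to `data["url"]`, so a request can be posted to a URL discovered by an earlier stage. A minimal sketch with hypothetical form fields:

```yaml
pipeline:
  fetch:
    method: fetch
    handle:
      pass: submit

  submit:
    method: post
    params:
      data:
        action: accept
    handle:
      pass: parse
```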

post_json

Perform an HTTP POST request with a JSON body.

Sends a POST request with a JSON payload to the specified URL.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Current stage data. | required |

Params:

| Name | Description |
| --- | --- |
| `url` | Target URL (or use `data["url"]`). |
| `data` | Dictionary to send as the JSON body. |
| `use_data` | Map of `{json_field: data_key}` to include from the data dict. |
| `headers` | Extra HTTP headers. |
Example
pipeline:
  api_call:
    method: post_json
    params:
      url: https://api.example.com/documents
      data:
        action: "search"
        limit: 100
      use_data:
        document_id: doc_id
    handle:
      pass: process
Source code in memorious/operations/fetch.py
@register("post_json")
def post_json(context: Context, data: dict[str, Any]) -> None:
    """Perform HTTP POST request with JSON body.

    Sends a POST request with a JSON payload to the specified URL.

    Args:
        context: The crawler context.
        data: Current stage data.

    Params:
        url: Target URL (or use data["url"]).
        data: Dictionary to send as JSON body.
        use_data: Map of {json_field: data_key} to include from data dict.
        headers: Extra HTTP headers.

    Example:
        ```yaml
        pipeline:
          api_call:
            method: post_json
            params:
              url: https://api.example.com/documents
              data:
                action: "search"
                limit: 100
              use_data:
                document_id: doc_id
            handle:
              pass: process
        ```
    """
    _apply_headers(context)
    url = _get_url(context, data)
    if url is None:
        context.log.warning("No URL for POST request")
        return

    json_data = _get_post_data(context, data)
    context.log.debug("POST JSON request", url=url)
    result = context.http.post(url, json_data=json_data)
    context.emit(data={**data, **result.serialize()})

post_form

Perform an HTTP POST to an HTML form with its current values.

Extracts form fields from an HTML page and submits them with optional additional data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Current stage data (must have a cached HTML response). | required |

Params:

| Name | Description |
| --- | --- |
| `form` | XPath to locate the form element. |
| `data` | Additional form fields to add/override. |
| `use_data` | Map of `{form_field: data_key}` to include from the data dict. |
| `headers` | Extra HTTP headers. |
Example
pipeline:
  submit_search:
    method: post_form
    params:
      form: './/form[@id="search-form"]'
      data:
        query: "documents"
      use_data:
        csrf_token: token
    handle:
      pass: parse_results
Source code in memorious/operations/fetch.py
@register("post_form")
def post_form(context: Context, data: dict[str, Any]) -> None:
    """Perform HTTP POST to an HTML form with its current values.

    Extracts form fields from an HTML page and submits them with
    optional additional data.

    Args:
        context: The crawler context.
        data: Current stage data (must have cached HTML response).

    Params:
        form: XPath to locate the form element.
        data: Additional form fields to add/override.
        use_data: Map of {form_field: data_key} to include from data dict.
        headers: Extra HTTP headers.

    Example:
        ```yaml
        pipeline:
          submit_search:
            method: post_form
            params:
              form: './/form[@id="search-form"]'
              data:
                query: "documents"
              use_data:
                csrf_token: token
            handle:
              pass: parse_results
        ```
    """
    _apply_headers(context)
    form_xpath = context.params.get("form")
    if not form_xpath:
        context.log.error("No form XPath specified")
        return

    result = context.http.rehash(data)
    if result.html is None:
        context.log.error("No HTML content to extract form from")
        return

    action, form_data = extract_form(result.html, form_xpath)
    if action is None:
        context.log.error("Form not found", xpath=form_xpath)
        return

    base_url = data.get("url", "")
    url = furl(base_url).join(action).url

    # Merge form data with additional data from params
    form_data.update(_get_post_data(context, data))
    context.log.debug("POST form request", url=url)
    post_result = context.http.post(url, data=form_data)
    context.emit(data={**data, **post_result.serialize()})
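A common use is re-submitting a login or search form fetched earlier in the pipeline. A hedged sketch; the form XPath, field names, and environment variables are hypothetical:

```yaml
pipeline:
  fetch_login:
    method: fetch
    params:
      url: https://example.com/login
    handle:
      pass: login

  login:
    method: post_form
    params:
      form: './/form[@id="login"]'
      data:
        username: "${SITE_USER}"
        password: "${SITE_PASSWORD}"
    handle:
      pass: fetch_protected
```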

Parse

Operations for parsing responses.

parse

Parse HTML response and extract URLs and metadata.

The main parsing operation: it extracts URLs from HTML documents for further crawling, as well as metadata selected by XPath expressions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Must contain cached HTTP response data. | required |

Params:

| Name | Description |
| --- | --- |
| `include_paths` | List of XPath expressions to search for URLs. |
| `meta` | Dict mapping field names to XPath expressions. |
| `meta_date` | Dict mapping date field names to XPath expressions. |
| `store` | Rules dict to match responses for storage. |
| `schema` | FTM schema name for entity extraction. |
| `properties` | Dict mapping FTM properties to XPath expressions. |
Example
pipeline:
  parse:
    method: parse
    params:
      include_paths:
        - './/div[@class="content"]//a'
      meta:
        title: './/h1/text()'
        author: './/span[@class="author"]/text()'
      meta_date:
        published_at: './/time/@datetime'
      store:
        mime_group: documents
    handle:
      fetch: fetch
      store: store
Source code in memorious/operations/parse.py
@register("parse")
def parse(context: Context, data: dict[str, Any]) -> None:
    """Parse HTML response and extract URLs and metadata.

    The main parsing operation that extracts URLs from HTML documents
    for further crawling and metadata based on XPath expressions.

    Args:
        context: The crawler context.
        data: Must contain cached HTTP response data.

    Params:
        include_paths: List of XPath expressions to search for URLs.
        meta: Dict mapping field names to XPath expressions.
        meta_date: Dict mapping date field names to XPath expressions.
        store: Rules dict to match responses for storage.
        schema: FTM schema name for entity extraction.
        properties: Dict mapping FTM properties to XPath expressions.

    Example:
        ```yaml
        pipeline:
          parse:
            method: parse
            params:
              include_paths:
                - './/div[@class="content"]//a'
              meta:
                title: './/h1/text()'
                author: './/span[@class="author"]/text()'
              meta_date:
                published_at: './/time/@datetime'
              store:
                mime_group: documents
            handle:
              fetch: fetch
              store: store
        ```
    """
    with context.http.rehash(data) as result:
        if result.html is not None:
            context.log.info("Parse HTML", url=result.url)

            # Extract page title
            for title in result.html.xpath(".//title/text()"):
                if title is not None and "title" not in data:
                    data["title"] = title

            _extract_metadata(context, data, result.html)

            if context.params.get("schema") is not None:
                _extract_ftm(context, data, result.html)

            _extract_urls(context, data, result.html, result.url)

        rules = context.params.get("store") or {"match_all": {}}
        if parse_rule(rules).apply(result):
            context.emit(rule="store", data=data)

parse_listing

Parse HTML listing with multiple items.

Extracts metadata from a list of items on a page and handles pagination. Useful for search results, archives, and index pages.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Must contain cached HTTP response data. | required |

Params:

| Name | Description |
| --- | --- |
| `items` | XPath expression to select item elements. |
| `meta` | Dict mapping field names to XPath expressions (per item). |
| `pagination` | Pagination configuration. |
| `emit` | If True, emit each item's data. |
| `parse_html` | If True, extract URLs from items (default: True). |
Example
pipeline:
  parse_results:
    method: parse_listing
    params:
      items: './/div[@class="result-item"]'
      meta:
        title: './/h2/text()'
        url: './/a/@href'
      pagination:
        total_pages: './/span[@class="pages"]/text()'
        param: page
      emit: true
    handle:
      item: fetch_detail
      next_page: fetch
Source code in memorious/operations/parse.py
@register("parse_listing")
def parse_listing(context: Context, data: dict[str, Any]) -> None:
    """Parse HTML listing with multiple items.

    Extracts metadata from a list of items on a page and handles
    pagination. Useful for search results, archives, and index pages.

    Args:
        context: The crawler context.
        data: Must contain cached HTTP response data.

    Params:
        items: XPath expression to select item elements.
        meta: Dict mapping field names to XPath expressions (per item).
        pagination: Pagination configuration.
        emit: If True, emit each item's data.
        parse_html: If True, extract URLs from items (default: True).

    Example:
        ```yaml
        pipeline:
          parse_results:
            method: parse_listing
            params:
              items: './/div[@class="result-item"]'
              meta:
                title: './/h2/text()'
                url: './/a/@href'
              pagination:
                total_pages: './/span[@class="pages"]/text()'
                param: page
              emit: true
            handle:
              item: fetch_detail
              next_page: fetch
        ```
    """
    should_emit = context.params.get("emit") is True
    should_parse_html = context.params.get("parse_html", True) is True
    items_xpath = context.params.get("items")

    with context.http.rehash(data) as result:
        if result.html is not None:
            base_url = data.get("url", result.url)

            for item in result.html.xpath(items_xpath):
                item_data = {**data}
                _extract_metadata(context, item_data, item)

                if should_parse_html:
                    _extract_urls(context, item_data, item, base_url)
                if should_emit:
                    context.emit(rule="item", data=item_data)

            paginate(context, data, result.html)

            rules = context.params.get("store") or {"match_all": {}}
            if parse_rule(rules).apply(result):
                context.emit(rule="store", data=data)

parse_jq

Parse JSON response using jq patterns.

Uses the jq query language to extract data from JSON responses. Emits one data item for each result from the jq query.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Must contain cached HTTP response data. | required |

Params:

| Name | Description |
| --- | --- |
| `pattern` | jq pattern string to extract data. |
Example
pipeline:
  parse_api:
    method: parse_jq
    params:
      pattern: '.results[] | {id: .id, name: .title, url: .links.self}'
    handle:
      pass: fetch_detail
Source code in memorious/operations/parse.py
@register("parse_jq")
def parse_jq(context: Context, data: dict[str, Any]) -> None:
    """Parse JSON response using jq patterns.

    Uses the jq query language to extract data from JSON responses.
    Emits one data item for each result from the jq query.

    Args:
        context: The crawler context.
        data: Must contain cached HTTP response data.

    Params:
        pattern: jq pattern string to extract data.

    Example:
        ```yaml
        pipeline:
          parse_api:
            method: parse_jq
            params:
              pattern: '.results[] | {id: .id, name: .title, url: .links.self}'
            handle:
              pass: fetch_detail
        ```
    """
    result = context.http.rehash(data)
    json_data = clean_dict(result.json)

    pattern = context.params["pattern"]
    jq_result = jq.compile(pattern).input(json_data)
    for item in jq_result.all():
        context.emit(data={**data, **item})

parse_csv

Parse CSV file and emit rows.

Reads a CSV file and emits each row as a data item. Can also emit all rows together as a list.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Must contain cached HTTP response data. | required |

Params:

| Name | Description |
| --- | --- |
| `skiprows` | Number of rows to skip at the beginning. |
| `delimiter` | CSV field delimiter (default: comma). |

Other `csv.DictReader` keyword arguments are passed through.
Example
pipeline:
  parse_data:
    method: parse_csv
    params:
      skiprows: 1
      delimiter: ";"
    handle:
      row: process_row
      rows: store_all
Source code in memorious/operations/parse.py
@register("parse_csv")
def parse_csv(context: Context, data: dict[str, Any]) -> None:
    """Parse CSV file and emit rows.

    Reads a CSV file and emits each row as a data item. Can also
    emit all rows together as a list.

    Args:
        context: The crawler context.
        data: Must contain cached HTTP response data.

    Params:
        skiprows: Number of rows to skip at the beginning.
        delimiter: CSV field delimiter (default: comma).
        (Other csv.DictReader kwargs are supported)

    Example:
        ```yaml
        pipeline:
          parse_data:
            method: parse_csv
            params:
              skiprows: 1
              delimiter: ";"
            handle:
              row: process_row
              rows: store_all
        ```
    """
    result = context.http.rehash(data)
    parserkwargs = ensure_dict(context.params)
    skiprows = parserkwargs.pop("skiprows", 0)

    with result.local_path() as local_path:
        with open(local_path, encoding="utf-8") as fh:
            reader = csv.DictReader(fh, **parserkwargs)
            for _ in range(skiprows):
                next(reader, None)
            rows = []
            for row in reader:
                context.emit(rule="row", data=row, optional=True)
                rows.append(row)
            context.emit(rule="rows", data={**data, "rows": rows})

parse_xml

Parse XML response and extract metadata.

Parses an XML document and extracts metadata using XPath expressions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Must contain cached HTTP response data. | required |

Params:

| Name | Description |
| --- | --- |
| `meta` | Dict mapping field names to XPath expressions. |
| `meta_date` | Dict mapping date field names to XPath expressions. |
Example
pipeline:
  parse_feed:
    method: parse_xml
    params:
      meta:
        title: './/item/title/text()'
        link: './/item/link/text()'
    handle:
      pass: fetch
Source code in memorious/operations/parse.py
@register("parse_xml")
def parse_xml(context: Context, data: dict[str, Any]) -> None:
    """Parse XML response and extract metadata.

    Parses an XML document and extracts metadata using XPath expressions.

    Args:
        context: The crawler context.
        data: Must contain cached HTTP response data.

    Params:
        meta: Dict mapping field names to XPath expressions.
        meta_date: Dict mapping date field names to XPath expressions.

    Example:
        ```yaml
        pipeline:
          parse_feed:
            method: parse_xml
            params:
              meta:
                title: './/item/title/text()'
                link: './/item/link/text()'
            handle:
              pass: fetch
        ```
    """
    result = context.http.rehash(data)
    if result.xml is not None:
        _extract_metadata(context, data, result.xml)
    context.emit(data=data)

Clean

Operations for cleaning data.

clean

Clean and validate metadata in the data dict.

Performs various data transformations including dropping keys, setting defaults, rewriting values, validating required fields, and type casting.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Data dict to clean (modified in place). | required |

Params:

| Name | Description |
| --- | --- |
| `drop` | List of keys to remove from data. |
| `defaults` | Dict of default values for missing keys. |
| `values` | Dict for value rewriting (mapping or format string). |
| `required` | List of required keys (raises `MetaDataError` if missing). |
| `typing` | Type casting configuration with ignore list and date parser kwargs. |
Example
pipeline:
  clean:
    method: clean
    params:
      drop:
        - page
        - formdata
        - session_id
      defaults:
        source: "web"
        language: "en"
      values:
        foreign_id: "{publisher[id]}-{reference}"
        status:
          draft: unpublished
          live: published
      required:
        - title
        - url
        - published_at
      typing:
        ignore:
          - reference
          - phone_number
        dateparserkwargs:
          dayfirst: true
    handle:
      pass: store
Source code in memorious/operations/clean.py
@register("clean")
def clean(context: Context, data: dict[str, Any]) -> None:
    """Clean and validate metadata in the data dict.

    Performs various data transformations including dropping keys,
    setting defaults, rewriting values, validating required fields,
    and type casting.

    Args:
        context: The crawler context.
        data: Data dict to clean (modified in place).

    Params:
        drop: List of keys to remove from data.
        defaults: Dict of default values for missing keys.
        values: Dict for value rewriting (mapping or format string).
        required: List of required keys (raises MetaDataError if missing).
        typing: Type casting configuration with ignore list and date kwargs.

    Example:
        ```yaml
        pipeline:
          clean:
            method: clean
            params:
              drop:
                - page
                - formdata
                - session_id
              defaults:
                source: "web"
                language: "en"
              values:
                foreign_id: "{publisher[id]}-{reference}"
                status:
                  draft: unpublished
                  live: published
              required:
                - title
                - url
                - published_at
              typing:
                ignore:
                  - reference
                  - phone_number
                dateparserkwargs:
                  dayfirst: true
            handle:
              pass: store
        ```
    """
    # Drop keys
    for key in ensure_list(context.params.get("drop")):
        data.pop(key, None)

    # Set defaults
    for key, value in ensure_dict(context.params.get("defaults")).items():
        if key not in data:
            data[key] = value

    # Rewrite values
    for key, values in ensure_dict(context.params.get("values")).items():
        if is_mapping(values) and data.get(key) in values:
            data[key] = values[data[key]]
        elif isinstance(values, str):
            try:
                data[key] = values.format(**data)
            except KeyError as e:
                context.log.warning(
                    "Missing key for format string", key=key, error=str(e)
                )

    # Validate required
    for key in ensure_list(context.params.get("required")):
        if key not in data:
            context.emit_warning(MetaDataError(f"`{key}` required but missing"))

    # Type casting
    typing_config = ensure_dict(context.params.get("typing"))
    if typing_config:
        ignore_keys = ensure_list(typing_config.get("ignore"))
        datekwargs = ensure_dict(typing_config.get("dateparserkwargs"))
        data.update(cast_dict(data, ignore_keys, **datekwargs))

    context.emit(data=data)

clean_html

Clean HTML by removing specified elements.

Removes HTML elements matching the given XPath expressions and stores the cleaned HTML.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Must contain cached HTTP response data. | required |

Params:

| Name | Description |
| --- | --- |
| `remove_paths` | List of XPath expressions for elements to remove. |
Example
pipeline:
  clean:
    method: clean_html
    params:
      remove_paths:
        - './/script'
        - './/style'
        - './/nav'
        - './/footer'
    handle:
      pass: parse
Source code in memorious/operations/clean.py
@register("clean_html")
def clean_html(context: Context, data: dict[str, Any]) -> None:
    """Clean HTML by removing specified elements.

    Removes HTML elements matching the given XPath expressions and
    stores the cleaned HTML.

    Args:
        context: The crawler context.
        data: Must contain cached HTTP response data.

    Params:
        remove_paths: List of XPath expressions for elements to remove.

    Example:
        ```yaml
        pipeline:
          clean:
            method: clean_html
            params:
              remove_paths:
                - './/script'
                - './/style'
                - './/nav'
                - './/footer'
            handle:
              pass: parse
        ```
    """
    with context.http.rehash(data) as result:
        if not result.ok or result.html is None:
            context.emit(data=data)
            return
        doc = result.html
        for path in ensure_list(context.params.get("remove_paths")):
            for el in doc.xpath(path):
                el.drop_tree()
        content_hash = context.store_data(html.tostring(doc, pretty_print=True))
        data["content_hash"] = content_hash
        context.emit(data=data)

Extract

Operations for extracting archives.

extract

Extract files from a compressed archive.

Supports ZIP, TAR (including gzip/bzip2), and 7z archives. Emits each extracted file as a separate data item.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Must contain cached HTTP response data. | required |

Params:

| Name | Description |
| --- | --- |
| `wildcards` | List of shell-style patterns to filter extracted files. |
Example
pipeline:
  extract:
    method: extract
    params:
      wildcards:
        - "*.pdf"
        - "*.doc"
        - "documents/*"
    handle:
      pass: store
Source code in memorious/operations/extract.py
@register("extract")
def extract(context: Context, data: dict[str, Any]) -> None:
    """Extract files from a compressed archive.

    Supports ZIP, TAR (including gzip/bzip2), and 7z archives.
    Emits each extracted file as a separate data item.

    Args:
        context: The crawler context.
        data: Must contain cached HTTP response data.

    Params:
        wildcards: List of shell-style patterns to filter extracted files.

    Example:
        ```yaml
        pipeline:
          extract:
            method: extract
            params:
              wildcards:
                - "*.pdf"
                - "*.doc"
                - "documents/*"
            handle:
              pass: store
        ```
    """
    with context.http.rehash(data) as result:
        if not result.ok:
            return

        with result.local_path() as local_file:
            file_path = str(local_file)
            content_type = result.content_type
            extract_dir = random_filename(context.work_path)

            if content_type in ZIP_MIME_TYPES:
                extracted_files = extract_zip(file_path, extract_dir, context)
            elif content_type in TAR_MIME_TYPES:
                extracted_files = extract_tar(file_path, extract_dir, context)
            elif content_type in SEVENZIP_MIME_TYPES:
                extracted_files = extract_7zip(file_path, extract_dir, context)
            else:
                context.log.warning(
                    "Unsupported archive content type", content_type=content_type
                )
                return

            wildcards = ensure_list(context.params.get("wildcards")) or None
            for path in extracted_files:
                if wildcards is None or _test_fname(wildcards, path):
                    relative_path = os.path.relpath(path, extract_dir)
                    content_hash = context.store_file(path)
                    data["content_hash"] = content_hash
                    data["file_name"] = relative_path
                    context.emit(data=data.copy())

Regex

Operations for regex extraction.

regex_groups

Extract named regex groups from data values.

Uses regex named capture groups to extract structured data from string values. Supports both simple single-pattern extraction and advanced multi-pattern extraction with splitting.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Data dict to extract from (modified in place). | required |

Params:

| Name | Description |
| --- | --- |
| `<key>` | Regex pattern with named groups, or a config dict. |

A config dict supports:

- `pattern` / `patterns`: Single pattern or list of patterns.
- `store_as`: Key name for storing the result.
- `split`: Separator to split the value before matching.
Example
pipeline:
  extract:
    method: regex_groups
    params:
      # Simple extraction: source key -> named groups added to data
      full_name: '(?P<first_name>\w+)\s(?P<last_name>\w+)'

      # From "John Doe" extracts: first_name="John", last_name="Doe"

      # Advanced extraction with splitting
      originators_raw:
        store_as: originators
        split: ","
        patterns:
          - '(?P<name>.*),\s*(?P<party>\w+)'
          - '(?P<name>.*)'

      # From "John Doe, SPD, Jane Smith" extracts:
      # originators = [
      #   {name: "John Doe", party: "SPD"},
      #   {name: "Jane Smith"}
      # ]

      # Metadata extraction
      meta_raw: >-
        .*Drucksache\s+(?P<reference>\d+/\d+)
        .*vom\s+(?P<published_at>\d{2}\.\d{2}\.\d{4}).*
    handle:
      pass: clean
Source code in memorious/operations/regex.py
@register("regex_groups")
def regex_groups(context: Context, data: dict[str, Any]) -> None:
    """Extract named regex groups from data values.

    Uses regex named capture groups to extract structured data from
    string values. Supports both simple single-pattern extraction and
    advanced multi-pattern extraction with splitting.

    Args:
        context: The crawler context.
        data: Data dict to extract from (modified in place).

    Params:
        <key>: Regex pattern with named groups, or config dict.
        Config dict supports:
            pattern/patterns: Single pattern or list of patterns.
            store_as: Key name for storing the result.
            split: Separator to split value before matching.

    Example:
        ```yaml
        pipeline:
          extract:
            method: regex_groups
            params:
              # Simple extraction: source key -> named groups added to data
              full_name: '(?P<first_name>\\w+)\\s(?P<last_name>\\w+)'

              # From "John Doe" extracts: first_name="John", last_name="Doe"

              # Advanced extraction with splitting
              originators_raw:
                store_as: originators
                split: ","
                patterns:
                  - '(?P<name>.*),\\s*(?P<party>\\w+)'
                  - '(?P<name>.*)'

              # From "John Doe, SPD, Jane Smith" extracts:
              # originators = [
              #   {name: "John Doe", party: "SPD"},
              #   {name: "Jane Smith"}
              # ]

              # Metadata extraction
              meta_raw: >-
                .*Drucksache\\s+(?P<reference>\\d+/\\d+)
                .*vom\\s+(?P<published_at>\\d{2}\\.\\d{2}\\.\\d{4}).*
            handle:
              pass: clean
        ```
    """
    for key, patterns in ensure_dict(context.params).items():
        log_fn = context.log.warning

        if is_mapping(patterns):
            # Advanced extraction configuration
            config = dict(patterns)

            if key not in data:
                continue

            pattern_list = ensure_list(
                config.get("pattern", config.get("patterns", []))
            )
            store_key = config.get("store_as", key)
            separator = config.get("split")

            if separator:
                # Split value and extract from each part
                values = str(data[key]).split(separator)
                result = [
                    _extract_regex_groups(key, v.strip(), pattern_list, log_fn)
                    for v in values
                ]
                # Filter out empty results
                result = [r for r in result if r]
            else:
                # Single extraction
                result = _extract_regex_groups(key, data, pattern_list, log_fn)

            data[store_key] = result

        else:
            # Simple extraction: pattern(s) directly as value
            data.update(_extract_regex_groups(key, data, ensure_list(patterns), log_fn))

    context.emit(data=data)

Store

Operations for storing data.

store

Store with configurable backend and incremental marking.

A flexible store operation that delegates to other storage methods and marks incremental completion when the target stage is reached.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `Context` | The crawler context. | required |
| `data` | `dict[str, Any]` | Must contain `content_hash` from a fetched response. | required |

Params:

| Name | Description |
| --- | --- |
| `operation` | Storage operation name (default: `directory`). Options: `directory`, `lakehouse`. |
Example
pipeline:
  store:
    method: store
    params:
      operation: lakehouse
Note

Incremental completion is marked automatically by the underlying storage operations (directory, lakehouse).

Source code in memorious/operations/store.py
@register("store")
def store(context: Context, data: dict[str, Any]) -> None:
    """Store with configurable backend and incremental marking.

    A flexible store operation that delegates to other storage methods
    and marks incremental completion when the target stage is reached.

    Args:
        context: The crawler context.
        data: Must contain content_hash from a fetched response.

    Params:
        operation: Storage operation name (default: "directory").
            Options: "directory", "lakehouse"

    Example:
        ```yaml
        pipeline:
          store:
            method: store
            params:
              operation: lakehouse
        ```

    Note:
        Incremental completion is marked automatically by the underlying
        storage operations (directory, lakehouse).
    """
    operation = context.params.get("operation", "directory")

    if operation == "directory":
        directory(context, data)
    elif operation == "lakehouse":
        lakehouse(context, data)
    else:
        context.log.error("Unknown store operation", operation=operation)

directory

Store collected files to a local directory.

Saves files to a directory structure organized by crawler name. Also stores metadata as a JSON sidecar file.

Parameters:

Name Type Description Default
context Context

The crawler context.

required
data dict[str, Any]

Must contain content_hash from a fetched response.

required

Parameters:

Name Type Description Default
path

Custom storage path (default: {base_path}/store/{crawler_name}).

required
compute_path

Configure how file paths are computed.

method: The path computation method (default: "url_path")
  - "url_path": Use the URL path
  - "template": Use Jinja2 template with data context
  - "file_name": Use only the file name (flat structure)

params: Method-specific parameters
  For url_path:
    include_domain: bool - Include domain as path prefix (default: false)
    strip_prefix: str - Strip this prefix from the path
  For template:
    template: str - Jinja2 template with data context

required
Example
pipeline:
  store:
    method: directory
    params:
      path: /data/documents
      compute_path:
        method: url_path
        params:
          include_domain: true
          strip_prefix: "/api/v1"
Source code in memorious/operations/store.py
@register("directory")
def directory(context: Context, data: dict[str, Any]) -> None:
    """Store collected files to a local directory.

    Saves files to a directory structure organized by crawler name.
    Also stores metadata as a JSON sidecar file.

    Args:
        context: The crawler context.
        data: Must contain content_hash from a fetched response.

    Params:
        path: Custom storage path (default: {base_path}/store/{crawler_name}).
        compute_path: Configure how file paths are computed.
            method: The path computation method (default: "url_path")
                - "url_path": Use the URL path
                - "template": Use Jinja2 template with data context
                - "file_name": Use only the file name (flat structure)
            params: Method-specific parameters
                For url_path:
                    include_domain: bool - Include domain as path prefix (default: false)
                    strip_prefix: str - Strip this prefix from the path
                For template:
                    template: str - Jinja2 template with data context

    Example:
        ```yaml
        pipeline:
          store:
            method: directory
            params:
              path: /data/documents
              compute_path:
                method: url_path
                params:
                  include_domain: true
                  strip_prefix: "/api/v1"
        ```
    """
    with context.http.rehash(data) as result:
        if not result.ok:
            return

        content_hash = data.get("content_hash")
        if content_hash is None:
            context.emit_warning("No content hash in data.")
            return

        base_path = Path(_get_directory_path(context))

        # Compute the relative path (helper uses result.file_name as fallback)
        relative_path = _compute_file_path(
            context, data, content_hash, result.file_name
        )

        # Build full path and ensure parent directories exist
        file_path = base_path / relative_path
        file_path.parent.mkdir(parents=True, exist_ok=True)

        # Check for collision: file exists with different content
        if file_path.exists():
            with open(file_path, "rb") as fh:
                existing_hash = make_checksum(fh)
            if existing_hash != content_hash:
                # Add 8-char hash suffix to avoid overwriting
                suffix = content_hash[:8]
                new_name = f"{file_path.stem}_{suffix}{file_path.suffix}"
                file_path = file_path.parent / new_name
                relative_path = relative_path.parent / new_name

        data["_file_name"] = str(relative_path)

        if not file_path.exists():
            with result.local_path() as p:
                shutil.copyfile(p, file_path)

        context.log.info("Store [directory]", file=str(relative_path))

        # Store metadata as sidecar JSON (named by content_hash for easy lookup)
        meta_path = file_path.parent / f"{content_hash}.json"
        with open(meta_path, "w") as fh:
            json.dump(data, fh)

        # Mark incremental completion
        context.mark_emit_complete(data)
        context.emit(data=data)

Store collected file in the ftm-lakehouse archive.

Stores files in a structured archive with metadata tracking, suitable for integration with Aleph and other FTM-based systems.

Parameters:

Name Type Description Default
context Context

The crawler context.

required
data dict[str, Any]

Must contain content_hash from a fetched response.

required

Parameters:

Name Type Description Default
uri

Custom lakehouse URI (default: context.archive).

required
compute_path

Configure how file keys are computed.

method: The path computation method (default: "url_path")
  - "url_path": Use the URL path
  - "template": Use Jinja2 template with data context
  - "file_name": Use only the file name (flat structure)

params: Method-specific parameters
  For url_path:
    include_domain: bool - Include domain as path prefix (default: false)
    strip_prefix: str - Strip this prefix from the path
  For template:
    template: str - Jinja2 template with data context

required
Example
pipeline:
  store:
    method: lakehouse
    params:
      uri: s3://bucket/archive
      compute_path:
        method: url_path
        params:
          strip_prefix: "/api/v1"
Source code in memorious/operations/store.py
@register("lakehouse")
def lakehouse(context: Context, data: dict[str, Any]) -> None:
    """Store collected file in the ftm-lakehouse archive.

    Stores files in a structured archive with metadata tracking,
    suitable for integration with Aleph and other FTM-based systems.

    Args:
        context: The crawler context.
        data: Must contain content_hash from a fetched response.

    Params:
        uri: Custom lakehouse URI (default: context.archive).
        compute_path: Configure how file keys are computed.
            method: The path computation method (default: "url_path")
                - "url_path": Use the URL path
                - "template": Use Jinja2 template with data context
                - "file_name": Use only the file name (flat structure)
            params: Method-specific parameters
                For url_path:
                    include_domain: bool - Include domain as path prefix (default: false)
                    strip_prefix: str - Strip this prefix from the path
                For template:
                    template: str - Jinja2 template with data context

    Example:
        ```yaml
        pipeline:
          store:
            method: lakehouse
            params:
              uri: s3://bucket/archive
              compute_path:
                method: url_path
                params:
                  strip_prefix: "/api/v1"
        ```
    """
    with context.http.rehash(data) as result:
        if not result.ok:
            return

        content_hash = data.get("content_hash")
        if content_hash is None:
            context.emit_warning("No content hash in data.")
            return

        # Compute the file key using compute_path config
        relative_path = _compute_file_path(
            context, data, content_hash, result.file_name
        )
        file_key = str(relative_path)
        file_name = relative_path.name

        # Extract MIME type from headers for lakehouse metadata
        headers = httpx.Headers(data.get("headers", {}))
        mime_type = normalize_mimetype(headers.get("content-type"))

        # Use custom URI if provided, otherwise use context archive
        uri = context.params.get("uri")
        if uri:
            archive = get_lakehouse(uri).get_dataset(context.crawler.name).archive
        else:
            archive = context.archive

        # Store file in lakehouse archive with metadata. If the archive is the
        # same as the memorious intermediary archive (which is the default), the
        # file already exists and only the metadata is updated.
        data.update(
            origin="memorious",
            name=file_name,
            key=file_key,
            mimetype=mime_type,
        )
        file = archive.lookup_file(content_hash)
        if file is not None:
            file = _patch_file(file, **data)
        with result.local_path() as local_path:
            # this only stores if checksum is not already existing
            file = archive.archive_file(local_path, **data)

        context.log.info(
            "Store [lakehouse]", file=file_name, key=file_key, checksum=file.checksum
        )

        # Mark incremental completion
        context.mark_emit_complete(data)
        context.emit(data=data)

Remove a blob from the archive.

Deletes a file from the archive after processing is complete. Useful for cleaning up temporary files.

Parameters:

Name Type Description Default
context Context

The crawler context.

required
data dict[str, Any]

Must contain content_hash of file to delete.

required
Example
pipeline:
  cleanup:
    method: cleanup_archive
Source code in memorious/operations/store.py
@register("cleanup_archive")
def cleanup_archive(context: Context, data: dict[str, Any]) -> None:
    """Remove a blob from the archive.

    Deletes a file from the archive after processing is complete.
    Useful for cleaning up temporary files.

    Args:
        context: The crawler context.
        data: Must contain content_hash of file to delete.

    Example:
        ```yaml
        pipeline:
          cleanup:
            method: cleanup_archive
        ```
    """
    content_hash = data.get("content_hash")
    if content_hash is None:
        context.emit_warning("No content hash in data.")
        return
    file_info = context.archive.lookup_file(content_hash)
    if file_info:
        try:
            context.archive.delete_file(file_info)
        except NotImplementedError:
            context.log.warning("File deletion not supported by storage backend")

Debug

Operations for debugging.

Log the current data dict for inspection.

Prints the data dictionary in a formatted way for debugging. Passes data through to the next stage unchanged.

Parameters:

Name Type Description Default
context Context

The crawler context.

required
data dict[str, Any]

Data to inspect.

required
Example
pipeline:
  debug:
    method: inspect
    handle:
      pass: store
Source code in memorious/operations/debug.py
@register("inspect")
def inspect(context: Context, data: dict[str, Any]) -> None:
    """Log the current data dict for inspection.

    Prints the data dictionary in a formatted way for debugging.
    Passes data through to the next stage unchanged.

    Args:
        context: The crawler context.
        data: Data to inspect.

    Example:
        ```yaml
        pipeline:
          debug:
            method: inspect
            handle:
              pass: store
        ```
    """
    context.log.info("Inspect data", data=pformat(data))
    context.emit(data=data, optional=True)

Drop into an interactive ipdb debugger session.

Pauses execution and opens an interactive Python debugger, allowing inspection of the context and data at runtime.

Parameters:

Name Type Description Default
context Context

The crawler context (available as context and cn).

required
data dict[str, Any]

Current stage data (available as data).

required
Note

Requires ipdb to be installed (pip install ipdb). Only useful during local development, not in production.

Example
pipeline:
  debug:
    method: ipdb
    handle:
      pass: store
Source code in memorious/operations/debug.py
@register("ipdb")
def ipdb(context: Context, data: dict[str, Any]) -> None:
    """Drop into an interactive ipdb debugger session.

    Pauses execution and opens an interactive Python debugger,
    allowing inspection of the context and data at runtime.

    Args:
        context: The crawler context (available as `context` and `cn`).
        data: Current stage data (available as `data`).

    Note:
        Requires ipdb to be installed (`pip install ipdb`).
        Only useful during local development, not in production.

    Example:
        ```yaml
        pipeline:
          debug:
            method: ipdb
            handle:
              pass: store
        ```
    """
    cn = context  # noqa: F841 - available in debugger
    import ipdb

    ipdb.set_trace()

FTP

Source code in memorious/operations/ftp.py
@register("ftp_fetch")
def ftp_fetch(context, data):

    try:
        import requests
        import requests_ftp
    except ImportError as e:
        context.log.error("Please install ftp dependencies: `requests-ftp`")
        raise e

    url = data.get("url")
    context.log.info("FTP fetch", url=url)
    requests_ftp.monkeypatch_session()
    session = requests.Session()
    username = context.get("username", "Anonymous")
    password = context.get("password", "anonymous@ftp")

    resource = urlparse(url).netloc or url
    # a bit weird to have a http rate limit while using ftp
    limit = context.get("http_rate_limit", settings.http_rate_limit)
    limit = limit / 60  # per minute to per second for stricter enforcement
    rate_limit = get_rate_limit(resource, limit=limit, interval=1, unit=1)

    cached = context.get_tag(url)
    if cached is not None:
        context.emit(rule="pass", data=cached)
        return

    context.enforce_rate_limit(rate_limit)
    resp = session.retr(url, auth=(username, password))
    if resp.status_code < 399:
        data.update(
            {
                "status_code": resp.status_code,
                "retrieved_at": datetime.utcnow().isoformat(),
                "content_hash": context.store_data(data=resp.content),
            }
        )
        context.set_tag(url, data)
        context.emit(rule="pass", data=data)
    else:
        context.enforce_rate_limit(rate_limit)
        resp = session.nlst(url, auth=(username, password))
        for child in resp.iter_lines(decode_unicode=True):
            child_data = data.copy()
            child_data["url"] = os.path.join(url, child)
            context.log.info("FTP directory child", url=child_data["url"])
            context.emit(rule="child", data=child_data)

WebDAV

List files in a WebDAV directory.

Source code in memorious/operations/webdav.py
@register("dav_index")
def dav_index(context, data):
    """List files in a WebDAV directory."""
    # This is made to work with ownCloud/nextCloud, but some rumor has
    # it they are "standards compliant" and it should thus work for
    # other DAV servers.
    url = data.get("url")
    context.log.info("Fetching WebDAV path", url=url)
    result = context.http.request("PROPFIND", url)
    for resp in result.xml.findall("./{DAV:}response"):
        href = resp.findtext("./{DAV:}href")
        if href is None:
            continue

        child_url = urljoin(url, href)
        if child_url == url:
            continue
        child = dict(data)
        child["url"] = child_url
        child["foreign_id"] = child_url
        child["file_name"] = _get_url_file_name(href)

        rule = "file"
        if resp.find(".//{DAV:}collection") is not None:
            rule = "folder"
        context.emit(data=child, rule=rule)
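
A hedged configuration sketch for dav_index, following the `file`/`folder` rules emitted above; the WebDAV URL and downstream stage names are placeholders:

```yaml
pipeline:
  init:
    method: seed
    params:
      url: https://cloud.example.com/remote.php/dav/files/user/
    handle:
      pass: index
  index:
    method: dav_index
    handle:
      folder: index   # recurse into collections
      file: fetch     # fetch individual files
  fetch:
    method: fetch
    handle:
      pass: store
  store:
    method: directory
```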

DocumentCloud

Source code in memorious/operations/documentcloud.py
@register("documentcloud_query")
def documentcloud_query(context, data):
    host = context.get("host", API_HOST)
    instance = context.get("instance", DEFAULT_INSTANCE)
    query = data.get("query", context.get("query"))
    if isinstance(query, list):
        for q in query:
            data["query"] = q
            context.recurse(data)
        return
    page = data.get("page", 1)

    search_url = urljoin(host, "/api/documents/search")

    context.log.info("Searching DocumentCloud", query=query, page=page)
    res = context.http.get(
        search_url,
        params={"q": query, "per_page": 100, "page": page, "expand": "organization"},
    )

    documents = res.json.get("results", [])

    for document in documents:
        doc = {
            "foreign_id": "%s:%s" % (instance, document.get("id")),
            "url": "{}/documents/{}/{}.pdf".format(
                ASSET_HOST, document.get("id"), document.get("slug")
            ),
            "source_url": "{}/documents/{}-{}".format(
                DOCUMENT_HOST, document.get("id"), document.get("slug")
            ),
            "title": document.get("title"),
            "publisher": document.get("organization", {}).get("name"),
            "file_name": "{}.pdf".format(document.get("slug")),
            "mime_type": "application/pdf",
        }

        # In incremental crawling mode, skip processing this document if it has
        # been already fully processed before.  The key we check for is set in
        # `mark_processed` after a document is fully processed.  So the supplied
        # arguments to `context.make_key` must match.
        if context.incremental:
            key = context.make_key(doc["foreign_id"], document.get("file_hash"))
            if context.check_tag(key):
                context.log.info(
                    "Skipping processing of document", foreign_id=doc["foreign_id"]
                )
                continue

        lang = LANGUAGES.get(document.get("language"))
        if lang is not None:
            doc["languages"] = [lang]

        published = document.get("created_at")
        if published is not None:
            try:
                dt = datetime.strptime(published, "%Y-%m-%dT%H:%M:%S.%fZ")
            except ValueError:
                dt = datetime.strptime(published, "%Y-%m-%dT%H:%M:%SZ")
            doc["published_at"] = dt.isoformat()

        context.emit(data=doc)

    if len(documents):
        context.recurse(data={"page": page + 1, "query": query})

Create a persistent tag to indicate that a document has been fully processed

On subsequent runs, we can check and skip processing this document earlier in the pipeline.

Source code in memorious/operations/documentcloud.py
@register("documentcloud_mark_processed")
def documentcloud_mark_processed(context, data):
    """Create a persistent tag to indicate that a document has been fully processed

    On subsequent runs, we can check and skip processing this document earlier in the
    pipeline.
    """
    key = context.make_key(data["foreign_id"], data["content_hash"])
    context.log.info("Document has been processed", foreign_id=data["foreign_id"])
    context.set_tag(key, "processed")

Aleph

Operations for Aleph integration.

Source code in memorious/operations/aleph.py
@register("aleph_emit")
def aleph_emit(context, data):
    aleph_emit_document(context, data)
Source code in memorious/operations/aleph.py
@register("aleph_emit_document")
def aleph_emit_document(context, data):
    api = get_api(context)
    if api is None:
        return
    collection_id = get_collection_id(context, api)
    content_hash = data.get("content_hash")
    source_url = data.get("source_url", data.get("url"))
    foreign_id = data.get("foreign_id", data.get("request_id", source_url))
    # Fetch document id and metadata from cache
    document = context.get_tag(make_key(collection_id, foreign_id, content_hash))
    if isinstance(document, dict):
        context.log.info("Skip aleph upload", foreign_id=foreign_id)
        data["aleph_id"] = document["id"]
        data["aleph_document"] = document
        data["aleph_collection_id"] = collection_id
        context.emit(data=data, optional=True)
        return

    meta = clean_dict(_create_meta_object(context, data))
    meta.update(_create_document_metadata(context, data))

    label = meta.get("file_name", meta.get("source_url"))
    context.log.info("Upload", label=label)
    with context.open(content_hash) as fh:
        if fh is None:
            return
        file_path = Path(fh.name).resolve()

        for try_number in range(api.retries):
            rate = settings.MEMORIOUS_RATE_LIMIT
            rate_limit = get_rate_limit("aleph", limit=rate)
            rate_limit.comply()
            try:
                res = api.ingest_upload(collection_id, file_path, meta)
                document_id = res.get("id")
                context.log.info("Aleph document ID", document_id=document_id)
                # Save the document id in cache for future use
                meta["id"] = document_id
                context.set_tag(make_key(collection_id, foreign_id, content_hash), meta)
                data["aleph_id"] = document_id
                data["aleph_document"] = meta
                data["aleph_collection_id"] = collection_id
                context.emit(data=data, optional=True)
                return
            except AlephException as exc:
                if try_number > api.retries or not exc.transient:
                    context.emit_warning("Error: %s" % exc)
                    return
                backoff(exc, try_number)
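
aleph_emit_document has no packaged example on this page. A minimal sketch of a terminal upload stage follows; Aleph credentials and the target collection are resolved by the get_api and get_collection_id helpers, whose configuration is not documented here, so the stage body itself stays empty:

```yaml
pipeline:
  store:
    method: directory
    handle:
      pass: upload
  upload:
    method: aleph_emit_document
```
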
Source code in memorious/operations/aleph.py
@register("aleph_folder")
def aleph_folder(context, data):
    api = get_api(context)
    if api is None:
        return
    collection_id = get_collection_id(context, api)
    foreign_id = data.get("foreign_id")
    if foreign_id is None:
        context.log.warning("No folder foreign ID")
        return

    meta = clean_dict(_create_meta_object(context, data))
    label = meta.get("file_name", meta.get("source_url"))
    context.log.info("Make folder", label=label)
    for try_number in range(api.retries):
        rate = settings.MEMORIOUS_RATE_LIMIT
        rate_limit = get_rate_limit("aleph", limit=rate)
        rate_limit.comply()
        try:
            res = api.ingest_upload(collection_id, metadata=meta, sync=True)
            document_id = res.get("id")
            context.log.info("Aleph folder entity ID", document_id=document_id)
            # Save the document id in cache for future use
            context.set_tag(make_key(collection_id, foreign_id), document_id)
            data["aleph_folder_id"] = document_id
            data["aleph_collection_id"] = collection_id
            context.emit(data=data, optional=True)
            return
        except AlephException as ae:
            if try_number > api.retries or not ae.transient:
                context.emit_warning("Error: %s" % ae)
                return
            backoff(ae, try_number)
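
aleph_folder expects a `foreign_id` in the incoming data and stores the created folder's document id as `aleph_folder_id`. A hedged sketch with placeholder stage names:

```yaml
  make_folder:
    method: aleph_folder
    handle:
      pass: upload
  upload:
    method: aleph_emit_document
```
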
Source code in memorious/operations/aleph.py
@register("aleph_emit_entity")
def aleph_emit_entity(context, data):
    api = get_api(context)
    if api is None:
        return
    collection_id = get_collection_id(context, api)
    entity_id = data.get("entity_id", data.get("id"))
    if not entity_id:
        context.emit_warning("Error: Can not create entity. `id` is not defined")
        return
    source_url = data.get("source_url", data.get("url"))
    foreign_id = data.get("foreign_id", data.get("request_id", source_url))
    # Fetch entity from cache
    cached_entity = context.get_tag(make_key(collection_id, foreign_id, entity_id))

    if cached_entity and isinstance(cached_entity, dict):
        context.log.info("Skip entity creation", foreign_id=foreign_id)
        data["aleph_id"] = cached_entity["id"]
        data["aleph_collection_id"] = collection_id
        data["aleph_entity"] = cached_entity
        context.emit(data=data, optional=True)
        return

    for try_number in range(api.retries):
        rate = settings.MEMORIOUS_RATE_LIMIT
        rate_limit = get_rate_limit("aleph", limit=rate)
        rate_limit.comply()
        try:
            res = api.write_entity(
                collection_id,
                {
                    "schema": data.get("schema"),
                    "properties": data.get("properties"),
                },
                entity_id,
            )

            entity = {
                "id": res.get("id"),
                "schema": res.get("schema"),
                "properties": res.get("properties"),
            }
            context.log.info("Aleph entity ID", entity_id=entity["id"])

            # Save the entity in cache for future use
            context.set_tag(make_key(collection_id, foreign_id, entity_id), entity)

            data["aleph_id"] = entity["id"]
            data["aleph_collection_id"] = collection_id
            data["aleph_entity"] = entity
            context.emit(data=data, optional=True)
            return
        except AlephException as exc:
            if try_number > api.retries or not exc.transient:
                context.emit_warning("Error: %s" % exc)
                return
            backoff(exc, try_number)
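
aleph_emit_entity requires the incoming data to carry an `id` (or `entity_id`), a `schema` and `properties`. A minimal, hypothetical stage definition:

```yaml
pipeline:
  # ... an earlier stage emits data with `id`, `schema` and `properties` ...
  write_entity:
    method: aleph_emit_entity
```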

FTM Store

Operations for FollowTheMoney entity storage.

Store an entity or a list of entities to an ftm store.

Source code in memorious/operations/ftm.py
@register("ftm_store")
@register("balkhash_put")  # Legacy alias
def ftm_store(context, data):
    """Store an entity or a list of entities to an ftm store."""
    # This is a simplistic implementation of a balkhash memorious operation.
    # It is meant to serve the use of OCCRP where we pipe data into postgresql.
    dataset = get_dataset(context)
    bulk = dataset.bulk()
    entities = ensure_list(data.get("entities", data))
    for entity in entities:
        context.log.debug(
            "Store entity", schema=entity.get("schema"), id=entity.get("id")
        )
        bulk.put(entity, entity.pop("fragment", None))
        context.emit(rule="fragment", data=data, optional=True)
    context.emit(data=data, optional=True)
    bulk.flush()
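
ftm_store (also registered under the legacy alias balkhash_put) writes whatever arrives under the `entities` key, or the data item itself, into the configured ftm dataset. A minimal sketch; dataset configuration is handled by get_dataset and is not shown on this page:

```yaml
pipeline:
  # ... an earlier stage emits `entities` (or a single entity dict) ...
  store_entities:
    method: ftm_store
```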

Write each entity from an ftm store to Aleph via the _bulk API.

Source code in memorious/operations/ftm.py
@register("ftm_load_aleph")
def ftm_load_aleph(context, data):
    """Write each entity from an ftm store to Aleph via the _bulk API."""
    api = get_api(context)
    if api is None:
        return
    foreign_id = context.params.get("foreign_id", context.crawler.name)
    collection = api.load_collection_by_foreign_id(foreign_id, {})
    collection_id = collection.get("id")
    unsafe = context.params.get("unsafe", False)
    entities = get_dataset(context)
    api.write_entities(collection_id, entities, unsafe=unsafe)
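
A minimal sketch for ftm_load_aleph using the two params read in the source (`foreign_id`, which defaults to the crawler name, and `unsafe`); the collection foreign id is a placeholder:

```yaml
pipeline:
  load:
    method: ftm_load_aleph
    params:
      foreign_id: example_collection
      unsafe: false
```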

Helpers

Utility modules for operations.

Pagination

memorious.helpers.pagination

Pagination utilities for web crawlers.

This module provides helper functions for handling pagination in crawlers, including URL manipulation and next-page detection.

get_paginated_url(url, page, param='page')

Apply page number to URL query parameter.

Parameters:

Name Type Description Default
url str

The base URL.

required
page int

Page number to set.

required
param str

Query parameter name for the page number.

'page'

Returns:

Type Description
str

URL with the page parameter set.

Example

get_paginated_url("https://example.com/search", 2) 'https://example.com/search?page=2' get_paginated_url("https://example.com/search?q=test", 3, "p") 'https://example.com/search?q=test&p=3'

Source code in memorious/helpers/pagination.py
def get_paginated_url(url: str, page: int, param: str = "page") -> str:
    """Apply page number to URL query parameter.

    Args:
        url: The base URL.
        page: Page number to set.
        param: Query parameter name for the page number.

    Returns:
        URL with the page parameter set.

    Example:
        >>> get_paginated_url("https://example.com/search", 2)
        'https://example.com/search?page=2'
        >>> get_paginated_url("https://example.com/search?q=test", 3, "p")
        'https://example.com/search?q=test&p=3'
    """
    f = furl(url)
    f.args[param] = page
    return f.url

paginate(context, data, html)

Emit next page if pagination indicates more pages.

Examines pagination configuration and HTML content to determine if there are more pages, and emits the next page data.

Parameters:

Name Type Description Default
context Context

The crawler context with pagination params.

required
data dict[str, Any]

Current data dict (used to get current page).

required
html HtmlElement

HTML element containing pagination info.

required

Example YAML configuration:

pipeline:
  parse:
    method: parse
    params:
      pagination:
        total: './/span[@class="total"]/text()'
        per_page: 20
        param: page
    handle:
      next_page: fetch
      store: store
Source code in memorious/helpers/pagination.py
def paginate(context: Context, data: dict[str, Any], html: HtmlElement) -> None:
    """Emit next page if pagination indicates more pages.

    Examines pagination configuration and HTML content to determine
    if there are more pages, and emits the next page data.

    Args:
        context: The crawler context with pagination params.
        data: Current data dict (used to get current page).
        html: HTML element containing pagination info.

    Example YAML configuration::

        pipeline:
          parse:
            method: parse
            params:
              pagination:
                total: './/span[@class="total"]/text()'
                per_page: 20
                param: page
            handle:
              next_page: fetch
              store: store
    """
    config = context.params.get("pagination")
    if not config:
        return

    config = ensure_dict(config)
    current = data.get("page", 1)
    next_page = calculate_next_page(html, current, config)

    if next_page:
        context.log.info("Next page", page=next_page)
        next_data = {**data, "page": next_page}
        param = config.get("param", "page")
        if "url" in next_data:
            next_data["url"] = get_paginated_url(next_data["url"], next_page, param)
        context.emit(rule="next_page", data=next_data)

Casting

memorious.helpers.casting

Type casting utilities for scraped data.

This module provides functions for automatically casting scraped string values to appropriate Python types (int, float, date, datetime).

cast_value(value, with_date=False, **datekwargs)

Cast a value to its appropriate type.

Attempts to convert strings to int, float, or date as appropriate.

Parameters:

Name Type Description Default
value Any

The value to cast.

required
with_date bool

If True, attempt to parse strings as dates.

False
**datekwargs Any

Additional arguments for date parsing.

{}

Returns:

Type Description
int | float | date | datetime | Any

The cast value (int, float, date, datetime, or original type).

Example

cast_value("42") 42 cast_value("3.14") 3.14 cast_value("2024-01-15", with_date=True) datetime.date(2024, 1, 15)

Source code in memorious/helpers/casting.py
def cast_value(
    value: Any,
    with_date: bool = False,
    **datekwargs: Any,
) -> int | float | date | datetime | Any:
    """Cast a value to its appropriate type.

    Attempts to convert strings to int, float, or date as appropriate.

    Args:
        value: The value to cast.
        with_date: If True, attempt to parse strings as dates.
        **datekwargs: Additional arguments for date parsing.

    Returns:
        The cast value (int, float, date, datetime, or original type).

    Example:
        >>> cast_value("42")
        42
        >>> cast_value("3.14")
        3.14
        >>> cast_value("2024-01-15", with_date=True)
        datetime.date(2024, 1, 15)
    """
    if not isinstance(value, (str, float, int)):
        return value
    if isinstance(value, str):
        value = value.strip()
        if not value:
            return None
    try:
        f = float(value)
        return int(f) if f == int(f) else f
    except (TypeError, ValueError):
        pass
    if with_date:
        try:
            return datetime.fromisoformat(str(value))
        except ValueError:
            result = ensure_date(str(value), **datekwargs)
            return result if result else value
    return value

cast_dict(data, ignore_keys=None, **kwargs)

Cast all values in a dictionary to appropriate types.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary to process.

required
ignore_keys list[str] | None

Keys to skip during casting.

None
**kwargs Any

Additional arguments for date parsing.

{}

Returns:

Type Description
dict[str, Any]

New dictionary with cast values.

Example

cast_dict({"count": "42", "date": "2024-01-15"})

Source code in memorious/helpers/casting.py
def cast_dict(
    data: dict[str, Any],
    ignore_keys: list[str] | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Cast all values in a dictionary to appropriate types.

    Args:
        data: Dictionary to process.
        ignore_keys: Keys to skip during casting.
        **kwargs: Additional arguments for date parsing.

    Returns:
        New dictionary with cast values.

    Example:
        >>> cast_dict({"count": "42", "date": "2024-01-15"})
        {'count': 42, 'date': datetime.date(2024, 1, 15)}
    """
    ignore = ignore_keys or []
    return {
        k: cast_value(v, with_date=True, **kwargs) if k not in ignore else v
        for k, v in data.items()
    }

ensure_date(value, raise_on_error=False, **parserkwargs)

Parse a value into a date object.

Tries multiple parsing strategies: datetime.date, dateutil.parse, and dateparser.parse.

Parameters:

Name Type Description Default
value str | date | datetime | None

The value to parse (string, date, datetime, or None).

required
raise_on_error bool

If True, raise exception on parse failure.

False
**parserkwargs Any

Additional arguments passed to date parsers.

{}

Returns:

Type Description
date | None

A date object, or None if parsing fails and raise_on_error is False.

Raises:

Type Description
Exception

If parsing fails and raise_on_error is True.

Example

ensure_date("2024-01-15") datetime.date(2024, 1, 15) ensure_date("January 15, 2024") datetime.date(2024, 1, 15)

Source code in memorious/helpers/casting.py
def ensure_date(
    value: str | date | datetime | None,
    raise_on_error: bool = False,
    **parserkwargs: Any,
) -> date | None:
    """Parse a value into a date object.

    Tries multiple parsing strategies: datetime.date, dateutil.parse,
    and dateparser.parse.

    Args:
        value: The value to parse (string, date, datetime, or None).
        raise_on_error: If True, raise exception on parse failure.
        **parserkwargs: Additional arguments passed to date parsers.

    Returns:
        A date object, or None if parsing fails and raise_on_error is False.

    Raises:
        Exception: If parsing fails and raise_on_error is True.

    Example:
        >>> ensure_date("2024-01-15")
        datetime.date(2024, 1, 15)
        >>> ensure_date("January 15, 2024")
        datetime.date(2024, 1, 15)
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    value_str = str(value)
    try:
        return dateparse(value_str, **parserkwargs).date()
    except Exception:
        try:
            parsed = dateparse2(value_str, **parserkwargs)
            return parsed.date() if parsed else None
        except Exception as e:
            if raise_on_error:
                raise e
            return None

XPath

memorious.helpers.xpath

XPath extraction utilities for HTML/XML parsing.

This module provides helper functions for extracting values from HTML and XML documents using XPath expressions.

extract_xpath(html, path)

Extract value from HTML/XML element using XPath.

Handles common cases like single-element lists and text extraction.

Parameters:

Name Type Description Default
html HtmlElement

The lxml HTML/XML element to query.

required
path str

XPath expression to evaluate.

required

Returns:

Type Description
Any

The extracted value. If the result is a single-element list, returns just that element. If the element has a text attribute, returns the stripped text.

Example

>>> extract_xpath(html, './/title/text()')
'Page Title'
>>> extract_xpath(html, './/a/@href')
'https://example.com'

Source code in memorious/helpers/xpath.py
def extract_xpath(html: HtmlElement, path: str) -> Any:
    """Extract value from HTML/XML element using XPath.

    Handles common cases like single-element lists and text extraction.

    Args:
        html: The lxml HTML/XML element to query.
        path: XPath expression to evaluate.

    Returns:
        The extracted value. If the result is a single-element list,
        returns just that element. If the element has a text attribute,
        returns the stripped text.

    Example:
        >>> extract_xpath(html, './/title/text()')
        'Page Title'
        >>> extract_xpath(html, './/a/@href')
        'https://example.com'
    """
    result = html.xpath(path)
    if isinstance(result, list) and len(result) == 1:
        result = result[0]
    if hasattr(result, "text"):
        result = result.text
    if isinstance(result, str):
        return result.strip()
    return result

Template

memorious.helpers.template

Jinja2 templating utilities for URL and string generation.

This module provides functions for rendering Jinja2 templates with data, useful for dynamic URL construction in crawlers.

render_template(template, data)

Render a Jinja2 template string with data.

Parameters:

Name Type Description Default
template str

Jinja2 template string.

required
data dict[str, Any]

Dictionary of values to substitute.

required

Returns:

Type Description
str

The rendered string.

Example

render_template("https://example.com/page/{{ page }}", {"page": 1}) 'https://example.com/page/1'

Source code in memorious/helpers/template.py
def render_template(template: str, data: dict[str, Any]) -> str:
    """Render a Jinja2 template string with data.

    Args:
        template: Jinja2 template string.
        data: Dictionary of values to substitute.

    Returns:
        The rendered string.

    Example:
        >>> render_template("https://example.com/page/{{ page }}", {"page": 1})
        'https://example.com/page/1'
    """
    env = Environment(loader=BaseLoader())
    return env.from_string(template).render(**data)

Forms

memorious.helpers.forms

HTML form extraction utilities.

This module provides helper functions for extracting form data from HTML documents, useful for form submission in crawlers.

extract_form(html, xpath)

Extract form action URL and field values from an HTML form.

Parameters:

Name Type Description Default
html HtmlElement

HTML element containing the form.

required
xpath str

XPath expression to locate the form element.

required

Returns:

Type Description
tuple[str | None, dict[str, Any]]

Tuple of (action_url, form_data_dict). Returns (None, {}) if the form is not found.

Example

>>> action, data = extract_form(html, './/form[@id="login"]')
>>> action
'/login'
>>> data
{'username': '', 'password': '', 'csrf_token': 'abc123'}

Source code in memorious/helpers/forms.py
def extract_form(html: HtmlElement, xpath: str) -> tuple[str | None, dict[str, Any]]:
    """Extract form action URL and field values from an HTML form.

    Args:
        html: HTML element containing the form.
        xpath: XPath expression to locate the form element.

    Returns:
        Tuple of (action_url, form_data_dict). Returns (None, {}) if
        the form is not found.

    Example:
        >>> action, data = extract_form(html, './/form[@id="login"]')
        >>> action
        '/login'
        >>> data
        {'username': '', 'password': '', 'csrf_token': 'abc123'}
    """
    form = html.find(xpath)
    if form is None:
        return None, {}

    action = form.xpath("@action")
    action_url = action[0] if action else None

    data: dict[str, Any] = {}
    for el in form.findall(".//input"):
        if el.name:
            data[el.name] = el.value
    for el in form.findall(".//select"):
        if el.name:
            data[el.name] = el.value

    return action_url, data

Regex

memorious.helpers.regex

Regex extraction utilities for data parsing.

This module provides helper functions for extracting data from strings using regular expressions.

regex_first(pattern, string)

Extract the first regex match from a string.

Parameters:

Name Type Description Default
pattern str

Regular expression pattern.

required
string str

String to search.

required

Returns:

Type Description
str

The first match, stripped of whitespace.

Raises:

Type Description
RegexError

If no match is found.

Example

regex_first(r"\d+", "Page 42 of 100") '42'

Source code in memorious/helpers/regex.py
def regex_first(pattern: str, string: str) -> str:
    """Extract the first regex match from a string.

    Args:
        pattern: Regular expression pattern.
        string: String to search.

    Returns:
        The first match, stripped of whitespace.

    Raises:
        RegexError: If no match is found.

    Example:
        >>> regex_first(r"\\d+", "Page 42 of 100")
        '42'
    """
    matches = re.findall(pattern, string)
    if matches:
        return str(matches[0]).strip()
    raise RegexError(f"No match for pattern: {pattern}", string)

YAML

memorious.helpers.yaml

YAML loader with !include constructor support.

This module provides a custom YAML loader that supports including external files using the !include directive.

Example
# main.yml
settings:
  database: !include database.yml

# database.yml
host: localhost
port: 5432

IncludeLoader

Bases: SafeLoader

YAML Loader with !include constructor for file inclusion.

Source code in memorious/helpers/yaml.py
class IncludeLoader(yaml.SafeLoader):
    """YAML Loader with !include constructor for file inclusion."""

    def __init__(self, stream: IO) -> None:
        """Initialize the loader with the root directory from the stream."""
        try:
            self._root = Path(stream.name).parent
        except AttributeError:
            self._root = Path.cwd()
        super().__init__(stream)

__init__(stream)

Initialize the loader with the root directory from the stream.

Source code in memorious/helpers/yaml.py
def __init__(self, stream: IO) -> None:
    """Initialize the loader with the root directory from the stream."""
    try:
        self._root = Path(stream.name).parent
    except AttributeError:
        self._root = Path.cwd()
    super().__init__(stream)

load_yaml(path)

Load YAML file with !include support.

Parameters:

Name Type Description Default
path str | Path

Path to the YAML file.

required

Returns:

Type Description
dict[str, Any]

Parsed YAML content as a dictionary.

Example

>>> config = load_yaml("crawler.yml")

Source code in memorious/helpers/yaml.py
def load_yaml(path: str | Path) -> dict[str, Any]:
    """Load YAML file with !include support.

    Args:
        path: Path to the YAML file.

    Returns:
        Parsed YAML content as a dictionary.

    Example:
        >>> config = load_yaml("crawler.yml")
    """
    with open(path, encoding="utf-8") as fh:
        return yaml.load(fh, IncludeLoader)

Registry

memorious.operations.register(name)

Decorator to register an operation.

Raises ValueError if an operation with the same name already exists.

Example

@register("my_operation") def my_operation(context: Context, data: dict) -> None: ...

Source code in memorious/operations/__init__.py
def register(name: str):
    """Decorator to register an operation.

    Raises ValueError if an operation with the same name already exists.

    Example:
        @register("my_operation")
        def my_operation(context: Context, data: dict) -> None:
            ...
    """

    def decorator(func: OperationFunc) -> OperationFunc:
        if name in _REGISTRY:
            raise ValueError(
                f"Operation '{name}' is already registered. "
                f"Use a different name or the module:function syntax."
            )
        _REGISTRY[name] = func
        return func

    return decorator

memorious.operations.resolve_operation(method_name, base_path=None)

Resolve an operation method by name.

Resolution order:

1. Local registry (built-in and decorated operations)
2. Module import (module:function syntax, e.g., "mypackage.ops:my_func")
3. File import (file:function syntax, e.g., "./src/ops.py:my_func")

Parameters:

Name Type Description Default
method_name str

Either a registered name (e.g., "fetch"), a module path (e.g., "mypackage.ops:my_func"), or a file path (e.g., "./src/ops.py:my_func")

required
base_path Path | str | None

Base directory for resolving relative file paths. Typically the directory containing the crawler config.

None

Returns:

Type Description
OperationFunc

The operation function.

Raises:

Type Description
ValueError

If the operation cannot be resolved.

Source code in memorious/operations/__init__.py
@cache
def resolve_operation(
    method_name: str, base_path: Path | str | None = None
) -> OperationFunc:
    """
    Resolve an operation method by name.

    Resolution order:
    1. Local registry (built-in and decorated operations)
    2. Module import (module:function syntax, e.g., "mypackage.ops:my_func")
    3. File import (file:function syntax, e.g., "./src/ops.py:my_func")

    Args:
        method_name: Either a registered name (e.g., "fetch"),
                     a module path (e.g., "mypackage.ops:my_func"),
                     or a file path (e.g., "./src/ops.py:my_func")
        base_path: Base directory for resolving relative file paths.
                   Typically the directory containing the crawler config.

    Returns:
        The operation function.

    Raises:
        ValueError: If the operation cannot be resolved.
    """
    # Check local registry first
    if method_name in _REGISTRY:
        return _REGISTRY[method_name]

    # Import from module or file path
    if ":" in method_name:
        bp = Path(base_path) if base_path else None
        return _load_func(method_name, bp)

    raise ValueError(f"Unknown operation: {method_name}")

memorious.operations.list_operations()

Return a list of all registered operation names.

Source code in memorious/operations/__init__.py
def list_operations() -> list[str]:
    """Return a list of all registered operation names."""
    return sorted(_REGISTRY.keys())