Stores

ftmq extends the statement-based store implementation of nomenklatura with more granular querying and aggregation capabilities.

Initialize a store

Get an initialized Store. The backend is inferred by the scheme of the store uri.

Example
from ftmq.store import get_store

# an in-memory store:
get_store("memory://")

# a leveldb store:
get_store("leveldb:///var/lib/data")

# a redis (or kvrocks) store:
get_store("redis://localhost")

# a sqlite store
get_store("sqlite:///data/followthemoney.db")

Parameters:

    uri (Uri | None): The store backend uri. Default: DB_URL
    dataset (Dataset | str | None): A followthemoney.Dataset instance to limit the scope to. Default: None
    linker (Resolver | None): A nomenklatura.Resolver instance with linked / deduped data. Default: None

Returns:

    Store: The initialized store. This is a cached object.
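
For example, to limit a store's scope to a single dataset, pass its name (or a followthemoney.Dataset instance) as the dataset argument:

from ftmq.store import get_store

# scope the store to one dataset by passing its name
store = get_store("sqlite:///data/followthemoney.db", dataset="my_dataset")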

Source code in ftmq/store/__init__.py
@cache
def get_store(
    uri: Uri | None = settings.DB_URL,
    dataset: Dataset | str | None = None,
    linker: Resolver | None = None,
) -> Store:
    """
    Get an initialized [Store][ftmq.store.base.Store]. The backend is inferred
    by the scheme of the store uri.

    Example:
        ```python
        from ftmq.store import get_store

        # an in-memory store:
        get_store("memory://")

        # a leveldb store:
        get_store("leveldb:///var/lib/data")

        # a redis (or kvrocks) store:
        get_store("redis://localhost")

        # a sqlite store
        get_store("sqlite:///data/followthemoney.db")
        ```

    Args:
        uri: The store backend uri
        dataset: A `followthemoney.Dataset` instance to limit the scope to
        linker: A `nomenklatura.Resolver` instance with linked / deduped data

    Returns:
        The initialized store. This is a cached object.
    """
    uri = str(uri)
    parsed = urlparse(uri)
    if parsed.scheme == "memory":
        return MemoryStore(dataset, linker=linker)
    if parsed.scheme == "leveldb":
        path = uri.replace("leveldb://", "")
        path = Path(path).absolute()
        try:
            from ftmq.store.level import LevelDBStore

            return LevelDBStore(dataset, path=path, linker=linker)
        except ImportError:
            raise ImportError("Can not load LevelDBStore. Install `plyvel`")
    if parsed.scheme == "redis":
        try:
            from ftmq.store.redis import RedisStore

            return RedisStore(dataset, linker=linker)
        except ImportError:
            raise ImportError("Can not load RedisStore. Install `redis`")
    if "sql" in parsed.scheme:
        try:
            from ftmq.store.sql import SQLStore

            get_metadata.cache_clear()
            return SQLStore(dataset, uri=uri, linker=linker)
        except ImportError:
            raise ImportError("Can not load SqlStore. Install sql dependencies.")
    if "aleph" in parsed.scheme:
        try:
            from ftmq.store.aleph import AlephStore

            return AlephStore.from_uri(uri, dataset=dataset, linker=linker)
        except ImportError:
            raise ImportError("Can not load AlephStore. Install `alephclient`")
    if uri.startswith("lake+"):
        try:
            from ftmq.store.lake import LakeStore

            uri = str(uri)[5:]
            return LakeStore(uri=uri, dataset=dataset, linker=linker)
        except ImportError:
            raise ImportError("Can not load LakeStore. Install `[lake]` dependencies")
    if uri.startswith("fragments+"):
        uri = str(uri)[10:]
        raise NotImplementedError(uri)
    raise NotImplementedError(uri)

Supported backends

  • in-memory: get_store("memory://")
  • Redis (or kvrocks): get_store("redis://localhost")
  • LevelDB: get_store("leveldb://data")
  • SQL:
    • sqlite: get_store("sqlite:///data.db")
    • postgresql: get_store("postgresql://user:password@host/db")
    • ...any other backend supported by SQLAlchemy
  • ClickHouse via ftm-clickhouse: get_store("clickhouse://localhost")
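
As get_store is wrapped in a cache, repeated calls with the same arguments return the same store object:

from ftmq.store import get_store

# the store is cached: identical arguments yield the identical object
store1 = get_store("memory://")
store2 = get_store("memory://")
assert store1 is store2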

Read and query entities

Iterate through all the entities via Store.iterate:

from ftmq.store import get_store

store = get_store("sqlite:///followthemoney.store")
proxies = store.iterate()
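
Store.iterate returns an iterable of entity proxies that can be consumed lazily, for example to print each entity's schema and caption (a minimal sketch):

for proxy in store.iterate():
    print(proxy.schema.name, proxy.caption)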

Filter entities with a Query object using a store view:

from ftmq import Query

q = Query().where(dataset="my_dataset", schema="Person")
view = store.default_view()
proxies = view.query(q)
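
A Query can chain several filters and sorting. A sketch, assuming comparison lookups (here on birthDate) and order_by apply to the queried schema:

from ftmq import Query

# combine filters, a comparison lookup and sorting (illustrative sketch)
q = (
    Query()
    .where(dataset="my_dataset", schema="Person")
    .where(birthDate__gte="1980")
    .order_by("name")
)
proxies = view.query(q)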

Command line

ftmq -i sqlite:///followthemoney.store --dataset=my_dataset --schema=Person

cli reference

Write entities to a store

Use the bulk writer:

proxies = [...]

with store.writer() as bulk:
    for proxy in proxies:
        bulk.add_entity(proxy)

Or the smart_write_proxies shorthand, which uses the same bulk writer under the hood:

from ftmq.io import smart_write_proxies

smart_write_proxies("sqlite:///followthemoney.store", proxies)
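
The reading counterpart smart_read_proxies (assumed here as the ftmq.io helper for reading entities from a uri) can be combined with it to stream entities from a file into a store:

from ftmq.io import smart_read_proxies, smart_write_proxies

# stream entities from a line-based ftm json file into a sqlite store
proxies = smart_read_proxies("entities.ftm.json")
smart_write_proxies("sqlite:///followthemoney.store", proxies)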

Command line

cat entities.ftm.json | ftmq -o sqlite:///followthemoney.store

If the input entities don't have a dataset property, set a default dataset via the --store-dataset parameter.

ftmq -i s3://data/entities.ftm.json -o sqlite:///followthemoney.store --store-dataset=my_dataset

cli reference