Stores

ftmq extends the statement-based store implementation of nomenklatura with more granular querying and aggregation capabilities.

Initialize a store

Get an initialized Store. The backend is inferred from the scheme of the store uri.

Example
from ftmq.store import get_store

# an in-memory store:
get_store("memory://")

# a leveldb store:
get_store("leveldb:///var/lib/data")

# a redis (or kvrocks) store:
get_store("redis://localhost")

# a sqlite store
get_store("sqlite:///data/followthemoney.db")

Parameters:

uri (PathLike | None, default 'memory:///'):
    The store backend uri
catalog (Catalog | None, default None):
    A ftmq.model.Catalog instance to limit the scope to
dataset (Dataset | str | None, default None):
    A ftmq.model.Dataset instance (or its name) to limit the scope to
linker (Resolver | str | None, default None):
    A nomenklatura.Resolver instance with linked / deduped data, or a path to load one from

Returns:

Store:
    The initialized store. This is a cached object.
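
For example, the scope can be limited to a single dataset by passing its name; as the parameters above show, a plain string is accepted and converted into a ftmq.model.Dataset instance (a minimal sketch, the store uri is just an example):

from ftmq.store import get_store

# scope the store to one dataset; the string name is turned into a Dataset instance
store = get_store("sqlite:///data/followthemoney.db", dataset="my_dataset")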

Source code in ftmq/store/__init__.py
@cache
def get_store(
    uri: PathLike | None = "memory:///",
    catalog: Catalog | None = None,
    dataset: Dataset | str | None = None,
    linker: Resolver | str | None = None,
) -> Store:
    """
    Get an initialized [Store][ftmq.store.base.Store]. The backend is inferred
    by the scheme of the store uri.

    Example:
        ```python
        from ftmq.store import get_store

        # an in-memory store:
        get_store("memory://")

        # a leveldb store:
        get_store("leveldb:///var/lib/data")

        # a redis (or kvrocks) store:
        get_store("redis://localhost")

        # a sqlite store
        get_store("sqlite:///data/followthemoney.db")
        ```

    Args:
        uri: The store backend uri
        catalog: A `ftmq.model.Catalog` instance to limit the scope to
        dataset: A `ftmq.model.Dataset` instance to limit the scope to
        linker: A `nomenklatura.Resolver` instance with linked / deduped data

    Returns:
        The initialized store. This is a cached object.
    """
    if isinstance(dataset, str):
        dataset = Dataset(name=dataset)
    if isinstance(linker, (str, Path)):
        linker = get_resolver(linker)
    uri = str(uri)
    parsed = urlparse(uri)
    if parsed.scheme == "memory":
        return MemoryStore(catalog, dataset, linker=linker)
    if parsed.scheme == "leveldb":
        path = uri.replace("leveldb://", "")
        path = Path(path).absolute()
        try:
            from ftmq.store.level import LevelDBStore

            return LevelDBStore(catalog, dataset, path=path, linker=linker)
        except ImportError:
            raise ImportError("Can not load LevelDBStore. Install `plyvel`")
    if parsed.scheme == "redis":
        try:
            from ftmq.store.redis import RedisStore

            return RedisStore(catalog, dataset, linker=linker)
        except ImportError:
            raise ImportError("Can not load RedisStore. Install `redis`")
    if parsed.scheme == "clickhouse":
        try:
            from ftm_columnstore import get_store as get_cstore

            return get_cstore(catalog, dataset, linker=linker)
        except ImportError:
            raise ImportError("Can not load ClickhouseStore. Install `ftm-columnstore`")
    if "sql" in parsed.scheme:
        get_metadata.cache_clear()
        return SQLStore(catalog, dataset, uri=uri, linker=linker)
    if "aleph" in parsed.scheme:
        return AlephStore.from_uri(uri, catalog=catalog, dataset=dataset, linker=linker)
    raise NotImplementedError(uri)

Supported backends

  • in memory: get_store("memory://")
  • Redis (or kvrocks): get_store("redis://localhost")
  • LevelDB: get_store("leveldb://data")
  • SQL:
    • sqlite: get_store("sqlite:///data.db")
    • postgresql: get_store("postgresql://user:password@host/db")
    • ...any other backend supported by SQLAlchemy
  • Clickhouse via ftm-columnstore: get_store("clickhouse://localhost")
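
Any other scheme is rejected; as the source above shows, get_store raises a NotImplementedError for the given uri:

from ftmq.store import get_store

# unsupported scheme
get_store("ftp://example.org/data")  # raises NotImplementedError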

Read and query entities

Iterate through all the entities via Store.iterate:

from ftmq.store import get_store

store = get_store("sqlite:///followthemoney.store")
proxies = store.iterate()
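
The proxies can be consumed lazily like any iterable (a minimal sketch):

for proxy in proxies:
    print(proxy.id, proxy.schema.name)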

Filter entities with a Query object using a store view:

from ftmq import Query

q = Query().where(dataset="my_dataset", schema="Person")
proxies = store.view(q)
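
The filtered proxies are regular followthemoney entities, so downstream processing is plain Python (a minimal sketch that tallies the results by schema):

from collections import Counter

schemata = Counter(proxy.schema.name for proxy in proxies)
print(schemata.most_common())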

Command line

ftmq -i sqlite:///followthemoney.store --dataset=my_dataset --schema=Person
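
The same filters can be combined with an output uri to export the matching entities, for example to a local file (a sketch, assuming the default line-based ftm.json output):

ftmq -i sqlite:///followthemoney.store --dataset=my_dataset --schema=Person -o ./persons.ftm.json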

cli reference

Write entities to a store

Use the bulk writer:

proxies = [...]

with store.writer() as bulk:
    for proxy in proxies:
        bulk.add_entity(proxy)
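
The proxies can come from anywhere; a minimal sketch that feeds the bulk writer from a local entities file, assuming smart_read_proxies from ftmq.io as the reading counterpart:

from ftmq.io import smart_read_proxies

with store.writer() as bulk:
    for proxy in smart_read_proxies("entities.ftm.json"):
        bulk.add_entity(proxy)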

Or the smart_write_proxies shorthand, which uses the same bulk writer under the hood:

from ftmq.io import smart_write_proxies

smart_write_proxies("sqlite:///followthemoney.store", proxies)

Command line

cat entities.ftm.json | ftmq -o sqlite:///followthemoney.store

If the input entities don't have a dataset property, set a default dataset with the --store-dataset parameter.

ftmq -i s3://data/entities.ftm.json -o sqlite:///followthemoney.store --store-dataset=my_dataset

cli reference