Skip to content

anystore.io

Generic io helpers

anystore is built on top of fsspec and provides an easy wrapper for reading and writing content from and to arbitrary locations using the io command:

Command-line usage
anystore io -i ./local/foo.txt -o s3://mybucket/other.txt

echo "hello" | anystore io -o sftp://user:password@host:/tmp/world.txt

anystore io -i https://investigativedata.io > index.html
Python usage
from anystore import smart_read, smart_write

data = smart_read("s3://mybucket/data.txt")
smart_write(".local/data", data)

logged_items(items, action, chunk_size=10000, uri=None, item_name=None, **log_kwargs)

Log progress while iterating items for io operations.

Example
from anystore.io import logged_items

items = [...]
for item in logged_items(items, "Read", uri="/tmp/foo.csv"):
    yield item

Parameters:

Name Type Description Default
items Iterable[T]

Sequence of any items

required
action str

Action name to log

required
uri Uri | None

string or path-like key uri (only for logging purpose)

None
chunk_size int | None

Log on every chunk_size

10000
item_name str | None

Name of item

None

Yields:

Type Description
T

The input items

Source code in anystore/io.py
def logged_items(
    items: Iterable[T],
    action: str,
    chunk_size: int | None = 10_000,
    uri: Uri | None = None,
    item_name: str | None = None,
    **log_kwargs,
) -> Generator[T, None, None]:
    """
    Log progress while iterating items for io operations.

    Every item is passed through unchanged. A progress message is logged each
    time `chunk_size` items have been seen, and a summary message is logged
    once the input is exhausted (unless the input was empty).

    Example:
        ```python
        from anystore.io import logged_items

        items = [...]
        for item in logged_items(items, "Read", uri="/tmp/foo.csv"):
            yield item
        ```

    Args:
        items: Iterable of any items
        action: Action name to log
        uri: string or path-like key uri (only for logging purpose)
        chunk_size: Log on every chunk_size, `None` or `0` fall back to 10000
        item_name: Name of item used in the log messages, defaults to `Item`
        **log_kwargs: additional keyword arguments passed to every log call

    Yields:
        The input items
    """
    chunk_size = chunk_size or 10_000
    # The name is resolved once up front; the former per-item class-name
    # fallbacks could never trigger because the name was already set here.
    item_name = item_name or "Item"
    if uri:
        uri = ensure_uri(uri)
    ix = 0  # stays 0 for empty input, which suppresses the summary below
    for ix, item in enumerate(items, 1):
        if ix % chunk_size == 0:
            log.info(f"{action} `{item_name}` {ix} ...", uri=uri, **log_kwargs)
        yield item
    if ix:
        log.info(f"{action} {ix} `{item_name}s`: Done.", uri=uri, **log_kwargs)

smart_open(uri, mode=DEFAULT_MODE, **kwargs)

IO context similar to Python's built-in open().

Example
from anystore import smart_open

with smart_open("s3://mybucket/foo.csv") as fh:
    return fh.read()

Parameters:

Name Type Description Default
uri Uri

string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo

required
mode str | None

open mode, default rb for byte reading.

DEFAULT_MODE
**kwargs Any

pass through storage-specific options

{}

Yields:

Type Description
IO

A generic file-handler like context object

Source code in anystore/io.py
@contextlib.contextmanager
def smart_open(
    uri: Uri,
    mode: str | None = DEFAULT_MODE,
    **kwargs: Any,
) -> Generator[IO, None, None]:
    """
    Context manager for opening a location, similar to Python's built-in `open()`.

    Example:
        ```python
        from anystore import smart_open

        with smart_open("s3://mybucket/foo.csv") as fh:
            return fh.read()
        ```

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or `s3://mybucket/foo`
        mode: open mode, default `rb` for byte reading.
        **kwargs: pass through storage-specific options

    Yields:
        A generic file-handler like context object

    Raises:
        DoesNotExist: if a `FileNotFoundError` occurs while opening the
            handler or inside the caller's `with` body
    """
    smart_handler = SmartHandler(uri, mode=mode, **kwargs)
    try:
        stream = smart_handler.open()
        yield stream
    except FileNotFoundError as not_found:
        # Translate the builtin error into the package's own exception,
        # keeping the original as the cause.
        raise DoesNotExist from not_found
    finally:
        # The handler is closed on success and on failure alike.
        smart_handler.close()

smart_read(uri, mode=DEFAULT_MODE, **kwargs)

Return content for a given file-like key directly.

Parameters:

Name Type Description Default
uri Uri

string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo

required
mode str | None

open mode, default rb for byte reading.

DEFAULT_MODE
**kwargs Any

pass through storage-specific options

{}

Returns:

Type Description
AnyStr

str or byte content, depending on mode

Source code in anystore/io.py
def smart_read(uri: Uri, mode: str | None = DEFAULT_MODE, **kwargs: Any) -> AnyStr:
    """
    Read the complete content of a file-like key in one call.

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or `s3://mybucket/foo`
        mode: open mode, default `rb` for byte reading.
        **kwargs: pass through storage-specific options

    Returns:
        `str` or `bytes` content, depending on `mode`
    """
    with smart_open(uri, mode, **kwargs) as reader:
        payload = reader.read()
    return payload

smart_stream(uri, mode=DEFAULT_MODE, **kwargs)

Stream content line by line.

Example
import orjson
from anystore import smart_stream

for data in smart_stream("s3://mybucket/data.json"):
    yield orjson.loads(data)

Parameters:

Name Type Description Default
uri Uri

string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo

required
mode str | None

open mode, default rb for byte reading.

DEFAULT_MODE
**kwargs Any

pass through storage-specific options

{}

Yields:

Type Description
AnyStr

A generator of str or byte content, depending on mode

Source code in anystore/io.py
def smart_stream(
    uri: Uri, mode: str | None = DEFAULT_MODE, **kwargs: Any
) -> Generator[AnyStr, None, None]:
    """
    Stream content line by line.

    The location is opened via `smart_open` and read with `readline()` until
    an empty result signals the end of the content.

    Example:
        ```python
        import orjson
        from anystore import smart_stream

        for line in smart_stream("s3://mybucket/data.json"):
            yield orjson.loads(line)
        ```

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or `s3://mybucket/foo`
        mode: open mode, default `rb` for byte reading.
        **kwargs: pass through storage-specific options

    Yields:
        Lines of `str` or `bytes` content, depending on `mode`
    """
    with smart_open(uri, mode, **kwargs) as fh:
        # readline() returns an empty (falsy) value at end of content
        while line := fh.readline():
            yield line

smart_stream_json(uri, mode=DEFAULT_MODE, **kwargs)

Stream line-based json as python objects.

Example
from anystore import smart_stream_json

for data in smart_stream_json("s3://mybucket/data.json"):
    yield data.get("foo")

Parameters:

Name Type Description Default
uri Uri

string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo

required
mode str | None

open mode, default rb for byte reading.

DEFAULT_MODE
**kwargs Any

pass through storage-specific options

{}

Yields:

Type Description
SDictGenerator

A generator of dicts loaded via orjson

Source code in anystore/io.py
def smart_stream_json(
    uri: Uri, mode: str | None = DEFAULT_MODE, **kwargs: Any
) -> SDictGenerator:
    """
    Stream line-based json, decoding each line into a python object.

    Example:
        ```python
        from anystore import smart_stream_json

        for data in smart_stream_json("s3://mybucket/data.json"):
            yield data.get("foo")
        ```

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or `s3://mybucket/foo`
        mode: open mode, default `rb` for byte reading.
        **kwargs: pass through storage-specific options

    Yields:
        A generator of `dict`s loaded via `orjson`
    """
    # Lazily decode every streamed line, one at a time.
    yield from map(orjson.loads, smart_stream(uri, mode, **kwargs))

smart_write(uri, content, mode=DEFAULT_WRITE_MODE, **kwargs)

Write content to a given file-like key directly.

Parameters:

Name Type Description Default
uri Uri

string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo

required
content bytes | str

str or bytes content to write.

required
mode str | None

open mode, default wb for byte writing.

DEFAULT_WRITE_MODE
**kwargs Any

pass through storage-specific options

{}
Source code in anystore/io.py
def smart_write(
    uri: Uri, content: bytes | str, mode: str | None = DEFAULT_WRITE_MODE, **kwargs: Any
) -> None:
    """
    Write content to a given file-like key directly.

    `str` content is encoded to bytes when the target is `-` or when `mode`
    is a binary mode, so text can be written with the default write mode.

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or `s3://mybucket/foo`
        content: `str` or `bytes` content to write.
        mode: open mode, default `wb` for byte writing.
        **kwargs: pass through storage-specific options
    """
    to_stdout = uri == "-"
    # Only an explicit binary mode triggers encoding; for mode=None the
    # content is handed over untouched.
    binary_mode = mode is not None and "b" in mode
    if isinstance(content, str) and (to_stdout or binary_mode):
        content = content.encode()
    with smart_open(uri, mode, **kwargs) as fh:
        fh.write(content)

smart_write_json(uri, items, mode=DEFAULT_WRITE_MODE, **kwargs)

Write Python data as line-based JSON (one JSON document per line)

Parameters:

Name Type Description Default
uri Uri

string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo

required
items Iterable[SDict]

Iterable of dictionaries

required
mode str | None

open mode, default wb for byte writing.

DEFAULT_WRITE_MODE
**kwargs Any

pass through storage-specific options

{}
Source code in anystore/io.py
def smart_write_json(
    uri: Uri,
    items: Iterable[SDict],
    mode: str | None = DEFAULT_WRITE_MODE,
    **kwargs: Any,
) -> None:
    """
    Write python data as line-based json (one json document per line).

    Each item is serialized via `orjson` with a trailing newline. The
    serialized bytes are decoded to `str` before writing when `mode` is a
    text mode.

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or `s3://mybucket/foo`
        items: Iterable of dictionaries
        mode: open mode, default `wb` for byte writing.
        **kwargs: pass through storage-specific options
    """
    # Decide once, outside the loop, whether lines must be decoded.
    # `mode=None` no longer raises a TypeError on the membership test.
    # NOTE(review): None is assumed to select a binary mode downstream - confirm.
    as_text = mode is not None and "b" not in mode
    with smart_open(uri, mode, **kwargs) as fh:
        for item in items:
            line = orjson.dumps(item, option=orjson.OPT_APPEND_NEWLINE)
            fh.write(line.decode() if as_text else line)