anystore.io

IOFormat

Bases: StrEnum

For use in the Typer CLI

Source code in anystore/io/write.py
class IOFormat(StrEnum):
    """For use in typer cli"""

    csv = "csv"
    json = "json"
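
For example, a Typer option typed with this enum gets csv and json as its allowed choices (a sketch; the export command is hypothetical):

import typer
from anystore.io.write import IOFormat

cli = typer.Typer()

@cli.command()
def export(output_format: IOFormat = IOFormat.json) -> None:
    # Typer turns the enum members into the allowed CLI choices: csv, json
    typer.echo(f"exporting as {output_format.value} ...")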

ModelWriter

Bases: Writer

A generic writer for pydantic objects to any output uri, as either json or csv

Source code in anystore/io/write.py
class ModelWriter(Writer):
    """
    A generic writer for pydantic objects to any out uri, either json or csv
    """

    def write(self, row: BaseModel) -> None:
        data = row.model_dump(by_alias=True, mode="json")
        return super().write(data)
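
A minimal usage sketch (the Item model and file name are hypothetical):

from pydantic import BaseModel
from anystore.io.write import ModelWriter

class Item(BaseModel):  # hypothetical example model
    name: str

with ModelWriter("./items.json") as writer:
    writer.write(Item(name="foo"))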

Writer

A generic writer for Python dict objects to any output uri, as either json or csv

Source code in anystore/io/write.py
class Writer:
    """
    A generic writer for python dict objects to any out uri, either json or csv
    """

    def __init__(
        self,
        uri: Uri,
        mode: str | None = DEFAULT_WRITE_MODE,
        output_format: Formats | None = "json",
        fieldnames: list[str] | None = None,
        clean: bool | None = False,
        **kwargs,
    ) -> None:
        if output_format not in (FORMAT_JSON, FORMAT_CSV):
            raise ValueError("Invalid output format, only csv or json allowed")
        mode = mode or DEFAULT_WRITE_MODE
        self.mode = mode.replace("b", "") if output_format == "csv" else mode
        self.handler = SmartHandler(uri, mode=self.mode, **kwargs)
        self.fieldnames = fieldnames
        self.output_format = output_format
        self.clean = clean
        self.csv_writer: csv.DictWriter | None = None

    def __enter__(self) -> Self:
        self.io = self.handler.open()
        return self

    def __exit__(self, *args) -> None:
        self.handler.close()

    def write(self, row: SDict) -> None:
        if self.output_format == "csv" and self.csv_writer is None:
            self.csv_writer = csv.DictWriter(self.io, self.fieldnames or row.keys())
            self.csv_writer.writeheader()

        if self.output_format == "json":
            if self.clean:
                row = clean_dict(row)
            line = orjson.dumps(
                row,
                default=_default_serializer,
                option=orjson.OPT_APPEND_NEWLINE | orjson.OPT_NAIVE_UTC,
            )
            if "b" not in self.mode:
                line = line.decode()
            self.io.write(line)
        elif self.csv_writer:
            self.csv_writer.writerow(row)
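
A minimal usage sketch, writing dict rows as csv (file name is illustrative):

from anystore.io.write import Writer

rows = [{"id": 1, "name": "foo"}, {"id": 2, "name": "bar"}]
with Writer("./rows.csv", output_format="csv") as writer:
    for row in rows:
        writer.write(row)  # the csv header is written once, from the first row's keys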

logged_items(items, action, chunk_size=10000, item_name=None, logger=None, total=None, **log_kwargs)

Log progress of iterating items for IO operations.

Example
from anystore.io import logged_items

items = [...]
for item in logged_items(items, "Read", uri="/tmp/foo.csv"):
    yield item

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| items | Iterable[T] | Sequence of any items | required |
| action | str | Action name to log | required |
| chunk_size | int \| None | Log on every chunk_size items | 10000 |
| item_name | str \| None | Name of the item | None |
| logger | Logger \| BoundLogger \| None | Specific logger to use | None |
| total | int \| None | Total number of items, if known | None |
| **log_kwargs | | Extra context passed through to each log call | {} |

Yields:

| Type | Description |
| --- | --- |
| T | The input items |

Source code in anystore/io/logging.py
def logged_items(
    items: Iterable[T],
    action: str,
    chunk_size: int | None = 10_000,
    item_name: str | None = None,
    logger: logging.Logger | BoundLogger | None = None,
    total: int | None = None,
    **log_kwargs,
) -> Generator[T, None, None]:
    """
    Log progress of iterating items for IO operations.

    Example:
        ```python
        from anystore.io import logged_items

        items = [...]
        for item in logged_items(items, "Read", uri="/tmp/foo.csv"):
            yield item
        ```

    Args:
        items: Sequence of any items
        action: Action name to log
        chunk_size: Log on every chunk_size
        item_name: Name of item
        logger: Specific logger to use
        total: Total number of items, if known

    Yields:
        The input items
    """
    log_ = logger or log
    chunk_size = chunk_size or 10_000
    ix = 0
    if total:
        item_name = item_name or "Item"
        log_.info(f"{action} {total} `{item_name}s` ...", **log_kwargs)
        yield from tqdm(items, total=total, unit=item_name)
        ix = total
    else:
        for ix, item in enumerate(items, 1):
            if ix == 1:
                # derive a readable name from the first item if none was given
                item_name = item_name or item.__class__.__name__.title()
            if ix % chunk_size == 0:
                log_.info(f"{action} `{item_name}` {ix} ...", **log_kwargs)
            yield item
    if ix:
        log_.info(f"{action} {ix} `{item_name}s`: Done.", **log_kwargs)
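
When the total is known in advance, passing it switches to a tqdm progress bar (a sketch; names are illustrative):

from anystore.io import logged_items

rows = [{"id": i} for i in range(100_000)]
for row in logged_items(rows, "Import", item_name="Row", total=len(rows)):
    ...  # process row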

open_virtual(uri, algorithm=None, **kwargs)

Wrapper for UriResource.local_open

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | 'TUri' | string or path-like key uri to open | required |
| algorithm | str \| None | Checksum algorithm from hashlib (default: "sha1") | None |
| **kwargs | | pass through storage-specific options | {} |
Source code in anystore/io/read.py
def open_virtual(
    uri: "TUri", algorithm: str | None = None, **kwargs
) -> ContextManager[VirtualIO]:
    """Wrapper for [UriResource.local_open][anystore.store.resource.UriResource.local_open]

    Args:
        uri: string or path-like key uri to open
        algorithm: Checksum algorithm from `hashlib` (default: "sha1")
        **kwargs: pass through storage-specific options
    """
    return UriResource(uri, **kwargs).local_open(algorithm=algorithm)
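
A usage sketch, assuming the yielded VirtualIO reads like a regular binary file handle:

from anystore.io.read import open_virtual

# Materialize a (possibly remote) blob locally and read it; `algorithm`
# names the hashlib checksum computed along the way.
with open_virtual("s3://mybucket/report.pdf", algorithm="sha1") as fh:
    data = fh.read()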

smart_open(uri, mode=DEFAULT_MODE, **kwargs)

IO context similar to Python's built-in open().

Example
from anystore import smart_open

with smart_open("s3://mybucket/foo.csv") as fh:
    return fh.read()

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | Uri | string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo | required |
| mode | str \| None | open mode, default rb for byte reading | DEFAULT_MODE |
| **kwargs | Any | pass through storage-specific options | {} |

Yields:

| Type | Description |
| --- | --- |
| IO[AnyStr] | A generic file-handle-like context object |

Source code in anystore/io/handler.py
@contextlib.contextmanager
def smart_open(
    uri: Uri,
    mode: str | None = DEFAULT_MODE,
    **kwargs: Any,
) -> Generator[IO[AnyStr], None, None]:
    """
    IO context similar to Python's built-in `open()`.

    Example:
        ```python
        from anystore import smart_open

        with smart_open("s3://mybucket/foo.csv") as fh:
            return fh.read()
        ```

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or
            `s3://mybucket/foo`
        mode: open mode, default `rb` for byte reading.
        **kwargs: pass through storage-specific options

    Yields:
        A generic file-handler like context object
    """
    handler = SmartHandler(uri, mode=mode, **kwargs)
    try:
        yield handler.open()
    except FileNotFoundError as e:
        raise DoesNotExist(str(e))
    finally:
        handler.close()

smart_read(uri, mode=DEFAULT_MODE, **kwargs)

Return content for a given file-like key directly.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | Uri | string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo | required |
| mode | str \| None | open mode, default rb for byte reading | DEFAULT_MODE |
| **kwargs | Any | pass through storage-specific options | {} |

Returns:

| Type | Description |
| --- | --- |
| AnyStr | str or byte content, depending on mode |

Source code in anystore/io/read.py
def smart_read(uri: Uri, mode: str | None = DEFAULT_MODE, **kwargs: Any) -> AnyStr:
    """
    Return content for a given file-like key directly.

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or
            `s3://mybucket/foo`
        mode: open mode, default `rb` for byte reading.
        **kwargs: pass through storage-specific options

    Returns:
        `str` or `byte` content, depending on `mode`
    """
    with smart_open(uri, mode, **kwargs) as fh:
        return fh.read()
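
A usage sketch (paths are illustrative):

from anystore.io.read import smart_read

content = smart_read("s3://mybucket/foo.csv")  # bytes via the default "rb" mode
text = smart_read("./local/data.txt", "r")     # str via text mode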

smart_stream(uri, mode=DEFAULT_MODE, **kwargs)

Stream content line by line.

Example
import orjson
from anystore import smart_stream

for line in smart_stream("s3://mybucket/data.json"):
    yield orjson.loads(line)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | Uri | string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo | required |
| mode | str \| None | open mode, default rb for byte reading | DEFAULT_MODE |
| **kwargs | Any | pass through storage-specific options | {} |

Yields:

| Type | Description |
| --- | --- |
| AnyStr | A generator of str or byte content, depending on mode |

Source code in anystore/io/read.py
def smart_stream(
    uri: Uri, mode: str | None = DEFAULT_MODE, **kwargs: Any
) -> Generator[AnyStr, None, None]:
    """
    Stream content line by line.

    Example:
        ```python
        import orjson
        from anystore import smart_stream

        for line in smart_stream("s3://mybucket/data.json"):
            yield orjson.loads(line)
        ```

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or
            `s3://mybucket/foo`
        mode: open mode, default `rb` for byte reading.
        **kwargs: pass through storage-specific options

    Yields:
        A generator of `str` or `byte` content, depending on `mode`
    """
    with smart_open(uri, mode, **kwargs) as fh:
        yield from iter_lines(fh)

smart_stream_csv(uri, **kwargs)

Stream csv as Python objects.

Example
from anystore import smart_stream_csv

for data in smart_stream_csv("s3://mybucket/data.csv"):
    yield data.get("foo")

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | Uri | string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo | required |
| **kwargs | Any | pass through storage-specific options | {} |

Yields:

| Type | Description |
| --- | --- |
| SDictGenerator | A generator of dicts loaded via csv.DictReader |

Source code in anystore/io/read.py
def smart_stream_csv(uri: Uri, **kwargs: Any) -> SDictGenerator:
    """
    Stream csv as python objects.

    Example:
        ```python
        from anystore import smart_stream_csv

        for data in smart_stream_csv("s3://mybucket/data.csv"):
            yield data.get("foo")
        ```

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or
            `s3://mybucket/foo`
        **kwargs: pass through storage-specific options

    Yields:
        A generator of `dict`s loaded via `csv.DictReader`
    """
    kwargs["mode"] = "r"
    with smart_open(uri, **kwargs) as f:
        yield from csv.DictReader(f)

smart_stream_csv_models(uri, model, **kwargs)

Stream csv as pydantic objects

Source code in anystore/io/read.py
def smart_stream_csv_models(uri: Uri, model: Type[M], **kwargs: Any) -> MGenerator:
    """
    Stream csv as pydantic objects
    """
    for row in logged_items(
        smart_stream_csv(uri, **kwargs),
        "Read",
        uri=uri,
        item_name=model.__name__,
    ):
        yield model(**row)
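
A minimal sketch; the Book model and the csv layout are hypothetical:

from pydantic import BaseModel
from anystore.io.read import smart_stream_csv_models

class Book(BaseModel):  # hypothetical example model
    title: str
    year: int

for book in smart_stream_csv_models("s3://mybucket/books.csv", Book):
    print(book.title)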

smart_stream_data(uri, input_format, **kwargs)

Stream data objects loaded as dicts from json or csv sources

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | Uri | string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo | required |
| input_format | str | csv or json | required |
| **kwargs | Any | pass through storage-specific options | {} |

Yields:

| Type | Description |
| --- | --- |
| SDictGenerator | A generator of dicts loaded via orjson |

Source code in anystore/io/read.py
def smart_stream_data(uri: Uri, input_format: str, **kwargs: Any) -> SDictGenerator:
    """
    Stream data objects loaded as dict from json or csv sources

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or
            `s3://mybucket/foo`
        input_format: csv or json
        **kwargs: pass through storage-specific options

    Yields:
        A generator of `dict`s loaded via `orjson`
    """
    if input_format == "csv":
        yield from smart_stream_csv(uri, **kwargs)
    else:
        yield from smart_stream_json(uri, **kwargs)
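
A short sketch of the dispatch (the path is illustrative):

from anystore.io.read import smart_stream_data

# the same call site handles both formats
for row in smart_stream_data("./dump.csv", input_format="csv"):
    print(row)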

smart_stream_json(uri, mode=DEFAULT_MODE, **kwargs)

Stream line-based json as Python objects.

Example
from anystore import smart_stream_json

for data in smart_stream_json("s3://mybucket/data.json"):
    yield data.get("foo")

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | Uri | string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo | required |
| mode | str \| None | open mode, default rb for byte reading | DEFAULT_MODE |
| **kwargs | Any | pass through storage-specific options | {} |

Yields:

| Type | Description |
| --- | --- |
| SDictGenerator | A generator of dicts loaded via orjson |

Source code in anystore/io/read.py
def smart_stream_json(
    uri: Uri, mode: str | None = DEFAULT_MODE, **kwargs: Any
) -> SDictGenerator:
    """
    Stream line-based json as python objects.

    Example:
        ```python
        from anystore import smart_stream_json

        for data in smart_stream_json("s3://mybucket/data.json"):
            yield data.get("foo")
        ```

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or
            `s3://mybucket/foo`
        mode: open mode, default `rb` for byte reading.
        **kwargs: pass through storage-specific options

    Yields:
        A generator of `dict`s loaded via `orjson`
    """
    for line in smart_stream(uri, mode, **kwargs):
        yield orjson.loads(line)

smart_stream_json_models(uri, model, **kwargs)

Stream json as pydantic objects

Source code in anystore/io/read.py
def smart_stream_json_models(uri: Uri, model: Type[M], **kwargs: Any) -> MGenerator:
    """
    Stream json as pydantic objects
    """
    for row in logged_items(
        smart_stream_json(uri, **kwargs),
        "Read",
        uri=uri,
        item_name=model.__name__,
    ):
        yield model(**row)
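
Usage mirrors smart_stream_csv_models; the Book model is hypothetical:

from pydantic import BaseModel
from anystore.io.read import smart_stream_json_models

class Book(BaseModel):  # hypothetical example model
    title: str
    year: int

for book in smart_stream_json_models("s3://mybucket/books.json", Book):
    print(book.year)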

smart_stream_models(uri, model, input_format, **kwargs)

Stream csv or json as pydantic objects

Source code in anystore/io/read.py
def smart_stream_models(
    uri: Uri, model: Type[M], input_format: str, **kwargs: Any
) -> MGenerator:
    """
    Stream csv or json as pydantic objects
    """
    if input_format == "csv":
        yield from smart_stream_csv_models(uri, model, **kwargs)
    elif input_format == "json":
        yield from smart_stream_json_models(uri, model, **kwargs)
    else:
        raise ValueError("Invalid format, only csv or json allowed")
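
A compact sketch of the format dispatch (model and path hypothetical):

from pydantic import BaseModel
from anystore.io.read import smart_stream_models

class Row(BaseModel):  # hypothetical example model
    id: int

rows = list(smart_stream_models("./rows.csv", Row, input_format="csv"))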

smart_write(uri, content, mode=DEFAULT_WRITE_MODE, **kwargs)

Write content to a given file-like key directly.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | Uri | string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo | required |
| content | bytes \| str | str or bytes content to write | required |
| mode | str \| None | open mode, default wb for byte writing | DEFAULT_WRITE_MODE |
| **kwargs | Any | pass through storage-specific options | {} |
Source code in anystore/io/write.py
def smart_write(
    uri: Uri, content: bytes | str, mode: str | None = DEFAULT_WRITE_MODE, **kwargs: Any
) -> None:
    """
    Write content to a given file-like key directly.

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or
            `s3://mybucket/foo`
        content: `str` or `bytes` content to write.
        mode: open mode, default `wb` for byte writing.
        **kwargs: pass through storage-specific options
    """
    if uri == "-":
        if isinstance(content, str):
            content = content.encode()
    with smart_open(uri, mode, **kwargs) as fh:
        fh.write(content)
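
A usage sketch (paths are illustrative):

from anystore.io.write import smart_write

smart_write("s3://mybucket/foo.txt", b"hello")  # bytes, default "wb" mode
smart_write("./out.txt", "hello", "w")          # str content needs a text mode
smart_write("-", "hello")                       # "-" writes to stdout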

smart_write_csv(uri, items, mode=DEFAULT_WRITE_MODE, **kwargs)

Write Python data to csv

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | Uri | string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo | required |
| items | Iterable[SDict] | Iterable of dictionaries | required |
| mode | str \| None | open mode, default wb for byte writing | DEFAULT_WRITE_MODE |
| **kwargs | Any | pass through storage-specific options | {} |
Source code in anystore/io/write.py
def smart_write_csv(
    uri: Uri,
    items: Iterable[SDict],
    mode: str | None = DEFAULT_WRITE_MODE,
    **kwargs: Any,
) -> None:
    """
    Write python data to csv

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or
            `s3://mybucket/foo`
        items: Iterable of dictionaries
        mode: open mode, default `wb` for byte writing.
        **kwargs: pass through storage-specific options
    """
    with Writer(uri, mode, output_format="csv", **kwargs) as writer:
        for item in items:
            writer.write(item)
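
A usage sketch (file name is illustrative):

from anystore.io.write import smart_write_csv

rows = [{"id": 1, "name": "foo"}, {"id": 2, "name": "bar"}]
smart_write_csv("./out.csv", rows)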

smart_write_data(uri, items, mode=DEFAULT_WRITE_MODE, output_format='json', **kwargs)

Write Python data to json or csv

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | Uri | string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo | required |
| items | Iterable[SDict] | Iterable of dictionaries | required |
| mode | str \| None | open mode, default wb for byte writing | DEFAULT_WRITE_MODE |
| output_format | Formats \| None | csv or json (default: json) | 'json' |
| **kwargs | Any | pass through storage-specific options | {} |
Source code in anystore/io/write.py
def smart_write_data(
    uri: Uri,
    items: Iterable[SDict],
    mode: str | None = DEFAULT_WRITE_MODE,
    output_format: Formats | None = "json",
    **kwargs: Any,
) -> None:
    """
    Write python data to json or csv

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or
            `s3://mybucket/foo`
        items: Iterable of dictionaries
        mode: open mode, default `wb` for byte writing.
        output_format: csv or json (default: json)
        **kwargs: pass through storage-specific options
    """
    with Writer(uri, mode, output_format=output_format, **kwargs) as writer:
        for item in items:
            writer.write(item)
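
A short sketch of both output formats (paths are illustrative):

from anystore.io.write import smart_write_data

rows = [{"id": 1}, {"id": 2}]
smart_write_data("s3://mybucket/rows.json", rows)          # json lines (default)
smart_write_data("./rows.csv", rows, output_format="csv")  # csv with header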

smart_write_json(uri, items, mode=DEFAULT_WRITE_MODE, **kwargs)

Write Python data to json

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | Uri | string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo | required |
| items | Iterable[SDict] | Iterable of dictionaries | required |
| mode | str \| None | open mode, default wb for byte writing | DEFAULT_WRITE_MODE |
| **kwargs | Any | pass through storage-specific options | {} |
Source code in anystore/io/write.py
def smart_write_json(
    uri: Uri,
    items: Iterable[SDict],
    mode: str | None = DEFAULT_WRITE_MODE,
    **kwargs: Any,
) -> None:
    """
    Write python data to json

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or
            `s3://mybucket/foo`
        items: Iterable of dictionaries
        mode: open mode, default `wb` for byte writing.
        **kwargs: pass through storage-specific options
    """
    with Writer(uri, mode, output_format="json", **kwargs) as writer:
        for item in items:
            writer.write(item)
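
A usage sketch (file name is illustrative):

from anystore.io.write import smart_write_json

smart_write_json("./rows.json", [{"id": 1}, {"id": 2}])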

smart_write_model(uri, obj, mode=DEFAULT_WRITE_MODE, output_format='json', clean=False, **kwargs)

Write a single pydantic object to the target

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | Uri | string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo | required |
| obj | BaseModel | Pydantic object | required |
| mode | str \| None | open mode, default wb for byte writing | DEFAULT_WRITE_MODE |
| output_format | Formats \| None | csv or json (default: json) | 'json' |
| clean | bool \| None | Apply clean_dict | False |
| **kwargs | Any | pass through storage-specific options | {} |
Source code in anystore/io/write.py
def smart_write_model(
    uri: Uri,
    obj: BaseModel,
    mode: str | None = DEFAULT_WRITE_MODE,
    output_format: Formats | None = "json",
    clean: bool | None = False,
    **kwargs: Any,
) -> None:
    """
    Write a single pydantic object to the target

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or
            `s3://mybucket/foo`
        obj: Pydantic object
        mode: open mode, default `wb` for byte writing.
        clean: Apply [clean_dict][anystore.util.data.clean_dict]
        **kwargs: pass through storage-specific options
    """
    with ModelWriter(uri, mode, output_format, clean=clean, **kwargs) as writer:
        writer.write(obj)
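
A minimal sketch (the Book model and file name are hypothetical):

from pydantic import BaseModel
from anystore.io.write import smart_write_model

class Book(BaseModel):  # hypothetical example model
    title: str

smart_write_model("./book.json", Book(title="Foo"))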

smart_write_models(uri, objects, mode=DEFAULT_WRITE_MODE, output_format='json', clean=False, **kwargs)

Write pydantic objects to json lines or csv

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | Uri | string or path-like key uri to open, e.g. ./local/data.txt or s3://mybucket/foo | required |
| objects | Iterable[BaseModel] | Iterable of pydantic objects | required |
| mode | str \| None | open mode, default wb for byte writing | DEFAULT_WRITE_MODE |
| output_format | Formats \| None | csv or json (default: json) | 'json' |
| clean | bool \| None | Apply clean_dict | False |
| **kwargs | Any | pass through storage-specific options | {} |
Source code in anystore/io/write.py
def smart_write_models(
    uri: Uri,
    objects: Iterable[BaseModel],
    mode: str | None = DEFAULT_WRITE_MODE,
    output_format: Formats | None = "json",
    clean: bool | None = False,
    **kwargs: Any,
) -> None:
    """
    Write pydantic objects to json lines or csv

    Args:
        uri: string or path-like key uri to open, e.g. `./local/data.txt` or
            `s3://mybucket/foo`
        objects: Iterable of pydantic objects
        mode: open mode, default `wb` for byte writing.
        clean: Apply [clean_dict][anystore.util.data.clean_dict]
        **kwargs: pass through storage-specific options
    """
    with ModelWriter(uri, mode, output_format, clean=clean, **kwargs) as writer:
        for obj in objects:
            writer.write(obj)
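
A minimal sketch (the Book model and file name are hypothetical):

from pydantic import BaseModel
from anystore.io.write import smart_write_models

class Book(BaseModel):  # hypothetical example model
    title: str

books = [Book(title="Foo"), Book(title="Bar")]
smart_write_models("./books.csv", books, output_format="csv")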

stream_bytes(key, source, target, **kwargs)

Stream binary content for key from source to target store.

Source code in anystore/logic/io.py
def stream_bytes(key: str, source: "Store", target: "Store", **kwargs: Any) -> None:
    """Stream binary content for *key* from *source* to *target* store."""
    with source.open(key, "rb", **kwargs) as i:
        with target.open(key, "wb") as o:
            stream(i, o)
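
A usage sketch; get_store as the store factory and the store uris are assumptions here:

from anystore import get_store  # assumed store factory
from anystore.logic.io import stream_bytes

source = get_store("s3://mybucket")
target = get_store("./local-mirror")
stream_bytes("data/foo.pdf", source, target)  # stream the blob between the two stores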