Skip to content

anystore.util

clean_dict(data)

Ensure dict return, clean up defaultdicts, drop None values and ensure str keys (for serialization)

Examples:

>>> clean_dict({1: 2})
{"1": 2}
>>> clean_dict({"a": ""})
{}
>>> clean_dict({"a": None})
{}
>>> clean_dict("foo")
{}

Parameters:

Name Type Description Default
data Any

Arbitrary input data

required

Returns:

Type Description
dict[str, Any]

A cleaned dict with string keys (or an empty one)

Source code in anystore/util/data.py
def clean_dict(data: Any) -> dict[str, Any]:
    """
    Normalize arbitrary input into a plain dict that is safe to serialize:
    keys are stringified, nested mappings are cleaned recursively and empty
    values (e.g. `None`, `""`) are dropped.

    Examples:
        >>> clean_dict({1: 2})
        {"1": 2}
        >>> clean_dict({"a": ""})
        {}
        >>> clean_dict({"a": None})
        {}
        >>> clean_dict("foo")
        {}

    Args:
        data: Arbitrary input data

    Returns:
        A cleaned dict with string keys, or an empty dict if `data` is not a
        mapping
    """
    if not is_mapping(data):
        return {}

    prepared: dict[str, Any] = {}
    for key, value in data.items():
        name = str(key)
        if is_mapping(value):
            # A nested mapping that ends up empty after cleaning is replaced
            # by None so the final `_clean_dict` pass sees it as a None value.
            prepared[name] = clean_dict(dict(value)) or None
        else:
            prepared[name] = _clean(value)
    return _clean_dict(prepared)

dict_merge(d1, d2)

Merge the second dict into the first but omit empty values

Source code in anystore/util/data.py
def dict_merge(d1: dict[Any, Any], d2: dict[Any, Any]) -> dict[Any, Any]:
    """
    Merge the second dict into the first but omit empty values.

    Both inputs are passed through `clean_dict` first, so the result is a new
    dict. Nested mappings are merged recursively, list-like values are
    concatenated (first dict's items first) with duplicates removed, and any
    other non-empty value from `d2` overrides the one in `d1`.

    Args:
        d1: Base dictionary
        d2: Dictionary whose non-empty values are merged into `d1`

    Returns:
        The merged dictionary
    """
    d1, d2 = clean_dict(d1), clean_dict(d2)
    for key, value in d2.items():
        if is_empty(value):
            continue
        if is_mapping(value):
            nested = ensure_dict(value)
            d1[key] = dict_merge(d1.get(key, {}), nested)
        elif is_listish(value):
            # Order-preserving de-duplication via membership test, so
            # unhashable items (e.g. dicts) are supported as well.
            unique: list[Any] = []
            for item in ensure_list(d1.get(key)) + ensure_list(value):
                if item not in unique:
                    unique.append(item)
            d1[key] = unique
        else:
            d1[key] = value
    return d1

dump_json(obj, clean=False, newline=False)

Dump a python dictionary to json bytes via orjson

Parameters:

Name Type Description Default
obj SDict

The data object (dictionary with string keys)

required
clean bool | None

Apply clean_dict

False
newline bool | None

Add a linebreak

False
Source code in anystore/util/data.py
def dump_json(
    obj: SDict, clean: bool | None = False, newline: bool | None = False
) -> bytes:
    """
    Serialize a python dictionary to json bytes via orjson

    Args:
        obj: The data object (dictionary with string keys)
        clean: Apply [clean_dict][anystore.util.data.clean_dict] before dumping
        newline: Append a trailing linebreak to the output

    Returns:
        The json encoded bytes
    """
    payload = clean_dict(obj) if clean else obj
    if not newline:
        return orjson.dumps(payload)
    return orjson.dumps(payload, option=orjson.OPT_APPEND_NEWLINE)

dump_json_model(obj, clean=False, newline=False)

Dump a pydantic obj to json bytes via orjson

Parameters:

Name Type Description Default
obj BaseModel

The pydantic object

required
clean bool | None

Apply clean_dict

False
newline bool | None

Add a linebreak

False
Source code in anystore/util/data.py
def dump_json_model(
    obj: BaseModel, clean: bool | None = False, newline: bool | None = False
) -> bytes:
    """
    Serialize a pydantic object to json bytes via orjson

    Args:
        obj: The pydantic object
        clean: Apply [clean_dict][anystore.util.data.clean_dict] to the dumped
            data
        newline: Append a trailing linebreak to the output

    Returns:
        The json encoded bytes
    """
    # Cleaning (if requested) already happens in `model_dump`, so it is not
    # forwarded to `dump_json` again.
    return dump_json(model_dump(obj, clean), newline=newline)

dump_yaml(obj, clean=False, newline=False)

Dump a python dictionary to bytes

Parameters:

Name Type Description Default
obj SDict

The data object (dictionary with string keys)

required
clean bool | None

Apply clean_dict

False
newline bool | None

Add a linebreak

False
Source code in anystore/util/data.py
def dump_yaml(
    obj: SDict, clean: bool | None = False, newline: bool | None = False
) -> bytes:
    """
    Dump a python dictionary to yaml bytes

    Args:
        obj: The data object (dictionary with string keys)
        clean: Apply [clean_dict][anystore.util.data.clean_dict]
        newline: Add a linebreak

    Returns:
        The yaml document, encoded to bytes with the default `str.encode()`
        encoding
    """
    if clean:
        obj = clean_dict(obj)
    data = yaml.dump(obj)
    if newline:
        data += "\n"
    return data.encode()

dump_yaml_model(obj, clean=False, newline=False)

Dump a pydantic obj to yaml bytes

Parameters:

Name Type Description Default
obj BaseModel

The pydantic object

required
clean bool | None

Apply clean_dict

False
newline bool | None

Add a linebreak

False
Source code in anystore/util/data.py
def dump_yaml_model(
    obj: BaseModel, clean: bool | None = False, newline: bool | None = False
) -> bytes:
    """
    Serialize a pydantic object to yaml bytes

    Args:
        obj: The pydantic object
        clean: Apply [clean_dict][anystore.util.data.clean_dict] to the dumped
            data
        newline: Append a linebreak to the output

    Returns:
        The yaml encoded bytes
    """
    # Cleaning (if requested) already happens in `model_dump`, so it is not
    # forwarded to `dump_yaml` again.
    return dump_yaml(model_dump(obj, clean), newline=newline)

is_empty(value)

Check if a value is empty from a human point of view

Source code in anystore/util/data.py
def is_empty(value: Any) -> bool:
    """
    Check if a value is empty from a human point of view.

    Booleans and integers (including `False` and `0`) as well as the empty
    string are explicitly treated as *not* empty; every other value is empty
    when it is falsy (`None`, `[]`, `{}`, ...).
    """
    never_empty = isinstance(value, (bool, int)) or value == ""
    if never_empty:
        return False
    return not value

model_dump(obj, clean=False)

Serialize a pydantic object to a dict by alias and json mode

Parameters:

Name Type Description Default
clean bool | None

Apply clean_dict

False
Source code in anystore/util/data.py
def model_dump(obj: BaseModel, clean: bool | None = False) -> SDict:
    """
    Serialize a pydantic object to a dict, using field aliases and json mode

    Args:
        obj: The pydantic object
        clean: Apply [clean_dict][anystore.util.data.clean_dict] to the result

    Returns:
        The serialized data
    """
    dumped = obj.model_dump(by_alias=True, mode="json")
    if not clean:
        return dumped
    return clean_dict(dumped)

pydantic_merge(m1, m2)

Merge the second pydantic object into the first one

Source code in anystore/util/data.py
def pydantic_merge(m1: BM, m2: BM) -> BM:
    """
    Merge the second pydantic object into the first one.

    Both objects are dumped in json mode, merged via `dict_merge` and a new
    instance of their shared class is built from the result.

    Raises:
        ValueError: If the two objects are not of the same class
    """
    model_cls = m1.__class__
    other_cls = m2.__class__
    if model_cls != other_cls:
        raise ValueError(
            f"Cannot merge: `{model_cls.__name__}` with `{other_cls.__name__}`"
        )
    merged = dict_merge(m1.model_dump(mode="json"), m2.model_dump(mode="json"))
    return model_cls(**merged)

make_checksum(io, algorithm=DEFAULT_HASH_ALGORITHM)

Calculate checksum for bytes input for given algorithm

Example

This can be used for file handlers:

with open("data.pdf", "rb") as fh:
    return make_checksum(fh, algorithm="md5")
Note

See make_data_checksum for easier implementation for arbitrary input data.

Parameters:

Name Type Description Default
io BinaryIO

File-like open handler

required
algorithm str

Algorithm from hashlib to use, default: sha1

DEFAULT_HASH_ALGORITHM

Returns:

Type Description
str

Generated checksum

Source code in anystore/util/checksum.py
def make_checksum(io: BinaryIO, algorithm: str = DEFAULT_HASH_ALGORITHM) -> str:
    """
    Calculate the checksum of a binary stream for the given algorithm

    Example:
        This can be used for file handlers (opened in binary mode):

        ```python
        with open("data.pdf", "rb") as fh:
            return make_checksum(fh, algorithm="md5")
        ```

    Note:
        See [`make_data_checksum`][anystore.util.checksum.make_data_checksum] for easier
        implementation for arbitrary input data.

    Args:
        io: File-like open handler
        algorithm: Algorithm from `hashlib` to use, default: sha1

    Returns:
        Generated checksum as hex digest
    """
    hasher = getattr(hashlib, algorithm)()
    # Consume the stream in fixed-size chunks so large inputs never have to
    # be held in memory at once; an empty bytes read marks the end.
    while True:
        chunk = io.read(CHUNK_SIZE)
        if chunk == b"":
            break
        hasher.update(chunk)
    return hasher.hexdigest()

make_data_checksum(data, algorithm=DEFAULT_HASH_ALGORITHM)

Calculate checksum for input data based on given algorithm

Examples:

>>> make_data_checksum({"foo": "bar"})
"8f3536a88e3405de70ca2524cfd962203db9a84a"

Parameters:

Name Type Description Default
data Any

Arbitrary input object

required
algorithm str

Algorithm from hashlib to use, default: sha1

DEFAULT_HASH_ALGORITHM

Returns:

Type Description
str

Generated checksum

Source code in anystore/util/checksum.py
def make_data_checksum(data: Any, algorithm: str = DEFAULT_HASH_ALGORITHM) -> str:
    """
    Calculate the checksum of arbitrary input data for the given algorithm

    Examples:
        >>> make_data_checksum({"foo": "bar"})
        "8f3536a88e3405de70ca2524cfd962203db9a84a"

    Args:
        data: Arbitrary input object
        algorithm: Algorithm from `hashlib` to use, default: sha1

    Returns:
        Generated checksum
    """
    if isinstance(data, str):
        raw = data.encode()
    elif isinstance(data, bytes):
        raw = data
    else:
        # Anything else is turned into bytes via the `bytes_iter` helper
        raw = b"".join(bytes_iter(data))
    return make_checksum(BytesIO(raw), algorithm)

make_fast_hash(io)

Make a fast checksum for comparison. Don't use this for real data integrity or cryptographic checks.

Uses imohash: samples the beginning, middle and end of the stream and hashes those samples with MurmurHash3-128.

Source code in anystore/util/checksum.py
def make_fast_hash(io: BinaryIO) -> str:
    """
    Compute a fast, sampled checksum suitable for quick comparisons only.
    Don't use this for real data integrity or cryptographic checks.

    Uses imohash: samples the beginning, middle and end of the stream and
    hashes those samples with MurmurHash3-128.
    """
    # Imported lazily, so `imohash` is only loaded when a fast hash is
    # actually requested.
    from imohash import hashfileobject

    digest = hashfileobject(io, hexdigest=True)
    return digest

make_signature_key(*args, algorithm=DEFAULT_HASH_ALGORITHM, **kwargs)

Calculate data checksum for arbitrary input (used for caching function calls)

Examples:

>>> make_signature_key(1, "foo", bar="baz")
"c6b22da6bcf4bf7158ba600594cae404648acd41"

Parameters:

Name Type Description Default
*args Any

Arbitrary input arguments

()
algorithm str

Algorithm from hashlib to use, default: sha1

DEFAULT_HASH_ALGORITHM
**kwargs Any

Arbitrary input keyword arguments

{}

Returns:

Type Description
str

Generated checksum

Source code in anystore/util/checksum.py
def make_signature_key(
    *args: Any, algorithm: str = DEFAULT_HASH_ALGORITHM, **kwargs: Any
) -> str:
    """
    Build a checksum over a call signature, i.e. its positional and keyword
    arguments (used for caching function calls)

    Examples:
        >>> make_signature_key(1, "foo", bar="baz")
        "c6b22da6bcf4bf7158ba600594cae404648acd41"

    Args:
        *args: Arbitrary input arguments
        algorithm: Algorithm from `hashlib` to use, default: sha1
        **kwargs: Arbitrary input keyword arguments

    Returns:
        Generated checksum
    """
    signature = (args, kwargs)
    return make_data_checksum(signature, algorithm)

make_uri_key(uri, algorithm=DEFAULT_HASH_ALGORITHM)

Make a verbose key usable for caching. It strips the scheme, uses host and path as key parts and creates a checksum for the uri (including fragments, params, etc.). This is useful for invalidating a cache store partially by deleting keys by given host or path prefixes.

Examples:

>>> make_uri_key("https://example.org/foo/bar#fragment?a=b&c")
"example.org/foo/bar/ecdb319854a7b223d72e819949ed37328fe034a0"

Parameters:

Name Type Description Default
uri Uri

Input URI

required
algorithm str

Algorithm from hashlib to use, default: sha1

DEFAULT_HASH_ALGORITHM
Source code in anystore/util/checksum.py
def make_uri_key(uri: Uri, algorithm: str = DEFAULT_HASH_ALGORITHM) -> str:
    """
    Make a verbose key usable for caching. It strips the scheme, uses host and
    path as key parts and appends a checksum for the complete uri (including
    fragments, params, etc.). This is useful for invalidating a cache store
    partially by deleting keys by given host or path prefixes.

    Examples:
        >>> make_uri_key("https://example.org/foo/bar#fragment?a=b&c")
        "example.org/foo/bar/ecdb319854a7b223d72e819949ed37328fe034a0"

    Args:
        uri: Input URI
        algorithm: Algorithm from `hashlib` to use, default: sha1

    Returns:
        The key in the form `<host>/<path>/<checksum>`
    """
    uri = unquote(str(uri))
    parts = urlparse(uri)
    host = parts.netloc
    path = unquote(parts.path)
    # The checksum covers the whole (unquoted) uri, not only host and path
    checksum = make_data_checksum(uri, algorithm)
    return join_relpaths(host, path, checksum)

Took

Shorthand to measure time of a code block

Examples:

from anystore.util import Took

with Took() as t:
    # do something
    log.info(f"Job took: {t.took}")
Source code in anystore/util/misc.py
class Took:
    """
    Shorthand to measure time of a code block

    Examples:
        ```python
        from anystore.util import Took

        with Took() as t:
            # do something
            log.info(f"Job took:", t.took)
        ```
    """

    def __init__(self) -> None:
        self.start = datetime.now()

    @property
    def took(self) -> timedelta:
        return datetime.now() - self.start

    def __enter__(self) -> Self:
        return self

    def __exit__(self, *args, **kwargs):
        pass

ensure_uuid(uuid=None)

Ensure uuid or create one

Source code in anystore/util/misc.py
def ensure_uuid(uuid: str | None = None) -> str:
    """Ensure uuid or create one"""
    if uuid:
        return str(uuid)
    return str(uuid7())

get_extension(uri)

Extract file extension from given uri.

Examples:

>>> get_extension("foo/bar.txt")
"txt"
>>> get_extension("foo/bar")
None

Parameters:

Name Type Description Default
uri Uri

Full path-like uri

required

Returns:

Type Description
str | None

Extension or None

Source code in anystore/util/misc.py
def get_extension(uri: Uri) -> str | None:
    """
    Extract the (lowercased) file extension from given uri.

    Examples:
        >>> get_extension("foo/bar.txt")
        "txt"
        >>> get_extension("foo/bar.TXT")
        "txt"
        >>> get_extension("foo/bar")
        None
        >>> get_extension("foo/bar.")
        None

    Args:
        uri: Full path-like uri

    Returns:
        Extension without the leading dot, or `None` if there is none (also
        for in-memory `BytesIO` / `StringIO` objects)
    """
    if isinstance(uri, (BytesIO, StringIO)):
        return None
    _, ext = splitext(str(uri))
    # `splitext` keeps the leading dot. For a bare trailing dot ("foo/bar.")
    # stripping it leaves an empty string, which is not an extension, so
    # normalize that to `None` as well.
    return ext[1:].lower() or None

guess_mimetype(key)

Guess the mimetype based on a file extension and normalize it via rigour.mime

Source code in anystore/util/misc.py
def guess_mimetype(key: Uri) -> str:
    """
    Guess the mimetype for a key based on its file extension and normalize
    the result via `rigour.mime`
    """
    # `guess_type` returns a `(type, encoding)` tuple; only the type is used
    guessed = mimetypes.guess_type(str(key))[0]
    return normalize_mimetype(guessed)

mask_uri(uri)

Replace username and password in a URI with asterisks

Source code in anystore/util/misc.py
def mask_uri(uri: Uri) -> str:
    """
    Replace username and password in a URI with asterisks

    Examples:
        >>> mask_uri("https://user:secret@example.org/path")
        "https://***:***@example.org/path"
        >>> mask_uri("redis://:secret@localhost:6379")
        "redis://***:***@localhost:6379"
        >>> mask_uri("https://example.org/path")
        "https://example.org/path"

    Args:
        uri: Input URI

    Returns:
        The URI as string with the `user:password@` part masked. URIs without
        a `user:password@` part are returned unchanged.
    """
    # The username may be empty (e.g. `redis://:secret@host`) and must not
    # contain `/` or `@`, otherwise a colon and `@` somewhere in the path
    # (`https://example.org/a:b@c`) would be mistaken for credentials and the
    # host would be masked away.
    pattern = r"([a-zA-Z][a-zA-Z0-9+.-]*)://([^:/@]*):([^@]+)@"
    return re.sub(pattern, r"\1://***:***@", str(uri))

rm_rf(uri)

like rm -rf, ignoring errors.

Source code in anystore/util/misc.py
def rm_rf(uri: Uri) -> None:
    """
    Like `rm -rf`, ignoring errors: delete a local file or a whole folder.

    Errors during deletion are not raised but logged as a warning.

    Args:
        uri: Local path or `file` uri

    Raises:
        ValueError: If the (normalized) uri doesn't start with `file`
    """
    uri = ensure_uri(uri)
    if not uri.startswith("file"):
        raise ValueError(f"Uri not local: `{uri}`")
    try:
        p = uri_to_path(uri)
        if p.is_dir():
            shutil.rmtree(str(p), ignore_errors=True)
        else:
            p.unlink()
    except Exception as e:
        # Best effort by design: report and carry on. `warning` is used
        # instead of the deprecated `warn` alias.
        log.warning(f"Couldn't delete file or folder: `{e}`", uri=uri)