Skip to content

anystore.store

Top-level store entrypoint

Store

Bases: StoreModel, Generic[V, Raise]

Source code in anystore/store/base.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
class Store(StoreModel, Generic[V, Raise]):
    @cached_property
    def _fs(self) -> fsspec.AbstractFileSystem:
        """
        Filesystem backend for this store, resolved once per instance.

        The store uri and the cleaned backend configuration (see
        `ensure_kwargs`) are handed to `fsspec.url_to_fs`; only the filesystem
        part of its result is kept.
        """
        filesystem, _ = fsspec.url_to_fs(self.uri, **self.ensure_kwargs())
        return filesystem

    @cached_property
    def _keys(self) -> Keys:
        """
        `Keys` helper built from the store uri, created once per instance.

        The other methods of this class use it to translate between
        store-relative keys and backend (filesystem) keys.
        """
        return Keys(self.uri)

    # Overload: caller explicitly requests raising -> a value is always returned
    @overload
    def get(
        self,
        key: Uri,
        raise_on_nonexist: Literal[True],
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V: ...

    # Overload: caller explicitly disables raising -> result may be None
    @overload
    def get(
        self,
        key: Uri,
        raise_on_nonexist: Literal[False],
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V | None: ...

    # Overload: no explicit flag, store is typed as raising -> value is returned
    @overload
    def get(
        self: "Store[V, Literal[True]]",
        key: Uri,
        raise_on_nonexist: None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V: ...

    # Overload: fallback (store typed as non-raising or unknown) -> may be None
    @overload
    def get(
        self,
        key: Uri,
        raise_on_nonexist: None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V | None: ...

    def get(
        self,
        key: Uri,
        raise_on_nonexist: bool | None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V | None:
        """
        Read the value stored at `key` and deserialize it.

        Args:
            key: Key relative to store base uri
            raise_on_nonexist: Raise `DoesNotExist` if the key is missing
                instead of returning `None`; `None` falls back to the store
                setting
            serialization_mode: Serialization mode ("auto", "raw", "pickle",
                "json"), overrides store settings
            deserialization_func: Specific function to use (ignores
                `serialization_mode`), overrides store settings
            model: Pydantic serialization model (ignores `serialization_mode`
                and `deserialization_func`), overrides store settings
            **kwargs: Merged with the backend config and passed to the
                filesystem `cat_file` call; a `mode` entry is discarded

        Returns:
            The deserialized value, or `None` if the key does not exist and
            raising is disabled

        Raises:
            DoesNotExist: If the key does not exist and raising is enabled
        """
        mode = serialization_mode or self.serialization_mode
        loader = deserialization_func or self.deserialization_func
        model = model or self.model
        if raise_on_nonexist is None:
            raise_on_nonexist = self.raise_on_nonexist

        read_kwargs = self.ensure_kwargs(**kwargs)
        # `cat_file` does not take an open mode
        read_kwargs.pop("mode", None)

        fs_key = self._keys.to_fs_key(key)
        # May purge an expired key; the read below then hits FileNotFoundError
        self._check_ttl(fs_key, raise_on_nonexist)
        try:
            return from_store(
                self._fs.cat_file(fs_key, **read_kwargs),
                mode,
                deserialization_func=loader,
                model=model,
            )
        except FileNotFoundError:  # fsspec
            if not raise_on_nonexist:
                return None
            raise DoesNotExist(f"Key does not exist: `{fs_key}`")

    # Explicit raise_on_nonexist=True always returns V
    @overload
    def pop(
        self,
        key: Uri,
        raise_on_nonexist: Literal[True],
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V:
        pass

    # Explicit raise_on_nonexist=False always returns V | None
    @overload
    def pop(
        self,
        key: Uri,
        raise_on_nonexist: Literal[False],
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V | None:
        pass

    # Store configured with raise_on_nonexist=True, param is None -> returns V
    @overload
    def pop(
        self: "Store[V, Literal[True]]",
        key: Uri,
        raise_on_nonexist: None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V:
        pass

    # Default case (store configured with False or unknown) -> returns V | None
    @overload
    def pop(
        self,
        key: Uri,
        raise_on_nonexist: None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V | None:
        pass

    def pop(
        self,
        key: Uri,
        raise_on_nonexist: bool | None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V | None:
        """
        Retrieve the value for the given key and remove it from the store.

        Args:
            key: Key relative to store base uri
            raise_on_nonexist: Raise `DoesNotExist` if key doesn't exist or stay
                silent, overrides store settings
            serialization_mode: Serialize result ("auto", "raw", "pickle",
                "json"), overrides store settings
            deserialization_func: Specific function to use (ignores
                `serialization_mode`), overrides store settings
            model: Pydantic serialization model (ignores `serialization_mode`
                and `deserialization_func`), overrides store settings
            **kwargs: Any valid arguments for the stores `get` function

        Returns:
            The (optionally serialized) value for the key, or `None` if the key
            does not exist and raising is disabled

        Raises:
            DoesNotExist: If the key does not exist and raising is enabled
                (raised by `get`, before any deletion is attempted)
        """
        value = self.get(
            key,
            raise_on_nonexist=raise_on_nonexist,
            serialization_mode=serialization_mode,
            deserialization_func=deserialization_func,
            model=model,
            **kwargs,
        )
        # When `get` silently returned None for a missing key (or `_check_ttl`
        # already purged an expired one), there is nothing left to remove.
        # Without this guard the backend's FileNotFoundError would escape even
        # though the caller asked for silent behaviour. Other deletion errors
        # still propagate.
        with contextlib.suppress(FileNotFoundError):
            self.delete(key)
        return value

    def delete(self, key: Uri, ignore_errors: bool = False) -> None:
        """
        Remove the content stored at `key` from the backend.

        Args:
            key: Key relative to store base uri
            ignore_errors: Swallow any exception raised by the backend while
                deleting instead of propagating it
        """
        fs_key = self._keys.to_fs_key(key)
        try:
            self._fs.rm_file(fs_key)
        except Exception:
            if ignore_errors:
                return
            raise

    # Explicit raise_on_nonexist=True always returns Generator
    @overload
    def stream(
        self,
        key: Uri,
        raise_on_nonexist: Literal[True],
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> Generator[V, None, None]:
        pass

    # Explicit raise_on_nonexist=False always returns Generator | None
    @overload
    def stream(
        self,
        key: Uri,
        raise_on_nonexist: Literal[False],
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> Generator[V, None, None] | None:
        pass

    # Store configured with raise_on_nonexist=True, param is None -> returns Generator
    @overload
    def stream(
        self: "Store[V, Literal[True]]",
        key: Uri,
        raise_on_nonexist: None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> Generator[V, None, None]:
        pass

    # Default case (store configured with False or unknown) -> returns Generator | None
    @overload
    def stream(
        self,
        key: Uri,
        raise_on_nonexist: None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> Generator[V, None, None] | None:
        pass

    def stream(
        self,
        key: Uri,
        raise_on_nonexist: bool | None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> Generator[V, None, None] | None:
        """
        Stream a value line by line from the store for the given key

        This is a generator function: the key is only opened (and a missing
        key only detected) once iteration starts.

        Args:
            key: Key relative to store base uri
            raise_on_nonexist: Raise `DoesNotExist` if key doesn't exist or stay
                silent, overrides store settings; `None` falls back to the
                store setting
            serialization_mode: Serialize result ("auto", "raw", "pickle",
                "json"), overrides store settings
            deserialization_func: Specific function to use (ignores
                `serialization_mode`), overrides store settings
            model: Pydantic serialization model (ignores `serialization_mode`
                and `deserialization_func`), overrides store settings
            **kwargs: Accepted for signature compatibility; currently not
                forwarded to the backend

        Yields:
            The (optionally serialized) values line by line

        Raises:
            anystore.exceptions.DoesNotExist: If key doesn't exist and
                raising is enabled
        """
        # An explicit True/False always wins; only None defers to the store
        # setting. Previously an explicit False was overruled by a store
        # configured with raise_on_nonexist=True.
        if raise_on_nonexist is None:
            raise_on_nonexist = self.raise_on_nonexist
        model = model or self.model
        extra_kwargs = {
            "serialization_mode": serialization_mode or self.serialization_mode,
            "deserialization_func": deserialization_func or self.deserialization_func,
            "model": model,
        }
        try:
            with self.open(key) as i:
                for line in iter_lines(i):
                    yield from_store(line, **extra_kwargs)
        except FileNotFoundError:
            if raise_on_nonexist:
                raise DoesNotExist(f"Key does not exist: `{key}`")
            return

    def put(
        self,
        key: Uri,
        value: V,
        serialization_mode: Mode | None = None,
        serialization_func: Callable | None = None,
        model: Model | None = None,
        ttl: int | None = None,
        **kwargs,
    ):
        """
        Store a value at the given key

        `None` values are skipped unless the store is configured with
        `store_none_values`.

        Args:
            key: Key relative to store base uri
            value: The content
            serialization_mode: Serialize value prior to storing ("auto", "raw",
                "pickle", "json"), overrides store settings
            serialization_func: Specific function to use (ignores
                `serialization_mode`), overrides store settings
            model: Pydantic serialization model (ignores `serialization_mode`
                and `serialization_func`), overrides store settings
            ttl: Time to live (in seconds) for that key if the backend supports
                it (e.g. redis, sql); falls back to the store's `default_ttl`
        """
        if value is None and not self.store_none_values:
            return
        serialization_mode = serialization_mode or self.serialization_mode
        serialization_func = serialization_func or self.serialization_func
        model = model or self.model
        # NOTE(review): the merged kwargs are computed but not forwarded to the
        # backend `open` call below - confirm whether that is intentional.
        kwargs = self.ensure_kwargs(**kwargs)
        ttl = ttl or self.default_ttl or None
        key = self._keys.to_fs_key(key)
        # Serialize before touching the backend: opening in "wb" first would
        # truncate an existing key (or create parent directories and an empty
        # file) even when serialization fails.
        payload = to_store(
            value,
            serialization_mode,
            serialization_func=serialization_func,
            model=model,
        )
        self.ensure_parent(key)
        with self._fs.open(key, "wb", ttl=ttl) as o:
            o.write(payload)

    def _check_ttl(self, fs_key: str, raise_on_nonexist: bool | None = True) -> bool:
        """
        Check if key is expired by TTL; delete and return False if so.

        Args:
            fs_key: Backend (filesystem) key, i.e. already converted via
                `self._keys.to_fs_key`
            raise_on_nonexist: Raise `DoesNotExist` when the backend reports
                the key as missing instead of returning `False`

        Returns:
            `True` if no `default_ttl` is configured (the key is not looked up
            at all in that case), if the key has no creation timestamp, or if
            it is still within its TTL. `False` if the key was expired (and has
            been removed) or is missing while raising is disabled.

        Raises:
            DoesNotExist: If the key is missing and `raise_on_nonexist` is set
        """
        if not self.default_ttl:
            return True
        try:
            info = Info(**self._fs.info(fs_key))
            if info.created_at is None:
                return True
            age = (datetime.now(timezone.utc) - info.created_at).total_seconds()
            if age > self.default_ttl:
                self._fs.rm_file(fs_key)
                return False
            return True
        except FileNotFoundError as exc:  # fsspec
            if raise_on_nonexist:
                # `fs_key` is already a backend key; converting it a second
                # time would report a different path than `get` does.
                raise DoesNotExist(f"Key does not exist: `{fs_key}`") from exc
            return False

    def exists(self, key: Uri) -> bool:
        """
        Tell whether `key` is present in the store.

        A key that `_check_ttl` reports as expired or missing counts as not
        existing; otherwise the backend's `exists` decides.
        """
        fs_key = self._keys.to_fs_key(key)
        if self._check_ttl(fs_key, raise_on_nonexist=False):
            return self._fs.exists(fs_key)
        return False

    def info(self, key: Uri) -> Stats:
        """
        Collect metadata for the given `key`.

        The backend's info mapping is extended with the basename as `name`,
        the store uri as `store` and the relative key as `key`.

        Returns:
            Key metadata
        """
        raw = self._fs.info(self._keys.to_fs_key(key))
        fields = dict(raw)
        # Backends report a full path as "name"; only its last segment is kept
        fields["name"] = Path(raw.get("name", key)).name
        fields["store"] = str(self.uri)
        fields["key"] = str(key)
        return Stats(**fields)

    def ensure_kwargs(self, **kwargs) -> dict[str, Any]:
        """
        Merge the cleaned backend config with the cleaned call arguments.

        Explicitly passed `kwargs` take precedence over `backend_config`
        entries of the same name.
        """
        merged: dict[str, Any] = dict(clean_dict(self.backend_config))
        merged.update(clean_dict(kwargs))
        return merged

    def iterate_keys(
        self,
        prefix: str | None = None,
        exclude_prefix: str | None = None,
        glob: str | None = None,
    ) -> Generator[str, None, None]:
        """
        Walk the keys of the store, optionally narrowed by the given criteria.
        Criteria can be combined (e.g. include a prefix but exclude a subset).

        Example:
            ```python
            for key in store.iterate_keys(prefix="dataset1", glob="*.pdf"):
                data = store.get(key, serialization_mode="raw")
                parse(data)
            ```

        Args:
            prefix: Include only keys with the given prefix (e.g. "foo/bar")
            exclude_prefix: Exclude keys with this prefix
            glob: Path-style glob pattern for keys to filter (e.g. "foo/**/*.json")

        Yields:
            The matching keys, relative to the store, as strings
        """
        keys = self._keys
        fs = self._fs
        base = keys.to_fs_key(prefix) if prefix else keys.key_prefix

        if hasattr(fs, "iter_find"):
            # Backends offering a streaming finder handle the glob themselves
            found = fs.iter_find(base, glob=glob)
        elif glob:
            found = fs.glob(f"{base}/{glob}")
        else:
            try:
                found = fs.find(base)
            except FileNotFoundError:
                # Nothing below `base`: yield no keys at all
                return

        for fs_key in found:
            relative = keys.from_fs_key(fs_key)
            if not (exclude_prefix and relative.startswith(exclude_prefix)):
                yield relative

    def iterate_values(
        self,
        prefix: str | None = None,
        exclude_prefix: str | None = None,
        glob: str | None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
    ) -> Generator[V, None, None]:
        """
        Walk the values of the store, optionally narrowed by the given
        criteria. Keys are selected via `iterate_keys` and loaded via `get`;
        values that come back as `None` are skipped.

        Example:
            ```python
            yield from store.iterate_values(prefix="dataset1", glob="*.pdf", model=MyModel)
            ```

        Args:
            prefix: Include only keys with the given prefix (e.g. "foo/bar")
            exclude_prefix: Exclude keys with this prefix
            glob: Path-style glob pattern for keys to filter (e.g. "foo/**/*.json")
            serialization_mode: Serialize result ("auto", "raw", "pickle",
                "json"), overrides store settings
            deserialization_func: Specific function to use (ignores
                `serialization_mode`), overrides store settings
            model: Pydantic serialization model (ignores `serialization_mode`
                and `deserialization_func`), overrides store settings

        Yields:
            The matching values of any (deserialized) type
        """
        loaded = (
            self.get(
                key,
                serialization_mode=serialization_mode,
                deserialization_func=deserialization_func,
                model=model,
            )
            for key in self.iterate_keys(prefix, exclude_prefix, glob)
        )
        yield from (value for value in loaded if value is not None)

    def checksum(
        self, key: Uri, algorithm: str | None = DEFAULT_HASH_ALGORITHM, **kwargs: Any
    ) -> str:
        """
        Compute the checksum of the content stored at the given key

        Args:
            key: Key relative to store base uri
            algorithm: Checksum algorithm from `hashlib`; `None` falls back to
                `DEFAULT_HASH_ALGORITHM`
            **kwargs: Pass through arguments to content retrieval (a `mode`
                entry is always replaced by "rb")

        Returns:
            The computed checksum
        """
        open_kwargs = self.ensure_kwargs(**kwargs)
        # The content is always hashed from a binary read handle
        open_kwargs["mode"] = "rb"
        fs_key = self._keys.to_fs_key(key)
        with self._fs.open(fs_key, **open_kwargs) as fh:
            return make_checksum(fh, algorithm or DEFAULT_HASH_ALGORITHM)

    def open(
        self, key: Uri, mode: str | None = DEFAULT_MODE, **kwargs: Any
    ) -> ContextManager[IO]:
        """
        Open the given key, similar to the built-in `open()`

        Example:
            ```python
            from anystore import get_store

            store = get_store()
            with store.open("foo/bar.txt") as fh:
                return fh.read()
            ```

        Args:
            key: Key relative to store base uri
            mode: Open mode ("rb", "wb", "r", "w"); `None` falls back to
                `DEFAULT_MODE`
            **kwargs: Pass through arguments to backend (merged with the
                backend config)

        Returns:
            The open handler
        """
        open_mode = mode or DEFAULT_MODE
        backend_kwargs = self.ensure_kwargs(**kwargs)
        fs_key = self._keys.to_fs_key(key)
        if "w" in open_mode:
            # Writing needs the parent path to exist (see `ensure_parent`)
            self.ensure_parent(fs_key)
        return self._fs.open(fs_key, mode=open_mode, **backend_kwargs)

    def touch(self, key: Uri, **kwargs: Any) -> datetime:
        """
        Write the current UTC timestamp to the given key

        Args:
            key: Key relative to store base uri
            **kwargs: Any valid arguments for the stores `put` function

        Returns:
            The timestamp that was stored
        """
        timestamp = datetime.now(timezone.utc)
        self.put(key, timestamp, **kwargs)
        return timestamp

    def ensure_parent(self, fs_key: Uri) -> None:
        """Create the parent path of `fs_key` if it is missing. Only done for
        stores flagged as local (`is_local`); otherwise this is a no-op."""
        if not self.is_local:
            return
        self._fs.mkdirs(Path(fs_key).parent, exist_ok=True)

    def to_uri(self, key: Uri) -> str:
        """
        Convert a store-relative `key` via `self._keys.to_absolute_uri`.

        Args:
            key: Key relative to store base uri

        Returns:
            The uri string produced by the `Keys` helper (presumably the
            absolute uri of the key - confirm against `Keys.to_absolute_uri`)
        """
        return self._keys.to_absolute_uri(key)

    @contextlib.contextmanager
    def local_path(self, key: Uri) -> Generator[Path, None, None]:
        """
        Download the resource at `key` for temporary local processing and get
        its local path. If the file itself is already on the local filesystem,
        the actual file will be used. The temporary copy is cleaned up when
        leaving the context; a file that was already local is never deleted.

        Example:
            ```python
            store = get_store("s3://bucket")
            with store.local_path("data.json") as path:
                do_something(path)
            ```

        Args:
            key: Key relative to store base uri

        Yields:
            The absolute temporary `path` as a `pathlib.Path` object
        """
        if self.is_local:
            # Already on the local filesystem: hand out the real file
            yield uri_to_path(self._keys.to_fs_key(key))
            return

        from anystore.store.virtual import get_virtual_store

        # Entering the virtual store through `with` guarantees it is released
        # even when the download itself fails; entering it manually before the
        # `try` block would skip the cleanup in that case.
        with get_virtual_store() as tmp_store:
            stream_bytes(str(key), self, tmp_store)
            yield uri_to_path(tmp_store._keys.to_fs_key(key))

    @contextlib.contextmanager
    def local_open(
        self,
        key: Uri,
        algorithm: str | None = DEFAULT_HASH_ALGORITHM,
    ) -> Generator[VirtualIO, None, None]:
        """
        Make a file available locally (see `local_path`) and yield an open
        binary handle together with its checksum. If the file is already on
        the local filesystem, the actual file is used. A temporary copy is
        cleaned up when leaving the context; a local file is never deleted.

        Example:
            ```python
            store = get_store("http://example.org")
            with store.local_open("test.txt") as fh:
                smart_write(f"./local/{fh.checksum}", fh.read())
            ```

        Args:
            key: Key relative to store base uri
            algorithm: Checksum algorithm from `hashlib`; `None` falls back to
                `DEFAULT_HASH_ALGORITHM`

        Yields:
            A generic file-handler like context object. It has 3 extra attributes:
                - `checksum`
                - the absolute temporary `path` as a `pathlib.Path` object
                - [`info`][anystore.model.Stats] object
        """
        with self.local_path(key) as path, path.open("rb") as fh:
            digest = make_checksum(fh, algorithm or DEFAULT_HASH_ALGORITHM)
            # Hashing consumed the handle; rewind so callers read from the start
            fh.seek(0)
            stats = self.info(key)
            yield VirtualIO(fh, checksum=digest, path=path, info=stats)

checksum(key, algorithm=DEFAULT_HASH_ALGORITHM, **kwargs)

Get the checksum for the value at the given key

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
algorithm str | None

Checksum algorithm from hashlib (default: "sha1")

DEFAULT_HASH_ALGORITHM
**kwargs Any

Pass through arguments to content retrieval

{}

Returns:

Type Description
str

The computed checksum

Source code in anystore/store/base.py
def checksum(
    self, key: Uri, algorithm: str | None = DEFAULT_HASH_ALGORITHM, **kwargs: Any
) -> str:
    """
    Compute the checksum of the content stored at the given key

    Args:
        key: Key relative to store base uri
        algorithm: Checksum algorithm from `hashlib`; `None` falls back to
            `DEFAULT_HASH_ALGORITHM`
        **kwargs: Pass through arguments to content retrieval (a `mode`
            entry is always replaced by "rb")

    Returns:
        The computed checksum
    """
    open_kwargs = self.ensure_kwargs(**kwargs)
    # The content is always hashed from a binary read handle
    open_kwargs["mode"] = "rb"
    fs_key = self._keys.to_fs_key(key)
    with self._fs.open(fs_key, **open_kwargs) as fh:
        return make_checksum(fh, algorithm or DEFAULT_HASH_ALGORITHM)

delete(key, ignore_errors=False)

Delete the content at the given key.

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
ignore_errors bool

Ignore exceptions if deletion fails

False
Source code in anystore/store/base.py
def delete(self, key: Uri, ignore_errors: bool = False) -> None:
    """
    Remove the content stored at `key` from the backend.

    Args:
        key: Key relative to store base uri
        ignore_errors: Swallow any exception raised by the backend while
            deleting instead of propagating it
    """
    fs_key = self._keys.to_fs_key(key)
    try:
        self._fs.rm_file(fs_key)
    except Exception:
        if ignore_errors:
            return
        raise

ensure_parent(fs_key)

Ensure existence of parent path. This mostly only is relevant for stores on local filesystem

Source code in anystore/store/base.py
def ensure_parent(self, fs_key: Uri) -> None:
    """Create the parent path of `fs_key` if it is missing. Only done for
    stores flagged as local (`is_local`); otherwise this is a no-op."""
    if not self.is_local:
        return
    self._fs.mkdirs(Path(fs_key).parent, exist_ok=True)

exists(key)

Check if the given key exists

Source code in anystore/store/base.py
def exists(self, key: Uri) -> bool:
    """
    Tell whether `key` is present in the store.

    A key that `_check_ttl` reports as expired or missing counts as not
    existing; otherwise the backend's `exists` decides.
    """
    fs_key = self._keys.to_fs_key(key)
    if self._check_ttl(fs_key, raise_on_nonexist=False):
        return self._fs.exists(fs_key)
    return False

get(key, raise_on_nonexist=None, serialization_mode=None, deserialization_func=None, model=None, **kwargs)

get(
    key: Uri,
    raise_on_nonexist: Literal[True],
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V
get(
    key: Uri,
    raise_on_nonexist: Literal[False],
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V | None
get(
    key: Uri,
    raise_on_nonexist: None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V
get(
    key: Uri,
    raise_on_nonexist: None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V | None

Get a value from the store for the given key

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
raise_on_nonexist bool | None

Raise DoesNotExist if key doesn't exist or stay silent, overrides store settings

None
serialization_mode Mode | None

Serialize result ("auto", "raw", "pickle", "json"), overrides store settings

None
deserialization_func Callable | None

Specific function to use (ignores serialization_mode), overrides store settings

None
model Model | None

Pydantic serialization model (ignores serialization_mode and deserialization_func), overrides store settings

None

Returns:

Type Description
V | None

The (optionally serialized) value for the key

Source code in anystore/store/base.py
def get(
    self,
    key: Uri,
    raise_on_nonexist: bool | None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs,
) -> V | None:
    """
    Read the value stored at `key` and deserialize it.

    Args:
        key: Key relative to store base uri
        raise_on_nonexist: Raise `DoesNotExist` if the key is missing
            instead of returning `None`; `None` falls back to the store
            setting
        serialization_mode: Serialization mode ("auto", "raw", "pickle",
            "json"), overrides store settings
        deserialization_func: Specific function to use (ignores
            `serialization_mode`), overrides store settings
        model: Pydantic serialization model (ignores `serialization_mode`
            and `deserialization_func`), overrides store settings
        **kwargs: Merged with the backend config and passed to the
            filesystem `cat_file` call; a `mode` entry is discarded

    Returns:
        The deserialized value, or `None` if the key does not exist and
        raising is disabled

    Raises:
        DoesNotExist: If the key does not exist and raising is enabled
    """
    mode = serialization_mode or self.serialization_mode
    loader = deserialization_func or self.deserialization_func
    model = model or self.model
    if raise_on_nonexist is None:
        raise_on_nonexist = self.raise_on_nonexist

    read_kwargs = self.ensure_kwargs(**kwargs)
    # `cat_file` does not take an open mode
    read_kwargs.pop("mode", None)

    fs_key = self._keys.to_fs_key(key)
    # May purge an expired key; the read below then hits FileNotFoundError
    self._check_ttl(fs_key, raise_on_nonexist)
    try:
        return from_store(
            self._fs.cat_file(fs_key, **read_kwargs),
            mode,
            deserialization_func=loader,
            model=model,
        )
    except FileNotFoundError:  # fsspec
        if not raise_on_nonexist:
            return None
        raise DoesNotExist(f"Key does not exist: `{fs_key}`")

info(key)

Get metadata for the given key.

Returns:

Type Description
Stats

Key metadata

Source code in anystore/store/base.py
def info(self, key: Uri) -> Stats:
    """
    Collect metadata for the given `key`.

    The backend's info mapping is extended with the basename as `name`,
    the store uri as `store` and the relative key as `key`.

    Returns:
        Key metadata
    """
    raw = self._fs.info(self._keys.to_fs_key(key))
    fields = dict(raw)
    # Backends report a full path as "name"; only its last segment is kept
    fields["name"] = Path(raw.get("name", key)).name
    fields["store"] = str(self.uri)
    fields["key"] = str(key)
    return Stats(**fields)

iterate_keys(prefix=None, exclude_prefix=None, glob=None)

Iterate through all the keys in the store based on given criteria. Criteria can be combined (e.g. include but exclude a subset).

Example
for key in store.iterate_keys(prefix="dataset1", glob="*.pdf"):
    data = store.get(key, mode="raw")
    parse(data)

Parameters:

Name Type Description Default
prefix str | None

Include only keys with the given prefix (e.g. "foo/bar")

None
exclude_prefix str | None

Exclude keys with this prefix

None
glob str | None

Path-style glob pattern for keys to filter (e.g. "foo/**/*.json")

None

Returns:

Type Description
Generator[str, None, None]

The matching keys as a generator of strings

Source code in anystore/store/base.py
def iterate_keys(
    self,
    prefix: str | None = None,
    exclude_prefix: str | None = None,
    glob: str | None = None,
) -> Generator[str, None, None]:
    """
    Lazily walk the keys of the store, optionally narrowed by the given
    criteria. The criteria can be combined (e.g. include a prefix but
    exclude a subset of it).

    Example:
        ```python
        for key in store.iterate_keys(prefix="dataset1", glob="*.pdf"):
            data = store.get(key, mode="raw")
            parse(data)
        ```

    Args:
        prefix: Include only keys with the given prefix (e.g. "foo/bar")
        exclude_prefix: Exclude keys with this prefix
        glob: Path-style glob pattern for keys to filter (e.g. "foo/**/*.json")

    Returns:
        The matching keys as a generator of strings
    """
    base = self._keys.to_fs_key(prefix) if prefix else self._keys.key_prefix

    if hasattr(self._fs, "iter_find"):
        # backend offers its own lazy listing that understands `glob`
        found = self._fs.iter_find(base, glob=glob)
    elif glob:
        found = self._fs.glob(f"{base}/{glob}")
    else:
        try:
            found = self._fs.find(base)
        except FileNotFoundError:
            # nothing stored below `base`: yield nothing
            return

    for fs_key in found:
        relative = self._keys.from_fs_key(fs_key)
        if not (exclude_prefix and relative.startswith(exclude_prefix)):
            yield relative

iterate_values(prefix=None, exclude_prefix=None, glob=None, serialization_mode=None, deserialization_func=None, model=None)

Iterate through all the values in the store based on given criteria. Criteria can be combined (e.g. include but exclude a subset).

Example
yield from store.iterate_values(prefix="dataset1", glob="*.pdf", model=MyModel)

Parameters:

Name Type Description Default
prefix str | None

Include only keys with the given prefix (e.g. "foo/bar")

None
exclude_prefix str | None

Exclude keys with this prefix

None
glob str | None

Path-style glob pattern for keys to filter (e.g. "foo/**/*.json")

None
serialization_mode Mode | None

Serialize result ("auto", "raw", "pickle", "json"), overrides store settings

None
deserialization_func Callable | None

Specific function to use (ignores serialization_mode), overrides store settings

None
model Model | None

Pydantic serialization model (ignores serialization_mode and deserialization_func), overrides store settings

None

Returns:

Type Description
Generator[V, None, None]

The matching values as a generator of any (serialized) type

Source code in anystore/store/base.py
def iterate_values(
    self,
    prefix: str | None = None,
    exclude_prefix: str | None = None,
    glob: str | None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
) -> Generator[V, None, None]:
    """
    Lazily load the values of all keys matching the given criteria. The
    criteria can be combined (e.g. include a prefix but exclude a subset of
    it). Keys whose value resolves to `None` are skipped.

    Example:
        ```python
        yield from store.iterate_values(prefix="dataset1", glob="*.pdf", model=MyModel)
        ```

    Args:
        prefix: Include only keys with the given prefix (e.g. "foo/bar")
        exclude_prefix: Exclude keys with this prefix
        glob: Path-style glob pattern for keys to filter (e.g. "foo/**/*.json")
        serialization_mode: Serialize result ("auto", "raw", "pickle",
            "json"), overrides store settings
        deserialization_func: Specific function to use (ignores
            `serialization_mode`), overrides store settings
        model: Pydantic serialization model (ignores `serialization_mode`
            and `deserialization_func`), overrides store settings

    Returns:
        The matching values as a generator of any (serialized) type
    """
    # the same lookup options apply to every key
    lookup_options = {
        "serialization_mode": serialization_mode,
        "deserialization_func": deserialization_func,
        "model": model,
    }
    for key in self.iterate_keys(prefix, exclude_prefix, glob):
        item = self.get(key, **lookup_options)
        if item is None:
            continue
        yield item

local_open(key, algorithm=DEFAULT_HASH_ALGORITHM)

Download a file for temporary local processing and get its checksum and an open handler. If the file itself is already on the local filesystem, the actual file will be used. The file is cleaned up when leaving the context, except if it was a local file, it won't be deleted in any case.

Example
store = get_store("http://example.org")
with store.local_open("test.txt") as fh:
    smart_write(f"./local/{fh.checksum}", fh.read())

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
algorithm str | None

Checksum algorithm from hashlib (default: "sha1")

DEFAULT_HASH_ALGORITHM

Yields:

Type Description
VirtualIO

A generic file-handler like context object. It has 3 extra attributes: `checksum`, the absolute temporary `path` as a `pathlib.Path` object, and an `info` (`Stats`) object.

Source code in anystore/store/base.py
@contextlib.contextmanager
def local_open(
    self,
    key: Uri,
    algorithm: str | None = DEFAULT_HASH_ALGORITHM,
) -> Generator[VirtualIO, None, None]:
    """
    Make the file at `key` available locally, compute its checksum and hand
    out an open binary handler. The local path is obtained via
    `local_path`: if the file already lives on the local filesystem the
    actual file is used and never deleted, otherwise the temporary copy is
    cleaned up when leaving the context.

    Example:
        ```python
        store = get_store("http://example.org")
        with store.local_open("test.txt") as fh:
            smart_write(f"./local/{fh.checksum}", fh.read())
        ```

    Args:
        key: Key relative to store base uri
        algorithm: Checksum algorithm from `hashlib`, falls back to
            `DEFAULT_HASH_ALGORITHM` if empty

    Yields:
        A generic file-handler like context object. It has 3 extra attributes:
            - `checksum`
            - the absolute temporary `path` as a `pathlib.Path` object
            - [`info`][anystore.model.Stats] object
    """
    hash_algorithm = algorithm or DEFAULT_HASH_ALGORITHM
    with self.local_path(key) as local, local.open("rb") as handle:
        digest = make_checksum(handle, hash_algorithm)
        # hashing consumed the stream, rewind so callers read from the start
        handle.seek(0)
        yield VirtualIO(handle, checksum=digest, path=local, info=self.info(key))

local_path(key)

Download the resource at key for temporary local processing and get its local path. If the file itself is already on the local filesystem, the actual file will be used. The file is cleaned up when leaving the context, unless it was a local file, it won't be deleted in any case.

Example
store = get_store("s3://bucket")
with store.local_path("data.json") as path:
    do_something(path)

Yields: The absolute temporary path as a pathlib.Path object

Source code in anystore/store/base.py
@contextlib.contextmanager
def local_path(self, key: Uri) -> Generator[Path, None, None]:
    """
    Download the resource at `key` for temporary local processing and get
    its local path. If the file itself is already on the local filesystem,
    the actual file will be used and is never deleted. Otherwise the
    resource is streamed into a virtual store whose context is exited when
    leaving this context (also when the download itself fails).

    Example:
        ```python
        store = get_store("s3://bucket")
        with store.local_path("data.json") as path:
            do_something(path)
        ```

    Args:
        key: Key relative to store base uri

    Yields:
        The absolute temporary `path` as a `pathlib.Path` object
    """
    uri = self._keys.to_fs_key(key)
    if self.is_local:
        # Local stores expose the real file; nothing to download or release.
        yield uri_to_path(uri)
        return

    from anystore.store.virtual import get_virtual_store

    # The `with` block spans the download as well, so the virtual store is
    # released even if `stream_bytes` raises before the path is yielded.
    with get_virtual_store() as tmp_store:
        stream_bytes(str(key), self, tmp_store)
        yield uri_to_path(tmp_store._keys.to_fs_key(key))

open(key, mode=DEFAULT_MODE, **kwargs)

Open the given key similar to built-in open()

Example
from anystore import get_store

store = get_store()
with store.open("foo/bar.txt") as fh:
    return fh.read()

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
mode str | None

Open mode ("rb", "wb", "r", "w")

DEFAULT_MODE
**kwargs Any

Pass through arguments to backend

{}

Returns:

Type Description
ContextManager[IO]

The open handler

Source code in anystore/store/base.py
def open(
    self, key: Uri, mode: str | None = DEFAULT_MODE, **kwargs: Any
) -> ContextManager[IO]:
    """
    Open the given key via the backend filesystem, similar to the built-in
    `open()`. For write modes the parent of the key is ensured first.

    Example:
        ```python
        from anystore import get_store

        store = get_store()
        with store.open("foo/bar.txt") as fh:
            return fh.read()
        ```

    Args:
        key: Key relative to store base uri
        mode: Open mode ("rb", "wb", "r", "w"), falls back to
            `DEFAULT_MODE` if empty
        **kwargs: Pass through arguments to backend

    Returns:
        The open handler
    """
    open_mode = mode if mode else DEFAULT_MODE
    backend_kwargs = self.ensure_kwargs(**kwargs)
    fs_key = self._keys.to_fs_key(key)
    writing = "w" in open_mode
    if writing:
        self.ensure_parent(fs_key)
    return self._fs.open(fs_key, mode=open_mode, **backend_kwargs)

pop(key, raise_on_nonexist=None, serialization_mode=None, deserialization_func=None, model=None, **kwargs)

pop(
    key: Uri,
    raise_on_nonexist: Literal[True],
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V
pop(
    key: Uri,
    raise_on_nonexist: Literal[False],
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V | None
pop(
    key: Uri,
    raise_on_nonexist: None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V
pop(
    key: Uri,
    raise_on_nonexist: None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V | None

Retrieve the value for the given key and remove it from the store.

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
raise_on_nonexist bool | None

Raise DoesNotExist if key doesn't exist or stay silent, overrides store settings

None
serialization_mode Mode | None

Serialize result ("auto", "raw", "pickle", "json"), overrides store settings

None
deserialization_func Callable | None

Specific function to use (ignores serialization_mode), overrides store settings

None
model Model | None

Pydantic serialization model (ignores serialization_mode and deserialization_func), overrides store settings

None
**kwargs

Any valid arguments for the stores get function

{}

Returns:

Type Description
V | None

The (optionally serialized) value for the key

Source code in anystore/store/base.py
def pop(
    self,
    key: Uri,
    raise_on_nonexist: bool | None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs,
) -> V | None:
    """
    Read the value stored at `key`, then delete the key from the store.

    Args:
        key: Key relative to store base uri
        raise_on_nonexist: Raise `DoesNotExist` if key doesn't exist or stay
            silent, overrides store settings
        serialization_mode: Serialize result ("auto", "raw", "pickle",
            "json"), overrides store settings
        deserialization_func: Specific function to use (ignores
            `serialization_mode`), overrides store settings
        model: Pydantic serialization model (ignores `serialization_mode`
            and `deserialization_func`), overrides store settings
        **kwargs: Any valid arguments for the stores `get` function

    Returns:
        The (optionally serialized) value for the key
    """
    fetch_options = {
        "raise_on_nonexist": raise_on_nonexist,
        "serialization_mode": serialization_mode,
        "deserialization_func": deserialization_func,
        "model": model,
    }
    result = self.get(key, **fetch_options, **kwargs)
    # only reached when the lookup above did not raise
    self.delete(key)
    return result

put(key, value, serialization_mode=None, serialization_func=None, model=None, ttl=None, **kwargs)

Store a value at the given key

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
value V

The content

required
serialization_mode Mode | None

Serialize value prior to storing ("auto", "raw", "pickle", "json"), overrides store settings

None
serialization_func Callable | None

Specific function to use (ignores serialization_mode), overrides store settings

None
model Model | None

Pydantic serialization model (ignores serialization_mode and serialization_func), overrides store settings

None
ttl int | None

Time to live (in seconds) for that key if the backend supports it (e.g. redis, sql)

None
Source code in anystore/store/base.py
def put(
    self,
    key: Uri,
    value: V,
    serialization_mode: Mode | None = None,
    serialization_func: Callable | None = None,
    model: Model | None = None,
    ttl: int | None = None,
    **kwargs,
):
    """
    Serialize `value` and write it to the store at the given key. A `None`
    value is silently skipped unless the store is configured with
    `store_none_values`.

    Args:
        key: Key relative to store base uri
        value: The content
        serialization_mode: Serialize value prior to storing ("auto", "raw",
            "pickle", "json"), overrides store settings
        serialization_func: Specific function to use (ignores
            `serialization_mode`), overrides store settings
        model: Pydantic serialization model (ignores `serialization_mode`
            and `serialization_func`), overrides store settings
        ttl: Time to live (in seconds) for that key if the backend supports
            it (e.g. redis, sql), falls back to the store's `default_ttl`
        **kwargs: Normalized via `ensure_kwargs`
    """
    if value is None and not self.store_none_values:
        return
    mode = serialization_mode or self.serialization_mode
    serializer = serialization_func or self.serialization_func
    model_cls = model or self.model
    # NOTE(review): the normalized kwargs are not forwarded to the backend
    # `open` call below - confirm whether that is intentional.
    kwargs = self.ensure_kwargs(**kwargs)
    lifetime = ttl or self.default_ttl or None
    fs_key = self._keys.to_fs_key(key)
    self.ensure_parent(fs_key)
    with self._fs.open(fs_key, "wb", ttl=lifetime) as out:
        payload = to_store(
            value,
            mode,
            serialization_func=serializer,
            model=model_cls,
        )
        out.write(payload)

stream(key, raise_on_nonexist=None, serialization_mode=None, deserialization_func=None, model=None, **kwargs)

stream(
    key: Uri,
    raise_on_nonexist: Literal[True],
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> Generator[V, None, None]
stream(
    key: Uri,
    raise_on_nonexist: Literal[False],
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> Generator[V, None, None] | None
stream(
    key: Uri,
    raise_on_nonexist: None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> Generator[V, None, None]
stream(
    key: Uri,
    raise_on_nonexist: None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> Generator[V, None, None] | None

Stream a value line by line from the store for the given key

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
raise_on_nonexist bool | None

Raise DoesNotExist if key doesn't exist or stay silent, overrides store settings

None
serialization_mode Mode | None

Serialize result ("auto", "raw", "pickle", "json"), overrides store settings

None
deserialization_func Callable | None

Specific function to use (ignores serialization_mode), overrides store settings

None
model Model | None

Pydantic serialization model (ignores serialization_mode and deserialization_func), overrides store settings

None

Yields:

Type Description
Generator[V, None, None] | None

The (optionally serialized) values line by line

Raises:

Type Description
DoesNotExist

If key doesn't exist and raise_on_nonexist=True

Source code in anystore/store/base.py
def stream(
    self,
    key: Uri,
    raise_on_nonexist: bool | None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs,
) -> Generator[V, None, None] | None:
    """
    Stream a value line by line from the store for the given key

    Args:
        key: Key relative to store base uri
        raise_on_nonexist: Raise `DoesNotExist` if key doesn't exist or stay
            silent, overrides store settings
        serialization_mode: Serialize result ("auto", "raw", "pickle",
            "json"), overrides store settings
        deserialization_func: Specific function to use (ignores
            `serialization_mode`), overrides store settings
        model: Pydantic serialization model (ignores `serialization_mode`
            and `deserialization_func`), overrides store settings
        **kwargs: Accepted for signature compatibility, currently unused

    Yields:
        The (optionally serialized) values line by line

    Raises:
        anystore.exceptions.DoesNotExist: If key doesn't exist and
            raise_on_nonexist=True
    """
    # Only fall back to the store setting when the caller did not decide, so
    # an explicit `False` silences a store configured to raise (same
    # resolution as in `get`).
    if raise_on_nonexist is None:
        raise_on_nonexist = self.raise_on_nonexist
    model = model or self.model
    extra_kwargs = {
        "serialization_mode": serialization_mode or self.serialization_mode,
        "deserialization_func": deserialization_func or self.deserialization_func,
        "model": model,
    }
    try:
        with self.open(key) as i:
            for line in iter_lines(i):
                yield from_store(line, **extra_kwargs)
    except FileNotFoundError:
        if raise_on_nonexist:
            raise DoesNotExist(f"Key does not exist: `{key}`")
        return None

touch(key, **kwargs)

Store the current timestamp at the given key

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
**kwargs Any

Any valid arguments for the stores put function

{}

Returns:

Type Description
datetime

The timestamp

Source code in anystore/store/base.py
def touch(self, key: Uri, **kwargs: Any) -> datetime:
    """
    Write the current UTC timestamp to the given key.

    Args:
        key: Key relative to store base uri
        **kwargs: Any valid arguments for the stores `put` function

    Returns:
        The timestamp that was stored
    """
    timestamp = datetime.now(tz=timezone.utc)
    self.put(key, timestamp, **kwargs)
    return timestamp

get_store(uri=None, settings=None, **kwargs)

Short-hand initializer for a new store. The call is cached during runtime if input doesn't change.

Example
from anystore import get_store

# initialize from current configuration
store = get_store()
# get a redis store with custom prefix
store = get_store("redis://localhost", backend_config={"redis_prefix": "foo"})

Parameters:

Name Type Description Default
uri Uri | None

Store base uri, if relative it is considered as a local file store, otherwise the store backend is inferred from the scheme. If omitted, store is derived from settings defaults (taking current environment into account).

None
**kwargs Any

pass through storage-specific options

{}

Returns:

Type Description
Store

A Store instance

Source code in anystore/store/__init__.py
def get_store(
    uri: Uri | None = None, settings: Settings | None = None, **kwargs: Any
) -> Store:
    """
    Short-hand initializer for a new store. Stores created from a uri are
    cached during runtime per uri, options and thread; stores loaded from a
    yaml or json configuration uri in the settings are returned without
    caching.

    Example:
        ```python
        from anystore import get_store

        # initialize from current configuration
        store = get_store()
        # get a redis store with custom prefix
        store = get_store("redis://localhost", backend_config={"redis_prefix": "foo"})
        ```

    Args:
        uri: Store base uri, if relative it is considered as a local file store,
             otherwise the store backend is inferred from the scheme. If omitted,
             store is derived from settings defaults (taking current environment
             into account).
        settings: Settings to take defaults from, a fresh `Settings()` is
             created if omitted
        **kwargs: pass through storage-specific options

    Returns:
        A `Store` instance
    """
    settings = settings if settings else Settings()
    # explicitly passed options win over the backend config from settings
    kwargs = {"backend_config": settings.backend_config, **kwargs}
    if uri is None:
        if settings.yaml_uri is not None:
            return Store.from_yaml_uri(settings.yaml_uri, **kwargs)
        if settings.json_uri is not None:
            return Store.from_json_uri(settings.json_uri, **kwargs)
        uri = settings.uri
    uri = ensure_uri(uri)

    # Cache per (uri, options, thread) to avoid re-creating stores
    identity = (str(uri), kwargs, threading.get_ident())
    cache_key = make_data_checksum(identity)
    with _store_lock:
        if cache_key in _store_cache:
            return _store_cache[cache_key]

    instance = Store(uri=uri, **kwargs)
    # test if backend fs is available, raises ImportError if not
    _ = instance._fs

    with _store_lock:
        _store_cache[cache_key] = instance
    return instance

Base store interface

The store class provides the top-level interface regardless of the storage backend.

Store

Bases: StoreModel, Generic[V, Raise]

Source code in anystore/store/base.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
class Store(StoreModel, Generic[V, Raise]):
    @cached_property
    def _fs(self) -> fsspec.AbstractFileSystem:
        """
        The fsspec filesystem backing this store.

        Resolved from the store `uri` together with the merged backend
        configuration and cached on the instance after the first access.
        """
        # `url_to_fs` hands back a `(filesystem, path)` pair; only the
        # filesystem object is of interest here.
        filesystem, _path = fsspec.url_to_fs(self.uri, **self.ensure_kwargs())
        return filesystem

    @cached_property
    def _keys(self) -> Keys:
        """
        The key helper for this store, built from the store `uri`.

        It is used throughout the class to translate between store-relative
        keys and filesystem keys; cached on the instance after first access.
        """
        key_helper = Keys(self.uri)
        return key_helper

    # Caller passes raise_on_nonexist=True explicitly -> always V
    @overload
    def get(
        self,
        key: Uri,
        raise_on_nonexist: Literal[True],
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V: ...

    # Caller passes raise_on_nonexist=False explicitly -> V | None
    @overload
    def get(
        self,
        key: Uri,
        raise_on_nonexist: Literal[False],
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V | None: ...

    # No per-call value, store typed as raising on missing keys -> V
    @overload
    def get(
        self: "Store[V, Literal[True]]",
        key: Uri,
        raise_on_nonexist: None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V: ...

    # Fallback (store typed as lenient or unknown) -> V | None
    @overload
    def get(
        self,
        key: Uri,
        raise_on_nonexist: None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V | None: ...

    def get(
        self,
        key: Uri,
        raise_on_nonexist: bool | None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V | None:
        """
        Read the value stored at `key`.

        Args:
            key: Key relative to store base uri
            raise_on_nonexist: Raise `DoesNotExist` for a missing key instead
                of returning `None`; `None` falls back to the store setting
            serialization_mode: Deserialization mode ("auto", "raw", "pickle",
                "json"), overrides store settings
            deserialization_func: Specific function to use (ignores
                `serialization_mode`), overrides store settings
            model: Pydantic serialization model (ignores `serialization_mode`
                and `deserialization_func`), overrides store settings
            **kwargs: Merged with the backend configuration and passed to the
                filesystem read; a `mode` entry is dropped

        Returns:
            The deserialized value, or `None` if the key is missing and
            raising is disabled

        Raises:
            DoesNotExist: If the key is missing and raising is enabled
        """
        # Per-call arguments win; falsy ones fall back to the store settings.
        mode = serialization_mode or self.serialization_mode
        loader = deserialization_func or self.deserialization_func
        schema = model or self.model
        strict = (
            self.raise_on_nonexist if raise_on_nonexist is None else raise_on_nonexist
        )

        fs_kwargs = self.ensure_kwargs(**kwargs)
        fs_kwargs.pop("mode", None)  # `cat_file` is not given an open mode
        fs_key = self._keys.to_fs_key(key)

        # Expired keys are removed here, so the read below reports them as missing.
        self._check_ttl(fs_key, strict)
        try:
            payload = self._fs.cat_file(fs_key, **fs_kwargs)
            return from_store(
                payload,
                mode,
                deserialization_func=loader,
                model=schema,
            )
        except FileNotFoundError:  # fsspec
            if not strict:
                return None
            raise DoesNotExist(f"Key does not exist: `{fs_key}`")

    # Explicit raise_on_nonexist=True always returns V
    @overload
    def pop(
        self,
        key: Uri,
        raise_on_nonexist: Literal[True],
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V:
        pass

    # Explicit raise_on_nonexist=False always returns V | None
    @overload
    def pop(
        self,
        key: Uri,
        raise_on_nonexist: Literal[False],
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V | None:
        pass

    # Store configured with raise_on_nonexist=True, param is None -> returns V
    @overload
    def pop(
        self: "Store[V, Literal[True]]",
        key: Uri,
        raise_on_nonexist: None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V:
        pass

    # Default case (store configured with False or unknown) -> returns V | None
    @overload
    def pop(
        self,
        key: Uri,
        raise_on_nonexist: None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V | None:
        pass

    def pop(
        self,
        key: Uri,
        raise_on_nonexist: bool | None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> V | None:
        """
        Retrieve the value for the given key and remove it from the store.

        Args:
            key: Key relative to store base uri
            raise_on_nonexist: Raise `DoesNotExist` if key doesn't exist or stay
                silent, overrides store settings
            serialization_mode: Deserialization mode ("auto", "raw", "pickle",
                "json"), overrides store settings
            deserialization_func: Specific function to use (ignores
                `serialization_mode`), overrides store settings
            model: Pydantic serialization model (ignores `serialization_mode`
                and `deserialization_func`), overrides store settings
            **kwargs: Any valid arguments for the stores `get` function

        Returns:
            The deserialized value for the key, or `None` if the key is
            missing and raising is disabled

        Raises:
            DoesNotExist: If the key is missing and raising is enabled
                (raised by `get`)
        """
        # Resolve the effective setting once so the read and the removal
        # below follow the same policy.
        if raise_on_nonexist is None:
            raise_on_nonexist = self.raise_on_nonexist
        value = self.get(
            key,
            raise_on_nonexist=raise_on_nonexist,
            serialization_mode=serialization_mode,
            deserialization_func=deserialization_func,
            model=model,
            **kwargs,
        )
        try:
            self.delete(key)
        except FileNotFoundError:
            # In lenient mode `get` already answered `None` for a missing key;
            # failing on the removal would contradict the `V | None` contract.
            if raise_on_nonexist:
                raise
        return value

    def delete(self, key: Uri, ignore_errors: bool = False) -> None:
        """
        Delete the content at the given key.

        Args:
            key: Key relative to store base uri
            ignore_errors: Ignore exceptions if deletion fails

        Raises:
            Exception: Whatever the filesystem raises while removing the
                file, unless `ignore_errors` is set
        """
        fs_key = self._keys.to_fs_key(key)
        try:
            self._fs.rm_file(fs_key)
        except Exception:
            # Best-effort removal is opt-in; otherwise re-raise the original
            # error untouched (bare `raise` keeps its traceback intact).
            if not ignore_errors:
                raise

    # Explicit raise_on_nonexist=True always returns Generator
    @overload
    def stream(
        self,
        key: Uri,
        raise_on_nonexist: Literal[True],
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> Generator[V, None, None]:
        pass

    # Explicit raise_on_nonexist=False always returns Generator | None
    @overload
    def stream(
        self,
        key: Uri,
        raise_on_nonexist: Literal[False],
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> Generator[V, None, None] | None:
        pass

    # Store configured with raise_on_nonexist=True, param is None -> returns Generator
    @overload
    def stream(
        self: "Store[V, Literal[True]]",
        key: Uri,
        raise_on_nonexist: None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> Generator[V, None, None]:
        pass

    # Default case (store configured with False or unknown) -> returns Generator | None
    @overload
    def stream(
        self,
        key: Uri,
        raise_on_nonexist: None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> Generator[V, None, None] | None:
        pass

    def stream(
        self,
        key: Uri,
        raise_on_nonexist: bool | None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
        **kwargs,
    ) -> Generator[V, None, None] | None:
        """
        Stream a value line by line from the store for the given key

        This is a generator function: the key is only opened once iteration
        starts, so a missing key is reported on the first `next()` call
        rather than when `stream()` itself is called. With raising disabled,
        a missing key yields nothing.

        Args:
            key: Key relative to store base uri
            raise_on_nonexist: Raise `DoesNotExist` if key doesn't exist or stay
                silent, overrides store settings
            serialization_mode: Deserialization mode ("auto", "raw", "pickle",
                "json"), overrides store settings
            deserialization_func: Specific function to use (ignores
                `serialization_mode`), overrides store settings
            model: Pydantic serialization model (ignores `serialization_mode`
                and `deserialization_func`), overrides store settings

        Yields:
            The (optionally deserialized) values line by line

        Raises:
            anystore.exceptions.DoesNotExist: If key doesn't exist and
                raising is enabled
        """
        # An explicit per-call value (including `False`) overrides the store
        # setting, the same way `get` resolves it.
        if raise_on_nonexist is None:
            raise_on_nonexist = self.raise_on_nonexist
        extra_kwargs = {
            "serialization_mode": serialization_mode or self.serialization_mode,
            "deserialization_func": deserialization_func or self.deserialization_func,
            "model": model or self.model,
        }
        try:
            with self.open(key) as i:
                for line in iter_lines(i):
                    yield from_store(line, **extra_kwargs)
        except FileNotFoundError:
            if raise_on_nonexist:
                raise DoesNotExist(f"Key does not exist: `{key}`")
            return

    def put(
        self,
        key: Uri,
        value: V,
        serialization_mode: Mode | None = None,
        serialization_func: Callable | None = None,
        model: Model | None = None,
        ttl: int | None = None,
        **kwargs,
    ):
        """
        Write `value` to the store under `key`.

        `None` values are skipped entirely unless the store is configured
        with `store_none_values`.

        Args:
            key: Key relative to store base uri
            value: The content
            serialization_mode: Serialize value prior to storing ("auto", "raw",
                "pickle", "json"), overrides store settings
            serialization_func: Specific function to use (ignores
                `serialization_mode`), overrides store settings
            model: Pydantic serialization model (ignores `serialization_mode`
                and `serialization_func`), overrides store settings
            ttl: Time to live (in seconds) for that key if the backend supports
                it (e.g. redis, sql); falls back to the store `default_ttl`
        """
        skip_none = not self.store_none_values
        if value is None and skip_none:
            return

        # Per-call arguments win; falsy ones fall back to the store settings.
        mode = serialization_mode or self.serialization_mode
        dumper = serialization_func or self.serialization_func
        schema = model or self.model

        # NOTE(review): the merged kwargs are computed but never forwarded to
        # the filesystem call below - confirm whether that is intended.
        self.ensure_kwargs(**kwargs)

        expires = ttl or self.default_ttl or None
        fs_key = self._keys.to_fs_key(key)
        self.ensure_parent(fs_key)
        with self._fs.open(fs_key, "wb", ttl=expires) as out:
            serialized = to_store(
                value,
                mode,
                serialization_func=dumper,
                model=schema,
            )
            out.write(serialized)

    def _check_ttl(self, fs_key: str, raise_on_nonexist: bool | None = True) -> bool:
        """
        Check if key is expired by TTL; delete and return False if so.

        Without a store `default_ttl` no check is done and the key counts as
        valid.

        Args:
            fs_key: Filesystem key (already converted via `to_fs_key`)
            raise_on_nonexist: Raise `DoesNotExist` for a missing key instead
                of returning `False`

        Returns:
            `False` if the key was expired (and removed) or is missing,
            `True` otherwise

        Raises:
            DoesNotExist: If the key is missing and `raise_on_nonexist` is set
        """
        if not self.default_ttl:
            return True
        try:
            info = Info(**self._fs.info(fs_key))
            if info.created_at is None:
                # No creation time available, so the age cannot be judged.
                return True
            now = datetime.now(timezone.utc)
            if (now - info.created_at).total_seconds() > self.default_ttl:
                self._fs.rm_file(fs_key)
                return False
            return True
        except FileNotFoundError:  # fsspec
            if raise_on_nonexist:
                # `fs_key` is already a filesystem key; report it as-is (as
                # `get` does) instead of converting it a second time.
                raise DoesNotExist(f"Key does not exist: `{fs_key}`")
            return False

    def exists(self, key: Uri) -> bool:
        """
        Check if the given `key` exists.

        A key that fails the TTL check is reported as non-existent without
        asking the filesystem.
        """
        fs_key = self._keys.to_fs_key(key)
        still_valid = self._check_ttl(fs_key, raise_on_nonexist=False)
        return self._fs.exists(fs_key) if still_valid else False

    def info(self, key: Uri) -> Stats:
        """
        Get metadata for the given `key`.

        The filesystem info is extended with the base name of the file, the
        store uri and the store-relative key.

        Returns:
            Key metadata
        """
        raw_info = self._fs.info(self._keys.to_fs_key(key))
        # Only the final path component is kept as `name`; fall back to the
        # key itself when the filesystem reports no name.
        base_name = Path(raw_info.get("name", key)).name
        payload = dict(raw_info)
        payload.update(name=base_name, store=str(self.uri), key=str(key))
        return Stats(**payload)

    def ensure_kwargs(self, **kwargs) -> dict[str, Any]:
        """
        Merge the store backend configuration with per-call `kwargs`.

        Both sides are passed through `clean_dict`; per-call values take
        precedence over the backend configuration.
        """
        merged = dict(clean_dict(self.backend_config))
        merged.update(clean_dict(kwargs))
        return merged

    def iterate_keys(
        self,
        prefix: str | None = None,
        exclude_prefix: str | None = None,
        glob: str | None = None,
    ) -> Generator[str, None, None]:
        """
        Iterate through all the keys in the store based on given criteria.
        Criteria can be combined (e.g. include but exclude a subset).

        Example:
            ```python
            for key in store.iterate_keys(prefix="dataset1", glob="*.pdf"):
                data = store.get(key, serialization_mode="raw")
                parse(data)
            ```

        Args:
            prefix: Include only keys with the given prefix (e.g. "foo/bar")
            exclude_prefix: Exclude keys with this prefix
            glob: Path-style glob pattern for keys to filter (e.g. "foo/**/*.json")

        Yields:
            The matching keys, relative to the store base uri
        """
        base = self._keys.to_fs_key(prefix) if prefix else self._keys.key_prefix

        # Prefer a filesystem-provided `iter_find` when available, then a
        # plain glob, then a full listing below `base`.
        if hasattr(self._fs, "iter_find"):
            candidates = self._fs.iter_find(base, glob=glob)
        elif glob:
            candidates = self._fs.glob(f"{base}/{glob}")
        else:
            try:
                candidates = self._fs.find(base)
            except FileNotFoundError:
                # A missing base simply means there is nothing to iterate.
                return

        for fs_key in candidates:
            relative = self._keys.from_fs_key(fs_key)
            if not (exclude_prefix and relative.startswith(exclude_prefix)):
                yield relative

    def iterate_values(
        self,
        prefix: str | None = None,
        exclude_prefix: str | None = None,
        glob: str | None = None,
        serialization_mode: Mode | None = None,
        deserialization_func: Callable | None = None,
        model: Model | None = None,
    ) -> Generator[V, None, None]:
        """
        Iterate through all the values in the store based on given criteria.
        Criteria can be combined (e.g. include but exclude a subset).

        Keys are selected via `iterate_keys` and each one is read via `get`;
        values that come back as `None` are skipped.

        Example:
            ```python
            yield from store.iterate_values(prefix="dataset1", glob="*.pdf", model=MyModel)
            ```

        Args:
            prefix: Include only keys with the given prefix (e.g. "foo/bar")
            exclude_prefix: Exclude keys with this prefix
            glob: Path-style glob pattern for keys to filter (e.g. "foo/**/*.json")
            serialization_mode: Deserialization mode ("auto", "raw", "pickle",
                "json"), overrides store settings
            deserialization_func: Specific function to use (ignores
                `serialization_mode`), overrides store settings
            model: Pydantic serialization model (ignores `serialization_mode`
                and `deserialization_func`), overrides store settings

        Yields:
            The matching, non-`None` values
        """
        read_options = {
            "serialization_mode": serialization_mode,
            "deserialization_func": deserialization_func,
            "model": model,
        }
        for key in self.iterate_keys(prefix, exclude_prefix, glob):
            if (value := self.get(key, **read_options)) is not None:
                yield value

    def checksum(
        self, key: Uri, algorithm: str | None = DEFAULT_HASH_ALGORITHM, **kwargs: Any
    ) -> str:
        """
        Compute the checksum of the content stored at `key`.

        Args:
            key: Key relative to store base uri
            algorithm: Checksum algorithm from `hashlib`; `None` falls back
                to `DEFAULT_HASH_ALGORITHM`
            **kwargs: Pass through arguments to content retrieval

        Returns:
            The computed checksum
        """
        # The content is always opened in binary read mode, whatever the
        # caller passed as `mode`.
        open_kwargs = {**self.ensure_kwargs(**kwargs), "mode": "rb"}
        fs_key = self._keys.to_fs_key(key)
        digest_name = algorithm or DEFAULT_HASH_ALGORITHM
        with self._fs.open(fs_key, **open_kwargs) as handle:
            return make_checksum(handle, digest_name)

    def open(
        self, key: Uri, mode: str | None = DEFAULT_MODE, **kwargs: Any
    ) -> ContextManager[IO]:
        """
        Open the given key similar to built-in `open()`

        Example:
            ```python
            from anystore import get_store

            store = get_store()
            with store.open("foo/bar.txt") as fh:
                return fh.read()
            ```

        Args:
            key: Key relative to store base uri
            mode: Open mode ("rb", "wb", "r", "w"); `None` falls back to
                `DEFAULT_MODE`
            **kwargs: Pass through arguments to backend

        Returns:
            The open handler
        """
        effective_mode = mode or DEFAULT_MODE
        backend_kwargs = self.ensure_kwargs(**kwargs)
        fs_key = self._keys.to_fs_key(key)
        # Writing needs the parent location to exist first.
        if "w" in effective_mode:
            self.ensure_parent(fs_key)
        return self._fs.open(fs_key, mode=effective_mode, **backend_kwargs)

    def touch(self, key: Uri, **kwargs: Any) -> datetime:
        """
        Store the current UTC timestamp at the given key

        Args:
            key: Key relative to store base uri
            **kwargs: Any valid arguments for the stores `put` function

        Returns:
            The timestamp that was stored
        """
        timestamp = datetime.now(tz=timezone.utc)
        self.put(key, timestamp, **kwargs)
        return timestamp

    def ensure_parent(self, fs_key: Uri) -> None:
        """
        Create the parent path of `fs_key` if the store is local.

        Nothing is done for stores that are not local.
        """
        if not self.is_local:
            return
        self._fs.mkdirs(Path(fs_key).parent, exist_ok=True)

    def to_uri(self, key: Uri) -> str:
        """Return the absolute uri for `key`, as computed by the key helper."""
        absolute_uri = self._keys.to_absolute_uri(key)
        return absolute_uri

    @contextlib.contextmanager
    def local_path(self, key: Uri) -> Generator[Path, None, None]:
        """
        Download the resource at `key` for temporary local processing and get
        its local path. If the store is local, the actual file is used and is
        never deleted. Otherwise the content is copied into a temporary
        virtual store that is released when leaving the context.

        Example:
            ```python
            store = get_store("s3://bucket")
            with store.local_path("data.json") as path:
                do_something(path)
            ```

        Args:
            key: Key relative to store base uri

        Yields:
            The local `path` as a `pathlib.Path` object
        """
        uri = self._keys.to_fs_key(key)
        if self.is_local:
            # The real file is handed out; there is nothing to clean up.
            yield uri_to_path(uri)
            return

        # Imported here rather than at module level, as in the original code.
        from anystore.store.virtual import get_virtual_store

        # Entering the temporary store via `with` guarantees it is released
        # even when the download itself fails, not only when the caller's
        # body raises.
        with get_virtual_store() as tmp_store:
            stream_bytes(str(key), self, tmp_store)
            yield uri_to_path(tmp_store._keys.to_fs_key(key))

    @contextlib.contextmanager
    def local_open(
        self,
        key: Uri,
        algorithm: str | None = DEFAULT_HASH_ALGORITHM,
    ) -> Generator[VirtualIO, None, None]:
        """
        Open a local copy of the file at `key` together with its checksum.

        The local file is obtained via `local_path`, so the same clean-up
        rules apply: a temporary copy is removed when leaving the context,
        while a file that already lives on the local filesystem is never
        deleted.

        Example:
            ```python
            store = get_store("http://example.org")
            with store.local_open("test.txt") as fh:
                smart_write(f"./local/{fh.checksum}", fh.read())
            ```

        Args:
            key: Key relative to store base uri
            algorithm: Checksum algorithm from `hashlib`; `None` falls back
                to `DEFAULT_HASH_ALGORITHM`

        Yields:
            A generic file-handler like context object. It has 3 extra attributes:
                - `checksum`
                - the local `path` as a `pathlib.Path` object
                - [`info`][anystore.model.Stats] object
        """
        digest_name = algorithm or DEFAULT_HASH_ALGORITHM
        with self.local_path(key) as path, path.open("rb") as fh:
            checksum = make_checksum(fh, digest_name)
            # Hashing consumed the stream; rewind so the caller reads from
            # the beginning.
            fh.seek(0)
            yield VirtualIO(fh, checksum=checksum, path=path, info=self.info(key))

checksum(key, algorithm=DEFAULT_HASH_ALGORITHM, **kwargs)

Get the checksum for the value at the given key

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
algorithm str | None

Checksum algorithm from hashlib (default: "sha1")

DEFAULT_HASH_ALGORITHM
**kwargs Any

Pass through arguments to content retrieval

{}

Returns:

Type Description
str

The computed checksum

Source code in anystore/store/base.py
def checksum(
    self, key: Uri, algorithm: str | None = DEFAULT_HASH_ALGORITHM, **kwargs: Any
) -> str:
    """
    Compute the checksum of the content stored at `key`.

    Args:
        key: Key relative to store base uri
        algorithm: Checksum algorithm from `hashlib`; `None` falls back
            to `DEFAULT_HASH_ALGORITHM`
        **kwargs: Pass through arguments to content retrieval

    Returns:
        The computed checksum
    """
    # The content is always opened in binary read mode, whatever the
    # caller passed as `mode`.
    open_kwargs = {**self.ensure_kwargs(**kwargs), "mode": "rb"}
    fs_key = self._keys.to_fs_key(key)
    digest_name = algorithm or DEFAULT_HASH_ALGORITHM
    with self._fs.open(fs_key, **open_kwargs) as handle:
        return make_checksum(handle, digest_name)

delete(key, ignore_errors=False)

Delete the content at the given key.

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
ignore_errors bool

Ignore exceptions if deletion fails

False
Source code in anystore/store/base.py
def delete(self, key: Uri, ignore_errors: bool = False) -> None:
    """
    Delete the content at the given key.

    Args:
        key: Key relative to store base uri
        ignore_errors: Ignore exceptions if deletion fails

    Raises:
        Exception: Whatever the filesystem raises while removing the
            file, unless `ignore_errors` is set
    """
    fs_key = self._keys.to_fs_key(key)
    try:
        self._fs.rm_file(fs_key)
    except Exception:
        # Best-effort removal is opt-in; otherwise re-raise the original
        # error untouched (bare `raise` keeps its traceback intact).
        if not ignore_errors:
            raise

ensure_parent(fs_key)

Ensure existence of the parent path. This is mostly relevant only for stores on the local filesystem.

Source code in anystore/store/base.py
def ensure_parent(self, fs_key: Uri) -> None:
    """
    Create the parent path of `fs_key` if the store is local.

    Nothing is done for stores that are not local.
    """
    if not self.is_local:
        return
    self._fs.mkdirs(Path(fs_key).parent, exist_ok=True)

exists(key)

Check if the given key exists

Source code in anystore/store/base.py
def exists(self, key: Uri) -> bool:
    """
    Check if the given `key` exists.

    A key that fails the TTL check is reported as non-existent without
    asking the filesystem.
    """
    fs_key = self._keys.to_fs_key(key)
    still_valid = self._check_ttl(fs_key, raise_on_nonexist=False)
    return self._fs.exists(fs_key) if still_valid else False

get(key, raise_on_nonexist=None, serialization_mode=None, deserialization_func=None, model=None, **kwargs)

get(
    key: Uri,
    raise_on_nonexist: Literal[True],
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V
get(
    key: Uri,
    raise_on_nonexist: Literal[False],
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V | None
get(
    key: Uri,
    raise_on_nonexist: None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V
get(
    key: Uri,
    raise_on_nonexist: None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V | None

Get a value from the store for the given key

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
raise_on_nonexist bool | None

Raise DoesNotExist if key doesn't exist or stay silent, overrides store settings

None
serialization_mode Mode | None

Serialize result ("auto", "raw", "pickle", "json"), overrides store settings

None
deserialization_func Callable | None

Specific function to use (ignores serialization_mode), overrides store settings

None
model Model | None

Pydantic serialization model (ignores serialization_mode and deserialization_func), overrides store settings

None

Returns:

Type Description
V | None

The (optionally serialized) value for the key

Source code in anystore/store/base.py
def get(
    self,
    key: Uri,
    raise_on_nonexist: bool | None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs,
) -> V | None:
    """
    Read the value stored at `key`.

    Args:
        key: Key relative to store base uri
        raise_on_nonexist: Raise `DoesNotExist` for a missing key instead
            of returning `None`; `None` falls back to the store setting
        serialization_mode: Deserialization mode ("auto", "raw", "pickle",
            "json"), overrides store settings
        deserialization_func: Specific function to use (ignores
            `serialization_mode`), overrides store settings
        model: Pydantic serialization model (ignores `serialization_mode`
            and `deserialization_func`), overrides store settings
        **kwargs: Merged with the backend configuration and passed to the
            filesystem read; a `mode` entry is dropped

    Returns:
        The deserialized value, or `None` if the key is missing and
        raising is disabled

    Raises:
        DoesNotExist: If the key is missing and raising is enabled
    """
    # Per-call arguments win; falsy ones fall back to the store settings.
    mode = serialization_mode or self.serialization_mode
    loader = deserialization_func or self.deserialization_func
    schema = model or self.model
    strict = (
        self.raise_on_nonexist if raise_on_nonexist is None else raise_on_nonexist
    )

    fs_kwargs = self.ensure_kwargs(**kwargs)
    fs_kwargs.pop("mode", None)  # `cat_file` is not given an open mode
    fs_key = self._keys.to_fs_key(key)

    # Expired keys are removed here, so the read below reports them as missing.
    self._check_ttl(fs_key, strict)
    try:
        payload = self._fs.cat_file(fs_key, **fs_kwargs)
        return from_store(
            payload,
            mode,
            deserialization_func=loader,
            model=schema,
        )
    except FileNotFoundError:  # fsspec
        if not strict:
            return None
        raise DoesNotExist(f"Key does not exist: `{fs_key}`")

info(key)

Get metadata for the given key.

Returns:

Type Description
Stats

Key metadata

Source code in anystore/store/base.py
def info(self, key: Uri) -> Stats:
    """
    Get metadata for the given `key`.

    The filesystem info is extended with the base name of the file, the
    store uri and the store-relative key.

    Returns:
        Key metadata
    """
    raw_info = self._fs.info(self._keys.to_fs_key(key))
    # Only the final path component is kept as `name`; fall back to the
    # key itself when the filesystem reports no name.
    base_name = Path(raw_info.get("name", key)).name
    payload = dict(raw_info)
    payload.update(name=base_name, store=str(self.uri), key=str(key))
    return Stats(**payload)

iterate_keys(prefix=None, exclude_prefix=None, glob=None)

Iterate through all the keys in the store based on given criteria. Criteria can be combined (e.g. include but exclude a subset).

Example
for key in store.iterate_keys(prefix="dataset1", glob="*.pdf"):
    data = store.get(key, serialization_mode="raw")
    parse(data)

Parameters:

Name Type Description Default
prefix str | None

Include only keys with the given prefix (e.g. "foo/bar")

None
exclude_prefix str | None

Exclude keys with this prefix

None
glob str | None

Path-style glob pattern for keys to filter (e.g. "foo/**/*.json")

None

Returns:

Type Description
Generator[str, None, None]

The matching keys as a generator of strings

Source code in anystore/store/base.py
def iterate_keys(
    self,
    prefix: str | None = None,
    exclude_prefix: str | None = None,
    glob: str | None = None,
) -> Generator[str, None, None]:
    """
    Iterate through all the keys in the store based on given criteria.
    Criteria can be combined (e.g. include but exclude a subset).

    Example:
        ```python
        for key in store.iterate_keys(prefix="dataset1", glob="*.pdf"):
            data = store.get(key, serialization_mode="raw")
            parse(data)
        ```

    Args:
        prefix: Include only keys with the given prefix (e.g. "foo/bar")
        exclude_prefix: Exclude keys with this prefix
        glob: Path-style glob pattern for keys to filter (e.g. "foo/**/*.json")

    Yields:
        The matching keys, relative to the store base uri
    """
    base = self._keys.to_fs_key(prefix) if prefix else self._keys.key_prefix

    # Prefer a filesystem-provided `iter_find` when available, then a
    # plain glob, then a full listing below `base`.
    if hasattr(self._fs, "iter_find"):
        candidates = self._fs.iter_find(base, glob=glob)
    elif glob:
        candidates = self._fs.glob(f"{base}/{glob}")
    else:
        try:
            candidates = self._fs.find(base)
        except FileNotFoundError:
            # A missing base simply means there is nothing to iterate.
            return

    for fs_key in candidates:
        relative = self._keys.from_fs_key(fs_key)
        if not (exclude_prefix and relative.startswith(exclude_prefix)):
            yield relative

iterate_values(prefix=None, exclude_prefix=None, glob=None, serialization_mode=None, deserialization_func=None, model=None)

Iterate through all the values in the store based on given criteria. Criteria can be combined (e.g. include but exclude a subset).

Example
yield from store.iterate_values(prefix="dataset1", glob="*.pdf", model=MyModel)

Parameters:

Name Type Description Default
prefix str | None

Include only keys with the given prefix (e.g. "foo/bar")

None
exclude_prefix str | None

Exclude keys with this prefix

None
glob str | None

Path-style glob pattern for keys to filter (e.g. "foo/**/*.json")

None
serialization_mode Mode | None

Serialize result ("auto", "raw", "pickle", "json"), overrides store settings

None
deserialization_func Callable | None

Specific function to use (ignores serialization_mode), overrides store settings

None
model Model | None

Pydantic serialization model (ignores serialization_mode and deserialization_func), overrides store settings

None

Returns:

Type Description
Generator[V, None, None]

The matching values as a generator of any (serialized) type

Source code in anystore/store/base.py
def iterate_values(
    self,
    prefix: str | None = None,
    exclude_prefix: str | None = None,
    glob: str | None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
) -> Generator[V, None, None]:
    """
    Lazily yield the stored values whose keys match the given criteria.
    The criteria can be combined (e.g. include a prefix but exclude a
    subset of it). Keys whose value resolves to `None` are skipped.

    Example:
        ```python
        yield from store.iterate_values(prefix="dataset1", glob="*.pdf", model=MyModel)
        ```

    Args:
        prefix: Include only keys with the given prefix (e.g. "foo/bar")
        exclude_prefix: Exclude keys with this prefix
        glob: Path-style glob pattern for keys to filter (e.g. "foo/**/*.json")
        serialization_mode: Serialize result ("auto", "raw", "pickle",
            "json"), overrides store settings
        deserialization_func: Specific function to use (ignores
            `serialization_mode`), overrides store settings
        model: Pydantic serialization model (ignores `serialization_mode`
            and `deserialization_func`), overrides store settings

    Returns:
        The matching values as a generator of any (serialized) type
    """
    matching_keys = self.iterate_keys(prefix, exclude_prefix, glob)
    # Both stages are generator expressions, so each value is only fetched
    # when the consumer asks for the next item.
    fetched = (
        self.get(
            matching_key,
            serialization_mode=serialization_mode,
            deserialization_func=deserialization_func,
            model=model,
        )
        for matching_key in matching_keys
    )
    yield from (item for item in fetched if item is not None)

local_open(key, algorithm=DEFAULT_HASH_ALGORITHM)

Download a file for temporary local processing and get its checksum and an open handler. If the file itself is already on the local filesystem, the actual file will be used. The temporary file is cleaned up when leaving the context; a file that was already local is never deleted.

Example
store = get_store("http://example.org")
with store.local_open("test.txt") as fh:
    smart_write(f"./local/{fh.checksum}", fh.read())

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
algorithm str | None

Checksum algorithm from hashlib (default: "sha1")

DEFAULT_HASH_ALGORITHM

Yields:

Type Description
VirtualIO

A generic file-handler like context object. It has 3 extra attributes: `checksum`, `path` (the absolute temporary path as a pathlib.Path object) and `info` (the info object for the key).

Source code in anystore/store/base.py
@contextlib.contextmanager
def local_open(
    self,
    key: Uri,
    algorithm: str | None = DEFAULT_HASH_ALGORITHM,
) -> Generator[VirtualIO, None, None]:
    """
    Open the resource at `key` from a local path and yield a binary handle
    together with its checksum. The local path is obtained via
    `local_path`: if the file is already on the local filesystem, the
    actual file is used; otherwise a temporary copy is downloaded and
    cleaned up when leaving the context. A file that was already local is
    never deleted.

    Example:
        ```python
        store = get_store("http://example.org")
        with store.local_open("test.txt") as fh:
            smart_write(f"./local/{fh.checksum}", fh.read())
        ```

    Args:
        key: Key relative to store base uri
        algorithm: Checksum algorithm from `hashlib`; falls back to
            `DEFAULT_HASH_ALGORITHM` when empty or `None`

    Yields:
        A generic file-handler like context object. It has 3 extra attributes:
            - `checksum`
            - the absolute temporary `path` as a `pathlib.Path` object
            - [`info`][anystore.model.Stats] object
    """
    with self.local_path(key) as local_file, local_file.open("rb") as handle:
        digest = make_checksum(handle, algorithm or DEFAULT_HASH_ALGORITHM)
        # Computing the checksum consumed the stream; rewind so the caller
        # reads from the beginning.
        handle.seek(0)
        stats = self.info(key)
        yield VirtualIO(handle, checksum=digest, path=local_file, info=stats)

local_path(key)

Download the resource at key for temporary local processing and get its local path. If the file itself is already on the local filesystem, the actual file will be used. The temporary file is cleaned up when leaving the context; a file that was already local is never deleted.

Example
store = get_store("s3://bucket")
with store.local_path("data.json") as path:
    do_something(path)

Yields: The absolute temporary path as a pathlib.Path object

Source code in anystore/store/base.py
@contextlib.contextmanager
def local_path(self, key: Uri) -> Generator[Path, None, None]:
    """
    Download the resource at `key` for temporary local processing and get
    its local path. If the store is local, the actual file is used and
    never deleted. Otherwise the resource is copied into a temporary
    virtual store that is released when leaving the context, including
    when the download itself or the caller's block raises.

    Example:
        ```python
        store = get_store("s3://bucket")
        with store.local_path("data.json") as path:
            do_something(path)
        ```

    Args:
        key: Key relative to store base uri

    Yields:
        The absolute (possibly temporary) `path` as a `pathlib.Path` object
    """
    uri = self._keys.to_fs_key(key)
    if self.is_local:
        # Local stores expose the real file: nothing to download or clean up.
        yield uri_to_path(uri)
        return

    # NOTE(review): function-scope import, presumably to avoid a circular
    # import between the store modules; confirm before moving it.
    from anystore.store.virtual import get_virtual_store

    # Entering the virtual store through `with` guarantees its cleanup runs
    # even if copying the bytes fails before the path is handed out, and
    # forwards any exception from the caller's block to its `__exit__`.
    with get_virtual_store() as tmp_store:
        stream_bytes(str(key), self, tmp_store)
        yield uri_to_path(tmp_store._keys.to_fs_key(key))

open(key, mode=DEFAULT_MODE, **kwargs)

Open the given key similar to built-in open()

Example
from anystore import get_store

store = get_store()
with store.open("foo/bar.txt") as fh:
    return fh.read()

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
mode str | None

Open mode ("rb", "wb", "r", "w")

DEFAULT_MODE
**kwargs Any

Pass through arguments to backend

{}

Returns:

Type Description
ContextManager[IO]

The open handler

Source code in anystore/store/base.py
def open(
    self, key: Uri, mode: str | None = DEFAULT_MODE, **kwargs: Any
) -> ContextManager[IO]:
    """
    Open the given key similar to built-in `open()`. When the mode contains
    "w", the parent of the resolved key is ensured before opening.

    Example:
        ```python
        from anystore import get_store

        store = get_store()
        with store.open("foo/bar.txt") as fh:
            return fh.read()
        ```

    Args:
        key: Key relative to store base uri
        mode: Open mode ("rb", "wb", "r", "w"); an empty value or `None`
            falls back to `DEFAULT_MODE`
        **kwargs: Pass through arguments to backend

    Returns:
        The open handler
    """
    open_mode = mode if mode else DEFAULT_MODE
    backend_kwargs = self.ensure_kwargs(**kwargs)
    fs_key = self._keys.to_fs_key(key)
    writing = "w" in open_mode
    if writing:
        self.ensure_parent(fs_key)
    return self._fs.open(fs_key, mode=open_mode, **backend_kwargs)

pop(key, raise_on_nonexist=None, serialization_mode=None, deserialization_func=None, model=None, **kwargs)

pop(
    key: Uri,
    raise_on_nonexist: Literal[True],
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V
pop(
    key: Uri,
    raise_on_nonexist: Literal[False],
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V | None
pop(
    key: Uri,
    raise_on_nonexist: None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V
pop(
    key: Uri,
    raise_on_nonexist: None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> V | None

Retrieve the value for the given key and remove it from the store.

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
raise_on_nonexist bool | None

Raise DoesNotExist if key doesn't exist or stay silent, overrides store settings

None
serialization_mode Mode | None

Serialize result ("auto", "raw", "pickle", "json"), overrides store settings

None
deserialization_func Callable | None

Specific function to use (ignores serialization_mode), overrides store settings

None
model Model | None

Pydantic serialization model (ignores serialization_mode and deserialization_func), overrides store settings

None
**kwargs

Any valid arguments for the stores get function

{}

Returns:

Type Description
V | None

The (optionally serialized) value for the key

Source code in anystore/store/base.py
def pop(
    self,
    key: Uri,
    raise_on_nonexist: bool | None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs,
) -> V | None:
    """
    Fetch the value stored at `key`, then delete the key from the store.

    Args:
        key: Key relative to store base uri
        raise_on_nonexist: Raise `DoesNotExist` if key doesn't exist or stay
            silent, overrides store settings
        serialization_mode: Serialize result ("auto", "raw", "pickle",
            "json"), overrides store settings
        deserialization_func: Specific function to use (ignores
            `serialization_mode`), overrides store settings
        model: Pydantic serialization model (ignores `serialization_mode`
            and `deserialization_func`), overrides store settings
        **kwargs: Any valid arguments for the stores `get` function

    Returns:
        The (optionally serialized) value for the key
    """
    lookup_options = {
        "raise_on_nonexist": raise_on_nonexist,
        "serialization_mode": serialization_mode,
        "deserialization_func": deserialization_func,
        "model": model,
    }
    # Read first: if `get` raises, the key is left untouched.
    popped = self.get(key, **lookup_options, **kwargs)
    self.delete(key)
    return popped

put(key, value, serialization_mode=None, serialization_func=None, model=None, ttl=None, **kwargs)

Store a value at the given key

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
value V

The content

required
serialization_mode Mode | None

Serialize value prior to storing ("auto", "raw", "pickle", "json"), overrides store settings

None
serialization_func Callable | None

Specific function to use (ignores serialization_mode), overrides store settings

None
model Model | None

Pydantic serialization model (ignores serialization_mode and serialization_func), overrides store settings

None
ttl int | None

Time to live (in seconds) for that key if the backend supports it (e.g. redis, sql)

None
Source code in anystore/store/base.py
def put(
    self,
    key: Uri,
    value: V,
    serialization_mode: Mode | None = None,
    serialization_func: Callable | None = None,
    model: Model | None = None,
    ttl: int | None = None,
    **kwargs,
):
    """
    Serialize `value` and write it to the store at `key`. A `None` value
    is silently skipped unless the store has `store_none_values` enabled.

    Args:
        key: Key relative to store base uri
        value: The content
        serialization_mode: Serialize value prior to storing ("auto", "raw",
            "pickle", "json"), overrides store settings
        serialization_func: Specific function to use (ignores
            `serialization_mode`), overrides store settings
        model: Pydantic serialization model (ignores `serialization_mode`
            and `serialization_func`), overrides store settings
        ttl: Time to live (in seconds) for that key if the backend supports
            it (e.g. redis, sql); falls back to the store's `default_ttl`
        **kwargs: Passed through `ensure_kwargs`
    """
    skip_none = value is None and not self.store_none_values
    if skip_none:
        return

    # Per-call arguments win; falsy ones fall back to the store settings.
    effective_mode = serialization_mode or self.serialization_mode
    effective_func = serialization_func or self.serialization_func
    effective_model = model or self.model
    # NOTE(review): the ensured kwargs are not forwarded to the backend
    # `open` call below; confirm whether that is intended.
    kwargs = self.ensure_kwargs(**kwargs)
    effective_ttl = ttl or self.default_ttl or None

    fs_key = self._keys.to_fs_key(key)
    self.ensure_parent(fs_key)
    with self._fs.open(fs_key, "wb", ttl=effective_ttl) as out:
        payload = to_store(
            value,
            effective_mode,
            serialization_func=effective_func,
            model=effective_model,
        )
        out.write(payload)

stream(key, raise_on_nonexist=None, serialization_mode=None, deserialization_func=None, model=None, **kwargs)

stream(
    key: Uri,
    raise_on_nonexist: Literal[True],
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> Generator[V, None, None]
stream(
    key: Uri,
    raise_on_nonexist: Literal[False],
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> Generator[V, None, None] | None
stream(
    key: Uri,
    raise_on_nonexist: None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> Generator[V, None, None]
stream(
    key: Uri,
    raise_on_nonexist: None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs
) -> Generator[V, None, None] | None

Stream a value line by line from the store for the given key

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
raise_on_nonexist bool | None

Raise DoesNotExist if key doesn't exist or stay silent, overrides store settings

None
serialization_mode Mode | None

Serialize result ("auto", "raw", "pickle", "json"), overrides store settings

None
deserialization_func Callable | None

Specific function to use (ignores serialization_mode), overrides store settings

None
model Model | None

Pydantic serialization model (ignores serialization_mode and deserialization_func), overrides store settings

None

Yields:

Type Description
Generator[V, None, None] | None

The (optionally serialized) values line by line

Raises:

Type Description
DoesNotExist

If key doesn't exist and raise_on_nonexist=True

Source code in anystore/store/base.py
def stream(
    self,
    key: Uri,
    raise_on_nonexist: bool | None = None,
    serialization_mode: Mode | None = None,
    deserialization_func: Callable | None = None,
    model: Model | None = None,
    **kwargs,
) -> Generator[V, None, None] | None:
    """
    Stream a value line by line from the store for the given key.

    This is a generator function: nothing is opened until iteration starts,
    so a missing key is reported (or silently skipped) on the first
    iteration step rather than at call time.

    Args:
        key: Key relative to store base uri
        raise_on_nonexist: Raise `DoesNotExist` if key doesn't exist or stay
            silent. An explicit `True` or `False` overrides the store
            setting; `None` uses the store setting
        serialization_mode: Serialize result ("auto", "raw", "pickle",
            "json"), overrides store settings
        deserialization_func: Specific function to use (ignores
            `serialization_mode`), overrides store settings
        model: Pydantic serialization model (ignores `serialization_mode`
            and `deserialization_func`), overrides store settings
        **kwargs: Accepted for signature compatibility; not used here

    Yields:
        The (optionally serialized) values line by line

    Raises:
        anystore.exceptions.DoesNotExist: If key doesn't exist and raising
            is enabled (by argument or store setting)
    """
    model = model or self.model
    extra_kwargs = {
        "serialization_mode": serialization_mode or self.serialization_mode,
        "deserialization_func": deserialization_func or self.deserialization_func,
        "model": model,
    }
    try:
        with self.open(key) as i:
            for line in iter_lines(i):
                yield from_store(line, **extra_kwargs)
    except FileNotFoundError:
        # Only fall back to the store default when the caller gave no
        # explicit preference, so `raise_on_nonexist=False` can silence a
        # store that is configured to raise.
        if raise_on_nonexist is None:
            raise_on_nonexist = self.raise_on_nonexist
        if raise_on_nonexist:
            raise DoesNotExist(f"Key does not exist: `{key}`")
        return

touch(key, **kwargs)

Store the current timestamp at the given key

Parameters:

Name Type Description Default
key Uri

Key relative to store base uri

required
**kwargs Any

Any valid arguments for the stores put function

{}

Returns:

Type Description
datetime

The timestamp

Source code in anystore/store/base.py
def touch(self, key: Uri, **kwargs: Any) -> datetime:
    """
    Write the current timezone-aware UTC timestamp to the given key.

    Args:
        key: Key relative to store base uri
        **kwargs: Any valid arguments for the stores `put` function

    Returns:
        The timestamp that was stored
    """
    timestamp = datetime.now(tz=timezone.utc)
    self.put(key, timestamp, **kwargs)
    return timestamp