from __future__ import annotations
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING
from msgspec import Struct
from msgspec.msgpack import decode as msgpack_decode
from msgspec.msgpack import encode as msgpack_encode
if TYPE_CHECKING:
from types import TracebackType
from typing_extensions import Self
__all__ = ("NamespacedStore", "StorageObject", "Store")
[docs]
class Store(ABC):
"""Thread and process safe asynchronous key/value store."""
__slots__ = ()
[docs]
@abstractmethod
async def set(self, key: str, value: str | bytes, expires_in: int | timedelta | None = None) -> None:
"""Set a value.
Args:
key: Key to associate the value with
value: Value to store
expires_in: Time in seconds before the key is considered expired
Returns:
``None``
"""
raise NotImplementedError
[docs]
@abstractmethod
async def get(self, key: str, renew_for: int | timedelta | None = None) -> bytes | None:
"""Get a value.
Args:
key: Key associated with the value
renew_for: If given and the value had an initial expiry time set, renew the
expiry time for ``renew_for`` seconds. If the value has not been set
with an expiry time this is a no-op
Returns:
The value associated with ``key`` if it exists and is not expired, else
``None``
"""
raise NotImplementedError
[docs]
@abstractmethod
async def delete(self, key: str) -> None:
"""Delete a value.
If no such key exists, this is a no-op.
Args:
key: Key of the value to delete
"""
raise NotImplementedError
[docs]
@abstractmethod
async def delete_all(self) -> None:
"""Delete all stored values."""
raise NotImplementedError
[docs]
@abstractmethod
async def exists(self, key: str) -> bool:
"""Check if a given ``key`` exists."""
raise NotImplementedError
[docs]
@abstractmethod
async def expires_in(self, key: str) -> int | None:
"""Get the time in seconds ``key`` expires in. If no such ``key`` exists or no
expiry time was set, return ``None``.
"""
raise NotImplementedError
async def __aenter__(self) -> None: # noqa: B027
pass
async def __aexit__( # noqa: B027
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
pass
[docs]
class NamespacedStore(Store):
"""A subclass of :class:`Store`, offering hierarchical namespacing.
Bulk actions on a parent namespace should affect all child namespaces, whereas other operations on all namespaces
should be isolated.
"""
__slots__ = ("namespace",)
[docs]
@abstractmethod
def with_namespace(self, namespace: str) -> Self:
"""Return a new instance of :class:`NamespacedStore`, which exists in a child namespace of the current namespace.
Bulk actions on the parent namespace should affect all child namespaces, whereas other operations on all
namespaces should be isolated.
"""
[docs]
class StorageObject(Struct):
""":class:`msgspec.Struct` to store serialized data alongside with their expiry time."""
expires_at: datetime | None
data: bytes
[docs]
@classmethod
def new(cls, data: bytes, expires_in: int | timedelta | None) -> StorageObject:
"""Construct a new :class:`StorageObject` instance."""
if expires_in is not None and not isinstance(expires_in, timedelta):
expires_in = timedelta(seconds=expires_in)
return cls(
data=data,
expires_at=(datetime.now(tz=timezone.utc) + expires_in) if expires_in else None,
)
@property
def expired(self) -> bool:
"""Return if the :class:`StorageObject` is expired"""
return self.expires_at is not None and datetime.now(tz=timezone.utc) >= self.expires_at
@property
def expires_in(self) -> int:
"""Return the expiry time of this ``StorageObject`` in seconds. If no expiry time
was set, return ``-1``.
"""
if self.expires_at:
return int(self.expires_at.timestamp() - datetime.now(tz=timezone.utc).timestamp())
return -1
[docs]
def to_bytes(self) -> bytes:
"""Encode the instance to bytes"""
return msgpack_encode(self)
[docs]
@classmethod
def from_bytes(cls, raw: bytes) -> StorageObject:
"""Load a previously encoded with :meth:`StorageObject.to_bytes`"""
return msgpack_decode(raw, type=cls)