from __future__ import annotations
import os
import shutil
import unicodedata
from tempfile import mkstemp
from typing import TYPE_CHECKING
from anyio import Path
from litestar.concurrency import sync_to_thread
from .base import NamespacedStore, StorageObject
__all__ = ("FileStore",)
if TYPE_CHECKING:
from datetime import timedelta
from os import PathLike
def _safe_file_name(name: str) -> str:
name = unicodedata.normalize("NFKD", name)
return "".join(c if c.isalnum() else str(ord(c)) for c in name)
[docs]
class FileStore(NamespacedStore):
    """File based, thread and process safe, asynchronous key/value store.

    Every key is stored as a single file directly inside :attr:`path`. Stores
    created with :meth:`with_namespace` live in a sub-directory of that path.
    """

    __slots__ = {"create_directories": "flag to create directories in path", "path": "file path"}

    def __init__(self, path: PathLike[str], *, create_directories: bool = False) -> None:
        """Initialize ``FileStore``.

        Args:
            path: Path to store data under
            create_directories: Create the directories in ``path`` if they don't exist
                Default: ``False``

                .. versionadded:: 2.9.0
        """
        self.path = Path(path)
        self.create_directories = create_directories

    async def __aenter__(self) -> None:
        """Create :attr:`path`, including parents, if ``create_directories`` is set."""
        if self.create_directories:
            await self.path.mkdir(exist_ok=True, parents=True)

    def with_namespace(self, namespace: str) -> FileStore:
        """Return a new instance of :class:`FileStore`, using a sub-path of the current store's path.

        The new store inherits this store's ``create_directories`` setting.

        Args:
            namespace: Name of the sub-directory to use; must be alphanumeric

        Raises:
            ValueError: If ``namespace`` is empty or contains non-alphanumeric
                characters
        """
        if not namespace.isalnum():
            raise ValueError(f"Invalid namespace: {namespace!r}")
        return FileStore(self.path / namespace, create_directories=self.create_directories)

    def _path_from_key(self, key: str) -> Path:
        """Return the file path under :attr:`path` that stores ``key``."""
        return self.path / _safe_file_name(key)

    @staticmethod
    async def _load_from_path(path: Path) -> StorageObject | None:
        """Read and decode the object stored at ``path``.

        Returns:
            The decoded :class:`StorageObject`, or ``None`` if the file does not
            exist
        """
        try:
            data = await path.read_bytes()
            return StorageObject.from_bytes(data)
        except FileNotFoundError:
            return None

    def _write_sync(self, target_file: Path, storage_obj: StorageObject) -> None:
        """Write ``storage_obj`` to ``target_file`` via a temporary file.

        The data is written to a temporary file in :attr:`path` which is then
        moved over ``target_file`` with :func:`os.replace` (atomic on POSIX), so
        the target is never left partially written.

        Note:
            This is best-effort: any :exc:`OSError` raised while creating,
            writing or renaming the temporary file is swallowed.
        """
        try:
            tmp_file_fd, tmp_file_name = mkstemp(dir=self.path, prefix=f"{target_file.name}.tmp")
            renamed = False
            try:
                # A bare ``os.write`` may write fewer bytes than requested; the
                # buffered file object writes the whole payload or raises, and
                # the ``with`` block closes the descriptor in either case.
                with os.fdopen(tmp_file_fd, "wb") as tmp_file:
                    tmp_file.write(storage_obj.to_bytes())

                os.replace(tmp_file_name, target_file)  # noqa: PTH105
                renamed = True
            finally:
                if not renamed:
                    # Don't leave the temporary file behind on failure
                    os.unlink(tmp_file_name)  # noqa: PTH108
        except OSError:
            pass

    async def _write(self, target_file: Path, storage_obj: StorageObject) -> None:
        """Run :meth:`_write_sync` in a worker thread."""
        await sync_to_thread(self._write_sync, target_file, storage_obj)

    async def set(self, key: str, value: str | bytes, expires_in: int | timedelta | None = None) -> None:
        """Set a value.

        Args:
            key: Key to associate the value with
            value: Value to store. Strings are encoded as UTF-8
            expires_in: Time in seconds before the key is considered expired

        Returns:
            ``None``
        """
        # Missing parent directories are only created when the store was
        # configured with ``create_directories``.
        await self.path.mkdir(exist_ok=True, parents=self.create_directories)
        path = self._path_from_key(key)
        if isinstance(value, str):
            value = value.encode("utf-8")
        storage_obj = StorageObject.new(data=value, expires_in=expires_in)
        await self._write(path, storage_obj)

    async def get(self, key: str, renew_for: int | timedelta | None = None) -> bytes | None:
        """Get a value.

        Args:
            key: Key associated with the value
            renew_for: If given and the value had an initial expiry time set, renew the
                expiry time for ``renew_for`` seconds. If the value has not been set
                with an expiry time this is a no-op

        Returns:
            The value associated with ``key`` if it exists and is not expired, else
            ``None``
        """
        path = self._path_from_key(key)
        storage_obj = await self._load_from_path(path)

        if not storage_obj:
            return None

        if storage_obj.expired:
            # Expired entries are removed lazily, on access
            await path.unlink(missing_ok=True)
            return None

        if renew_for and storage_obj.expires_at:
            await self.set(key, value=storage_obj.data, expires_in=renew_for)

        return storage_obj.data

    async def delete(self, key: str) -> None:
        """Delete a value.

        If no such key exists, this is a no-op.

        Args:
            key: Key of the value to delete
        """
        path = self._path_from_key(key)
        await path.unlink(missing_ok=True)

    async def delete_all(self) -> None:
        """Delete all stored values.

        Note:
            This deletes and recreates :attr:`FileStore.path`
        """
        try:
            await sync_to_thread(shutil.rmtree, self.path)
        except FileNotFoundError:
            # Nothing has been stored yet, so the store is already empty
            pass
        await self.path.mkdir(exist_ok=True, parents=self.create_directories)

    async def delete_expired(self) -> None:
        """Delete expired items.

        Since expired items are normally only cleared on access (i.e. when calling
        :meth:`.get`), this method should be called in regular intervals
        to free disk space.

        Only regular files directly inside :attr:`path` are inspected;
        sub-directories (such as those used by :meth:`with_namespace`) are skipped.
        """
        try:
            async for file in self.path.iterdir():
                if not await file.is_file():
                    # Namespaces are sub-directories and cannot be read as a
                    # stored object
                    continue
                wrapper = await self._load_from_path(file)
                if wrapper and wrapper.expired:
                    await file.unlink(missing_ok=True)
        except FileNotFoundError:
            # The store directory does not exist yet, so nothing can be expired
            return

    async def exists(self, key: str) -> bool:
        """Check if a given ``key`` exists.

        Note:
            Only the presence of the backing file is checked, so an expired
            value that has not been removed yet is still reported as existing.
        """
        path = self._path_from_key(key)
        return await path.exists()

    async def expires_in(self, key: str) -> int | None:
        """Get the time in seconds ``key`` expires in. If no such ``key`` exists or no
        expiry time was set, return ``None``.
        """
        if storage_obj := await self._load_from_path(self._path_from_key(key)):
            return storage_obj.expires_in
        return None