Source code for litestar.stores.file

from __future__ import annotations

import os
import shutil
import unicodedata
from tempfile import mkstemp
from typing import TYPE_CHECKING

from anyio import Path

from litestar.concurrency import sync_to_thread

from .base import NamespacedStore, StorageObject

__all__ = ("FileStore",)


if TYPE_CHECKING:
    from datetime import timedelta
    from os import PathLike


def _safe_file_name(name: str) -> str:
    name = unicodedata.normalize("NFKD", name)
    return "".join(c if c.isalnum() else str(ord(c)) for c in name)


[docs] class FileStore(NamespacedStore): """File based, thread and process safe, asynchronous key/value store.""" __slots__ = {"create_directories": "flag to create directories in path", "path": "file path"}
[docs] def __init__(self, path: PathLike[str], *, create_directories: bool = False) -> None: """Initialize ``FileStorage``. Args: path: Path to store data under create_directories: Create the directories in ``path`` if they don't exist Default: ``False`` .. versionadded:: 2.9.0 """ self.path = Path(path) self.create_directories = create_directories
async def __aenter__(self) -> None: if self.create_directories: await self.path.mkdir(exist_ok=True, parents=True) return
[docs] def with_namespace(self, namespace: str) -> FileStore: """Return a new instance of :class:`FileStore`, using a sub-path of the current store's path.""" if not namespace.isalnum(): raise ValueError(f"Invalid namespace: {namespace!r}") return FileStore(self.path / namespace)
def _path_from_key(self, key: str) -> Path: return self.path / _safe_file_name(key) @staticmethod async def _load_from_path(path: Path) -> StorageObject | None: try: data = await path.read_bytes() return StorageObject.from_bytes(data) except FileNotFoundError: return None def _write_sync(self, target_file: Path, storage_obj: StorageObject) -> None: try: tmp_file_fd, tmp_file_name = mkstemp(dir=self.path, prefix=f"{target_file.name}.tmp") renamed = False try: try: os.write(tmp_file_fd, storage_obj.to_bytes()) finally: os.close(tmp_file_fd) os.replace(tmp_file_name, target_file) # noqa: PTH105 renamed = True finally: if not renamed: os.unlink(tmp_file_name) # noqa: PTH108 except OSError: pass async def _write(self, target_file: Path, storage_obj: StorageObject) -> None: await sync_to_thread(self._write_sync, target_file, storage_obj)
[docs] async def set(self, key: str, value: str | bytes, expires_in: int | timedelta | None = None) -> None: """Set a value. Args: key: Key to associate the value with value: Value to store expires_in: Time in seconds before the key is considered expired Returns: ``None`` """ await self.path.mkdir(exist_ok=True) path = self._path_from_key(key) if isinstance(value, str): value = value.encode("utf-8") storage_obj = StorageObject.new(data=value, expires_in=expires_in) await self._write(path, storage_obj)
[docs] async def get(self, key: str, renew_for: int | timedelta | None = None) -> bytes | None: """Get a value. Args: key: Key associated with the value renew_for: If given and the value had an initial expiry time set, renew the expiry time for ``renew_for`` seconds. If the value has not been set with an expiry time this is a no-op Returns: The value associated with ``key`` if it exists and is not expired, else ``None`` """ path = self._path_from_key(key) storage_obj = await self._load_from_path(path) if not storage_obj: return None if storage_obj.expired: await path.unlink(missing_ok=True) return None if renew_for and storage_obj.expires_at: await self.set(key, value=storage_obj.data, expires_in=renew_for) return storage_obj.data
[docs] async def delete(self, key: str) -> None: """Delete a value. If no such key exists, this is a no-op. Args: key: Key of the value to delete """ path = self._path_from_key(key) await path.unlink(missing_ok=True)
[docs] async def delete_all(self) -> None: """Delete all stored values. Note: This deletes and recreates :attr:`FileStore.path` """ await sync_to_thread(shutil.rmtree, self.path) await self.path.mkdir(exist_ok=True)
[docs] async def delete_expired(self) -> None: """Delete expired items. Since expired items are normally only cleared on access (i.e. when calling :meth:`.get`), this method should be called in regular intervals to free disk space. """ async for file in self.path.iterdir(): wrapper = await self._load_from_path(file) if wrapper and wrapper.expired: await file.unlink(missing_ok=True)
[docs] async def exists(self, key: str) -> bool: """Check if a given ``key`` exists.""" path = self._path_from_key(key) return await path.exists()
[docs] async def expires_in(self, key: str) -> int | None: """Get the time in seconds ``key`` expires in. If no such ``key`` exists or no expiry time was set, return ``None``. """ if storage_obj := await self._load_from_path(self._path_from_key(key)): return storage_obj.expires_in return None