Source code for litestar.file_system

from __future__ import annotations

import abc
import os
import os.path
import pathlib
from datetime import datetime
from stat import S_ISDIR
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Final, Union, cast

import anyio
from typing_extensions import NotRequired, TypeAlias, TypedDict

from litestar.concurrency import sync_to_thread
from litestar.plugins import InitPlugin

__all__ = (
    "AnyFileSystem",
    "BaseFileSystem",
    "BaseLocalFileSystem",
    "FileInfo",
    "FileSystemRegistry",
    "FsspecAsyncWrapper",
    "FsspecSyncWrapper",
    "LinkableFileSystem",
    "get_fsspec_mtime_equivalent",
    "maybe_wrap_fsspec_file_system",
)

if TYPE_CHECKING:
    import io
    from collections.abc import AsyncGenerator, Awaitable, Mapping

    from fsspec import AbstractFileSystem as FsspecFileSystem
    from fsspec.asyn import AsyncFileSystem as FsspecAsyncFileSystem

    from litestar.types import PathType


AnyFileSystem: TypeAlias = "Union[BaseFileSystem, FsspecFileSystem, FsspecAsyncFileSystem]"
SymlinkResolver: TypeAlias = "Callable[[AnyFileSystem, PathType], Awaitable[str]]"


[docs] class FileInfo(TypedDict): """File information gathered from a file system.""" is_symlink: NotRequired[bool | None] """'True' if the file is a symbolic link. 'None' if the file system does not support symlinks""" mtime: NotRequired[float] """Modified time stamp.""" name: str """The path of the file.""" size: int """Total size, in bytes.""" type: str """The type of the file system object."""
[docs] class BaseFileSystem(abc.ABC):
[docs] @abc.abstractmethod async def info(self, path: PathType, **kwargs: Any) -> FileInfo: """Return a :class:`~litestar.file_system.FileInfo` for the given ``path``"""
[docs] @abc.abstractmethod async def read_bytes( self, path: PathType, start: int = 0, end: int = -1, ) -> bytes: """Read bytes from ``path`` Args: path: File to read from start: Offset to start reading from end: Offset to stop reading at (inclusive) """
[docs] @abc.abstractmethod def iter( self, path: PathType, chunksize: int, start: int = 0, end: int = -1, ) -> AsyncGenerator[bytes, None]: """Stream bytes from ``path`` Args: path: Path to read from chunksize: Number of bytes to read per chunk start: Offset to start reading from end: Offset to stop reading at (inclusive) """
[docs] class LinkableFileSystem(BaseFileSystem, abc.ABC): """A file system that supports symlinks""" _registry: ClassVar[dict[type[AnyFileSystem], SymlinkResolver]] = {}
[docs] @classmethod def register_as_linkable(cls, fs_type: type[AnyFileSystem], resolver: SymlinkResolver) -> None: """Register a file system as 'linkable', i.e. supporting symlinks, that does not itself implement :class:`~litestar.file_system.LinkableFileSystem`. By default, :class:`fsspec.implementations.local.LocalFileSystem` is registered. Args: fs_type: The file system type resolver: A function that takes in a path and returns an absolute path with all symlinks resolved """ cls._registry[fs_type] = resolver
try: from fsspec.implementations.local import LocalFileSystem as FsspecLocalFileSystem async def _resolve_symlink_fsspec_local(fs: AnyFileSystem, path: PathType) -> str: return os.path.realpath(path) LinkableFileSystem.register_as_linkable(FsspecLocalFileSystem, _resolve_symlink_fsspec_local) except ImportError: pass
[docs] class BaseLocalFileSystem(LinkableFileSystem): """Base class for a local file system.""" def _info(self, path: PathType) -> FileInfo: """Return a :class:`~litestar.file_system.FileInfo` for the given ``path``""" path = pathlib.Path(path) result = path.stat() return { "is_symlink": path.is_symlink(), "mtime": result.st_mtime, "name": str(path), "size": result.st_size, "type": "directory" if S_ISDIR(result.st_mode) else "file", }
[docs] async def info(self, path: PathType, **kwargs: Any) -> FileInfo: """Return a :class:`~litestar.file_system.FileInfo` for the given ``path``""" return await sync_to_thread(self._info, path)
[docs] async def read_bytes( self, path: PathType, start: int = 0, end: int = -1, ) -> bytes: """Read bytes from ``path`` Args: path: File to read from start: Offset to start reading from end: Offset to stop reading at (inclusive) """ # simple case; read the whole thing in one go if start == 0 and end == -1: return await sync_to_thread(pathlib.Path(path).read_bytes) # we have either a start or end set, so open explicitly and then seek / read fh: anyio.AsyncFile if end != -1: end = end - start async with await anyio.open_file(path, mode="rb") as fh: if start: await fh.seek(start) return await fh.read(end)
[docs] async def iter( self, path: PathType, chunksize: int, start: int = 0, end: int = -1, ) -> AsyncGenerator[bytes, None]: """Stream bytes from ``path`` Args: path: Path to read from chunksize: Number of bytes to read per chunk start: Offset to start reading from end: Offset to stop reading at (inclusive) """ fh: anyio.AsyncFile async with await anyio.open_file(path, mode="rb") as fh: if start != 0: await fh.seek(start) current_pos = start while True: to_read = chunksize if end != -1: if current_pos >= end: break to_read = min(chunksize, end - current_pos) chunk = await fh.read(to_read) if not chunk: break yield chunk current_pos = await fh.tell()
[docs] class FsspecSyncWrapper(BaseFileSystem):
[docs] def __init__(self, fs: FsspecFileSystem) -> None: """Wrap a :class:`fsspec.spec.AbstractFileSystem` to provide a :class:`litestar.file_system.BaseFileSystem` compatible interface """ self.wrapped_fs = fs
[docs] async def info(self, path: PathType, **kwargs: Any) -> FileInfo: """Return a :class:`~litestar.file_system.FileInfo` for the given ``path``. Return info verbatim from :meth:`fsspec.spec.AbstractFileSystem.info`, but try to patch 'mtime' using :func:`litestar.file_system.get_fsspec_mtime_equivalent`. """ result: dict[str, Any] = await sync_to_thread(self.wrapped_fs.info, str(path), **kwargs) result["mtime"] = get_fsspec_mtime_equivalent(result) result.setdefault("is_symlink", result.get("islink")) return cast("FileInfo", result)
[docs] async def read_bytes( self, path: PathType, start: int = 0, end: int = -1, ) -> bytes: """Read bytes from ``path`` using :meth:`fsspec.spec.AbstractFileSystem.cat_file` Args: path: File to read from start: Offset to start reading from end: Offset to stop reading at (inclusive) """ return await sync_to_thread( self.wrapped_fs.cat_file, # pyright: ignore str(path), start=start or None, end=end if end != -1 else None, )
[docs] async def iter( self, path: PathType, chunksize: int, start: int = 0, end: int = -1, ) -> AsyncGenerator[bytes, None]: """Stream bytes from ``path`` using :meth:`fsspec.spec.AbstractFileSystem.open` Args: path: Path to read from chunksize: Number of bytes to read per chunk start: Offset to start reading from end: Offset to stop reading at (inclusive) """ fh = cast("io.BytesIO", await sync_to_thread(self.wrapped_fs.open, str(path), mode="rb")) try: if start != 0: await sync_to_thread(fh.seek, start) to_read = chunksize while True: if end != -1: current_pos = await sync_to_thread(fh.tell) if current_pos >= end: break to_read = min(chunksize, end - current_pos) chunk = await sync_to_thread(fh.read, to_read) if not chunk: break yield chunk finally: await sync_to_thread(fh.close)
[docs] class FsspecAsyncWrapper(BaseFileSystem):
[docs] def __init__(self, fs: FsspecAsyncFileSystem) -> None: """Wrap a :class:`fsspec.asyn.AsyncFileSystem` to provide a class:`litestar.file_system.BaseFileSystem` compatible interface Args: fs: An :class:`fsspec.asyn.AsyncFileSystem` instantiated with ` `asynchronous=True`` """ import fsspec.asyn self.wrapped_fs = fs # 'open_async' may not be implemented self._supports_open_async = type(fs).open_async is not fsspec.asyn.AsyncFileSystem.open_async
[docs] async def info(self, path: PathType, **kwargs: Any) -> FileInfo: """Return a :class:`~litestar.file_system.FileInfo` for the given ``path``. Return info verbatim from ``fsspec.async.AsyncFileSystem._info``, but try to patch 'mtime' using :func:`litestar.file_system.get_fsspec_mtime_equivalent`. """ result: dict[str, Any] = await self.wrapped_fs._info(str(path), **kwargs) result["mtime"] = get_fsspec_mtime_equivalent(result) result.setdefault("is_symlink", result.get("islink")) return cast("FileInfo", result)
[docs] async def read_bytes( self, path: PathType, start: int = 0, end: int = -1, ) -> bytes: """Read bytes from ``path`` using ``fsspec.asyn.AsyncFileSystem._cat_file`` Args: path: File to read from start: Offset to start reading from end: Offset to stop reading at (inclusive) """ return cast( "bytes", await self.wrapped_fs._cat_file( str(path), start=start or None, end=end if end != -1 else None, ), )
[docs] async def iter(self, path: PathType, chunksize: int, start: int = 0, end: int = -1) -> AsyncGenerator[bytes, None]: """Stream bytes from ``path``. If no offsets are given and the file system implements ``fsspec.async.AsyncFileSystem.async_open``, use ``async_open``, otherwise read chunks manually using ``fsspec.async.AsyncFileSystem._cat_file`` Args: path: Path to read from chunksize: Number of bytes to read per chunk start: Offset to start reading from end: Offset to stop reading at (inclusive) """ path = str(path) # if we want to stream the whole thing we can use '.open_async' if it's # implemented if start == 0 and end == -1 and self._supports_open_async: async with await self.wrapped_fs.open_async(path, mode="rb") as fh: # pyright: ignore while chunk := await fh.read(chunksize): yield chunk return # if 'open_async' is not implemented, or 'start' or 'end' are set, we have to do # a bit more work. # the object returned by open_async()' does not support seek + tell or other # mechanisms for reading specific ranges # https://github.com/fsspec/filesystem_spec/issues/1772 to_read = chunksize current_pos = start while True: if end != -1: if current_pos >= end: break to_read = min(chunksize, end - current_pos) chunk = await self.read_bytes(path, start=current_pos, end=current_pos + to_read) if not chunk: break current_pos += len(chunk) yield chunk
[docs] def maybe_wrap_fsspec_file_system(file_system: AnyFileSystem) -> BaseFileSystem: """If ``file_system`` is an fsspec file system, wrap it in :class:`~litestar.file_system.FsspecSyncWrapper` or :class:`~litestar.file_system.FsspecAsyncWrapper` depending on its type. If fsspec is not installed or ``file_system`` is not an fsspec file system, return ``file_system`` """ try: from fsspec import AbstractFileSystem from fsspec.asyn import AsyncFileSystem except ImportError: return cast("BaseFileSystem", file_system) if isinstance(file_system, AsyncFileSystem): return FsspecAsyncWrapper(file_system) if isinstance(file_system, AbstractFileSystem): return FsspecSyncWrapper(file_system) return file_system
[docs] class FileSystemRegistry(InitPlugin):
[docs] def __init__( self, file_systems: Mapping[str, AnyFileSystem] | None = None, default: AnyFileSystem | None = None, ) -> None: """File system registry Args: file_systems: A mapping of names to file systems default: Default file system to use when no system is specified. If ``None``, default to :class:`~litestar.file_system.BaseLocalFileSystem` """ self._adapters: dict[str, BaseFileSystem] = {} self.register("file", BaseLocalFileSystem()) if file_systems: for scheme, fs in file_systems.items(): self.register(scheme, fs) if default is not None: self._default = maybe_wrap_fsspec_file_system(default) else: self._default = BaseLocalFileSystem()
@property def default(self) -> BaseFileSystem: """Return the default file system""" return self._default def __getitem__(self, name: str) -> BaseFileSystem: return self._adapters[name]
[docs] def get(self, name: str) -> BaseFileSystem | None: """Return the file system registered under ``name``. If no matching file system is found, return ``None`` """ return self._adapters.get(name)
[docs] def register(self, name: str, fs: AnyFileSystem) -> None: """Register a file system ``fs`` under ``name``. If a file system was previously registered under this name, it will be overwritten """ self._adapters[name] = maybe_wrap_fsspec_file_system(fs)
_MTIME_KEYS: Final = ( "mtime", "ctime", "Last-Modified", "updated_at", "modification_time", "last_changed", "change_time", "last_modified", "last_updated", "timestamp", )
[docs] def get_fsspec_mtime_equivalent(info: dict[str, Any]) -> float | None: """Return the 'mtime' or equivalent for different fsspec implementations, since they are not standardized. See https://github.com/fsspec/filesystem_spec/issues/526. """ # inspired by https://github.com/mdshw5/pyfaidx/blob/cac82f24e9c4e334cf87a92e477b92d4615d260f/pyfaidx/__init__.py#L1318-L1345 mtime: Any | None = next((info[key] for key in _MTIME_KEYS if key in info), None) if mtime is None or isinstance(mtime, float): return mtime if isinstance(mtime, datetime): return mtime.timestamp() if isinstance(mtime, str): return datetime.fromisoformat(mtime.replace("Z", "+00:00")).timestamp() raise ValueError(f"Unsupported mtime-type value type {type(mtime)!r}")