Source code for litestar.response.file

from __future__ import annotations

import itertools
from email.utils import formatdate
from mimetypes import encodings_map, guess_type
from typing import TYPE_CHECKING, Literal
from urllib.parse import quote
from zlib import adler32

from litestar.constants import ONE_MEGABYTE
from litestar.exceptions import ImproperlyConfiguredException
from litestar.file_system import (
    BaseFileSystem,
    FileSystemRegistry,
    maybe_wrap_fsspec_file_system,
)
from litestar.response.base import Response
from litestar.response.streaming import ASGIStreamingResponse
from litestar.utils.helpers import get_enum_string_value

if TYPE_CHECKING:
    from collections.abc import Iterable
    from os import PathLike

    from anyio import Path
    from fsspec import AbstractFileSystem
    from fsspec.asyn import AsyncFileSystem as AbstractAsyncFileSystem

    from litestar.background_tasks import BackgroundTask, BackgroundTasks
    from litestar.connection import Request
    from litestar.datastructures.cookie import Cookie
    from litestar.datastructures.headers import ETag
    from litestar.enums import MediaType
    from litestar.file_system import FileInfo
    from litestar.types import (
        HTTPResponseBodyEvent,
        PathType,
        Receive,
        ResponseCookies,
        ResponseHeaders,
        Send,
        TypeEncodersMap,
    )

__all__ = (
    "ASGIFileResponse",
    "File",
    "create_etag_for_file",
)

# brotli not supported in 'mimetypes.encodings_map' until py 3.9.
encodings_map[".br"] = "br"


[docs] def create_etag_for_file(path: PathType, modified_time: float | None, file_size: int) -> str: """Create an etag. Notes: - Function is derived from flask. Returns: An etag. """ check = adler32(str(path).encode("utf-8")) & 0xFFFFFFFF parts = [str(file_size), str(check)] if modified_time: parts.insert(0, str(modified_time)) return f'"{"-".join(parts)}"'
[docs] class ASGIFileResponse(ASGIStreamingResponse): """A low-level ASGI response, streaming a file as response body."""
[docs] def __init__( self, *, background: BackgroundTask | BackgroundTasks | None = None, chunk_size: int = ONE_MEGABYTE, content_disposition_type: Literal["attachment", "inline"] = "attachment", content_length: int | None = None, cookies: Iterable[Cookie] | None = None, encoding: str = "utf-8", etag: ETag | None = None, file_info: FileInfo | None = None, file_path: str | PathLike | Path, file_system: BaseFileSystem, filename: str = "", headers: dict[str, str] | None = None, is_head_response: bool = False, media_type: MediaType | str | None = None, status_code: int | None = None, ) -> None: """A low-level ASGI response, streaming a file as response body. Args: background: A background task or a list of background tasks to be executed after the response is sent. chunk_size: The chunk size to use. content_disposition_type: The type of the ``Content-Disposition``. Either ``inline`` or ``attachment``. content_length: The response content length. cookies: The response cookies. encoding: The response encoding. etag: An etag. file_info: A file info. file_path: A path to a file. file_system: A file system adapter. filename: The name of the file. headers: A dictionary of headers. headers: The response headers. is_head_response: A boolean indicating if the response is a HEAD response. media_type: The media type of the file. status_code: The response status code. """ headers = headers or {} if not media_type: mimetype, content_encoding = guess_type(filename) if filename else (None, None) media_type = mimetype or "application/octet-stream" if content_encoding is not None: headers.update({"content-encoding": content_encoding}) self._file_system = file_system super().__init__( iterator=iter(b""), headers=headers, media_type=media_type, cookies=cookies, background=background, status_code=status_code, content_length=content_length, encoding=encoding, is_head_response=is_head_response, ) quoted_filename = quote(filename) is_utf8 = quoted_filename == filename if is_utf8: content_disposition = f'{content_disposition_type}; filename="{filename}"' else: content_disposition = f"{content_disposition_type}; filename*=utf-8''{quoted_filename}" self.headers.setdefault("content-disposition", content_disposition) self.chunk_size = chunk_size self.etag = etag self.file_path = file_path self.file_info = file_info
[docs] async def send_body(self, send: Send, receive: Receive) -> None: """Emit a stream of events correlating with the response body. Args: send: The ASGI send function. receive: The ASGI receive function. Returns: None """ if self.content_length < self.chunk_size: # no need to chunk and stream; read and send the whole thing in one go body_event: HTTPResponseBodyEvent = { "type": "http.response.body", "body": await self._file_system.read_bytes(self.file_path), "more_body": False, } await send(body_event) else: self.iterator = self._file_system.iter(self.file_path, chunksize=self.chunk_size) await super().send_body(send=send, receive=receive)
[docs] async def start_response(self, send: Send) -> None: """Emit the start event of the response. This event includes the headers and status codes. Args: send: The ASGI send function. Returns: None """ try: if self.file_info is None: file_info = await self._file_system.info(self.file_path) else: file_info = self.file_info except FileNotFoundError as e: raise ImproperlyConfiguredException(f"{self.file_path} does not exist") from e if file_info["type"] != "file": raise ImproperlyConfiguredException(f"{self.file_path} is not a file") self.content_length = file_info["size"] self.headers.setdefault("content-length", str(self.content_length)) mtime = file_info.get("mtime") if mtime is not None: self.headers.setdefault("last-modified", formatdate(mtime, usegmt=True)) if self.etag: self.headers.setdefault("etag", self.etag.to_header()) else: self.headers.setdefault( "etag", create_etag_for_file( path=self.file_path, modified_time=mtime, file_size=file_info["size"], ), ) await super().start_response(send=send)
[docs] class File(Response): """A response, streaming a file as response body.""" __slots__ = ( "chunk_size", "content_disposition_type", "etag", "file_info", "file_path", "file_system", "filename", )
[docs] def __init__( self, path: str | PathLike | Path, *, background: BackgroundTask | BackgroundTasks | None = None, chunk_size: int = ONE_MEGABYTE, content_disposition_type: Literal["attachment", "inline"] = "attachment", cookies: ResponseCookies | None = None, encoding: str = "utf-8", etag: ETag | None = None, file_info: FileInfo | None = None, file_system: str | BaseFileSystem | AbstractFileSystem | AbstractAsyncFileSystem | None = None, filename: str | None = None, headers: ResponseHeaders | None = None, media_type: Literal[MediaType.TEXT] | str | None = None, status_code: int | None = None, ) -> None: """Send a file from a file system. Args: path: A file path in one of the supported formats. background: A :class:`BackgroundTask <.background_tasks.BackgroundTask>` instance or :class:`BackgroundTasks <.background_tasks.BackgroundTasks>` to execute after the response is finished. Defaults to None. chunk_size: The chunk sizes to use when streaming the file. Defaults to 1MB. content_disposition_type: The type of the ``Content-Disposition``. Either ``inline`` or ``attachment``. cookies: A list of :class:`Cookie <.datastructures.Cookie>` instances to be set under the response ``Set-Cookie`` header. encoding: The encoding to be used for the response headers. etag: An optional :class:`ETag <.datastructures.ETag>` instance. If not provided, an etag will be generated. file_info: The output of calling :meth:`file_system.info <litestar.file_system.BaseFileSystem.info>` file_system: The file system to load the file from. Instances of :class:`~litestar.file_system.BaseFileSystem`, :class:`fsspec.spec.AbstractFileSystem`, :class:`fsspec.asyn.AsyncFileSystem` will be used directly. If passed string, use it to look up the corresponding file system from the :class:`~litestar.file_system.FileSystemRegistry`. If not given, the file will be loaded from :attr:`~litestar.file_system.FileSystemRegistry.default` filename: An optional filename to set in the header. headers: A string keyed dictionary of response headers. Header keys are insensitive. media_type: A value for the response ``Content-Type`` header. If not provided, the value will be either derived from the filename if provided and supported by the stdlib, or will default to ``application/octet-stream``. status_code: An HTTP status code. """ self.chunk_size = chunk_size self.content_disposition_type = content_disposition_type self.etag = etag self.file_info = file_info self.file_path = path self.file_system = file_system self.filename = filename or "" super().__init__( content=None, status_code=status_code, media_type=media_type, background=background, headers=headers, cookies=cookies, encoding=encoding, )
[docs] def to_asgi_response( self, request: Request, *, background: BackgroundTask | BackgroundTasks | None = None, cookies: Iterable[Cookie] | None = None, headers: dict[str, str] | None = None, is_head_response: bool = False, media_type: MediaType | str | None = None, status_code: int | None = None, type_encoders: TypeEncodersMap | None = None, ) -> ASGIFileResponse: """Create an :class:`ASGIFileResponse <litestar.response.file.ASGIFileResponse>` instance. Args: background: Background task(s) to be executed after the response is sent. cookies: A list of cookies to be set on the response. headers: Additional headers to be merged with the response headers. Response headers take precedence. is_head_response: Whether the response is a HEAD response. media_type: Media type for the response. If ``media_type`` is already set on the response, this is ignored. request: The :class:`Request <.connection.Request>` instance. status_code: Status code for the response. If ``status_code`` is already set on the response, this is type_encoders: A dictionary of type encoders to use for encoding the response content. Returns: A low-level ASGI file response. """ headers = {**headers, **self.headers} if headers is not None else self.headers cookies = self.cookies if cookies is None else itertools.chain(self.cookies, cookies) media_type = self.media_type or media_type if media_type is not None: media_type = get_enum_string_value(media_type) file_system: BaseFileSystem if self.file_system is None: file_system = request.app.plugins.get(FileSystemRegistry).default elif isinstance(self.file_system, str): file_system_plugin = request.app.plugins.get(FileSystemRegistry) file_system = file_system_plugin[self.file_system] else: file_system = maybe_wrap_fsspec_file_system(self.file_system) return ASGIFileResponse( file_path=self.file_path, file_system=file_system, filename=self.filename, background=self.background or background, chunk_size=self.chunk_size, content_disposition_type=self.content_disposition_type, # pyright: ignore content_length=0, cookies=cookies, encoding=self.encoding, etag=self.etag, file_info=self.file_info, headers=headers, is_head_response=is_head_response, media_type=media_type, status_code=self.status_code or status_code, )