Responses#
Litestar allows for several ways in which HTTP responses can be specified and handled, each fitting a different use case. The base pattern though is straightforward - simply return a value from a route handler function and let Litestar take care of the rest:
```python
from pydantic import BaseModel

from litestar import get


class Resource(BaseModel):
    id: int
    name: str


@get("/resources")
def retrieve_resource() -> Resource:
    return Resource(id=1, name="my resource")
```
In the example above, the route handler function returns an instance of the Resource pydantic class. This value will then be used by Litestar to construct an instance of the Response class using default values: the response status code will be set to 200 and its Content-Type header will be set to application/json. The Resource instance will be serialized into JSON and set as the response body.
Media Type#
You do not have to specify the media_type kwarg in the route handler function if the response should be JSON. But if you wish to return a response other than JSON, you should specify this value. You can use the MediaType enum for this purpose:
```python
from litestar import MediaType, get


@get("/resources", media_type=MediaType.TEXT)
def retrieve_resource() -> str:
    return "The rumbling rabbit ran around the rock"
```
The value of the media_type kwarg affects both the serialization of response data and the generation of OpenAPI docs. The above example will cause Litestar to serialize the response as a simple bytes string with a Content-Type header value of text/plain. It will also set the corresponding values in the OpenAPI documentation.
MediaType has the following members:
- MediaType.JSON: application/json
- MediaType.MESSAGEPACK: application/x-msgpack
- MediaType.TEXT: text/plain
- MediaType.HTML: text/html
You can also set any IANA-registered media type string as the media_type. While this will still affect the OpenAPI generation as expected, you might need to handle serialization yourself, using either a custom response with a serializer or by serializing the value in the route handler function.
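For example, to serve application/xml you might serialize the payload yourself inside the handler and return bytes. Below is a minimal, hypothetical sketch of such a serializer using only the standard library (the `to_xml` helper is not a Litestar API):

```python
import xml.etree.ElementTree as ET


def to_xml(resource: dict) -> bytes:
    # Build a flat XML document from a dict; a route handler declared with
    # media_type="application/xml" could return these bytes as the body.
    root = ET.Element("resource")
    for key, value in resource.items():
        child = ET.SubElement(root, key)
        child.text = str(value)
    return ET.tostring(root, encoding="utf-8")


body = to_xml({"id": 1, "name": "my resource"})
print(body)
```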
JSON responses#
As previously mentioned, the default media_type is MediaType.JSON, which supports the following values:
- models from libraries that extend pydantic models
- container types such as dict or list containing supported types

If you need to return other values and would like to extend serialization, you can do so using custom responses.
You can also set an application media type string with the +json suffix defined in RFC 6839 as the media_type, and it will be recognized and serialized as JSON. For example, you can use application/vnd.example.resource+json; it will work just like JSON, but with the appropriate Content-Type header and the corresponding entry in the generated OpenAPI schema.
```python
from typing import Any, Dict

import litestar.status_codes
from litestar import Litestar, get


@get(
    "/resources",
    status_code=litestar.status_codes.HTTP_418_IM_A_TEAPOT,
    media_type="application/vnd.example.resource+json",
)
async def retrieve_resource() -> Dict[str, Any]:
    return {
        "title": "Server thinks it is a teapot",
        "type": "Server delusion",
        "status": litestar.status_codes.HTTP_418_IM_A_TEAPOT,
    }


app = Litestar(route_handlers=[retrieve_resource])
```
MessagePack responses#
In addition to JSON, Litestar offers support for the MessagePack format, which can be a time and space efficient alternative to JSON. It supports all the same types as JSON serialization. To send a MessagePack response, simply specify the media type as MediaType.MESSAGEPACK:
```python
from typing import Dict

from litestar import get, MediaType


@get(path="/health-check", media_type=MediaType.MESSAGEPACK)
def health_check() -> Dict[str, str]:
    return {"hello": "world"}
```
Plaintext responses#
For MediaType.TEXT, route handlers should return a str or bytes value:
```python
from litestar import get, MediaType


@get(path="/health-check", media_type=MediaType.TEXT)
def health_check() -> str:
    return "healthy"
```
HTML responses#
For MediaType.HTML, route handlers should return a str or bytes value that contains HTML:
```python
from litestar import get, MediaType


@get(path="/page", media_type=MediaType.HTML)
def health_check() -> str:
    return """
    <html>
        <body>
            <div>
                <span>Hello World!</span>
            </div>
        </body>
    </html>
    """
```
Tip
It’s a good idea to use a template engine for more complex HTML responses and to write the template itself in a separate file rather than a string.
Content Negotiation#
If your handler can return data with different media types and you want to use Content Negotiation to allow the client to choose which type to return, you can use the Request.accept property to calculate the best matching return media type.
```python
from litestar import Litestar, MediaType, Request, Response, get


@get("/resource", sync_to_thread=False)
def retrieve_resource(request: Request) -> Response[bytes]:
    provided_types = [MediaType.TEXT, MediaType.HTML, "application/xml"]
    preferred_type = request.accept.best_match(provided_types, default=MediaType.TEXT)
    content = None
    if preferred_type == MediaType.TEXT:
        content = b"Hello World!"
    elif preferred_type == MediaType.HTML:
        content = b"<h1>Hello World!</h1>"
    elif preferred_type == "application/xml":
        content = b"<xml><msg>Hello World!</msg></xml>"
    return Response(content=content, media_type=preferred_type)


app = Litestar(route_handlers=[retrieve_resource])
```
Run it

```shell
> curl http://127.0.0.1:8000/resource
Hello World!
> curl http://127.0.0.1:8000/resource -H "Accept: text/html"
<h1>Hello World!</h1>
> curl http://127.0.0.1:8000/resource -H "Accept: application/xml"
<xml><msg>Hello World!</msg></xml>
```
Status Codes#
You can control the response status_code by setting the corresponding kwarg to the desired value:
```python
from pydantic import BaseModel

from litestar import get
from litestar.status_codes import HTTP_202_ACCEPTED


class Resource(BaseModel):
    id: int
    name: str


@get("/resources", status_code=HTTP_202_ACCEPTED)
def retrieve_resource() -> Resource:
    return Resource(id=1, name="my resource")
```
If status_code is not set by the user, the following defaults are used:
- POST: 201 (Created)
- DELETE: 204 (No Content)
- GET, PATCH, PUT: 200 (OK)
Attention
For status codes below 100, and for the 204 and 304 statuses, no response body is allowed. If you specify a return annotation other than None, an ImproperlyConfiguredException will be raised.
Note
When using the route decorator with multiple HTTP methods, the default status code is 200. The default for delete is 204 because it is assumed by default that delete operations return no data. This might not be the case in your implementation, so take care to set the status code as you see fit.
Tip
While you can write integers as the value for status_code, e.g. 200, it's best practice to use constants (also in tests). Litestar includes easy-to-use constants that are exported from litestar.status_codes, e.g. HTTP_200_OK and HTTP_201_CREATED. Another option is the http.HTTPStatus enum from the standard library, which also offers extra functionality.
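As a quick illustration of the standard-library option, http.HTTPStatus bundles the numeric value and the reason phrase together:

```python
from http import HTTPStatus

# Each member is an int subclass, so it compares equal to the bare number
# while also carrying the standard reason phrase.
status = HTTPStatus.CREATED

print(int(status))    # 201
print(status.phrase)  # Created

# Lookup by numeric value returns the matching member:
print(HTTPStatus(404) is HTTPStatus.NOT_FOUND)  # True
```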
Returning responses#
While the default response handling fits most use cases, in some cases you need to be able to return a response instance directly. Litestar allows you to return any class inheriting from the Response class, so the example below will work perfectly fine:
```python
from pydantic import BaseModel

from litestar import Litestar, Response, get
from litestar.datastructures import Cookie


class Resource(BaseModel):
    id: int
    name: str


@get("/resources", sync_to_thread=False)
def retrieve_resource() -> Response[Resource]:
    return Response(
        Resource(
            id=1,
            name="my resource",
        ),
        headers={"MY-HEADER": "xyz"},
        cookies=[Cookie(key="my-cookie", value="abc")],
    )


app = Litestar(route_handlers=[retrieve_resource])
```
Attention
In the case of the builtin Template, File, Stream, and Redirect responses, you should use the respective response containers, otherwise OpenAPI documentation will not be generated correctly. For more details, see the respective documentation sections.
Annotating responses#
As you can see above, the Response class accepts a generic argument. This allows Litestar to infer the response body when generating the OpenAPI docs.
Note
If the generic argument is not provided, and thus defaults to Any, the OpenAPI docs will be imprecise. So make sure to type this argument even when returning an empty or null body, i.e. use None.
Returning ASGI Applications#
Litestar also supports returning ASGI applications directly, just as you would return responses. For example:
```python
from litestar import get
from litestar.types import ASGIApp, Receive, Scope, Send


@get("/")
def handler() -> ASGIApp:
    async def my_asgi_app(scope: Scope, receive: Receive, send: Send) -> None: ...

    return my_asgi_app
```
What is an ASGI Application?#
An ASGI application in this context is any async callable (a function, a class method, or simply a class that implements the special object.__call__() dunder method) that accepts the three ASGI arguments: scope, receive, and send.
For example, all the following examples are ASGI applications:
Function ASGI Application#
```python
from litestar.types import Receive, Scope, Send


async def my_asgi_app_function(scope: Scope, receive: Receive, send: Send) -> None:
    # do something here
    ...
```
Method ASGI Application#
```python
from litestar.types import Receive, Scope, Send


class MyClass:
    async def my_asgi_app_method(
        self, scope: Scope, receive: Receive, send: Send
    ) -> None:
        # do something here
        ...
```
Class ASGI Application#
```python
from litestar.types import Receive, Scope, Send


class ASGIApp:
    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # do something here
        ...
```
Returning responses from third party libraries#
Because you can return any ASGI Application from a route handler, you can also use any ASGI application from other libraries. For example, you can return the response classes from Starlette or FastAPI directly from route handlers:
```python
from starlette.responses import JSONResponse

from litestar import get
from litestar.types import ASGIApp


@get("/")
def handler() -> ASGIApp:
    return JSONResponse(content={"hello": "world"})  # type: ignore
```
Attention
Litestar offers strong typing for the ASGI arguments. Other libraries often offer less strict typing, which might cause type checkers to complain when using ASGI apps from them inside Litestar. For the time being, the only solution is to add # type: ignore comments in the pertinent places. Nonetheless, the above example will work perfectly fine.
Setting Response Headers#
Litestar allows you to define response headers by using the response_headers kwarg. This kwarg is available on all layers of the app: individual route handlers, controllers, routers, and the app itself:
```python
from litestar import Controller, Litestar, MediaType, Router, get
from litestar.datastructures import ResponseHeader


class MyController(Controller):
    path = "/controller-path"
    response_headers = [
        ResponseHeader(
            name="controller-level-header",
            value="controller header",
            description="controller level header",
        )
    ]

    @get(
        path="/handler-path",
        response_headers=[
            ResponseHeader(
                name="my-local-header",
                value="local header",
                description="local level header",
            )
        ],
        media_type=MediaType.TEXT,
        sync_to_thread=False,
    )
    def my_route_handler(self) -> str:
        return "hello world"


router = Router(
    path="/router-path",
    route_handlers=[MyController],
    response_headers=[
        ResponseHeader(
            name="router-level-header",
            value="router header",
            description="router level header",
        )
    ],
)

app = Litestar(
    route_handlers=[router],
    response_headers=[
        ResponseHeader(name="app-level-header", value="app header", description="app level header")
    ],
)
```
In the above example the response returned from my_route_handler will have headers set from each layer of the application using the given key+value combinations, i.e. it will be a dictionary equal to this:

```json
{
    "my-local-header": "local header",
    "controller-level-header": "controller header",
    "router-level-header": "router header",
    "app-level-header": "app header"
}
```
The respective descriptions will be used for the OpenAPI documentation.
Tip
ResponseHeader is a special class that allows adding OpenAPI attributes such as description or documentation_only. If you don't need those, you can define response_headers using a mapping, such as a dictionary, as well:

```python
@get(response_headers={"my-header": "header-value"})
async def handler() -> str: ...
```
Setting Headers Dynamically#
The scheme detailed above works great for statically configured headers, but how would you go about setting headers dynamically? Litestar allows you to do this in several ways; below we detail the two primary patterns.
Using Annotated Responses#
We can simply return a response instance directly from the route handler and set the headers dictionary manually as needed, e.g.:
```python
from random import randint

from pydantic import BaseModel

from litestar import Litestar, Response, get
from litestar.datastructures import ResponseHeader


class Resource(BaseModel):
    id: int
    name: str


@get(
    "/resources",
    response_headers=[
        ResponseHeader(
            name="Random-Header",
            description="a random number in the range 1 - 100",
            documentation_only=True,
        )
    ],
    sync_to_thread=False,
)
def retrieve_resource() -> Response[Resource]:
    return Response(
        Resource(
            id=1,
            name="my resource",
        ),
        headers={"Random-Header": str(randint(1, 100))},
    )


app = Litestar(route_handlers=[retrieve_resource])
```
In the above we use the response_headers kwarg to pass the name and description parameters for the Random-Header to the OpenAPI documentation, but we set the value dynamically as part of the annotated response we return. To this end we do not set a value for it, and we designate it as documentation_only=True.
Using the After Request Hook#
An alternative pattern would be to use an after request handler. We can define the handler on different layers of the application as explained in the pertinent docs. We should take care to document the headers on the corresponding layer:
```python
from random import randint

from pydantic import BaseModel

from litestar import Litestar, Response, Router, get
from litestar.datastructures import ResponseHeader


class Resource(BaseModel):
    id: int
    name: str


@get(
    "/resources",
    response_headers=[
        ResponseHeader(
            name="Random-Header",
            description="a random number in the range 100 - 1000",
            documentation_only=True,
        )
    ],
    sync_to_thread=False,
)
def retrieve_resource() -> Response[Resource]:
    return Response(
        Resource(
            id=1,
            name="my resource",
        ),
        headers={"Random-Header": str(randint(100, 1000))},
    )


def after_request_handler(response: Response) -> Response:
    response.headers.update({"Random-Header": str(randint(1, 100))})
    return response


router = Router(
    path="/router-path",
    route_handlers=[retrieve_resource],
    after_request=after_request_handler,
    response_headers=[
        ResponseHeader(
            name="Random-Header",
            description="a random number in the range 1 - 100",
            documentation_only=True,
        )
    ],
)

app = Litestar(route_handlers=[router])
```
In the above we set the response header using an after_request_handler function on the router level. Because the handler function is applied on the router, we also set the documentation for it on the router.
We can use this pattern to fine-tune the OpenAPI documentation more granularly by overriding the header specification as required. For example, let's say we have a router-level header being set, and a local header with the same key but a different value range:
```python
from random import randint

from pydantic import BaseModel

from litestar import Litestar, Response, Router, get
from litestar.datastructures import ResponseHeader


class Resource(BaseModel):
    id: int
    name: str


@get(
    "/resources",
    response_headers=[
        ResponseHeader(
            name="Random-Header",
            description="a random number in the range 100 - 1000",
            documentation_only=True,
        )
    ],
    sync_to_thread=False,
)
def retrieve_resource() -> Response[Resource]:
    return Response(
        Resource(
            id=1,
            name="my resource",
        ),
        headers={"Random-Header": str(randint(100, 1000))},
    )


def after_request_handler(response: Response) -> Response:
    response.headers.update({"Random-Header": str(randint(1, 100))})
    return response


router = Router(
    path="/router-path",
    route_handlers=[retrieve_resource],
    after_request=after_request_handler,
    response_headers=[
        ResponseHeader(
            name="Random-Header",
            description="a random number in the range 1 - 100",
            documentation_only=True,
        )
    ],
)

app = Litestar(route_handlers=[router])
```
Predefined Headers#
Litestar has a dedicated implementation for a few commonly used headers. These headers can be set separately with dedicated keyword arguments or as class attributes on all layers of the app (individual route handlers, controllers, routers, and the app itself). Each layer overrides the layer above it - thus, the headers defined for a specific route handler will override those defined on its router, which will in turn override those defined on the app level.
These header implementations allow easy creation, serialization, and parsing according to the associated header specifications.
Cache Control#
CacheControlHeader represents a Cache-Control header. Here is a simple example that shows how to use it:
```python
import time

from litestar import Controller, Litestar, get
from litestar.datastructures import CacheControlHeader


class MyController(Controller):
    cache_control = CacheControlHeader(max_age=86_400, public=True)

    @get("/chance_of_rain", sync_to_thread=False)
    def get_chance_of_rain(self) -> float:
        """This endpoint uses the cache control value defined in the controller which overrides the app value."""
        return 0.5

    @get("/timestamp", cache_control=CacheControlHeader(no_store=True), sync_to_thread=False)
    def get_server_time(self) -> float:
        """This endpoint overrides the cache control value defined in the controller."""
        return time.time()


@get("/population", sync_to_thread=False)
def get_population_count() -> int:
    """This endpoint will use the cache control defined in the app."""
    return 100000


app = Litestar(
    route_handlers=[MyController, get_population_count],
    cache_control=CacheControlHeader(max_age=2_628_288, public=True),
)
```
In this example we have a cache-control with a max-age of 1 month for the whole app, a max-age of 1 day for all routes within MyController, and no-store for the one specific route get_server_time. Here are the cache control values that will be returned from each endpoint:
- When calling /population the response will have cache-control with max-age=2628288 (1 month).
- When calling /chance_of_rain the response will have cache-control with max-age=86400 (1 day).
- When calling /timestamp the response will have cache-control with no-store, which means: don't store the result in any cache.
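Under the hood, these settings serialize to the directive grammar of the Cache-Control header defined in RFC 9111. The following is a rough, illustrative sketch of that serialization, not Litestar's actual implementation:

```python
from typing import Optional


def serialize_cache_control(
    max_age: Optional[int] = None, public: bool = False, no_store: bool = False
) -> str:
    # Valued directives render as name=value, boolean directives appear bare;
    # Python-style underscores map to hyphens in the header grammar.
    parts = []
    if max_age is not None:
        parts.append(f"max-age={max_age}")
    if public:
        parts.append("public")
    if no_store:
        parts.append("no-store")
    return ", ".join(parts)


print(serialize_cache_control(max_age=86_400, public=True))  # max-age=86400, public
print(serialize_cache_control(no_store=True))                # no-store
```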
ETag#
ETag represents an ETag header. Here are some usage examples:
```python
import random
import time

from litestar import Controller, Litestar, get
from litestar.datastructures import ETag
from litestar.enums import MediaType
from litestar.response import Response


class MyController(Controller):
    etag = ETag(value="foo")

    @get("/chance_of_rain")
    def get_chance_of_rain(self) -> float:
        """This endpoint uses the etag value in the controller which overrides the app value.

        The returned header will be `etag: "foo"`
        """
        return 0.5

    @get("/timestamp", etag=ETag(value="bar"))
    def get_server_time(self) -> float:
        """This endpoint overrides the etag defined in the controller.

        The returned header will be `etag: W/"bar"`
        """
        return time.time()


@get("/population")
def get_population_count() -> int:
    """This endpoint will use the etag defined in the app.

    The returned header will be `etag: "bar"`
    """
    return 100000


@get("/population-dynamic", etag=ETag(documentation_only=True))
def get_population_count_dynamic() -> Response[str]:
    """The etag defined in this route handler will not be returned, and does not need a value.

    It will only be used for OpenAPI generation.
    """
    population_count = random.randint(0, 1000)
    return Response(
        content=str(population_count),
        headers={"etag": str(population_count)},
        media_type=MediaType.TEXT,
        status_code=200,
    )


app = Litestar(route_handlers=[MyController, get_population_count], etag=ETag(value="bar"))
```
```python
from litestar.datastructures import ETag

assert ETag.from_header('"foo"') == ETag(value="foo")
assert ETag.from_header('W/"foo"') == ETag(value="foo", weak=True)
```
Redirect Responses#
Redirect responses are special HTTP responses with a status code in the 3xx range. In Litestar, a redirect response looks like this:
```python
from litestar import get
from litestar.response import Redirect
from litestar.status_codes import HTTP_302_FOUND


@get(path="/some-path", status_code=HTTP_302_FOUND)
def redirect() -> Redirect:
    # do some stuff here
    # ...
    # finally return redirect
    return Redirect(path="/other-path")
```
To return a redirect response, simply return an instance of the Redirect class with the target path, as shown above.
File Responses#
File responses send a file:
```python
from pathlib import Path

from litestar import get
from litestar.response import File


@get(path="/file-download")
def handle_file_download() -> File:
    return File(
        path=Path(Path(__file__).resolve().parent, "report").with_suffix(".pdf"),
        filename="report.pdf",
    )
```
The File class expects two kwargs:
- path: path of the file to download.
- filename: the filename to set in the response Content-Disposition attachment.
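The filename ends up in a Content-Disposition header of the form described by RFC 6266. Litestar builds this header for you; the sketch below is a hypothetical illustration of what such a value looks like (the `content_disposition` helper is not a Litestar API):

```python
from urllib.parse import quote


def content_disposition(filename: str) -> str:
    # The plain filename parameter covers ASCII names; the filename* form
    # (RFC 5987 percent-encoding) covers non-ASCII names as well.
    ascii_name = filename.encode("ascii", "ignore").decode()
    return f"attachment; filename=\"{ascii_name}\"; filename*=utf-8''{quote(filename)}"


print(content_disposition("report.pdf"))
# attachment; filename="report.pdf"; filename*=utf-8''report.pdf
```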
Attention
When a route handler's return value is annotated with File, the default media_type for the route handler is switched from MediaType.JSON to MediaType.TEXT (i.e. "text/plain"). If the file being sent has an IANA media type, you should set it as the value for media_type instead. For example:
```python
from pathlib import Path

from litestar import get
from litestar.response import File


@get(path="/file-download", media_type="application/pdf")
def handle_file_download() -> File:
    return File(
        path=Path(Path(__file__).resolve().parent, "report").with_suffix(".pdf"),
        filename="report.pdf",
    )
```
Streaming Responses#
To return a streaming response, use the Stream class. The class receives a single positional argument, which must be an iterator delivering the stream:
```python
from asyncio import sleep
from datetime import datetime
from typing import AsyncGenerator

from litestar import Litestar, get
from litestar.response import Stream
from litestar.serialization import encode_json


async def my_generator() -> AsyncGenerator[bytes, None]:
    while True:
        await sleep(0.01)
        yield encode_json({"current_time": datetime.now()})


@get(path="/time")
def stream_time() -> Stream:
    return Stream(my_generator())


app = Litestar(route_handlers=[stream_time])
```
Note
You can use different kinds of values for the iterator. It can be a callable returning a sync or async generator, a generator itself, a sync or async iterator class, or an instance of a sync or async iterator class.
Server Sent Event Responses#
To send server-sent events (SSEs) to the frontend, use the ServerSentEvent class. The class receives a content arg. You can additionally specify event_type, which is the name of the event as declared in the browser, event_id, which sets the event source property, comment_message, which is used for sending pings, and retry_duration, which dictates the duration for retrying.
```python
from asyncio import sleep
from typing import AsyncGenerator

from litestar import Litestar, get
from litestar.response import ServerSentEvent, ServerSentEventMessage
from litestar.types import SSEData


async def my_generator() -> AsyncGenerator[SSEData, None]:
    count = 0
    while count < 10:
        await sleep(0.01)
        count += 1
        # In the generator you can yield integers, strings, bytes, dictionaries,
        # or ServerSentEventMessage objects.
        # Dicts can have the following keys: data, event, id, retry, comment.
        # here we yield an integer
        yield count
        # here a string
        yield str(count)
        # here bytes
        yield str(count).encode("utf-8")
        # here a dictionary
        yield {"data": 2 * count, "event": "event2", "retry": 10}
        # here a ServerSentEventMessage object
        yield ServerSentEventMessage(event="something-with-comment", retry=1000, comment="some comment")


@get(path="/count", sync_to_thread=False)
def sse_handler() -> ServerSentEvent:
    return ServerSentEvent(my_generator())


app = Litestar(route_handlers=[sse_handler])
```
Note
You can use different kinds of values for the iterator. It can be a callable returning a sync or async generator, a generator itself, a sync or async iterator class, or an instance of a sync or async iterator class.
In your iterator function you can yield integers, strings, or bytes; in that case the message sent will have message as its event_type if the ServerSentEvent has no event_type set, otherwise the specified event_type, and the yielded value as its data. If you want to send a different event type, you can use a dictionary with the keys event_type and data, or the ServerSentEventMessage class.
Note
You can further customize all the SSE parameters, add comments, and set the retry duration by using the ServerSentEvent class directly, by using ServerSentEventMessage, or by using dictionaries with the appropriate keys.
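On the wire, each event is framed in the text/event-stream format defined by the WHATWG HTML spec: "name: value" lines terminated by a blank line. The following is a hand-rolled sketch of that framing for illustration only; Litestar performs this encoding for you:

```python
from typing import Optional


def format_sse(
    data: str,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
    retry: Optional[int] = None,
) -> str:
    # Optional fields are emitted only when set; the trailing blank line
    # tells the browser the message is complete.
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if retry is not None:
        lines.append(f"retry: {retry}")
    lines.append(f"data: {data}")
    return "\n".join(lines) + "\n\n"


print(format_sse("2", event="event2", retry=10))
```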
Template Responses#
Template responses are used to render templates into HTML. To use a template response you must first register a template engine on the application level. Once an engine is in place, you can use a template response like so:
```python
from litestar import Request, get
from litestar.response import Template


@get(path="/info")
def info(request: Request) -> Template:
    return Template(template_name="info.html", context={"user": request.user})
```
In the above example, Template is passed the template name, which is a path-like value, and a context dictionary that maps string keys to values that will be rendered in the template.
Custom Responses#
While Litestar supports the serialization of many types by default, sometimes you want to return something that's not supported. In those cases it's convenient to make use of a custom response class. The example below illustrates how to deal with MultiDict instances.
```python
from litestar import Litestar, Response, get
from litestar.datastructures import MultiDict


class MultiDictResponse(Response):
    type_encoders = {MultiDict: lambda d: d.dict()}


@get("/")
async def index() -> MultiDict:
    return MultiDict([("foo", "bar"), ("foo", "baz")])


app = Litestar([index], response_class=MultiDictResponse)
```
Run it

```shell
> curl http://127.0.0.1:8000/
{"foo":["bar","baz"]}
```
Layered architecture
Response classes are part of Litestar’s layered architecture, which means you can set a response class on every layer of the application. If you have set a response class on multiple layers, the layer closest to the route handler will take precedence.
You can read more about this here: Layered architecture
Background Tasks#
All Litestar responses allow passing in a background kwarg. This kwarg accepts either an instance of BackgroundTask or an instance of BackgroundTasks, which wraps an iterable of BackgroundTask instances.
A background task is a sync or async callable (a function, a method, or a class that implements the object.__call__() dunder method) that will be called after the response finishes sending the data. Thus, in the following example the passed-in background task will be executed after the response sends:
```python
import logging
from typing import Dict

from litestar import Litestar, Response, get
from litestar.background_tasks import BackgroundTask

logger = logging.getLogger(__name__)


async def logging_task(identifier: str, message: str) -> None:
    logger.info("%s: %s", identifier, message)


@get("/", sync_to_thread=False)
def greeter(name: str) -> Response[Dict[str, str]]:
    return Response(
        {"hello": name},
        background=BackgroundTask(logging_task, "greeter", message=f"was called with name {name}"),
    )


app = Litestar(route_handlers=[greeter])
```
When the greeter handler is called, the logging task will be called with any *args and **kwargs passed into the BackgroundTask.
Note
In the above example, "greeter" is an arg and message=f"was called with name {name}" is a kwarg. The function signature of logging_task allows for this, so this should pose no problem. BackgroundTask is typed with ParamSpec, enabling correct type checking for the arguments and keyword arguments passed to it.
Route decorators (e.g. @get, @post, etc.) also allow passing in a background task with the background kwarg:
```python
import logging
from typing import Dict

from litestar import Litestar, get
from litestar.background_tasks import BackgroundTask

logger = logging.getLogger(__name__)


async def logging_task(identifier: str, message: str) -> None:
    logger.info("%s: %s", identifier, message)


@get("/", background=BackgroundTask(logging_task, "greeter", message="was called"), sync_to_thread=False)
def greeter() -> Dict[str, str]:
    return {"hello": "world"}


app = Litestar(route_handlers=[greeter])
```
Note
Route handler arguments cannot be passed into background tasks when they are passed into decorators.
Executing Multiple Background Tasks#
You can also use the BackgroundTasks class and pass to it an iterable (list, tuple, etc.) of BackgroundTask instances:
```python
import logging
from typing import Dict

from litestar import Litestar, Response, get
from litestar.background_tasks import BackgroundTask, BackgroundTasks

logger = logging.getLogger(__name__)

greeted = set()


async def logging_task(name: str) -> None:
    logger.info("%s was greeted", name)


async def saving_task(name: str) -> None:
    greeted.add(name)


@get("/", sync_to_thread=False)
def greeter(name: str) -> Response[Dict[str, str]]:
    return Response(
        {"hello": name},
        background=BackgroundTasks(
            [
                BackgroundTask(logging_task, name),
                BackgroundTask(saving_task, name),
            ]
        ),
    )


app = Litestar(route_handlers=[greeter])
```
The BackgroundTasks class accepts an optional keyword argument run_in_task_group with a default value of False. Setting this to True allows background tasks to run concurrently, using an anyio task group.
Note
Setting run_in_task_group to True will not preserve execution order.
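The difference is conceptually the same as running coroutines one after another versus concurrently. A minimal asyncio sketch of the two behaviors (illustrative only; Litestar uses an anyio task group internally, and asyncio.gather stands in for it here):

```python
import asyncio

order: list[str] = []


async def task(name: str, delay: float) -> None:
    await asyncio.sleep(delay)
    order.append(name)


async def sequential() -> None:
    # Like run_in_task_group=False: tasks run one after another, so order is preserved.
    await task("first", 0.02)
    await task("second", 0.01)


async def concurrent() -> None:
    # Like run_in_task_group=True: tasks run concurrently (analogous to an anyio
    # task group), so completion order depends on how long each task takes.
    await asyncio.gather(task("first", 0.02), task("second", 0.01))


asyncio.run(sequential())
assert order == ["first", "second"]

order.clear()
asyncio.run(concurrent())
assert order == ["second", "first"]  # the shorter task finishes first
```

This is why execution order is only guaranteed with the default run_in_task_group=False.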
Pagination#
When you need to return a large number of items from an endpoint, it is common practice to use pagination to ensure clients can request a specific subset or “page” of the total dataset. Litestar supports three types of pagination out of the box:
classic pagination
limit / offset pagination
cursor pagination
Classic Pagination#
In classic pagination the dataset is divided into pages of a specific size and the consumer then requests a specific page.
from math import ceil
from typing import List

from polyfactory.factories.pydantic_factory import ModelFactory
from pydantic import BaseModel

from litestar import Litestar, get
from litestar.pagination import AbstractSyncClassicPaginator, ClassicPagination


class Person(BaseModel):
    id: str
    name: str


class PersonFactory(ModelFactory[Person]):
    __model__ = Person


# We will implement a paginator - the paginator must implement two methods: 'get_total' and 'get_items'.
# We would usually use a database for this, but for our case we will "fake" the dataset using a factory.
class PersonClassicPaginator(AbstractSyncClassicPaginator[Person]):
    def __init__(self) -> None:
        self.data = PersonFactory.batch(50)

    def get_total(self, page_size: int) -> int:
        # Use ceiling division so a partial final page still counts as a page.
        return ceil(len(self.data) / page_size)

    def get_items(self, page_size: int, current_page: int) -> List[Person]:
        return [self.data[i : i + page_size] for i in range(0, len(self.data), page_size)][current_page - 1]


paginator = PersonClassicPaginator()


# We now create a regular handler. The handler will receive two query parameters - 'page_size' and
# 'current_page', which we will pass to the paginator.
@get("/people", sync_to_thread=False)
def people_handler(page_size: int, current_page: int) -> ClassicPagination[Person]:
    return paginator(page_size=page_size, current_page=current_page)


app = Litestar(route_handlers=[people_handler])
The data container for this pagination is called ClassicPagination, which is what will be returned by the paginator in the above example. This will also generate the corresponding OpenAPI documentation.
If you require async logic, you can implement the AbstractAsyncClassicPaginator instead of the AbstractSyncClassicPaginator.
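When implementing get_total for a classic paginator, note that the page count should round up, since a partial final page still counts as a page. A small sketch of the arithmetic (the helper name total_pages is hypothetical):

```python
from math import ceil


def total_pages(item_count: int, page_size: int) -> int:
    # Ceiling division: a partial final page still counts as a page.
    return ceil(item_count / page_size)


assert total_pages(50, 10) == 5
assert total_pages(50, 15) == 4  # round() would give 3, losing the last 5 items
assert total_pages(0, 10) == 0
```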
Offset Pagination#
In offset pagination the consumer requests a number of items specified by limit and an offset from the beginning of the dataset.
For example, given a list of 50 items, you could request limit=10, offset=40 to request items 41-50.
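The slicing itself amounts to skipping offset items and taking up to limit of the rest. A quick standalone illustration (the helper name offset_page is hypothetical):

```python
from itertools import islice

items = list(range(1, 51))  # items numbered 1-50


def offset_page(data: list[int], limit: int, offset: int) -> list[int]:
    # Skip `offset` items, then take up to `limit` items.
    return list(islice(data, offset, offset + limit))


assert offset_page(items, limit=10, offset=40) == list(range(41, 51))  # items 41-50
assert offset_page(items, limit=10, offset=45) == list(range(46, 51))  # partial final page
```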
from itertools import islice
from typing import List

from polyfactory.factories.pydantic_factory import ModelFactory
from pydantic import BaseModel

from litestar import Litestar, get
from litestar.pagination import AbstractSyncOffsetPaginator, OffsetPagination


class Person(BaseModel):
    id: str
    name: str


class PersonFactory(ModelFactory[Person]):
    __model__ = Person


# We will implement a paginator - the paginator must implement two methods: 'get_total' and 'get_items'.
# We would usually use a database for this, but for our case we will "fake" the dataset using a factory.
class PersonOffsetPaginator(AbstractSyncOffsetPaginator[Person]):
    def __init__(self) -> None:
        self.data = PersonFactory.batch(50)

    def get_total(self) -> int:
        return len(self.data)

    def get_items(self, limit: int, offset: int) -> List[Person]:
        return list(islice(islice(self.data, offset, None), limit))


paginator = PersonOffsetPaginator()


# We now create a regular handler. The handler will receive two query parameters - 'limit' and 'offset',
# which we will pass to the paginator.
@get("/people", sync_to_thread=False)
def people_handler(limit: int, offset: int) -> OffsetPagination[Person]:
    return paginator(limit=limit, offset=offset)


app = Litestar(route_handlers=[people_handler])
The data container for this pagination is called OffsetPagination, which is what will be returned by the paginator in the above example. This will also generate the corresponding OpenAPI documentation.
If you require async logic, you can implement the AbstractAsyncOffsetPaginator instead of the AbstractSyncOffsetPaginator.
Offset Pagination With SQLAlchemy#
When retrieving paginated data from the database using SQLAlchemy, the paginator instance requires a SQLAlchemy session to make queries. This can be achieved with Dependency Injection.
from typing import TYPE_CHECKING, List, cast

from sqlalchemy import func, select
from sqlalchemy.orm import Mapped

from litestar import Litestar, get
from litestar.contrib.sqlalchemy.base import UUIDBase
from litestar.contrib.sqlalchemy.plugins import SQLAlchemyAsyncConfig, SQLAlchemyInitPlugin
from litestar.di import Provide
from litestar.pagination import AbstractAsyncOffsetPaginator, OffsetPagination

if TYPE_CHECKING:
    from sqlalchemy.engine.result import ScalarResult
    from sqlalchemy.ext.asyncio import AsyncSession


class Person(UUIDBase):
    name: Mapped[str]


class PersonOffsetPaginator(AbstractAsyncOffsetPaginator[Person]):
    def __init__(self, async_session: AsyncSession) -> None:  # 'async_session' dependency will be injected here.
        self.async_session = async_session

    async def get_total(self) -> int:
        return cast("int", await self.async_session.scalar(select(func.count(Person.id))))

    async def get_items(self, limit: int, offset: int) -> List[Person]:
        # Apply LIMIT/OFFSET in the query itself rather than slicing in Python.
        people: ScalarResult = await self.async_session.scalars(select(Person).limit(limit).offset(offset))
        return list(people.all())


# Create a route handler. The handler will receive two query parameters - 'limit' and 'offset', which are passed
# to the paginator instance. Also create a dependency 'paginator' which will be injected into the handler.
@get("/people", dependencies={"paginator": Provide(PersonOffsetPaginator)})
async def people_handler(paginator: PersonOffsetPaginator, limit: int, offset: int) -> OffsetPagination[Person]:
    return await paginator(limit=limit, offset=offset)


sqlalchemy_config = SQLAlchemyAsyncConfig(
    connection_string="sqlite+aiosqlite:///test.sqlite", session_dependency_key="async_session"
)  # Create 'async_session' dependency.
sqlalchemy_plugin = SQLAlchemyInitPlugin(config=sqlalchemy_config)


async def on_startup() -> None:
    """Initializes the database."""
    async with sqlalchemy_config.get_engine().begin() as conn:
        await conn.run_sync(UUIDBase.metadata.create_all)


app = Litestar(route_handlers=[people_handler], on_startup=[on_startup], plugins=[sqlalchemy_plugin])
See the SQLAlchemy plugin documentation for details on the SQLAlchemy integration.
Cursor Pagination#
In cursor pagination the consumer requests a number of items specified by results_per_page and a cursor after which results are given.
A cursor is a unique identifier within the dataset that serves as a pointer to the starting position.
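In its simplest form, a cursor lookup finds the cursor's position in the dataset and returns the items after it, along with the cursor for the next request. A minimal standalone illustration, not tied to any storage backend (the helper name cursor_page is hypothetical):

```python
from typing import Optional, Tuple


def cursor_page(data: list, cursor: Optional[str], per_page: int) -> Tuple[list, Optional[str]]:
    # Start from the beginning, or just after the item the cursor points at.
    start = 0 if cursor is None else data.index(cursor) + 1
    results = data[start : start + per_page]
    # The last returned item becomes the cursor for the next request.
    next_cursor = results[-1] if results else None
    return results, next_cursor


data = ["a", "b", "c", "d", "e"]
page1, cur1 = cursor_page(data, None, 2)   # (["a", "b"], "b")
page2, cur2 = cursor_page(data, cur1, 2)   # (["c", "d"], "d")
```

Unlike offset pagination, the client never computes positions itself; it simply echoes back the cursor from the previous response.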
from typing import List, Optional, Tuple

from polyfactory.factories.pydantic_factory import ModelFactory
from pydantic import BaseModel

from litestar import Litestar, get
from litestar.pagination import AbstractSyncCursorPaginator, CursorPagination


class Person(BaseModel):
    id: str
    name: str


class PersonFactory(ModelFactory[Person]):
    __model__ = Person


# We will implement a paginator - the paginator must implement the method 'get_items'.
class PersonCursorPaginator(AbstractSyncCursorPaginator[str, Person]):
    def __init__(self) -> None:
        self.data = PersonFactory.batch(50)

    def get_items(self, cursor: Optional[str], results_per_page: int) -> Tuple[List[Person], Optional[str]]:
        # Start from the beginning, or just after the item the cursor points at.
        start = 0
        if cursor is not None:
            start = next(i for i, person in enumerate(self.data) if person.id == cursor) + 1
        results = self.data[start : start + results_per_page]
        return results, results[-1].id if results else None


paginator = PersonCursorPaginator()


# We now create a regular handler. The handler will receive two query parameters - 'cursor' and
# 'results_per_page', which we will pass to the paginator.
@get("/people", sync_to_thread=False)
def people_handler(cursor: Optional[str], results_per_page: int) -> CursorPagination[str, Person]:
    return paginator(cursor=cursor, results_per_page=results_per_page)


app = Litestar(route_handlers=[people_handler])
The data container for this pagination is called CursorPagination, which is what will be returned by the paginator in the above example. This will also generate the corresponding OpenAPI documentation.
If you require async logic, you can implement the AbstractAsyncCursorPaginator instead of the AbstractSyncCursorPaginator.