WebSockets#
There are three ways to handle WebSockets in Litestar:
- The low-level websocket route handler, providing basic abstractions over the ASGI WebSocket interface
- websocket_listener and WebsocketListener: Reactive, event-driven WebSockets with full serialization and DTO support, and support for a synchronous interface
- websocket_stream and send_websocket_stream(): Proactive, stream-oriented WebSockets with full serialization and DTO support
The main difference between the low-level and high-level interfaces is that the low-level interface requires you to set up a loop and listen for incoming data yourself, handle exceptions and client disconnects, and parse incoming and serialize outgoing data.
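To make that list of responsibilities concrete, here is a minimal, library-free sketch of the loop a low-level handler has to implement. FakeSocket and Disconnected are hypothetical stand-ins invented for this illustration; they are not Litestar classes, and a real handler would use the WebSocket object Litestar passes in.

```python
from __future__ import annotations

import asyncio
import json


class Disconnected(Exception):
    """Hypothetical stand-in for a client-disconnect signal."""


class FakeSocket:
    """Hypothetical in-memory socket used only for this sketch."""

    def __init__(self, incoming: list[str]) -> None:
        self._incoming = list(incoming)
        self.sent: list[str] = []

    async def receive_text(self) -> str:
        if not self._incoming:
            raise Disconnected
        return self._incoming.pop(0)

    async def send_text(self, message: str) -> None:
        self.sent.append(message)


async def low_level_echo(socket: FakeSocket) -> None:
    while True:  # 1. loop and listen for incoming data
        try:
            raw = await socket.receive_text()
        except Disconnected:  # 2. handle client disconnects
            break
        try:
            data = json.loads(raw)  # 3. parse incoming data
        except json.JSONDecodeError:  # 4. handle malformed input
            await socket.send_text(json.dumps({"error": "invalid JSON"}))
            continue
        await socket.send_text(json.dumps({"echo": data}))  # 5. serialize outgoing data


def run_demo() -> list[str]:
    socket = FakeSocket(['{"a": 1}', "not json"])
    asyncio.run(low_level_echo(socket))
    return socket.sent
```

The high-level interfaces described in the rest of this page take over all five numbered steps, so the handler only has to turn incoming data into outgoing data.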
WebSocket Listeners#
WebSocket Listeners can be used to interact with a WebSocket in an event-driven manner, using a callback-style interface. They treat a WebSocket handler like any other route handler: a callable that takes in incoming data in an already pre-processed form and returns data to be serialized and sent over the connection. The low-level details are handled behind the scenes.
from litestar import Litestar
from litestar.handlers.websocket_handlers import websocket_listener


@websocket_listener("/")
async def handler(data: str) -> str:
    # Echo every received message back to the client
    return data


app = Litestar([handler])
This handler will accept connections on /, and wait to receive data. Once a message has been received, it will be passed into the handler function defined, via the data parameter. This works like a regular route handler, so it's possible to specify the type of data which should be received, and it will be converted accordingly.
Note
Contrary to WebSocket route handlers, functions decorated with websocket_listener don't have to be asynchronous.
Receiving data#
Data can be received in the listener via the data parameter. The data passed to this parameter will be converted / parsed according to the given type annotation. Supported types are str, bytes, and arbitrary dicts or lists in the form of JSON.
# JSON, annotated with typing.Dict (for Python versions before 3.9)
from typing import Dict

from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: Dict[str, str]) -> Dict[str, str]:
    return data


app = Litestar([handler])


# JSON, annotated with the built-in dict generic (Python 3.9+)
from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: dict[str, str]) -> dict[str, str]:
    return data


app = Litestar([handler])


# Text
from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: str) -> str:
    return data


app = Litestar([handler])


# Bytes
from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: bytes) -> str:
    return data.decode("utf-8")


app = Litestar([handler])
Important
Contrary to route handlers, JSON data will only be parsed but not validated. This is a limitation of the current implementation and will change in future versions.
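The practical consequence of "parsed but not validated" can be shown without Litestar at all. The sketch below uses the standard library json module as a stand-in for the parsing step and a hypothetical helper, require_str_values, to show the kind of manual check a handler may want to add when it relies on an annotation such as dict[str, str].

```python
import json
from typing import Any, Dict


def parse_only(raw: str) -> Dict[str, Any]:
    """Parse JSON without checking it against any type annotation."""
    return json.loads(raw)


def require_str_values(data: Dict[str, Any]) -> Dict[str, str]:
    """Hypothetical manual validation: reject non-string values."""
    for key, value in data.items():
        if not isinstance(value, str):
            raise TypeError(f"{key!r} must be a str, got {type(value).__name__}")
    return data


# A client sends an integer where the handler's annotation promises a string.
data = parse_only('{"count": 1}')
value_type = type(data["count"]).__name__  # the integer arrives unchanged

try:
    require_str_values(data)
    rejected = False
except TypeError:
    rejected = True
```

Parsing alone lets the mismatched value through; only the explicit check rejects it.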
Sending data#
Sending data is done by simply returning the value to be sent from the handler function. Similar to receiving data, type annotations configure how the data is being handled. Values that are not str or bytes are assumed to be JSON encodable and will be serialized accordingly before being sent. This serialization is available for all data types currently supported by Litestar (dataclasses, TypedDict, NamedTuple, msgspec.Struct, etc.), including DTOs.
# Text
from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: str) -> str:
    return data


app = Litestar([handler])


# Bytes
from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: str) -> bytes:
    return data.encode("utf-8")


app = Litestar([handler])


# JSON from a dict, annotated with typing.Dict (for Python versions before 3.9)
from typing import Dict

from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: str) -> Dict[str, str]:
    return {"message": data}


app = Litestar([handler])


# JSON from a dict, annotated with the built-in dict generic (Python 3.9+)
from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: str) -> dict[str, str]:
    return {"message": data}


app = Litestar([handler])


# JSON from a dataclass
from dataclasses import dataclass
from datetime import datetime

from litestar import Litestar, websocket_listener


@dataclass
class Message:
    content: str
    timestamp: float


@websocket_listener("/")
async def handler(data: str) -> Message:
    return Message(content=data, timestamp=datetime.now().timestamp())


app = Litestar([handler])
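The rule described in this section (str and bytes are sent as-is, everything else is encoded as JSON) can be sketched in a few lines of plain Python. This is only an illustration of the rule: encode_outgoing is a hypothetical helper, and the standard library json module stands in for Litestar's own serializer, which is not shown here.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any, Union


def encode_outgoing(value: Any) -> Union[str, bytes]:
    """Illustrative only: str/bytes pass through, anything else becomes JSON."""
    if isinstance(value, (str, bytes)):
        return value
    if is_dataclass(value) and not isinstance(value, type):
        # Dataclass instances are converted to a dict before encoding
        value = asdict(value)
    return json.dumps(value)


@dataclass
class Message:
    content: str
    timestamp: float
```

With this helper, encode_outgoing("hi") returns the string unchanged, while encode_outgoing(Message("hi", 1.5)) returns a JSON document with the fields content and timestamp.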
Setting transport modes#
Receive mode#
text is the default mode and is appropriate for most messages, including structured data such as JSON.

# Receive in text mode (the default)
from litestar import Litestar, websocket_listener


@websocket_listener("/", receive_mode="text")
async def handler(data: str) -> str:
    return data


app = Litestar([handler])


# Receive in binary mode
from litestar import Litestar, websocket_listener


@websocket_listener("/", receive_mode="binary")
async def handler(data: str) -> str:
    return data


app = Litestar([handler])
Important
Once configured with a mode, a listener will only listen to socket events of the appropriate type. This means if a listener is configured to use binary mode, it will not respond to WebSocket events sending data in the text channel.
Send mode#
text is the default mode and is appropriate for most messages, including structured data such as JSON.

# Send in text mode (the default)
from litestar import Litestar, websocket_listener


@websocket_listener("/", send_mode="text")
async def handler(data: str) -> str:
    return data


app = Litestar([handler])


# Send in binary mode
from litestar import Litestar, websocket_listener


@websocket_listener("/", send_mode="binary")
async def handler(data: str) -> str:
    return data


app = Litestar([handler])
Dependency injection#
Dependency Injection is available and generally works the same as in regular route handlers:
from litestar import Litestar, websocket_listener
from litestar.di import Provide


def some_dependency() -> str:
    return "hello"


@websocket_listener("/", dependencies={"some": Provide(some_dependency)})
async def handler(data: str, some: str) -> str:
    # "some" is injected by name from the dependencies mapping
    return data + some


app = Litestar([handler])
Important
Injected dependencies work on the level of the underlying route handler. This means they won’t be re-evaluated every time the listener function is called.
The following example makes use of yield dependencies and the fact that dependencies are only evaluated once for every connection: the step after the yield will only be executed after the connection has been closed.
from typing import Generator, TypedDict

from litestar import Litestar, websocket_listener
from litestar.datastructures import State
from litestar.di import Provide


class Message(TypedDict):
    message: str
    client_count: int


def socket_client_count(state: State) -> Generator[int, None, None]:
    if not hasattr(state, "count"):
        state.count = 0

    # Runs once, when the connection is established
    state.count += 1
    yield state.count
    # Runs once, after the connection has been closed
    state.count -= 1


@websocket_listener("/", dependencies={"client_count": Provide(socket_client_count)})
async def handler(data: str, client_count: int) -> Message:
    return Message(message=data, client_count=client_count)


app = Litestar([handler])
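The lifecycle of such a yield dependency can be sketched with a plain generator, driven by hand. This is a conceptual illustration only: simulate_connection is a hypothetical helper and does not reflect how Litestar resolves dependencies internally. It only mirrors the order of events described above.

```python
from typing import Dict, Generator, List

state: Dict[str, int] = {"count": 0}


def socket_client_count() -> Generator[int, None, None]:
    state["count"] += 1
    yield state["count"]
    state["count"] -= 1


def simulate_connection(messages: List[str]) -> List[str]:
    dependency = socket_client_count()
    # Evaluated once, when the connection is opened
    client_count = next(dependency)
    # The "handler" runs once per message and always sees the same value
    replies = [f"{message} ({client_count})" for message in messages]
    # The code after the yield runs once the connection has been closed
    next(dependency, None)
    return replies
```

Every reply produced during one simulated connection carries the same client count, and the counter is back at its previous value once the connection has ended.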
Interacting with the WebSocket directly#
Sometimes access to the socket instance is needed, in which case the WebSocket instance can be injected into the handler function via the socket argument:

from litestar import Litestar, WebSocket, websocket_listener


@websocket_listener("/")
async def handler(data: str, socket: WebSocket) -> str:
    if data == "goodbye":
        await socket.close()

    return data


app = Litestar([handler])
Important
Since WebSockets are inherently asynchronous, to interact with the asynchronous methods on WebSocket, the handler function needs to be asynchronous.
Customising connection acceptance#
By default, Litestar will accept all incoming connections by awaiting WebSocket.accept() without arguments. This behavior can be customized by passing a custom connection_accept_handler function. Litestar will await this function to accept the connection.

from litestar import Litestar, WebSocket, websocket_listener


async def accept_connection(socket: WebSocket) -> None:
    # Accept the connection with custom response headers
    await socket.accept(headers={"Cookie": "custom-cookie"})


@websocket_listener("/", connection_accept_handler=accept_connection)
def handler(data: str) -> str:
    return data


app = Litestar([handler])
Class based WebSocket handling#
In addition to using a simple function as in the examples above, a class-based approach is made possible by extending WebsocketListener. This provides convenient access to socket events such as connect and disconnect, and can be used to encapsulate more complex logic.
# Synchronous callbacks
from litestar import Litestar, WebSocket
from litestar.handlers import WebsocketListener


class Handler(WebsocketListener):
    path = "/"

    def on_accept(self, socket: WebSocket) -> None:
        print("Connection accepted")

    def on_disconnect(self, socket: WebSocket) -> None:
        print("Connection closed")

    def on_receive(self, data: str) -> str:
        return data


app = Litestar([Handler])


# Asynchronous callbacks
from litestar import Litestar, WebSocket
from litestar.handlers import WebsocketListener


class Handler(WebsocketListener):
    path = "/"

    async def on_accept(self, socket: WebSocket) -> None:
        print("Connection accepted")

    async def on_disconnect(self, socket: WebSocket) -> None:
        print("Connection closed")

    async def on_receive(self, data: str) -> str:
        return data


app = Litestar([Handler])
Custom WebSocket#
New in version 2.7.0.
Litestar supports custom websocket_class instances, which can be used to further configure the default WebSocket.
The example below illustrates how to implement a custom WebSocket class for the whole application.
Example of a custom websocket at the application level
from __future__ import annotations

from litestar import Litestar, WebSocket, websocket_listener
from litestar.types.asgi_types import WebSocketMode


class CustomWebSocket(WebSocket):
    async def receive_data(self, mode: WebSocketMode) -> str | bytes:
        """Return fixed response for every websocket message."""
        # Consume the incoming message, then discard it
        await super().receive_data(mode=mode)
        return "Fixed response"


@websocket_listener("/")
async def handler(data: str) -> str:
    return data


app = Litestar([handler], websocket_class=CustomWebSocket)
Layered architecture
WebSocket classes are part of Litestar’s layered architecture, which means you can set a WebSocket class on every layer of the application. If you have set a WebSocket class on multiple layers, the layer closest to the route handler will take precedence.
You can read more about this in the Layered architecture section.
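The precedence rule can be pictured as a lookup that walks from the route handler outwards and stops at the first layer that defines a class. The sketch below is purely conceptual: resolve_closest and the placeholder classes are hypothetical names for this illustration, not part of Litestar.

```python
from typing import Optional, Sequence


class DefaultSocket: ...


class AppSocket(DefaultSocket): ...


class ControllerSocket(DefaultSocket): ...


def resolve_closest(layers: Sequence[Optional[type]], default: type) -> type:
    """Pick the class set on the layer closest to the route handler.

    ``layers`` is ordered from the application (outermost) to the route
    handler (innermost); ``None`` means the layer does not set a class.
    """
    for value in reversed(layers):
        if value is not None:
            return value
    return default


# application, router, controller, route handler
only_app = resolve_closest([AppSocket, None, None, None], DefaultSocket)
app_and_controller = resolve_closest([AppSocket, None, ControllerSocket, None], DefaultSocket)
```

Here only_app resolves to AppSocket, while app_and_controller resolves to ControllerSocket because the controller is closer to the route handler than the application.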
WebSocket Streams#
WebSocket streams can be used to proactively push data to a client, using an asynchronous generator function. Data will be sent via the socket every time the generator yields, until it is either exhausted or the client disconnects.

# Variant importing AsyncGenerator from typing
import asyncio
import time
from typing import AsyncGenerator

from litestar import Litestar, websocket_stream


@websocket_stream("/")
async def ping() -> AsyncGenerator[float, None]:
    while True:
        # Every yielded value is sent to the client
        yield time.time()
        await asyncio.sleep(0.5)


app = Litestar([ping])


# Variant importing AsyncGenerator from collections.abc (Python 3.9+)
import asyncio
import time
from collections.abc import AsyncGenerator

from litestar import Litestar, websocket_stream


@websocket_stream("/")
async def ping() -> AsyncGenerator[float, None]:
    while True:
        # Every yielded value is sent to the client
        yield time.time()
        await asyncio.sleep(0.5)


app = Litestar([ping])
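Conceptually, a stream boils down to iterating over the asynchronous generator and sending each yielded item. The following library-free sketch shows that core idea with a hypothetical push_stream helper and a list standing in for the socket. It leaves out everything else a real stream has to handle, such as serialization and client disconnects.

```python
import asyncio
from typing import Any, AsyncGenerator, Awaitable, Callable, List


async def countdown(start: int) -> AsyncGenerator[int, None]:
    for value in range(start, 0, -1):
        yield value
        await asyncio.sleep(0)  # give other tasks a chance to run


async def push_stream(
    stream: AsyncGenerator[Any, None],
    send: Callable[[Any], Awaitable[None]],
) -> None:
    # Send one message per yielded item until the generator is exhausted
    async for item in stream:
        await send(item)


def run_demo() -> List[int]:
    sent: List[int] = []

    async def fake_send(item: int) -> None:
        sent.append(item)

    async def main() -> None:
        await push_stream(countdown(3), fake_send)

    asyncio.run(main())
    return sent
```

A finite generator such as countdown ends the stream on its own, whereas a while True generator like the ping example keeps pushing until something else stops it.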
Serialization#
Just like with route handlers, type annotations configure how the data is being handled. str or bytes will be sent as-is, while everything else will be encoded as JSON before being sent. This serialization is available for all data types currently supported by Litestar (dataclasses, TypedDict, NamedTuple, msgspec.Struct, etc.), including DTOs.
Dependency Injection#
Dependency injection is available and works analogously to regular route handlers.
Important
One thing to keep in mind, especially for long-lived streams, is that dependencies are scoped to the lifetime of the handler. This means that if, for example, a database connection is acquired in a dependency, it will be held until the generator stops. This may not be desirable in all cases, and acquiring resources ad hoc inside the generator itself may be preferable.
# Lock acquired in a dependency: it is held for the entire lifetime of the stream
# (variant importing AsyncGenerator from typing)
import asyncio
from typing import AsyncGenerator

from app.lib import ping_external_resource
from litestar import Litestar, websocket_stream

RESOURCE_LOCK = asyncio.Lock()


async def acquire_lock() -> AsyncGenerator[None, None]:
    async with RESOURCE_LOCK:
        yield


@websocket_stream("/")
async def ping(lock: asyncio.Lock) -> AsyncGenerator[float, None]:
    while True:
        alive = await ping_external_resource()
        yield alive
        await asyncio.sleep(1)


app = Litestar([ping], dependencies={"lock": acquire_lock})


# Lock acquired in a dependency: it is held for the entire lifetime of the stream
# (variant importing AsyncGenerator from collections.abc, Python 3.9+)
import asyncio
from collections.abc import AsyncGenerator

from app.lib import ping_external_resource
from litestar import Litestar, websocket_stream

RESOURCE_LOCK = asyncio.Lock()


async def acquire_lock() -> AsyncGenerator[None, None]:
    async with RESOURCE_LOCK:
        yield


@websocket_stream("/")
async def ping(lock: asyncio.Lock) -> AsyncGenerator[float, None]:
    while True:
        alive = await ping_external_resource()
        yield alive
        await asyncio.sleep(1)


app = Litestar([ping], dependencies={"lock": acquire_lock})


# Lock acquired ad hoc inside the generator: it is held only while pinging
# (variant importing AsyncGenerator from typing)
import asyncio
from typing import AsyncGenerator

from app.lib import ping_external_resource
from litestar import Litestar, websocket_stream

RESOURCE_LOCK = asyncio.Lock()


@websocket_stream("/")
async def ping() -> AsyncGenerator[float, None]:
    while True:
        async with RESOURCE_LOCK:
            alive = await ping_external_resource()
        yield alive
        await asyncio.sleep(1)


app = Litestar([ping])


# Lock acquired ad hoc inside the generator: it is held only while pinging
# (variant importing AsyncGenerator from collections.abc, Python 3.9+)
import asyncio
from collections.abc import AsyncGenerator

from app.lib import ping_external_resource
from litestar import Litestar, websocket_stream

RESOURCE_LOCK = asyncio.Lock()


@websocket_stream("/")
async def ping() -> AsyncGenerator[float, None]:
    while True:
        async with RESOURCE_LOCK:
            alive = await ping_external_resource()
        yield alive
        await asyncio.sleep(1)


app = Litestar([ping])
Interacting with the WebSocket directly#
To interact with the WebSocket directly, it can be injected into the generator function via the socket argument:

# Variant importing AsyncGenerator from typing
import asyncio
import time
from typing import Any, AsyncGenerator

from litestar import Litestar, WebSocket, websocket_stream


@websocket_stream("/")
async def ping(socket: WebSocket) -> AsyncGenerator[dict[str, Any], None]:
    while True:
        yield {"time": time.time(), "client": socket.client}
        await asyncio.sleep(0.5)


app = Litestar([ping])


# Variant importing AsyncGenerator from collections.abc (Python 3.9+)
import asyncio
import time
from typing import Any
from collections.abc import AsyncGenerator

from litestar import Litestar, WebSocket, websocket_stream


@websocket_stream("/")
async def ping(socket: WebSocket) -> AsyncGenerator[dict[str, Any], None]:
    while True:
        yield {"time": time.time(), "client": socket.client}
        await asyncio.sleep(0.5)


app = Litestar([ping])
Receiving data while streaming#
By default, a stream will listen for a client disconnect in the background, and stop the generator once received. Since this requires receiving data from the socket, it can lead to data loss if the application is attempting to read from the same socket simultaneously.
Tip
To prevent data loss, by default, websocket_stream will raise an exception if it receives any data while listening for client disconnects. If incoming data should be ignored, allow_data_discard should be set to True.
If receiving data while streaming is desired, send_websocket_stream() can be configured to not listen for disconnects by setting listen_for_disconnect=False.
Important
When using listen_for_disconnect=False, the application needs to ensure the disconnect event is received elsewhere, otherwise the stream will only terminate when the generator is exhausted.
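The division of labour this implies can be sketched with plain asyncio: one task streams without ever stopping on its own, another part of the application notices the disconnect and is then responsible for cancelling the stream. The queue, the None disconnect marker, and all function names below are assumptions made for this illustration and are not Litestar APIs.

```python
import asyncio
import contextlib
from typing import List, Optional, Tuple


async def stream_forever(sent: List[int]) -> None:
    counter = 0
    while True:  # never exhausted, so it must be stopped from outside
        sent.append(counter)
        counter += 1
        await asyncio.sleep(0.01)


async def receive_until_disconnect(
    incoming: "asyncio.Queue[Optional[str]]", received: List[str]
) -> None:
    while True:
        message = await incoming.get()
        if message is None:  # hypothetical marker for "client disconnected"
            return
        received.append(message)


async def serve(messages: List[str]) -> Tuple[List[int], List[str], bool]:
    sent: List[int] = []
    received: List[str] = []
    incoming: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    for message in messages:
        incoming.put_nowait(message)
    incoming.put_nowait(None)

    stream_task = asyncio.create_task(stream_forever(sent))
    await asyncio.sleep(0.03)  # let the stream push a few items first

    # The receiving side is the one that observes the disconnect ...
    await receive_until_disconnect(incoming, received)
    # ... so it also has to stop the stream explicitly
    stream_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await stream_task
    return sent, received, stream_task.cancelled()
```

Without the explicit cancel() call, stream_forever would keep running after the client is gone, which is the situation the note above warns about.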
Combining streaming and receiving data#
To stream and receive data concurrently, the stream can be set up manually using send_websocket_stream() in combination with either a regular websocket handler or a WebSocket listener.
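# Using a WebSocket listener (variant importing AsyncGenerator from typing)
import asyncio
import contextlib
import time
from typing import Any, AsyncGenerator

from litestar import Litestar, WebSocket, websocket_listener
from litestar.handlers import send_websocket_stream


@contextlib.asynccontextmanager
async def listener_lifespan(socket: WebSocket) -> AsyncGenerator[None, None]:
    async def handle_stream() -> AsyncGenerator[dict[str, float], None]:
        while True:
            yield {"time": time.time()}
            await asyncio.sleep(0.5)

    # Stream in the background for as long as the connection is open
    task = asyncio.create_task(send_websocket_stream(socket=socket, stream=handle_stream()))
    yield
    # Connection closed: stop the stream and wait for the task to finish
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task


@websocket_listener("/", connection_lifespan=listener_lifespan)
def handler(socket: WebSocket, data: Any) -> None:
    print(f"{socket.client}: {data}")


app = Litestar([handler])


# Using a WebSocket listener (variant importing AsyncGenerator from collections.abc, Python 3.9+)
import asyncio
import contextlib
import time
from typing import Any
from collections.abc import AsyncGenerator

from litestar import Litestar, WebSocket, websocket_listener
from litestar.handlers import send_websocket_stream


@contextlib.asynccontextmanager
async def listener_lifespan(socket: WebSocket) -> AsyncGenerator[None, None]:
    async def handle_stream() -> AsyncGenerator[dict[str, float], None]:
        while True:
            yield {"time": time.time()}
            await asyncio.sleep(0.5)

    # Stream in the background for as long as the connection is open
    task = asyncio.create_task(send_websocket_stream(socket=socket, stream=handle_stream()))
    yield
    # Connection closed: stop the stream and wait for the task to finish
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task


@websocket_listener("/", connection_lifespan=listener_lifespan)
def handler(socket: WebSocket, data: Any) -> None:
    print(f"{socket.client}: {data}")


app = Litestar([handler])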
# Using a regular websocket handler (variant importing AsyncGenerator from typing)
import asyncio
import time
from typing import AsyncGenerator

from litestar import Litestar, WebSocket, websocket
from litestar.handlers import send_websocket_stream


@websocket("/")
async def handler(socket: WebSocket) -> None:
    await socket.accept()

    async def handle_stream() -> AsyncGenerator[dict[str, float], None]:
        while True:
            yield {"time": time.time()}
            await asyncio.sleep(0.5)

    async def handle_receive() -> None:
        async for event in socket.iter_json():
            print(f"{socket.client}: {event}")

    # Run sending and receiving concurrently (asyncio.TaskGroup requires Python 3.11+)
    async with asyncio.TaskGroup() as tg:
        tg.create_task(send_websocket_stream(socket=socket, stream=handle_stream()))
        tg.create_task(handle_receive())


app = Litestar([handler])


# Using a regular websocket handler (variant importing AsyncGenerator from collections.abc)
import asyncio
import time
from collections.abc import AsyncGenerator

from litestar import Litestar, WebSocket, websocket
from litestar.handlers import send_websocket_stream


@websocket("/")
async def handler(socket: WebSocket) -> None:
    await socket.accept()

    async def handle_stream() -> AsyncGenerator[dict[str, float], None]:
        while True:
            yield {"time": time.time()}
            await asyncio.sleep(0.5)

    async def handle_receive() -> None:
        async for event in socket.iter_json():
            print(f"{socket.client}: {event}")

    # Run sending and receiving concurrently (asyncio.TaskGroup requires Python 3.11+)
    async with asyncio.TaskGroup() as tg:
        tg.create_task(send_websocket_stream(socket=socket, stream=handle_stream()))
        tg.create_task(handle_receive())


app = Litestar([handler])
Transport modes#
WebSockets have two transport modes: text and binary. They dictate how bytes are transferred over the wire and can be set independently of one another, i.e. a socket can send binary and receive text.
It may seem intuitive that text and binary should map to str and bytes respectively, but this is not the case. WebSockets can receive and send data in any format, independently of the mode. The mode only affects how the bytes are handled during transport (i.e. on the protocol level). In most cases the default mode, text, is all that's needed. Binary transport is usually employed when sending binary blobs that don't have a meaningful string representation, such as images.
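The independence of transport mode and data format can be illustrated without any WebSocket library. In the sketch below the same JSON document is prepared once as it would travel in a text frame and once as it would travel in a binary frame. The variable names are made up for this example, and the standard library json module stands in for whatever encoder an application uses.

```python
import json

payload = {"message": "hello"}

# The same JSON document, prepared for either transport mode
text_frame: str = json.dumps(payload)
binary_frame: bytes = text_frame.encode("utf-8")

# Both frames decode to the same data; only the wire representation differs
from_text = json.loads(text_frame)
from_binary = json.loads(binary_frame)
```

Both from_text and from_binary equal the original payload, which is why structured data such as JSON works in either mode.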