WebSockets#

There are three ways to handle WebSockets in Litestar:

  1. The low-level websocket route handler, providing basic abstractions over the ASGI WebSocket interface

  2. websocket_listener and WebsocketListener : Reactive, event-driven WebSockets with full serialization and DTO support and support for a synchronous interface

  3. websocket_stream and send_websocket_stream(): Proactive, stream oriented WebSockets with full serialization and DTO support

The main difference between the low and high level interfaces is that, dealing with low level interface requires, setting up a loop and listening for incoming data, handling exceptions, client disconnects, and parsing incoming and serializing outgoing data.

WebSocket Listeners#

WebSocket Listeners can be used to interact with a WebSocket in an event-driven manner, using a callback style interface. They treat a WebSocket handler like any other route handler: A callable that takes in incoming data in an already pre-processed form and returns data to be serialized and sent over the connection. The low level details will be handled behind the curtains.

from litestar import Litestar
from litestar.handlers.websocket_handlers import websocket_listener


@websocket_listener("/")
async def handler(data: str) -> str:
    return data


app = Litestar([handler])

This handler will accept connections on /, and wait to receive data. Once a message has been received, it will be passed into the handler function defined, via the data parameter. This works like a regular route handler, so it’s possible to specify the type of data which should be received, and it will be converted accordingly.

Note

Contrary to WebSocket route handlers, functions decorated with websocket_listener don’t have to be asynchronous.

Receiving data#

Data can be received in the listener via the data parameter. The data passed to this will be converted / parsed according to the given type annotation and supports str, bytes, or arbitrary dicts / or lists in the form of JSON.

from typing import Dict

from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: Dict[str, str]) -> Dict[str, str]:
    return data


app = Litestar([handler])
from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: dict[str, str]) -> dict[str, str]:
    return data


app = Litestar([handler])
from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: str) -> str:
    return data


app = Litestar([handler])
from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: bytes) -> str:
    return data.decode("utf-8")


app = Litestar([handler])

Important

Contrary to route handlers, JSON data will only be parsed but not validated. This is a limitation of the current implementation and will change in future versions.

Sending data#

Sending data is done by simply returning the value to be sent from the handler function. Similar to receiving data, type annotations configure how the data is being handled. Values that are not str or bytes are assumed to be JSON encodable and will be serialized accordingly before being sent. This serialization is available for all data types currently supported by Litestar ( dataclasses, TypedDict, NamedTuple, msgspec.Struct, etc.), including DTOs.

from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: str) -> str:
    return data


app = Litestar([handler])
from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: str) -> bytes:
    return data.encode("utf-8")


app = Litestar([handler])
from typing import Dict

from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: str) -> Dict[str, str]:
    return {"message": data}


app = Litestar([handler])
from litestar import Litestar, websocket_listener


@websocket_listener("/")
async def handler(data: str) -> dict[str, str]:
    return {"message": data}


app = Litestar([handler])
from dataclasses import dataclass
from datetime import datetime

from litestar import Litestar, websocket_listener


@dataclass
class Message:
    content: str
    timestamp: float


@websocket_listener("/")
async def handler(data: str) -> Message:
    return Message(content=data, timestamp=datetime.now().timestamp())


app = Litestar([handler])

Setting transport modes#

Receive mode#

text is the default mode and is appropriate for most messages, including structured data such as JSON.

from litestar import Litestar, websocket_listener


@websocket_listener("/", receive_mode="text")
async def handler(data: str) -> str:
    return data


app = Litestar([handler])
from litestar import Litestar, websocket_listener


@websocket_listener("/", receive_mode="binary")
async def handler(data: str) -> str:
    return data


app = Litestar([handler])

Important

Once configured with a mode, a listener will only listen to socket events of the appropriate type. This means if a listener is configured to use binary mode, it will not respond to WebSocket events sending data in the text channel.

Send mode#

text is the default mode and is appropriate for most messages, including structured data such as JSON.

from litestar import Litestar, websocket_listener


@websocket_listener("/", send_mode="text")
async def handler(data: str) -> str:
    return data


app = Litestar([handler])
from litestar import Litestar, websocket_listener


@websocket_listener("/", send_mode="binary")
async def handler(data: str) -> str:
    return data


app = Litestar([handler])

Dependency injection#

Dependency Injection is available and generally works the same as in regular route handlers:

from litestar import Litestar, websocket_listener
from litestar.di import Provide


def some_dependency() -> str:
    return "hello"


@websocket_listener("/", dependencies={"some": Provide(some_dependency)})
async def handler(data: str, some: str) -> str:
    return data + some


app = Litestar([handler])

Important

Injected dependencies work on the level of the underlying route handler. This means they won’t be re-evaluated every time the listener function is called.

The following example makes use of yield dependencies and the fact that dependencies are only evaluated once for every connection; The step after the yield will only be executed after the connection has been closed.

from typing import TypedDict

from litestar import Litestar, websocket_listener
from litestar.datastructures import State
from litestar.di import Provide


class Message(TypedDict):
    message: str
    client_count: int


def socket_client_count(state: State) -> int:
    if not hasattr(state, "count"):
        state.count = 0

    state.count += 1
    yield state.count
    state.count -= 1


@websocket_listener("/", dependencies={"client_count": Provide(socket_client_count)})
async def handler(data: str, client_count: int) -> Message:
    return Message(message=data, client_count=client_count)


app = Litestar([handler])

Interacting with the WebSocket directly#

Sometimes access to the socket instance is needed, in which case the WebSocket instance can be injected into the handler function via the socket argument:

from litestar import Litestar, WebSocket, websocket_listener


@websocket_listener("/")
async def handler(data: str, socket: WebSocket) -> str:
    if data == "goodbye":
        await socket.close()

    return data


app = Litestar([handler])

Important

Since WebSockets are inherently asynchronous, to interact with the asynchronous methods on WebSocket, the handler function needs to be asynchronous.

Customising connection acceptance#

By default, Litestar will accept all incoming connections by awaiting WebSocket.accept() without arguments. This behavior can be customized by passing a custom connection_accept_handler function. Litestar will await this function to accept the connection.

from litestar import Litestar, WebSocket, websocket_listener


async def accept_connection(socket: WebSocket) -> None:
    await socket.accept(headers={"Cookie": "custom-cookie"})


@websocket_listener("/", connection_accept_handler=accept_connection)
def handler(data: str) -> str:
    return data


app = Litestar([handler])

Class based WebSocket handling#

In addition to using a simple function as in the examples above, a class based approach is made possible by extending the WebSocketListener. This provides convenient access to socket events such as connect and disconnect, and can be used to encapsulate more complex logic.

from litestar import Litestar, WebSocket
from litestar.handlers import WebsocketListener


class Handler(WebsocketListener):
    path = "/"

    def on_accept(self, socket: WebSocket) -> None:
        print("Connection accepted")

    def on_disconnect(self, socket: WebSocket) -> None:
        print("Connection closed")

    def on_receive(self, data: str) -> str:
        return data


app = Litestar([Handler])
from litestar import Litestar, WebSocket
from litestar.handlers import WebsocketListener


class Handler(WebsocketListener):
    path = "/"

    async def on_accept(self, socket: WebSocket) -> None:
        print("Connection accepted")

    async def on_disconnect(self, socket: WebSocket) -> None:
        print("Connection closed")

    async def on_receive(self, data: str) -> str:
        return data


app = Litestar([Handler])

Custom WebSocket#

New in version 2.7.0.

Litestar supports custom websocket_class instances, which can be used to further configure the default WebSocket. The example below illustrates how to implement a custom WebSocket class for the whole application.

Example of a custom websocket at the application level
from __future__ import annotations

from litestar import Litestar, WebSocket, websocket_listener
from litestar.types.asgi_types import WebSocketMode


class CustomWebSocket(WebSocket):
    async def receive_data(self, mode: WebSocketMode) -> str | bytes:
        """Return fixed response for every websocket message."""
        await super().receive_data(mode=mode)
        return "Fixed response"


@websocket_listener("/")
async def handler(data: str) -> str:
    return data


app = Litestar([handler], websocket_class=CustomWebSocket)

Layered architecture

WebSocket classes are part of Litestar’s layered architecture, which means you can set a WebSocket class on every layer of the application. If you have set a WebSocket class on multiple layers, the layer closest to the route handler will take precedence.

You can read more about this in the Layered architecture section

WebSocket Streams#

WebSocket streams can be used to proactively push data to a client, using an asynchronous generator function. Data will be sent via the socket every time the generator yields, until it is either exhausted or the client disconnects.

Streaming the current time in 0.5 second intervals#
import asyncio
import time
from typing import AsyncGenerator

from litestar import Litestar, websocket_stream


@websocket_stream("/")
async def ping() -> AsyncGenerator[float, None]:
    while True:
        yield time.time()
        await asyncio.sleep(0.5)


app = Litestar([ping])
Streaming the current time in 0.5 second intervals#
import asyncio
import time
from collections.abc import AsyncGenerator

from litestar import Litestar, websocket_stream


@websocket_stream("/")
async def ping() -> AsyncGenerator[float, None]:
    while True:
        yield time.time()
        await asyncio.sleep(0.5)


app = Litestar([ping])

Serialization#

Just like with route handlers, type annotations configure how the data is being handled. str or bytes will be sent as-is, while everything else will be encoded as JSON before being sent. This serialization is available for all data types currently supported by Litestar (dataclasses, TypedDict, NamedTuple, msgspec.Struct, etc.), including DTOs.

Dependency Injection#

Dependency injection is available and works analogous to regular route handlers.

Important

One thing to keep in mind, especially for long-lived streams, is that dependencies are scoped to the lifetime of the handler. This means that if for example a database connection is acquired in a dependency, it will be held until the generator stops. This may not be desirable in all cases, and acquiring resources ad-hoc inside the generator itself preferable

Bad: The lock will be held until the client disconnects#
import asyncio
from typing import AsyncGenerator

from app.lib import ping_external_resource
from litestar import Litestar, websocket_stream

RESOURCE_LOCK = asyncio.Lock()


async def acquire_lock() -> AsyncGenerator[None, None]:
    async with RESOURCE_LOCK:
        yield


@websocket_stream("/")
async def ping(lock: asyncio.Lock) -> AsyncGenerator[float, None]:
    while True:
        alive = await ping_external_resource()
        yield alive
        await asyncio.sleep(1)


app = Litestar([ping], dependencies={"lock": acquire_lock})
Bad: The lock will be held until the client disconnects#
import asyncio
from collections.abc import AsyncGenerator

from app.lib import ping_external_resource
from litestar import Litestar, websocket_stream

RESOURCE_LOCK = asyncio.Lock()


async def acquire_lock() -> AsyncGenerator[None, None]:
    async with RESOURCE_LOCK:
        yield


@websocket_stream("/")
async def ping(lock: asyncio.Lock) -> AsyncGenerator[float, None]:
    while True:
        alive = await ping_external_resource()
        yield alive
        await asyncio.sleep(1)


app = Litestar([ping], dependencies={"lock": acquire_lock})
Good: The lock will only be acquired when it’s needed#
import asyncio
from typing import AsyncGenerator

from app.lib import ping_external_resource
from litestar import Litestar, websocket_stream

RESOURCE_LOCK = asyncio.Lock()


@websocket_stream("/")
async def ping() -> AsyncGenerator[float, None]:
    while True:
        async with RESOURCE_LOCK:
            alive = await ping_external_resource()
        yield alive
        await asyncio.sleep(1)


app = Litestar([ping])
Good: The lock will only be acquired when it’s needed#
import asyncio
from collections.abc import AsyncGenerator

from app.lib import ping_external_resource
from litestar import Litestar, websocket_stream

RESOURCE_LOCK = asyncio.Lock()


@websocket_stream("/")
async def ping() -> AsyncGenerator[float, None]:
    while True:
        async with RESOURCE_LOCK:
            alive = await ping_external_resource()
        yield alive
        await asyncio.sleep(1)


app = Litestar([ping])

Interacting with the WebSocket directly#

To interact with the WebSocket directly, it can be injected into the generator function via the socket argument:

import asyncio
import time
from typing import Any, AsyncGenerator

from litestar import Litestar, WebSocket, websocket_stream


@websocket_stream("/")
async def ping(socket: WebSocket) -> AsyncGenerator[dict[str, Any], None]:
    while True:
        yield {"time": time.time(), "client": socket.client}
        await asyncio.sleep(0.5)


app = Litestar([ping])
import asyncio
import time
from typing import Any
from collections.abc import AsyncGenerator

from litestar import Litestar, WebSocket, websocket_stream


@websocket_stream("/")
async def ping(socket: WebSocket) -> AsyncGenerator[dict[str, Any], None]:
    while True:
        yield {"time": time.time(), "client": socket.client}
        await asyncio.sleep(0.5)


app = Litestar([ping])

Receiving data while streaming#

By default, a stream will listen for a client disconnect in the background, and stop the generator once received. Since this requires receiving data from the socket, it can lead to data loss if the application is attempting to read from the same socket simultaneously.

Tip

To prevent data loss, by default, websocket_stream will raise an exception if it receives any data while listening for client disconnects. If incoming data should be ignored, allow_data_discard should be set to True

If receiving data while streaming is desired, send_websocket_stream() can be configured to not listen for disconnects by setting listen_for_disconnect=False.

Important

When using listen_for_disconnect=False, the application needs to ensure the disconnect event is received elsewhere, otherwise the stream will only terminate when the generator is exhausted

Combining streaming and receiving data#

To stream and receive data concurrently, the stream can be set up manually using send_websocket_stream() in combination with either a regular websocket handler or a WebSocket listener.

import asyncio
import time
from typing import Any, AsyncGenerator

from litestar import Litestar, WebSocket, websocket_listener
from litestar.handlers import send_websocket_stream


async def listener_lifespan(socket: WebSocket) -> None:
    async def handle_stream() -> AsyncGenerator[dict[str, float], None]:
        while True:
            yield {"time": time.time()}
            await asyncio.sleep(0.5)

    task = asyncio.create_task(send_websocket_stream(socket=socket, stream=handle_stream()))
    yield
    task.cancel()
    await task


@websocket_listener("/", connection_lifespan=listener_lifespan)
def handler(socket: WebSocket, data: Any) -> None:
    print(f"{socket.client}: {data}")


app = Litestar([handler])
import asyncio
import time
from typing import Any
from collections.abc import AsyncGenerator

from litestar import Litestar, WebSocket, websocket_listener
from litestar.handlers import send_websocket_stream


async def listener_lifespan(socket: WebSocket) -> None:
    async def handle_stream() -> AsyncGenerator[dict[str, float], None]:
        while True:
            yield {"time": time.time()}
            await asyncio.sleep(0.5)

    task = asyncio.create_task(send_websocket_stream(socket=socket, stream=handle_stream()))
    yield
    task.cancel()
    await task


@websocket_listener("/", connection_lifespan=listener_lifespan)
def handler(socket: WebSocket, data: Any) -> None:
    print(f"{socket.client}: {data}")


app = Litestar([handler])
import asyncio
import time
from typing import AsyncGenerator

from litestar import Litestar, WebSocket, websocket
from litestar.handlers import send_websocket_stream


@websocket("/")
async def handler(socket: WebSocket) -> None:
    await socket.accept()

    async def handle_stream() -> AsyncGenerator[dict[str, float], None]:
        while True:
            yield {"time": time.time()}
            await asyncio.sleep(0.5)

    async def handle_receive() -> None:
        async for event in socket.iter_json():
            print(f"{socket.client}: {event}")

    async with asyncio.TaskGroup() as tg:
        tg.create_task(send_websocket_stream(socket=socket, stream=handle_stream()))
        tg.create_task(handle_receive())


app = Litestar([handler])
import asyncio
import time
from collections.abc import AsyncGenerator

from litestar import Litestar, WebSocket, websocket
from litestar.handlers import send_websocket_stream


@websocket("/")
async def handler(socket: WebSocket) -> None:
    await socket.accept()

    async def handle_stream() -> AsyncGenerator[dict[str, float], None]:
        while True:
            yield {"time": time.time()}
            await asyncio.sleep(0.5)

    async def handle_receive() -> None:
        async for event in socket.iter_json():
            print(f"{socket.client}: {event}")

    async with asyncio.TaskGroup() as tg:
        tg.create_task(send_websocket_stream(socket=socket, stream=handle_stream()))
        tg.create_task(handle_receive())


app = Litestar([handler])

Transport modes#

WebSockets have two transport modes: text and binary. They dictate how bytes are transferred over the wire and can be set independently from another, i.e. a socket can send binary and receive text

It may seem intuitive that text and binary should map to str and bytes respectively, but this is not the case. WebSockets can receive and send data in any format, independently of the mode. The mode only affects how the bytes are handled during transport (i.e. on the protocol level). In most cases the default mode - text - is all that’s needed. Binary transport is usually employed when sending binary blobs that don’t have a meaningful string representation, such as images.