Channels#

Channels are a group of related functionalities, built to facilitate the routing of event streams, which for example can be used to broadcast messages to WebSocket clients.

Channels provide:

  1. Independent broker backends, optionally handling inter-process communication and data persistence on demand

  2. “Channel” based subscription management

  3. Subscriber objects as an abstraction over an individualized event stream, providing background workers and managed subscriptions

  4. Synchronous and asynchronous data publishing

  5. Optional history management on a per-channel basis

  6. WebSocket integration, generating WebSocket route handlers for an application, to handle the subscription and publishing of incoming events to the connected client

Basic concepts#

Utilizing channels involves a few moving parts, of which the most important ones are:

event#

A single piece of data published to, or received from a backend bound to the channel it was originally published to

event stream#

A stream of events, consisting of events from all the channels a Subscriber has previously subscribed to

subscriber#

A Subscriber: An object wrapping an event stream and providing access to it through various methods

backend#

A ChannelsBackend. This object manages communication between the plugin and the broker, publishing messages to and receiving messages from it. Each plugin instance is associated with exactly one backend.

broker#

Responsible for receiving and publishing messages to all connected backends; All backends sharing the same broker will have access to the same messages, allowing for inter-process communication. This is typically handled by a separate entity like Redis

plugin#

The ChannelsPlugin, a central instance managing subscribers, reading messages from the backend, putting them in the appropriate event stream, and publishing data to the backend

Flowcharts#

flowchart LR Backend(Backend) --> Broker[(Broker)] Plugin{{Plugin}} --> Backend Application[[Application]] --> Plugin

Publishing flow from the application to the broker#

flowchart TD Broker[(Broker)] Broker --> Backend_1(Backend) Broker --> Backend_2(Backend) Backend_1 --> Plugin_1{{Plugin}} Backend_2 --> Plugin_2{{Plugin}} Plugin_1 --> Subscriber_1[Subscriber] Plugin_1 --> Subscriber_2[Subscriber] Plugin_1 --> Subscriber_3[Subscriber] Plugin_2 --> Subscriber_4[Subscriber] Plugin_2 --> Subscriber_5[Subscriber] Plugin_2 --> Subscriber_6[Subscriber]

Fanout flow of data from the broker to the sockets, showing multiple plugin instances#

The ChannelsPlugin#

The ChannelsPlugin acts as the central entity for managing channels and subscribers. It’s used to publish messages, control how data is stored, and manage subscribers, route handlers, and configuration.

Tip

The plugin makes itself available as a dependency under the channels key, which means it’s not necessary to import it and instead, it can be used from within route handlers or other callables within the dependency tree directly

Configuring the channels#

The channels managed by the plugin can be either defined upfront, passing them to the channels argument, or created “on the fly” (i.e. on the first subscription to a channel) by setting arbitrary_channels_allowed=True.

Passing channels explicitly#
from litestar.channels import ChannelsPlugin

channels_plugin = ChannelsPlugin(..., channels=["foo", "bar"])
Allowing arbitrary channels#
from litestar.channels import ChannelsPlugin

channels_plugin = ChannelsPlugin(..., arbitrary_channels_allowed=True)

If arbitrary_channels_allowed is not True, trying to publish or subscribe to a channel not passed to channels will raise a ChannelsException.

Publishing data#

One of the core aspects of the plugin is publishing data, which is done through its publish method:

channels.publish({"message": "Hello"}, "general")

The above example will publish the data to the channel general, subsequently putting it into all subscriber’s event stream to be consumed.

This method is non-blocking, even though channels and the associated backends are fundamentally asynchronous.

Calling publish effectively enqueues a message to be sent to the backend, from which follows that there’s no guarantee that an event will be available in the backend immediately after this call.

Alternatively, the asynchronous wait_published method can be used, which skips the internal message queue, publishing the data to the backend directly.

Note

While calling publish does not guarantee the message is sent to the backend immediately, it will be sent there eventually; On shutdown, the plugin will wait for all queues to empty

Managing subscriptions#

Another core functionality of the plugin is managing subscriptions, for which two different approaches exist:

  1. Manually through the subscribe and unsubscribe methods

  2. By using the start_subscription context manager

Both subscribe and start_subscription produce a Subscriber, which can be used to interact with the streams of events subscribed to.

The context manager should be preferred, since it ensures that channels are being unsubscribed. Using the subscriber and unsubscribe methods directly should only be done when a context manager cannot be used, e.g. when the subscription would span different contexts.

Calling the subscription methods manually#
subscriber = await channels.subscribe(["foo", "bar"])
try:
    ...  # do some stuff here
finally:
    await channels.unsubscribe(subscriber)
Using the context manager#
async with channels.start_subscription(["foo", "bar"]) as subscriber:
    ...  # do some stuff here

It is also possible to unsubscribe from individual channels, which may be desirable if subscriptions need to be managed dynamically.

subscriber = await channels.subscribe(["foo", "bar"])
...  # do some stuff here
await channels.unsubscribe(subscriber, ["foo"])

Or, using the context manager

async with channels.start_subscription(["foo", "bar"]) as subscriber:
    ...  # do some stuff here
    await channels.unsubscribe(subscriber, ["foo"])

Managing history#

Some backends support per-channel history, keeping a certain amount of events in storage. This history can then be pushed to a subscriber.

The plugin’s put_subscriber_history can be used to fetch this history and put it into a subscriber’s event stream.

from litestar import Litestar, WebSocket, websocket
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend


@websocket("/ws")
async def handler(socket: WebSocket, channels: ChannelsPlugin) -> None:
    await socket.accept()

    async with channels.start_subscription(["some_channel"]) as subscriber:
        await channels.put_subscriber_history(subscriber, ["some_channel"], limit=10)


app = Litestar(
    [handler],
    plugins=[ChannelsPlugin(backend=MemoryChannelsBackend(history=20))],
)

Note

The publication of the history happens sequentially, one channel and one event at a time. This is done to ensure the correct ordering of events and to avoid filling up a subscriber’s backlog, which would result in dropped history entries. Should the amount of entries exceed the maximum backlog size, the execution will wait until previous events have been processed.

The Subscriber#

The Subscriber manages an individual event stream, provided to it by the plugin, representing the sum of events from all channels the subscriber has subscribed to.

It can be considered the endpoint of all events, while the backends act as the source, and the plugin as a router, being responsible for supplying events gathered from the backend into the appropriate subscriber’s streams.

In addition to being an abstraction of an event stream, the Subscriber provides two methods to handle this stream:

iter_events

An asynchronous generator, producing one event from the stream at a time, waiting until the next one becomes available

run_in_background

A context manager, wrapping an asyncio.Task, consuming events yielded by iter_events, invoking a provided callback for each of them. Upon exit, it will attempt a graceful shutdown of the running task, waiting for all currently enqueued events in the stream to be processed. If the context exits with an error, the task will be cancelled instead.

Tip

It’s possible to force the task to stop immediately, by passing join=False to run_in_background, which will lead to the cancellation of the task. By default this only happens when the context is left with an exception.

Important

The events in the event streams are always bytes; When calling ChannelsPlugin.publish(), data will be serialized before being sent to the backend.

Consuming the event stream#

There are two general methods of consuming the event stream:

  1. By iterating over it directly, using iter_events

  2. By using the run_in_background context manager, which starts a background task, iterating over the stream, invoking a provided callback for every event received

Iterating over the stream directly is mostly useful if processing the events is the only concern, since iter_events is effectively an infinite loop. For all other applications, using the context manager is preferable, since it allows to easily run other code concurrently.

from litestar import Litestar, WebSocket, websocket
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend


@websocket("/ws")
async def handler(socket: WebSocket, channels: ChannelsPlugin) -> None:
    await socket.accept()

    async with channels.start_subscription(["some_channel"]) as subscriber:
        async for message in subscriber.iter_events():
            await socket.send_text(message)


app = Litestar(
    [handler],
    plugins=[ChannelsPlugin(backend=MemoryChannelsBackend())],
)

In the above example, the stream is used to send data to a WebSocket.

The same can be achieve by passing WebbSocket.send_text as the callback to run_in_background. This will cause the WebSocket’s method to be invoked every time a new event becomes available in the stream, but gives control back to the application, providing an opportunity to perform other tasks, such as receiving incoming data from the socket.

from litestar import Litestar, WebSocket, websocket
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend


@websocket("/ws")
async def handler(socket: WebSocket, channels: ChannelsPlugin) -> None:
    await socket.accept()

    async with channels.start_subscription(["some_channel"]) as subscriber, subscriber.run_in_background(
        socket.send_text
    ):
        while True:
            response = await socket.receive_text()
            await socket.send_text(response)


app = Litestar(
    [handler],
    plugins=[ChannelsPlugin(backend=MemoryChannelsBackend(), channels=["some_channel"])],
)

Important

Iterating over iter_events should be approached with caution when being used together with WebSockets.

Since WebSocketDisconnect is only raised after the corresponding ASGI event has been received, it can result in an indefinitely suspended coroutine. This can happen if for example the client disconnects, but no further events are received. The generator will then wait for new events, but since it will never receive any, no send call on the WebSocket will be made, which in turn means no exception will be raised to break the loop.

Managing backpressure#

Each subscriber manages its own backlog: A queue of unprocessed events. By default, this backlog is unlimited in size, allowing it to grow indefinitely. For most applications, this should be no issue, but when the recipient consistently can’t process messages faster than they come in, an application might opt to handle this case.

The channels plugin provides two different strategies for managing this backpressure:

  1. A backoff strategy, dropping newly incoming messages as long as the backlog is full

  2. An eviction strategy, dropping the oldest message in the backlog when a new one is added while the backlog is full

Backoff strategy#
from litestar.channels import ChannelsPlugin
from litestar.channels.memory import MemoryChannelsBackend

channels = ChannelsPlugin(
    backend=MemoryChannelsBackend(),
    max_backlog=1000,
    backlog_strategy="backoff",
)
Eviction strategy#
from litestar.channels import ChannelsPlugin
from litestar.channels.memory import MemoryChannelsBackend

channels = ChannelsPlugin(
    backend=MemoryChannelsBackend(),
    max_backlog=1000,
    backlog_strategy="dropleft",
)

Backends#

The storing and fanout of messages is handled by a ChannelsBackend. Currently implemented are:

MemoryChannelsBacked

A basic in-memory backend, mostly useful for testing and local development, but still fully capable. Since it stores all data in-process, it can achieve the highest performance of all the backends, but at the same time is not suitable for applications running on multiple processes.

RedisChannelsPubSubBackend

A Redis based backend, using Pub/Sub to delivery messages. This Redis backend has a low latency and overhead and is generally recommended if history is not needed

RedisChannelsStreamBackend

A redis based backend, using streams to deliver messages. It has a slightly higher latency when publishing than the Pub/Sub backend, but achieves the same throughput in message fanout. Recommended when history is needed

AsyncPgChannelsBackend

A postgres backend using the asyncpg driver

PsycoPgChannelsBackend

A postgres backend using the psycopg3 async driver

Integrating with websocket handlers#

Generating route handlers#

A common pattern is to create a route handler per channel, sending data to the connected client from that channel. This can be fully automated, using the plugin to create these route handlers.

Setting create_ws_route_handlers=True will create route handlers for all channels#
from litestar import Litestar
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend

channels_plugin = ChannelsPlugin(
    backend=MemoryChannelsBackend(),
    channels=["foo", "bar"],
    create_ws_route_handlers=True,
)

app = Litestar(plugins=[channels_plugin])

The generated route handlers can optionally be configured to send the channel’s history after a client has connected:

Sending the first 10 history entries after a client connects#
from litestar import Litestar
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend

channels_plugin = ChannelsPlugin(
    backend=MemoryChannelsBackend(history=10),  # set the amount of messages per channel
    # to keep in the backend
    channels=["foo", "bar"],
    create_ws_route_handlers=True,
    ws_handler_send_history=10,  # send 10 entries of the history by default
)

app = Litestar(plugins=[channels_plugin])

Tip

When using the arbitrary_channels_allowed flag on the ChannelsPlugin, a single route handler will be generated instead, using a path parameter to specify the channel name