Channels#
Channels are a group of related functionalities, built to facilitate the routing of event streams, which for example can be used to broadcast messages to WebSocket clients.
Channels provide:
Independent broker backends, optionally handling inter-process communication and data persistence on demand
“Channel” based subscription management
Subscriber objects as an abstraction over an individualized event stream, providing background workers and managed subscriptions
Synchronous and asynchronous data publishing
Optional history management on a per-channel basis
WebSocket integration, generating WebSocket route handlers for an application, to handle the subscription and publishing of incoming events to the connected client
Basic concepts#
Utilizing channels involves a few moving parts, of which the most important ones are:
- event#
A single piece of data published to, or received from a backend bound to the channel it was originally published to
- event stream#
A stream of events, consisting of events from all the channels a Subscriber has previously subscribed to
- subscriber#
A
Subscriber
: An object wrapping an event stream and providing access to it through various methods- backend#
A
ChannelsBackend
. This object manages communication between the plugin and the broker, publishing messages to and receiving messages from it. Each plugin instance is associated with exactly one backend.- broker#
Responsible for receiving and publishing messages to all connected backends; All backends sharing the same broker will have access to the same messages, allowing for inter-process communication. This is typically handled by a separate entity like Redis
- plugin#
The
ChannelsPlugin
, a central instance managing subscribers, reading messages from the backend, putting them in the appropriate event stream, and publishing data to the backend
Flowcharts#
The ChannelsPlugin
#
The ChannelsPlugin
acts as the central entity for managing channels and
subscribers. It’s used to publish messages, control how data is stored, and manage
subscribers, route handlers, and configuration.
Tip
The plugin makes itself available as a dependency under the channels
key, which
means it’s not necessary to import it and instead, it can be used from within route
handlers or other callables within the dependency tree directly
Configuring the channels#
The channels managed by the plugin can be either defined upfront, passing them to the
channels
argument, or created “on the fly” (i.e. on the first subscription to a
channel) by setting arbitrary_channels_allowed=True
.
from litestar.channels import ChannelsPlugin
channels_plugin = ChannelsPlugin(..., channels=["foo", "bar"])
from litestar.channels import ChannelsPlugin
channels_plugin = ChannelsPlugin(..., arbitrary_channels_allowed=True)
If arbitrary_channels_allowed
is not True
, trying to publish or subscribe to a
channel not passed to channels
will raise a ChannelsException
.
Publishing data#
One of the core aspects of the plugin is publishing data, which is done through its
publish
method:
channels.publish({"message": "Hello"}, "general")
The above example will publish the data to the channel general
, subsequently putting
it into all subscriber’s event stream to be consumed.
This method is non-blocking, even though channels and the associated backends are fundamentally asynchronous.
Calling publish
effectively enqueues a message to be sent to the backend, from which
follows that there’s no guarantee that an event will be available in the backend
immediately after this call.
Alternatively, the asynchronous wait_published
method can be used, which skips the internal message queue, publishing the data to the
backend directly.
Note
While calling publish
does not guarantee the
message is sent to the backend immediately, it will be sent there eventually; On
shutdown, the plugin will wait for all queues to empty
Managing subscriptions#
Another core functionality of the plugin is managing subscriptions, for which two different approaches exist:
Manually through the
subscribe
andunsubscribe
methodsBy using the
start_subscription
context manager
Both subscribe
and
start_subscription
produce a
Subscriber
, which can be used to interact with the streams of events subscribed
to.
The context manager should be preferred, since it ensures that channels are being
unsubscribed. Using the subscriber
and unsubscribe
methods directly should only
be done when a context manager cannot be used, e.g. when the subscription would span
different contexts.
subscriber = await channels.subscribe(["foo", "bar"])
try:
... # do some stuff here
finally:
await channels.unsubscribe(subscriber)
async with channels.start_subscription(["foo", "bar"]) as subscriber:
... # do some stuff here
It is also possible to unsubscribe from individual channels, which may be desirable if subscriptions need to be managed dynamically.
subscriber = await channels.subscribe(["foo", "bar"])
try:
... # do some stuff here
finally:
await channels.unsubscribe(subscriber, ["foo"])
Or, using the context manager
async with channels.start_subscription(["foo", "bar"]) as subscriber:
... # do some stuff here
await channels.unsubscribe(subscriber, ["foo"])
Managing history#
Some backends support per-channel history, keeping a certain amount of events in storage. This history can then be pushed to a subscriber.
The plugin’s put_subscriber_history
can
be used to fetch this history and put it into a subscriber’s event stream.
from litestar import Litestar, WebSocket, websocket
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend
@websocket("/ws")
async def handler(socket: WebSocket, channels: ChannelsPlugin) -> None:
await socket.accept()
async with channels.start_subscription(["some_channel"]) as subscriber:
await channels.put_subscriber_history(subscriber, ["some_channel"], limit=10)
app = Litestar(
[handler],
plugins=[ChannelsPlugin(backend=MemoryChannelsBackend(history=20))],
)
Note
The publication of the history happens sequentially, one channel and one event at a time. This is done to ensure the correct ordering of events and to avoid filling up a subscriber’s backlog, which would result in dropped history entries. Should the amount of entries exceed the maximum backlog size, the execution will wait until previous events have been processed.
See also
The Subscriber
#
The Subscriber
manages an individual event stream, provided to it by
the plugin, representing the sum of events from all channels the subscriber has
subscribed to.
It can be considered the endpoint of all events, while the backends act as the source, and the plugin as a router, being responsible for supplying events gathered from the backend into the appropriate subscriber’s streams.
In addition to being an abstraction of an event stream, the Subscriber
provides two methods to handle this stream:
iter_events
An asynchronous generator, producing one event from the stream at a time, waiting until the next one becomes available
run_in_background
A context manager, wrapping an
asyncio.Task
, consuming events yielded byiter_events
, invoking a provided callback for each of them. Upon exit, it will attempt a graceful shutdown of the running task, waiting for all currently enqueued events in the stream to be processed. If the context exits with an error, the task will be cancelled instead.Tip
It’s possible to force the task to stop immediately, by passing
join=False
torun_in_background
, which will lead to the cancellation of the task. By default this only happens when the context is left with an exception.
Important
The events in the event streams are always
bytes; When calling ChannelsPlugin.publish()
, data will be serialized before
being sent to the backend.
Consuming the event stream#
There are two general methods of consuming the event stream:
By iterating over it directly, using
iter_events
By using the
run_in_background
context manager, which starts a background task, iterating over the stream, invoking a provided callback for every event received
Iterating over the stream directly is mostly useful if processing the events is the only
concern, since iter_events
is effectively an infinite
loop. For all other applications, using the context manager is preferable, since it
allows to easily run other code concurrently.
from litestar import Litestar, WebSocket, websocket
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend
@websocket("/ws")
async def handler(socket: WebSocket, channels: ChannelsPlugin) -> None:
await socket.accept()
async with channels.start_subscription(["some_channel"]) as subscriber:
async for message in subscriber.iter_events():
await socket.send_text(message)
app = Litestar(
[handler],
plugins=[ChannelsPlugin(backend=MemoryChannelsBackend())],
)
In the above example, the stream is used to send data to a
WebSocket
.
The same can be achieve by passing
WebbSocket.send_text
as the callback
to run_in_background
. This will cause the
WebSocket’s method to be invoked every time a new event becomes available in the stream,
but gives control back to the application, providing an opportunity to perform other
tasks, such as receiving incoming data from the socket.
from litestar import Litestar, WebSocket, websocket
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend
@websocket("/ws")
async def handler(socket: WebSocket, channels: ChannelsPlugin) -> None:
await socket.accept()
async with channels.start_subscription(["some_channel"]) as subscriber, subscriber.run_in_background(
socket.send_text
):
while True:
response = await socket.receive_text()
await socket.send_text(response)
app = Litestar(
[handler],
plugins=[ChannelsPlugin(backend=MemoryChannelsBackend(), channels=["some_channel"])],
)
Important
Iterating over iter_events
should be approached
with caution when being used together with WebSockets.
Since WebSocketDisconnect
is only raised after the corresponding ASGI event
has been received, it can result in an indefinitely suspended coroutine. This can
happen if for example the client disconnects, but no further events are received.
The generator will then wait for new events, but since it will never receive any,
no send
call on the WebSocket will be made, which in turn means no exception
will be raised to break the loop.
Managing backpressure#
Each subscriber manages its own backlog: A queue of unprocessed events. By default, this backlog is unlimited in size, allowing it to grow indefinitely. For most applications, this should be no issue, but when the recipient consistently can’t process messages faster than they come in, an application might opt to handle this case.
The channels plugin provides two different strategies for managing this backpressure:
A backoff strategy, dropping newly incoming messages as long as the backlog is full
An eviction strategy, dropping the oldest message in the backlog when a new one is added while the backlog is full
from litestar.channels import ChannelsPlugin
from litestar.channels.memory import MemoryChannelsBackend
channels = ChannelsPlugin(
backend=MemoryChannelsBackend(),
max_backlog=1000,
backlog_strategy="backoff",
)
from litestar.channels import ChannelsPlugin
from litestar.channels.memory import MemoryChannelsBackend
channels = ChannelsPlugin(
backend=MemoryChannelsBackend(),
max_backlog=1000,
backlog_strategy="dropleft",
)
Backends#
The storing and fanout of messages is handled by a
ChannelsBackend
. Currently
implemented are:
MemoryChannelsBacked
A basic in-memory backend, mostly useful for testing and local development, but still fully capable. Since it stores all data in-process, it can achieve the highest performance of all the backends, but at the same time is not suitable for applications running on multiple processes.
RedisChannelsPubSubBackend
A Redis based backend, using Pub/Sub to delivery messages. This Redis backend has a low latency and overhead and is generally recommended if history is not needed
RedisChannelsStreamBackend
A redis based backend, using streams to deliver messages. It has a slightly higher latency when publishing than the Pub/Sub backend, but achieves the same throughput in message fanout. Recommended when history is needed
AsyncPgChannelsBackend
A postgres backend using the asyncpg driver
PsycoPgChannelsBackend
A postgres backend using the psycopg3 async driver
Integrating with websocket handlers#
Generating route handlers#
A common pattern is to create a route handler per channel, sending data to the connected client from that channel. This can be fully automated, using the plugin to create these route handlers.
from litestar import Litestar
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend
channels_plugin = ChannelsPlugin(
backend=MemoryChannelsBackend(),
channels=["foo", "bar"],
create_ws_route_handlers=True,
)
app = Litestar(plugins=[channels_plugin])
The generated route handlers can optionally be configured to send the channel’s history after a client has connected:
from litestar import Litestar
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend
channels_plugin = ChannelsPlugin(
backend=MemoryChannelsBackend(history=10), # set the amount of messages per channel
# to keep in the backend
channels=["foo", "bar"],
create_ws_route_handlers=True,
ws_handler_send_history=10, # send 10 entries of the history by default
)
app = Litestar(plugins=[channels_plugin])
Tip
When using the arbitrary_channels_allowed
flag on the ChannelsPlugin
, a
single route handler will be generated instead, using a
path parameter to specify the channel name