plugin#

class litestar.channels.plugin.ChannelsPlugin#

Bases: InitPluginProtocol, AbstractAsyncContextManager

__init__(backend: ChannelsBackend, *, channels: Iterable[str] | None = None, arbitrary_channels_allowed: bool = False, create_ws_route_handlers: bool = False, ws_handler_send_history: int = 0, ws_handler_base_path: str = '/', ws_send_mode: WebSocketMode = 'text', subscriber_max_backlog: int | None = None, subscriber_backlog_strategy: BacklogStrategy = 'backoff', subscriber_class: type[Subscriber] = <class 'litestar.channels.subscriber.Subscriber'>, type_encoders: TypeEncodersMap | None = None) None#

Plugin to handle broadcasting to WebSockets with support for channels.

This plugin is available as an injected dependency using the channels key.

Parameters:
  • backend – Backend to store data in

  • channels – Channels to serve. If arbitrary_channels_allowed is False (the default), trying to subscribe to a channel not in channels will raise an exception

  • arbitrary_channels_allowed – Allow the creation of channels on the fly

  • create_ws_route_handlers – If True, websocket route handlers will be created for all channels defined in channels. If arbitrary_channels_allowed is True, a single handler will be created instead, handling all channels. The handlers created will accept WebSocket connections and start sending received events for their respective channels.

  • ws_handler_send_history – Amount of history entries to send from the generated websocket route handlers after a client has connected. A value of 0 indicates to not send a history

  • ws_handler_base_path – Path prefix used for the generated route handlers

  • ws_send_mode – Send mode to use for sending data through a WebSocket. This will be used when sending within generated route handlers or Subscriber.run_in_background()

  • subscriber_max_backlog – Maximum amount of unsent messages to be held in memory for a given subscriber. If that limit is reached, new messages will be treated accordingly to backlog_strategy

  • subscriber_backlog_strategy – Define the behaviour if max_backlog is reached for a subscriber. ` backoff` will result in new messages being dropped until older ones have been processed. dropleft will drop older messages in favour of new ones.

  • subscriber_class – A Subscriber subclass to return from subscribe()

  • type_encoders – An additional mapping of type encoders used to encode data before sending

encode_data(data: LitestarEncodableType) bytes#

Encode data before storing it in the backend

on_app_init(app_config: AppConfig) AppConfig#

Plugin hook. Set up a channels dependency, add route handlers and register application hooks

publish(data: LitestarEncodableType, channels: str | Iterable[str]) None#

Schedule data to be published to channels.

Note

This is a synchronous method that returns immediately. There are no guarantees that when this method returns the data will have been published to the backend. For that, use wait_published()

async wait_published(data: LitestarEncodableType, channels: str | Iterable[str]) None#

Publish data to channels

async subscribe(channels: str | Iterable[str], history: int | None = None) Subscriber#

Create a Subscriber, providing a stream of all events in channels.

The created subscriber will be passive by default and has to be consumed manually, either by using Subscriber.run_in_background() or iterating over events using Subscriber.iter_events().

Parameters:
  • channels – Channel(s) to subscribe to

  • history – If a non-negative integer, add this amount of history entries from each channel to the subscriber’s event stream. Note that this will wait until all history entries are fetched and pushed to the subscriber’s stream. For more control use put_subscriber_history().

Returns:

A Subscriber

Raises:

ChannelsException – If a channel in channels has not been declared on this backend and arbitrary_channels_allowed has not been set to True

async unsubscribe(subscriber: Subscriber, channels: str | Iterable[str] | None = None) None#

Unsubscribe a Subscriber from channels. If the subscriber has a running sending task, it will be stopped.

Parameters:
  • channels – Channels to unsubscribe from. If None, unsubscribe from all channels

  • subscriberSubscriber to unsubscribe

start_subscription(channels: str | Iterable[str], history: int | None = None) AsyncGenerator[Subscriber, None]#

Create a Subscriber and tie its subscriptions to a context manager; Upon exiting the context, unsubscribe() will be called.

Parameters:
  • channels – Channel(s) to subscribe to

  • history – If a non-negative integer, add this amount of history entries from each channel to the subscriber’s event stream. Note that this will wait until all history entries are fetched and pushed to the subscriber’s stream. For more control use put_subscriber_history().

Returns:

A Subscriber

async put_subscriber_history(subscriber: Subscriber, channels: str | Iterable[str], limit: int | None = None) None#

Fetch the history of channels from the backend and put them in the subscriber’s stream

class litestar.channels.plugin.ChannelsException#

Bases: LitestarException