memory#

class litestar.channels.backends.memory.MemoryChannelsBackend#

Bases: ChannelsBackend

An in-memory channels backend

__init__(history: int = 0) None#
async on_startup() None#

Called by the plugin on application startup

async on_shutdown() None#

Called by the plugin on application shutdown

async publish(data: bytes, channels: Iterable[str]) None#

Publish data to channels. If a channel has not yet been subscribed to, this will be a no-op.

Parameters:
  • data – Data to publish

  • channels – Channels to publish to

Returns:

None

Raises:

RuntimeError – If on_startup has not been called yet

async subscribe(channels: Iterable[str]) None#

Subscribe to channels, and enable publishing to them

async unsubscribe(channels: Iterable[str]) None#

Unsubscribe from channels

async stream_events() AsyncGenerator[tuple[str, Any], None]#

Return a generator, iterating over events of subscribed channels as they become available

async get_history(channel: str, limit: int | None = None) list[bytes]#

Return the event history of channel, at most limit entries