memory

class litestar.channels.backends.memory.MemoryChannelsBackend

Bases: ChannelsBackend

An in-memory channels backend

__init__(history: int = 0) None
async on_startup() None

Called by the plugin on application startup

async on_shutdown() None

Called by the plugin on application shutdown

async publish(data: bytes, channels: Iterable[str]) None

Publish data to channels. If a channel has not yet been subscribed to, this will be a no-op.

Parameters:
  • data – Data to publish

  • channels – Channels to publish to

Returns:

None

Raises:

RuntimeError – If on_startup has not been called yet

async subscribe(channels: Iterable[str]) None

Subscribe to channels, and enable publishing to them

async unsubscribe(channels: Iterable[str]) None

Unsubscribe from channels

async stream_events() AsyncGenerator[tuple[str, Any], None]

Return a generator, iterating over events of subscribed channels as they become available

async get_history(channel: str, limit: int | None = None) list[bytes]

Return the event history of channel, at most limit entries