base

class litestar.channels.backends.base.ChannelsBackend

Bases: ABC

abstract async on_startup() -> None

Called by the plugin on application startup.

abstract async on_shutdown() -> None

Called by the plugin on application shutdown.

abstract async publish(data: bytes, channels: Iterable[str]) -> None

Publish the message `data` to all `channels`.

abstract async subscribe(channels: Iterable[str]) -> None

Start listening for events on `channels`.

abstract async unsubscribe(channels: Iterable[str]) -> None

Stop listening for events on `channels`.

abstract stream_events() -> AsyncGenerator[tuple[str, bytes], None]

Return an async generator that iterates over events of subscribed channels as they become available.

abstract async get_history(channel: str, limit: int | None = None) -> list[bytes]

Return the event history of `channel`, at most `limit` entries.