Source code for litestar.channels.backends.base

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator, Iterable


[docs] class ChannelsBackend(ABC):
[docs] @abstractmethod async def on_startup(self) -> None: """Called by the plugin on application startup""" ...
[docs] @abstractmethod async def on_shutdown(self) -> None: """Called by the plugin on application shutdown""" ...
[docs] @abstractmethod async def publish(self, data: bytes, channels: Iterable[str]) -> None: """Publish the message ``data`` to all ``channels``""" ...
[docs] @abstractmethod async def subscribe(self, channels: Iterable[str]) -> None: """Start listening for events on ``channels``""" ...
[docs] @abstractmethod async def unsubscribe(self, channels: Iterable[str]) -> None: """Stop listening for events on ``channels``""" ...
[docs] @abstractmethod def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]: """Return a generator, iterating over events of subscribed channels as they become available""" ...
[docs] @abstractmethod async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]: """Return the event history of ``channel``, at most ``limit`` entries""" ...