Events
Litestar supports a simple implementation of the event emitter / listener pattern:
from dataclasses import dataclass

from litestar import Litestar, Request, post
from litestar.events import listener

from db import user_repository
from utils.email import send_welcome_mail

@listener("user_created")
async def send_welcome_email_handler(email: str) -> None:
    # do something here to send an email
    await send_welcome_mail(email)

@dataclass
class CreateUserDTO:
    first_name: str
    last_name: str
    email: str

@post("/users")
async def create_user_handler(data: CreateUserDTO, request: Request) -> None:
    # do something here to create a new user
    # e.g. insert the user into a database
    await user_repository.insert(data)

    # assuming we have now inserted a user, we want to send a welcome email.
    # To do this in a non-blocking fashion, we emit an event to a listener, which sends the email
    # in a different async block than the one in which we are returning a response.
    request.app.emit("user_created", email=data.email)

app = Litestar(
    route_handlers=[create_user_handler], listeners=[send_welcome_email_handler]
)
The above example illustrates the power of this pattern: it lets us run async side effects, such as sending an email, without blocking or slowing down the response cycle.
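To make the non-blocking claim concrete, here is a minimal sketch using plain asyncio rather than Litestar itself. The names `send_welcome_mail_stub`, `create_user`, and `run_demo` are hypothetical stand-ins, and the scheduling shown is an assumption for illustration, not a description of how Litestar dispatches events internally.

```python
import asyncio


async def send_welcome_mail_stub(email: str, log: list[str]) -> None:
    # Hypothetical slow side effect, standing in for sending an email.
    await asyncio.sleep(0.01)
    log.append(f"mail sent to {email}")


async def create_user(email: str, log: list[str], pending: set[asyncio.Task]) -> str:
    # Schedule the side effect instead of awaiting it, so the handler
    # can return before the email has been sent.
    task = asyncio.create_task(send_welcome_mail_stub(email, log))
    pending.add(task)  # keep a reference so the task is not garbage collected
    log.append("response returned")
    return "created"


async def run_demo() -> list[str]:
    log: list[str] = []
    pending: set[asyncio.Task] = set()
    await create_user("user@example.com", log, pending)
    # Wait for the background work only so the demo can inspect the log.
    await asyncio.gather(*pending)
    return log
```

Running `asyncio.run(run_demo())` records the response before the mail, which is the ordering the event pattern aims for.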
Listening to Multiple Events
Event listeners can listen to multiple events:
from litestar.events import listener
from utils.email import send_email

@listener("user_created", "password_changed")
async def send_email_handler(email: str, message: str) -> None:
    # do something here to send an email
    await send_email(email, message)
Using Multiple Listeners
You can also listen to the same events using multiple listeners:
from dataclasses import dataclass

from litestar import Request, post
from litestar.events import listener

from db import user_repository
from utils.client import client
from utils.email import send_farewell_email

@listener("user_deleted")
async def send_farewell_email_handler(email: str, **kwargs) -> None:
    # do something here to send an email
    await send_farewell_email(email)

@listener("user_deleted")
async def notify_customer_support(reason: str, **kwargs) -> None:
    # do something here to notify customer support over HTTP
    await client.post("some-url", reason)

@dataclass
class DeleteUserDTO:
    email: str
    reason: str

@post("/users")
async def delete_user_handler(data: DeleteUserDTO, request: Request) -> None:
    await user_repository.delete({"email": data.email})
    request.app.emit("user_deleted", email=data.email, reason="deleted")
In the above example we perform two side effects for the same event: one sends the user an email, and the other sends an HTTP request to a service management system to create an issue.
Passing Arguments to Listeners
The emit method has the following signature:
def emit(self, event_id: str, *args: Any, **kwargs: Any) -> None: ...
This means that it expects a string for event_id, followed by any number of positional and keyword arguments. While this is highly flexible, it also means you need to ensure that the listeners for a given event can handle all the expected args and kwargs.
For example, the following would raise an exception in Python:
@listener("user_deleted")
async def send_farewell_email_handler(email: str) -> None:
    await send_farewell_email(email)

@listener("user_deleted")
async def notify_customer_support(reason: str) -> None:
    # do something here to notify customer support over HTTP
    await client.post("some-url", reason)

@dataclass
class DeleteUserDTO:
    email: str
    reason: str

@post("/users")
async def delete_user_handler(data: DeleteUserDTO, request: Request) -> None:
    await user_repository.delete({"email": data.email})
    request.app.emit("user_deleted", email=data.email, reason="deleted")
The reason for this is that both listeners will receive two kwargs, email and reason, while each of them declares only one. To avoid this, the previous example had **kwargs in both:
@listener("user_deleted")
async def send_farewell_email_handler(email: str, **kwargs) -> None:
    await send_farewell_email(email)

@listener("user_deleted")
async def notify_customer_support(reason: str, **kwargs) -> None:
    await client.post("some-url", reason)
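The failure mode is ordinary Python argument binding, so it can be reproduced without Litestar. The sketch below uses hypothetical names (`strict_listener`, `tolerant_listener`, `call_listener`) and calls the coroutine functions directly; it assumes only that a listener ends up being called with every keyword argument passed to emit.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def strict_listener(email: str) -> None:
    """Accepts only ``email``; any other keyword argument is an error."""


async def tolerant_listener(email: str, **kwargs: Any) -> None:
    """Accepts ``email`` and ignores any other keyword arguments."""


def call_listener(fn: Callable[..., Awaitable[None]], **kwargs: Any) -> str:
    """Call a listener with the emitted kwargs and report what happened."""
    try:
        # Argument binding happens when fn(...) is called, so a signature
        # mismatch raises TypeError before the coroutine ever runs.
        asyncio.run(fn(**kwargs))
    except TypeError as exc:
        return f"TypeError: {exc}"
    return "ok"


emitted = {"email": "user@example.com", "reason": "deleted"}
strict_result = call_listener(strict_listener, **emitted)
tolerant_result = call_listener(tolerant_listener, **emitted)
```

Here `strict_result` holds a `TypeError` message about the unexpected `reason` argument, while `tolerant_result` is `"ok"`.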
Creating Event Emitters
An “event emitter” is a class that inherits from BaseEventEmitterBackend, which itself inherits from contextlib.AbstractAsyncContextManager. It must implement the following method:
emit: This is the method that performs the actual emitting logic.
Additionally, the abstract __aenter__ and __aexit__ methods from contextlib.AbstractAsyncContextManager must be implemented, allowing the emitter to be used as an async context manager.
By default Litestar uses the SimpleEventEmitter, which offers an in-memory async queue.
This solution works well if the system does not need to rely on complex behaviour, such as a retry mechanism, persistence, or scheduling/cron. For these more complex use cases, users should implement their own backend using either a database or key-value store that supports events (Redis, Postgres, etc.), or a message broker, job queue, or task queue technology.
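The shape described in this section (an emit method plus __aenter__ and __aexit__) can be sketched with the standard library alone. `InMemoryEmitter`, its `register` method, and `demo` are hypothetical names, and the class subclasses contextlib.AbstractAsyncContextManager directly rather than Litestar's BaseEventEmitterBackend, so treat it as an illustration of the interface under those assumptions, not as a drop-in backend.

```python
import asyncio
from contextlib import AbstractAsyncContextManager
from types import TracebackType
from typing import Any, Awaitable, Callable, Optional

Listener = Callable[..., Awaitable[None]]


class InMemoryEmitter(AbstractAsyncContextManager):
    """Hypothetical stand-in that mirrors the emitter interface."""

    def __init__(self) -> None:
        self.listeners: dict[str, list[Listener]] = {}
        self._tasks: set[asyncio.Task] = set()

    def register(self, event_id: str, fn: Listener) -> None:
        # Assumed helper for this sketch: map an event id to a listener.
        self.listeners.setdefault(event_id, []).append(fn)

    def emit(self, event_id: str, *args: Any, **kwargs: Any) -> None:
        # Schedule every listener as a background task and return at once.
        loop = asyncio.get_running_loop()
        for fn in self.listeners.get(event_id, []):
            task = loop.create_task(fn(*args, **kwargs))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def __aenter__(self) -> "InMemoryEmitter":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        # Let outstanding listeners finish before the emitter shuts down.
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)


async def demo() -> list[str]:
    sent: list[str] = []

    async def record(email: str, **kwargs: Any) -> None:
        sent.append(email)

    async with InMemoryEmitter() as emitter:
        emitter.register("user_created", record)
        emitter.emit("user_created", email="user@example.com")
    return sent
```

A backend built on a store or broker would replace the task scheduling in `emit` with a call to that system, and would open and close its connection in `__aenter__` and `__aexit__`.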