Introduction#

We start with a full script that shows how you can use SQLAlchemy with Litestar. In this app, we interact with SQLAlchemy in the manner described by the SQLAlchemy documentation, so if you are looking for more information about any of the SQLAlchemy code, that documentation is a great place to start.

You’ll notice that we use a couple of Litestar features that you may not have encountered yet:

  1. Management and injection of application state

  2. Use of a Lifespan context manager

And we will continue to learn about other Litestar features as we work through the tutorial, such as:

  1. Dependency injection

  2. Plugins

The full app#

While it may look imposing, this app has only minor behavioral differences from the previous example. It is still an app that maintains a TODO list and allows for adding, updating and viewing the collection of TODO items.

Don’t worry if there are things in this example that you don’t understand. We will cover all of the components in detail in the following sections.

  1from contextlib import asynccontextmanager
  2from typing import Any, AsyncGenerator, Dict, List, Optional, Sequence
  3
  4from sqlalchemy import select
  5from sqlalchemy.exc import IntegrityError, NoResultFound
  6from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
  7from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
  8
  9from litestar import Litestar, get, post, put
 10from litestar.datastructures import State
 11from litestar.exceptions import ClientException, NotFoundException
 12from litestar.status_codes import HTTP_409_CONFLICT
 13
 14TodoType = Dict[str, Any]
 15TodoCollectionType = List[TodoType]
 16
 17
 18class Base(DeclarativeBase): ...
 19
 20
 21class TodoItem(Base):
 22    __tablename__ = "todo_items"
 23
 24    title: Mapped[str] = mapped_column(primary_key=True)
 25    done: Mapped[bool]
 26
 27
 28@asynccontextmanager
 29async def db_connection(app: Litestar) -> AsyncGenerator[None, None]:
 30    engine = getattr(app.state, "engine", None)
 31    if engine is None:
 32        engine = create_async_engine("sqlite+aiosqlite:///todo.sqlite", echo=True)
 33        app.state.engine = engine
 34
 35    async with engine.begin() as conn:
 36        await conn.run_sync(Base.metadata.create_all)
 37
 38    try:
 39        yield
 40    finally:
 41        await engine.dispose()
 42
 43
 44sessionmaker = async_sessionmaker(expire_on_commit=False)
 45
 46
 47def serialize_todo(todo: TodoItem) -> TodoType:
 48    return {"title": todo.title, "done": todo.done}
 49
 50
 51async def get_todo_by_title(todo_name: str, session: AsyncSession) -> TodoItem:
 52    query = select(TodoItem).where(TodoItem.title == todo_name)
 53    result = await session.execute(query)
 54    try:
 55        return result.scalar_one()
 56    except NoResultFound as e:
 57        raise NotFoundException(detail=f"TODO {todo_name!r} not found") from e
 58
 59
 60async def get_todo_list(done: Optional[bool], session: AsyncSession) -> Sequence[TodoItem]:
 61    query = select(TodoItem)
 62    if done is not None:
 63        query = query.where(TodoItem.done.is_(done))
 64
 65    result = await session.execute(query)
 66    return result.scalars().all()
 67
 68
 69@get("/")
 70async def get_list(state: State, done: Optional[bool] = None) -> TodoCollectionType:
 71    async with sessionmaker(bind=state.engine) as session:
 72        return [serialize_todo(todo) for todo in await get_todo_list(done, session)]
 73
 74
 75@post("/")
 76async def add_item(data: TodoType, state: State) -> TodoType:
 77    new_todo = TodoItem(title=data["title"], done=data["done"])
 78    async with sessionmaker(bind=state.engine) as session:
 79        try:
 80            async with session.begin():
 81                session.add(new_todo)
 82        except IntegrityError as e:
 83            raise ClientException(
 84                status_code=HTTP_409_CONFLICT,
 85                detail=f"TODO {new_todo.title!r} already exists",
 86            ) from e
 87
 88    return serialize_todo(new_todo)
 89
 90
 91@put("/{item_title:str}")
 92async def update_item(item_title: str, data: TodoType, state: State) -> TodoType:
 93    async with sessionmaker(bind=state.engine) as session, session.begin():
 94        todo_item = await get_todo_by_title(item_title, session)
 95        todo_item.title = data["title"]
 96        todo_item.done = data["done"]
 97    return serialize_todo(todo_item)
 98
 99
100app = Litestar([get_list, add_item, update_item], lifespan=[db_connection])
  1from contextlib import asynccontextmanager
  2from typing import Any, Optional
  3from collections.abc import AsyncGenerator, Sequence
  4
  5from sqlalchemy import select
  6from sqlalchemy.exc import IntegrityError, NoResultFound
  7from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
  8from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
  9
 10from litestar import Litestar, get, post, put
 11from litestar.datastructures import State
 12from litestar.exceptions import ClientException, NotFoundException
 13from litestar.status_codes import HTTP_409_CONFLICT
 14
 15TodoType = dict[str, Any]
 16TodoCollectionType = list[TodoType]
 17
 18
 19class Base(DeclarativeBase): ...
 20
 21
 22class TodoItem(Base):
 23    __tablename__ = "todo_items"
 24
 25    title: Mapped[str] = mapped_column(primary_key=True)
 26    done: Mapped[bool]
 27
 28
 29@asynccontextmanager
 30async def db_connection(app: Litestar) -> AsyncGenerator[None, None]:
 31    engine = getattr(app.state, "engine", None)
 32    if engine is None:
 33        engine = create_async_engine("sqlite+aiosqlite:///todo.sqlite", echo=True)
 34        app.state.engine = engine
 35
 36    async with engine.begin() as conn:
 37        await conn.run_sync(Base.metadata.create_all)
 38
 39    try:
 40        yield
 41    finally:
 42        await engine.dispose()
 43
 44
 45sessionmaker = async_sessionmaker(expire_on_commit=False)
 46
 47
 48def serialize_todo(todo: TodoItem) -> TodoType:
 49    return {"title": todo.title, "done": todo.done}
 50
 51
 52async def get_todo_by_title(todo_name: str, session: AsyncSession) -> TodoItem:
 53    query = select(TodoItem).where(TodoItem.title == todo_name)
 54    result = await session.execute(query)
 55    try:
 56        return result.scalar_one()
 57    except NoResultFound as e:
 58        raise NotFoundException(detail=f"TODO {todo_name!r} not found") from e
 59
 60
 61async def get_todo_list(done: Optional[bool], session: AsyncSession) -> Sequence[TodoItem]:
 62    query = select(TodoItem)
 63    if done is not None:
 64        query = query.where(TodoItem.done.is_(done))
 65
 66    result = await session.execute(query)
 67    return result.scalars().all()
 68
 69
 70@get("/")
 71async def get_list(state: State, done: Optional[bool] = None) -> TodoCollectionType:
 72    async with sessionmaker(bind=state.engine) as session:
 73        return [serialize_todo(todo) for todo in await get_todo_list(done, session)]
 74
 75
 76@post("/")
 77async def add_item(data: TodoType, state: State) -> TodoType:
 78    new_todo = TodoItem(title=data["title"], done=data["done"])
 79    async with sessionmaker(bind=state.engine) as session:
 80        try:
 81            async with session.begin():
 82                session.add(new_todo)
 83        except IntegrityError as e:
 84            raise ClientException(
 85                status_code=HTTP_409_CONFLICT,
 86                detail=f"TODO {new_todo.title!r} already exists",
 87            ) from e
 88
 89    return serialize_todo(new_todo)
 90
 91
 92@put("/{item_title:str}")
 93async def update_item(item_title: str, data: TodoType, state: State) -> TodoType:
 94    async with sessionmaker(bind=state.engine) as session, session.begin():
 95        todo_item = await get_todo_by_title(item_title, session)
 96        todo_item.title = data["title"]
 97        todo_item.done = data["done"]
 98    return serialize_todo(todo_item)
 99
100
101app = Litestar([get_list, add_item, update_item], lifespan=[db_connection])
  1from contextlib import asynccontextmanager
  2from typing import Any
  3from collections.abc import AsyncGenerator, Sequence
  4
  5from sqlalchemy import select
  6from sqlalchemy.exc import IntegrityError, NoResultFound
  7from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
  8from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
  9
 10from litestar import Litestar, get, post, put
 11from litestar.datastructures import State
 12from litestar.exceptions import ClientException, NotFoundException
 13from litestar.status_codes import HTTP_409_CONFLICT
 14
 15TodoType = dict[str, Any]
 16TodoCollectionType = list[TodoType]
 17
 18
 19class Base(DeclarativeBase): ...
 20
 21
 22class TodoItem(Base):
 23    __tablename__ = "todo_items"
 24
 25    title: Mapped[str] = mapped_column(primary_key=True)
 26    done: Mapped[bool]
 27
 28
 29@asynccontextmanager
 30async def db_connection(app: Litestar) -> AsyncGenerator[None, None]:
 31    engine = getattr(app.state, "engine", None)
 32    if engine is None:
 33        engine = create_async_engine("sqlite+aiosqlite:///todo.sqlite", echo=True)
 34        app.state.engine = engine
 35
 36    async with engine.begin() as conn:
 37        await conn.run_sync(Base.metadata.create_all)
 38
 39    try:
 40        yield
 41    finally:
 42        await engine.dispose()
 43
 44
 45sessionmaker = async_sessionmaker(expire_on_commit=False)
 46
 47
 48def serialize_todo(todo: TodoItem) -> TodoType:
 49    return {"title": todo.title, "done": todo.done}
 50
 51
 52async def get_todo_by_title(todo_name: str, session: AsyncSession) -> TodoItem:
 53    query = select(TodoItem).where(TodoItem.title == todo_name)
 54    result = await session.execute(query)
 55    try:
 56        return result.scalar_one()
 57    except NoResultFound as e:
 58        raise NotFoundException(detail=f"TODO {todo_name!r} not found") from e
 59
 60
 61async def get_todo_list(done: bool | None, session: AsyncSession) -> Sequence[TodoItem]:
 62    query = select(TodoItem)
 63    if done is not None:
 64        query = query.where(TodoItem.done.is_(done))
 65
 66    result = await session.execute(query)
 67    return result.scalars().all()
 68
 69
 70@get("/")
 71async def get_list(state: State, done: bool | None = None) -> TodoCollectionType:
 72    async with sessionmaker(bind=state.engine) as session:
 73        return [serialize_todo(todo) for todo in await get_todo_list(done, session)]
 74
 75
 76@post("/")
 77async def add_item(data: TodoType, state: State) -> TodoType:
 78    new_todo = TodoItem(title=data["title"], done=data["done"])
 79    async with sessionmaker(bind=state.engine) as session:
 80        try:
 81            async with session.begin():
 82                session.add(new_todo)
 83        except IntegrityError as e:
 84            raise ClientException(
 85                status_code=HTTP_409_CONFLICT,
 86                detail=f"TODO {new_todo.title!r} already exists",
 87            ) from e
 88
 89    return serialize_todo(new_todo)
 90
 91
 92@put("/{item_title:str}")
 93async def update_item(item_title: str, data: TodoType, state: State) -> TodoType:
 94    async with sessionmaker(bind=state.engine) as session, session.begin():
 95        todo_item = await get_todo_by_title(item_title, session)
 96        todo_item.title = data["title"]
 97        todo_item.done = data["done"]
 98    return serialize_todo(todo_item)
 99
100
101app = Litestar([get_list, add_item, update_item], lifespan=[db_connection])

The differences#

Apart from the obvious differences due to the SQLAlchemy code, there are a few things worth mentioning from the outset.

Complexity#

This code is undoubtedly more complex than the code we have seen so far. Although line count is a crude measure of complexity, this example has more than double the lines of code of the previous one.

Lifespan context manager#

When using a database, we need to ensure that we clean up our resources correctly. To do this, we create a context manager called db_connection() that creates a new engine and disposes of it when we are done. This context manager is added to the application’s lifespan argument.

 1@asynccontextmanager
 2async def db_connection(app: Litestar) -> AsyncGenerator[None, None]:
 3    engine = getattr(app.state, "engine", None)
 4    if engine is None:
 5        engine = create_async_engine("sqlite+aiosqlite:///todo.sqlite", echo=True)
 6        app.state.engine = engine
 7
 8    async with engine.begin() as conn:
 9        await conn.run_sync(Base.metadata.create_all)
10
11    try:
12        yield
13    finally:
14        await engine.dispose()
15app = Litestar([get_list, add_item, update_item], lifespan=[db_connection])

Database creation#

Before we can use the database, we need to make sure it exists and that the tables are created as defined by the TodoItem class. This can be done by a synchronous call to Base.metadata.create_all, which is invoked by run_sync. If the tables are already set up according to the model, the call does nothing.

 1@asynccontextmanager
 2async def db_connection(app: Litestar) -> AsyncGenerator[None, None]:
 3    engine = getattr(app.state, "engine", None)
 4    if engine is None:
 5        engine = create_async_engine("sqlite+aiosqlite:///todo.sqlite", echo=True)
 6        app.state.engine = engine
 7
 8    async with engine.begin() as conn:
 9        await conn.run_sync(Base.metadata.create_all)
10
11    try:
12        yield
13    finally:
14        await engine.dispose()

Application state#

We see two examples of access and use of application state. The first is in the db_connection() context manager, where we use the app.state object to store the engine.

 1@asynccontextmanager
 2async def db_connection(app: Litestar) -> AsyncGenerator[None, None]:
 3    engine = getattr(app.state, "engine", None)
 4    if engine is None:
 5        engine = create_async_engine("sqlite+aiosqlite:///todo.sqlite", echo=True)
 6        app.state.engine = engine
 7
 8    async with engine.begin() as conn:
 9        await conn.run_sync(Base.metadata.create_all)
10
11    try:
12        yield
13    finally:
14        await engine.dispose()

The second is by using the state keyword argument in our handler functions, so that we can access the engine in our handlers.

1@get("/")
2async def get_list(state: State, done: Optional[bool] = None) -> TodoCollectionType:
3    async with sessionmaker(bind=state.engine) as session:
4        return [serialize_todo(todo) for todo in await get_todo_list(done, session)]

Serialization#

Now that we are using SQLAlchemy models, Litestar cannot automatically handle (de)serialization of our data. We have to convert the SQLAlchemy models to a type that Litestar can serialize. This example introduces two type aliases, TodoType and TodoCollectionType, to help us represent this data at the boundaries of our handlers. It also introduces the serialize_todo() function to help us convert our data from the TodoItem type to a type that is serializable by Litestar.

 1from typing import Any, Dict, List
 2
 3TodoType = Dict[str, Any]
 4TodoCollectionType = List[TodoType]
 5def serialize_todo(todo: TodoItem) -> TodoType:
 6    return {"title": todo.title, "done": todo.done}
 7
 8
 9@put("/{item_title:str}")
10async def update_item(item_title: str, data: TodoType, state: State) -> TodoType:
11    async with sessionmaker(bind=state.engine) as session, session.begin():
12        todo_item = await get_todo_by_title(item_title, session)
13        todo_item.title = data["title"]
14        todo_item.done = data["done"]
15    return serialize_todo(todo_item)
 1from typing import Any
 2
 3TodoType = dict[str, Any]
 4TodoCollectionType = list[TodoType]
 5def serialize_todo(todo: TodoItem) -> TodoType:
 6    return {"title": todo.title, "done": todo.done}
 7
 8
 9@put("/{item_title:str}")
10async def update_item(item_title: str, data: TodoType, state: State) -> TodoType:
11    async with sessionmaker(bind=state.engine) as session, session.begin():
12        todo_item = await get_todo_by_title(item_title, session)
13        todo_item.title = data["title"]
14        todo_item.done = data["done"]
15    return serialize_todo(todo_item)

Behavior#

The add_item() and update_item() routes no longer return the full collection, instead they return the item that was added or updated. This is a minor detail change, but it is worth noting as it brings the behavior of the app closer to what we would expect from a conventional API.

Next steps#

Let’s start cleaning this app up a little.

One of the standout issues is that we repeat the logic to create a database session in every handler. This is something that we can fix with dependency injection.