Final touches and recap
There is one more improvement that we can make to our application. Currently, we utilize both the
SQLAlchemyInitPlugin and the SQLAlchemySerializationPlugin, but there is a shortcut for this
configuration: the SQLAlchemyPlugin is a combination of the two, so we can simplify our
configuration by using it instead.
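Here is our final application. It is shown twice: the first listing annotates optional values with
typing.Optional, while the second uses the equivalent "bool | None" union syntax (Python 3.10+):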
1from collections.abc import AsyncGenerator
2from typing import Optional
3
4from sqlalchemy import select
5from sqlalchemy.exc import IntegrityError, NoResultFound
6from sqlalchemy.ext.asyncio import AsyncSession
7from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
8
9from litestar import Litestar, get, post, put
10from litestar.contrib.sqlalchemy.plugins import SQLAlchemyAsyncConfig, SQLAlchemyPlugin
11from litestar.exceptions import ClientException, NotFoundException
12from litestar.status_codes import HTTP_409_CONFLICT
13
14
15class Base(DeclarativeBase): ...
16
17
18class TodoItem(Base):
19 __tablename__ = "todo_items"
20
21 title: Mapped[str] = mapped_column(primary_key=True)
22 done: Mapped[bool]
23
24
25async def provide_transaction(db_session: AsyncSession) -> AsyncGenerator[AsyncSession, None]:
26 try:
27 async with db_session.begin():
28 yield db_session
29 except IntegrityError as exc:
30 raise ClientException(
31 status_code=HTTP_409_CONFLICT,
32 detail=str(exc),
33 ) from exc
34
35
36async def get_todo_by_title(todo_name, session: AsyncSession) -> TodoItem:
37 query = select(TodoItem).where(TodoItem.title == todo_name)
38 result = await session.execute(query)
39 try:
40 return result.scalar_one()
41 except NoResultFound as e:
42 raise NotFoundException(detail=f"TODO {todo_name!r} not found") from e
43
44
45async def get_todo_list(done: Optional[bool], session: AsyncSession) -> list[TodoItem]:
46 query = select(TodoItem)
47 if done is not None:
48 query = query.where(TodoItem.done.is_(done))
49
50 result = await session.execute(query)
51 return result.scalars().all()
52
53
54@get("/")
55async def get_list(transaction: AsyncSession, done: Optional[bool] = None) -> list[TodoItem]:
56 return await get_todo_list(done, transaction)
57
58
59@post("/")
60async def add_item(data: TodoItem, transaction: AsyncSession) -> TodoItem:
61 transaction.add(data)
62 return data
63
64
65@put("/{item_title:str}")
66async def update_item(item_title: str, data: TodoItem, transaction: AsyncSession) -> TodoItem:
67 todo_item = await get_todo_by_title(item_title, transaction)
68 todo_item.title = data.title
69 todo_item.done = data.done
70 return todo_item
71
72
73db_config = SQLAlchemyAsyncConfig(
74 connection_string="sqlite+aiosqlite:///todo.sqlite",
75 metadata=Base.metadata,
76 create_all=True,
77 before_send_handler="autocommit",
78)
79
80app = Litestar(
81 [get_list, add_item, update_item],
82 dependencies={"transaction": provide_transaction},
83 plugins=[SQLAlchemyPlugin(db_config)],
84)
1from collections.abc import AsyncGenerator
2
3from sqlalchemy import select
4from sqlalchemy.exc import IntegrityError, NoResultFound
5from sqlalchemy.ext.asyncio import AsyncSession
6from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
7
8from litestar import Litestar, get, post, put
9from litestar.contrib.sqlalchemy.plugins import SQLAlchemyAsyncConfig, SQLAlchemyPlugin
10from litestar.exceptions import ClientException, NotFoundException
11from litestar.status_codes import HTTP_409_CONFLICT
12
13
14class Base(DeclarativeBase): ...
15
16
17class TodoItem(Base):
18 __tablename__ = "todo_items"
19
20 title: Mapped[str] = mapped_column(primary_key=True)
21 done: Mapped[bool]
22
23
24async def provide_transaction(db_session: AsyncSession) -> AsyncGenerator[AsyncSession, None]:
25 try:
26 async with db_session.begin():
27 yield db_session
28 except IntegrityError as exc:
29 raise ClientException(
30 status_code=HTTP_409_CONFLICT,
31 detail=str(exc),
32 ) from exc
33
34
35async def get_todo_by_title(todo_name, session: AsyncSession) -> TodoItem:
36 query = select(TodoItem).where(TodoItem.title == todo_name)
37 result = await session.execute(query)
38 try:
39 return result.scalar_one()
40 except NoResultFound as e:
41 raise NotFoundException(detail=f"TODO {todo_name!r} not found") from e
42
43
44async def get_todo_list(done: bool | None, session: AsyncSession) -> list[TodoItem]:
45 query = select(TodoItem)
46 if done is not None:
47 query = query.where(TodoItem.done.is_(done))
48
49 result = await session.execute(query)
50 return result.scalars().all()
51
52
53@get("/")
54async def get_list(transaction: AsyncSession, done: bool | None = None) -> list[TodoItem]:
55 return await get_todo_list(done, transaction)
56
57
58@post("/")
59async def add_item(data: TodoItem, transaction: AsyncSession) -> TodoItem:
60 transaction.add(data)
61 return data
62
63
64@put("/{item_title:str}")
65async def update_item(item_title: str, data: TodoItem, transaction: AsyncSession) -> TodoItem:
66 todo_item = await get_todo_by_title(item_title, transaction)
67 todo_item.title = data.title
68 todo_item.done = data.done
69 return todo_item
70
71
72db_config = SQLAlchemyAsyncConfig(
73 connection_string="sqlite+aiosqlite:///todo.sqlite",
74 metadata=Base.metadata,
75 create_all=True,
76 before_send_handler="autocommit",
77)
78
79app = Litestar(
80 [get_list, add_item, update_item],
81 dependencies={"transaction": provide_transaction},
82 plugins=[SQLAlchemyPlugin(db_config)],
83)
Recap
In this tutorial, we have learned how to use the SQLAlchemy plugin to create a simple application that uses a database to store and retrieve data.
In the final application, TodoItem is defined, representing a TODO item. It extends Base, which in
turn subclasses the DeclarativeBase class provided by SQLAlchemy:
1class Base(DeclarativeBase): ...
2
3
4class TodoItem(Base):
5 __tablename__ = "todo_items"
6
7 title: Mapped[str] = mapped_column(primary_key=True)
8 done: Mapped[bool]
Next, we define a dependency that centralizes our database transaction management and error handling:
it opens a transaction on the session and converts any IntegrityError raised inside it into a
ClientException with status 409 (Conflict). It relies on the db_session dependency, which is provided
by the SQLAlchemy plugin, and is made available to our handlers via the transaction argument:
1async def provide_transaction(db_session: AsyncSession) -> AsyncGenerator[AsyncSession, None]:
2 try:
3 async with db_session.begin():
4 yield db_session
5 except IntegrityError as exc:
6 raise ClientException(
7 status_code=HTTP_409_CONFLICT,
8 detail=str(exc),
9 ) from exc
We also define a couple of utility functions that help us retrieve our TODO items from the database:
1async def get_todo_by_title(todo_name, session: AsyncSession) -> TodoItem:
2 query = select(TodoItem).where(TodoItem.title == todo_name)
3 result = await session.execute(query)
4 try:
5 return result.scalar_one()
6 except NoResultFound as e:
7 raise NotFoundException(detail=f"TODO {todo_name!r} not found") from e
8
9
10async def get_todo_list(done: Optional[bool], session: AsyncSession) -> list[TodoItem]:
11 query = select(TodoItem)
12 if done is not None:
13 query = query.where(TodoItem.done.is_(done))
14
15 result = await session.execute(query)
16 return result.scalars().all()
We define our route handlers, which are the interface through which TODO items can be created, retrieved and updated:
1@get("/")
2async def get_list(transaction: AsyncSession, done: Optional[bool] = None) -> list[TodoItem]:
3 return await get_todo_list(done, transaction)
4
5
6@post("/")
7async def add_item(data: TodoItem, transaction: AsyncSession) -> TodoItem:
8 transaction.add(data)
9 return data
10
11
12@put("/{item_title:str}")
13async def update_item(item_title: str, data: TodoItem, transaction: AsyncSession) -> TodoItem:
14 todo_item = await get_todo_by_title(item_title, transaction)
15 todo_item.title = data.title
16 todo_item.done = data.done
17 return todo_item
Finally, we define our application. The SQLAlchemyPlugin, created from db_config, configures
SQLAlchemy and manages the engine and session lifecycle, and provide_transaction is registered as
the transaction dependency:
1app = Litestar(
2 [get_list, add_item, update_item],
3 dependencies={"transaction": provide_transaction},
4 plugins=[SQLAlchemyPlugin(db_config)],
5)
See also