Using the serialization plugin
Our next improvement is to leverage the SQLAlchemySerializationPlugin,
which lets our handlers receive SQLAlchemy models as request data
and return them directly as responses.
Here’s the code:
1from contextlib import asynccontextmanager
2from typing import AsyncGenerator, List, Optional
3
4from sqlalchemy import select
5from sqlalchemy.exc import IntegrityError, NoResultFound
6from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
7from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
8
9from litestar import Litestar, get, post, put
10from litestar.datastructures import State
11from litestar.exceptions import ClientException, NotFoundException
12from litestar.params import FromPath, FromQuery
13from litestar.plugins.sqlalchemy import SQLAlchemySerializationPlugin
14from litestar.status_codes import HTTP_409_CONFLICT
15
16
class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this example inherit from."""
18
19
class TodoItem(Base):
    """ORM model mapped to the ``todo_items`` table."""

    __tablename__ = "todo_items"

    # The title doubles as the table's primary key.
    title: Mapped[str] = mapped_column(primary_key=True)
    # Boolean ``done`` column.
    done: Mapped[bool]
25
26
@asynccontextmanager
async def db_connection(app: Litestar) -> AsyncGenerator[None, None]:
    """Lifespan context manager that prepares and disposes the database engine.

    Reuses ``app.state.engine`` when one is already set; otherwise creates an
    async SQLite engine and stores it there. The tables declared on
    ``Base.metadata`` are created before control is handed to the application,
    and the engine is disposed when the context exits.
    """
    engine = getattr(app.state, "engine", None)
    if engine is None:
        app.state.engine = engine = create_async_engine("sqlite+aiosqlite:///todo.sqlite", echo=True)

    # Create the tables up front so handlers can rely on them existing.
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)

    try:
        yield
    finally:
        await engine.dispose()
41
42
# Module-level session factory. No engine is bound here; callers pass ``bind=``
# when opening a session. ``expire_on_commit=False`` keeps ORM objects from
# being expired when the session commits.
sessionmaker = async_sessionmaker(expire_on_commit=False)
44
45
async def provide_transaction(state: State) -> AsyncGenerator[AsyncSession, None]:
    """Dependency provider that yields a session inside an open transaction.

    A session bound to ``state.engine`` is opened and yielded from within a
    ``session.begin()`` block. An ``IntegrityError`` propagating out of that
    block is re-raised as a ``ClientException`` with status 409.
    """
    async with sessionmaker(bind=state.engine) as db_session:
        try:
            async with db_session.begin():
                yield db_session
        except IntegrityError as err:
            conflict = ClientException(
                status_code=HTTP_409_CONFLICT,
                detail=str(err),
            )
            raise conflict from err
56
57
async def get_todo_by_title(todo_name: FromPath[str], session: AsyncSession) -> TodoItem:
    """Return the ``TodoItem`` whose title equals ``todo_name``.

    Raises:
        NotFoundException: if the query yields no row.
    """
    stmt = select(TodoItem).where(TodoItem.title == todo_name)
    rows = await session.execute(stmt)
    try:
        return rows.scalar_one()
    except NoResultFound as err:
        raise NotFoundException(detail=f"TODO {todo_name!r} not found") from err
65
66
async def get_todo_list(done: FromQuery[Optional[bool]], session: AsyncSession) -> List[TodoItem]:
    """Return all TODO items, filtered on ``done`` unless it is ``None``."""
    stmt = select(TodoItem)
    if done is not None:
        stmt = stmt.where(TodoItem.done.is_(done))

    rows = await session.execute(stmt)
    items = rows.scalars().all()
    return list(items)
74
75
@get("/")
async def get_list(transaction: AsyncSession, done: FromQuery[Optional[bool]] = None) -> List[TodoItem]:
    """List TODO items, optionally filtered by the ``done`` query parameter."""
    todos = await get_todo_list(done, transaction)
    return todos
79
80
@post("/")
async def add_item(data: TodoItem, transaction: AsyncSession) -> TodoItem:
    """Add the received ``TodoItem`` to the session and return it."""
    new_item = data
    transaction.add(new_item)
    return new_item
85
86
@put("/{item_title:str}")
async def update_item(item_title: FromPath[str], data: TodoItem, transaction: AsyncSession) -> TodoItem:
    """Overwrite the stored item's ``title`` and ``done`` with those from ``data``.

    The item is looked up by ``item_title`` via ``get_todo_by_title``.
    """
    existing = await get_todo_by_title(item_title, transaction)
    existing.title, existing.done = data.title, data.done
    return existing
93
94
# Application wiring: route handlers, the ``transaction`` dependency, the
# database lifespan manager and the SQLAlchemy serialization plugin.
app = Litestar(
    route_handlers=[get_list, add_item, update_item],
    dependencies={"transaction": provide_transaction},
    lifespan=[db_connection],
    plugins=[SQLAlchemySerializationPlugin()],
)
1from contextlib import asynccontextmanager
2from typing import Optional
3from collections.abc import AsyncGenerator
4
5from sqlalchemy import select
6from sqlalchemy.exc import IntegrityError, NoResultFound
7from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
8from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
9
10from litestar import Litestar, get, post, put
11from litestar.datastructures import State
12from litestar.exceptions import ClientException, NotFoundException
13from litestar.params import FromPath, FromQuery
14from litestar.plugins.sqlalchemy import SQLAlchemySerializationPlugin
15from litestar.status_codes import HTTP_409_CONFLICT
16
17
class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this example inherit from."""
19
20
class TodoItem(Base):
    """ORM model mapped to the ``todo_items`` table."""

    __tablename__ = "todo_items"

    # The title doubles as the table's primary key.
    title: Mapped[str] = mapped_column(primary_key=True)
    # Boolean ``done`` column.
    done: Mapped[bool]
26
27
@asynccontextmanager
async def db_connection(app: Litestar) -> AsyncGenerator[None, None]:
    """Lifespan context manager that prepares and disposes the database engine.

    Reuses ``app.state.engine`` when one is already set; otherwise creates an
    async SQLite engine and stores it there. The tables declared on
    ``Base.metadata`` are created before control is handed to the application,
    and the engine is disposed when the context exits.
    """
    engine = getattr(app.state, "engine", None)
    if engine is None:
        app.state.engine = engine = create_async_engine("sqlite+aiosqlite:///todo.sqlite", echo=True)

    # Create the tables up front so handlers can rely on them existing.
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)

    try:
        yield
    finally:
        await engine.dispose()
42
43
# Module-level session factory. No engine is bound here; callers pass ``bind=``
# when opening a session. ``expire_on_commit=False`` keeps ORM objects from
# being expired when the session commits.
sessionmaker = async_sessionmaker(expire_on_commit=False)
45
46
async def provide_transaction(state: State) -> AsyncGenerator[AsyncSession, None]:
    """Dependency provider that yields a session inside an open transaction.

    A session bound to ``state.engine`` is opened and yielded from within a
    ``session.begin()`` block. An ``IntegrityError`` propagating out of that
    block is re-raised as a ``ClientException`` with status 409.
    """
    async with sessionmaker(bind=state.engine) as db_session:
        try:
            async with db_session.begin():
                yield db_session
        except IntegrityError as err:
            conflict = ClientException(
                status_code=HTTP_409_CONFLICT,
                detail=str(err),
            )
            raise conflict from err
57
58
async def get_todo_by_title(todo_name: FromPath[str], session: AsyncSession) -> TodoItem:
    """Return the ``TodoItem`` whose title equals ``todo_name``.

    Raises:
        NotFoundException: if the query yields no row.
    """
    stmt = select(TodoItem).where(TodoItem.title == todo_name)
    rows = await session.execute(stmt)
    try:
        return rows.scalar_one()
    except NoResultFound as err:
        raise NotFoundException(detail=f"TODO {todo_name!r} not found") from err
66
67
async def get_todo_list(done: FromQuery[Optional[bool]], session: AsyncSession) -> list[TodoItem]:
    """Return all TODO items, filtered on ``done`` unless it is ``None``."""
    stmt = select(TodoItem)
    if done is not None:
        stmt = stmt.where(TodoItem.done.is_(done))

    rows = await session.execute(stmt)
    items = rows.scalars().all()
    return list(items)
75
76
@get("/")
async def get_list(transaction: AsyncSession, done: FromQuery[Optional[bool]] = None) -> list[TodoItem]:
    """List TODO items, optionally filtered by the ``done`` query parameter."""
    todos = await get_todo_list(done, transaction)
    return todos
80
81
@post("/")
async def add_item(data: TodoItem, transaction: AsyncSession) -> TodoItem:
    """Add the received ``TodoItem`` to the session and return it."""
    new_item = data
    transaction.add(new_item)
    return new_item
86
87
@put("/{item_title:str}")
async def update_item(item_title: FromPath[str], data: TodoItem, transaction: AsyncSession) -> TodoItem:
    """Overwrite the stored item's ``title`` and ``done`` with those from ``data``.

    The item is looked up by ``item_title`` via ``get_todo_by_title``.
    """
    existing = await get_todo_by_title(item_title, transaction)
    existing.title, existing.done = data.title, data.done
    return existing
94
95
# Application wiring: route handlers, the ``transaction`` dependency, the
# database lifespan manager and the SQLAlchemy serialization plugin.
app = Litestar(
    route_handlers=[get_list, add_item, update_item],
    dependencies={"transaction": provide_transaction},
    lifespan=[db_connection],
    plugins=[SQLAlchemySerializationPlugin()],
)
1from contextlib import asynccontextmanager
2from collections.abc import AsyncGenerator
3
4from sqlalchemy import select
5from sqlalchemy.exc import IntegrityError, NoResultFound
6from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
7from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
8
9from litestar import Litestar, get, post, put
10from litestar.datastructures import State
11from litestar.exceptions import ClientException, NotFoundException
12from litestar.params import FromPath, FromQuery
13from litestar.plugins.sqlalchemy import SQLAlchemySerializationPlugin
14from litestar.status_codes import HTTP_409_CONFLICT
15
16
class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this example inherit from."""
18
19
class TodoItem(Base):
    """ORM model mapped to the ``todo_items`` table."""

    __tablename__ = "todo_items"

    # The title doubles as the table's primary key.
    title: Mapped[str] = mapped_column(primary_key=True)
    # Boolean ``done`` column.
    done: Mapped[bool]
25
26
@asynccontextmanager
async def db_connection(app: Litestar) -> AsyncGenerator[None, None]:
    """Lifespan context manager that prepares and disposes the database engine.

    Reuses ``app.state.engine`` when one is already set; otherwise creates an
    async SQLite engine and stores it there. The tables declared on
    ``Base.metadata`` are created before control is handed to the application,
    and the engine is disposed when the context exits.
    """
    engine = getattr(app.state, "engine", None)
    if engine is None:
        app.state.engine = engine = create_async_engine("sqlite+aiosqlite:///todo.sqlite", echo=True)

    # Create the tables up front so handlers can rely on them existing.
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)

    try:
        yield
    finally:
        await engine.dispose()
41
42
# Module-level session factory. No engine is bound here; callers pass ``bind=``
# when opening a session. ``expire_on_commit=False`` keeps ORM objects from
# being expired when the session commits.
sessionmaker = async_sessionmaker(expire_on_commit=False)
44
45
async def provide_transaction(state: State) -> AsyncGenerator[AsyncSession, None]:
    """Dependency provider that yields a session inside an open transaction.

    A session bound to ``state.engine`` is opened and yielded from within a
    ``session.begin()`` block. An ``IntegrityError`` propagating out of that
    block is re-raised as a ``ClientException`` with status 409.
    """
    async with sessionmaker(bind=state.engine) as db_session:
        try:
            async with db_session.begin():
                yield db_session
        except IntegrityError as err:
            conflict = ClientException(
                status_code=HTTP_409_CONFLICT,
                detail=str(err),
            )
            raise conflict from err
56
57
async def get_todo_by_title(todo_name: FromPath[str], session: AsyncSession) -> TodoItem:
    """Return the ``TodoItem`` whose title equals ``todo_name``.

    Raises:
        NotFoundException: if the query yields no row.
    """
    stmt = select(TodoItem).where(TodoItem.title == todo_name)
    rows = await session.execute(stmt)
    try:
        return rows.scalar_one()
    except NoResultFound as err:
        raise NotFoundException(detail=f"TODO {todo_name!r} not found") from err
65
66
async def get_todo_list(done: FromQuery[bool | None], session: AsyncSession) -> list[TodoItem]:
    """Return all TODO items, filtered on ``done`` unless it is ``None``."""
    stmt = select(TodoItem)
    if done is not None:
        stmt = stmt.where(TodoItem.done.is_(done))

    rows = await session.execute(stmt)
    items = rows.scalars().all()
    return list(items)
74
75
@get("/")
async def get_list(transaction: AsyncSession, done: FromQuery[bool | None] = None) -> list[TodoItem]:
    """List TODO items, optionally filtered by the ``done`` query parameter."""
    todos = await get_todo_list(done, transaction)
    return todos
79
80
@post("/")
async def add_item(data: TodoItem, transaction: AsyncSession) -> TodoItem:
    """Add the received ``TodoItem`` to the session and return it."""
    new_item = data
    transaction.add(new_item)
    return new_item
85
86
@put("/{item_title:str}")
async def update_item(item_title: FromPath[str], data: TodoItem, transaction: AsyncSession) -> TodoItem:
    """Overwrite the stored item's ``title`` and ``done`` with those from ``data``.

    The item is looked up by ``item_title`` via ``get_todo_by_title``.
    """
    existing = await get_todo_by_title(item_title, transaction)
    existing.title, existing.done = data.title, data.done
    return existing
93
94
# Application wiring: route handlers, the ``transaction`` dependency, the
# database lifespan manager and the SQLAlchemy serialization plugin.
app = Litestar(
    route_handlers=[get_list, add_item, update_item],
    dependencies={"transaction": provide_transaction},
    lifespan=[db_connection],
    plugins=[SQLAlchemySerializationPlugin()],
)
We’ve simply imported the plugin and added it to our app’s plugins list, and now we can receive and return our SQLAlchemy data models directly to and from our handlers.
We’ve also been able to remove the TodoType and TodoCollectionType aliases, and the serialize_todo()
function, making the implementation even more concise.
Compare handlers before and after Serialization Plugin
Once more, let’s compare the sets of application handlers before and after our refactoring:
1from contextlib import asynccontextmanager
2from typing import AsyncGenerator, List, Optional
3
4from sqlalchemy import select
5from sqlalchemy.exc import IntegrityError, NoResultFound
6from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
7from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
8
9from litestar import Litestar, get, post, put
10from litestar.datastructures import State
11from litestar.exceptions import ClientException, NotFoundException
12from litestar.params import FromPath, FromQuery
13from litestar.plugins.sqlalchemy import SQLAlchemySerializationPlugin
14 return list(result.scalars().all())
15
16
@get("/")
async def get_list(transaction: AsyncSession, done: FromQuery[Optional[bool]] = None) -> List[TodoItem]:
    """List TODO items, optionally filtered by the ``done`` query parameter."""
    todos = await get_todo_list(done, transaction)
    return todos
20
21
@post("/")
async def add_item(data: TodoItem, transaction: AsyncSession) -> TodoItem:
    """Add the received ``TodoItem`` to the session and return it."""
    new_item = data
    transaction.add(new_item)
    return new_item
26
27
@put("/{item_title:str}")
async def update_item(item_title: FromPath[str], data: TodoItem, transaction: AsyncSession) -> TodoItem:
    """Overwrite the stored item's ``title`` and ``done`` with those from ``data``.

    The item is looked up by ``item_title`` via ``get_todo_by_title``.
    """
    existing = await get_todo_by_title(item_title, transaction)
    existing.title, existing.done = data.title, data.done
    return existing
34
35
36app = Litestar(
37 [get_list, add_item, update_item],
38 dependencies={"transaction": provide_transaction},
39 lifespan=[db_connection],
40 plugins=[SQLAlchemySerializationPlugin()],
1from contextlib import asynccontextmanager
2from typing import Any, AsyncGenerator, Dict, List, Optional, Sequence
3
4from sqlalchemy import select
5from sqlalchemy.exc import IntegrityError, NoResultFound
6from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
7from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
8
9from litestar import Litestar, get, post, put
10from litestar.datastructures import State
11from litestar.exceptions import ClientException, NotFoundException
12from litestar.params import FromPath, FromQuery
13 return result.scalars().all()
14
15
@get("/")
async def get_list(state: State, done: FromQuery[Optional[bool]] = None) -> TodoCollectionType:
    """List TODO items in serialized form, optionally filtered by ``done``.

    A session bound to ``state.engine`` is opened for the query, and each
    item is passed through ``serialize_todo`` before being returned.
    """
    async with sessionmaker(bind=state.engine) as session:
        todos = await get_todo_list(done, session)
        return [serialize_todo(item) for item in todos]
20
21
@post("/")
async def add_item(data: TodoType, state: State) -> TodoType:
    """Create a TODO item from ``data`` and return it in serialized form.

    Raises:
        ClientException: with status 409 when adding the item raises an
            ``IntegrityError``.
    """
    todo = TodoItem(title=data["title"], done=data["done"])
    async with sessionmaker(bind=state.engine) as session:
        try:
            async with session.begin():
                session.add(todo)
        except IntegrityError as err:
            conflict = ClientException(
                status_code=HTTP_409_CONFLICT,
                detail=f"TODO {todo.title!r} already exists",
            )
            raise conflict from err

    return serialize_todo(todo)
36
37
@put("/{item_title:str}")
async def update_item(item_title: FromPath[str], data: TodoType, state: State) -> TodoType:
    """Overwrite the stored item's ``title`` and ``done`` from ``data``.

    The item is looked up by ``item_title`` via ``get_todo_by_title`` inside a
    ``session.begin()`` block, and returned through ``serialize_todo``.
    """
    async with sessionmaker(bind=state.engine) as session:
        async with session.begin():
            existing = await get_todo_by_title(item_title, session)
            existing.title = data["title"]
            existing.done = data["done"]
    return serialize_todo(existing)
Very nice! But we can do better.
Next steps
So far, we’ve had to build a bit of scaffolding to integrate SQLAlchemy with our application: we’ve had to
define the db_connection() lifespan context manager, and the provide_transaction() dependency provider.
Next, we’ll look at how the SQLAlchemyInitPlugin can
help us.