Working with Controllers and Repositories
We’ve been working our way up the stack, starting with the database models, and now we are ready to use the repository in an actual route. Let’s see how we can use this in a controller.
Tip
The full code for this tutorial can be found below in the Full Code section.
First, we create a simple function that returns an instance of AuthorRepository.
This function will be used to inject a repository instance into our controller routes.
Note that in this example we pass in only the database session, with no other parameters.
1 model_type = AuthorModel
Because we’ll be using the SQLAlchemy plugin in Litestar, the session is automatically configured as a dependency.
By default, the repository doesn’t add any additional query options to your base statement, but provides the flexibility to override it, allowing you to pass your own statement:
1 """This provides the default Authors repository."""
2 return AuthorRepository(session=db_session)
3
4
5# we can optionally override the default `select` used for the repository to pass in
6# specific SQL options such as join details
7async def provide_author_details_repo(db_session: AsyncSession) -> AuthorRepository:
8 """This provides a simple example demonstrating how to override the join options for the repository."""
In this instance, we enhance the repository function by adding a selectinload
option. This option configures the specified relationship to load via the
SELECT .. IN … loading pattern, optimizing the query execution.
Next, we define the AuthorController. This controller exposes five routes for
interacting with the Author model:
AuthorController (click to expand)
1 """
2 return filters.LimitOffset(page_size, page_size * (current_page - 1))
3
4
5class AuthorController(Controller):
6 """Author CRUD"""
7
8 dependencies = {"authors_repo": Provide(provide_authors_repo)}
9
10 @get(path="/authors")
11 async def list_authors(
12 self,
13 authors_repo: AuthorRepository,
14 limit_offset: filters.LimitOffset,
15 ) -> OffsetPagination[Author]:
16 """List authors."""
17 results, total = await authors_repo.list_and_count(limit_offset)
18 type_adapter = TypeAdapter(list[Author])
19 return OffsetPagination[Author](
20 items=type_adapter.validate_python(results),
21 total=total,
22 limit=limit_offset.limit,
23 offset=limit_offset.offset,
24 )
25
26 @post(path="/authors")
27 async def create_author(
28 self,
29 authors_repo: AuthorRepository,
30 data: AuthorCreate,
31 ) -> Author:
32 """Create a new author."""
33 obj = await authors_repo.add(
34 AuthorModel(**data.model_dump(exclude_unset=True, exclude_none=True)),
35 )
36 await authors_repo.session.commit()
37 return Author.model_validate(obj)
38
39 # we override the authors_repo to use the version that joins the Books in
40 @get(path="/authors/{author_id:uuid}", dependencies={"authors_repo": Provide(provide_author_details_repo)})
41 async def get_author(
42 self,
43 authors_repo: AuthorRepository,
44 author_id: UUID = Parameter(
45 title="Author ID",
46 description="The author to retrieve.",
47 ),
48 ) -> Author:
49 """Get an existing author."""
50 obj = await authors_repo.get(author_id)
51 return Author.model_validate(obj)
52
53 @patch(
54 path="/authors/{author_id:uuid}",
55 dependencies={"authors_repo": Provide(provide_author_details_repo)},
56 )
57 async def update_author(
58 self,
59 authors_repo: AuthorRepository,
60 data: AuthorUpdate,
61 author_id: UUID = Parameter(
62 title="Author ID",
63 description="The author to update.",
64 ),
65 ) -> Author:
66 """Update an author."""
67 raw_obj = data.model_dump(exclude_unset=True, exclude_none=True)
68 raw_obj.update({"id": author_id})
69 obj = await authors_repo.update(AuthorModel(**raw_obj))
70 await authors_repo.session.commit()
71 return Author.from_orm(obj)
72
73 @delete(path="/authors/{author_id:uuid}")
74 async def delete_author(
75 self,
76 authors_repo: AuthorRepository,
77 author_id: UUID = Parameter(
78 title="Author ID",
79 description="The author to delete.",
In our list endpoint, we use the pagination filter to limit the amount of data returned, allowing us to retrieve large datasets in smaller, more manageable chunks.
In the above examples, we’ve used the asynchronous repository implementation. However, Litestar also supports synchronous database drivers with an identical implementation. Here’s a corresponding synchronous version of the previous example:
Synchronous Repository (click to expand)
1from __future__ import annotations
2
3from datetime import date
4from typing import TYPE_CHECKING
5from uuid import UUID
6
7from pydantic import BaseModel as _BaseModel
8from pydantic import TypeAdapter
9from sqlalchemy import ForeignKey, select
10from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload
11
12from litestar import Litestar, get
13from litestar.contrib.sqlalchemy import SQLAlchemyInitPlugin, SQLAlchemySyncConfig
14from litestar.contrib.sqlalchemy.base import UUIDAuditBase, UUIDBase
15from litestar.contrib.sqlalchemy.repository import SQLAlchemySyncRepository
16from litestar.controller import Controller
17from litestar.di import Provide
18from litestar.handlers.http_handlers.decorators import delete, patch, post
19from litestar.pagination import OffsetPagination
20from litestar.params import Parameter
21from litestar.repository.filters import LimitOffset
22
23if TYPE_CHECKING:
24 from sqlalchemy.orm import Session
25
26
class BaseModel(_BaseModel):
    """Extend Pydantic's BaseModel to enable ORM mode"""

    # `from_attributes` lets `model_validate()` read field values from object
    # attributes, which is how the handlers in this example turn `AuthorModel`
    # instances into `Author` schemas.
    model_config = {"from_attributes": True}
31
32
# The SQLAlchemy base includes a declarative model for you to use in your models.
# The `UUIDBase` class includes a `UUID` based primary key (`id`)
class AuthorModel(UUIDBase):
    # we can optionally provide the table name instead of auto-generating it
    __tablename__ = "author"  # type: ignore[assignment]
    name: Mapped[str]
    # date of birth; optional
    dob: Mapped[date | None]
    # `lazy="noload"` leaves this collection unloaded by default; a query has to
    # opt in explicitly (see the `selectinload` option in `provide_author_details_repo`)
    books: Mapped[list[BookModel]] = relationship(back_populates="author", lazy="noload")
41
42
# The `UUIDAuditBase` class includes the same `UUID` based primary key (`id`) and 2
# additional columns: `created_at` and `updated_at`. `created_at` is a timestamp of when the
# record was created, and `updated_at` is the last time the record was modified.
class BookModel(UUIDAuditBase):
    __tablename__ = "book"  # type: ignore[assignment]
    title: Mapped[str]
    # foreign key pointing at the `id` column of the `author` table
    author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
    # loaded eagerly through an INNER JOIN; `viewonly=True` makes this side read-only
    author: Mapped[AuthorModel] = relationship(lazy="joined", innerjoin=True, viewonly=True)
51
52
# we will explicitly define the schema instead of using DTO objects for clarity.


# Schema returned by the author handlers.
class Author(BaseModel):
    # no default is given, so the field must be supplied, but it may be null
    id: UUID | None
    name: str
    dob: date | None = None
60
61
# Request body accepted by `create_author`: `name` is required, `dob` is optional.
class AuthorCreate(BaseModel):
    name: str
    dob: date | None = None
65
66
# Request body accepted by `update_author`: every field is optional so that a
# PATCH request can carry only the values it wants to change.
class AuthorUpdate(BaseModel):
    name: str | None = None
    dob: date | None = None
70
71
class AuthorRepository(SQLAlchemySyncRepository[AuthorModel]):
    """Author repository built on the synchronous SQLAlchemy repository."""

    # the ORM model this repository operates on
    model_type = AuthorModel
76
77
def provide_authors_repo(db_session: Session) -> AuthorRepository:
    """Provide the default Authors repository.

    This is a plain function rather than a coroutine: it does no awaiting, and
    the controller registers it with ``sync_to_thread=False``, an option that
    only applies to synchronous callables.
    """
    return AuthorRepository(session=db_session)
81
82
# we can optionally override the default `select` used for the repository to pass in
# specific SQL options such as join details
def provide_author_details_repo(db_session: Session) -> AuthorRepository:
    """Provide an Authors repository whose base statement also loads each author's books.

    This is a plain function rather than a coroutine: it does no awaiting, and
    the routes register it with ``sync_to_thread=False``, an option that only
    applies to synchronous callables.
    """
    return AuthorRepository(
        # `selectinload` opts in to loading the `books` relationship, which the
        # model otherwise declares as `lazy="noload"`.
        statement=select(AuthorModel).options(selectinload(AuthorModel.books)),
        session=db_session,
    )
92
93
def provide_limit_offset_pagination(
    current_page: int = Parameter(ge=1, query="currentPage", default=1, required=False),
    page_size: int = Parameter(
        query="pageSize",
        ge=1,
        default=10,
        required=False,
    ),
) -> LimitOffset:
    """Translate page-based query parameters into a limit/offset filter.

    Parameters
    ----------
    current_page : int
        1-based page number, read from the ``currentPage`` query parameter.
    page_size : int
        Number of rows per page, read from the ``pageSize`` query parameter;
        used as the LIMIT.

    Returns
    -------
    LimitOffset
        Filter whose limit is ``page_size`` and whose offset skips the
        ``current_page - 1`` preceding pages.
    """
    rows_to_skip = page_size * (current_page - 1)
    return LimitOffset(page_size, rows_to_skip)
115
116
class AuthorController(Controller):
    """CRUD endpoints for authors, backed by the synchronous `AuthorRepository`."""

    # Handlers receive the default repository unless a route overrides `authors_repo`.
    dependencies = {"authors_repo": Provide(provide_authors_repo, sync_to_thread=False)}

    @get(path="/authors")
    def list_authors(
        self,
        authors_repo: AuthorRepository,
        limit_offset: LimitOffset,
    ) -> OffsetPagination[Author]:
        """Return one page of authors plus the total reported by the repository."""
        author_rows, author_count = authors_repo.list_and_count(limit_offset)
        # Convert the ORM rows into `Author` schemas in a single validation pass.
        page_items = TypeAdapter(list[Author]).validate_python(author_rows)
        return OffsetPagination[Author](
            items=page_items,
            total=author_count,
            limit=limit_offset.limit,
            offset=limit_offset.offset,
        )

    @post(path="/authors")
    def create_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorCreate,
    ) -> Author:
        """Add a new author, commit the session and return the stored record."""
        # Forward only the fields that were explicitly set to a non-null value.
        supplied_fields = data.model_dump(exclude_unset=True, exclude_none=True)
        created = authors_repo.add(AuthorModel(**supplied_fields))
        authors_repo.session.commit()
        return Author.model_validate(created)

    # this route swaps in the repository variant whose statement also loads the books
    @get(
        path="/authors/{author_id:uuid}",
        dependencies={"authors_repo": Provide(provide_author_details_repo, sync_to_thread=False)},
    )
    def get_author(
        self,
        authors_repo: AuthorRepository,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to retrieve.",
        ),
    ) -> Author:
        """Fetch a single author by primary key."""
        found = authors_repo.get(author_id)
        return Author.model_validate(found)

    @patch(
        path="/authors/{author_id:uuid}",
        dependencies={"authors_repo": Provide(provide_author_details_repo, sync_to_thread=False)},
    )
    def update_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorUpdate,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to update.",
        ),
    ) -> Author:
        """Apply the explicitly set, non-null fields of ``data`` to an author."""
        changes = data.model_dump(exclude_unset=True, exclude_none=True)
        # The primary key always comes from the path, never from the request body.
        changes["id"] = author_id
        updated = authors_repo.update(AuthorModel(**changes))
        authors_repo.session.commit()
        return Author.model_validate(updated)

    @delete(path="/authors/{author_id:uuid}")
    def delete_author(
        self,
        authors_repo: AuthorRepository,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to delete.",
        ),
    ) -> None:
        """Delete an author from the system."""
        # The value returned by `delete` is intentionally discarded.
        authors_repo.delete(author_id)
        authors_repo.session.commit()
200
201
# Create 'db_session' dependency.
sqlalchemy_config = SQLAlchemySyncConfig(
    connection_string="sqlite:///test.sqlite",
)
sqlalchemy_plugin = SQLAlchemyInitPlugin(config=sqlalchemy_config)
204
205
def on_startup() -> None:
    """Create the tables registered on `UUIDBase.metadata`."""
    engine = sqlalchemy_config.get_engine()
    with engine.begin() as conn:
        UUIDBase.metadata.create_all(conn)
210
211
# `provide_limit_offset_pagination` only does arithmetic, so it can run directly
# on the event loop. Passing `sync_to_thread=False` states that explicitly and
# avoids Litestar's warning about synchronous callables whose threading mode is
# left unspecified.
app = Litestar(
    route_handlers=[AuthorController],
    on_startup=[on_startup],
    # reuse the plugin instance created above instead of building a second one
    plugins=[sqlalchemy_plugin],
    dependencies={
        "limit_offset": Provide(provide_limit_offset_pagination, sync_to_thread=False),
    },
)
The examples above enable a feature-complete CRUD service that includes pagination! In the next section, we’ll explore how to extend the built-in repository to add additional functionality to our application.
Full Code
Full Code (click to expand)
1from __future__ import annotations
2
3from datetime import date
4from typing import TYPE_CHECKING
5from uuid import UUID
6
7from pydantic import BaseModel as _BaseModel
8from pydantic import TypeAdapter
9from sqlalchemy import ForeignKey, select
10from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload
11
12from litestar import Litestar, get
13from litestar.controller import Controller
14from litestar.di import Provide
15from litestar.handlers.http_handlers.decorators import delete, patch, post
16from litestar.pagination import OffsetPagination
17from litestar.params import Parameter
18from litestar.plugins.sqlalchemy import (
19 AsyncSessionConfig,
20 SQLAlchemyAsyncConfig,
21 SQLAlchemyInitPlugin,
22 base,
23 filters,
24 repository,
25)
26
27if TYPE_CHECKING:
28 from sqlalchemy.ext.asyncio import AsyncSession
29
30
class BaseModel(_BaseModel):
    """Extend Pydantic's BaseModel to enable ORM mode"""

    # `from_attributes` lets `model_validate()` read field values from object
    # attributes, which is how the handlers in this example turn `AuthorModel`
    # instances into `Author` schemas.
    model_config = {"from_attributes": True}
35
36
# The SQLAlchemy base includes a declarative model for you to use in your models.
# The `UUIDBase` class includes a `UUID` based primary key (`id`)
class AuthorModel(base.UUIDBase):
    # we can optionally provide the table name instead of auto-generating it
    __tablename__ = "author"  # type: ignore[assignment]
    name: Mapped[str]
    # date of birth; optional
    dob: Mapped[date | None]
    # `lazy="noload"` leaves this collection unloaded by default; a query has to
    # opt in explicitly (see the `selectinload` option in `provide_author_details_repo`)
    books: Mapped[list[BookModel]] = relationship(back_populates="author", lazy="noload")
45
46
# The `UUIDAuditBase` class includes the same `UUID` based primary key (`id`) and 2
# additional columns: `created_at` and `updated_at`. `created_at` is a timestamp of when the
# record was created, and `updated_at` is the last time the record was modified.
class BookModel(base.UUIDAuditBase):
    __tablename__ = "book"  # type: ignore[assignment]
    title: Mapped[str]
    # foreign key pointing at the `id` column of the `author` table
    author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
    # loaded eagerly through an INNER JOIN; `viewonly=True` makes this side read-only
    author: Mapped[AuthorModel] = relationship(lazy="joined", innerjoin=True, viewonly=True)
55
56
# we will explicitly define the schema instead of using DTO objects for clarity.


# Schema returned by the author handlers.
class Author(BaseModel):
    # no default is given, so the field must be supplied, but it may be null
    id: UUID | None
    name: str
    dob: date | None = None
64
65
# Request body accepted by `create_author`: `name` is required, `dob` is optional.
class AuthorCreate(BaseModel):
    name: str
    dob: date | None = None
69
70
# Request body accepted by `update_author`: every field is optional so that a
# PATCH request can carry only the values it wants to change.
class AuthorUpdate(BaseModel):
    name: str | None = None
    dob: date | None = None
74
75
class AuthorRepository(repository.SQLAlchemyAsyncRepository[AuthorModel]):
    """Author repository built on the asynchronous SQLAlchemy repository."""

    # the ORM model this repository operates on
    model_type = AuthorModel
80
81
async def provide_authors_repo(db_session: AsyncSession) -> AuthorRepository:
    """Build the default Authors repository around the injected `db_session`."""
    default_repo = AuthorRepository(session=db_session)
    return default_repo
85
86
# the repository's default `select` can be replaced to pass in specific SQL
# options such as join details
async def provide_author_details_repo(db_session: AsyncSession) -> AuthorRepository:
    """Build an Authors repository whose base statement also loads each author's books."""
    # `selectinload` opts in to loading the `books` relationship, which the
    # model otherwise declares as `lazy="noload"`.
    with_books_stmt = select(AuthorModel).options(selectinload(AuthorModel.books))
    return AuthorRepository(session=db_session, statement=with_books_stmt)
95
96
def provide_limit_offset_pagination(
    current_page: int = Parameter(ge=1, query="currentPage", default=1, required=False),
    page_size: int = Parameter(
        query="pageSize",
        ge=1,
        default=10,
        required=False,
    ),
) -> filters.LimitOffset:
    """Translate page-based query parameters into a limit/offset filter.

    Parameters
    ----------
    current_page : int
        1-based page number, read from the ``currentPage`` query parameter.
    page_size : int
        Number of rows per page, read from the ``pageSize`` query parameter;
        used as the LIMIT.

    Returns
    -------
    filters.LimitOffset
        Filter whose limit is ``page_size`` and whose offset skips the
        ``current_page - 1`` preceding pages.
    """
    rows_to_skip = page_size * (current_page - 1)
    return filters.LimitOffset(page_size, rows_to_skip)
118
119
class AuthorController(Controller):
    """CRUD endpoints for authors, backed by the asynchronous `AuthorRepository`."""

    # Handlers receive the default repository unless a route overrides `authors_repo`.
    dependencies = {"authors_repo": Provide(provide_authors_repo)}

    @get(path="/authors")
    async def list_authors(
        self,
        authors_repo: AuthorRepository,
        limit_offset: filters.LimitOffset,
    ) -> OffsetPagination[Author]:
        """Return one page of authors plus the total reported by the repository."""
        results, total = await authors_repo.list_and_count(limit_offset)
        # Convert the ORM rows into `Author` schemas in a single validation pass.
        type_adapter = TypeAdapter(list[Author])
        return OffsetPagination[Author](
            items=type_adapter.validate_python(results),
            total=total,
            limit=limit_offset.limit,
            offset=limit_offset.offset,
        )

    @post(path="/authors")
    async def create_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorCreate,
    ) -> Author:
        """Add a new author, commit the session and return the stored record."""
        obj = await authors_repo.add(
            # Forward only the fields that were explicitly set to a non-null value.
            AuthorModel(**data.model_dump(exclude_unset=True, exclude_none=True)),
        )
        await authors_repo.session.commit()
        return Author.model_validate(obj)

    # this route swaps in the repository variant whose statement also loads the books
    @get(path="/authors/{author_id:uuid}", dependencies={"authors_repo": Provide(provide_author_details_repo)})
    async def get_author(
        self,
        authors_repo: AuthorRepository,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to retrieve.",
        ),
    ) -> Author:
        """Fetch a single author by primary key."""
        obj = await authors_repo.get(author_id)
        return Author.model_validate(obj)

    @patch(
        path="/authors/{author_id:uuid}",
        dependencies={"authors_repo": Provide(provide_author_details_repo)},
    )
    async def update_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorUpdate,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to update.",
        ),
    ) -> Author:
        """Apply the explicitly set, non-null fields of ``data`` to an author."""
        raw_obj = data.model_dump(exclude_unset=True, exclude_none=True)
        # The primary key always comes from the path, never from the request body.
        raw_obj["id"] = author_id
        obj = await authors_repo.update(AuthorModel(**raw_obj))
        await authors_repo.session.commit()
        # `model_validate` is the Pydantic v2 API used by every other handler;
        # `from_orm` is its deprecated v1 predecessor.
        return Author.model_validate(obj)

    @delete(path="/authors/{author_id:uuid}")
    async def delete_author(
        self,
        authors_repo: AuthorRepository,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to delete.",
        ),
    ) -> None:
        """Delete an author from the system."""
        # The value returned by `delete` is intentionally discarded.
        _ = await authors_repo.delete(author_id)
        await authors_repo.session.commit()
200
201
# With `expire_on_commit=False` ORM objects stay readable after `commit()`,
# which the handlers rely on when they serialize an object they just committed.
session_config = AsyncSessionConfig(expire_on_commit=False)
# Create 'db_session' dependency.
sqlalchemy_config = SQLAlchemyAsyncConfig(
    connection_string="sqlite+aiosqlite:///test.sqlite",
    session_config=session_config,
)
sqlalchemy_plugin = SQLAlchemyInitPlugin(config=sqlalchemy_config)
207
208
async def on_startup() -> None:
    """Create the tables registered on `base.UUIDBase.metadata`."""
    engine = sqlalchemy_config.get_engine()
    async with engine.begin() as conn:
        # `create_all` is a synchronous call, so it is dispatched via `run_sync`.
        await conn.run_sync(base.UUIDBase.metadata.create_all)
213
214
# `provide_limit_offset_pagination` only does arithmetic, so it can run directly
# on the event loop. Passing `sync_to_thread=False` states that explicitly and
# avoids Litestar's warning about synchronous callables whose threading mode is
# left unspecified.
app = Litestar(
    route_handlers=[AuthorController],
    on_startup=[on_startup],
    # reuse the plugin instance created above instead of building a second one
    plugins=[sqlalchemy_plugin],
    dependencies={
        "limit_offset": Provide(provide_limit_offset_pagination, sync_to_thread=False),
    },
)