Working with Controllers and Repositories#

We’ve been working our way up the stack, starting with the database models, and now we are ready to use the repository in an actual route. Let’s see how we can use this in a controller.

Tip

The full code for this tutorial can be found below in the Full Code section.

First, we create a simple function that returns an instance of AuthorRepository. This function will be used to inject a repository instance into our controller routes. Note that we are only passing in the database session in this example with no other parameters.

app.py#
async def provide_authors_repo(db_session: AsyncSession) -> AuthorRepository:
    """Build the default ``AuthorRepository`` for dependency injection.

    Only the injected ``db_session`` is passed in, so the repository keeps
    its default base statement.
    """
    repo = AuthorRepository(session=db_session)
    return repo

Because we’ll be using the SQLAlchemy plugin in Litestar, the session is automatically configured as a dependency.

By default, the repository doesn’t add any additional query options to your base statement, but provides the flexibility to override it, allowing you to pass your own statement:

app.py#
# The repository's default `select` can be swapped for a custom statement,
# for example to attach loader options for related rows.
async def provide_author_details_repo(db_session: AsyncSession) -> AuthorRepository:
    """Build an ``AuthorRepository`` whose base statement also loads books.

    The statement selects ``AuthorModel`` with a ``selectinload`` option on
    ``AuthorModel.books`` and is handed to the repository together with the
    injected ``db_session``.
    """
    books_loaded = select(AuthorModel).options(selectinload(AuthorModel.books))
    return AuthorRepository(statement=books_loaded, session=db_session)

In this instance, we enhance the repository function by adding a selectinload option. This option configures the specified relationship to load using the SELECT ... IN loading pattern, so the related books are fetched in one additional query rather than one query per author.

Next, we define the AuthorController. This controller exposes five routes for interacting with the Author model:

AuthorController (click to expand)
app.py#
 1class AuthorController(Controller):
 2    """Author CRUD"""
 3
 4    dependencies = {"authors_repo": Provide(provide_authors_repo)}
 5
 6    @get(path="/authors")
 7    async def list_authors(
 8        self,
 9        authors_repo: AuthorRepository,
10        limit_offset: LimitOffset,
11    ) -> OffsetPagination[Author]:
12        """List authors."""
13        results, total = await authors_repo.list_and_count(limit_offset)
14        type_adapter = TypeAdapter(list[Author])
15        return OffsetPagination[Author](
16            items=type_adapter.validate_python(results),
17            total=total,
18            limit=limit_offset.limit,
19            offset=limit_offset.offset,
20        )
21
22    @post(path="/authors")
23    async def create_author(
24        self,
25        authors_repo: AuthorRepository,
26        data: AuthorCreate,
27    ) -> Author:
28        """Create a new author."""
29        obj = await authors_repo.add(
30            AuthorModel(**data.model_dump(exclude_unset=True, exclude_none=True)),
31        )
32        await authors_repo.session.commit()
33        return Author.model_validate(obj)
34
35    # we override the authors_repo to use the version that joins the Books in
36    @get(path="/authors/{author_id:uuid}", dependencies={"authors_repo": Provide(provide_author_details_repo)})
37    async def get_author(
38        self,
39        authors_repo: AuthorRepository,
40        author_id: UUID = Parameter(
41            title="Author ID",
42            description="The author to retrieve.",
43        ),
44    ) -> Author:
45        """Get an existing author."""
46        obj = await authors_repo.get(author_id)
47        return Author.model_validate(obj)
48
49    @patch(
50        path="/authors/{author_id:uuid}",
51        dependencies={"authors_repo": Provide(provide_author_details_repo)},
52    )
53    async def update_author(
54        self,
55        authors_repo: AuthorRepository,
56        data: AuthorUpdate,
57        author_id: UUID = Parameter(
58            title="Author ID",
59            description="The author to update.",
60        ),
61    ) -> Author:
62        """Update an author."""
63        raw_obj = data.model_dump(exclude_unset=True, exclude_none=True)
64        raw_obj.update({"id": author_id})
65        obj = await authors_repo.update(AuthorModel(**raw_obj))
66        await authors_repo.session.commit()
67        return Author.from_orm(obj)
68
69    @delete(path="/authors/{author_id:uuid}")
70    async def delete_author(
71        self,
72        authors_repo: AuthorRepository,
73        author_id: UUID = Parameter(
74            title="Author ID",
75            description="The author to delete.",
76        ),
77    ) -> None:
78        """Delete a author from the system."""
79        _ = await authors_repo.delete(author_id)

In our list endpoint, we use the LimitOffset pagination filter to limit the amount of data returned, allowing us to retrieve large datasets in smaller, more manageable chunks.

In the above examples, we’ve used the asynchronous repository implementation. However, Litestar also supports synchronous database drivers with an identical implementation. Here’s a corresponding synchronous version of the previous example:

Synchronous Repository (click to expand)
app.py#
  1from __future__ import annotations
  2
  3from datetime import date
  4from typing import TYPE_CHECKING
  5from uuid import UUID
  6
  7from pydantic import BaseModel as _BaseModel
  8from pydantic import TypeAdapter
  9from sqlalchemy import ForeignKey, select
 10from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload
 11
 12from litestar import Litestar, get
 13from litestar.contrib.sqlalchemy.base import UUIDAuditBase, UUIDBase
 14from litestar.contrib.sqlalchemy.plugins.init import SQLAlchemyInitPlugin, SQLAlchemySyncConfig
 15from litestar.contrib.sqlalchemy.repository import SQLAlchemySyncRepository
 16from litestar.controller import Controller
 17from litestar.di import Provide
 18from litestar.handlers.http_handlers.decorators import delete, patch, post
 19from litestar.pagination import OffsetPagination
 20from litestar.params import Parameter
 21from litestar.repository.filters import LimitOffset
 22
 23if TYPE_CHECKING:
 24    from sqlalchemy.orm import Session
 25
 26
class BaseModel(_BaseModel):
    """Shared Pydantic base class for the API schemas in this example.

    Enables ``from_attributes`` so that ``model_validate()`` can read field
    values from object attributes, which is how the route handlers convert
    the objects returned by the repository.
    """

    model_config = {"from_attributes": True}
 31
 32
# The SQLAlchemy base classes provide a declarative model for you to build on.
# `UUIDBase` includes a `UUID` based primary key (`id`).
class AuthorModel(UUIDBase):
    """Database model for an author."""

    # we can optionally provide the table name instead of auto-generating it
    __tablename__ = "author"  #  type: ignore[assignment]
    name: Mapped[str]
    dob: Mapped[date | None]
    # `lazy="noload"`: books are not loaded by default; a statement has to opt in
    # (as `provide_author_details_repo` does with `selectinload`).
    books: Mapped[list[BookModel]] = relationship(back_populates="author", lazy="noload")
 41
 42
# The `UUIDAuditBase` class includes the same `UUID` based primary key (`id`) and 2
# additional columns: `created` and `updated`. `created` is a timestamp of when the
# record was created, and `updated` is the last time the record was modified.
class BookModel(UUIDAuditBase):
    """Database model for a book, linked to its author via `author_id`."""

    __tablename__ = "book"  #  type: ignore[assignment]
    title: Mapped[str]
    # Foreign key to the `author` table's `id` column.
    author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
    # Loaded with an inner join alongside the book; declared view-only.
    author: Mapped[AuthorModel] = relationship(lazy="joined", innerjoin=True, viewonly=True)
 51
 52
 53# we will explicitly define the schema instead of using DTO objects for clarity.
 54
 55
class Author(BaseModel):
    """Schema the author endpoints return."""

    id: UUID | None
    name: str
    dob: date | None = None
 60
 61
class AuthorCreate(BaseModel):
    """Request body for creating an author; `name` is required."""

    name: str
    dob: date | None = None
 65
 66
class AuthorUpdate(BaseModel):
    """Request body for updating an author; every field is optional.

    The update handler dumps this model with `exclude_unset` and
    `exclude_none`, so fields that are omitted or sent as None are not applied.
    """

    name: str | None = None
    dob: date | None = None
 70
 71
class AuthorRepository(SQLAlchemySyncRepository[AuthorModel]):
    """Synchronous repository for `AuthorModel`."""

    # Model class this repository works with.
    model_type = AuthorModel
 76
 77
 78async def provide_authors_repo(db_session: Session) -> AuthorRepository:
 79    """This provides the default Authors repository."""
 80    return AuthorRepository(session=db_session)
 81
 82
 83# we can optionally override the default `select` used for the repository to pass in
 84# specific SQL options such as join details
 85async def provide_author_details_repo(db_session: Session) -> AuthorRepository:
 86    """This provides a simple example demonstrating how to override the join options
 87    for the repository."""
 88    return AuthorRepository(
 89        statement=select(AuthorModel).options(selectinload(AuthorModel.books)),
 90        session=db_session,
 91    )
 92
 93
 94def provide_limit_offset_pagination(
 95    current_page: int = Parameter(ge=1, query="currentPage", default=1, required=False),
 96    page_size: int = Parameter(
 97        query="pageSize",
 98        ge=1,
 99        default=10,
100        required=False,
101    ),
102) -> LimitOffset:
103    """Add offset/limit pagination.
104
105    Return type consumed by `Repository.apply_limit_offset_pagination()`.
106
107    Parameters
108    ----------
109    current_page : int
110        LIMIT to apply to select.
111    page_size : int
112        OFFSET to apply to select.
113    """
114    return LimitOffset(page_size, page_size * (current_page - 1))
115
116
class AuthorController(Controller):
    """Synchronous CRUD endpoints for authors."""

    # Default repository for every route; individual handlers may override it.
    dependencies = {
        "authors_repo": Provide(provide_authors_repo, sync_to_thread=False),
    }

    @get(path="/authors")
    def list_authors(
        self,
        authors_repo: AuthorRepository,
        limit_offset: LimitOffset,
    ) -> OffsetPagination[Author]:
        """Return one page of authors together with the total count."""
        rows, row_count = authors_repo.list_and_count(limit_offset)
        items = TypeAdapter(list[Author]).validate_python(rows)
        return OffsetPagination[Author](
            items=items,
            total=row_count,
            limit=limit_offset.limit,
            offset=limit_offset.offset,
        )

    @post(path="/authors")
    def create_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorCreate,
    ) -> Author:
        """Create a new author and return it."""
        fields = data.model_dump(exclude_unset=True, exclude_none=True)
        created = authors_repo.add(AuthorModel(**fields))
        authors_repo.session.commit()
        return Author.model_validate(created)

    # This route swaps in the repository whose statement also loads the books.
    @get(
        path="/authors/{author_id:uuid}",
        dependencies={
            "authors_repo": Provide(provide_author_details_repo, sync_to_thread=False),
        },
    )
    def get_author(
        self,
        authors_repo: AuthorRepository,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to retrieve.",
        ),
    ) -> Author:
        """Get an existing author."""
        found = authors_repo.get(author_id)
        return Author.model_validate(found)

    @patch(
        path="/authors/{author_id:uuid}",
        dependencies={
            "authors_repo": Provide(provide_author_details_repo, sync_to_thread=False),
        },
    )
    def update_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorUpdate,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to update.",
        ),
    ) -> Author:
        """Update an author with the fields present in the request body."""
        changes = data.model_dump(exclude_unset=True, exclude_none=True)
        # The id comes from the path, never from the request body.
        changes["id"] = author_id
        updated = authors_repo.update(AuthorModel(**changes))
        authors_repo.session.commit()
        return Author.model_validate(updated)

    @delete(path="/authors/{author_id:uuid}")
    def delete_author(
        self,
        authors_repo: AuthorRepository,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to delete.",
        ),
    ) -> None:
        """Delete an author from the system."""
        authors_repo.delete(author_id)
        authors_repo.session.commit()
200
201
# Synchronous SQLAlchemy configuration pointing at a local SQLite file (`test.sqlite`).
sqlalchemy_config = SQLAlchemySyncConfig(connection_string="sqlite:///test.sqlite")  # Create 'db_session' dependency.
# Init plugin built from the configuration above.
sqlalchemy_plugin = SQLAlchemyInitPlugin(config=sqlalchemy_config)
204
205
def on_startup() -> None:
    """Create the tables registered on ``UUIDBase.metadata`` at application startup."""
    engine = sqlalchemy_config.get_engine()
    with engine.begin() as connection:
        UUIDBase.metadata.create_all(connection)
210
211
# Application wiring: the controller, the table-creation startup hook, the
# SQLAlchemy plugin and the app-wide pagination dependency.
app = Litestar(
    route_handlers=[AuthorController],
    on_startup=[on_startup],
    # Reuse the module-level plugin instead of building a second, identical one.
    plugins=[sqlalchemy_plugin],
    # The pagination provider is a cheap synchronous function, so run it inline
    # rather than leaving `sync_to_thread` implicit.
    dependencies={"limit_offset": Provide(provide_limit_offset_pagination, sync_to_thread=False)},
)

The examples above enable a feature-complete CRUD service that includes pagination! In the next section, we’ll explore how to extend the built-in repository to add additional functionality to our application.

Full Code#

Full Code (click to expand)
app.py#
  1from __future__ import annotations
  2
  3from datetime import date
  4from typing import TYPE_CHECKING
  5from uuid import UUID
  6
  7from pydantic import BaseModel as _BaseModel
  8from pydantic import TypeAdapter
  9from sqlalchemy import ForeignKey, select
 10from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload
 11
 12from litestar import Litestar, get
 13from litestar.contrib.sqlalchemy.base import UUIDAuditBase, UUIDBase
 14from litestar.contrib.sqlalchemy.plugins import AsyncSessionConfig, SQLAlchemyAsyncConfig, SQLAlchemyInitPlugin
 15from litestar.contrib.sqlalchemy.repository import SQLAlchemyAsyncRepository
 16from litestar.controller import Controller
 17from litestar.di import Provide
 18from litestar.handlers.http_handlers.decorators import delete, patch, post
 19from litestar.pagination import OffsetPagination
 20from litestar.params import Parameter
 21from litestar.repository.filters import LimitOffset
 22
 23if TYPE_CHECKING:
 24    from sqlalchemy.ext.asyncio import AsyncSession
 25
 26
class BaseModel(_BaseModel):
    """Shared Pydantic base class for the API schemas in this example.

    Enables ``from_attributes`` so that ``model_validate()`` can read field
    values from object attributes, which is how the route handlers convert
    the objects returned by the repository.
    """

    model_config = {"from_attributes": True}
 31
 32
# The SQLAlchemy base classes provide a declarative model for you to build on.
# `UUIDBase` includes a `UUID` based primary key (`id`).
class AuthorModel(UUIDBase):
    """Database model for an author."""

    # we can optionally provide the table name instead of auto-generating it
    __tablename__ = "author"  #  type: ignore[assignment]
    name: Mapped[str]
    dob: Mapped[date | None]
    # `lazy="noload"`: books are not loaded by default; a statement has to opt in
    # (as `provide_author_details_repo` does with `selectinload`).
    books: Mapped[list[BookModel]] = relationship(back_populates="author", lazy="noload")
 41
 42
# The `UUIDAuditBase` class includes the same `UUID` based primary key (`id`) and 2
# additional columns: `created` and `updated`. `created` is a timestamp of when the
# record was created, and `updated` is the last time the record was modified.
class BookModel(UUIDAuditBase):
    """Database model for a book, linked to its author via `author_id`."""

    __tablename__ = "book"  #  type: ignore[assignment]
    title: Mapped[str]
    # Foreign key to the `author` table's `id` column.
    author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
    # Loaded with an inner join alongside the book; declared view-only.
    author: Mapped[AuthorModel] = relationship(lazy="joined", innerjoin=True, viewonly=True)
 51
 52
 53# we will explicitly define the schema instead of using DTO objects for clarity.
 54
 55
class Author(BaseModel):
    """Schema the author endpoints return."""

    id: UUID | None
    name: str
    dob: date | None = None
 60
 61
class AuthorCreate(BaseModel):
    """Request body for creating an author; `name` is required."""

    name: str
    dob: date | None = None
 65
 66
class AuthorUpdate(BaseModel):
    """Request body for updating an author; every field is optional.

    The update handler dumps this model with `exclude_unset` and
    `exclude_none`, so fields that are omitted or sent as None are not applied.
    """

    name: str | None = None
    dob: date | None = None
 70
 71
class AuthorRepository(SQLAlchemyAsyncRepository[AuthorModel]):
    """Asynchronous repository for `AuthorModel`."""

    # Model class this repository works with.
    model_type = AuthorModel
 76
 77
 78async def provide_authors_repo(db_session: AsyncSession) -> AuthorRepository:
 79    """This provides the default Authors repository."""
 80    return AuthorRepository(session=db_session)
 81
 82
 83# we can optionally override the default `select` used for the repository to pass in
 84# specific SQL options such as join details
 85async def provide_author_details_repo(db_session: AsyncSession) -> AuthorRepository:
 86    """This provides a simple example demonstrating how to override the join options for the repository."""
 87    return AuthorRepository(
 88        statement=select(AuthorModel).options(selectinload(AuthorModel.books)),
 89        session=db_session,
 90    )
 91
 92
 93def provide_limit_offset_pagination(
 94    current_page: int = Parameter(ge=1, query="currentPage", default=1, required=False),
 95    page_size: int = Parameter(
 96        query="pageSize",
 97        ge=1,
 98        default=10,
 99        required=False,
100    ),
101) -> LimitOffset:
102    """Add offset/limit pagination.
103
104    Return type consumed by `Repository.apply_limit_offset_pagination()`.
105
106    Parameters
107    ----------
108    current_page : int
109        LIMIT to apply to select.
110    page_size : int
111        OFFSET to apply to select.
112    """
113    return LimitOffset(page_size, page_size * (current_page - 1))
114
115
class AuthorController(Controller):
    """CRUD endpoints for authors, backed by an injected ``AuthorRepository``."""

    # Default repository for every route; individual handlers may override it.
    dependencies = {"authors_repo": Provide(provide_authors_repo)}

    @get(path="/authors")
    async def list_authors(
        self,
        authors_repo: AuthorRepository,
        limit_offset: LimitOffset,
    ) -> OffsetPagination[Author]:
        """Return one page of authors together with the total count."""
        results, total = await authors_repo.list_and_count(limit_offset)
        # Convert the repository results to the `Author` schema in one pass.
        type_adapter = TypeAdapter(list[Author])
        return OffsetPagination[Author](
            items=type_adapter.validate_python(results),
            total=total,
            limit=limit_offset.limit,
            offset=limit_offset.offset,
        )

    @post(path="/authors")
    async def create_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorCreate,
    ) -> Author:
        """Create a new author and return it."""
        obj = await authors_repo.add(
            # Fields that were not sent, or were sent as None, are left out.
            AuthorModel(**data.model_dump(exclude_unset=True, exclude_none=True)),
        )
        await authors_repo.session.commit()
        return Author.model_validate(obj)

    # we override the authors_repo to use the version that joins the Books in
    @get(path="/authors/{author_id:uuid}", dependencies={"authors_repo": Provide(provide_author_details_repo)})
    async def get_author(
        self,
        authors_repo: AuthorRepository,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to retrieve.",
        ),
    ) -> Author:
        """Get an existing author."""
        obj = await authors_repo.get(author_id)
        return Author.model_validate(obj)

    @patch(
        path="/authors/{author_id:uuid}",
        dependencies={"authors_repo": Provide(provide_author_details_repo)},
    )
    async def update_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorUpdate,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to update.",
        ),
    ) -> Author:
        """Update an author with the fields present in the request body."""
        raw_obj = data.model_dump(exclude_unset=True, exclude_none=True)
        # The id comes from the path, never from the request body.
        raw_obj.update({"id": author_id})
        obj = await authors_repo.update(AuthorModel(**raw_obj))
        await authors_repo.session.commit()
        # `model_validate` replaces the deprecated Pydantic v1 `from_orm` and
        # matches the other handlers.
        return Author.model_validate(obj)

    @delete(path="/authors/{author_id:uuid}")
    async def delete_author(
        self,
        authors_repo: AuthorRepository,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to delete.",
        ),
    ) -> None:
        """Delete an author from the system."""
        _ = await authors_repo.delete(author_id)
        await authors_repo.session.commit()
196
197
# `expire_on_commit=False` keeps already-loaded attributes on ORM objects after
# `commit()`; the handlers validate the returned object after committing.
session_config = AsyncSessionConfig(expire_on_commit=False)
# Async SQLAlchemy configuration pointing at a local SQLite file (`test.sqlite`)
# through the aiosqlite driver.
sqlalchemy_config = SQLAlchemyAsyncConfig(
    connection_string="sqlite+aiosqlite:///test.sqlite", session_config=session_config
)  # Create 'db_session' dependency.
# Init plugin built from the configuration above.
sqlalchemy_plugin = SQLAlchemyInitPlugin(config=sqlalchemy_config)
203
204
async def on_startup() -> None:
    """Create the tables registered on ``UUIDBase.metadata`` at application startup."""
    engine = sqlalchemy_config.get_engine()
    async with engine.begin() as connection:
        # `create_all` is a synchronous API, so it runs through `run_sync`.
        await connection.run_sync(UUIDBase.metadata.create_all)
209
210
# Application wiring: the controller, the table-creation startup hook, the
# SQLAlchemy plugin and the app-wide pagination dependency.
app = Litestar(
    route_handlers=[AuthorController],
    on_startup=[on_startup],
    # Reuse the module-level plugin instead of building a second, identical one.
    plugins=[sqlalchemy_plugin],
    # The pagination provider is a cheap synchronous function, so run it inline
    # rather than leaving `sync_to_thread` implicit.
    dependencies={"limit_offset": Provide(provide_limit_offset_pagination, sync_to_thread=False)},
)