SQLAlchemy Models & Repository
Litestar comes with a built-in repository class (SQLAlchemyAsyncRepository) for SQLAlchemy to make CRUD operations easier.
Features
Pre-configured
DeclarativeBase
for SQLAlchemy 2.0 that includes a UUID or Big Integer based primary key, a sentinel column, and an optional version with audit columns.
Generic synchronous and asynchronous repositories for select, insert, update, and delete operations on SQLAlchemy models.
Implements optimized methods for bulk inserts, updates, and deletes and uses lambda_stmt when possible.
Integrated counts, pagination, sorting, and filtering with LIKE, IN, and dates before and/or after.
Tested support for multiple database backends including:
Postgres via asyncpg or psycopg3 (async or sync)
MySQL via asyncmy
Oracle via oracledb
Google Spanner via spanner-sqlalchemy
DuckDB via duckdb_engine
Microsoft SQL Server via pyodbc
Basic Use
To use the SQLAlchemyAsyncRepository, you must first define your models using one of the included built-in DeclarativeBase ORM base implementations:
UUIDBase and UUIDAuditBase both include a UUID based primary key, and UUIDAuditBase additionally includes updated_at and created_at timestamp columns.
The UUID will be a native UUID/GUID type on databases that support it, such as Postgres. For other engines without a native UUID data type, the UUID is stored as a 16-byte BYTES or RAW field.
BigIntBase and BigIntAuditBase both include a BigInteger based primary key, and BigIntAuditBase additionally includes updated_at and created_at timestamp columns.
Models using these bases also include the following enhancements:
Auto-generated snake-case table name from class name
Pydantic BaseModel and Dict classes map to an optimized JSON type that is JSONB for Postgres, VARCHAR or BYTES with a JSON check constraint for Oracle, and JSON for other dialects.
from __future__ import annotations
import uuid
from datetime import date
from uuid import UUID
from sqlalchemy import ForeignKey, func, select
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
from sqlalchemy.orm import Mapped, mapped_column, relationship
from litestar import Litestar, get
from litestar.contrib.sqlalchemy.base import UUIDAuditBase, UUIDBase
from litestar.contrib.sqlalchemy.plugins import AsyncSessionConfig, SQLAlchemyAsyncConfig, SQLAlchemyPlugin
class Author(UUIDBase):
    """Author table built on ``UUIDBase``.

    The SQLAlchemy base ships a declarative model to derive from; per the
    example's own notes it contributes a ``UUID`` based primary key (``id``),
    so only the domain columns are declared here.
    """

    name: Mapped[str]
    dob: Mapped[date]
    # Related books use SQLAlchemy's "selectin" loader strategy.
    books: Mapped[list[Book]] = relationship(lazy="selectin", back_populates="author")
class Book(UUIDAuditBase):
    """Book table built on ``UUIDAuditBase``.

    The audit base includes the same ``UUID`` based primary key (``id``) plus
    two additional columns: ``created_at`` (timestamp of when the record was
    created) and ``updated_at`` (the last time the record was modified).
    """

    title: Mapped[str]
    author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
    # Read-only relationship to the owning author, loaded via an inner join.
    author: Mapped[Author] = relationship(viewonly=True, innerjoin=True, lazy="joined")
# Keep loaded attributes usable after commit (no expire-on-commit).
session_config = AsyncSessionConfig(expire_on_commit=False)

# Registers the 'db_session' dependency that the route handler injects.
sqlalchemy_config = SQLAlchemyAsyncConfig(
    connection_string="sqlite+aiosqlite:///test.sqlite",
    session_config=session_config,
    create_all=True,
)
async def on_startup() -> None:
    """Seed the database with one author and one book when it holds no authors."""
    async with sqlalchemy_config.get_session() as session:
        author_total = await session.execute(select(func.count()).select_from(Author))
        if author_total.scalar():
            # Data already present; nothing to seed.
            return

        author_id = uuid.uuid4()
        session.add_all(
            [
                Author(name="Stephen King", dob=date(1954, 9, 21), id=author_id),
                Book(title="It", author_id=author_id),
            ]
        )
        await session.commit()
@get(path="/authors")
async def get_authors(db_session: AsyncSession, db_engine: AsyncEngine) -> list[Author]:
    """Interact with SQLAlchemy engine and session."""
    # Both the session and the engine are injected; only the session is
    # needed to run the query.
    authors = await db_session.scalars(select(Author))
    return list(authors)
# Application wiring: the SQLAlchemy plugin is built from `sqlalchemy_config`,
# and `on_startup` seeds the demo data.
app = Litestar(
    plugins=[SQLAlchemyPlugin(config=sqlalchemy_config)],
    on_startup=[on_startup],
    route_handlers=[get_authors],
    debug=True,
)
Basic Controller Integration
Once you have declared your models, you are ready to use the SQLAlchemyAsyncRepository class with your controllers and function-based routes.
from __future__ import annotations
from datetime import date
from typing import TYPE_CHECKING
from uuid import UUID
from pydantic import BaseModel as _BaseModel
from pydantic import TypeAdapter
from sqlalchemy import ForeignKey, select
from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload
from litestar import Litestar, get
from litestar.contrib.sqlalchemy.base import UUIDAuditBase, UUIDBase
from litestar.contrib.sqlalchemy.plugins import AsyncSessionConfig, SQLAlchemyAsyncConfig, SQLAlchemyInitPlugin
from litestar.contrib.sqlalchemy.repository import SQLAlchemyAsyncRepository
from litestar.controller import Controller
from litestar.di import Provide
from litestar.handlers.http_handlers.decorators import delete, patch, post
from litestar.pagination import OffsetPagination
from litestar.params import Parameter
from litestar.repository.filters import LimitOffset
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession
class BaseModel(_BaseModel):
    """Pydantic base for the example schemas, configured to read object attributes."""

    # `from_attributes` is what lets `model_validate()` accept ORM instances.
    model_config = {"from_attributes": True}
class AuthorModel(UUIDBase):
    """ORM model for authors, derived from ``UUIDBase``.

    The SQLAlchemy base ships a declarative model to derive from; per the
    example's own notes it contributes a ``UUID`` based primary key (``id``).
    """

    # The table name can optionally be given explicitly instead of being
    # auto-generated.
    __tablename__ = "author"  # type: ignore[assignment]
    name: Mapped[str]
    dob: Mapped[date | None]
    # Not loaded by default ("noload"); a repository may opt in via its statement.
    books: Mapped[list[BookModel]] = relationship(lazy="noload", back_populates="author")
class BookModel(UUIDAuditBase):
    """ORM model for books, derived from ``UUIDAuditBase``.

    The audit base includes the same ``UUID`` based primary key (``id``) plus
    two additional columns: ``created_at`` (timestamp of when the record was
    created) and ``updated_at`` (the last time the record was modified).
    """

    __tablename__ = "book"  # type: ignore[assignment]
    title: Mapped[str]
    author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
    # Read-only relationship to the owning author, loaded via an inner join.
    author: Mapped[AuthorModel] = relationship(viewonly=True, innerjoin=True, lazy="joined")
# The schemas are declared explicitly, instead of using DTO objects, for clarity.
# `Author` is the shape returned by the handlers.
class Author(BaseModel):
    # Required field, but its value may be None.
    id: UUID | None
    name: str
    dob: date | None = None
# Request payload for creating an author: the name is mandatory, the date of
# birth is optional.
class AuthorCreate(BaseModel):
    name: str
    dob: date | None = None
# Request payload for a partial update: every field is optional.
class AuthorUpdate(BaseModel):
    name: str | None = None
    dob: date | None = None
class AuthorRepository(SQLAlchemyAsyncRepository[AuthorModel]):
    """Asynchronous repository bound to ``AuthorModel``."""

    # Tells the generic repository which ORM model it operates on.
    model_type = AuthorModel
async def provide_authors_repo(db_session: AsyncSession) -> AuthorRepository:
    """Build the default authors repository on top of the injected session."""
    repo = AuthorRepository(session=db_session)
    return repo
# The default `select` used by the repository can optionally be overridden to
# pass in specific SQL options such as join details.
async def provide_author_details_repo(db_session: AsyncSession) -> AuthorRepository:
    """Build an authors repository whose base statement also loads each author's books."""
    with_books = select(AuthorModel).options(selectinload(AuthorModel.books))
    return AuthorRepository(session=db_session, statement=with_books)
def provide_limit_offset_pagination(
    current_page: int = Parameter(ge=1, query="currentPage", default=1, required=False),
    page_size: int = Parameter(
        query="pageSize",
        ge=1,
        default=10,
        required=False,
    ),
) -> LimitOffset:
    """Add offset/limit pagination.

    Translates the ``currentPage`` / ``pageSize`` query parameters into a
    ``LimitOffset`` filter.

    Parameters
    ----------
    current_page : int
        1-based page number; determines the OFFSET as
        ``page_size * (current_page - 1)``.
    page_size : int
        Number of rows per page; used as the LIMIT.

    Returns
    -------
    LimitOffset
        Filter carrying the computed ``limit`` and ``offset``.
    """
    # Page 1 starts at offset 0, page 2 at `page_size`, and so on.
    return LimitOffset(limit=page_size, offset=page_size * (current_page - 1))
class AuthorController(Controller):
    """Author CRUD"""

    # Default repository for every handler; individual handlers may override it.
    dependencies = {"authors_repo": Provide(provide_authors_repo)}

    @get(path="/authors")
    async def list_authors(
        self,
        authors_repo: AuthorRepository,
        limit_offset: LimitOffset,
    ) -> OffsetPagination[Author]:
        """List authors."""
        results, total = await authors_repo.list_and_count(limit_offset)
        # Convert the ORM rows into `Author` schemas in a single validation pass.
        type_adapter = TypeAdapter(list[Author])
        return OffsetPagination[Author](
            items=type_adapter.validate_python(results),
            total=total,
            limit=limit_offset.limit,
            offset=limit_offset.offset,
        )

    @post(path="/authors")
    async def create_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorCreate,
    ) -> Author:
        """Create a new author."""
        obj = await authors_repo.add(
            AuthorModel(**data.model_dump(exclude_unset=True, exclude_none=True)),
        )
        await authors_repo.session.commit()
        return Author.model_validate(obj)

    # we override the authors_repo to use the version that joins the Books in
    @get(path="/authors/{author_id:uuid}", dependencies={"authors_repo": Provide(provide_author_details_repo)})
    async def get_author(
        self,
        authors_repo: AuthorRepository,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to retrieve.",
        ),
    ) -> Author:
        """Get an existing author."""
        obj = await authors_repo.get(author_id)
        return Author.model_validate(obj)

    @patch(
        path="/authors/{author_id:uuid}",
        dependencies={"authors_repo": Provide(provide_author_details_repo)},
    )
    async def update_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorUpdate,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to update.",
        ),
    ) -> Author:
        """Update an author."""
        # Only fields the client actually sent (and that are not None) are applied.
        raw_obj = data.model_dump(exclude_unset=True, exclude_none=True)
        raw_obj.update({"id": author_id})
        obj = await authors_repo.update(AuthorModel(**raw_obj))
        await authors_repo.session.commit()
        # `model_validate` is the Pydantic v2 API used by every other handler;
        # `from_orm` is its deprecated v1 predecessor.
        return Author.model_validate(obj)

    @delete(path="/authors/{author_id:uuid}")
    async def delete_author(
        self,
        authors_repo: AuthorRepository,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to delete.",
        ),
    ) -> None:
        """Delete an author from the system."""
        # The value returned by the repository is not needed here.
        await authors_repo.delete(author_id)
        await authors_repo.session.commit()
# Keep loaded attributes usable after commit (no expire-on-commit).
session_config = AsyncSessionConfig(expire_on_commit=False)

# Create 'db_session' dependency.
sqlalchemy_config = SQLAlchemyAsyncConfig(
    session_config=session_config,
    connection_string="sqlite+aiosqlite:///test.sqlite",
)
sqlalchemy_plugin = SQLAlchemyInitPlugin(config=sqlalchemy_config)
async def on_startup() -> None:
    """Create every table registered on ``UUIDBase.metadata``."""
    engine = sqlalchemy_config.get_engine()
    async with engine.begin() as connection:
        # `create_all` is a synchronous API, so it is dispatched through `run_sync`.
        await connection.run_sync(UUIDBase.metadata.create_all)
# Application wiring.
app = Litestar(
    route_handlers=[AuthorController],
    on_startup=[on_startup],
    # Reuse the plugin instance created above instead of constructing a second,
    # identical one and leaving `sqlalchemy_plugin` unused.
    plugins=[sqlalchemy_plugin],
    # The pagination provider is a plain, non-blocking sync function; say so
    # explicitly rather than relying on the implicit default.
    dependencies={"limit_offset": Provide(provide_limit_offset_pagination, sync_to_thread=False)},
)
Alternatively, you may use the SQLAlchemySyncRepository class for your synchronous database connection.
from __future__ import annotations
from datetime import date
from typing import TYPE_CHECKING
from uuid import UUID
from pydantic import BaseModel as _BaseModel
from pydantic import TypeAdapter
from sqlalchemy import ForeignKey, select
from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload
from litestar import Litestar, get
from litestar.contrib.sqlalchemy.base import UUIDAuditBase, UUIDBase
from litestar.contrib.sqlalchemy.plugins.init import SQLAlchemyInitPlugin, SQLAlchemySyncConfig
from litestar.contrib.sqlalchemy.repository import SQLAlchemySyncRepository
from litestar.controller import Controller
from litestar.di import Provide
from litestar.handlers.http_handlers.decorators import delete, patch, post
from litestar.pagination import OffsetPagination
from litestar.params import Parameter
from litestar.repository.filters import LimitOffset
if TYPE_CHECKING:
from sqlalchemy.orm import Session
class BaseModel(_BaseModel):
    """Pydantic base for the example schemas, configured to read object attributes."""

    # `from_attributes` is what lets `model_validate()` accept ORM instances.
    model_config = {"from_attributes": True}
class AuthorModel(UUIDBase):
    """ORM model for authors, derived from ``UUIDBase``.

    The SQLAlchemy base ships a declarative model to derive from; per the
    example's own notes it contributes a ``UUID`` based primary key (``id``).
    """

    # The table name can optionally be given explicitly instead of being
    # auto-generated.
    __tablename__ = "author"  # type: ignore[assignment]
    name: Mapped[str]
    dob: Mapped[date | None]
    # Not loaded by default ("noload"); a repository may opt in via its statement.
    books: Mapped[list[BookModel]] = relationship(lazy="noload", back_populates="author")
class BookModel(UUIDAuditBase):
    """ORM model for books, derived from ``UUIDAuditBase``.

    The audit base includes the same ``UUID`` based primary key (``id``) plus
    two additional columns: ``created_at`` (timestamp of when the record was
    created) and ``updated_at`` (the last time the record was modified).
    """

    __tablename__ = "book"  # type: ignore[assignment]
    title: Mapped[str]
    author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
    # Read-only relationship to the owning author, loaded via an inner join.
    author: Mapped[AuthorModel] = relationship(viewonly=True, innerjoin=True, lazy="joined")
# The schemas are declared explicitly, instead of using DTO objects, for clarity.
# `Author` is the shape returned by the handlers.
class Author(BaseModel):
    # Required field, but its value may be None.
    id: UUID | None
    name: str
    dob: date | None = None
# Request payload for creating an author: the name is mandatory, the date of
# birth is optional.
class AuthorCreate(BaseModel):
    name: str
    dob: date | None = None
# Request payload for a partial update: every field is optional.
class AuthorUpdate(BaseModel):
    name: str | None = None
    dob: date | None = None
class AuthorRepository(SQLAlchemySyncRepository[AuthorModel]):
    """Synchronous repository bound to ``AuthorModel``."""

    # Tells the generic repository which ORM model it operates on.
    model_type = AuthorModel
# NOTE(review): declared `async` although it only wraps a synchronous Session
# and is registered with `sync_to_thread=False`; confirm whether a plain `def`
# was intended.
async def provide_authors_repo(db_session: Session) -> AuthorRepository:
    """Build the default authors repository on top of the injected session."""
    repo = AuthorRepository(session=db_session)
    return repo
# The default `select` used by the repository can optionally be overridden to
# pass in specific SQL options such as join details.
# NOTE(review): declared `async` although it only wraps a synchronous Session
# and is registered with `sync_to_thread=False`; confirm whether a plain `def`
# was intended.
async def provide_author_details_repo(db_session: Session) -> AuthorRepository:
    """Build an authors repository whose base statement also loads each author's books."""
    with_books = select(AuthorModel).options(selectinload(AuthorModel.books))
    return AuthorRepository(session=db_session, statement=with_books)
def provide_limit_offset_pagination(
    current_page: int = Parameter(ge=1, query="currentPage", default=1, required=False),
    page_size: int = Parameter(
        query="pageSize",
        ge=1,
        default=10,
        required=False,
    ),
) -> LimitOffset:
    """Add offset/limit pagination.

    Translates the ``currentPage`` / ``pageSize`` query parameters into a
    ``LimitOffset`` filter.

    Parameters
    ----------
    current_page : int
        1-based page number; determines the OFFSET as
        ``page_size * (current_page - 1)``.
    page_size : int
        Number of rows per page; used as the LIMIT.

    Returns
    -------
    LimitOffset
        Filter carrying the computed ``limit`` and ``offset``.
    """
    # Page 1 starts at offset 0, page 2 at `page_size`, and so on.
    return LimitOffset(limit=page_size, offset=page_size * (current_page - 1))
class AuthorController(Controller):
    """Author CRUD"""

    # Default repository for every handler; individual handlers may override it.
    dependencies = {"authors_repo": Provide(provide_authors_repo, sync_to_thread=False)}

    @get(path="/authors")
    def list_authors(
        self,
        authors_repo: AuthorRepository,
        limit_offset: LimitOffset,
    ) -> OffsetPagination[Author]:
        """List authors."""
        rows, row_count = authors_repo.list_and_count(limit_offset)
        # Convert the ORM rows into `Author` schemas in a single validation pass.
        items = TypeAdapter(list[Author]).validate_python(rows)
        return OffsetPagination[Author](
            items=items,
            total=row_count,
            limit=limit_offset.limit,
            offset=limit_offset.offset,
        )

    @post(path="/authors")
    def create_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorCreate,
    ) -> Author:
        """Create a new author."""
        payload = data.model_dump(exclude_unset=True, exclude_none=True)
        created = authors_repo.add(AuthorModel(**payload))
        authors_repo.session.commit()
        return Author.model_validate(created)

    # This handler swaps in the repository variant that also loads the books.
    @get(
        path="/authors/{author_id:uuid}",
        dependencies={"authors_repo": Provide(provide_author_details_repo, sync_to_thread=False)},
    )
    def get_author(
        self,
        authors_repo: AuthorRepository,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to retrieve.",
        ),
    ) -> Author:
        """Get an existing author."""
        found = authors_repo.get(author_id)
        return Author.model_validate(found)

    @patch(
        path="/authors/{author_id:uuid}",
        dependencies={"authors_repo": Provide(provide_author_details_repo, sync_to_thread=False)},
    )
    def update_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorUpdate,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to update.",
        ),
    ) -> Author:
        """Update an author."""
        # Only fields the client actually sent (and that are not None) are applied.
        changes = data.model_dump(exclude_unset=True, exclude_none=True)
        changes["id"] = author_id
        updated = authors_repo.update(AuthorModel(**changes))
        authors_repo.session.commit()
        return Author.model_validate(updated)

    @delete(path="/authors/{author_id:uuid}")
    def delete_author(
        self,
        authors_repo: AuthorRepository,
        author_id: UUID = Parameter(
            title="Author ID",
            description="The author to delete.",
        ),
    ) -> None:
        """Delete an author from the system."""
        # The value returned by the repository is not needed here.
        authors_repo.delete(author_id)
        authors_repo.session.commit()
# Create 'db_session' dependency.
sqlalchemy_config = SQLAlchemySyncConfig(
    connection_string="sqlite:///test.sqlite",
)
sqlalchemy_plugin = SQLAlchemyInitPlugin(config=sqlalchemy_config)
def on_startup() -> None:
    """Create every table registered on ``UUIDBase.metadata``."""
    engine = sqlalchemy_config.get_engine()
    with engine.begin() as connection:
        UUIDBase.metadata.create_all(connection)
# Application wiring.
app = Litestar(
    route_handlers=[AuthorController],
    on_startup=[on_startup],
    # Reuse the plugin instance created above instead of constructing a second,
    # identical one and leaving `sqlalchemy_plugin` unused.
    plugins=[sqlalchemy_plugin],
    # The pagination provider is a plain, non-blocking sync function; say so
    # explicitly rather than relying on the implicit default.
    dependencies={"limit_offset": Provide(provide_limit_offset_pagination, sync_to_thread=False)},
)
See also