Interacting with repositories#

Now that we’ve covered the modeling basics, we are able to create our first repository class. The repository classes include all of the standard CRUD operations as well as a few advanced features such as pagination, filtering and bulk operations.

Tip

The full code for this tutorial can be found below in the Full Code section.

Before we jump into the code, let’s take a look at the functions available in the synchronous and asynchronous repositories.

Available Functions in the Repositories

| Function | Category | Description |
| --- | --- | --- |
| get | Selecting Data | Select a single record by primary key. Raises an exception when no record is found. |
| get_one | Selecting Data | Select a single record specified by the kwargs parameters. Raises an exception when no record is found. |
| get_one_or_none | Selecting Data | Select a single record specified by the kwargs parameters. Returns None when no record is found. |
| list | Selecting Data | Select a list of records specified by the kwargs parameters. Optionally, the results can be filtered by the supplied FilterTypes args. |
| list_and_count | Selecting Data | Select a list of records specified by the kwargs parameters. Optionally, the results can be filtered by the supplied FilterTypes args. Results are returned as a two-value tuple containing the rows selected and the total count of records. |
| get_or_create | Creating Data | Select a single record specified by the kwargs parameters. If no record is found, one is created with the given values. There’s an optional attribute to filter on a subset of the supplied parameters and to merge updates. |
| add | Creating Data | Create a new record in the database. |
| add_many | Creating Data | Create one or more rows in the database. |
| update | Updating Data | Update an existing record in the database. |
| update_many | Updating Data | Update one or more rows in the database. |
| upsert | Updating Data | A single operation that updates or inserts a record based on whether or not the primary key value on the model object is populated. |
| upsert_many | Updating Data | Updates or inserts multiple records based on whether or not the primary key value on the model object is populated. |
| delete | Removing Data | Remove a single record from the database. |
| delete_many | Removing Data | Remove one or more records from the database. |
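The difference between the single-record selectors, and the primary-key rule that upsert follows, can be sketched with an in-memory stand-in. This is not the repository implementation: `InMemoryRepo`, `Record`, and `NotFound` are hypothetical names used only for illustration, and a plain dict takes the place of the database.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional
from uuid import UUID, uuid4


class NotFound(Exception):
    """Stand-in for the exception raised when nothing matches."""


@dataclass
class Record:
    name: str
    id: Optional[UUID] = None  # None means "not stored yet"


@dataclass
class InMemoryRepo:
    rows: Dict[UUID, Record] = field(default_factory=dict)

    def get(self, item_id: UUID) -> Record:
        # Lookup by primary key; a miss is an error.
        try:
            return self.rows[item_id]
        except KeyError:
            raise NotFound(item_id) from None

    def get_one_or_none(self, **kwargs: object) -> Optional[Record]:
        # Lookup by arbitrary attributes; a miss returns None.
        for row in self.rows.values():
            if all(getattr(row, key) == value for key, value in kwargs.items()):
                return row
        return None

    def get_one(self, **kwargs: object) -> Record:
        # Same lookup as get_one_or_none, but a miss is an error.
        row = self.get_one_or_none(**kwargs)
        if row is None:
            raise NotFound(kwargs)
        return row

    def upsert(self, record: Record) -> Record:
        # Empty primary key: assign one and insert.
        # Populated primary key: replace the row stored under that key.
        if record.id is None:
            record.id = uuid4()
        self.rows[record.id] = record
        return record
```

Choosing between the raising and the None-returning variant is mostly a question of whether a missing row is an error in the calling code or an expected outcome.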

Note

  • All three of the bulk DML operations will leverage dialect-specific enhancements to be as efficient as possible. In addition to using efficient bulk insert binds, the repository will optionally leverage multi-row RETURNING support where possible. The repository automatically detects this support from the SQLAlchemy driver, so no additional configuration is required to enable it.

  • SQL engines generally limit the number of elements that can be placed in an IN clause. The repository operations automatically break lists that exceed this limit into multiple queries and concatenate the results before returning them. You do not need to account for this in your own code.
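The batching described in the second point can be pictured as follows. This is a minimal sketch of the idea, not the repository’s code: `chunked` and `fetch_by_ids` are hypothetical helpers, the limit of 3 is arbitrary, and a dict stands in for the table.

```python
from typing import Dict, Iterator, List, Sequence


def chunked(items: Sequence[int], size: int) -> Iterator[Sequence[int]]:
    """Yield consecutive slices of ``items`` holding at most ``size`` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start : start + size]


def fetch_by_ids(table: Dict[int, str], ids: Sequence[int], in_limit: int = 3) -> List[str]:
    """Run one lookup per chunk and concatenate the partial results."""
    results: List[str] = []
    for batch in chunked(ids, in_limit):
        # Each pass stands in for one ``WHERE id IN (...)`` query.
        results.extend(table[i] for i in batch if i in table)
    return results
```

The caller passes the full list of ids and receives a single list back, which is the behaviour the note describes.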

In the following examples, we’ll cover a few ways that you can use the repository within your applications.

Model Repository#

Here we import the SQLAlchemyAsyncRepository class and create an AuthorRepository repository class. This is all that’s required to include all of the integrated repository features.

app.py#
from litestar.contrib.sqlalchemy.repository import SQLAlchemyAsyncRepository


class AuthorRepository(SQLAlchemyAsyncRepository[Author]):
    """Author repository."""

    model_type = Author

Repository Context Manager#

Since we’ll be using the repository outside of a Litestar application in this script, we’ll make a simple context manager to automatically handle the creation (and cleanup) of our Author repository.

The repository_factory method will do the following for us:
  • Automatically create a new DB session from the SQLAlchemy configuration.

  • Roll back the session when any exception occurs.

  • Automatically commit after the function call completes.

app.py#
# let's make a simple context manager as an example here.
@asynccontextmanager
async def repository_factory() -> AsyncIterator[AuthorRepository]:
    async with session_factory() as db_session:
        try:
            yield AuthorRepository(session=db_session)
        except Exception:  # noqa: BLE001
            # roll back on any error raised inside the `async with` body
            await db_session.rollback()
        else:
            # commit once the body completes without raising
            await db_session.commit()
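To see the commit and rollback behaviour in isolation, the sketch below swaps the database session for a stub that only records which method was called. `FakeSession`, `managed_session`, and `demo` are hypothetical names introduced for this illustration; the control flow follows the three points listed above, and the exception is swallowed after the rollback, as in the tutorial script.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class FakeSession:
    """Stub that records calls instead of talking to a database."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")


@asynccontextmanager
async def managed_session(session: FakeSession) -> AsyncIterator[FakeSession]:
    try:
        yield session
    except Exception:
        # Any error raised inside the ``async with`` body lands here.
        await session.rollback()
    else:
        # Reached only when the body finished without raising.
        await session.commit()


async def demo() -> List[str]:
    ok, failed = FakeSession(), FakeSession()
    async with managed_session(ok):
        pass  # the work succeeds
    async with managed_session(failed):
        raise RuntimeError("simulated failure")
    return ok.calls + failed.calls
```

Running `asyncio.run(demo())` exercises both paths. Because the `except` branch does not re-raise, the caller never sees the error; whether that is desirable depends on the application, and re-raising after the rollback is a common alternative.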

Creating, Updating and Removing Data#

To illustrate a few ways you can manipulate data in your database, we’ll go through the various CRUD operations:

Creating Data: Here’s a simple insert operation to populate our new Author table:

app.py#
async def create_author() -> Author:
    async with repository_factory() as repo:
        obj = await repo.add(
            Author(
                name="F. Scott Fitzgerald",
                dob=datetime.strptime("1896-09-24", "%Y-%m-%d").date(),
            )
        )
        console.print(f"Created Author record for {obj.name} with primary key {obj.id}.")
        return obj

Updating Data: The update method will ensure any updates made to the model object are executed on the database:

app.py#
async def update_author(obj: Author) -> Author:
    async with repository_factory() as repo:
        obj = await repo.update(obj)
        console.print(f"Updated Author record for {obj.name} with primary key {obj.id}.")
        return obj

Removing Data: The delete method accepts the primary key of the row you want to delete and returns the deleted object:

app.py#
async def remove_author(id: UUID) -> Author:
    async with repository_factory() as repo:
        obj = await repo.delete(id)
        console.print(f"Deleted Author record for {obj.name} with primary key {obj.id}.")
        return obj

Now that we’ve seen how to do single-row operations, let’s look at the bulk methods we can use.

Working with Bulk Data Operations#

In this section, we delve into the powerful capabilities of the repository classes for handling bulk data operations. Our example illustrates how we can efficiently manage data in our database. Specifically, we’ll use a JSON file containing information about US states and their abbreviations.

Here’s what we’re going to cover:

Fixture Data Loading#

We will introduce a method for loading fixture data. Fixture data is sample data that populates your database and helps test the behavior of your application under realistic conditions. This pattern can be extended and adjusted to meet your needs.

app.py#
def open_fixture(fixtures_path: Path, fixture_name: str) -> Any:
    """Loads JSON file with the specified fixture name

    Args:
        fixtures_path (Path): The path to look for fixtures
        fixture_name (str): The fixture name to load.

    Raises:
        FileNotFoundError: Fixtures not found.

    Returns:
        Any: The parsed JSON data
    """
    fixture = Path(fixtures_path / f"{fixture_name}.json")
    if fixture.exists():
        with fixture.open(mode="r", encoding="utf-8") as f:
            f_data = f.read()
        return json.loads(f_data)
    raise FileNotFoundError(f"Could not find the {fixture_name} fixture")
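As a quick way to try the loader, the sketch below writes a two-row fixture into a temporary directory and reads it back. The `load_sample` helper is hypothetical, and the block carries a compact copy of `open_fixture` only so that it runs on its own; the file name mirrors the us_state_lookup.json file used in this tutorial.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def open_fixture(fixtures_path: Path, fixture_name: str) -> Any:
    """Compact copy of the loader above so this example runs on its own."""
    fixture = fixtures_path / f"{fixture_name}.json"
    if not fixture.exists():
        raise FileNotFoundError(f"Could not find the {fixture_name} fixture")
    return json.loads(fixture.read_text(encoding="utf-8"))


def load_sample() -> Any:
    # Write a small fixture into a throwaway directory, then read it back.
    with tempfile.TemporaryDirectory() as tmp:
        folder = Path(tmp)
        rows = [
            {"name": "Alabama", "abbreviation": "AL"},
            {"name": "Alaska", "abbreviation": "AK"},
        ]
        (folder / "us_state_lookup.json").write_text(json.dumps(rows), encoding="utf-8")
        return open_fixture(folder, "us_state_lookup")
```

The parsed result is a list of dicts, which is the shape the bulk insert example later unpacks into model instances.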

You can review the JSON source file here:

US State Lookup JSON

You can download it from /examples/contrib/sqlalchemy/us_state_lookup.json or view it below:

us_state_lookup.json#
[
  {
    "name": "Alabama",
    "abbreviation": "AL"
  },
  {
    "name": "Alaska",
    "abbreviation": "AK"
  },
  {
    "name": "Arizona",
    "abbreviation": "AZ"
  },
  {
    "name": "Arkansas",
    "abbreviation": "AR"
  },
  {
    "name": "California",
    "abbreviation": "CA"
  },
  {
    "name": "Colorado",
    "abbreviation": "CO"
  },
  {
    "name": "Connecticut",
    "abbreviation": "CT"
  },
  {
    "name": "Delaware",
    "abbreviation": "DE"
  },
  {
    "name": "District Of Columbia",
    "abbreviation": "DC"
  },
  {
    "name": "Florida",
    "abbreviation": "FL"
  },
  {
    "name": "Georgia",
    "abbreviation": "GA"
  },
  {
    "name": "Guam",
    "abbreviation": "GU"
  },
  {
    "name": "Hawaii",
    "abbreviation": "HI"
  },
  {
    "name": "Idaho",
    "abbreviation": "ID"
  },
  {
    "name": "Illinois",
    "abbreviation": "IL"
  },
  {
    "name": "Indiana",
    "abbreviation": "IN"
  },
  {
    "name": "Iowa",
    "abbreviation": "IA"
  },
  {
    "name": "Kansas",
    "abbreviation": "KS"
  },
  {
    "name": "Kentucky",
    "abbreviation": "KY"
  },
  {
    "name": "Louisiana",
    "abbreviation": "LA"
  },
  {
    "name": "Maine",
    "abbreviation": "ME"
  },
  {
    "name": "Maryland",
    "abbreviation": "MD"
  },
  {
    "name": "Massachusetts",
    "abbreviation": "MA"
  },
  {
    "name": "Michigan",
    "abbreviation": "MI"
  },
  {
    "name": "Minnesota",
    "abbreviation": "MN"
  },
  {
    "name": "Mississippi",
    "abbreviation": "MS"
  },
  {
    "name": "Missouri",
    "abbreviation": "MO"
  },
  {
    "name": "Montana",
    "abbreviation": "MT"
  },
  {
    "name": "Nebraska",
    "abbreviation": "NE"
  },
  {
    "name": "Nevada",
    "abbreviation": "NV"
  },
  {
    "name": "New Hampshire",
    "abbreviation": "NH"
  },
  {
    "name": "New Jersey",
    "abbreviation": "NJ"
  },
  {
    "name": "New Mexico",
    "abbreviation": "NM"
  },
  {
    "name": "New York",
    "abbreviation": "NY"
  },
  {
    "name": "North Carolina",
    "abbreviation": "NC"
  },
  {
    "name": "North Dakota",
    "abbreviation": "ND"
  },
  {
    "name": "Ohio",
    "abbreviation": "OH"
  },
  {
    "name": "Oklahoma",
    "abbreviation": "OK"
  },
  {
    "name": "Oregon",
    "abbreviation": "OR"
  },
  {
    "name": "Palau",
    "abbreviation": "PW"
  },
  {
    "name": "Pennsylvania",
    "abbreviation": "PA"
  },
  {
    "name": "Puerto Rico",
    "abbreviation": "PR"
  },
  {
    "name": "Rhode Island",
    "abbreviation": "RI"
  },
  {
    "name": "South Carolina",
    "abbreviation": "SC"
  },
  {
    "name": "South Dakota",
    "abbreviation": "SD"
  },
  {
    "name": "Tennessee",
    "abbreviation": "TN"
  },
  {
    "name": "Texas",
    "abbreviation": "TX"
  },
  {
    "name": "Utah",
    "abbreviation": "UT"
  },
  {
    "name": "Vermont",
    "abbreviation": "VT"
  },
  {
    "name": "Virginia",
    "abbreviation": "VA"
  },
  {
    "name": "Washington",
    "abbreviation": "WA"
  },
  {
    "name": "West Virginia",
    "abbreviation": "WV"
  },
  {
    "name": "Wisconsin",
    "abbreviation": "WI"
  },
  {
    "name": "Wyoming",
    "abbreviation": "WY"
  }
]

Bulk Insert#

We’ll use our fixture data to demonstrate a bulk insert operation. This operation allows you to add multiple records to your database in a single transaction, improving performance when working with larger data sets.

app.py#
        # 1) Load the JSON data into the US States table.
        repo = USStateRepository(session=db_session)
        fixture = open_fixture(here, USStateRepository.model_type.__tablename__)  # type: ignore
        objs = repo.add_many([USStateRepository.model_type(**raw_obj) for raw_obj in fixture])
        db_session.commit()
        console.print(f"Created {len(objs)} new objects.")
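To make the single-transaction point concrete without any repository code, here is a sketch that uses only the standard-library `sqlite3` module. It is an analogy rather than a description of what `add_many` does internally; the `bulk_insert` helper and the table and column names are assumptions chosen to match the fixture.

```python
import sqlite3
from typing import Dict, List


def bulk_insert(rows: List[Dict[str, str]]) -> int:
    """Insert all rows with one executemany call inside one transaction."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE us_state_lookup (name TEXT, abbreviation TEXT)")
        with conn:  # commits on success, rolls back if an exception escapes
            conn.executemany(
                "INSERT INTO us_state_lookup (name, abbreviation) "
                "VALUES (:name, :abbreviation)",
                rows,
            )
        (count,) = conn.execute("SELECT COUNT(*) FROM us_state_lookup").fetchone()
        return count
    finally:
        conn.close()
```

All rows either land together or not at all, and the driver receives the statement once with many parameter sets instead of once per row.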

Paginated Data Selection#

Next, let’s explore how to select multiple records with pagination. This functionality is useful for handling large amounts of data by breaking the data into manageable ‘pages’ or subsets. LimitOffset is one of several filter types you can use with the repository.

app.py#
        # 2) Select paginated data and total row count.
        created_objs, total_objs = repo.list_and_count(LimitOffset(limit=10, offset=0))
        console.print(f"Selected {len(created_objs)} records out of a total of {total_objs}.")
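The snippet above fetches only the first page. One way to walk every page is to advance the offset until it reaches the reported total. The sketch below shows that loop against an in-memory stand-in: this `list_and_count` is a hypothetical function over a plain list, not the repository method, and `iter_pages` is likewise an illustrative helper.

```python
from typing import Iterator, List, Sequence, Tuple


def list_and_count(rows: Sequence[str], limit: int, offset: int) -> Tuple[List[str], int]:
    """In-memory stand-in: return one page plus the total number of rows."""
    return list(rows[offset : offset + limit]), len(rows)


def iter_pages(rows: Sequence[str], limit: int) -> Iterator[List[str]]:
    """Request page after page until the offset reaches the reported total."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    offset = 0
    while True:
        page, total = list_and_count(rows, limit=limit, offset=offset)
        if page:
            yield page
        offset += limit
        if offset >= total:
            break
```

With 25 rows and a limit of 10, the loop yields pages of 10, 10, and 5 rows. Against a live table the total can change between requests, so a real loop may want to stop on an empty page as well.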

Bulk Delete#

Here we demonstrate how to perform a bulk delete operation. Just as with the bulk insert, deleting multiple records with the batch record methods is more efficient than executing row-by-row.

app.py#
        # 3) Let's remove the batch of records selected.
        deleted_objs = repo.delete_many([new_obj.id for new_obj in created_objs])
        console.print(f"Removed {len(deleted_objs)} records out of a total of {total_objs}.")

Counts#

Finally, we’ll demonstrate how to count the number of records remaining in the database.

app.py#
        # 4) Let's count the remaining rows
        remaining_count = repo.count()
        console.print(f"Found {remaining_count} remaining records after delete.")

Now that we have demonstrated how to interact with the repository objects outside of a Litestar application, our next example will use dependency injection to add this functionality to a Controller!

Full Code#

app.py#
from __future__ import annotations

from contextlib import asynccontextmanager
from datetime import date, datetime
from typing import AsyncIterator
from uuid import UUID

import anyio
from rich import get_console
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import Mapped

from litestar.contrib.sqlalchemy.base import UUIDBase
from litestar.contrib.sqlalchemy.repository import SQLAlchemyAsyncRepository

console = get_console()


# the SQLAlchemy base includes a declarative model for you to use in your models.
# The `UUIDBase` class includes a `UUID` based primary key (`id`)
class Author(UUIDBase):
    name: Mapped[str]
    dob: Mapped[date]
    dod: Mapped[date | None]


class AuthorRepository(SQLAlchemyAsyncRepository[Author]):
    """Author repository."""

    model_type = Author


engine = create_async_engine(
    "sqlite+aiosqlite:///test.sqlite",
    future=True,
)
session_factory = async_sessionmaker(engine, expire_on_commit=False)


# let's make a simple context manager as an example here.
@asynccontextmanager
async def repository_factory() -> AsyncIterator[AuthorRepository]:
    async with session_factory() as db_session:
        try:
            yield AuthorRepository(session=db_session)
        except Exception:  # noqa: BLE001
            await db_session.rollback()
        else:
            await db_session.commit()


async def create_author() -> Author:
    async with repository_factory() as repo:
        obj = await repo.add(
            Author(
                name="F. Scott Fitzgerald",
                dob=datetime.strptime("1896-09-24", "%Y-%m-%d").date(),
            )
        )
        console.print(f"Created Author record for {obj.name} with primary key {obj.id}.")
        return obj


async def update_author(obj: Author) -> Author:
    async with repository_factory() as repo:
        obj = await repo.update(obj)
        console.print(f"Updated Author record for {obj.name} with primary key {obj.id}.")
        return obj


async def remove_author(id: UUID) -> Author:
    async with repository_factory() as repo:
        obj = await repo.delete(id)
        console.print(f"Deleted Author record for {obj.name} with primary key {obj.id}.")
        return obj


async def get_author_if_exists(id: UUID) -> Author | None:
    async with repository_factory() as repo:
        obj = await repo.get_one_or_none(id=id)
        if obj is not None:
            console.print(f"Found Author record for {obj.name} with primary key {obj.id}.")
        else:
            console.print(f"Could not find Author with primary key {id}.")
        return obj


async def run_script() -> None:
    """Create the tables, then add, update, remove, and look up an Author."""
    async with engine.begin() as conn:
        await conn.run_sync(UUIDBase.metadata.create_all)

    # 1) create a new Author record.
    console.print("1) Adding a new record")
    author = await create_author()
    author_id = author.id

    # 2) Let's update the Author record.
    console.print("2) Updating a record.")
    author.dod = datetime.strptime("1940-12-21", "%Y-%m-%d").date()
    await update_author(author)

    # 3) Let's delete the record we just created.
    console.print("3) Removing a record.")
    await remove_author(author_id)

    # 4) Let's verify the record no longer exists.
    console.print("4) Select one or none.")
    _should_be_none = await get_author_if_exists(author_id)


if __name__ == "__main__":
    anyio.run(run_script)
On Python 3.9 and later, AsyncIterator can be imported from collections.abc instead of typing; the rest of the script is unchanged.