Interacting with repositories#
Now that we’ve covered the modeling basics, we are able to create our first repository class. The repository classes include all of the standard CRUD operations as well as a few advanced features such as pagination, filtering and bulk operations.
Tip
The full code for this tutorial can be found below in the Full Code section.
Before we jump into the code, let’s take a look at the functions available in the synchronous and asynchronous repositories.
Available Functions in the Repositories
Function | Category | Description |
---|---|---|
get | Selecting Data | Select a single record by primary key. Raises an exception when no record is found. |
get_one | Selecting Data | Select a single record specified by the keyword arguments. Raises an exception when no record is found. |
get_one_or_none | Selecting Data | Select a single record specified by the keyword arguments. Returns None when no record is found. |
list | Selecting Data | Select a list of records specified by the keyword arguments. |
list_and_count | Selecting Data | Select a list of records specified by the keyword arguments, returned together with the total row count. |
get_or_create | Creating Data | Select a single record specified by the keyword arguments, creating it when no record is found. |
add | Creating Data | Create a new record in the database. |
add_many | Creating Data | Create one or more rows in the database. |
update | Updating Data | Update an existing record in the database. |
update_many | Updating Data | Update one or more rows in the database. |
upsert | Updating Data | A single operation that updates or inserts a record based on whether or not the primary key value on the model object is populated. |
upsert_many | Updating Data | Updates or inserts multiple records based on whether or not the primary key value on the model object is populated. |
delete | Removing Data | Remove a single record from the database. |
delete_many | Removing Data | Remove one or more records from the database. |
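To make the upsert rule from the table concrete, here is a minimal in-memory sketch. It is not the repository's implementation: ToyRecord, ToyStore, and its upsert method are hypothetical names, used only to illustrate the "insert when the primary key is unset, otherwise update" decision.

```python
from __future__ import annotations

from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class ToyRecord:
    """Hypothetical record; `id` stays None until the record is stored."""

    name: str
    id: UUID | None = None


class ToyStore:
    """Hypothetical in-memory stand-in for a table keyed by primary key."""

    def __init__(self) -> None:
        self.rows: dict[UUID, ToyRecord] = {}

    def upsert(self, record: ToyRecord) -> str:
        """Insert when the primary key is unset, otherwise update in place."""
        if record.id is None:
            record.id = uuid4()
            self.rows[record.id] = record
            return "inserted"
        self.rows[record.id] = record
        return "updated"


store = ToyStore()
author = ToyRecord(name="F. Scott Fitzgerald")
first = store.upsert(author)  # primary key was empty, so this inserts
author.name = "Francis Scott Fitzgerald"
second = store.upsert(author)  # primary key is now populated, so this updates
print(first, second, len(store.rows))
```

The same record object goes through both branches, so the store still holds a single row after the second call.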
Note
All three of the bulk DML operations will leverage dialect-specific enhancements to be as efficient as possible. In addition to using efficient bulk insert binds, the repository will optionally leverage multi-row RETURNING support where possible. The repository automatically detects this support from the SQLAlchemy driver, so no additional interaction is required to enable it.
SQL engines generally limit the number of elements that can be placed in an IN clause. The repository operations automatically break lists that exceed this limit into multiple queries and concatenate the results before returning. You do not need to account for this in your own code.
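The IN-clause splitting described in the note can be pictured with a small standard-library sketch. This is not how the repository is implemented. The chunked and select_by_ids helpers, the item table, and the limit of 3 are all assumptions made for the demonstration, and sqlite3 stands in for whatever engine you use.

```python
import sqlite3
from typing import Iterator, List, Sequence


def chunked(values: Sequence[int], size: int) -> Iterator[Sequence[int]]:
    """Yield consecutive slices of `values` with at most `size` items each."""
    for start in range(0, len(values), size):
        yield values[start : start + size]


def select_by_ids(conn: sqlite3.Connection, ids: Sequence[int], max_in: int) -> List[tuple]:
    """Run one IN query per chunk and concatenate the rows before returning."""
    rows: List[tuple] = []
    for chunk in chunked(ids, max_in):
        placeholders = ", ".join("?" for _ in chunk)
        query = f"SELECT id, name FROM item WHERE id IN ({placeholders}) ORDER BY id"
        rows.extend(conn.execute(query, list(chunk)).fetchall())
    return rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE item (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO item (id, name) VALUES (?, ?)",
    [(i, f"item-{i}") for i in range(1, 11)],
)

# Pretend the engine only allows 3 elements per IN clause (an arbitrary limit for the demo).
result = select_by_ids(conn, list(range(1, 11)), max_in=3)
print(len(result))
```

Ten ids with a limit of three produce four queries, yet the caller receives one combined list.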
In the following examples, we’ll cover a few ways that you can use the repository within your applications.
Model Repository#
Here we import the SQLAlchemyAsyncRepository class and create an AuthorRepository repository class. This is all that’s required to include all of the integrated repository features.
from litestar.contrib.sqlalchemy.repository import SQLAlchemyAsyncRepository


class AuthorRepository(SQLAlchemyAsyncRepository[Author]):
    """Author repository."""

    model_type = Author
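Why a two-line subclass is enough may be easier to see with a toy version of the pattern. The sketch below uses only the standard library. ToyRepository and the dataclass Author are hypothetical stand-ins rather than the Litestar classes, and the point is only that the behavior lives in a generic base class while the subclass supplies the model binding.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


class ToyRepository(Generic[T]):
    """Hypothetical base class; all behavior lives here and is inherited."""

    model_type: type  # subclasses set this to the model they manage

    def __init__(self) -> None:
        self._rows: list[T] = []

    def add(self, obj: T) -> T:
        """Store the object after checking it matches the bound model."""
        if not isinstance(obj, self.model_type):
            raise TypeError(f"expected {self.model_type.__name__}")
        self._rows.append(obj)
        return obj

    def count(self) -> int:
        """Return the number of stored objects."""
        return len(self._rows)


@dataclass
class Author:
    """Hypothetical stand-in for the tutorial's Author model."""

    name: str


class AuthorRepository(ToyRepository[Author]):
    """Only the model binding is declared; add() and count() are inherited."""

    model_type = Author


repo = AuthorRepository()
repo.add(Author(name="F. Scott Fitzgerald"))
print(repo.count())
```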
Repository Context Manager#
Since we’ll be using the repository outside of a Litestar application in this script, we’ll make a simple context manager to automatically handle the creation (and cleanup) of our Author repository.
The repository_factory method will do the following for us:
- Automatically create a new DB session from the SQLAlchemy configuration.
- Roll back the session when any exception occurs.
- Automatically commit after the function call completes.
# let's make a simple context manager as an example here.
@asynccontextmanager
async def repository_factory() -> AsyncIterator[AuthorRepository]:
    async with session_factory() as db_session:
        try:
            yield AuthorRepository(session=db_session)
        except Exception:  # noqa: BLE001
            await db_session.rollback()
        else:
            await db_session.commit()
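The commit-on-success, rollback-on-error behavior can be checked in isolation without a database. The following is a minimal sketch under stated assumptions: FakeSession and session_scope are hypothetical names, and the fake session only records which method was called.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class FakeSession:
    """Hypothetical stand-in for a database session that records what was called."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")


@asynccontextmanager
async def session_scope(session: FakeSession) -> AsyncIterator[FakeSession]:
    try:
        yield session
    except Exception:
        # The body raised: undo pending work. The exception is not re-raised,
        # mirroring the tutorial's factory, so callers will not see it.
        await session.rollback()
    else:
        # The body finished normally: persist the work.
        await session.commit()


async def demo() -> List[List[str]]:
    ok, failing = FakeSession(), FakeSession()
    async with session_scope(ok):
        pass  # successful unit of work
    async with session_scope(failing):
        raise ValueError("simulated failure")
    return [ok.calls, failing.calls]


outcome = asyncio.run(demo())
print(outcome)
```

One design consequence worth noting: because the except branch does not re-raise, a failure inside the block is rolled back silently. Adding a bare raise after the rollback would surface the error to the caller instead.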
Creating, Updating and Removing Data#
To illustrate a few ways you can manipulate data in your database, we’ll go through the various CRUD operations:
Creating Data: Here’s a simple insert operation to populate our new Author table:
async def create_author() -> Author:
    async with repository_factory() as repo:
        obj = await repo.add(
            Author(
                name="F. Scott Fitzgerald",
                dob=datetime.strptime("1896-09-24", "%Y-%m-%d").date(),
            )
        )
        console.print(f"Created Author record for {obj.name} with primary key {obj.id}.")
        return obj
Updating Data: The update method will ensure any updates made to the model object are executed on the database:
async def update_author(obj: Author) -> Author:
    async with repository_factory() as repo:
        obj = await repo.update(obj)
        console.print(f"Updated Author record for {obj.name} with primary key {obj.id}.")
        return obj
Removing Data: The delete method accepts the primary key of the row you want to delete:
async def remove_author(id: UUID) -> Author:
    async with repository_factory() as repo:
        obj = await repo.delete(id)
        console.print(f"Deleted Author record for {obj.name} with primary key {obj.id}.")
        return obj
Now that we’ve seen how to do single-row operations, let’s look at the bulk methods we can use.
Working with Bulk Data Operations#
In this section, we delve into the powerful capabilities of the repository classes for handling bulk data operations. Our example illustrates how we can efficiently manage data in our database. Specifically, we’ll use a JSON file containing information about US states and their abbreviations.
Here’s what we’re going to cover:
Fixture Data Loading#
We will introduce a method for loading fixture data. Fixture data is sample data that populates your database and helps test the behavior of your application under realistic conditions. This pattern can be extended and adjusted to meet your needs.
import json
from pathlib import Path
from typing import Any


def open_fixture(fixtures_path: Path, fixture_name: str) -> Any:
    """Loads JSON file with the specified fixture name

    Args:
        fixtures_path (Path): The path to look for fixtures
        fixture_name (str): The fixture name to load.

    Raises:
        FileNotFoundError: Fixtures not found.

    Returns:
        Any: The parsed JSON data
    """
    fixture = Path(fixtures_path / f"{fixture_name}.json")
    if fixture.exists():
        with fixture.open(mode="r", encoding="utf-8") as f:
            f_data = f.read()
        return json.loads(f_data)
    raise FileNotFoundError(f"Could not find the {fixture_name} fixture")
You can review the JSON source file here:
US State Lookup JSON
You can download it: /examples/contrib/sqlalchemy/us_state_lookup.json
or view below:
[
{
"name": "Alabama",
"abbreviation": "AL"
},
{
"name": "Alaska",
"abbreviation": "AK"
},
{
"name": "Arizona",
"abbreviation": "AZ"
},
{
"name": "Arkansas",
"abbreviation": "AR"
},
{
"name": "California",
"abbreviation": "CA"
},
{
"name": "Colorado",
"abbreviation": "CO"
},
{
"name": "Connecticut",
"abbreviation": "CT"
},
{
"name": "Delaware",
"abbreviation": "DE"
},
{
"name": "District Of Columbia",
"abbreviation": "DC"
},
{
"name": "Florida",
"abbreviation": "FL"
},
{
"name": "Georgia",
"abbreviation": "GA"
},
{
"name": "Guam",
"abbreviation": "GU"
},
{
"name": "Hawaii",
"abbreviation": "HI"
},
{
"name": "Idaho",
"abbreviation": "ID"
},
{
"name": "Illinois",
"abbreviation": "IL"
},
{
"name": "Indiana",
"abbreviation": "IN"
},
{
"name": "Iowa",
"abbreviation": "IA"
},
{
"name": "Kansas",
"abbreviation": "KS"
},
{
"name": "Kentucky",
"abbreviation": "KY"
},
{
"name": "Louisiana",
"abbreviation": "LA"
},
{
"name": "Maine",
"abbreviation": "ME"
},
{
"name": "Maryland",
"abbreviation": "MD"
},
{
"name": "Massachusetts",
"abbreviation": "MA"
},
{
"name": "Michigan",
"abbreviation": "MI"
},
{
"name": "Minnesota",
"abbreviation": "MN"
},
{
"name": "Mississippi",
"abbreviation": "MS"
},
{
"name": "Missouri",
"abbreviation": "MO"
},
{
"name": "Montana",
"abbreviation": "MT"
},
{
"name": "Nebraska",
"abbreviation": "NE"
},
{
"name": "Nevada",
"abbreviation": "NV"
},
{
"name": "New Hampshire",
"abbreviation": "NH"
},
{
"name": "New Jersey",
"abbreviation": "NJ"
},
{
"name": "New Mexico",
"abbreviation": "NM"
},
{
"name": "New York",
"abbreviation": "NY"
},
{
"name": "North Carolina",
"abbreviation": "NC"
},
{
"name": "North Dakota",
"abbreviation": "ND"
},
{
"name": "Ohio",
"abbreviation": "OH"
},
{
"name": "Oklahoma",
"abbreviation": "OK"
},
{
"name": "Oregon",
"abbreviation": "OR"
},
{
"name": "Palau",
"abbreviation": "PW"
},
{
"name": "Pennsylvania",
"abbreviation": "PA"
},
{
"name": "Puerto Rico",
"abbreviation": "PR"
},
{
"name": "Rhode Island",
"abbreviation": "RI"
},
{
"name": "South Carolina",
"abbreviation": "SC"
},
{
"name": "South Dakota",
"abbreviation": "SD"
},
{
"name": "Tennessee",
"abbreviation": "TN"
},
{
"name": "Texas",
"abbreviation": "TX"
},
{
"name": "Utah",
"abbreviation": "UT"
},
{
"name": "Vermont",
"abbreviation": "VT"
},
{
"name": "Virginia",
"abbreviation": "VA"
},
{
"name": "Washington",
"abbreviation": "WA"
},
{
"name": "West Virginia",
"abbreviation": "WV"
},
{
"name": "Wisconsin",
"abbreviation": "WI"
},
{
"name": "Wyoming",
"abbreviation": "WY"
}
]
Bulk Insert#
We’ll use our fixture data to demonstrate a bulk insert operation. This operation allows you to add multiple records to your database in a single transaction, improving performance when working with larger data sets.
# 1) Load the JSON data into the US States table.
repo = USStateRepository(session=db_session)
fixture = open_fixture(here, USStateRepository.model_type.__tablename__)  # type: ignore
objs = repo.add_many([USStateRepository.model_type(**raw_obj) for raw_obj in fixture])
db_session.commit()
console.print(f"Created {len(objs)} new objects.")
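The two ideas in this step, turning each JSON object into constructor keyword arguments and writing all rows in one transaction, can be tried with the standard library alone. This is a sketch under assumptions, not the repository's code path: the USState dataclass and the us_state table are hypothetical stand-ins for the tutorial's model, and sqlite3 replaces the SQLAlchemy session.

```python
import json
import sqlite3
from dataclasses import astuple, dataclass


@dataclass
class USState:
    """Hypothetical model mirroring the keys of each fixture entry."""

    name: str
    abbreviation: str


# A three-entry excerpt of the fixture, inlined so the sketch is self-contained.
raw_fixture = json.loads(
    '[{"name": "Alabama", "abbreviation": "AL"},'
    ' {"name": "Alaska", "abbreviation": "AK"},'
    ' {"name": "Arizona", "abbreviation": "AZ"}]'
)

# Each JSON object becomes keyword arguments for the model constructor.
states = [USState(**raw_obj) for raw_obj in raw_fixture]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE us_state (name TEXT, abbreviation TEXT)")
with conn:  # one transaction: commits on success, rolls back on error
    conn.executemany(
        "INSERT INTO us_state (name, abbreviation) VALUES (?, ?)",
        [astuple(state) for state in states],
    )

inserted = conn.execute("SELECT COUNT(*) FROM us_state").fetchone()[0]
print(f"Created {inserted} new objects.")
```

The `**raw_obj` unpacking only works when the JSON keys match the model's field names exactly, which is why the fixture uses name and abbreviation.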
Paginated Data Selection#
Next, let’s explore how to select multiple records with pagination. This functionality is useful for handling large amounts of data by breaking the data into manageable ‘pages’ or subsets. LimitOffset is one of several filter types you can use with the repository.
# 2) Select paginated data and total row count.
created_objs, total_objs = repo.list_and_count(LimitOffset(limit=10, offset=0))
console.print(f"Selected {len(created_objs)} records out of a total of {total_objs}.")
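For readers who want to see what a limit/offset page plus a total count amounts to at the SQL level, here is a rough standard-library sketch. The fetch_page helper and the item table are hypothetical, and the queries only approximate what a repository might issue. The point is that a page is a LIMIT/OFFSET slice and the total is a separate COUNT.

```python
import sqlite3
from typing import List, Tuple


def fetch_page(conn: sqlite3.Connection, limit: int, offset: int) -> Tuple[List[tuple], int]:
    """Return one page of rows plus the total number of rows in the table."""
    page = conn.execute(
        "SELECT id, name FROM item ORDER BY id LIMIT ? OFFSET ?", (limit, offset)
    ).fetchall()
    total = conn.execute("SELECT COUNT(*) FROM item").fetchone()[0]
    return page, total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE item (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO item (name) VALUES (?)", [(f"item-{i}",) for i in range(1, 26)])

first_page, total = fetch_page(conn, limit=10, offset=0)
last_page, _ = fetch_page(conn, limit=10, offset=20)
print(f"Selected {len(first_page)} records out of a total of {total}.")
```

With 25 rows and a page size of 10, the third page (offset 20) holds only the 5 remaining rows, while the total stays at 25 for every page.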
Bulk Delete#
Here we demonstrate how to perform a bulk delete operation. Just as with the bulk insert, deleting multiple records with the batch record methods is more efficient than executing row-by-row.
# 3) Let's remove the batch of records selected.
deleted_objs = repo.delete_many([new_obj.id for new_obj in created_objs])
console.print(f"Removed {len(deleted_objs)} records out of a total of {total_objs}.")
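The efficiency argument for batch deletes comes down to issuing one statement for many keys instead of one statement per row. The sketch below illustrates that idea with sqlite3. The delete_batch helper and the item table are hypothetical, and reading the rows before deleting them is just one simple way to return what was removed, not a claim about how the repository does it.

```python
import sqlite3
from typing import List, Sequence


def delete_batch(conn: sqlite3.Connection, ids: Sequence[int]) -> List[tuple]:
    """Delete all rows whose id is in `ids` with one statement; return the removed rows."""
    placeholders = ", ".join("?" for _ in ids)
    params = list(ids)
    with conn:  # commit on success, roll back on error
        removed = conn.execute(
            f"SELECT id, name FROM item WHERE id IN ({placeholders}) ORDER BY id", params
        ).fetchall()
        conn.execute(f"DELETE FROM item WHERE id IN ({placeholders})", params)
    return removed


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE item (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO item (name) VALUES (?)", [(f"item-{i}",) for i in range(1, 6)])
conn.commit()

deleted = delete_batch(conn, [1, 2, 3])
remaining = conn.execute("SELECT COUNT(*) FROM item").fetchone()[0]
print(f"Removed {len(deleted)} records; {remaining} remain.")
```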
Counts#
Finally, we’ll demonstrate how to count the number of records remaining in the database.
# 4) Let's count the remaining rows
remaining_count = repo.count()
console.print(f"Found {remaining_count} remaining records after delete.")
Now that we have demonstrated how to interact with the repository objects outside of a Litestar application, our next example will use dependency injection to add this functionality to a Controller!
Full Code#
from __future__ import annotations

from contextlib import asynccontextmanager
from datetime import date, datetime
from typing import AsyncIterator  # on Python 3.9+, collections.abc.AsyncIterator also works
from uuid import UUID

import anyio
from rich import get_console
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import Mapped

from litestar.contrib.sqlalchemy.base import UUIDBase
from litestar.contrib.sqlalchemy.repository import SQLAlchemyAsyncRepository

console = get_console()


# the SQLAlchemy base includes a declarative model for you to use in your models.
# The `UUIDBase` class includes a `UUID` based primary key (`id`)
class Author(UUIDBase):
    name: Mapped[str]
    dob: Mapped[date]
    dod: Mapped[date | None]


class AuthorRepository(SQLAlchemyAsyncRepository[Author]):
    """Author repository."""

    model_type = Author


engine = create_async_engine(
    "sqlite+aiosqlite:///test.sqlite",
    future=True,
)
session_factory = async_sessionmaker(engine, expire_on_commit=False)


# let's make a simple context manager as an example here.
@asynccontextmanager
async def repository_factory() -> AsyncIterator[AuthorRepository]:
    async with session_factory() as db_session:
        try:
            yield AuthorRepository(session=db_session)
        except Exception:  # noqa: BLE001
            await db_session.rollback()
        else:
            await db_session.commit()


async def create_author() -> Author:
    async with repository_factory() as repo:
        obj = await repo.add(
            Author(
                name="F. Scott Fitzgerald",
                dob=datetime.strptime("1896-09-24", "%Y-%m-%d").date(),
            )
        )
        console.print(f"Created Author record for {obj.name} with primary key {obj.id}.")
        return obj


async def update_author(obj: Author) -> Author:
    async with repository_factory() as repo:
        obj = await repo.update(obj)
        console.print(f"Updated Author record for {obj.name} with primary key {obj.id}.")
        return obj


async def remove_author(id: UUID) -> Author:
    async with repository_factory() as repo:
        obj = await repo.delete(id)
        console.print(f"Deleted Author record for {obj.name} with primary key {obj.id}.")
        return obj


async def get_author_if_exists(id: UUID) -> Author | None:
    async with repository_factory() as repo:
        obj = await repo.get_one_or_none(id=id)
        if obj is not None:
            console.print(f"Found Author record for {obj.name} with primary key {obj.id}.")
        else:
            console.print(f"Could not find Author with primary key {id}.")
        return obj


async def run_script() -> None:
    """Create, update, delete, and re-select an Author record."""
    async with engine.begin() as conn:
        await conn.run_sync(UUIDBase.metadata.create_all)

    # 1) create a new Author record.
    console.print("1) Adding a new record")
    author = await create_author()
    author_id = author.id

    # 2) Let's update the Author record.
    console.print("2) Updating a record.")
    author.dod = datetime.strptime("1940-12-21", "%Y-%m-%d").date()
    await update_author(author)

    # 3) Let's delete the record we just created.
    console.print("3) Removing a record.")
    await remove_author(author_id)

    # 4) Let's verify the record no longer exists.
    console.print("4) Select one or none.")
    _should_be_none = await get_author_if_exists(author_id)


if __name__ == "__main__":
    anyio.run(run_script)