SQLAlchemy Init Plugin#

The SQLAlchemyInitPlugin adds functionality to the application that supports using Litestar with SQLAlchemy.

The plugin:

  • Makes the SQLAlchemy engine and session available via dependency injection.

  • Manages the SQLAlchemy engine and session factory in the application’s state.

  • Configures a before_send handler that is called before sending a response.

  • Includes relevant names in the signature namespace to aid resolving annotated types.

Dependencies#

The plugin makes the engine and session available for injection.

SQLAlchemy Async Dependencies#
 1from __future__ import annotations
 2
 3from typing import TYPE_CHECKING
 4
 5from sqlalchemy import select
 6
 7from litestar import Litestar, post
 8from litestar.contrib.sqlalchemy.plugins import SQLAlchemyAsyncConfig, SQLAlchemyInitPlugin
 9
10if TYPE_CHECKING:
11    from typing import Tuple
12
13    from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
14
15
16@post("/")
17async def handler(db_session: AsyncSession, db_engine: AsyncEngine) -> Tuple[int, int]:
18    one = (await db_session.execute(select(1))).scalar()
19
20    async with db_engine.begin() as conn:
21        two = (await conn.execute(select(2))).scalar()
22
23    return one, two
24
25
26config = SQLAlchemyAsyncConfig(connection_string="sqlite+aiosqlite:///async.sqlite")
27plugin = SQLAlchemyInitPlugin(config=config)
28app = Litestar(route_handlers=[handler], plugins=[plugin])
SQLAlchemy Async Dependencies#
 1from __future__ import annotations
 2
 3from typing import TYPE_CHECKING
 4
 5from sqlalchemy import select
 6
 7from litestar import Litestar, post
 8from litestar.contrib.sqlalchemy.plugins import SQLAlchemyAsyncConfig, SQLAlchemyInitPlugin
 9
10if TYPE_CHECKING:
11
12    from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
13
14
15@post("/")
16async def handler(db_session: AsyncSession, db_engine: AsyncEngine) -> tuple[int, int]:
17    one = (await db_session.execute(select(1))).scalar()
18
19    async with db_engine.begin() as conn:
20        two = (await conn.execute(select(2))).scalar()
21
22    return one, two
23
24
25config = SQLAlchemyAsyncConfig(connection_string="sqlite+aiosqlite:///async.sqlite")
26plugin = SQLAlchemyInitPlugin(config=config)
27app = Litestar(route_handlers=[handler], plugins=[plugin])
SQLAlchemy Sync Dependencies#
 1from __future__ import annotations
 2
 3from typing import TYPE_CHECKING
 4
 5from sqlalchemy import select
 6
 7from litestar import Litestar, post
 8from litestar.contrib.sqlalchemy.plugins import SQLAlchemyInitPlugin, SQLAlchemySyncConfig
 9
10if TYPE_CHECKING:
11    from typing import Tuple
12
13    from sqlalchemy import Engine
14    from sqlalchemy.orm import Session
15
16
17@post("/", sync_to_thread=True)
18def handler(db_session: Session, db_engine: Engine) -> Tuple[int, int]:
19    one = db_session.execute(select(1)).scalar()
20
21    with db_engine.begin() as conn:
22        two = conn.execute(select(2)).scalar()
23
24    return one, two
25
26
27config = SQLAlchemySyncConfig(connection_string="sqlite:///sync.sqlite")
28plugin = SQLAlchemyInitPlugin(config=config)
29app = Litestar(route_handlers=[handler], plugins=[plugin])
SQLAlchemy Sync Dependencies#
 1from __future__ import annotations
 2
 3from typing import TYPE_CHECKING
 4
 5from sqlalchemy import select
 6
 7from litestar import Litestar, post
 8from litestar.contrib.sqlalchemy.plugins import SQLAlchemyInitPlugin, SQLAlchemySyncConfig
 9
10if TYPE_CHECKING:
11
12    from sqlalchemy import Engine
13    from sqlalchemy.orm import Session
14
15
16@post("/", sync_to_thread=True)
17def handler(db_session: Session, db_engine: Engine) -> tuple[int, int]:
18    one = db_session.execute(select(1)).scalar()
19
20    with db_engine.begin() as conn:
21        two = conn.execute(select(2)).scalar()
22
23    return one, two
24
25
26config = SQLAlchemySyncConfig(connection_string="sqlite:///sync.sqlite")
27plugin = SQLAlchemyInitPlugin(config=config)
28app = Litestar(route_handlers=[handler], plugins=[plugin])

The above example illustrates how to access the engine and session in the handler, and like all other dependencies, they can also be injected into other dependency functions.

Renaming the dependencies#

You can change the name that the engine and session are bound to by setting the engine_dependency_key and session_dependency_key attributes on the plugin configuration.

Configuring the before send handler#

The plugin configures a before_send handler that is called before sending a response. The default handler closes the session and removes it from the connection scope.

You can change the handler by setting the before_send_handler attribute on the configuration object. For example, an alternate handler is available that will also commit the session on success and rollback upon failure.

SQLAlchemy Async Before Send Handler#
 1from __future__ import annotations
 2
 3from litestar import Litestar
 4from litestar.contrib.sqlalchemy.plugins import SQLAlchemyAsyncConfig, SQLAlchemyInitPlugin
 5from litestar.contrib.sqlalchemy.plugins.init.config.asyncio import autocommit_before_send_handler
 6
 7config = SQLAlchemyAsyncConfig(
 8    connection_string="sqlite+aiosqlite:///:memory:",
 9    before_send_handler=autocommit_before_send_handler,
10)
11plugin = SQLAlchemyInitPlugin(config=config)
12
13app = Litestar(route_handlers=[], plugins=[plugin])
SQLAlchemy Sync Before Send Handler#
 1from __future__ import annotations
 2
 3from litestar import Litestar
 4from litestar.contrib.sqlalchemy.plugins import SQLAlchemyInitPlugin, SQLAlchemySyncConfig
 5from litestar.contrib.sqlalchemy.plugins.init.config.sync import autocommit_before_send_handler
 6
 7config = SQLAlchemySyncConfig(
 8    connection_string="sqlite:///:memory:",
 9    before_send_handler=autocommit_before_send_handler,
10)
11plugin = SQLAlchemyInitPlugin(config=config)
12
13app = Litestar(route_handlers=[], plugins=[plugin])

Configuring the plugins#

Both the SQLAlchemyAsyncConfig and the SQLAlchemySyncConfig have an engine_config attribute that is used to configure the engine. The engine_config attribute is an instance of EngineConfig and exposes all of the configuration options available to the SQLAlchemy engine.

The SQLAlchemyAsyncConfig class and the SQLAlchemySyncConfig class also have a session_config attribute that is used to configure the session. This is either an instance of AsyncSessionConfig or SyncSessionConfig depending on the type of config object. These classes expose all of the configuration options available to the SQLAlchemy session.

Finally, the SQLAlchemyAsyncConfig class and the SQLAlchemySyncConfig class expose configuration options to control their behavior.

Consult the reference documentation for more information.

Example#

The below example is a complete demonstration of use of the init plugin. Readers who are familiar with the prior section may note the additional complexity involved in managing the conversion to and from SQLAlchemy objects within the handlers. Read on to see how this increased complexity is efficiently handled by the SQLAlchemySerializationPlugin.

SQLAlchemy Async Init Plugin Example#
 1from __future__ import annotations
 2
 3from typing import TYPE_CHECKING
 4
 5from sqlalchemy import select
 6from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
 7
 8from litestar import Litestar, post
 9from litestar.contrib.sqlalchemy.plugins import SQLAlchemyAsyncConfig, SQLAlchemyInitPlugin
10
11if TYPE_CHECKING:
12    from typing import Any, Dict, List
13
14    from sqlalchemy.ext.asyncio import AsyncSession
15
16
17class Base(DeclarativeBase): ...
18
19
20class TodoItem(Base):
21    __tablename__ = "todo_item"
22    title: Mapped[str] = mapped_column(primary_key=True)
23    done: Mapped[bool]
24
25
26@post("/")
27async def add_item(data: Dict[str, Any], db_session: AsyncSession) -> List[Dict[str, Any]]:
28    todo_item = TodoItem(**data)
29    async with db_session.begin():
30        db_session.add(todo_item)
31    return [
32        {
33            "title": item.title,
34            "done": item.done,
35        }
36        for item in (await db_session.execute(select(TodoItem))).scalars()
37    ]
38
39
40async def init_db(app: Litestar) -> None:
41    async with app.state.db_engine.begin() as conn:
42        await conn.run_sync(Base.metadata.drop_all)
43        await conn.run_sync(Base.metadata.create_all)
44
45
46config = SQLAlchemyAsyncConfig(connection_string="sqlite+aiosqlite:///todo_async.sqlite")
47plugin = SQLAlchemyInitPlugin(config=config)
48app = Litestar(route_handlers=[add_item], plugins=[plugin], on_startup=[init_db])
SQLAlchemy Async Init Plugin Example#
 1from __future__ import annotations
 2
 3from typing import TYPE_CHECKING
 4
 5from sqlalchemy import select
 6from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
 7
 8from litestar import Litestar, post
 9from litestar.contrib.sqlalchemy.plugins import SQLAlchemyAsyncConfig, SQLAlchemyInitPlugin
10
11if TYPE_CHECKING:
12    from typing import Any
13
14    from sqlalchemy.ext.asyncio import AsyncSession
15
16
17class Base(DeclarativeBase): ...
18
19
20class TodoItem(Base):
21    __tablename__ = "todo_item"
22    title: Mapped[str] = mapped_column(primary_key=True)
23    done: Mapped[bool]
24
25
26@post("/")
27async def add_item(data: dict[str, Any], db_session: AsyncSession) -> list[dict[str, Any]]:
28    todo_item = TodoItem(**data)
29    async with db_session.begin():
30        db_session.add(todo_item)
31    return [
32        {
33            "title": item.title,
34            "done": item.done,
35        }
36        for item in (await db_session.execute(select(TodoItem))).scalars()
37    ]
38
39
40async def init_db(app: Litestar) -> None:
41    async with app.state.db_engine.begin() as conn:
42        await conn.run_sync(Base.metadata.drop_all)
43        await conn.run_sync(Base.metadata.create_all)
44
45
46config = SQLAlchemyAsyncConfig(connection_string="sqlite+aiosqlite:///todo_async.sqlite")
47plugin = SQLAlchemyInitPlugin(config=config)
48app = Litestar(route_handlers=[add_item], plugins=[plugin], on_startup=[init_db])
SQLAlchemy Sync Init Plugin Example#
 1from __future__ import annotations
 2
 3from typing import TYPE_CHECKING
 4
 5from sqlalchemy import select
 6from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
 7
 8from litestar import Litestar, post
 9from litestar.contrib.sqlalchemy.plugins import SQLAlchemyInitPlugin, SQLAlchemySyncConfig
10
11if TYPE_CHECKING:
12    from typing import Any, Dict, List
13
14    from sqlalchemy.orm import Session
15
16
17class Base(DeclarativeBase): ...
18
19
20class TodoItem(Base):
21    __tablename__ = "todo_item"
22    title: Mapped[str] = mapped_column(primary_key=True)
23    done: Mapped[bool]
24
25
26@post("/", sync_to_thread=True)
27def add_item(data: Dict[str, Any], db_session: Session) -> List[Dict[str, Any]]:
28    todo_item = TodoItem(**data)
29    with db_session.begin():
30        db_session.add(todo_item)
31    return [
32        {
33            "title": item.title,
34            "done": item.done,
35        }
36        for item in db_session.execute(select(TodoItem)).scalars()
37    ]
38
39
40def init_db(app: Litestar) -> None:
41    Base.metadata.drop_all(app.state.db_engine)
42    Base.metadata.create_all(app.state.db_engine)
43
44
45config = SQLAlchemySyncConfig(connection_string="sqlite:///todo_sync.sqlite")
46plugin = SQLAlchemyInitPlugin(config=config)
47app = Litestar(route_handlers=[add_item], plugins=[plugin], on_startup=[init_db])
SQLAlchemy Sync Init Plugin Example#
 1from __future__ import annotations
 2
 3from typing import TYPE_CHECKING
 4
 5from sqlalchemy import select
 6from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
 7
 8from litestar import Litestar, post
 9from litestar.contrib.sqlalchemy.plugins import SQLAlchemyInitPlugin, SQLAlchemySyncConfig
10
11if TYPE_CHECKING:
12    from typing import Any
13
14    from sqlalchemy.orm import Session
15
16
17class Base(DeclarativeBase): ...
18
19
20class TodoItem(Base):
21    __tablename__ = "todo_item"
22    title: Mapped[str] = mapped_column(primary_key=True)
23    done: Mapped[bool]
24
25
26@post("/", sync_to_thread=True)
27def add_item(data: dict[str, Any], db_session: Session) -> list[dict[str, Any]]:
28    todo_item = TodoItem(**data)
29    with db_session.begin():
30        db_session.add(todo_item)
31    return [
32        {
33            "title": item.title,
34            "done": item.done,
35        }
36        for item in db_session.execute(select(TodoItem)).scalars()
37    ]
38
39
40def init_db(app: Litestar) -> None:
41    Base.metadata.drop_all(app.state.db_engine)
42    Base.metadata.create_all(app.state.db_engine)
43
44
45config = SQLAlchemySyncConfig(connection_string="sqlite:///todo_sync.sqlite")
46plugin = SQLAlchemyInitPlugin(config=config)
47app = Litestar(route_handlers=[add_item], plugins=[plugin], on_startup=[init_db])