Piccolo ORM
Piccolo ORM is an easy-to-use async ORM and query builder. Litestar includes a PiccoloDTO class for data flow control. Example of a small application:
from typing import List
from piccolo.columns import Boolean, Varchar
from piccolo.table import Table, create_db_tables
from litestar import Litestar, MediaType, delete, get, patch, post
from litestar.contrib.piccolo import PiccoloDTO
from litestar.dto import DTOConfig, DTOData
from litestar.exceptions import NotFoundException
from .piccolo_conf import DB
class Task(Table, db=DB):
    """Example Piccolo table of tasks, bound to the ``DB`` engine from ``piccolo_conf``."""

    # Text column holding the task's name.
    name = Varchar()
    # Boolean column; rows default to ``False`` (not completed).
    completed = Boolean(default=False)
class PatchDTO(PiccoloDTO[Task]):
    """DTO for partial updates of ``Task``.

    ``partial=True`` is set so a request may carry only some of the fields,
    and ``id`` is excluded from the DTO.
    """

    config = DTOConfig(partial=True, exclude={"id"})
@get(
    "/tasks",
    return_dto=PiccoloDTO[Task],
    media_type=MediaType.JSON,
    tags=["Task"],
)
async def tasks() -> List[Task]:
    """Return all tasks ordered by ``id``, highest first.

    ``Task.objects()`` is used instead of ``Task.select()`` so that the
    handler returns ``Task`` instances, which is what the return annotation
    and ``PiccoloDTO[Task]`` describe. ``select()`` returns plain
    dictionaries rather than ``Task`` objects.
    """
    return await Task.objects().order_by(Task.id, ascending=False)
@post(
    "/tasks",
    tags=["Task"],
    media_type=MediaType.JSON,
    dto=PiccoloDTO[Task],
    return_dto=PiccoloDTO[Task],
)
async def create_task(data: Task) -> Task:
    """Save the submitted task and return it."""
    new_task = data
    await new_task.save()
    # Re-read the row after saving so the returned object matches the database.
    await new_task.refresh()
    return new_task
@patch(
    "/tasks/{task_id:int}",
    tags=["Task"],
    media_type=MediaType.JSON,
    dto=PatchDTO,
    return_dto=PiccoloDTO[Task],
)
async def update_task(task_id: int, data: DTOData[Task]) -> Task:
    """Apply a partial update to the task with the given ``task_id``.

    Raises:
        NotFoundException: if no task matches ``task_id``.
    """
    existing = await Task.objects().get(Task.id == task_id)
    if existing:
        # Copy the fields supplied in the request onto the stored row.
        updated = data.update_instance(existing)
        await updated.save()
        return updated
    raise NotFoundException("Task does not exist")
@delete("/tasks/{task_id:int}", tags=["Task"])
async def delete_task(task_id: int) -> None:
    """Remove the task with the given ``task_id``.

    Raises:
        NotFoundException: if no task matches ``task_id``.
    """
    target = await Task.objects().get(Task.id == task_id)
    if target:
        await target.remove()
        return
    raise NotFoundException("Task does not exist")
async def on_startup():
    """Create the ``Task`` table unless it already exists."""
    tables = (Task,)
    await create_db_tables(*tables, if_not_exists=True)
# Application object: registers the task handlers, runs ``on_startup`` when
# the app starts, and has debug mode switched on.
app = Litestar(
    route_handlers=[tasks, create_task, delete_task, update_task],
    on_startup=[on_startup],
    debug=True,
)
from piccolo.columns import Boolean, Varchar
from piccolo.table import Table, create_db_tables
from litestar import Litestar, MediaType, delete, get, patch, post
from litestar.contrib.piccolo import PiccoloDTO
from litestar.dto import DTOConfig, DTOData
from litestar.exceptions import NotFoundException
from .piccolo_conf import DB
class Task(Table, db=DB):
    """Example Piccolo table of tasks, bound to the ``DB`` engine from ``piccolo_conf``."""

    # Text column holding the task's name.
    name = Varchar()
    # Boolean column; rows default to ``False`` (not completed).
    completed = Boolean(default=False)
class PatchDTO(PiccoloDTO[Task]):
    """DTO for partial updates of ``Task``.

    ``partial=True`` is set so a request may carry only some of the fields,
    and ``id`` is excluded from the DTO.
    """

    config = DTOConfig(partial=True, exclude={"id"})
@get(
    "/tasks",
    return_dto=PiccoloDTO[Task],
    media_type=MediaType.JSON,
    tags=["Task"],
)
async def tasks() -> list[Task]:
    """Return all tasks ordered by ``id``, highest first.

    ``Task.objects()`` is used instead of ``Task.select()`` so that the
    handler returns ``Task`` instances, which is what the return annotation
    and ``PiccoloDTO[Task]`` describe. ``select()`` returns plain
    dictionaries rather than ``Task`` objects.
    """
    return await Task.objects().order_by(Task.id, ascending=False)
@post(
    "/tasks",
    tags=["Task"],
    media_type=MediaType.JSON,
    dto=PiccoloDTO[Task],
    return_dto=PiccoloDTO[Task],
)
async def create_task(data: Task) -> Task:
    """Save the submitted task and return it."""
    new_task = data
    await new_task.save()
    # Re-read the row after saving so the returned object matches the database.
    await new_task.refresh()
    return new_task
@patch(
    "/tasks/{task_id:int}",
    tags=["Task"],
    media_type=MediaType.JSON,
    dto=PatchDTO,
    return_dto=PiccoloDTO[Task],
)
async def update_task(task_id: int, data: DTOData[Task]) -> Task:
    """Apply a partial update to the task with the given ``task_id``.

    Raises:
        NotFoundException: if no task matches ``task_id``.
    """
    existing = await Task.objects().get(Task.id == task_id)
    if existing:
        # Copy the fields supplied in the request onto the stored row.
        updated = data.update_instance(existing)
        await updated.save()
        return updated
    raise NotFoundException("Task does not exist")
@delete("/tasks/{task_id:int}", tags=["Task"])
async def delete_task(task_id: int) -> None:
    """Remove the task with the given ``task_id``.

    Raises:
        NotFoundException: if no task matches ``task_id``.
    """
    target = await Task.objects().get(Task.id == task_id)
    if target:
        await target.remove()
        return
    raise NotFoundException("Task does not exist")
async def on_startup():
    """Create the ``Task`` table unless it already exists."""
    tables = (Task,)
    await create_db_tables(*tables, if_not_exists=True)
# Application object: registers the task handlers, runs ``on_startup`` when
# the app starts, and has debug mode switched on.
app = Litestar(
    route_handlers=[tasks, create_task, delete_task, update_task],
    on_startup=[on_startup],
    debug=True,
)