Quickstart
The quickstart uses the non-async sqlite client to make an easy example.
If you want to see a fully worked Postgres example, check out the Postgres Quickstart.
Install
uv add embar
Set up database models
# schema.py
from embar.column.common import Integer, Text
from embar.config import EmbarConfig
from embar.table import Table
class User(Table):
# If you don't provide a table name, it is generated from your class name
embar_config: EmbarConfig = EmbarConfig(table_name="users")
id: Integer = Integer(primary=True)
# Columns will also generate their own name if not provided
email: Text = Text("user_email", default="text", not_null=True)
class Message(Table):
id: Integer = Integer()
# Foreign key constraints are easy to add
user_id: Integer = Integer().fk(lambda: User.id)
content: Text = Text()
Create client and apply migrations
In production, you would (probably) use the embar CLI to generate and run migrations.
This example uses the utility function to do it all in code.
# main.py
import sqlite3
from embar.db.sqlite import SqliteDb
conn = sqlite3.connect(":memory:")
db = SqliteDb(conn)
db.migrate([User, Message]).run()
Insert some data
user = User(id=1, email="foo@bar.com")
message = Message(id=1, user_id=user.id, content="Hello!")
db.insert(User).values(user).run()
# you can return your inserted data if you want
msg_inserted = db.insert(Message).values(message).returning().run()
assert msg_inserted[0].content == message.content
Query some data
With join, where and group by.
from typing import Annotated
from pydantic import BaseModel
from embar.query.where import Eq, Like, Or
class UserSel(BaseModel):
id: Annotated[int, User.id]
messages: Annotated[list[str], Message.content.many()]
users = (
db.select(UserSel)
.from_(User)
.left_join(Message, Eq(User.id, Message.user_id))
.where(Or(
Eq(User.id, 1),
Like(User.email, "foo%")
))
.group_by(User.id)
.run()
)
# [ UserSel(id=1, messages=['Hello!']) ]
Query some more data
This time with fully nested child tables, and some raw SQL.
from datetime import datetime
from embar.sql import Sql
class UserHydrated(BaseModel):
email: Annotated[str, User.email]
messages: Annotated[list[Message], Message.many()]
date: Annotated[datetime, Sql(t"CURRENT_TIMESTAMP")]
users = (
db.select(UserHydrated)
.from_(User)
.left_join(Message, Eq(User.id, Message.user_id))
.group_by(User.id)
.limit(2)
.run()
)
# [UserHydrated(
# email='foo@bar.com',
# messages=[Message(content='Hello!', id=1, user_id=1)],
# date: datetime(2025, 10, 26, ...)
# )]
See the SQL
Every query produces exactly one... query.
And you can always see what's happening under the hood with the .sql() method:
users_query = (
db.select(UserHydrated)
.from_(User)
.left_join(Message, Eq(User.id, Message.user_id))
.group_by(User.id)
.sql()
)
users_query.sql
# SELECT
# "users"."user_email" AS "email",
# json_group_array(json_object(
# 'id', "message"."id",
# 'user_id', "message"."user_id",
# 'content', "message"."content"
# )) AS "messages",
# CURRENT_TIMESTAMP AS "date"
# FROM "users"
# LEFT JOIN "message" ON "users"."id" = "message"."user_id"
# GROUP BY "users"."id"
Update a row
Unfortunately this requires another model to be defined, as Python doesn't have a Partial[] type.
from typing import TypedDict
class MessageUpdate(TypedDict, total=False):
id: int
user_id: int
content: str
(
db.update(Message)
.set(MessageUpdate(content="Goodbye"))
.where(Eq(Message.id, 1))
.run()
)
Delete some rows
And return the deleted data if you like.
deleted = db.delete(Message).returning().run()
assert len(deleted) == 1
Add indexes
from embar.constraint import Index
class MessageIndexed(Table):
embar_config: EmbarConfig = EmbarConfig(
constraints=[Index("message_idx").on(lambda: MessageIndexed.user_id)]
)
user_id: Integer = Integer().fk(lambda: User.id)
Run raw SQL
db.sql(t"DELETE FROM {Message}").run()
Or with a return:
class UserId(BaseModel):
id: Annotated[int, int]
res = (
db.sql(t"SELECT * FROM {User}")
.model(UserId)
.run()
)
# [UserId(id=1)]