Select
Select operations retrieve data from your database. Embar provides a fluent interface for building SELECT queries with full type safety.
Basic Select
Select all columns from a table using SelectAll:
import asyncio
import psycopg
from embar.column.common import Integer, Text
from embar.db.pg import AsyncPgDb
from embar.model import SelectAll
from embar.table import Table
class User(Table):
id: Integer = Integer(primary=True)
email: Text = Text()
async def get_db(tables: list[Table] = None):
if tables is None:
tables = [User]
database_url = "postgres://pg:pw@localhost:25432/db"
conn = await psycopg.AsyncConnection.connect(database_url)
db = AsyncPgDb(conn)
await db.migrate(tables)
return db
async def basic():
db = await get_db()
users = await db.select(SelectAll).from_(User)
# [User(id=1, email="alice@example.com"), User(id=2, email="bob@example.com")]
asyncio.run(basic())
This generates:
SELECT "user"."id", "user"."email" FROM "user"
Selecting Specific Columns
Define a custom model to select only specific columns:
from typing import Annotated
from pydantic import BaseModel
class UserEmail(BaseModel):
email: Annotated[str, User.email]
async def columns():
db = await get_db()
users = await db.select(UserEmail).from_(User)
# [UserEmail(email="alice@example.com"), UserEmail(email="bob@example.com")]
asyncio.run(columns())
This generates:
SELECT "user"."email" AS "email" FROM "user"
The Annotated type tells Embar which table column maps to each field in your result model.
Where Clauses
Filter results with .where():
from embar.query.where import Eq
async def where():
db = await get_db()
users = await (
db.select(SelectAll)
.from_(User)
.where(Eq(User.id, 1))
)
asyncio.run(where())
This generates:
SELECT "user"."id", "user"."email" FROM "user" WHERE "user"."id" = %(p0)s
Where clauses can be combined with And, Or, and other operators. See Where for details.
Joins
Join related tables using .left_join(), .right_join(), .inner_join(), .full_join(), or .cross_join():
from embar.query.where import Eq
class Message(Table):
id: Integer = Integer(primary=True)
user_id: Integer = Integer().fk(lambda: User.id)
content: Text = Text()
class UserWithMessages(BaseModel):
id: Annotated[int, User.id]
email: Annotated[str, User.email]
async def joins():
db = await get_db([User, Message])
users = await (
db.select(UserWithMessages)
.from_(User)
.left_join(Message, Eq(User.id, Message.user_id))
)
asyncio.run(joins())
This generates:
SELECT "user"."id" AS "id", "user"."email" AS "email"
FROM "user"
LEFT JOIN "message" ON "user"."id" = "message"."user_id"
For more on joins and nested data, see Joins.
Selecting Nested Arrays
Use .many() to select arrays of values or full nested objects:
class UserWithMessages(BaseModel):
id: Annotated[int, User.id]
messages: Annotated[list[str], Message.content.many()]
async def arrays():
db = await get_db([User, Message])
users = await (
db.select(UserWithMessages)
.from_(User)
.left_join(Message, Eq(User.id, Message.user_id))
.group_by(User.id)
)
# [UserWithMessages(id=1, messages=["Hello!", "How are you?"])]
asyncio.run(arrays())
Or select full nested objects:
class UserWithFullMessages(BaseModel):
id: Annotated[int, User.id]
messages: Annotated[list[Message], Message.many()]
async def nested():
db = await get_db([User, Message])
users = await (
db.select(UserWithFullMessages)
.from_(User)
.left_join(Message, Eq(User.id, Message.user_id))
.group_by(User.id)
)
# [UserWithFullMessages(
# id=1,
# messages=[
# Message(id=1, user_id=1, content="Hello!"),
# Message(id=2, user_id=1, content="How are you?")
# ]
# )]
asyncio.run(nested())
Distinct
Select distinct rows using select_distinct():
async def distinct():
db = await get_db()
users = await db.select_distinct(SelectAll).from_(User)
asyncio.run(distinct())
This generates:
SELECT DISTINCT "user"."id", "user"."email" FROM "user"
Group By
Group results with .group_by():
class UserMessageCount(BaseModel):
id: Annotated[int, User.id]
messages: Annotated[list[str], Message.content.many()]
async def group_by():
db = await get_db([User, Message])
users = await (
db.select(UserMessageCount)
.from_(User)
.left_join(Message, Eq(User.id, Message.user_id))
.group_by(User.id)
)
asyncio.run(group_by())
This generates:
SELECT "user"."id" AS "id", json_agg("message"."content") AS "messages"
FROM "user"
LEFT JOIN "message" ON "user"."id" = "message"."user_id"
GROUP BY "user"."id"
Having
Filter grouped results with .having():
from embar.query.where import Gt
from embar.sql import Sql
class UserWithCount(BaseModel):
id: Annotated[int, User.id]
message_count: Annotated[int, Sql(t"COUNT({Message.id})")]
async def having():
db = await get_db([User, Message])
users = await (
db.select(UserWithCount)
.from_(User)
.left_join(Message, Eq(User.id, Message.user_id))
.group_by(User.id)
.having(Gt(User.id, 2))
)
asyncio.run(having())
This generates:
SELECT "user"."id" AS "id", COUNT("message"."id") AS "message_count"
FROM "user"
LEFT JOIN "message" ON "user"."id" = "message"."user_id"
GROUP BY "user"."id"
HAVING COUNT("message"."id") > %(p0)s
The HAVING clause filters groups after aggregation, while WHERE filters rows before grouping.
Order By
Sort results with .order_by():
from embar.query.order_by import Asc, Desc
async def order_by():
db = await get_db()
users = await (
db.select(SelectAll)
.from_(User)
.order_by(Desc(User.id))
)
asyncio.run(order_by())
This generates:
SELECT "user"."id", "user"."email" FROM "user" ORDER BY "user"."id" DESC
You can order by multiple columns:
async def order_multi():
db = await get_db()
users = await (
db.select(SelectAll)
.from_(User)
.order_by(Asc(User.email), Desc(User.id))
)
asyncio.run(order_multi())
Control null ordering:
async def nulls():
db = await get_db()
users = await (
db.select(SelectAll)
.from_(User)
.order_by(Asc(User.email, nulls="last"))
)
asyncio.run(nulls())
This generates:
SELECT "user"."id", "user"."email" FROM "user" ORDER BY "user"."email" ASC NULLS LAST
Limit and Offset
Paginate results with .limit() and .offset():
async def limit():
db = await get_db()
users = await (
db.select(SelectAll)
.from_(User)
.limit(10)
.offset(20)
)
asyncio.run(limit())
This generates:
SELECT "user"."id", "user"."email" FROM "user" LIMIT 10 OFFSET 20
Aggregations
Use raw SQL for aggregations:
from embar.sql import Sql
class UserStats(BaseModel):
total: Annotated[int, Sql(t"COUNT(*)::int")]
avg_id: Annotated[float | None, Sql(t"AVG({User.id})::float")]
async def aggregation():
db = await get_db()
stats = await db.select(UserStats).from_(User)
# [UserStats(total=100, avg_id=50.5)]
asyncio.run(aggregation())
Common aggregations include COUNT(), SUM(), AVG(), MIN(), and MAX().
Viewing the SQL
Inspect the generated query without executing it:
async def raw_sql():
db = await get_db()
query = (
db.select(SelectAll)
.from_(User)
.where(Eq(User.id, 1))
.sql()
)
print(query.sql)
# SELECT "user"."id", "user"."email" FROM "user" WHERE "user"."id" = %(p0)s
print(query.params)
# {'p0': 1}
asyncio.run(raw_sql())