Raw SQL
Sometimes you need to write raw SQL. Embar provides template strings for safe SQL interpolation while maintaining type safety.
Template Strings
Use Python's template strings with the t prefix to write raw SQL:
import asyncio
import psycopg
from embar.column.common import Integer, Text
from embar.db.pg import AsyncPgDb
from embar.table import Table
class User(Table):
id: Integer = Integer(primary=True)
status: Text = Text()
email: Text = Text()
async def get_db(tables: list[Table] = None):
if tables is None:
tables = [User]
database_url = "postgres://pg:pw@localhost:25432/db"
conn = await psycopg.AsyncConnection.connect(database_url)
db = AsyncPgDb(conn)
await db.migrate(tables)
return db
async def example():
db = await get_db()
await db.sql(t"DELETE FROM {User}")
asyncio.run(example())
This generates:
DELETE FROM "user"
The {User} interpolation is replaced with the properly quoted table name.
Table names are automatically escaped and quoted according to your database dialect.
Column Name Interpolation
Reference columns directly in your raw SQL:
async def col():
db = await get_db()
await db.sql(t"SELECT {User.email} FROM {User}")
asyncio.run(col())
This generates:
SELECT "user"."email" FROM "user"
Both the table and column are properly escaped and quoted.
Queries Without Return Values
Use db.sql() for queries that don't return data:
async def noreturn():
db = await get_db()
# Delete
await db.sql(t"DELETE FROM {User} WHERE id > 100")
# Update
await db.sql(t"UPDATE {User} SET {User.status} = 'inactive' WHERE {User.email} LIKE 'foo.com'")
# Truncate
await db.sql(t"TRUNCATE TABLE {User} CASCADE")
asyncio.run(noreturn())
Queries That Return Data
Use .model() to specify a return type for queries that fetch data:
from typing import Annotated
from pydantic import BaseModel
class UserId(BaseModel):
id: Annotated[int, int]
async def returning():
db = await get_db()
users = await db.sql(t"SELECT id FROM {User} WHERE email LIKE '%@gmail.com'").model(UserId)
# [UserId(id=1), UserId(id=2), UserId(id=3)]
asyncio.run(returning())
This generates:
SELECT id FROM "user" WHERE email LIKE '%@gmail.com'
The returned data is parsed into the model you provide.
Complex Return Types
Models can have multiple fields:
class UserEmail(BaseModel):
id: Annotated[int, int]
email: Annotated[str, str]
async def complex():
db = await get_db()
users = await db.sql(t"SELECT id, email FROM {User} ORDER BY id DESC LIMIT 10").model(UserEmail)
asyncio.run(complex())
Or even nested types:
from datetime import datetime
class MessageWithUser(BaseModel):
content: Annotated[str, str]
user_email: Annotated[str, str]
created_at: Annotated[datetime, datetime]
class Message(Table):
id: Integer = Integer(primary=True)
user_id: Integer = Integer().fk(lambda: User.id)
content: Text = Text()
async def nested():
db = await get_db([User, Message])
messages = await (
db.sql(t"""
SELECT m.content, u.email as user_email, CURRENT_TIMESTAMP as created_at
FROM {Message} m
JOIN {User} u ON m.user_id = u.id
WHERE m.content LIKE '%important%'
""")
.model(MessageWithUser)
)
asyncio.run(nested())
Raw SQL in Select Queries
Use the Sql() class to include raw SQL within select queries:
from embar.sql import Sql
from embar.query.where import Eq
class UserWithCount(BaseModel):
id: Annotated[int, User.id]
email: Annotated[str, User.email]
message_count: Annotated[int, Sql(t"COUNT({Message.id})")]
class Message(Table):
id: Integer = Integer(primary=True)
user_id: Integer = Integer().fk(lambda: User.id)
content: Text = Text()
async def select():
db = await get_db([User, Message])
users = await (
db.select(UserWithCount)
.from_(User)
.left_join(Message, Eq(User.id, Message.user_id))
.group_by(User.id)
)
asyncio.run(select())
This generates:
SELECT
"user"."id" AS "id",
"user"."email" AS "email",
COUNT("message"."id") AS "message_count"
FROM "user"
LEFT JOIN "message" ON "user"."id" = "message"."user_id"
GROUP BY "user"."id"
The raw SQL is inserted directly into the SELECT clause.
Database-Specific Functions
Use raw SQL for database-specific functionality:
class UserWithTimestamp(BaseModel):
email: Annotated[str, User.email]
now: Annotated[datetime, Sql(t"CURRENT_TIMESTAMP")]
async def timestamp():
db = await get_db()
users = await db.select(UserWithTimestamp).from_(User)
asyncio.run(timestamp())
Or for Postgres-specific features:
# JSON operations
class UserWithJsonField(BaseModel):
email: Annotated[str, User.email]
metadata: Annotated[dict, Sql(t"jsonb_build_object('status', 'active')")]
# Array aggregation
class UserWithTags(BaseModel):
email: Annotated[str, User.email]
tags: Annotated[list[str], Sql(t"array_agg({Tag.name})")]
Mixing Raw SQL with Query Builders
Combine raw SQL with query builders for complex scenarios:
from embar.query.where import Gt
class UserStats(BaseModel):
id: Annotated[int, User.id]
email: Annotated[str, User.email]
score: Annotated[float, Sql(t"RANDOM() * 100")]
async def mix():
db = await get_db()
users = await (
db.select(UserStats)
.from_(User)
.where(Gt(User.id, 10))
.limit(5)
)
asyncio.run(mix())
This lets you use raw SQL where needed while maintaining type safety and parameterization for the rest of the query.
Viewing the SQL
Inspect raw SQL queries before execution:
async def raw_sql():
db = await get_db()
query = db.sql(t"DELETE FROM {User} WHERE id > 100").sql()
print(query)
# DELETE FROM "user" WHERE id > 100
asyncio.run(raw_sql())
For queries with models:
async def with_models():
db = await get_db()
query = (
db.sql(t"SELECT id, email FROM {User} WHERE id < 10")
.model(UserEmail)
.sql()
)
print(query)
# SELECT id, email FROM "user" WHERE id < 10
asyncio.run(with_models())