Skip to content

Insert

Insert operations in Embar are straightforward. You create a table row instance and insert it into the database.

Basic Insert

Insert a single row:

import asyncio
import psycopg

from embar.db.pg import AsyncPgDb
from embar.column.common import Integer, Text
from embar.table import Table

class User(Table):
    id: Integer = Integer()
    email: Text = Text()

async def get_db(tables: list[Table] = None):
    tables = tables if tables is not None else [User]
    database_url = "postgres://pg:pw@localhost:25432/db"
    conn = await psycopg.AsyncConnection.connect(database_url)
    db = AsyncPgDb(conn)
    await db.migrate(tables)
    return db

async def basic():
    db = await get_db()
    user = User(id=1, email="alice@example.com")
    await db.insert(User).values(user)

asyncio.run(basic())

This generates:

INSERT INTO "user" ("id", "email") VALUES (%(id)s, %(email)s)

With parameters: {'id': 1, 'email': 'alice@example.com'}

Inserting Multiple Rows

Pass multiple row instances to .values():

async def multiple():
    users = [
        User(id=10, email="alice@example.com"),
        User(id=11, email="bob@example.com"),
        User(id=12, email="charlie@example.com"),
    ]
    db = await get_db()
    await db.insert(User).values(*users)

asyncio.run(multiple())

This generates a single insert statement with multiple parameter sets:

INSERT INTO "user" ("id", "email") VALUES (%(id)s, %(email)s)

With parameters:

[
    {'id': 10, 'email': 'alice@example.com'},
    {'id': 11, 'email': 'bob@example.com'},
    {'id': 12, 'email': 'charlie@example.com'},
]

Returning Inserted Data

Use .returning() to get back the inserted rows. This is useful for retrieving auto-generated IDs or default values:

async def returning():
    db = await get_db()
    user = User(id=20, email="alice@example.com")
    inserted = await db.insert(User).values(user).returning()

    # inserted is a list of User instances
    assert inserted[0].id == 20
    assert inserted[0].email == "alice@example.com"

asyncio.run(returning())

This generates:

INSERT INTO "user" ("id", "email") VALUES (%(id)s, %(email)s) RETURNING *

The RETURNING * clause tells the database to return all columns of the inserted row.

Working with Defaults

Columns with default values can be omitted when creating instances:

class UserStatus(Table):
    id: Integer = Integer()
    email: Text = Text(not_null=True)
    status: Text = Text(default="active")

user = UserStatus(id=30, email="alice@example.com")
# status will be set to "active" by the database

async def defaults():
    db = await get_db([UserStatus])
    inserted = await db.insert(UserStatus).values(user).returning()
    assert inserted[0].status == "active"

asyncio.run(defaults())

Viewing the SQL

Use .sql() to inspect the generated query without executing it:

async def view_sql():
    user = User(id=1, email="alice@example.com")
    query = db.insert(User).values(user).sql()

    print(query.sql)
    # INSERT INTO "user" ("id", "email") VALUES (%(id)s, %(email)s)

    print(query.many_params)
    # [{'id': 1, 'email': 'alice@example.com'}]

Inserting with Foreign Keys

When inserting rows with foreign key relationships, insert the parent row first:

class UserSimple(Table):
    id: Integer = Integer(primary=True)

class Message(Table):
    id: Integer = Integer(primary=True)
    user_id: Integer = Integer().fk(lambda: UserSimple.id)
    content: Text = Text()

async def relations():
    db = await get_db([Message,UserSimple])
    user = UserSimple(id=40)
    await db.insert(UserSimple).values(user)

    message = Message(id=1, user_id=user.id, content="Hello!")
    await db.insert(Message).values(message)

asyncio.run(relations())

On Conflict (Upsert)

Handle duplicate key conflicts with on_conflict_do_nothing() or on_conflict_do_update().

Do Nothing

Ignore rows that would cause a unique constraint violation:

import asyncio
import psycopg

from embar.db.pg import AsyncPgDb
from embar.column.common import Integer, Text
from embar.table import Table

class Product(Table):
    id: Integer = Integer(primary=True)
    name: Text = Text()

async def get_db(tables: list[Table] = None):
    tables = tables if tables is not None else [Product]
    database_url = "postgres://pg:pw@localhost:25432/db"
    conn = await psycopg.AsyncConnection.connect(database_url)
    db = AsyncPgDb(conn)
    await db.migrate(tables)
    return db

async def do_nothing():
    db = await get_db([Product])

    # Insert initial product (using on_conflict for idempotency)
    await db.insert(Product).values(Product(id=100, name="Widget")).on_conflict_do_nothing(("id",))

    # Attempt to insert duplicate - will be ignored
    await db.insert(Product).values(
        Product(id=100, name="Gadget")
    ).on_conflict_do_nothing(("id",))

asyncio.run(do_nothing())

This generates:

INSERT INTO "product" ("id", "name") VALUES (%(id)s, %(name)s)
ON CONFLICT (id) DO NOTHING

Do Update

Update existing rows when a conflict occurs:

async def do_update():
    db = await get_db([Product])

    # Insert initial product (using on_conflict for idempotency)
    await db.insert(Product).values(Product(id=101, name="Widget")).on_conflict_do_nothing(("id",))

    # Upsert - update name if id already exists
    await db.insert(Product).values(
        Product(id=101, name="Gadget")
    ).on_conflict_do_update(("id",), {"name": "Updated Widget"})

asyncio.run(do_update())

This generates:

INSERT INTO "product" ("id", "name") VALUES (%(id)s, %(name)s)
ON CONFLICT (id) DO UPDATE SET name = %(set_name_0)s

With Returning

Combine with .returning() to get the result:

async def upsert_returning():
    db = await get_db([Product])

    # Insert initial product (using on_conflict for idempotency)
    await db.insert(Product).values(Product(id=102, name="Widget")).on_conflict_do_nothing(("id",))

    result = await db.insert(Product).values(
        Product(id=102, name="Gadget")
    ).on_conflict_do_update(("id",), {"name": "Updated"}).returning()

    assert result[0].id == 102

asyncio.run(upsert_returning())