Skip to content

pg

Postgres database clients for sync and async operations.

AsyncPgDb

Bases: AsyncDbBase

Postgres database client for async operations.

Source code in src/embar/db/pg.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
@final
class AsyncPgDb(AsyncDbBase):
    """
    Postgres database client for async operations.
    """

    db_type = "postgres"
    conn: AsyncConnection
    _commit_after_execute: bool = True

    def __init__(self, connection: AsyncConnection):
        """
        Create a new AsyncPgDb instance.
        """
        self.conn = connection

    async def close(self):
        """
        Close the database connection.
        """
        if self.conn:
            await self.conn.close()

    def transaction(self) -> AsyncPgDbTransaction:
        """
        Start an isolated transaction.

        ```python notest
        from embar.db.pg import AsyncPgDb
        db = AsyncPgDb(None)

        async with db.transaction() as tx:
            ...
        ```
        """
        db_copy = AsyncPgDb(self.conn)
        db_copy._commit_after_execute = False
        return AsyncPgDbTransaction(db_copy)

    def select[M: BaseModel](self, model: type[M]) -> SelectQuery[M, Self]:
        """
        Create a SELECT query.
        """
        return SelectQuery[M, Self](db=self, model=model)

    def select_distinct[M: BaseModel](self, model: type[M]) -> SelectDistinctQuery[M, Self]:
        """
        Create a SELECT query.
        """
        return SelectDistinctQuery[M, Self](db=self, model=model)

    def insert[T: Table](self, table: type[T]) -> InsertQuery[T, Self]:
        """
        Create an INSERT query.
        """
        return InsertQuery[T, Self](table=table, db=self)

    def update[T: Table](self, table: type[T]) -> UpdateQuery[T, Self]:
        """
        Create an UPDATE query.
        """
        return UpdateQuery[T, Self](table=table, db=self)

    def delete[T: Table](self, table: type[T]) -> DeleteQueryReady[T, Self]:
        """
        Create an UPDATE query.
        """
        return DeleteQueryReady[T, Self](table=table, db=self)

    def sql(self, template: Template) -> DbSql[Self]:
        """
        Execute a raw SQL query using template strings.
        """
        return DbSql(template, self)

    def migrate(self, tables: Sequence[type[Table]], enums: Sequence[type[EnumBase]] | None = None) -> Migration[Self]:
        """
        Create a migration from a list of tables.
        """
        ddls = merge_ddls(MigrationDefs(tables, enums))
        return Migration(ddls, self)

    def migrates(self, schema: types.ModuleType) -> Migration[Self]:
        """
        Create a migration from a schema module.
        """
        defs = get_migration_defs(schema)
        return self.migrate(defs.tables, defs.enums)

    @override
    async def execute(self, query: QuerySingle) -> None:
        """
        Execute a query without returning results.
        """
        await self.conn.execute(query.sql, query.params)  # pyright:ignore[reportArgumentType]
        if self._commit_after_execute:
            await self.conn.commit()

    @override
    async def executemany(self, query: QueryMany):
        """
        Execute a query with multiple parameter sets.
        """
        params = _jsonify_dicts(query.many_params)
        async with self.conn.cursor() as cur:
            await cur.executemany(query.sql, params)  # pyright:ignore[reportArgumentType]
            if self._commit_after_execute:
                await self.conn.commit()

    @override
    async def fetch(self, query: QuerySingle | QueryMany) -> list[dict[str, Any]]:
        """
        Execute a query and return results as a list of dicts.
        """
        async with self.conn.cursor() as cur:
            if isinstance(query, QuerySingle):
                await cur.execute(query.sql, query.params)  # pyright:ignore[reportArgumentType]
            else:
                await cur.executemany(query.sql, query.many_params, returning=True)  # pyright:ignore[reportArgumentType]

            if cur.description is None:
                return []
            columns: list[str] = [desc[0] for desc in cur.description]
            results: list[dict[str, Any]] = []

            for row in await cur.fetchall():
                data = dict(zip(columns, row))
                results.append(data)
        if self._commit_after_execute:
            await self.conn.commit()
        return results

    @override
    async def truncate(self, schema: str | None = None):
        """
        Truncate all tables in the schema.
        """
        schema = schema if schema is not None else "public"
        tables = await self._get_live_table_names(schema)
        if tables is None:
            return
        table_names = ", ".join(tables)
        async with self.conn.cursor() as cursor:
            await cursor.execute(f"TRUNCATE TABLE {table_names} CASCADE")  # pyright:ignore[reportArgumentType]
            if self._commit_after_execute:
                await self.conn.commit()

    @override
    async def drop_tables(self, schema: str | None = None):
        """
        Drop all tables in the schema.
        """
        schema = schema if schema is not None else "public"
        tables = await self._get_live_table_names(schema)
        if tables is None:
            return
        table_names = ", ".join(tables)
        async with self.conn.cursor() as cursor:
            await cursor.execute(f"DROP TABLE {table_names} CASCADE")  # pyright:ignore[reportArgumentType]
            if self._commit_after_execute:
                await self.conn.commit()

    async def _get_live_table_names(self, schema: str) -> list[str] | None:
        async with self.conn.cursor() as cursor:
            # Get all table names from public schema
            await cursor.execute(f"SELECT tablename FROM pg_tables WHERE schemaname = '{schema}'")  # pyright:ignore[reportArgumentType]
            tables = await cursor.fetchall()
            if not tables:
                return None
            table_names = [f'"{table[0]}"' for table in tables]
        return table_names

__init__(connection)

Create a new AsyncPgDb instance.

Source code in src/embar/db/pg.py
240
241
242
243
244
def __init__(self, connection: AsyncConnection):
    """
    Store `connection` as the connection this client operates on.
    """
    self.conn = connection

close() async

Close the database connection.

Source code in src/embar/db/pg.py
246
247
248
249
250
251
async def close(self):
    """
    Close the underlying connection, if one is set.
    """
    connection = self.conn
    if not connection:
        # Nothing to close (e.g. the client was built with a falsy connection).
        return
    await connection.close()

delete(table)

Create a DELETE query.

Source code in src/embar/db/pg.py
293
294
295
296
297
def delete[T: Table](self, table: type[T]) -> DeleteQueryReady[T, Self]:
    """
    Create an UPDATE query.
    """
    return DeleteQueryReady[T, Self](table=table, db=self)

drop_tables(schema=None) async

Drop all tables in the schema.

Source code in src/embar/db/pg.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
@override
async def drop_tables(self, schema: str | None = None):
    """
    Drop all tables in the schema.
    """
    schema = schema if schema is not None else "public"
    tables = await self._get_live_table_names(schema)
    if tables is None:
        return
    table_names = ", ".join(tables)
    async with self.conn.cursor() as cursor:
        await cursor.execute(f"DROP TABLE {table_names} CASCADE")  # pyright:ignore[reportArgumentType]
        if self._commit_after_execute:
            await self.conn.commit()

execute(query) async

Execute a query without returning results.

Source code in src/embar/db/pg.py
319
320
321
322
323
324
325
326
@override
async def execute(self, query: QuerySingle) -> None:
    """
    Run `query` on the connection and discard any result.

    Commits afterwards unless per-operation commits are switched off.
    """
    await self.conn.execute(query.sql, query.params)  # pyright:ignore[reportArgumentType]
    if not self._commit_after_execute:
        return
    await self.conn.commit()

executemany(query) async

Execute a query with multiple parameter sets.

Source code in src/embar/db/pg.py
328
329
330
331
332
333
334
335
336
337
@override
async def executemany(self, query: QueryMany):
    """
    Run `query` once per parameter set in `query.many_params`.

    The parameter sets are passed through `_jsonify_dicts` first. Commits
    afterwards unless per-operation commits are switched off.
    """
    param_sets = _jsonify_dicts(query.many_params)
    async with self.conn.cursor() as cursor:
        await cursor.executemany(query.sql, param_sets)  # pyright:ignore[reportArgumentType]
        should_commit = self._commit_after_execute
        if should_commit:
            await self.conn.commit()

fetch(query) async

Execute a query and return results as a list of dicts.

Source code in src/embar/db/pg.py
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
@override
async def fetch(self, query: QuerySingle | QueryMany) -> list[dict[str, Any]]:
    """
    Execute a query and return results as a list of dicts.

    Each row becomes a dict keyed by column name. A statement that produces
    no result set yields an empty list but is still committed. For a
    `QueryMany`, the rows of every parameter set are concatenated in order.
    """
    results: list[dict[str, Any]] = []
    async with self.conn.cursor() as cur:
        many = not isinstance(query, QuerySingle)
        if many:
            await cur.executemany(query.sql, query.many_params, returning=True)  # pyright:ignore[reportArgumentType]
        else:
            await cur.execute(query.sql, query.params)  # pyright:ignore[reportArgumentType]

        while True:
            # description is None when the current result carries no rows
            if cur.description is not None:
                columns: list[str] = [desc[0] for desc in cur.description]
                rows = await cur.fetchall()
                results.extend(dict(zip(columns, row)) for row in rows)
            # executemany(returning=True) keeps one result set per parameter
            # set; step through them all.
            if not (many and cur.nextset()):
                break
    if self._commit_after_execute:
        await self.conn.commit()
    return results

insert(table)

Create an INSERT query.

Source code in src/embar/db/pg.py
281
282
283
284
285
def insert[T: Table](self, table: type[T]) -> InsertQuery[T, Self]:
    """
    Create an INSERT query.
    """
    return InsertQuery[T, Self](table=table, db=self)

migrate(tables, enums=None)

Create a migration from a list of tables.

Source code in src/embar/db/pg.py
305
306
307
308
309
310
def migrate(self, tables: Sequence[type[Table]], enums: Sequence[type[EnumBase]] | None = None) -> Migration[Self]:
    """
    Build a `Migration` for the given tables and optional enums.

    The definitions are merged into DDLs with `merge_ddls` and bound to this client.
    """
    migration_defs = MigrationDefs(tables, enums)
    return Migration(merge_ddls(migration_defs), self)

migrates(schema)

Create a migration from a schema module.

Source code in src/embar/db/pg.py
312
313
314
315
316
317
def migrates(self, schema: types.ModuleType) -> Migration[Self]:
    """
    Build a `Migration` from the definitions found in a schema module.

    Delegates to `migrate` with the tables and enums that
    `get_migration_defs` extracts from `schema`.
    """
    found = get_migration_defs(schema)
    tables, enums = found.tables, found.enums
    return self.migrate(tables, enums)

select(model)

Create a SELECT query.

Source code in src/embar/db/pg.py
269
270
271
272
273
def select[M: BaseModel](self, model: type[M]) -> SelectQuery[M, Self]:
    """
    Create a SELECT query.
    """
    return SelectQuery[M, Self](db=self, model=model)

select_distinct(model)

Create a SELECT DISTINCT query.

Source code in src/embar/db/pg.py
275
276
277
278
279
def select_distinct[M: BaseModel](self, model: type[M]) -> SelectDistinctQuery[M, Self]:
    """
    Create a SELECT query.
    """
    return SelectDistinctQuery[M, Self](db=self, model=model)

sql(template)

Execute a raw SQL query using template strings.

Source code in src/embar/db/pg.py
299
300
301
302
303
def sql(self, template: Template) -> DbSql[Self]:
    """
    Wrap a raw SQL template string in a `DbSql` bound to this client.
    """
    raw_query = DbSql(template, self)
    return raw_query

transaction()

Start an isolated transaction.

```python notest
from embar.db.pg import AsyncPgDb
db = AsyncPgDb(None)

async with db.transaction() as tx:
    ...
```

Source code in src/embar/db/pg.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def transaction(self) -> AsyncPgDbTransaction:
    """
    Start an isolated transaction.

    Returns a context manager wrapping a second `AsyncPgDb` that shares this
    connection but has per-operation commits switched off.

    ```python notest
    from embar.db.pg import AsyncPgDb
    db = AsyncPgDb(None)

    async with db.transaction() as tx:
        ...
    ```
    """
    tx_db = AsyncPgDb(self.conn)
    # Set on the copy only; this client keeps committing after each operation.
    tx_db._commit_after_execute = False
    return AsyncPgDbTransaction(tx_db)

truncate(schema=None) async

Truncate all tables in the schema.

Source code in src/embar/db/pg.py
362
363
364
365
366
367
368
369
370
371
372
373
374
375
@override
async def truncate(self, schema: str | None = None):
    """
    Truncate all tables in the schema.
    """
    schema = schema if schema is not None else "public"
    tables = await self._get_live_table_names(schema)
    if tables is None:
        return
    table_names = ", ".join(tables)
    async with self.conn.cursor() as cursor:
        await cursor.execute(f"TRUNCATE TABLE {table_names} CASCADE")  # pyright:ignore[reportArgumentType]
        if self._commit_after_execute:
            await self.conn.commit()

update(table)

Create an UPDATE query.

Source code in src/embar/db/pg.py
287
288
289
290
291
def update[T: Table](self, table: type[T]) -> UpdateQuery[T, Self]:
    """
    Create an UPDATE query.
    """
    return UpdateQuery[T, Self](table=table, db=self)

AsyncPgDbTransaction

Transaction context manager for AsyncPgDb.

Source code in src/embar/db/pg.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
class AsyncPgDbTransaction:
    """
    Async context manager that runs an AsyncPgDb inside a connection transaction.

    Entering opens `conn.transaction()` on the wrapped client's connection and
    yields that client; exiting delegates to the opened transaction.
    """

    _db: AsyncPgDb
    # The connection-level transaction context; None until `__aenter__` runs.
    _tx: AbstractAsyncContextManager[AsyncTransaction] | None = None

    def __init__(self, db: AsyncPgDb):
        self._db = db

    async def __aenter__(self) -> AsyncPgDb:
        opened = self._db.conn.transaction()
        self._tx = opened
        await opened.__aenter__()
        return self._db

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: types.TracebackType | None,
    ):
        opened = self._tx
        if opened is not None:
            # Let the underlying transaction decide how to finish and whether
            # to suppress the exception.
            return await opened.__aexit__(exc_type, exc_val, exc_tb)
        # Never entered: do not suppress anything.
        return False

PgDb

Bases: DbBase

Postgres database client for synchronous operations.

Source code in src/embar/db/pg.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
@final
class PgDb(DbBase):
    """
    Postgres database client for synchronous operations.
    """

    db_type = "postgres"
    conn: Connection
    _commit_after_execute: bool = True

    def __init__(self, connection: Connection):
        """
        Create a new PgDb instance.
        """
        self.conn = connection

    def close(self):
        """
        Close the database connection.
        """
        if self.conn:
            self.conn.close()

    def transaction(self) -> PgDbTransaction:
        """
        Start an isolated transaction.

        ```python notest
        from embar.db.pg import PgDb
        db = PgDb(None)

        with db.transaction() as tx:
            ...
        ```
        """
        db_copy = PgDb(self.conn)
        db_copy._commit_after_execute = False
        return PgDbTransaction(db_copy)

    def select[M: BaseModel](self, model: type[M]) -> SelectQuery[M, Self]:
        """
        Create a SELECT query.
        """
        return SelectQuery[M, Self](db=self, model=model)

    def select_distinct[M: BaseModel](self, model: type[M]) -> SelectDistinctQuery[M, Self]:
        """
        Create a SELECT query.
        """
        return SelectDistinctQuery[M, Self](db=self, model=model)

    def insert[T: Table](self, table: type[T]) -> InsertQuery[T, Self]:
        """
        Create an INSERT query.
        """
        return InsertQuery[T, Self](table=table, db=self)

    def update[T: Table](self, table: type[T]) -> UpdateQuery[T, Self]:
        """
        Create an UPDATE query.
        """
        return UpdateQuery[T, Self](table=table, db=self)

    def delete[T: Table](self, table: type[T]) -> DeleteQueryReady[T, Self]:
        """
        Create an UPDATE query.
        """
        return DeleteQueryReady[T, Self](table=table, db=self)

    def sql(self, template: Template) -> DbSql[Self]:
        """
        Execute a raw SQL query using template strings.
        """
        return DbSql(template, self)

    def migrate(self, tables: Sequence[type[Table]], enums: Sequence[type[EnumBase]] | None = None) -> Migration[Self]:
        """
        Create a migration from a list of tables.
        """
        ddls = merge_ddls(MigrationDefs(tables, enums))
        return Migration(ddls, self)

    def migrates(self, schema: types.ModuleType) -> Migration[Self]:
        """
        Create a migration from a schema module.
        """
        defs = get_migration_defs(schema)
        return self.migrate(defs.tables, defs.enums)

    @override
    def execute(self, query: QuerySingle) -> None:
        """
        Execute a query without returning results.
        """
        self.conn.execute(query.sql, query.params)  # pyright:ignore[reportArgumentType]
        if self._commit_after_execute:
            self.conn.commit()

    @override
    def executemany(self, query: QueryMany):
        """
        Execute a query with multiple parameter sets.
        """
        params = _jsonify_dicts(query.many_params)
        with self.conn.cursor() as cur:
            cur.executemany(query.sql, params)  # pyright:ignore[reportArgumentType]
        if self._commit_after_execute:
            self.conn.commit()

    @override
    def fetch(self, query: QuerySingle | QueryMany) -> list[dict[str, Any]]:
        """
        Execute a query and return results as a list of dicts.
        """
        with self.conn.cursor() as cur:
            if isinstance(query, QuerySingle):
                cur.execute(query.sql, query.params)  # pyright:ignore[reportArgumentType]
            else:
                cur.executemany(query.sql, query.many_params, returning=True)  # pyright:ignore[reportArgumentType]

            if cur.description is None:
                return []
            columns: list[str] = [desc[0] for desc in cur.description]
            results: list[dict[str, Any]] = []
            for row in cur.fetchall():
                data = dict(zip(columns, row))
                results.append(data)
        if self._commit_after_execute:
            self.conn.commit()  # Commit after SELECT
        return results

    @override
    def truncate(self, schema: str | None = None):
        """
        Truncate all tables in the schema.
        """
        schema = schema if schema is not None else "public"
        tables = self._get_live_table_names(schema)
        if tables is None:
            return
        table_names = ", ".join(tables)
        with self.conn.cursor() as cursor:
            cursor.execute(f"TRUNCATE TABLE {table_names} CASCADE")  # pyright:ignore[reportArgumentType]
            if self._commit_after_execute:
                self.conn.commit()

    @override
    def drop_tables(self, schema: str | None = None):
        """
        Drop all tables in the schema.
        """
        schema = schema if schema is not None else "public"
        tables = self._get_live_table_names(schema)
        if tables is None:
            return
        table_names = ", ".join(tables)
        with self.conn.cursor() as cursor:
            cursor.execute(f"DROP TABLE {table_names} CASCADE")  # pyright:ignore[reportArgumentType]
            if self._commit_after_execute:
                self.conn.commit()

    def _get_live_table_names(self, schema: str) -> list[str] | None:
        with self.conn.cursor() as cursor:
            # Get all table names from public schema
            cursor.execute(f"SELECT tablename FROM pg_tables WHERE schemaname = '{schema}'")  # pyright:ignore[reportArgumentType]
            tables = cursor.fetchall()
            if not tables:
                return None
            table_names = [f'"{table[0]}"' for table in tables]
        return table_names

__init__(connection)

Create a new PgDb instance.

Source code in src/embar/db/pg.py
41
42
43
44
45
def __init__(self, connection: Connection):
    """
    Store `connection` as the connection this client operates on.
    """
    self.conn = connection

close()

Close the database connection.

Source code in src/embar/db/pg.py
47
48
49
50
51
52
def close(self):
    """
    Close the underlying connection, if one is set.
    """
    connection = self.conn
    if not connection:
        # Nothing to close (e.g. the client was built with a falsy connection).
        return
    connection.close()

delete(table)

Create a DELETE query.

Source code in src/embar/db/pg.py
94
95
96
97
98
def delete[T: Table](self, table: type[T]) -> DeleteQueryReady[T, Self]:
    """
    Create an UPDATE query.
    """
    return DeleteQueryReady[T, Self](table=table, db=self)

drop_tables(schema=None)

Drop all tables in the schema.

Source code in src/embar/db/pg.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
@override
def drop_tables(self, schema: str | None = None):
    """
    Drop all tables in the schema.
    """
    schema = schema if schema is not None else "public"
    tables = self._get_live_table_names(schema)
    if tables is None:
        return
    table_names = ", ".join(tables)
    with self.conn.cursor() as cursor:
        cursor.execute(f"DROP TABLE {table_names} CASCADE")  # pyright:ignore[reportArgumentType]
        if self._commit_after_execute:
            self.conn.commit()

execute(query)

Execute a query without returning results.

Source code in src/embar/db/pg.py
120
121
122
123
124
125
126
127
@override
def execute(self, query: QuerySingle) -> None:
    """
    Run `query` on the connection and discard any result.

    Commits afterwards unless per-operation commits are switched off.
    """
    self.conn.execute(query.sql, query.params)  # pyright:ignore[reportArgumentType]
    if not self._commit_after_execute:
        return
    self.conn.commit()

executemany(query)

Execute a query with multiple parameter sets.

Source code in src/embar/db/pg.py
129
130
131
132
133
134
135
136
137
138
@override
def executemany(self, query: QueryMany):
    """
    Run `query` once per parameter set in `query.many_params`.

    The parameter sets are passed through `_jsonify_dicts` first. Commits
    once the cursor is closed, unless per-operation commits are switched off.
    """
    param_sets = _jsonify_dicts(query.many_params)
    with self.conn.cursor() as cursor:
        cursor.executemany(query.sql, param_sets)  # pyright:ignore[reportArgumentType]
    if not self._commit_after_execute:
        return
    self.conn.commit()

fetch(query)

Execute a query and return results as a list of dicts.

Source code in src/embar/db/pg.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@override
def fetch(self, query: QuerySingle | QueryMany) -> list[dict[str, Any]]:
    """
    Execute a query and return results as a list of dicts.

    Each row becomes a dict keyed by column name. A statement that produces
    no result set yields an empty list but is still committed. For a
    `QueryMany`, the rows of every parameter set are concatenated in order.
    """
    results: list[dict[str, Any]] = []
    with self.conn.cursor() as cur:
        many = not isinstance(query, QuerySingle)
        if many:
            cur.executemany(query.sql, query.many_params, returning=True)  # pyright:ignore[reportArgumentType]
        else:
            cur.execute(query.sql, query.params)  # pyright:ignore[reportArgumentType]

        while True:
            # description is None when the current result carries no rows
            if cur.description is not None:
                columns: list[str] = [desc[0] for desc in cur.description]
                results.extend(dict(zip(columns, row)) for row in cur.fetchall())
            # executemany(returning=True) keeps one result set per parameter
            # set; step through them all.
            if not (many and cur.nextset()):
                break
    if self._commit_after_execute:
        self.conn.commit()  # Commit even after a plain SELECT
    return results

insert(table)

Create an INSERT query.

Source code in src/embar/db/pg.py
82
83
84
85
86
def insert[T: Table](self, table: type[T]) -> InsertQuery[T, Self]:
    """
    Create an INSERT query.
    """
    return InsertQuery[T, Self](table=table, db=self)

migrate(tables, enums=None)

Create a migration from a list of tables.

Source code in src/embar/db/pg.py
106
107
108
109
110
111
def migrate(self, tables: Sequence[type[Table]], enums: Sequence[type[EnumBase]] | None = None) -> Migration[Self]:
    """
    Build a `Migration` for the given tables and optional enums.

    The definitions are merged into DDLs with `merge_ddls` and bound to this client.
    """
    migration_defs = MigrationDefs(tables, enums)
    return Migration(merge_ddls(migration_defs), self)

migrates(schema)

Create a migration from a schema module.

Source code in src/embar/db/pg.py
113
114
115
116
117
118
def migrates(self, schema: types.ModuleType) -> Migration[Self]:
    """
    Build a `Migration` from the definitions found in a schema module.

    Delegates to `migrate` with the tables and enums that
    `get_migration_defs` extracts from `schema`.
    """
    found = get_migration_defs(schema)
    tables, enums = found.tables, found.enums
    return self.migrate(tables, enums)

select(model)

Create a SELECT query.

Source code in src/embar/db/pg.py
70
71
72
73
74
def select[M: BaseModel](self, model: type[M]) -> SelectQuery[M, Self]:
    """
    Create a SELECT query.
    """
    return SelectQuery[M, Self](db=self, model=model)

select_distinct(model)

Create a SELECT DISTINCT query.

Source code in src/embar/db/pg.py
76
77
78
79
80
def select_distinct[M: BaseModel](self, model: type[M]) -> SelectDistinctQuery[M, Self]:
    """
    Create a SELECT query.
    """
    return SelectDistinctQuery[M, Self](db=self, model=model)

sql(template)

Execute a raw SQL query using template strings.

Source code in src/embar/db/pg.py
100
101
102
103
104
def sql(self, template: Template) -> DbSql[Self]:
    """
    Wrap a raw SQL template string in a `DbSql` bound to this client.
    """
    raw_query = DbSql(template, self)
    return raw_query

transaction()

Start an isolated transaction.

```python notest
from embar.db.pg import PgDb
db = PgDb(None)

with db.transaction() as tx:
    ...
```

Source code in src/embar/db/pg.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def transaction(self) -> PgDbTransaction:
    """
    Start an isolated transaction.

    Returns a context manager wrapping a second `PgDb` that shares this
    connection but has per-operation commits switched off.

    ```python notest
    from embar.db.pg import PgDb
    db = PgDb(None)

    with db.transaction() as tx:
        ...
    ```
    """
    tx_db = PgDb(self.conn)
    # Set on the copy only; this client keeps committing after each operation.
    tx_db._commit_after_execute = False
    return PgDbTransaction(tx_db)

truncate(schema=None)

Truncate all tables in the schema.

Source code in src/embar/db/pg.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
@override
def truncate(self, schema: str | None = None):
    """
    Truncate all tables in the schema.
    """
    schema = schema if schema is not None else "public"
    tables = self._get_live_table_names(schema)
    if tables is None:
        return
    table_names = ", ".join(tables)
    with self.conn.cursor() as cursor:
        cursor.execute(f"TRUNCATE TABLE {table_names} CASCADE")  # pyright:ignore[reportArgumentType]
        if self._commit_after_execute:
            self.conn.commit()

update(table)

Create an UPDATE query.

Source code in src/embar/db/pg.py
88
89
90
91
92
def update[T: Table](self, table: type[T]) -> UpdateQuery[T, Self]:
    """
    Create an UPDATE query.
    """
    return UpdateQuery[T, Self](table=table, db=self)

PgDbTransaction

Transaction context manager for PgDb.

Source code in src/embar/db/pg.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
class PgDbTransaction:
    """
    Context manager that runs a PgDb inside a connection transaction.

    Entering opens `conn.transaction()` on the wrapped client's connection and
    yields that client; exiting delegates to the opened transaction.
    """

    _db: PgDb
    # The connection-level transaction context; None until `__enter__` runs.
    _tx: AbstractContextManager[Transaction] | None = None

    def __init__(self, db: PgDb):
        self._db = db

    def __enter__(self) -> PgDb:
        opened = self._db.conn.transaction()
        self._tx = opened
        opened.__enter__()
        return self._db

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: types.TracebackType | None,
    ):
        opened = self._tx
        if opened is not None:
            # Let the underlying transaction decide how to finish and whether
            # to suppress the exception.
            return opened.__exit__(exc_type, exc_val, exc_tb)
        # Never entered: do not suppress anything.
        return False