Skip to content

delete

Delete query builder.

DeleteQueryReady

DeleteQueryReady is used to delete data from a table.

It is generic over the Table being deleted from, and the database being used.

DeleteQueryReady is returned by db.delete()

Source code in src/embar/query/delete.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
class DeleteQueryReady[T: Table, Db: AllDbBase]:
    """
    `DeleteQueryReady` is used to delete data from a table.

    It is generic over the `Table` being inserted into, and the database being used.

    `DeleteQueryReady` is returned by `db.delete()`
    """

    table: type[T]
    _db: Db

    _where_clause: WhereClause | None = None
    _order_clause: OrderBy | None = None
    _limit_value: int | None = None

    def __init__(self, table: type[T], db: Db):
        """
        Create a new SelectQueryReady instance.
        """
        self.table = table
        self._db = db

    def returning(self) -> DeleteQueryReturning[T, Db]:
        return DeleteQueryReturning(self.table, self._db, self._where_clause, self._order_clause, self._limit_value)

    def where(self, where_clause: WhereClause) -> Self:
        """
        Add a WHERE clause to the query.
        """
        self._where_clause = where_clause
        return self

    def order_by(self, *clauses: ColumnBase | Asc | Desc | Sql) -> Self:
        """
        Add an ORDER BY clause to sort query results.

        Accepts multiple ordering clauses:
        - Bare column references (defaults to ASC): `User.id`
        - `Asc(User.id)` or `Asc(User.id, nulls="last")`
        - `Desc(User.id)` or `Desc(User.id, nulls="first")`
        - Raw SQL: `Sql(t"{User.id} DESC")`

        Can be called multiple times to add more sort columns.

        ```python
        from embar.db.pg import PgDb
        from embar.table import Table
        from embar.column.common import Integer

        class User(Table):
            id: Integer = Integer(primary=True)

        db = PgDb(None)

        query = db.delete(User).order_by(User.id)
        sql_result = query.sql()
        assert "ORDER BY" in sql_result.sql
        ```
        """
        # Convert each clause to an OrderByClause
        order_clauses: list[OrderByClause] = []
        for clause in clauses:
            if isinstance(clause, (Asc, Desc)):
                order_clauses.append(clause)
            elif isinstance(clause, Sql):
                order_clauses.append(RawSqlOrder(clause))
            else:
                order_clauses.append(BareColumn(clause))

        if self._order_clause is None:
            self._order_clause = OrderBy(tuple(order_clauses))
        else:
            # Add to existing ORDER BY clauses
            self._order_clause = OrderBy((*self._order_clause.clauses, *order_clauses))

        return self

    def limit(self, n: int) -> Self:
        """
        Add a LIMIT clause to the query.
        """
        self._limit_value = n
        return self

    def __await__(self):
        """
        Async users should construct their query and await it.

        Non-async users have the `run()` convenience method below.
        But this method will still work if called in an async context against a non-async db.

        The overrides provide for a few different cases:
        - A Model was passed, in which case that's the return type
        - `SelectAll` was passed, in which case the return type is the `Table`
        - This is called with an async db, in which case an error is returned.
        """
        query = self.sql()

        async def awaitable():
            db = self._db
            if isinstance(db, AsyncDbBase):
                await db.execute(query)
            else:
                db = cast(DbBase, self._db)
                db.execute(query)

        return awaitable().__await__()

    @overload
    def run(self: DeleteQueryReady[T, DbBase]) -> None: ...
    @overload
    def run(self: DeleteQueryReady[T, AsyncDbBase]) -> DeleteQueryReady[T, Db]: ...

    def run(self) -> None | DeleteQueryReady[T, Db]:
        """
        Run the query against the underlying DB.

        Convenience method for those not using async.
        But still works if awaited.
        """
        if isinstance(self._db, DbBase):
            query = self.sql()
            self._db.execute(query)
        return self

    def sql(self) -> QuerySingle:
        """
        Combine all the components of the query and build the SQL and bind parameters (psycopg format).
        """

        sql = f"""
        DELETE
        FROM {self.table.fqn()}
        """
        sql = dedent(sql).strip()

        count = -1

        def get_count() -> int:
            nonlocal count
            count += 1
            return count

        params: dict[str, Any] = {}

        if self._where_clause is not None:
            where_data = self._where_clause.sql(get_count)
            sql += f"\nWHERE {where_data.sql}"
            params = {**params, **where_data.params}

        if self._order_clause is not None:
            order_by_sql = self._order_clause.sql()
            sql += f"\nORDER BY {order_by_sql}"

        if self._limit_value is not None:
            sql += f"\nLIMIT {self._limit_value}"

        sql = sql.strip()

        return QuerySingle(sql, params=params)

__await__()

Async users should construct their query and await it.

Non-async users have the run() convenience method below. But this method will still work if called in an async context against a non-async db.

Awaiting the query executes the DELETE: against an async database the call to execute() is awaited, and against a non-async database execute() is called directly. Awaiting produces no value.

Source code in src/embar/query/delete.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def __await__(self):
    """
    Make the query awaitable, so that `await query` executes the DELETE.

    The SQL is built up front with `self.sql()`. When the coroutine runs, an
    `AsyncDbBase` database has its `execute()` awaited, while any other
    database has `execute()` called directly. Awaiting produces no value.

    Non-async users have the `run()` convenience method instead.
    """
    statement = self.sql()

    async def _execute():
        target = self._db
        if not isinstance(target, AsyncDbBase):
            # Non-async database: a plain call, nothing to await.
            cast(DbBase, self._db).execute(statement)
            return
        await target.execute(statement)

    return _execute().__await__()

__init__(table, db)

Create a new DeleteQueryReady instance.

Source code in src/embar/query/delete.py
37
38
39
40
41
42
def __init__(self, table: type[T], db: Db):
    """
    Set up a delete query for `table` that will be run against `db`.
    """
    self._db = db
    self.table = table

limit(n)

Add a LIMIT clause to the query.

Source code in src/embar/query/delete.py
 99
100
101
102
103
104
def limit(self, n: int) -> Self:
    """
    Add a LIMIT clause to the query.
    """
    self._limit_value = n
    return self

order_by(*clauses)

Add an ORDER BY clause to sort query results.

Accepts multiple ordering clauses: - Bare column references (defaults to ASC): User.id - Asc(User.id) or Asc(User.id, nulls="last") - Desc(User.id) or Desc(User.id, nulls="first") - Raw SQL: Sql(t"{User.id} DESC")

Can be called multiple times to add more sort columns.

from embar.db.pg import PgDb
from embar.table import Table
from embar.column.common import Integer

class User(Table):
    id: Integer = Integer(primary=True)

db = PgDb(None)

query = db.delete(User).order_by(User.id)
sql_result = query.sql()
assert "ORDER BY" in sql_result.sql
Source code in src/embar/query/delete.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def order_by(self, *clauses: ColumnBase | Asc | Desc | Sql) -> Self:
    """
    Append one or more ordering clauses to the query's ORDER BY.

    Each argument may be:
    - A bare column reference (defaults to ASC): `User.id`
    - `Asc(User.id)` or `Asc(User.id, nulls="last")`
    - `Desc(User.id)` or `Desc(User.id, nulls="first")`
    - Raw SQL: `Sql(t"{User.id} DESC")`

    Repeated calls extend the existing ordering rather than replacing it.

    ```python
    from embar.db.pg import PgDb
    from embar.table import Table
    from embar.column.common import Integer

    class User(Table):
        id: Integer = Integer(primary=True)

    db = PgDb(None)

    query = db.delete(User).order_by(User.id)
    sql_result = query.sql()
    assert "ORDER BY" in sql_result.sql
    ```
    """

    def _as_order_clause(item):
        # Asc/Desc are stored as given; raw SQL and bare columns get wrapped.
        if isinstance(item, (Asc, Desc)):
            return item
        if isinstance(item, Sql):
            return RawSqlOrder(item)
        return BareColumn(item)

    added: list[OrderByClause] = [_as_order_clause(item) for item in clauses]

    # Earlier calls may already have stored clauses; keep them in front.
    existing = () if self._order_clause is None else self._order_clause.clauses
    self._order_clause = OrderBy((*existing, *added))
    return self

run()

run() -> None
run() -> DeleteQueryReady[T, Db]

Run the query against the underlying DB.

Convenience method for those not using async. But still works if awaited.

Source code in src/embar/query/delete.py
135
136
137
138
139
140
141
142
143
144
145
def run(self) -> None | DeleteQueryReady[T, Db]:
    """
    Run the query against the underlying DB.

    Convenience method for those not using async.
    But still works if awaited.

    With a `DbBase` database the DELETE is executed immediately and `None`
    is returned. With any other database nothing is executed here and the
    query itself is returned so that the caller can await it.
    """
    if not isinstance(self._db, DbBase):
        return self
    self._db.execute(self.sql())
    # Return None rather than the query: the query is awaitable, so handing it
    # back would let a stray `await` execute the same DELETE a second time.
    return None

sql()

Combine all the components of the query and build the SQL and bind parameters (psycopg format).

Source code in src/embar/query/delete.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def sql(self) -> QuerySingle:
    """
    Assemble the DELETE statement and its bind parameters (psycopg format).

    The statement is `DELETE FROM <table>` followed, when set, by the WHERE,
    ORDER BY and LIMIT parts, in that order. Only the WHERE clause
    contributes bind parameters.
    """
    header = f"""
    DELETE
    FROM {self.table.fqn()}
    """
    pieces = [dedent(header).strip()]

    next_index = -1

    def take_index() -> int:
        # Hands out 0, 1, 2, ... to the WHERE clause each time it asks for a number.
        nonlocal next_index
        next_index += 1
        return next_index

    bind_params: dict[str, Any] = {}

    if self._where_clause is not None:
        rendered_where = self._where_clause.sql(take_index)
        pieces.append(f"WHERE {rendered_where.sql}")
        bind_params = {**bind_params, **rendered_where.params}

    if self._order_clause is not None:
        pieces.append(f"ORDER BY {self._order_clause.sql()}")

    if self._limit_value is not None:
        pieces.append(f"LIMIT {self._limit_value}")

    statement = "\n".join(pieces).strip()
    return QuerySingle(statement, params=bind_params)

where(where_clause)

Add a WHERE clause to the query.

Source code in src/embar/query/delete.py
47
48
49
50
51
52
def where(self, where_clause: WhereClause) -> Self:
    """
    Add a WHERE clause to the query.
    """
    self._where_clause = where_clause
    return self

DeleteQueryReturning

DeleteQueryReturning is used to delete data from a table and return it.

Source code in src/embar/query/delete.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
class DeleteQueryReturning[T: Table, Db: AllDbBase]:
    """
    `DeleteQueryReturning` is used to delete data from a table and return it.
    """

    table: type[T]
    _db: Db

    _where_clause: WhereClause | None = None
    _order_clause: OrderBy | None = None
    _limit_value: int | None = None

    def __init__(
        self,
        table: type[T],
        db: Db,
        where_clause: WhereClause | None,
        order_clause: OrderBy | None,
        limit_value: int | None,
    ):
        """
        Create a new SelectQueryReady instance.
        """
        self.table = table
        self._db = db
        self._where_clause = where_clause
        self._order_clause = order_clause
        self._limit_value = limit_value

    def __await__(self) -> Generator[Any, None, list[T]]:
        """
        Async users should construct their query and await it.

        Non-async users have the `run()` convenience method below.
        But this method will still work if called in an async context against a non-async db.

        The overrides provide for a few different cases:
        - A Model was passed, in which case that's the return type
        - `SelectAll` was passed, in which case the return type is the `Table`
        - This is called with an async db, in which case an error is returned.
        """
        query = self.sql()
        model = self._get_model()
        model = cast(type[T], model)
        adapter = TypeAdapter(list[model])

        async def awaitable():
            db = self._db
            if isinstance(db, AsyncDbBase):
                data = await db.fetch(query)
            else:
                db = cast(DbBase, self._db)
                data = db.fetch(query)
            results = adapter.validate_python(data)
            return results

        return awaitable().__await__()

    @overload
    def run(self: DeleteQueryReady[T, DbBase]) -> list[T]: ...
    @overload
    def run(self: DeleteQueryReady[T, AsyncDbBase]) -> DeleteQueryReturning[T, Db]: ...

    def run(self) -> list[T] | DeleteQueryReturning[T, Db]:
        """
        Run the query against the underlying DB.

        Convenience method for those not using async.
        But still works if awaited.
        """
        if isinstance(self._db, DbBase):
            query = self.sql()
            model = self._get_model()
            model = cast(type[T], model)
            adapter = TypeAdapter(list[model])
            data = self._db.fetch(query)
            results = adapter.validate_python(data)
            return results
        return self

    def sql(self) -> QuerySingle:
        """
        Combine all the components of the query and build the SQL and bind parameters (psycopg format).
        """

        sql = f"""
        DELETE
        FROM {self.table.fqn()}
        """
        sql = dedent(sql).strip()

        count = -1

        def get_count() -> int:
            nonlocal count
            count += 1
            return count

        params: dict[str, Any] = {}

        if self._where_clause is not None:
            where_data = self._where_clause.sql(get_count)
            sql += f"\nWHERE {where_data.sql}"
            params = {**params, **where_data.params}

        if self._order_clause is not None:
            order_by_sql = self._order_clause.sql()
            sql += f"\nORDER BY {order_by_sql}"

        if self._limit_value is not None:
            sql += f"\nLIMIT {self._limit_value}"

        # This is the only difference vs the regular DeleteQueryReady sql
        sql += "\nRETURNING *"

        sql = sql.strip()

        return QuerySingle(sql, params=params)

    def _get_model(self) -> type[BaseModel]:
        """
        Generate the dataclass that will be used to deserialize (and validate) the query results.
        """
        model = generate_model(self.table)
        return model

__await__()

Async users should construct their query and await it.

Non-async users have the run() convenience method below. But this method will still work if called in an async context against a non-async db.

Awaiting the query executes the DELETE ... RETURNING * statement: against an async database the call to fetch() is awaited, and against a non-async database fetch() is called directly. The fetched rows are validated against the model generated from the table and returned as a list.

Source code in src/embar/query/delete.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def __await__(self) -> Generator[Any, None, list[T]]:
    """
    Make the query awaitable, so that `await query` deletes and returns rows.

    The SQL and the row adapter are built up front. When the coroutine runs,
    an `AsyncDbBase` database has its `fetch()` awaited, while any other
    database has `fetch()` called directly. The fetched rows are validated
    and returned as a list.

    Non-async users have the `run()` convenience method instead.
    """
    statement = self.sql()
    row_model = cast(type[T], self._get_model())
    rows_adapter = TypeAdapter(list[row_model])

    async def _fetch_and_validate():
        target = self._db
        if isinstance(target, AsyncDbBase):
            raw_rows = await target.fetch(statement)
        else:
            # Non-async database: a plain call, nothing to await.
            raw_rows = cast(DbBase, self._db).fetch(statement)
        return rows_adapter.validate_python(raw_rows)

    return _fetch_and_validate().__await__()

__init__(table, db, where_clause, order_clause, limit_value)

Create a new DeleteQueryReturning instance.

Source code in src/embar/query/delete.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def __init__(
    self,
    table: type[T],
    db: Db,
    where_clause: WhereClause | None,
    order_clause: OrderBy | None,
    limit_value: int | None,
):
    """
    Set up a returning-delete query for `table` that will be run against `db`.

    The three optional query parts are stored as given; `None` means that
    part is left out of the generated SQL.
    """
    self.table, self._db = table, db
    self._where_clause, self._order_clause, self._limit_value = (
        where_clause,
        order_clause,
        limit_value,
    )

run()

run() -> list[T]
run() -> DeleteQueryReturning[T, Db]

Run the query against the underlying DB.

Convenience method for those not using async. But still works if awaited.

Source code in src/embar/query/delete.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def run(self) -> list[T] | DeleteQueryReturning[T, Db]:
    """
    Execute the query on a non-async database and return the deleted rows.

    With a `DbBase` database the rows are fetched immediately, validated
    against the model generated from the table, and returned as a list.
    With any other database nothing is executed here and the query itself
    is returned so that the caller can await it.
    """
    if not isinstance(self._db, DbBase):
        return self

    statement = self.sql()
    row_model = cast(type[T], self._get_model())
    rows_adapter = TypeAdapter(list[row_model])
    return rows_adapter.validate_python(self._db.fetch(statement))

sql()

Combine all the components of the query and build the SQL and bind parameters (psycopg format).

Source code in src/embar/query/delete.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def sql(self) -> QuerySingle:
    """
    Assemble the DELETE ... RETURNING statement and its bind parameters (psycopg format).

    The statement is `DELETE FROM <table>` followed, when set, by the WHERE,
    ORDER BY and LIMIT parts, and always ends with `RETURNING *`. Only the
    WHERE clause contributes bind parameters.
    """
    header = f"""
    DELETE
    FROM {self.table.fqn()}
    """
    pieces = [dedent(header).strip()]

    next_index = -1

    def take_index() -> int:
        # Hands out 0, 1, 2, ... to the WHERE clause each time it asks for a number.
        nonlocal next_index
        next_index += 1
        return next_index

    bind_params: dict[str, Any] = {}

    if self._where_clause is not None:
        rendered_where = self._where_clause.sql(take_index)
        pieces.append(f"WHERE {rendered_where.sql}")
        bind_params = {**bind_params, **rendered_where.params}

    if self._order_clause is not None:
        pieces.append(f"ORDER BY {self._order_clause.sql()}")

    if self._limit_value is not None:
        pieces.append(f"LIMIT {self._limit_value}")

    # The trailing RETURNING is what sets this apart from the plain delete query.
    pieces.append("RETURNING *")

    statement = "\n".join(pieces).strip()
    return QuerySingle(statement, params=bind_params)