Skip to content

sql_db

Raw SQL query execution with optional result parsing.

DbSql

Used to run raw SQL queries.

Source code in src/embar/sql_db.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class DbSql[Db: AllDbBase]:
    """
    Used to run raw SQL queries.
    """

    _sql: Sql
    _db: Db

    def __init__(self, template: Template, db: Db):
        """
        Create a new DbSql instance.
        """
        self._sql = Sql(template)
        self._db = db

    def model[M: DataModel](self, model: type[M]) -> DbSqlReturning[M, Db]:
        """
        Specify a model for parsing results.
        """
        return DbSqlReturning(self._sql, model, self._db)

    def sql(self) -> str:
        return self._sql.sql()

    def __await__(self):
        """
        Run the query asynchronously without returning results.
        """
        sql = self._sql.sql()
        query = QuerySingle(sql)

        async def awaitable():
            db = self._db

            if isinstance(db, AsyncDbBase):
                await db.execute(query)
            else:
                db = cast(DbBase, self._db)
                db.execute(query)

        return awaitable().__await__()

    def run(self) -> Self:
        """
        Run the query synchronously without returning results.

        Returns self so that `await db.sql(...).run()` works for async.
        For sync callers, the return value can be ignored.
        """
        if isinstance(self._db, DbBase):
            sql = self._sql.sql()
            query = QuerySingle(sql)
            self._db.execute(query)
        return self

__await__()

Run the query asynchronously without returning results.

Source code in src/embar/sql_db.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __await__(self):
    """
    Execute the query when this object is awaited; no results are returned.

    The SQL string and the `QuerySingle` are built immediately; only the
    `execute` call happens inside the coroutine.
    """
    statement = QuerySingle(self._sql.sql())

    async def _execute():
        target = self._db

        if not isinstance(target, AsyncDbBase):
            # Not an async database: call `execute` directly, without awaiting.
            cast(DbBase, self._db).execute(statement)
            return

        await target.execute(statement)

    return _execute().__await__()

__init__(template, db)

Create a new DbSql instance.

Source code in src/embar/sql_db.py
21
22
23
24
25
26
def __init__(self, template: Template, db: Db):
    """
    Wrap `template` in a `Sql` object and remember the database to run it on.
    """
    wrapped = Sql(template)
    self._sql = wrapped
    self._db = db

model(model)

Specify a model for parsing results.

Source code in src/embar/sql_db.py
28
29
30
31
32
def model[M: DataModel](self, model: type[M]) -> DbSqlReturning[M, Db]:
    """
    Specify a model for parsing results.
    """
    return DbSqlReturning(self._sql, model, self._db)

run()

Run the query synchronously without returning results.

Returns self so that `await db.sql(...).run()` works for async. For sync callers, the return value can be ignored.

Source code in src/embar/sql_db.py
55
56
57
58
59
60
61
62
63
64
65
66
def run(self) -> Self:
    """
    Execute the query synchronously, discarding any results.

    When the database is not a `DbBase` instance nothing is executed here;
    `self` is returned either way so that `await db.sql(...).run()` works
    for async. For sync callers, the return value can be ignored.
    """
    if not isinstance(self._db, DbBase):
        return self

    statement = QuerySingle(self._sql.sql())
    self._db.execute(statement)
    return self

DbSqlReturning

Used to run raw SQL queries and return a value.

Source code in src/embar/sql_db.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class DbSqlReturning[M: DataModel, Db: AllDbBase]:
    """
    Used to run raw SQL queries and return a value.
    """

    _sql: Sql
    model: type[M]
    _db: Db

    def __init__(self, sql: Sql, model: type[M], db: Db):
        """
        Create a new DbSqlReturning instance.
        """
        self._sql = sql
        self.model = model
        self._db = db

    def sql(self) -> str:
        return self._sql.sql()

    def __await__(self) -> Generator[Any, None, Sequence[M]]:
        """
        Run the query asynchronously and return parsed results.
        """
        sql = self._sql.sql()
        query = QuerySingle(sql)
        model = self._get_model()

        async def awaitable():
            db = self._db

            if isinstance(db, AsyncDbBase):
                data = await db.fetch(query)
            else:
                db = cast(DbBase, self._db)
                data = db.fetch(query)
            results = load_results(model, data)
            return results

        return awaitable().__await__()

    def run(self) -> Sequence[M]:
        """
        Run the query synchronously and return parsed results.

        For async, use `await query` instead.
        """
        sql = self._sql.sql()
        query = QuerySingle(sql)
        model = self._get_model()
        db = cast(DbBase, self._db)
        data = db.fetch(query)
        results = load_results(model, data)
        return results

    def _get_model(self) -> type[M]:
        """
        Generate the dataclass that will be used to deserialize (and validate) the query results.
        """
        use_pydantic = isinstance(self.model, type) and issubclass(self.model, BaseModel)
        return upgrade_model_nested_fields(self.model, use_pydantic=use_pydantic)

__await__()

Run the query asynchronously and return parsed results.

Source code in src/embar/sql_db.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def __await__(self) -> Generator[Any, None, Sequence[M]]:
    """
    Fetch and parse the query results when this object is awaited.

    The `QuerySingle` and the parsing model are prepared immediately; the
    fetch and the call to `load_results` happen inside the coroutine.
    """
    statement = QuerySingle(self._sql.sql())
    row_model = self._get_model()

    async def _fetch_and_parse():
        target = self._db

        if not isinstance(target, AsyncDbBase):
            # Not an async database: call `fetch` directly, without awaiting.
            rows = cast(DbBase, self._db).fetch(statement)
        else:
            rows = await target.fetch(statement)

        return load_results(row_model, rows)

    return _fetch_and_parse().__await__()

__init__(sql, model, db)

Create a new DbSqlReturning instance.

Source code in src/embar/sql_db.py
78
79
80
81
82
83
84
def __init__(self, sql: Sql, model: type[M], db: Db):
    """
    Store the SQL object, the result model and the database on the instance.
    """
    self._sql, self.model, self._db = sql, model, db

run()

Run the query synchronously and return parsed results.

For async, use `await query` instead.

Source code in src/embar/sql_db.py
110
111
112
113
114
115
116
117
118
119
120
121
122
def run(self) -> Sequence[M]:
    """
    Run the query synchronously and return parsed results.

    For async, use `await query` instead.

    Raises:
        TypeError: If the database is an `AsyncDbBase`, whose `fetch`
            must be awaited.
    """
    if isinstance(self._db, AsyncDbBase):
        # An async `fetch` would hand back an un-awaited coroutine rather
        # than rows, so fail early instead of passing it to `load_results`.
        raise TypeError(
            "DbSqlReturning.run() cannot be used with an async database; "
            "use `await query` instead."
        )

    sql = self._sql.sql()
    query = QuerySingle(sql)
    model = self._get_model()
    db = cast(DbBase, self._db)
    data = db.fetch(query)
    results = load_results(model, data)
    return results