Skip to content

sql_db

Raw SQL query execution with optional result parsing.

DbSql

Used to run raw SQL queries.

Source code in src/embar/sql_db.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class DbSql[Db: AllDbBase]:
    """
    Used to run raw SQL queries.
    """

    _sql: Sql
    _db: Db

    def __init__(self, template: Template, db: Db):
        """
        Create a new DbSql instance.
        """
        self._sql = Sql(template)
        self._db = db

    def model[M: BaseModel](self, model: type[M]) -> DbSqlReturning[M, Db]:
        """
        Specify a model for parsing results.
        """
        return DbSqlReturning(self._sql, model, self._db)

    def sql(self) -> str:
        return self._sql.sql()

    def __await__(self):
        """
        Run the query asynchronously without returning results.
        """
        sql = self._sql.sql()
        query = QuerySingle(sql)

        async def awaitable():
            db = self._db

            if isinstance(db, AsyncDbBase):
                await db.execute(query)
            else:
                db = cast(DbBase, self._db)
                db.execute(query)

        return awaitable().__await__()

    @overload
    def run(self: DbSql[DbBase]) -> None: ...
    @overload
    def run(self: DbSql[AsyncDbBase]) -> DbSql[Db]: ...

    def run(self) -> None | DbSql[Db]:
        """
        Run the query synchronously without returning results.
        """
        if isinstance(self._db, DbBase):
            sql = self._sql.sql()
            query = QuerySingle(sql)
            self._db.execute(query)
        return self

__await__()

Run the query asynchronously without returning results.

Source code in src/embar/sql_db.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def __await__(self):
    """
    Run the query asynchronously without returning results.
    """
    sql = self._sql.sql()
    query = QuerySingle(sql)

    async def awaitable():
        db = self._db

        if isinstance(db, AsyncDbBase):
            await db.execute(query)
        else:
            db = cast(DbBase, self._db)
            db.execute(query)

    return awaitable().__await__()

__init__(template, db)

Create a new DbSql instance.

Source code in src/embar/sql_db.py
23
24
25
26
27
28
def __init__(self, template: Template, db: Db):
    """
    Create a new DbSql instance.
    """
    self._sql = Sql(template)
    self._db = db

model(model)

Specify a model for parsing results.

Source code in src/embar/sql_db.py
30
31
32
33
34
def model[M: BaseModel](self, model: type[M]) -> DbSqlReturning[M, Db]:
    """
    Specify a model for parsing results.
    """
    return DbSqlReturning(self._sql, model, self._db)

run()

run() -> None
run() -> DbSql[Db]

Run the query synchronously without returning results.

Source code in src/embar/sql_db.py
62
63
64
65
66
67
68
69
70
def run(self) -> None | DbSql[Db]:
    """
    Run the query synchronously without returning results.
    """
    if isinstance(self._db, DbBase):
        sql = self._sql.sql()
        query = QuerySingle(sql)
        self._db.execute(query)
    return self

DbSqlReturning

Used to run raw SQL queries and return a value.

Source code in src/embar/sql_db.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class DbSqlReturning[M: BaseModel, Db: AllDbBase]:
    """
    Used to run raw SQL queries and return a value.
    """

    _sql: Sql
    model: type[M]
    _db: Db

    def __init__(self, sql: Sql, model: type[M], db: Db):
        """
        Create a new DbSqlReturning instance.
        """
        self._sql = sql
        self.model = model
        self._db = db

    def sql(self) -> str:
        return self._sql.sql()

    def __await__(self) -> Generator[Any, None, Sequence[M]]:
        """
        Run the query asynchronously and return parsed results.
        """
        sql = self._sql.sql()
        query = QuerySingle(sql)
        model = self._get_model()
        adapter = TypeAdapter(list[model])

        async def awaitable():
            db = self._db

            if isinstance(db, AsyncDbBase):
                data = await db.fetch(query)
            else:
                db = cast(DbBase, self._db)
                data = db.fetch(query)
            results = adapter.validate_python(data)
            return results

        return awaitable().__await__()

    @overload
    def run(self: DbSqlReturning[M, DbBase]) -> Sequence[M]: ...
    @overload
    def run(self: DbSqlReturning[M, AsyncDbBase]) -> DbSqlReturning[M, Db]: ...

    def run(self) -> Sequence[M] | DbSqlReturning[M, Db]:
        """
        Run the query synchronously and return parsed results.
        """
        if isinstance(self._db, DbBase):
            sql = self._sql.sql()
            query = QuerySingle(sql)
            data = self._db.fetch(query)
            model = self._get_model()
            adapter = TypeAdapter(list[model])
            self.model.__init_subclass__()
            results = adapter.validate_python(data)
            return results
        return self

    def _get_model(self) -> type[M]:
        """
        Generate the dataclass that will be used to deserialize (and validate) the query results.
        """
        return upgrade_model_nested_fields(self.model)

__await__()

Run the query asynchronously and return parsed results.

Source code in src/embar/sql_db.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def __await__(self) -> Generator[Any, None, Sequence[M]]:
    """
    Run the query asynchronously and return parsed results.
    """
    sql = self._sql.sql()
    query = QuerySingle(sql)
    model = self._get_model()
    adapter = TypeAdapter(list[model])

    async def awaitable():
        db = self._db

        if isinstance(db, AsyncDbBase):
            data = await db.fetch(query)
        else:
            db = cast(DbBase, self._db)
            data = db.fetch(query)
        results = adapter.validate_python(data)
        return results

    return awaitable().__await__()

__init__(sql, model, db)

Create a new DbSqlReturning instance.

Source code in src/embar/sql_db.py
82
83
84
85
86
87
88
def __init__(self, sql: Sql, model: type[M], db: Db):
    """
    Create a new DbSqlReturning instance.
    """
    self._sql = sql
    self.model = model
    self._db = db

run()

run() -> Sequence[M]
run() -> DbSqlReturning[M, Db]

Run the query synchronously and return parsed results.

Source code in src/embar/sql_db.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def run(self) -> Sequence[M] | DbSqlReturning[M, Db]:
    """
    Run the query synchronously and return parsed results.
    """
    if isinstance(self._db, DbBase):
        sql = self._sql.sql()
        query = QuerySingle(sql)
        data = self._db.fetch(query)
        model = self._get_model()
        adapter = TypeAdapter(list[model])
        self.model.__init_subclass__()
        results = adapter.validate_python(data)
        return results
    return self