Skip to content

sqlite

SQLite database client.

SqliteDb

Bases: DbBase

SQLite database client for synchronous operations.

Source code in src/embar/db/sqlite.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
@final
class SqliteDb(DbBase):
    """
    SQLite database client for synchronous operations.
    """

    db_type = "sqlite"
    conn: sqlite3.Connection
    _commit_after_execute: bool = True

    def __init__(self, connection: sqlite3.Connection):
        """
        Create a new SqliteDb instance.
        """
        self.conn = connection
        self.conn.row_factory = sqlite3.Row

    def close(self):
        """
        Close the database connection.
        """
        if self.conn:
            self.conn.close()

    def transaction(self) -> SqliteDbTransaction:
        """
        Start an isolated transaction.
        """
        db_copy = SqliteDb(self.conn)
        db_copy._commit_after_execute = False
        return SqliteDbTransaction(db_copy)

    def select[M: BaseModel](self, model: type[M]) -> SelectQuery[M, Self]:
        """
        Create a SELECT query.
        """
        return SelectQuery[M, Self](db=self, model=model)

    def select_distinct[M: BaseModel](self, model: type[M]) -> SelectDistinctQuery[M, Self]:
        """
        Create a SELECT query.
        """
        return SelectDistinctQuery[M, Self](db=self, model=model)

    def insert[T: Table](self, table: type[T]) -> InsertQuery[T, Self]:
        """
        Create an INSERT query.
        """
        return InsertQuery[T, Self](table=table, db=self)

    def update[T: Table](self, table: type[T]) -> UpdateQuery[T, Self]:
        """
        Create an UPDATE query.
        """
        return UpdateQuery[T, Self](table=table, db=self)

    def delete[T: Table](self, table: type[T]) -> DeleteQueryReady[T, Self]:
        """
        Create an UPDATE query.
        """
        return DeleteQueryReady[T, Self](table=table, db=self)

    def sql(self, template: Template) -> DbSql[Self]:
        """
        Execute a raw SQL query using template strings.
        """
        return DbSql(template, self)

    def migrate(self, tables: Sequence[type[Table]], enums: Sequence[type[EnumBase]] | None = None) -> Migration[Self]:
        """
        Create a migration from a list of tables.
        """
        ddls = merge_ddls(MigrationDefs(tables, enums))
        return Migration(ddls, self)

    def migrates(self, schema: types.ModuleType) -> Migration[Self]:
        """
        Create a migration from a schema module.
        """
        defs = get_migration_defs(schema)
        return self.migrate(defs.tables, defs.enums)

    @override
    def execute(self, query: QuerySingle) -> None:
        """
        Execute a query without returning results.
        """
        sql = _convert_params(query.sql)
        self.conn.execute(sql, query.params)
        if self._commit_after_execute:
            self.conn.commit()

    @override
    def executemany(self, query: QueryMany):
        """
        Execute a query with multiple parameter sets.
        """
        sql = _convert_params(query.sql)
        self.conn.executemany(sql, query.many_params)
        if self._commit_after_execute:
            self.conn.commit()

    @override
    def fetch(self, query: QuerySingle | QueryMany) -> list[dict[str, Any]]:
        """
        Fetch all rows returned by a SELECT query.

        sqlite returns json/arrays as string, so need to parse them.
        """
        sql = _convert_params(query.sql)
        if isinstance(query, QuerySingle):
            cur = self.conn.execute(sql, query.params)
        else:
            cur = self.conn.executemany(sql, query.many_params)

        if cur.description is None:
            return []

        results: list[dict[str, Any]] = []
        for row in cur.fetchall():
            row_dict = dict(row)
            for key, value in row_dict.items():
                if isinstance(value, str):
                    try:
                        row_dict[key] = json.loads(value)
                    except (json.JSONDecodeError, ValueError):
                        try:
                            row_dict[key] = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
                        except ValueError:
                            pass  # Keep as string
            results.append(row_dict)
        return results

    @override
    def truncate(self, schema: str | None = None):
        """
        Truncate all tables in the database.
        """
        cursor = self.conn.cursor()
        tables = self._get_live_table_names()
        for (table_name,) in tables:
            cursor.execute(f"DELETE FROM {table_name}")
        if self._commit_after_execute:
            self.conn.commit()

    @override
    def drop_tables(self, schema: str | None = None):
        """
        Drop all tables in the database.
        """
        cursor = self.conn.cursor()
        tables = self._get_live_table_names()
        for (table_name,) in tables:
            cursor.execute(f"DROP TABLE {table_name}")
        if self._commit_after_execute:
            self.conn.commit()

    def _get_live_table_names(self) -> list[str]:
        cursor = self.conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
        tables: list[str] = cursor.fetchall()
        return tables

__init__(connection)

Create a new SqliteDb instance.

Source code in src/embar/db/sqlite.py
41
42
43
44
45
46
def __init__(self, connection: sqlite3.Connection):
    """
    Create a new SqliteDb instance.
    """
    self.conn = connection
    self.conn.row_factory = sqlite3.Row

close()

Close the database connection.

Source code in src/embar/db/sqlite.py
48
49
50
51
52
53
def close(self):
    """
    Close the database connection.
    """
    if self.conn:
        self.conn.close()

delete(table)

Create an UPDATE query.

Source code in src/embar/db/sqlite.py
87
88
89
90
91
def delete[T: Table](self, table: type[T]) -> DeleteQueryReady[T, Self]:
    """
    Create an UPDATE query.
    """
    return DeleteQueryReady[T, Self](table=table, db=self)

drop_tables(schema=None)

Drop all tables in the database.

Source code in src/embar/db/sqlite.py
176
177
178
179
180
181
182
183
184
185
186
@override
def drop_tables(self, schema: str | None = None):
    """
    Drop all tables in the database.
    """
    cursor = self.conn.cursor()
    tables = self._get_live_table_names()
    for (table_name,) in tables:
        cursor.execute(f"DROP TABLE {table_name}")
    if self._commit_after_execute:
        self.conn.commit()

execute(query)

Execute a query without returning results.

Source code in src/embar/db/sqlite.py
113
114
115
116
117
118
119
120
121
@override
def execute(self, query: QuerySingle) -> None:
    """
    Execute a query without returning results.
    """
    sql = _convert_params(query.sql)
    self.conn.execute(sql, query.params)
    if self._commit_after_execute:
        self.conn.commit()

executemany(query)

Execute a query with multiple parameter sets.

Source code in src/embar/db/sqlite.py
123
124
125
126
127
128
129
130
131
@override
def executemany(self, query: QueryMany):
    """
    Execute a query with multiple parameter sets.
    """
    sql = _convert_params(query.sql)
    self.conn.executemany(sql, query.many_params)
    if self._commit_after_execute:
        self.conn.commit()

fetch(query)

Fetch all rows returned by a SELECT query.

sqlite returns json/arrays as string, so need to parse them.

Source code in src/embar/db/sqlite.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
@override
def fetch(self, query: QuerySingle | QueryMany) -> list[dict[str, Any]]:
    """
    Fetch all rows returned by a SELECT query.

    sqlite returns json/arrays as string, so need to parse them.
    """
    sql = _convert_params(query.sql)
    if isinstance(query, QuerySingle):
        cur = self.conn.execute(sql, query.params)
    else:
        cur = self.conn.executemany(sql, query.many_params)

    if cur.description is None:
        return []

    results: list[dict[str, Any]] = []
    for row in cur.fetchall():
        row_dict = dict(row)
        for key, value in row_dict.items():
            if isinstance(value, str):
                try:
                    row_dict[key] = json.loads(value)
                except (json.JSONDecodeError, ValueError):
                    try:
                        row_dict[key] = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
                    except ValueError:
                        pass  # Keep as string
        results.append(row_dict)
    return results

insert(table)

Create an INSERT query.

Source code in src/embar/db/sqlite.py
75
76
77
78
79
def insert[T: Table](self, table: type[T]) -> InsertQuery[T, Self]:
    """
    Create an INSERT query.
    """
    return InsertQuery[T, Self](table=table, db=self)

migrate(tables, enums=None)

Create a migration from a list of tables.

Source code in src/embar/db/sqlite.py
 99
100
101
102
103
104
def migrate(self, tables: Sequence[type[Table]], enums: Sequence[type[EnumBase]] | None = None) -> Migration[Self]:
    """
    Create a migration from a list of tables.
    """
    ddls = merge_ddls(MigrationDefs(tables, enums))
    return Migration(ddls, self)

migrates(schema)

Create a migration from a schema module.

Source code in src/embar/db/sqlite.py
106
107
108
109
110
111
def migrates(self, schema: types.ModuleType) -> Migration[Self]:
    """
    Create a migration from a schema module.
    """
    defs = get_migration_defs(schema)
    return self.migrate(defs.tables, defs.enums)

select(model)

Create a SELECT query.

Source code in src/embar/db/sqlite.py
63
64
65
66
67
def select[M: BaseModel](self, model: type[M]) -> SelectQuery[M, Self]:
    """
    Create a SELECT query.
    """
    return SelectQuery[M, Self](db=self, model=model)

select_distinct(model)

Create a SELECT query.

Source code in src/embar/db/sqlite.py
69
70
71
72
73
def select_distinct[M: BaseModel](self, model: type[M]) -> SelectDistinctQuery[M, Self]:
    """
    Create a SELECT query.
    """
    return SelectDistinctQuery[M, Self](db=self, model=model)

sql(template)

Execute a raw SQL query using template strings.

Source code in src/embar/db/sqlite.py
93
94
95
96
97
def sql(self, template: Template) -> DbSql[Self]:
    """
    Execute a raw SQL query using template strings.
    """
    return DbSql(template, self)

transaction()

Start an isolated transaction.

Source code in src/embar/db/sqlite.py
55
56
57
58
59
60
61
def transaction(self) -> SqliteDbTransaction:
    """
    Start an isolated transaction.
    """
    db_copy = SqliteDb(self.conn)
    db_copy._commit_after_execute = False
    return SqliteDbTransaction(db_copy)

truncate(schema=None)

Truncate all tables in the database.

Source code in src/embar/db/sqlite.py
164
165
166
167
168
169
170
171
172
173
174
@override
def truncate(self, schema: str | None = None):
    """
    Truncate all tables in the database.
    """
    cursor = self.conn.cursor()
    tables = self._get_live_table_names()
    for (table_name,) in tables:
        cursor.execute(f"DELETE FROM {table_name}")
    if self._commit_after_execute:
        self.conn.commit()

update(table)

Create an UPDATE query.

Source code in src/embar/db/sqlite.py
81
82
83
84
85
def update[T: Table](self, table: type[T]) -> UpdateQuery[T, Self]:
    """
    Create an UPDATE query.
    """
    return UpdateQuery[T, Self](table=table, db=self)

SqliteDbTransaction

Transaction context manager for SqliteDb

Source code in src/embar/db/sqlite.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
class SqliteDbTransaction:
    """
    Transaction context manager for SqliteDb
    """

    _db: SqliteDb

    def __init__(self, db: SqliteDb):
        self._db = db

    def __enter__(self) -> SqliteDb:
        self._db.conn.execute("BEGIN")
        return self._db

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: types.TracebackType | None,
    ) -> bool:
        if exc_type is None:
            self._db.conn.commit()
        else:
            self._db.conn.rollback()
        return False