| author | Stephen Paul Weber
<singpolyma@singpolyma.net> 2025-06-24 18:33:54 UTC |
| committer | Stephen Paul Weber
<singpolyma@singpolyma.net> 2025-06-24 18:33:54 UTC |
| parent | 8f567dcb4edf8c79d0b6d20135fecf2cca94179b |
| snikket/persistence/Sqlite.hx | +90 | -84 |
| snikket/persistence/SqliteDriver.hx | +58 | -25 |
| snikket/persistence/SqliteDriver.js.hx | +2 | -1 |
diff --git a/snikket/persistence/Sqlite.hx b/snikket/persistence/Sqlite.hx index c483e3a..30f8405 100644 --- a/snikket/persistence/Sqlite.hx +++ b/snikket/persistence/Sqlite.hx @@ -35,90 +35,96 @@ class Sqlite implements Persistence implements KeyValueStore { public function new(dbfile: String, media: MediaStore) { this.media = media; media.setKV(this); - db = new SqliteDriver(dbfile); - db.exec("PRAGMA user_version;").then(iter -> { - final version = Std.parseInt(iter.next()?.user_version) ?? 0; - if (version < 1) { - db.exec(["CREATE TABLE messages ( - account_id TEXT NOT NULL, - mam_id TEXT NOT NULL, - mam_by TEXT NOT NULL, - stanza_id TEXT NOT NULL, - correction_id TEXT NOT NULL, - sync_point INTEGER NOT NULL, - chat_id TEXT NOT NULL, - sender_id TEXT NOT NULL, - created_at INTEGER NOT NULL, - status INTEGER NOT NULL, - direction INTEGER NOT NULL, - type INTEGER NOT NULL, - stanza TEXT NOT NULL, - PRIMARY KEY (account_id, mam_id, mam_by, stanza_id) - ) STRICT;", - "CREATE INDEX messages_created_at ON messages (account_id, chat_id, created_at);", - "CREATE INDEX messages_correction_id ON messages (correction_id);", - "CREATE TABLE chats ( - account_id TEXT NOT NULL, - chat_id TEXT NOT NULL, - trusted INTEGER NOT NULL, - avatar_sha1 BLOB, - fn TEXT, - ui_state INTEGER NOT NULL, - blocked INTEGER NOT NULL, - extensions TEXT, - read_up_to_id TEXT, - read_up_to_by TEXT, - caps_ver BLOB, - presence BLOB NOT NULL, - class TEXT NOT NULL, - PRIMARY KEY (account_id, chat_id) - ) STRICT;", - "CREATE TABLE keyvaluepairs ( - k TEXT NOT NULL PRIMARY KEY, - v TEXT NOT NULL - ) STRICT;", - "CREATE TABLE caps ( - sha1 BLOB NOT NULL PRIMARY KEY, - caps BLOB NOT NULL - ) STRICT;", - "CREATE TABLE services ( - account_id TEXT NOT NULL, - service_id TEXT NOT NULL, - name TEXT, - node TEXT, - caps BLOB NOT NULL, - PRIMARY KEY (account_id, service_id) - ) STRICT;", - "CREATE TABLE accounts ( - account_id TEXT NOT NULL, - client_id TEXT NOT NULL, - display_name TEXT, - token TEXT, - fast_count INTEGER NOT NULL DEFAULT 0, - sm_state BLOB, - PRIMARY KEY (account_id) - ) STRICT;", - "CREATE TABLE reactions ( - account_id TEXT NOT NULL, - update_id TEXT NOT NULL, - mam_id TEXT, - mam_by TEXT, - stanza_id TEXT, - chat_id TEXT NOT NULL, - sender_id TEXT NOT NULL, - created_at INTEGER NOT NULL, - reactions BLOB NOT NULL, - kind INTEGER NOT NULL, - PRIMARY KEY (account_id, chat_id, sender_id, update_id) - ) STRICT;", - "PRAGMA user_version = 1;"]); - } - if (version < 2) { - db.exec(["ALTER TABLE chats ADD COLUMN notifications_filtered INTEGER;", - "ALTER TABLE chats ADD COLUMN notify_mention INTEGER NOT NULL DEFAULT 0;", - "ALTER TABLE chats ADD COLUMN notify_reply INTEGER NOT NULL DEFAULT 0;", - "PRAGMA user_version = 2;"]); - } + db = new SqliteDriver(dbfile, (exec) -> { + exec(["PRAGMA user_version;"]).then(iter -> { + final version = Std.parseInt(iter.next()?.user_version) ?? 0; + return Promise.resolve(null).then(_ -> { + if (version < 1) { + return exec(["CREATE TABLE messages ( + account_id TEXT NOT NULL, + mam_id TEXT NOT NULL, + mam_by TEXT NOT NULL, + stanza_id TEXT NOT NULL, + correction_id TEXT NOT NULL, + sync_point INTEGER NOT NULL, + chat_id TEXT NOT NULL, + sender_id TEXT NOT NULL, + created_at INTEGER NOT NULL, + status INTEGER NOT NULL, + direction INTEGER NOT NULL, + type INTEGER NOT NULL, + stanza TEXT NOT NULL, + PRIMARY KEY (account_id, mam_id, mam_by, stanza_id) + ) STRICT;", + "CREATE INDEX messages_created_at ON messages (account_id, chat_id, created_at);", + "CREATE INDEX messages_correction_id ON messages (correction_id);", + "CREATE TABLE chats ( + account_id TEXT NOT NULL, + chat_id TEXT NOT NULL, + trusted INTEGER NOT NULL, + avatar_sha1 BLOB, + fn TEXT, + ui_state INTEGER NOT NULL, + blocked INTEGER NOT NULL, + extensions TEXT, + read_up_to_id TEXT, + read_up_to_by TEXT, + caps_ver BLOB, + presence BLOB NOT NULL, + class TEXT NOT NULL, + PRIMARY KEY (account_id, chat_id) + ) STRICT;", + "CREATE TABLE keyvaluepairs ( + k TEXT NOT NULL PRIMARY KEY, + v TEXT NOT NULL + ) STRICT;", + "CREATE TABLE caps ( + sha1 BLOB NOT NULL PRIMARY KEY, + caps BLOB NOT NULL + ) STRICT;", + "CREATE TABLE services ( + account_id TEXT NOT NULL, + service_id TEXT NOT NULL, + name TEXT, + node TEXT, + caps BLOB NOT NULL, + PRIMARY KEY (account_id, service_id) + ) STRICT;", + "CREATE TABLE accounts ( + account_id TEXT NOT NULL, + client_id TEXT NOT NULL, + display_name TEXT, + token TEXT, + fast_count INTEGER NOT NULL DEFAULT 0, + sm_state BLOB, + PRIMARY KEY (account_id) + ) STRICT;", + "CREATE TABLE reactions ( + account_id TEXT NOT NULL, + update_id TEXT NOT NULL, + mam_id TEXT, + mam_by TEXT, + stanza_id TEXT, + chat_id TEXT NOT NULL, + sender_id TEXT NOT NULL, + created_at INTEGER NOT NULL, + reactions BLOB NOT NULL, + kind INTEGER NOT NULL, + PRIMARY KEY (account_id, chat_id, sender_id, update_id) + ) STRICT;", + "PRAGMA user_version = 1;"]); + } + return Promise.resolve(null); + }).then(_ -> { + if (version < 2) { + return exec(["ALTER TABLE chats ADD COLUMN notifications_filtered INTEGER;", + "ALTER TABLE chats ADD COLUMN notify_mention INTEGER NOT NULL DEFAULT 0;", + "ALTER TABLE chats ADD COLUMN notify_reply INTEGER NOT NULL DEFAULT 0;", + "PRAGMA user_version = 2;"]); + } + return Promise.resolve(null); + }); + }); }); } diff --git a/snikket/persistence/SqliteDriver.hx b/snikket/persistence/SqliteDriver.hx index 4f4c7fe..b63c912 100644 --- a/snikket/persistence/SqliteDriver.hx +++ b/snikket/persistence/SqliteDriver.hx @@ -3,39 +3,72 @@ package snikket.persistence; import haxe.io.Bytes; import thenshim.Promise; import sys.db.Connection; +using Lambda; -// TODO: consider doing background threads for operations class SqliteDriver { - final db: Connection; + final dbs: sys.thread.Deque<Connection> = new sys.thread.Deque(); + private final writePool: sys.thread.IThreadPool = new sys.thread.FixedThreadPool(1); + private final readPool: sys.thread.IThreadPool; + private final dbfile: String; + private final ready: Promise<Bool>; + private var setReady: (Bool)->Void; + private var mainLoop: sys.thread.EventLoop; - public function new(dbfile: String) { - db = sys.db.Sqlite.open(dbfile); - db.request("PRAGMA journal_mode=WAL"); + public function new(dbfile: String, migrate: (Array<String>->Promise<haxe.iterators.ArrayIterator<Dynamic>>)->Promise<Any>) { + this.dbfile = dbfile; + readPool = Config.constrainedMemoryMode ? writePool : new sys.thread.ElasticThreadPool(10); + ready = new Promise((resolve, reject) -> setReady = resolve); + mainLoop = sys.thread.Thread.current().events; + + writePool.run(() -> { + final db = sys.db.Sqlite.open(dbfile); + db.request("PRAGMA journal_mode=WAL"); + dbs.push(db); + migrate((sql) -> this.execute(writePool, sql, [])).then(_ -> { + setReady(true); + }); + }); } - public function exec(sql: haxe.extern.EitherType<String, Array<String>>, ?params: Array<Dynamic>) { - var result = null; - final qs = if (Std.isOfType(sql, String)) { - [sql]; - } else { - cast (sql, Array<Dynamic>); - } - try { - for (q in qs) { - if (result == null) { - final prepared = prepare(q, params ?? []); - result = db.request(prepared); - } else { - db.request(q); + private function execute(pool: sys.thread.IThreadPool, qs: Array<String>, params: Array<Dynamic>) { + return new Promise((resolve, reject) -> { + pool.run(() -> { + var db = dbs.pop(false); + try { + if (db == null) { + db = sys.db.Sqlite.open(dbfile); + } + var result = null; + for (q in qs) { + if (result == null) { + final prepared = prepare(db, q, params); + result = db.request(prepared); + } else { + db.request(q); + } + } + // In testing, not copying to an array here caused BAD ACCESS sometimes + // Though from sqlite docs it seems like it should be safe? + final arr = { iterator: () -> result }.array(); + dbs.push(db); + mainLoop.run(() -> { resolve(arr.iterator()); }); + } catch (e) { + dbs.push(db); + mainLoop.run(() -> reject(e)); } - } - return Promise.resolve(result); - } catch (e) { - return Promise.reject(e); - } + }); + }); + } + + public function exec(sql: haxe.extern.EitherType<String, Array<String>>, ?params: Array<Dynamic>) { + return ready.then(_ -> { + final qs = Std.isOfType(sql, String) ? [sql] : sql; + final pool = StringTools.startsWith(qs[0], "SELECT") ? readPool : writePool; + return execute(pool, qs, params ?? []); + }); } - private function prepare(sql:String, params: Array<Dynamic>): String { + private function prepare(db: Connection, sql:String, params: Array<Dynamic>): String { return ~/\?/gm.map(sql, f -> { var p = params.shift(); return switch (Type.typeof(p)) { diff --git a/snikket/persistence/SqliteDriver.js.hx b/snikket/persistence/SqliteDriver.js.hx index 996ed76..5346302 100644 --- a/snikket/persistence/SqliteDriver.js.hx +++ b/snikket/persistence/SqliteDriver.js.hx @@ -14,7 +14,7 @@ class SqliteDriver { private var sqlite: Promiser; private var dbId: String; - public function new(dbfile: String) { + public function new(dbfile: String, migrate: (Array<String>->Promise<haxe.iterators.ArrayIterator<Dynamic>>)->Promise<Any>) { Worker1.v2({ worker: () -> new js.html.Worker( untyped new js.html.URL("sqlite-worker1.mjs", js.Syntax.code("import.meta.url")), @@ -25,6 +25,7 @@ class SqliteDriver { return sqlite("open", { filename: dbfile, vfs: "opfs-sahpool" }); }).then(openResult -> { dbId = openResult.dbId; + return migrate((sql) -> this.exec(sql)); }); }