git » sdk » commit b53d6e2

Native Sqlite in background threads

author Stephen Paul Weber
2025-06-24 18:33:54 UTC
committer Stephen Paul Weber
2025-06-24 18:33:54 UTC
parent 8f567dcb4edf8c79d0b6d20135fecf2cca94179b

Native Sqlite in background threads

Run migrations before anything else gets to run.

snikket/persistence/Sqlite.hx +90 -84
snikket/persistence/SqliteDriver.hx +58 -25
snikket/persistence/SqliteDriver.js.hx +2 -1

diff --git a/snikket/persistence/Sqlite.hx b/snikket/persistence/Sqlite.hx
index c483e3a..30f8405 100644
--- a/snikket/persistence/Sqlite.hx
+++ b/snikket/persistence/Sqlite.hx
@@ -35,90 +35,96 @@ class Sqlite implements Persistence implements KeyValueStore {
 	public function new(dbfile: String, media: MediaStore) {
 		this.media = media;
 		media.setKV(this);
-		db = new SqliteDriver(dbfile);
-		db.exec("PRAGMA user_version;").then(iter -> {
-			final version = Std.parseInt(iter.next()?.user_version) ?? 0;
-			if (version < 1) {
-				db.exec(["CREATE TABLE messages (
-					account_id TEXT NOT NULL,
-					mam_id TEXT NOT NULL,
-					mam_by TEXT NOT NULL,
-					stanza_id TEXT NOT NULL,
-					correction_id TEXT NOT NULL,
-					sync_point INTEGER NOT NULL,
-					chat_id TEXT NOT NULL,
-					sender_id TEXT NOT NULL,
-					created_at INTEGER NOT NULL,
-					status INTEGER NOT NULL,
-					direction INTEGER NOT NULL,
-					type INTEGER NOT NULL,
-					stanza TEXT NOT NULL,
-					PRIMARY KEY (account_id, mam_id, mam_by, stanza_id)
-				) STRICT;",
-				"CREATE INDEX messages_created_at ON messages (account_id, chat_id, created_at);",
-				"CREATE INDEX messages_correction_id ON messages (correction_id);",
-				"CREATE TABLE chats (
-					account_id TEXT NOT NULL,
-					chat_id TEXT NOT NULL,
-					trusted INTEGER NOT NULL,
-					avatar_sha1 BLOB,
-					fn TEXT,
-					ui_state INTEGER NOT NULL,
-					blocked INTEGER NOT NULL,
-					extensions TEXT,
-					read_up_to_id TEXT,
-					read_up_to_by TEXT,
-					caps_ver BLOB,
-					presence BLOB NOT NULL,
-					class TEXT NOT NULL,
-					PRIMARY KEY (account_id, chat_id)
-				) STRICT;",
-				"CREATE TABLE keyvaluepairs (
-					k TEXT NOT NULL PRIMARY KEY,
-					v TEXT NOT NULL
-				) STRICT;",
-				"CREATE TABLE caps (
-					sha1 BLOB NOT NULL PRIMARY KEY,
-					caps BLOB NOT NULL
-				) STRICT;",
-				"CREATE TABLE services (
-					account_id TEXT NOT NULL,
-					service_id TEXT NOT NULL,
-					name TEXT,
-					node TEXT,
-					caps BLOB NOT NULL,
-					PRIMARY KEY (account_id, service_id)
-				) STRICT;",
-				"CREATE TABLE accounts (
-					account_id TEXT NOT NULL,
-					client_id TEXT NOT NULL,
-					display_name TEXT,
-					token TEXT,
-					fast_count INTEGER NOT NULL DEFAULT 0,
-					sm_state BLOB,
-					PRIMARY KEY (account_id)
-				) STRICT;",
-				"CREATE TABLE reactions (
-					account_id TEXT NOT NULL,
-					update_id TEXT NOT NULL,
-					mam_id TEXT,
-					mam_by TEXT,
-					stanza_id TEXT,
-					chat_id TEXT NOT NULL,
-					sender_id TEXT NOT NULL,
-					created_at INTEGER NOT NULL,
-					reactions BLOB NOT NULL,
-					kind INTEGER NOT NULL,
-					PRIMARY KEY (account_id, chat_id, sender_id, update_id)
-				) STRICT;",
-				"PRAGMA user_version = 1;"]);
-			}
-			if (version < 2) {
-				db.exec(["ALTER TABLE chats ADD COLUMN notifications_filtered INTEGER;",
-				"ALTER TABLE chats ADD COLUMN notify_mention INTEGER NOT NULL DEFAULT 0;",
-				"ALTER TABLE chats ADD COLUMN notify_reply INTEGER NOT NULL DEFAULT 0;",
-				"PRAGMA user_version = 2;"]);
-			}
+		db = new SqliteDriver(dbfile, (exec) -> {
+			exec(["PRAGMA user_version;"]).then(iter -> {
+				final version = Std.parseInt(iter.next()?.user_version) ?? 0;
+				return Promise.resolve(null).then(_ -> {
+					if (version < 1) {
+						return exec(["CREATE TABLE messages (
+							account_id TEXT NOT NULL,
+							mam_id TEXT NOT NULL,
+							mam_by TEXT NOT NULL,
+							stanza_id TEXT NOT NULL,
+							correction_id TEXT NOT NULL,
+							sync_point INTEGER NOT NULL,
+							chat_id TEXT NOT NULL,
+							sender_id TEXT NOT NULL,
+							created_at INTEGER NOT NULL,
+							status INTEGER NOT NULL,
+							direction INTEGER NOT NULL,
+							type INTEGER NOT NULL,
+							stanza TEXT NOT NULL,
+							PRIMARY KEY (account_id, mam_id, mam_by, stanza_id)
+						) STRICT;",
+						"CREATE INDEX messages_created_at ON messages (account_id, chat_id, created_at);",
+						"CREATE INDEX messages_correction_id ON messages (correction_id);",
+						"CREATE TABLE chats (
+							account_id TEXT NOT NULL,
+							chat_id TEXT NOT NULL,
+							trusted INTEGER NOT NULL,
+							avatar_sha1 BLOB,
+							fn TEXT,
+							ui_state INTEGER NOT NULL,
+							blocked INTEGER NOT NULL,
+							extensions TEXT,
+							read_up_to_id TEXT,
+							read_up_to_by TEXT,
+							caps_ver BLOB,
+							presence BLOB NOT NULL,
+							class TEXT NOT NULL,
+							PRIMARY KEY (account_id, chat_id)
+						) STRICT;",
+						"CREATE TABLE keyvaluepairs (
+							k TEXT NOT NULL PRIMARY KEY,
+							v TEXT NOT NULL
+						) STRICT;",
+						"CREATE TABLE caps (
+							sha1 BLOB NOT NULL PRIMARY KEY,
+							caps BLOB NOT NULL
+						) STRICT;",
+						"CREATE TABLE services (
+							account_id TEXT NOT NULL,
+							service_id TEXT NOT NULL,
+							name TEXT,
+							node TEXT,
+							caps BLOB NOT NULL,
+							PRIMARY KEY (account_id, service_id)
+						) STRICT;",
+						"CREATE TABLE accounts (
+							account_id TEXT NOT NULL,
+							client_id TEXT NOT NULL,
+							display_name TEXT,
+							token TEXT,
+							fast_count INTEGER NOT NULL DEFAULT 0,
+							sm_state BLOB,
+							PRIMARY KEY (account_id)
+						) STRICT;",
+						"CREATE TABLE reactions (
+							account_id TEXT NOT NULL,
+							update_id TEXT NOT NULL,
+							mam_id TEXT,
+							mam_by TEXT,
+							stanza_id TEXT,
+							chat_id TEXT NOT NULL,
+							sender_id TEXT NOT NULL,
+							created_at INTEGER NOT NULL,
+							reactions BLOB NOT NULL,
+							kind INTEGER NOT NULL,
+							PRIMARY KEY (account_id, chat_id, sender_id, update_id)
+						) STRICT;",
+						"PRAGMA user_version = 1;"]);
+					}
+					return Promise.resolve(null);
+				}).then(_ -> {
+					if (version < 2) {
+						return exec(["ALTER TABLE chats ADD COLUMN notifications_filtered INTEGER;",
+						"ALTER TABLE chats ADD COLUMN notify_mention INTEGER NOT NULL DEFAULT 0;",
+						"ALTER TABLE chats ADD COLUMN notify_reply INTEGER NOT NULL DEFAULT 0;",
+						"PRAGMA user_version = 2;"]);
+					}
+					return Promise.resolve(null);
+				});
+			});
 		});
 	}
 
diff --git a/snikket/persistence/SqliteDriver.hx b/snikket/persistence/SqliteDriver.hx
index 4f4c7fe..b63c912 100644
--- a/snikket/persistence/SqliteDriver.hx
+++ b/snikket/persistence/SqliteDriver.hx
@@ -3,39 +3,72 @@ package snikket.persistence;
 import haxe.io.Bytes;
 import thenshim.Promise;
 import sys.db.Connection;
+using Lambda;
 
-// TODO: consider doing background threads for operations
 class SqliteDriver {
-	final db: Connection;
+	final dbs: sys.thread.Deque<Connection> = new sys.thread.Deque();
+	private final writePool: sys.thread.IThreadPool = new sys.thread.FixedThreadPool(1);
+	private final readPool: sys.thread.IThreadPool;
+	private final dbfile: String;
+	private final ready: Promise<Bool>;
+	private var setReady: (Bool)->Void;
+	private var mainLoop: sys.thread.EventLoop;
 
-	public function new(dbfile: String) {
-		db = sys.db.Sqlite.open(dbfile);
-		db.request("PRAGMA journal_mode=WAL");
+	public function new(dbfile: String, migrate: (Array<String>->Promise<haxe.iterators.ArrayIterator<Dynamic>>)->Promise<Any>) {
+		this.dbfile = dbfile;
+		readPool = Config.constrainedMemoryMode ? writePool : new sys.thread.ElasticThreadPool(10);
+		ready = new Promise((resolve, reject) -> setReady = resolve);
+		mainLoop = sys.thread.Thread.current().events;
+
+		writePool.run(() -> {
+			final db = sys.db.Sqlite.open(dbfile);
+			db.request("PRAGMA journal_mode=WAL");
+			dbs.push(db);
+			migrate((sql) -> this.execute(writePool, sql, [])).then(_ -> {
+				setReady(true);
+			});
+		});
 	}
 
-	public function exec(sql: haxe.extern.EitherType<String, Array<String>>, ?params: Array<Dynamic>) {
-		var result = null;
-		final qs = if (Std.isOfType(sql, String)) {
-			[sql];
-		} else {
-			cast (sql, Array<Dynamic>);
-		}
-		try {
-			for (q in qs) {
-				if (result == null) {
-					final prepared = prepare(q, params ?? []);
-					result = db.request(prepared);
-				} else {
-					db.request(q);
+	private function execute(pool: sys.thread.IThreadPool, qs: Array<String>, params: Array<Dynamic>) {
+		return new Promise((resolve, reject) -> {
+			pool.run(() -> {
+				var db = dbs.pop(false);
+				try {
+					if (db == null) {
+						db = sys.db.Sqlite.open(dbfile);
+					}
+					var result = null;
+					for (q in qs) {
+						if (result == null) {
+							final prepared = prepare(db, q, params);
+							result = db.request(prepared);
+						} else {
+							db.request(q);
+						}
+					}
+					// In testing, not copying to an array here caused BAD ACCESS sometimes
+					// Though from sqlite docs it seems like it should be safe?
+					final arr = { iterator: () -> result }.array();
+					dbs.push(db);
+					mainLoop.run(() -> { resolve(arr.iterator()); });
+				} catch (e) {
+					dbs.push(db);
+					mainLoop.run(() -> reject(e));
 				}
-			}
-			return Promise.resolve(result);
-		} catch (e) {
-			return Promise.reject(e);
-		}
+			});
+		});
+	}
+
+	public function exec(sql: haxe.extern.EitherType<String, Array<String>>, ?params: Array<Dynamic>) {
+		return ready.then(_ -> {
+			final qs = Std.isOfType(sql, String) ? [sql] : sql;
+			final pool = StringTools.startsWith(qs[0], "SELECT") ? readPool : writePool;
+			return execute(pool, qs, params ?? []);
+		});
 	}
 
-	private function prepare(sql:String, params: Array<Dynamic>): String {
+	private function prepare(db: Connection, sql:String, params: Array<Dynamic>): String {
 		return ~/\?/gm.map(sql, f -> {
 			var p = params.shift();
 			return switch (Type.typeof(p)) {
diff --git a/snikket/persistence/SqliteDriver.js.hx b/snikket/persistence/SqliteDriver.js.hx
index 996ed76..5346302 100644
--- a/snikket/persistence/SqliteDriver.js.hx
+++ b/snikket/persistence/SqliteDriver.js.hx
@@ -14,7 +14,7 @@ class SqliteDriver {
 	private var sqlite: Promiser;
 	private var dbId: String;
 
-	public function new(dbfile: String) {
+	public function new(dbfile: String, migrate: (Array<String>->Promise<haxe.iterators.ArrayIterator<Dynamic>>)->Promise<Any>) {
 		Worker1.v2({
 			worker: () -> new js.html.Worker(
 				untyped new js.html.URL("sqlite-worker1.mjs", js.Syntax.code("import.meta.url")),
@@ -25,6 +25,7 @@ class SqliteDriver {
 			return sqlite("open", { filename: dbfile, vfs: "opfs-sahpool" });
 		}).then(openResult -> {
 			dbId = openResult.dbId;
+			return migrate((sql) -> this.exec(sql));
 		});
 	}