git » sdk » commit 8147250

Only store one message/status at a time

author Stephen Paul Weber
2026-03-16 20:11:22 UTC
committer Stephen Paul Weber
2026-03-16 20:11:22 UTC
parent 5cfcc1495605d84fc31807d9516ea951a8ea95c8

Only store one message/status at a time

borogove/AsyncLock.hx +17 -0
borogove/persistence/Sqlite.hx +38 -36

diff --git a/borogove/AsyncLock.hx b/borogove/AsyncLock.hx
new file mode 100644
index 0000000..b7fd4e1
--- /dev/null
+++ b/borogove/AsyncLock.hx
@@ -0,0 +1,17 @@
+package borogove;
+
+import thenshim.Promise;
+
+class AsyncLock { // Runs promise-returning tasks one at a time, in the order run() was called
+	private var p: Promise<Any>; // tail of the task chain; the next task is chained onto it
+
+	public function new() {
+		p = Promise.resolve(null); // start with an already-resolved tail so the first task has nothing to wait for
+	}
+
+	public function run<T>(fn: () -> Promise<T>): Promise<T> { // queue fn behind earlier tasks; the caller gets fn's own result or rejection
+		final next = p.then(_ -> fn()); // fn is invoked only after the previous tail has settled
+		p = next.then(_->{}, _->{}); // new tail ignores both result and error, so a failed task cannot break the chain
+		return next;
+	}
+}
diff --git a/borogove/persistence/Sqlite.hx b/borogove/persistence/Sqlite.hx
index f5189da..24b492d 100644
--- a/borogove/persistence/Sqlite.hx
+++ b/borogove/persistence/Sqlite.hx
@@ -31,6 +31,7 @@ using Lambda;
 class Sqlite implements Persistence implements KeyValueStore {
 	final db: SqliteDriver;
 	final media: MediaStore;
+	final storeMessagesSerialized = new AsyncLock();
 
 	@:allow(borogove)
 	private static function prepare(q: { sql: String, ?params: Array<Dynamic> }): String {
@@ -384,27 +385,26 @@ class Sqlite implements Persistence implements KeyValueStore {
 			}
 		}
 
-		// Hmm, if there is an existing one this loses the original timestamp though
-		return db.exec(
-			"INSERT OR REPLACE INTO messages VALUES " + messages.map(_ -> "(?,?,?,?,?,?,?,?,CAST(unixepoch(?, 'subsec') * 1000 AS INTEGER),?,?,?,?,?)").join(","),
-			messages.flatMap(m -> {
-				final correctable = m;
-				final message = m.versions.length == 1 ? m.versions[0] : m; // TODO: storing multiple versions at once? We never do that right now
-				([
-					accountId, message.serverId ?? "", message.serverIdBy ?? "",
-					message.localId ?? "", correctable.callSid() ?? correctable.localId ?? correctable.serverId, correctable.syncPoint,
-					correctable.chatId(), correctable.senderId,
-					message.timestamp, message.status, message.direction, message.type,
-					message.asStanza().toString(), message.statusText
-				] : Array<Dynamic>);
-			})
-		).then(_ ->
-			hydrateReplyTo(accountId, messages, replyTos)
-		).then(ms ->
-			hydrateReactions(accountId, ms)
-		).then(hydrated ->
-			// We delay one more cycle to match updateMessageStatus
-			Promise.resolve(hydrated)
+		return storeMessagesSerialized.run(() ->
+			// Hmm, if there is an existing one this loses the original timestamp though
+			db.exec(
+				"INSERT OR REPLACE INTO messages VALUES " + messages.map(_ -> "(?,?,?,?,?,?,?,?,CAST(unixepoch(?, 'subsec') * 1000 AS INTEGER),?,?,?,?,?)").join(","),
+				messages.flatMap(m -> {
+					final correctable = m;
+					final message = m.versions.length == 1 ? m.versions[0] : m; // TODO: storing multiple versions at once? We never do that right now
+					([
+						accountId, message.serverId ?? "", message.serverIdBy ?? "",
+						message.localId ?? "", correctable.callSid() ?? correctable.localId ?? correctable.serverId, correctable.syncPoint,
+						correctable.chatId(), correctable.senderId,
+						message.timestamp, message.status, message.direction, message.type,
+						message.asStanza().toString(), message.statusText
+					] : Array<Dynamic>);
+				})
+			).then(_ ->
+				hydrateReplyTo(accountId, messages, replyTos)
+			).then(ms ->
+				hydrateReactions(accountId, ms)
+			)
 		);
 
 		// TODO: retract custom emoji?
@@ -554,21 +554,23 @@ class Sqlite implements Persistence implements KeyValueStore {
 
 	@HaxeCBridge.noemit
 	public function updateMessageStatus(accountId: String, localId: String, status: MessageStatus, statusText: Null<String>): Promise<ChatMessage> {
-		return db.exec(
-			"UPDATE messages SET status=?, status_text=? WHERE account_id=? AND stanza_id=? AND direction=? AND status <> ? AND status <> ? RETURNING stanza, direction, type, status, status_text, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, correction_id AS stanza_id, mam_id, mam_by, sync_point",
-			[status, statusText, accountId, localId, MessageSent, MessageDeliveredToDevice, MessageFailedToSend]
-		).then(result ->
-			thenshim.PromiseTools.all(hydrateMessages(accountId, result).map(message ->{
-				return (if (message.replyToMessage != null) {
-					hydrateReplyTo(accountId, [message], [{ chatId: message.chatId(), serverId: message.replyToMessage.serverId, localId: message.replyToMessage.localId }]);
-				} else {
-					Promise.resolve([message]);
-				}).then(messages -> hydrateReactions(accountId, messages));}
-			))
-		).then(hydrated -> {
-			final flat = hydrated.flatten();
-			return flat.length > 0 ? Promise.resolve(flat[0]) : Promise.reject("Message not found: " + localId);
-		});
+		return storeMessagesSerialized.run(() -> // queued on the shared AsyncLock, so this update waits for work queued there earlier
+			db.exec(
+				"UPDATE messages SET status=?, status_text=? WHERE account_id=? AND stanza_id=? AND direction=? AND status <> ? AND status <> ? RETURNING stanza, direction, type, status, status_text, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, correction_id AS stanza_id, mam_id, mam_by, sync_point",
+				[status, statusText, accountId, localId, MessageSent, MessageDeliveredToDevice, MessageFailedToSend]
+			).then(result ->
+				thenshim.PromiseTools.all(hydrateMessages(accountId, result).map(message ->{
+					return (if (message.replyToMessage != null) { // only messages with a replyToMessage go through hydrateReplyTo
+						hydrateReplyTo(accountId, [message], [{ chatId: message.chatId(), serverId: message.replyToMessage.serverId, localId: message.replyToMessage.localId }]);
+					} else {
+						Promise.resolve([message]);
+					}).then(messages -> hydrateReactions(accountId, messages));}
+				))
+			).then(hydrated -> {
+				final flat = hydrated.flatten(); // one array per message came back from all(); merge them into a single list
+				return flat.length > 0 ? Promise.resolve(flat[0]) : Promise.reject("Message not found: " + localId); // first message, or reject when the list is empty
+			})
+		);
 	}
 
 	@HaxeCBridge.noemit