| author | Stephen Paul Weber
<singpolyma@singpolyma.net> 2026-03-16 20:11:22 UTC |
| committer | Stephen Paul Weber
<singpolyma@singpolyma.net> 2026-03-16 20:11:22 UTC |
| parent | 5cfcc1495605d84fc31807d9516ea951a8ea95c8 |
| borogove/AsyncLock.hx | +17 | -0 |
| borogove/persistence/Sqlite.hx | +38 | -36 |
diff --git a/borogove/AsyncLock.hx b/borogove/AsyncLock.hx new file mode 100644 index 0000000..b7fd4e1 --- /dev/null +++ b/borogove/AsyncLock.hx @@ -0,0 +1,17 @@ +package borogove; + +import thenshim.Promise; + +class AsyncLock { + private var p: Promise<Any>; + + public function new() { + p = Promise.resolve(null); + } + + public function run<T>(fn: () -> Promise<T>): Promise<T> { + final next = p.then(_ -> fn()); + p = next.then(_->{}, _->{}); // prevent chain break + return next; + } +} diff --git a/borogove/persistence/Sqlite.hx b/borogove/persistence/Sqlite.hx index f5189da..24b492d 100644 --- a/borogove/persistence/Sqlite.hx +++ b/borogove/persistence/Sqlite.hx @@ -31,6 +31,7 @@ using Lambda; class Sqlite implements Persistence implements KeyValueStore { final db: SqliteDriver; final media: MediaStore; + final storeMessagesSerialized = new AsyncLock(); @:allow(borogove) private static function prepare(q: { sql: String, ?params: Array<Dynamic> }): String { @@ -384,27 +385,26 @@ class Sqlite implements Persistence implements KeyValueStore { } } - // Hmm, if there is an existing one this loses the original timestamp though - return db.exec( - "INSERT OR REPLACE INTO messages VALUES " + messages.map(_ -> "(?,?,?,?,?,?,?,?,CAST(unixepoch(?, 'subsec') * 1000 AS INTEGER),?,?,?,?,?)").join(","), - messages.flatMap(m -> { - final correctable = m; - final message = m.versions.length == 1 ? m.versions[0] : m; // TODO: storing multiple versions at once? We never do that right now - ([ - accountId, message.serverId ?? "", message.serverIdBy ?? "", - message.localId ?? "", correctable.callSid() ?? correctable.localId ?? correctable.serverId, correctable.syncPoint, - correctable.chatId(), correctable.senderId, - message.timestamp, message.status, message.direction, message.type, - message.asStanza().toString(), message.statusText - ] : Array<Dynamic>); - }) - ).then(_ -> - hydrateReplyTo(accountId, messages, replyTos) - ).then(ms -> - hydrateReactions(accountId, ms) - ).then(hydrated -> - // We delay one more cycle to match updateMessageStatus - Promise.resolve(hydrated) + return storeMessagesSerialized.run(() -> + // Hmm, if there is an existing one this loses the original timestamp though + db.exec( + "INSERT OR REPLACE INTO messages VALUES " + messages.map(_ -> "(?,?,?,?,?,?,?,?,CAST(unixepoch(?, 'subsec') * 1000 AS INTEGER),?,?,?,?,?)").join(","), + messages.flatMap(m -> { + final correctable = m; + final message = m.versions.length == 1 ? m.versions[0] : m; // TODO: storing multiple versions at once? We never do that right now + ([ + accountId, message.serverId ?? "", message.serverIdBy ?? "", + message.localId ?? "", correctable.callSid() ?? correctable.localId ?? correctable.serverId, correctable.syncPoint, + correctable.chatId(), correctable.senderId, + message.timestamp, message.status, message.direction, message.type, + message.asStanza().toString(), message.statusText + ] : Array<Dynamic>); + }) + ).then(_ -> + hydrateReplyTo(accountId, messages, replyTos) + ).then(ms -> + hydrateReactions(accountId, ms) + ) ); // TODO: retract custom emoji? @@ -554,21 +554,23 @@ class Sqlite implements Persistence implements KeyValueStore { @HaxeCBridge.noemit public function updateMessageStatus(accountId: String, localId: String, status: MessageStatus, statusText: Null<String>): Promise<ChatMessage> { - return db.exec( - "UPDATE messages SET status=?, status_text=? WHERE account_id=? AND stanza_id=? AND direction=? AND status <> ? AND status <> ? RETURNING stanza, direction, type, status, status_text, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, correction_id AS stanza_id, mam_id, mam_by, sync_point", - [status, statusText, accountId, localId, MessageSent, MessageDeliveredToDevice, MessageFailedToSend] - ).then(result -> - thenshim.PromiseTools.all(hydrateMessages(accountId, result).map(message ->{ - return (if (message.replyToMessage != null) { - hydrateReplyTo(accountId, [message], [{ chatId: message.chatId(), serverId: message.replyToMessage.serverId, localId: message.replyToMessage.localId }]); - } else { - Promise.resolve([message]); - }).then(messages -> hydrateReactions(accountId, messages));} - )) - ).then(hydrated -> { - final flat = hydrated.flatten(); - return flat.length > 0 ? Promise.resolve(flat[0]) : Promise.reject("Message not found: " + localId); - }); + return storeMessagesSerialized.run(() -> + db.exec( + "UPDATE messages SET status=?, status_text=? WHERE account_id=? AND stanza_id=? AND direction=? AND status <> ? AND status <> ? RETURNING stanza, direction, type, status, status_text, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, correction_id AS stanza_id, mam_id, mam_by, sync_point", + [status, statusText, accountId, localId, MessageSent, MessageDeliveredToDevice, MessageFailedToSend] + ).then(result -> + thenshim.PromiseTools.all(hydrateMessages(accountId, result).map(message ->{ + return (if (message.replyToMessage != null) { + hydrateReplyTo(accountId, [message], [{ chatId: message.chatId(), serverId: message.replyToMessage.serverId, localId: message.replyToMessage.localId }]); + } else { + Promise.resolve([message]); + }).then(messages -> hydrateReactions(accountId, messages));} + )) + ).then(hydrated -> { + final flat = hydrated.flatten(); + return flat.length > 0 ? Promise.resolve(flat[0]) : Promise.reject("Message not found: " + localId); + }) + ); } @HaxeCBridge.noemit