| author | Stephen Paul Weber
<singpolyma@singpolyma.net> 2026-03-24 18:17:04 UTC |
| committer | Stephen Paul Weber
<singpolyma@singpolyma.net> 2026-03-30 14:24:55 UTC |
| parent | 4d740c86d4a7c2ebda8962694381902e49344d91 |
| Makefile | +1 | -0 |
| borogove/AsyncLock.hx | +1 | -0 |
| borogove/Chat.hx | +54 | -36 |
| borogove/ChatMessage.hx | +5 | -0 |
| borogove/ChatMessageBuilder.hx | +4 | -0 |
| borogove/Client.hx | +29 | -10 |
| borogove/Message.hx | +1 | -1 |
| borogove/MessageSync.hx | +13 | -21 |
| borogove/Persistence.hx | +1 | -1 |
| borogove/Push.hx | +13 | -1 |
| borogove/calls/Session.hx | +9 | -8 |
| borogove/persistence/Dummy.hx | +1 | -1 |
| borogove/persistence/IDB.js | +247 | -105 |
| borogove/persistence/Sqlite.hx | +51 | -31 |
| borogove/persistence/SqliteDriver.hx | +3 | -0 |
| browserjs.hxml | +1 | -0 |
| cpp.hxml | +1 | -0 |
| nodejs.hxml | +1 | -0 |
| optional-sqlite.awk | +1 | -0 |
| test.hxml | +1 | -0 |
diff --git a/Makefile b/Makefile index 749a557..dda7c7a 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,7 @@ hx-build-dep: haxelib --quiet install hsluv haxelib --quiet install tink_http haxelib --quiet install uuidv7 + haxelib --quiet install fractional-indexing haxelib --quiet install thenshim haxelib --quiet install HtmlParser haxelib --quiet install hxnodejs diff --git a/borogove/AsyncLock.hx b/borogove/AsyncLock.hx index b7fd4e1..e9e4e36 100644 --- a/borogove/AsyncLock.hx +++ b/borogove/AsyncLock.hx @@ -2,6 +2,7 @@ package borogove; import thenshim.Promise; +@:expose class AsyncLock { private var p: Promise<Any>; diff --git a/borogove/Chat.hx b/borogove/Chat.hx index b4e8418..d539564 100644 --- a/borogove/Chat.hx +++ b/borogove/Chat.hx @@ -54,6 +54,8 @@ enum abstract EncryptionMode(Int) { var EncryptedOMEMO; // Use OMEMO } +final UUIDv7_PATTERN = ~/^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-7[0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$/; + @:expose #if cpp @:build(HaxeCBridge.expose()) @@ -98,6 +100,7 @@ abstract class Chat { @:allow(borogove) private var extensions: Stanza; private var _unreadCount = 0; + @:allow(borogove) private var readUpToId: Null<String>; @:allow(borogove) private var readUpToBy: Null<String>; @@ -886,8 +889,7 @@ abstract class Chat { @:allow(borogove) private function markReadUpToId(upTo: String, upToBy: String): Promise<Any> { if (upTo == null) return Promise.reject(null); - if (readUpToId == upTo) { - if (lastMessage != null && lastMessage.serverId == readUpToId) setUnreadCount(0); + if (readUpToId == upTo || (UUIDv7_PATTERN.match(readUpToId) && UUIDv7_PATTERN.match(upTo) && upTo < readUpToId)) { return Promise.reject(null); } @@ -904,12 +906,12 @@ abstract class Chat { return Promise.reject(null); } - if (readUpToId == null) { + if (readUpToId == null || (UUIDv7_PATTERN.match(readUpToId) && UUIDv7_PATTERN.match(message.serverId))) { return markReadUpToId(message.serverId, message.serverIdBy); } return readUpTo().then((readMessage) -> 
{ - if (readMessage != null && Reflect.compare(message.timestamp, readMessage.timestamp) <= 0) { + if (readMessage != null && message.sortId <= readMessage.sortId) { return Promise.reject(null); } @@ -988,12 +990,12 @@ class DirectChat extends Chat { if (before != null && before.chatId() != chatId) throw "Cannot look before from a different chat"; return persistence.getMessagesBefore(client.accountId(), chatId, before).then((messages) -> - if (messages.length > 0) { + if (messages.length > 0 || (before != null && before.serverId == null)) { Promise.resolve(messages); } else { var filter:MAMQueryParams = { with: this.chatId }; if (before?.serverId != null) filter.page = { before: before.serverId }; - var sync = new MessageSync(this.client, this.stream, filter); + var sync = new MessageSync(this.client, this.stream, filter, null, before?.sortId); fetchFromSync(sync); } ); @@ -1007,12 +1009,12 @@ class DirectChat extends Chat { } return persistence.getMessagesAfter(client.accountId(), chatId, after).then((messages) -> - if (messages.length > 0) { + if (messages.length > 0 || (after != null && after.serverId == null)) { Promise.resolve(messages); } else { var filter:MAMQueryParams = { with: this.chatId }; if (after?.serverId != null) filter.page = { after: after.serverId }; - var sync = new MessageSync(this.client, this.stream, filter); + var sync = new MessageSync(this.client, this.stream, filter, after?.sortId, null); fetchFromSync(sync); } ); @@ -1029,6 +1031,7 @@ class DirectChat extends Chat { @:allow(borogove) private function prepareIncomingMessage(message:ChatMessageBuilder, stanza:Stanza) { message.syncPoint = !syncing(); + if (message.sortId == null) message.sortId = client.nextSortId(); return message; } @@ -1051,15 +1054,15 @@ class DirectChat extends Chat { message.versions = [message.build()]; // This is a correction message.localId = correct.localId; final outboxItem = outbox.newItem(); - client.storeMessages([message.build()]).then((corrected) -> { - 
message.versions = corrected[0].versions[corrected[0].versions.length - 1]?.localId == correct.localId ? cast corrected[0].versions : [message.build()]; + client.storeMessageBuilder(message).then((corrected) -> { + message.versions = corrected.versions[corrected.versions.length - 1]?.localId == correct.localId ? cast corrected.versions : [message.build()]; message.localId = toSendId; sendMessageStanza(message.build().asStanza(), outboxItem); - if (corrected[0].canReplace(lastMessage)) { - setLastMessage(corrected[0]); + if (corrected.canReplace(lastMessage)) { + setLastMessage(corrected); client.trigger("chats/update", [this]); } - client.notifyMessageHandlers(corrected[0], CorrectionEvent); + client.notifyMessageHandlers(corrected, CorrectionEvent); }); } @@ -1074,7 +1077,7 @@ class DirectChat extends Chat { switch (fromStanza) { case ChatMessageStanza(_): final outboxItem = outbox.newItem(); - client.storeMessages([message.build()]).then((stored) -> { + client.storeMessageBuilder(message).then((stored) -> { final stanza = message.build().asStanza(); if (isActive != null) { isActive = true; @@ -1082,8 +1085,8 @@ class DirectChat extends Chat { stanza.tag("active", { xmlns: "http://jabber.org/protocol/chatstates" }).up(); } sendMessageStanza(stanza, outboxItem); - setLastMessage(message.build()); - client.notifyMessageHandlers(stored[0], stored[0].versions.length > 1 ? CorrectionEvent : DeliveryEvent); + setLastMessage(stored); + client.notifyMessageHandlers(stored, stored.versions.length > 1 ? 
CorrectionEvent : DeliveryEvent); client.trigger("chats/update", [this]); }); case ReactionUpdateStanza(update): @@ -1266,6 +1269,7 @@ class Channel extends Chat { private var sync = null; private var forceLive = false; private var _nickInUse = null; + private var sortId = null; @:allow(borogove) private function new(client:Client, stream:GenericStream, persistence:Persistence, chatId:String, uiState = Open, isBookmarked = false, isBlocked = false, extensions = null, readUpToId = null, readUpToBy = null, ?disco: Caps) { @@ -1344,7 +1348,7 @@ class Channel extends Chat { return stanza; } ); - persistence.lastId(client.accountId(), chatId).then(doSync); + persistence.syncPoint(client.accountId(), chatId).then((point) -> doSync(point)); } private function selfPingSuccess() { @@ -1355,7 +1359,7 @@ class Channel extends Chat { // We did a self ping to see if we were in the room and found we are // But we may have missed messages if we were disconnected in the middle inSync = false; - persistence.lastId(client.accountId(), chatId).then(doSync); + persistence.syncPoint(client.accountId(), chatId).then(point -> doSync(point)); } override public function getDisplayName() { @@ -1442,7 +1446,7 @@ class Channel extends Chat { } } - private function doSync(lastId: Null<String>) { + private function doSync(syncPoint: Null<ChatMessage>, ?sortA: Null<String>) { if (!disco.features.contains("urn:xmpp:mam:2")) { inSync = true; return; @@ -1455,10 +1459,11 @@ class Channel extends Chat { sync = new MessageSync( client, stream, - lastId == null ? { startTime: threeDaysAgo } : { page: { after: lastId } }, + syncPoint == null ? { startTime: threeDaysAgo } : { page: { after: syncPoint.serverId } }, + sortA ?? 
syncPoint?.sortId, + null, chatId ); - sync.setNewestPageFirst(false); sync.addContext((builder, stanza) -> { builder = prepareIncomingMessage(builder, stanza); builder.syncPoint = true; @@ -1525,7 +1530,7 @@ class Channel extends Chat { dedupedMessages.sort((x, y) -> Reflect.compare(x.timestamp, y.timestamp)); final lastFromSync = dedupedMessages[dedupedMessages.length - 1]; - if (lastFromSync != null && (lastMessage?.timestamp == null || Reflect.compare(lastFromSync.timestamp, lastMessage?.timestamp) > 0)) { + if (lastFromSync != null && (lastMessage == null || lastFromSync.sortId > lastMessage.sortId)) { setLastMessage(lastFromSync); client.sortChats(); } @@ -1542,9 +1547,9 @@ class Channel extends Chat { }); sync.onError((stanza) -> { sync = null; - if (lastId != null) { + if (syncPoint != null) { // Gap in sync, out newest message has expired from server - doSync(null); + doSync(null, syncPoint.sortId); } else { trace("SYNC failed", chatId, stanza); } @@ -1621,6 +1626,11 @@ trace("XYZZY no MUC avatar locally matching so fetch vcard", chatId, avatarSha1H return !inSync || !livePresence(); } + override private function setLastMessage(message:Null<ChatMessage>) { + super.setLastMessage(message); + if (message != null && message.type == MessageChannel && (sortId == null || sortId < message.sortId)) sortId = message.sortId; + } + override public function canAudioCall():Bool { return disco?.features?.contains("urn:xmpp:jingle:apps:rtp:audio") ?? 
false; } @@ -1676,12 +1686,12 @@ trace("XYZZY no MUC avatar locally matching so fetch vcard", chatId, avatarSha1H if (before != null && before.chatId() != chatId) throw "Cannot look before from a different chat"; return persistence.getMessagesBefore(client.accountId(), chatId, before).then((messages) -> - if (messages.length > 0) { + if (messages.length > 0 || (before != null && before.serverId == null)) { Promise.resolve(messages); } else { var filter:MAMQueryParams = {}; if (before?.serverId != null) filter.page = { before: before.serverId }; - var sync = new MessageSync(this.client, this.stream, filter, chatId); + var sync = new MessageSync(this.client, this.stream, filter, null, before?.sortId, chatId); sync.addContext((builder, stanza) -> { builder = prepareIncomingMessage(builder, stanza); builder.syncPoint = false; @@ -1700,12 +1710,12 @@ trace("XYZZY no MUC avatar locally matching so fetch vcard", chatId, avatarSha1H } return persistence.getMessagesAfter(client.accountId(), chatId, after).then((messages) -> - if (messages.length > 0) { + if (messages.length > 0 || (after != null && after.serverId == null)) { Promise.resolve(messages); } else { var filter:MAMQueryParams = {}; if (after?.serverId != null) filter.page = { after: after.serverId }; - var sync = new MessageSync(this.client, this.stream, filter, chatId); + var sync = new MessageSync(this.client, this.stream, filter, after?.sortId, null, chatId); sync.addContext((builder, stanza) -> { builder = prepareIncomingMessage(builder, stanza); builder.syncPoint = false; @@ -1728,6 +1738,13 @@ trace("XYZZY no MUC avatar locally matching so fetch vcard", chatId, avatarSha1H private function prepareIncomingMessage(message:ChatMessageBuilder, stanza:Stanza) { message.syncPoint = !syncing(); if (message.type == MessageChat) message.type = MessageChannelPrivate; + if (message.sortId == null) { + if (sortId != null && message.type == MessageChannel) { + sortId = message.sortId = FractionalIndexing.between(sortId, 
null, FractionalIndexing.BASE_95_DIGITS); + } else { + message.sortId = client.nextSortId(); + } + } message.senderId = stanza.attr.get("from"); // MUC always needs full JIDs if (message.senderId == getFullJid().asString()) { message.recipients = message.replyTo; @@ -1746,6 +1763,7 @@ trace("XYZZY no MUC avatar locally matching so fetch vcard", chatId, avatarSha1H message.to = JID.parse(chatId); message.recipients = [message.to]; if (message.localId == null) message.localId = ID.unique(); + if (sortId != null && message.sortId == null) sortId = message.sortId = FractionalIndexing.between(sortId, null, FractionalIndexing.BASE_95_DIGITS); return message; } @@ -1756,13 +1774,13 @@ trace("XYZZY no MUC avatar locally matching so fetch vcard", chatId, avatarSha1H message.versions = [message.build()]; // This is a correction message.localId = correct.localId; final outboxItem = outbox.newItem(); - client.storeMessages([message.build()]).then((corrected) -> { - message.versions = corrected[0].versions[0]?.localId == correct.localId ? cast corrected[0].versions : [message.build()]; + client.storeMessageBuilder(message).then((corrected) -> { + message.versions = corrected.versions[0]?.localId == correct.localId ? 
cast corrected.versions : [message.build()]; message.localId = toSendId; sendMessageStanza(message.build().asStanza(), outboxItem); - client.notifyMessageHandlers(corrected[0], CorrectionEvent); - if (corrected[0].canReplace(lastMessage)) { - setLastMessage(corrected[0]); + client.notifyMessageHandlers(corrected, CorrectionEvent); + if (corrected.canReplace(lastMessage)) { + setLastMessage(corrected); client.trigger("chats/update", [this]); } }); @@ -1787,10 +1805,10 @@ trace("XYZZY no MUC avatar locally matching so fetch vcard", chatId, avatarSha1H stanza.tag("active", { xmlns: "http://jabber.org/protocol/chatstates" }).up(); } final outboxItem = outbox.newItem(); - client.storeMessages([message.build()]).then((stored) -> { + client.storeMessageBuilder(message).then((stored) -> { sendMessageStanza(stanza, outboxItem); - setLastMessage(stored[0]); - client.notifyMessageHandlers(stored[0], stored[0].versions.length > 1 ? CorrectionEvent : DeliveryEvent); + setLastMessage(stored); + client.notifyMessageHandlers(stored, stored.versions.length > 1 ? CorrectionEvent : DeliveryEvent); client.trigger("chats/update", [this]); }); case ReactionUpdateStanza(update): diff --git a/borogove/ChatMessage.hx b/borogove/ChatMessage.hx index f827ace..aa0d223 100644 --- a/borogove/ChatMessage.hx +++ b/borogove/ChatMessage.hx @@ -132,6 +132,9 @@ class ChatMessage { **/ public final type: MessageType; + @:allow(borogove) + private final sortId : Null<String>; + @:allow(borogove) private final syncPoint : Bool; @@ -250,6 +253,7 @@ class ChatMessage { ?localId: Null<String>, ?serverId: Null<String>, ?serverIdBy: Null<String>, + ?sortId: Null<String>, ?type: MessageType, ?syncPoint: Bool, ?replyId: Null<String>, @@ -277,6 +281,7 @@ class ChatMessage { this.localId = params.localId; this.serverId = params.serverId; this.serverIdBy = params.serverIdBy; + this.sortId = params.sortId; this.type = params.type ?? MessageChat; this.syncPoint = params.syncPoint ?? 
false; this.replyId = params.replyId; diff --git a/borogove/ChatMessageBuilder.hx b/borogove/ChatMessageBuilder.hx index 5b2e94d..d1bf7d5 100644 --- a/borogove/ChatMessageBuilder.hx +++ b/borogove/ChatMessageBuilder.hx @@ -45,6 +45,9 @@ class ChatMessageBuilder { **/ public var serverIdBy: Null<String> = null; + @:allow(borogove) + private var sortId: Null<String> = null; + /** The type of this message (Chat, Call, etc) **/ @@ -339,6 +342,7 @@ class ChatMessageBuilder { localId: localId, serverId: serverId, serverIdBy: serverIdBy, + sortId: sortId, type: type, syncPoint: syncPoint, replyId: replyId, diff --git a/borogove/Client.hx b/borogove/Client.hx index 64ee6f5..9483645 100644 --- a/borogove/Client.hx +++ b/borogove/Client.hx @@ -97,6 +97,7 @@ class Client extends EventEmitter { private var fastMechanism: Null<String> = null; private var token: Null<String> = null; private var fastCount: Null<Int> = null; + private var sortId: String = "a "; private final pendingCaps: Map<String, Array<(Null<Caps>)->Chat>> = []; private final brokenAvatars: Map<String, JID> = []; @:allow(borogove) @@ -706,7 +707,11 @@ class Client extends EventEmitter { // Do a big GC before starting a new client cpp.NativeGc.run(true); #end - return persistence.getLogin(accountId()).then(login -> { + return persistence.syncPoint(accountId(), null).then((point) -> { + if (point?.sortId != null) sortId = point.sortId; + + return persistence.getLogin(accountId()); + }).then(login -> { token = login.token; fastCount = login.fastCount; stream.clientId = login.clientId ?? 
ID.unique(); @@ -1439,7 +1444,19 @@ class Client extends EventEmitter { } @:allow(borogove) - private function storeMessages(messages: Array<ChatMessage>): Promise<Null<Array<ChatMessage>>> { + private function nextSortId() { + sortId = FractionalIndexing.between(sortId, null, FractionalIndexing.BASE_95_DIGITS); + return sortId; + } + + @:allow(borogove) + private function storeMessageBuilder(builder: ChatMessageBuilder): Promise<ChatMessage> { + if (builder.sortId == null) builder.sortId = nextSortId(); + return storeMessages([builder.build()]).then(result -> result[0]); + } + + @:allow(borogove) + private function storeMessages(messages: Array<ChatMessage>): Promise<Array<ChatMessage>> { return persistence.storeMessages(accountId(), messages); } @@ -1720,7 +1737,7 @@ class Client extends EventEmitter { if (Std.isOfType(persistence, borogove.persistence.Dummy)) { callback(true); // No reason to sync if we're not storing anyway } else { - persistence.lastId(accountId(), null).then((lastId) -> doSync(callback, lastId)); + persistence.syncPoint(accountId(), null).then((point) -> doSync(callback, point)); } } @@ -1739,16 +1756,17 @@ class Client extends EventEmitter { } #end - private function doSync(callback: Null<(Bool)->Void>, lastId: Null<String>) { + private function doSync(callback: Null<(Bool)->Void>, syncPoint: Null<ChatMessage>, ?sortA: Null<String>) { var thirtyDaysAgo = Date.format( DateTools.delta(std.Date.now(), DateTools.days(-30)) ); var sync = new MessageSync( this, stream, - lastId == null ? { startTime: thirtyDaysAgo } : { page: { after: lastId } } + syncPoint == null ? { startTime: thirtyDaysAgo } : { page: { after: syncPoint.serverId } }, + sortA ?? 
syncPoint?.sortId, + null ); - sync.setNewestPageFirst(false); sync.addContext((builder, stanza) -> { builder.syncPoint = true; return builder; @@ -1783,13 +1801,14 @@ class Client extends EventEmitter { // ignore } } - promises.push(persistence.storeMessages(accountId(), chatMessages)); + promises.push(storeMessages(chatMessages)); trace("SYNC: MAM page wait for writes"); thenshim.PromiseTools.all(promises).then((results) -> { for (messages in results) { if (messages != null) { for (message in messages) { this.trigger("message/sync", message); + sortId = message.sortId; } } } @@ -1816,9 +1835,9 @@ class Client extends EventEmitter { }); }); sync.onError((stanza) -> { - if (lastId != null) { - // Gap in sync, out newest message has expired from server - doSync(callback, null); + if (syncPoint != null) { + // Gap in sync, our newest message has expired from server + doSync(callback, null, syncPoint.sortId); } else { trace("SYNC: error", stanza); if (callback != null) callback(false); diff --git a/borogove/Message.hx b/borogove/Message.hx index 197d5ff..22827da 100644 --- a/borogove/Message.hx +++ b/borogove/Message.hx @@ -88,7 +88,7 @@ class Message { if (carbon == null) carbon = stanza.getChild("sent", "urn:xmpp:carbons:2"); if (carbon != null) { var fwd = carbon.getChild("forwarded", "urn:xmpp:forward:0"); - if(fwd != null) return fromStanza(fwd.getFirstChild(), localJid, null, encryptionInfo); + if(fwd != null) return fromStanza(fwd.getFirstChild(), localJid, addContext, encryptionInfo); } } diff --git a/borogove/MessageSync.hx b/borogove/MessageSync.hx index 63e4d53..60bfde8 100644 --- a/borogove/MessageSync.hx +++ b/borogove/MessageSync.hx @@ -32,16 +32,19 @@ class MessageSync { private var handler:MessageListHandler; private var contextHandler:(ChatMessageBuilder, Stanza)->ChatMessageBuilder = (b,_)->b; private var errorHandler:(Stanza)->Void; + private var sortA:Null<String>; + private final sortB:Null<String>; public var lastPage(default, 
null):ResultSetPageResult; public var progress(default, null): Int = 0; private var complete:Bool = false; - private var newestPageFirst:Bool = true; public var jmi(default, null): Map<String, Stanza> = []; - public function new(client:Client, stream:GenericStream, filter:MessageFilter, ?serviceJID:String) { + public function new(client:Client, stream:GenericStream, filter:MessageFilter, sortA: Null<String>, sortB: Null<String>, ?serviceJID:String) { this.client = client; this.stream = stream; this.filter = Reflect.copy(filter); + this.sortA = sortA; + this.sortB = sortB; this.serviceJID = serviceJID != null ? serviceJID : client.accountId(); } @@ -53,26 +56,14 @@ class MessageSync { throw new Exception("Attempt to fetch messages, but already complete"); } final promisedMessages:Array<Promise<Message>> = []; - if (lastPage == null) { - if (newestPageFirst == true && (filter.page == null || (filter.page.before == null && filter.page.after == null))) { - if (filter.page == null) - filter.page = {}; - filter.page.before = ""; // Request last page of results - } - } else { - if (filter.page == null) - filter.page = {}; - if (newestPageFirst == true) { - filter.page.before = lastPage.first; - } else { - filter.page.after = lastPage.last; - } + if (lastPage != null) { + if (filter.page == null) filter.page = {}; + filter.page.after = lastPage.last; } var query = new MAMQuery(filter, serviceJID); var previousMessageTime = ""; var counterSameTime = 0; final eventToken = stream.on("message", function (event) { - progress++; var message:Stanza = event.stanza; var from = message.attr.exists("from") ? 
message.attr.get("from") : client.accountId(); if (from != serviceJID) { // Only listen for results from the JID we queried @@ -82,10 +73,12 @@ class MessageSync { if (result == null || result.attr.get("queryid") != query.queryId) { // Not (a|our) MAM result return EventUnhandled; } + progress++; var originalMessage = result.findChild("{urn:xmpp:forward:0}forwarded/{jabber:client}message"); if (originalMessage == null) { // No message, nothing for us to do return EventHandled; } + sortA = FractionalIndexing.between(sortA, sortB, FractionalIndexing.BASE_95_DIGITS); var timestamp = result.findText("{urn:xmpp:forward:0}forwarded/{urn:xmpp:delay}delay@stamp"); if (timestamp == null) { trace("MAM result with no timestamp", result); @@ -120,6 +113,7 @@ class MessageSync { trace("MAM: Decrypted stanza: "+decryptedStanza); return Message.fromStanza(decryptedStanza, client.jid, (builder, stanza) -> { + builder.sortId = sortA; builder.serverId = result.attr.get("id"); builder.serverIdBy = serviceJID; builder.encryption = decryptionResult.encryptionInfo; @@ -129,6 +123,7 @@ class MessageSync { }, (err) -> { trace("MAM: Decryption failed: "+err); return Message.fromStanza(originalMessage, client.jid, (builder, stanza) -> { + builder.sortId = sortA; builder.serverId = result.attr.get("id"); builder.serverIdBy = serviceJID; if (timestamp != null && builder.timestamp == null) builder.timestamp = timestamp; @@ -144,6 +139,7 @@ class MessageSync { trace("MAM: Processing non-OMEMO message from " + originalMessage.attr.get("from")); final msg = Message.fromStanza(originalMessage, client.jid, (builder, stanza) -> { + builder.sortId = sortA; builder.serverId = result.attr.get("id"); builder.serverIdBy = serviceJID; if (timestamp != null && builder.timestamp == null) builder.timestamp = timestamp; @@ -193,8 +189,4 @@ class MessageSync { public function onError(handler:(Stanza)->Void) { this.errorHandler = handler; } - - public function setNewestPageFirst(newestPageFirst:Bool):Void { - 
this.newestPageFirst = newestPageFirst; - } } diff --git a/borogove/Persistence.hx b/borogove/Persistence.hx index 177ee67..9e56d99 100644 --- a/borogove/Persistence.hx +++ b/borogove/Persistence.hx @@ -21,7 +21,7 @@ using borogove.SignalProtocol; #end @:expose interface Persistence { - public function lastId(accountId: String, chatId: Null<String>): Promise<Null<String>>; + public function syncPoint(accountId: String, chatId: Null<String>): Promise<Null<ChatMessage>>; public function storeChats(accountId: String, chats: Array<Chat>):Void; @HaxeCBridge.noemit public function getChats(accountId: String): Promise<Array<SerializedChat>>; diff --git a/borogove/Push.hx b/borogove/Push.hx index 1390996..4eb2c7f 100644 --- a/borogove/Push.hx +++ b/borogove/Push.hx @@ -38,7 +38,19 @@ class Push { // Assume incoming message final message = ChatMessage.fromStanza(stanza, JID.parse(stanza.attr.get("to")).asBare()); if (message != null) { - persistence.storeMessages(message.account(), [message]); + // TODO: this puts every push at the same sortId until the next sync + persistence.syncPoint(message.account(), message.type == MessageChannel ? 
message.chatId() : null).then(point -> { + final sortId = FractionalIndexing.between(point?.sortId, null, FractionalIndexing.BASE_95_DIGITS); + final toStore = ChatMessage.fromStanza( + stanza, + JID.parse(stanza.attr.get("to")).asBare(), + (builder, stanza) -> { + builder.sortId = sortId; + return builder; + } + ); + persistence.storeMessages(message.account(), [toStore]); + }); return Notification.fromChatMessage(message); } else { return Notification.fromThinStanza(stanza); diff --git a/borogove/calls/Session.hx b/borogove/calls/Session.hx index c50f878..9409794 100644 --- a/borogove/calls/Session.hx +++ b/borogove/calls/Session.hx @@ -51,12 +51,12 @@ interface Session { public function dtmf():Null<borogove.calls.PeerConnection.DTMFSender>; } -private function mkCallMessage(to: JID, from: JID, event: Stanza) { +private function mkCallMessage(to: JID, client: Client, event: Stanza) { final m = new ChatMessageBuilder(); m.type = MessageCall; m.to = to; m.recipients = [to.asBare()]; - m.from = from; + m.from = client.jid; m.sender = m.from.asBare(); m.replyTo = [m.sender]; m.direction = MessageSent; @@ -68,6 +68,7 @@ private function mkCallMessage(to: JID, from: JID, event: Stanza) { m.versions = [m.build()]; } m.localId = event.attr.get("id"); + m.sortId = client.nextSortId(); return m.build(); } @@ -89,7 +90,7 @@ class IncomingProposedSession implements Session { // XEP-0353 says to send <ringing/> but that leaks presence if not careful // Store it for ourselves at least final event = new Stanza("ringing", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); - final msg = mkCallMessage(from, client.jid, event); + final msg = mkCallMessage(from, client, event); client.storeMessages([msg]).then((stored) -> { client.notifyMessageHandlers(stored[0], CorrectionEvent); }); @@ -101,7 +102,7 @@ class IncomingProposedSession implements Session { // It also tells all other devices to stop ringing, which you may or may not want // Store it for ourselves at least final event = 
new Stanza("reject", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); - final msg = mkCallMessage(from, client.jid, event); + final msg = mkCallMessage(from, client, event); client.storeMessages([msg]).then((stored) -> { client.notifyMessageHandlers(stored[0], CorrectionEvent); }); @@ -136,7 +137,7 @@ class IncomingProposedSession implements Session { accepted = true; client.sendPresence(from.asString()); final event = new Stanza("proceed", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); - final msg = mkCallMessage(from, client.jid, event); + final msg = mkCallMessage(from, client, event); client.storeMessages([msg]).then((stored) -> { client.notifyMessageHandlers(stored[0], CorrectionEvent); client.sendStanza( @@ -219,7 +220,7 @@ class OutgoingProposedSession implements Session { if (video) { event.tag("description", { xmlns: "urn:xmpp:jingle:apps:rtp:1", media: "video" }).up(); } - final msg = mkCallMessage(to, client.jid, event); + final msg = mkCallMessage(to, client, event); client.storeMessages([msg]).then((stored) -> { final stanza = new Stanza("message", { to: to.asString(), type: "chat", id: msg.localId }) .addChild(event) @@ -232,7 +233,7 @@ class OutgoingProposedSession implements Session { public function hangup() { final event = new Stanza("retract", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); - final msg = mkCallMessage(to, client.jid, event); + final msg = mkCallMessage(to, client, event); client.storeMessages([msg]).then((stored) -> { client.sendStanza( new Stanza("message", { to: to.asString(), type: "chat", id: msg.versions[0].localId }) @@ -427,7 +428,7 @@ class InitiatedSession implements Session { } final event = new Stanza("finish", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); - final msg = mkCallMessage(counterpart, client.jid, event); + final msg = mkCallMessage(counterpart, client, event); client.storeMessages([msg]).then((stored) -> { client.notifyMessageHandlers(stored[0], CorrectionEvent); client.sendStanza( diff --git 
a/borogove/persistence/Dummy.hx b/borogove/persistence/Dummy.hx index 50261ab..4593252 100644 --- a/borogove/persistence/Dummy.hx +++ b/borogove/persistence/Dummy.hx @@ -30,7 +30,7 @@ class Dummy implements Persistence { public function new() { } @HaxeCBridge.noemit - public function lastId(accountId: String, chatId: Null<String>): Promise<Null<String>> { + public function syncPoint(accountId: String, chatId: Null<String>): Promise<Null<ChatMessage>> { return Promise.resolve(null); } diff --git a/borogove/persistence/IDB.js b/borogove/persistence/IDB.js index fd88c1e..25fff62 100644 --- a/borogove/persistence/IDB.js +++ b/borogove/persistence/IDB.js @@ -16,11 +16,67 @@ import { borogove_ReactionUpdate, borogove_SerializedChat, borogove_Stanza, + FractionalIndexing_between, + FractionalIndexing_BASE_95_DIGITS } from "./borogove.js"; import * as enums from "./borogove-enums.js"; export default async (dbname, media, tokenize, stemmer) => { - if (!tokenize) tokenize = function(s) { return s.split(" "); } + const stopwords = [ + "about", + "after", + "all", + "am", + "an", + "and", + "are", + "as", + "at", + "be", + "but", + "by", + "can", + "com", + "de", + "do", + "en", + "for", + "from", + "he", + "how", + "in", + "is", + "isn", + "it", + "la", + "ll", + "of", + "on", + "or", + "our", + "she", + "so", + "that", + "the", + "they", + "this", + "to", + "too", + "und", + "was", + "we", + "what", + "when", + "where", + "who", + "will", + "with", + "www", + "you" + ]; + if (!tokenize) tokenize = function(s) { + return s.toLowerCase().split(/\s*\b/).filter(w => w.length > 1 && w.match(/\w/) && !stopwords.includes(w)); + } if (!stemmer) stemmer = function(s) { return s; } // Helper functions to convert binary data to storage-safe strings @@ -40,17 +96,54 @@ export default async (dbname, media, tokenize, stemmer) => { return bytes.buffer; } + async function migrationAddSortIdAndTerms(db, accountIds) { + const start = new Date().getTime(); + for (const account of accountIds) { 
+ const tx = db.transaction(["messages"], "readwrite"); + const store = tx.objectStore("messages"); + const count = await promisifyRequest(store.count()); + var index = FractionalIndexing_between("a ", null, FractionalIndexing_BASE_95_DIGITS); + const cursor = store.index("accounts").openCursor( + IDBKeyRange.bound([account], [account, []]) + ); + let i = 0; + let updates = []; + const flushUpdates = () => { + for (const update of updates) { + store.put(update); + } + updates = []; + }; + while (true) { + const cresult = await promisifyRequest(cursor); + if (!cresult) { + flushUpdates(); + break; + } + const sortId = index = FractionalIndexing_between(index, null, FractionalIndexing_BASE_95_DIGITS); + const terms = [...new Set(tokenize((cresult.value.text || "").replace(/^>.*/mg, "")).map(stemmer))].sort(); + updates.push({ ...cresult.value, sortId, terms }); + if (i++ % 1000 === 0) { + console.log("Migrating... " + i + " / " + count); + } + if (updates.length > 10000) flushUpdates(); + if (cresult) cresult.continue(); + } + await promisifyRequest(tx); + } + console.log("Migrating done", new Date().getTime() - start); + } + function openDb(version) { return new Promise((resolve, reject) => { var dbOpenReq = indexedDB.open(dbname, version); dbOpenReq.onerror = console.error; dbOpenReq.onupgradeneeded = (event) => { const db = event.target.result; + const tx = event.target.transaction; if (!db.objectStoreNames.contains("messages")) { const messages = db.createObjectStore("messages", { keyPath: ["account", "serverId", "serverIdBy", "localId"] }); - messages.createIndex("chats", ["account", "chatId", "timestamp"]); messages.createIndex("localId", ["account", "localId", "chatId"]); - messages.createIndex("accounts", ["account", "timestamp"]); } if (!db.objectStoreNames.contains("keyvaluepairs")) { db.createObjectStore("keyvaluepairs"); @@ -77,6 +170,23 @@ export default async (dbname, media, tokenize, stemmer) => { if 
(!db.objectStoreNames.contains("omemo_sessions_meta")) { db.createObjectStore("omemo_sessions_meta", { keyPath: ["account", "address"] }); } + + const messagesIndexNames = tx.objectStore("messages").indexNames; + if (!messagesIndexNames.contains("chatsBySortId")) { + tx.objectStore("messages").createIndex("chatsBySortId", ["account", "chatId", "sortId"]); + } + if (!messagesIndexNames.contains("accountsBySortId")) { + tx.objectStore("messages").createIndex("accountsBySortId", ["account", "sortId"]); + } + if (!messagesIndexNames.contains("terms")) { + tx.objectStore("messages").createIndex("terms", "terms", { multiEntry: true }); + } + if (messagesIndexNames.contains("accounts")) { + tx.objectStore("messages").deleteIndex("accounts"); + } + if (messagesIndexNames.contains("chats")) { + tx.objectStore("messages").deleteIndex("chats"); + } }; dbOpenReq.onsuccess = (event) => { const db = event.target.result; @@ -90,18 +200,50 @@ export default async (dbname, media, tokenize, stemmer) => { "omemo_sessions", "omemo_sessions_meta" ]; - for(let storeName of storeNames) { + for(const storeName of storeNames) { if(!db.objectStoreNames.contains(storeName)) { db.close(); openDb(db.version + 1).then(resolve, reject); return; } } - resolve(db); + const tx = db.transaction(["messages", "keyvaluepairs"], "readonly"); + const messagesIndexNames = tx.objectStore("messages").indexNames; + const wantIndexNames = ["chatsBySortId", "accountsBySortId", "terms"]; + for(const indexName of wantIndexNames) { + if(!messagesIndexNames.contains(indexName)) { + db.close(); + openDb(db.version + 1).then(resolve, reject); + return; + } + } + + (async () => { + const kv = tx.objectStore("keyvaluepairs"); + const ranMigrationAddSortIdAndTerms = await promisifyRequest(kv.get("__migrationAddSortIdAndTerms")); + if (!ranMigrationAddSortIdAndTerms && messagesIndexNames.contains("accounts")) { + const keys = await promisifyRequest(kv.getAllKeys(IDBKeyRange.bound("login:clientId:", 
"login:clientId:\uffff"))); + const accountIds = keys.map(k => k.substring(15)); + await migrationAddSortIdAndTerms(db, accountIds); + + const writeKV = db.transaction(["keyvaluepairs"], "readwrite"); + await promisifyRequest(writeKV.objectStore("keyvaluepairs").put(new Date(), "__migrationAddSortIdAndTerms")); + } + + if (messagesIndexNames.contains("accounts") || messagesIndexNames.contains("chats")) { + db.close(); + openDb(db.version + 1).then(resolve, reject); + return; + } + + resolve(db); + })(); }; }); } + const db = await openDb(); + const recentCorrections = {}; function promisifyRequest(request) { return new Promise((resolve, reject) => { @@ -151,6 +293,7 @@ export default async (dbname, media, tokenize, stemmer) => { message.serverId = value.serverId ? value.serverId : null; message.serverIdBy = value.serverIdBy ? value.serverIdBy : null; message.replyId = value.replyId ? value.replyId : null; + message.sortId = value.sortId ? value.sortId : null; message.syncPoint = !!value.syncPoint; message.direction = value.direction; message.status = value.status; @@ -207,6 +350,7 @@ export default async (dbname, media, tokenize, stemmer) => { versions: message.versions.map((m) => serializeMessage(account, m)), payloads: message.payloads.map((p) => p.toString()), stanza: message.stanza?.toString(), + terms: [...new Set(tokenize((message.text || "").replace(/^>.*/mg, "")).map(stemmer))].sort() } } @@ -222,7 +366,9 @@ export default async (dbname, media, tokenize, stemmer) => { head.serverId = result.value.serverId; head.localId = result.value.localId; head.replyId = result.value.replyId; - head.timestamp = result.value.timestamp; // Edited version is not newer + // Edited version is not newer + head.timestamp = result.value.timestamp; + head.sortId = result.value.sortId; head.versions = versions; head.reactions = result.value.reactions; // Preserve these, edit doesn't touch them // Calls can "edit" from multiple senders, but the original direction and sender holds 
@@ -235,6 +381,11 @@ export default async (dbname, media, tokenize, stemmer) => { head.recipients = result.value.recipients; } result.update(head); + if (!message.isIncoming()) { + for (const version of newVersions) { + recentCorrections[version.localId] = head.localId; + } + } return head; } @@ -254,17 +405,17 @@ export default async (dbname, media, tokenize, stemmer) => { } const obj = { - lastId: async function(account, chatId) { + syncPoint: async function(account, chatId) { const tx = db.transaction(["messages"], "readonly"); const store = tx.objectStore("messages"); var cursor = null; if (chatId === null) { - cursor = store.index("accounts").openCursor( + cursor = store.index("accountsBySortId").openCursor( IDBKeyRange.bound([account], [account, []]), "prev" ); } else { - cursor = store.index("chats").openCursor( + cursor = store.index("chatsBySortId").openCursor( IDBKeyRange.bound([account, chatId], [account, chatId, []]), "prev" ); @@ -272,7 +423,9 @@ export default async (dbname, media, tokenize, stemmer) => { while (true) { const result = await promisifyRequest(cursor); if (!result || (result.value.syncPoint && result.value.serverId && ((chatId && result.value.serverIdBy == chatId) || result.value.serverIdBy === account))) { - return result ? 
result.value.serverId : null; + if (!result?.value) return null; + + return await hydrateMessage(result.value); } else { result.continue(); } @@ -338,47 +491,41 @@ export default async (dbname, media, tokenize, stemmer) => { ))); }, - getChatsUnreadDetails: async function(account, chatsArray) { + getChatUnreadDetails: async function(account, chat) { const tx = db.transaction(["messages"], "readonly"); const store = tx.objectStore("messages"); - const cursor = store.index("accounts").openCursor( - IDBKeyRange.bound([account], [account, []]), + const cursor = store.index("chatsBySortId").openCursor( + IDBKeyRange.bound([account, chat.chatId], [account, chat.chatId, []]), "prev" ); - const chats = {}; - chatsArray.forEach((chat) => chats[chat.chatId] = chat); - const result = {}; - var rowCount = 0; + let rowCount = 0; + let unreadCount = 0; + let lastMessage = null; while (true) { const cresult = await promisifyRequest(cursor); - if (cresult && rowCount < 40000) { - rowCount++; - const value = cresult.value; - if (chats[value.chatId]) { - if (result[value.chatId]) { - result[value.chatId] = result[value.chatId].then((details) => { - if (!details.foundAll) { - const readUpTo = chats[value.chatId]?.readUpToId; - if (readUpTo === value.serverId || readUpTo === value.localId || value.direction == enums.borogove_MessageDirection.MessageSent) { - details.foundAll = true; - } else { - details.unreadCount++; - } - } - return details; - }); - } else { - const readUpTo = chats[value.chatId]?.readUpToId; - const haveRead = readUpTo === value.serverId || readUpTo === value.localId || value.direction == enums.borogove_MessageDirection.MessageSent; - result[value.chatId] = hydrateMessage(value).then((m) => ({ chatId: value.chatId, message: m, unreadCount: haveRead ? 
0 : 1, foundAll: haveRead })); - } - } - cresult.continue(); + if (!cresult || rowCount > 2000) break; + + rowCount++; + const value = cresult.value; + if (!lastMessage) lastMessage = hydrateMessage(value); + if (chat.readUpToId === value.serverId || value.direction == enums.borogove_MessageDirection.MessageSent) { + break; } else { - return await Promise.all(Object.values(result)); + unreadCount++; } + cresult.continue(); } + + const message = await lastMessage; + return { message, unreadCount }; + }, + + getChatsUnreadDetails: function(account, chatsArray) { + return Promise.all(chatsArray.map(async chat => { + const details = await this.getChatUnreadDetails(account, chat); + return { chatId: chat.chatId, ...details }; + })); }, getMessage: async function(account, chatId, serverId, localId) { @@ -386,9 +533,18 @@ export default async (dbname, media, tokenize, stemmer) => { const tx = db.transaction(["messages"], "readonly"); const store = tx.objectStore("messages"); - let result; + let result = null; if (serverId) { - result = await promisifyRequest(store.openCursor(IDBKeyRange.bound([account, serverId], [account, serverId, []]))); + const cursor = store.openCursor(IDBKeyRange.bound([account, serverId], [account, serverId, []])); + while (true) { + const cresult = await promisifyRequest(cursor); + if (!cresult) break; + if (cresult.value.chatId === chatId) { + result = cresult; + break; + } + cresult.continue(); + } } else { result = await promisifyRequest(store.index("localId").openCursor(IDBKeyRange.only([account, localId, chatId]))); } @@ -429,6 +585,7 @@ export default async (dbname, media, tokenize, stemmer) => { storeMessage: function(account, message, callback) { if (!message.chatId()) throw "Cannot store a message with no chatId"; + if (!message.sortId) throw "Cannot store a message with no sortId"; if (!message.serverId && !message.localId) throw "Cannot store a message with no id"; if (!message.serverId && message.isIncoming()) throw "Cannot store an 
incoming message with no server id"; if (message.serverId && !message.serverIdBy) throw "Cannot store a message with a server id and no by"; @@ -502,81 +659,67 @@ export default async (dbname, media, tokenize, stemmer) => { }, updateMessageStatus: async function(account, localId, status, statusText) { + const idToLookup = recentCorrections[localId] ?? localId; const tx = db.transaction(["messages"], "readwrite"); const store = tx.objectStore("messages"); - const result = await promisifyRequest(store.index("localId").openCursor(IDBKeyRange.bound([account, localId], [account, localId, []]))); - if (result?.value && result.value.direction === enums.borogove_MessageDirection.MessageSent && ![enums.borogove_MessageStatus.MessageDeliveredToDevice, enums.borogove_MessageStatus.MessageFailedToSend].includes(result.value.status)) { - const newStatus = { ...result.value, status, statusText }; + const result = await promisifyRequest(store.index("localId").openCursor(IDBKeyRange.bound([account, idToLookup], [account, idToLookup, []]))); + const value = result?.value; + if (value && value.direction === enums.borogove_MessageDirection.MessageSent && ![enums.borogove_MessageStatus.MessageDeliveredToDevice, enums.borogove_MessageStatus.MessageFailedToSend].includes(value.status)) { + const newStatus = (value.versions?.length || 0) > 0 ? + { ...result.value, versions: [{ ...value.versions[0], status, statusText }, ...value.versions.slice(1)], status, statusText } : + { ...value, status, statusText }; result.update(newStatus); return await hydrateMessage(newStatus); } - // Maybe a correction? 
Check recent messages - const cursor = store.index("accounts").openCursor( - IDBKeyRange.bound([account], [account, []]), - "prev" - ); - let count = 0; - for (let count = 0; count < 1000; count++) { - const cresult = await promisifyRequest(cursor); - if (!cresult) break; - - const value = cresult.value; - if (value?.versions?.[0]?.localId === localId && value?.direction === enums.borogove_MessageDirection.MessageSent && ![enums.borogove_MessageStatus.MessageDeliveredToDevice, enums.borogove_MessageStatus.MessageFailedToSend].includes(result.value.status)) { - const newStatus = { ...value, versions: [{ ...value.versions[0], status, statusText }, ...value.versions.slice(1)], status, statusText }; - cresult.update(newStatus); - return await hydrateMessage(newStatus); - } - cresult.continue(); - } - throw "Message not found: " + localId; }, getMessagesBefore: async function(account, chatId, before) { - const bound = before ? new Date(before.timestamp) : []; const tx = db.transaction(["messages"], "readonly"); const store = tx.objectStore("messages"); - const cursor = store.index("chats").openCursor( - IDBKeyRange.bound([account, chatId], [account, chatId, bound]), + const cursor = store.index("chatsBySortId").openCursor( + IDBKeyRange.bound([account, chatId], [account, chatId, before?.sortId || []]), "prev" ); - const messages = await this.getMessagesFromCursor(cursor, before?.serverId || before?.localId, bound); + const messages = await this.getMessagesFromCursor(cursor, before); return messages.reverse(); }, getMessagesAfter: async function(account, chatId, after) { - const bound = after ? [new Date(after.timestamp)] : []; + const bound = after?.sortId ? 
[after.sortId] : []; const tx = db.transaction(["messages"], "readonly"); const store = tx.objectStore("messages"); - const cursor = store.index("chats").openCursor( + const cursor = store.index("chatsBySortId").openCursor( IDBKeyRange.bound([account, chatId, ...bound], [account, chatId, []]), "next" ); - return this.getMessagesFromCursor(cursor, after?.serverId || after?.localId, bound[0]); + return this.getMessagesFromCursor(cursor, after); }, getMessagesAround: async function(account, around) { + if (!around) throw "Cannot look around nothing"; + const chatId = around.chatId(); const before = this.getMessagesBefore(account, chatId, around); const tx = db.transaction(["messages"], "readonly"); const store = tx.objectStore("messages"); - const cursor = store.index("chats").openCursor( - IDBKeyRange.bound([account, chatId, new Date(around.timestamp)], [account, chatId, []]), + const cursor = store.index("chatsBySortId").openCursor( + IDBKeyRange.bound([account, chatId, around.sortId], [account, chatId, []]), "next" ); - const aroundAndAfter = this.getMessagesFromCursor(cursor, null, null); + const aroundAndAfter = this.getMessagesFromCursor(cursor, null); return Promise.all([before, aroundAndAfter]).then(result => result.flat()); }, - getMessagesFromCursor: async function(cursor, id, bound) { + getMessagesFromCursor: async function(cursor, notIncluding) { const result = []; while (true) { const cresult = await promisifyRequest(cursor); if (cresult && result.length < 50) { const value = cresult.value; - if (value.serverId === id || value.localId === id || (value.timestamp && value.timestamp.getTime() === (bound instanceof Date && bound.getTime()))) { + if ((notIncluding?.serverId && notIncluding?.serverId === value?.serverId) || (notIncluding?.localId && !value?.serverId && notIncluding?.localId === value?.localId)) { cresult.continue(); continue; } @@ -589,39 +732,38 @@ export default async (dbname, media, tokenize, stemmer) => { } }, - searchMessages: 
function(account, chatId, q, callback) { + searchMessages: async function(account, chatId, q) { + const qTerms = new Set(tokenize(q).map(stemmer)); const tx = db.transaction(["messages"], "readonly"); const store = tx.objectStore("messages"); - var cursor; - if (chatId) { - cursor = store.index("chats").openCursor( - IDBKeyRange.bound([account, chatId], [account, chatId, []]), - "prev" - ); - } else if (account) { - cursor = store.index("accounts").openCursor( - IDBKeyRange.bound([account], [account, []]), - "prev" - ); - } else { - cursor = store.openCursor(undefined, "prev"); - } - const qTok = new Set(tokenize(q).map(stemmer)); - cursor.onsuccess = (event) => { - if (event.target.result) { - const value = event.target.result.value; - if (value.text && new Set(tokenize(value.text).map(stemmer)).isSupersetOf(qTok)) { - if (!callback(q, hydrateMessageSync(value))) return; - } - event.target.result.continue(); - } else { - callback(null); + const index = store.index("terms"); + + // Figure out which search term matches the fewest messages + let probeTerm = null; + let probeScore = null; + for (const term of qTerms) { + const score = await promisifyRequest(index.count(IDBKeyRange.only(term))); + if (!probeTerm || score < probeScore) { + probeTerm = term; + probeScore = score; } } - cursor.onerror = (event) => { - console.error(event); - callback(null); + + // Using the smallest list of messages that match one term + // Find the ones that match every term + const result = []; + const cursor = index.openCursor(IDBKeyRange.only(probeTerm)); + while (true) { + const cresult = await promisifyRequest(cursor); + if (!cresult?.value) break; + + if (cresult.value.account === account && (!chatId || cresult.value.chatId === chatId) && new Set(cresult.value.terms || []).isSupersetOf(qTerms)) { + result.push(hydrateMessageSync(cresult.value)); + } + cresult.continue(); } + + return result.sort((a, b) => a.timestamp < b.timestamp ? -1 : (a.timestamp > b.timestamp ? 
1 : 0)); }, hasMedia: function(hashAlgorithm, hash) { diff --git a/borogove/persistence/Sqlite.hx b/borogove/persistence/Sqlite.hx index 24b492d..e3e7c7b 100644 --- a/borogove/persistence/Sqlite.hx +++ b/borogove/persistence/Sqlite.hx @@ -198,6 +198,29 @@ class Sqlite implements Persistence implements KeyValueStore { "PRAGMA user_version = 7"]); } return Promise.resolve(null); + }).then(_ -> { + if (version < 8) { + return exec(["ALTER TABLE messages ADD COLUMN sort_id TEXT NOT NULL DEFAULT 'a '", + "CREATE INDEX messages_sort_id ON messages (account_id, chat_id, sort_id)"]).then(_ -> + exec(["SELECT ROWID FROM messages ORDER BY created_at"]) + ).then(rows -> { + var promise = Promise.resolve(null); + var toInsert = []; + var sortId = "a "; + for (row in rows) { + sortId = FractionalIndexing.between(sortId, null); + toInsert.push("UPDATE messages SET sort_id='" + StringTools.replace(sortId, "'", "''") + "' WHERE ROWID=" + row.rowid); + if (toInsert.length >= 10000) { + promise = promise.then(_ -> exec(toInsert)); + toInsert = []; + } + } + return promise.then(_ -> exec(toInsert)); + }).then(_ -> + exec(["PRAGMA user_version = 8"]) + ); + } + return Promise.resolve(null); }); }); }); @@ -223,14 +246,9 @@ class Sqlite implements Persistence implements KeyValueStore { } @HaxeCBridge.noemit - public function lastId(accountId: String, chatId: Null<String>): Promise<Null<String>> { + public function syncPoint(accountId: String, chatId: Null<String>): Promise<Null<ChatMessage>> { final params = [accountId]; - var q = "SELECT mam_id, MAX(row) FROM (SELECT mam_id, ROWID as row FROM messages"; - if (chatId == null) { - // Index would actually slow us down here because we order by ROWID and barely filter - q += " NOT INDEXED"; - } - q += " WHERE mam_id IS NOT NULL AND sync_point AND account_id=?"; + var q = "SELECT stanza, direction, type, status, status_text, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, mam_id, mam_by, sync_point, 
sort_id FROM messages WHERE mam_id IS NOT NULL AND mam_id<>'' AND sync_point AND account_id=?"; if (chatId == null) { q += " AND mam_by=?"; params.push(accountId); @@ -239,11 +257,9 @@ class Sqlite implements Persistence implements KeyValueStore { params.push(chatId); } if (chatId != null) { - // Surely it is in the most recent 1000 - q += " ORDER BY created_at DESC LIMIT 1000"; + q += " ORDER BY sort_id DESC LIMIT 1"; } - q += ")"; - return db.exec(q, params).then(iter -> cast (iter.next()?.mam_id, Null<String>)); + return db.exec(q, params).then(result -> hydrateMessages(accountId, result)[0]); } private final storeChatBuffer: Map<String, Chat> = []; @@ -295,7 +311,7 @@ class Sqlite implements Persistence implements KeyValueStore { final row: Array<Dynamic> = [ accountId, chat.chatId, chat.isTrusted(), chat.avatarSha1, chat.getDisplayName(), chat.uiState, chat.isBlocked, - chat.extensions.toString(), chat.readUpTo(), chat.readUpToBy, + chat.extensions.toString(), chat.readUpToId, chat.readUpToBy, channel?.disco?.verRaw().hash, Json.stringify(mapPresence(chat)), Type.getClassName(Type.getClass(chat)).split(".").pop(), chat.notificationsFiltered(), chat.notifyMention(), chat.notifyReply(), @@ -370,6 +386,7 @@ class Sqlite implements Persistence implements KeyValueStore { final localIds = []; final replyTos = []; for (message in messages) { + if (message.sortId == null) throw "Cannot store a message with no sortId"; if (message.serverId == null && message.localId == null) throw "Cannot store a message with no id"; if (message.serverId == null && message.isIncoming()) throw "Cannot store an incoming message with no server id"; if (message.serverId != null && message.serverIdBy == null) throw "Cannot store a message with a server id and no by"; @@ -388,7 +405,7 @@ class Sqlite implements Persistence implements KeyValueStore { return storeMessagesSerialized.run(() -> // Hmm, if there is an existing one this loses the original timestamp though db.exec( - "INSERT OR 
REPLACE INTO messages VALUES " + messages.map(_ -> "(?,?,?,?,?,?,?,?,CAST(unixepoch(?, 'subsec') * 1000 AS INTEGER),?,?,?,?,?)").join(","), + "INSERT OR REPLACE INTO messages VALUES " + messages.map(_ -> "(?,?,?,?,?,?,?,?,CAST(unixepoch(?, 'subsec') * 1000 AS INTEGER),?,?,?,?,?,?)").join(","), messages.flatMap(m -> { final correctable = m; final message = m.versions.length == 1 ? m.versions[0] : m; // TODO: storing multiple versions at once? We never do that right now @@ -397,7 +414,7 @@ class Sqlite implements Persistence implements KeyValueStore { message.localId ?? "", correctable.callSid() ?? correctable.localId ?? correctable.serverId, correctable.syncPoint, correctable.chatId(), correctable.senderId, message.timestamp, message.status, message.direction, message.type, - message.asStanza().toString(), message.statusText + message.asStanza().toString(), message.statusText, message.sortId ] : Array<Dynamic>); }) ).then(_ -> @@ -425,7 +442,7 @@ class Sqlite implements Persistence implements KeyValueStore { @returns Promise resolving to the message or null **/ public function getMessage(accountId: String, chatId: String, serverId: Null<String>, localId: Null<String>): Promise<Null<ChatMessage>> { - var q = "SELECT stanza, direction, type, status, status_text, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, mam_id, mam_by, sync_point FROM messages WHERE account_id=? AND chat_id=?"; + var q = "SELECT stanza, direction, type, status, status_text, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, mam_id, mam_by, sort_id, sync_point FROM messages WHERE account_id=? 
AND chat_id=?"; final params = [accountId, chatId]; if (serverId != null) { q += " AND mam_id=?"; @@ -446,16 +463,17 @@ class Sqlite implements Persistence implements KeyValueStore { ); } - private function getMessages(accountId: String, chatId: String, time: Null<String>, op: String): Promise<Array<ChatMessage>> { + private function getMessages(accountId: String, chatId: String, sortId: Null<String>, op: String): Promise<Array<ChatMessage>> { var q = "WITH page AS (SELECT stanza_id, mam_id FROM messages where account_id=? AND chat_id=? AND (stanza_id IS NULL OR stanza_id='' OR stanza_id=correction_id)"; final params: Array<Dynamic> = [accountId, chatId]; - if (time != null) { - q += " AND messages.created_at " + op + "CAST(unixepoch(?, 'subsec') * 1000 AS INTEGER)"; - params.push(time); + if (sortId != null) { + q += " AND messages.sort_id " + op + " ?"; + params.push(sortId); } q += " ORDER BY messages.created_at"; if (op == "<" || op == "<=") q += " DESC"; - q += ", messages.ROWID"; + // Should not need this, but just in case? + q += ", messages.created_at"; if (op == "<" || op == "<=") q += " DESC"; q += " LIMIT 50) "; q += "SELECT @@ -471,12 +489,13 @@ class Sqlite implements Persistence implements KeyValueStore { messages.sender_id, messages.mam_id, messages.mam_by, + messages.sort_id, messages.sync_point, MAX(versions.created_at) FROM messages INNER JOIN messages versions USING (correction_id, sender_id) WHERE (messages.stanza_id, messages.mam_id) IN (SELECT * FROM page) AND messages.account_id=? AND messages.chat_id=? GROUP BY correction_id, CASE WHEN messages.type=? 
THEN 'call' ELSE messages.sender_id END"; - q += " ORDER BY messages.created_at"; + q += " ORDER BY messages.sort_id"; if (op == "<" || op == "<=") q += " DESC"; - q += ", messages.ROWID"; + q += ", messages.created_at"; if (op == "<" || op == "<=") q += " DESC"; params.push(accountId); @@ -501,27 +520,27 @@ class Sqlite implements Persistence implements KeyValueStore { @HaxeCBridge.noemit public function getMessagesBefore(accountId: String, chatId: String, before: Null<ChatMessage>): Promise<Array<ChatMessage>> { - return getMessages(accountId, chatId, before?.timestamp, "<"); + return getMessages(accountId, chatId, before?.sortId, "<"); } @HaxeCBridge.noemit public function getMessagesAfter(accountId: String, chatId: String, after: Null<ChatMessage>): Promise<Array<ChatMessage>> { - return getMessages(accountId, chatId, after?.timestamp, ">"); + return getMessages(accountId, chatId, after?.sortId, ">"); } @HaxeCBridge.noemit public function getMessagesAround(accountId: String, around: ChatMessage): Promise<Array<ChatMessage>> { final chatId = around.chatId(); return thenshim.PromiseTools.all([ - getMessages(accountId, chatId, around.timestamp, "<"), - getMessages(accountId, chatId, around.timestamp, ">=") + getMessages(accountId, chatId, around.sortId, "<"), + getMessages(accountId, chatId, around.sortId, ">=") ]).then(results -> results.flatten()); } private function getChatUnreadDetails(accountId: String, chat: Chat): Promise<{ chatId: String, message: ChatMessage, unreadCount: Int }> { return db.exec( - "WITH subq as (SELECT ROWID as row, COALESCE(MAX(created_at), 0) as created_at FROM messages where account_id=? AND chat_id=? AND (mam_id=? 
OR direction=?)) SELECT chat_id AS chatId, stanza, direction, type, status, status_text, sender_id, mam_id, mam_by, sync_point, CASE WHEN (SELECT row FROM subq) IS NULL THEN COUNT(*) ELSE COUNT(*) - 1 END AS unreadCount, strftime('%FT%H:%M:%fZ', MAX(messages.created_at) / 1000.0, 'unixepoch') AS timestamp FROM messages WHERE account_id=? AND chat_id=? AND (stanza_id IS NULL OR stanza_id='' OR stanza_id=correction_id) AND (messages.created_at >= (SELECT created_at FROM subq) AND (messages.created_at <> (SELECT created_at FROM subq) OR messages.ROWID = (SELECT row FROM subq)))", - [accountId, chat.chatId, chat.readUpTo(), MessageSent, accountId, chat.chatId] + "WITH subq as (SELECT ROWID as row, COALESCE(MAX(sort_id), 'a ') as sort_id FROM messages where account_id=? AND chat_id=? AND (mam_id=? OR direction=?)) SELECT chat_id AS chatId, stanza, direction, type, status, status_text, sender_id, mam_id, mam_by, MAX(sort_id), sync_point, CASE WHEN (SELECT row FROM subq) IS NULL THEN COUNT(*) ELSE COUNT(*) - 1 END AS unreadCount, strftime('%FT%H:%M:%fZ', messages.created_at / 1000.0, 'unixepoch') AS timestamp FROM messages WHERE account_id=? AND chat_id=? AND (stanza_id IS NULL OR stanza_id='' OR stanza_id=correction_id) AND (messages.sort_id >= (SELECT sort_id FROM subq) AND (messages.sort_id <> (SELECT sort_id FROM subq) OR messages.ROWID = (SELECT row FROM subq)))", + [accountId, chat.chatId, chat.readUpToId, MessageSent, accountId, chat.chatId] ).then(result -> { final row: Dynamic = result.next(); final lastMessage = row.stanza == null ? [] : hydrateMessages(accountId, [row].iterator()); @@ -556,7 +575,7 @@ class Sqlite implements Persistence implements KeyValueStore { public function updateMessageStatus(accountId: String, localId: String, status: MessageStatus, statusText: Null<String>): Promise<ChatMessage> { return storeMessagesSerialized.run(() -> db.exec( - "UPDATE messages SET status=?, status_text=? WHERE account_id=? AND stanza_id=? AND direction=? 
AND status <> ? AND status <> ? RETURNING stanza, direction, type, status, status_text, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, correction_id AS stanza_id, mam_id, mam_by, sync_point", + "UPDATE messages SET status=?, status_text=? WHERE account_id=? AND stanza_id=? AND direction=? AND status <> ? AND status <> ? RETURNING stanza, direction, type, status, status_text, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, correction_id AS stanza_id, sort_id, mam_id, mam_by, sync_point", [status, statusText, accountId, localId, MessageSent, MessageDeliveredToDevice, MessageFailedToSend] ).then(result -> thenshim.PromiseTools.all(hydrateMessages(accountId, result).map(message ->{ @@ -839,7 +858,7 @@ class Sqlite implements Persistence implements KeyValueStore { final stanzaIds = []; final stanzaIdsS = []; var params = [accountId]; - final qStart = "SELECT chat_id, stanza_id, stanza, direction, type, status, status_text, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, mam_id, mam_by, sync_point FROM messages WHERE account_id=?"; + final qStart = "SELECT chat_id, stanza_id, stanza, direction, type, status, status_text, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, mam_id, mam_by, sort_id, sync_point FROM messages WHERE account_id=?"; for (parent in replyTos) { if (parent.serverId != null) { mamIds.push(parent.chatId); @@ -875,7 +894,7 @@ class Sqlite implements Persistence implements KeyValueStore { }); } - private function hydrateMessages(accountId: String, rows: Iterator<{ stanza: String, timestamp: String, direction: MessageDirection, type: MessageType, status: MessageStatus, status_text: Null<String>, mam_id: String, mam_by: String, sync_point: Int, sender_id: String, ?stanza_id: String, ?versions: String, ?version_times: String }>): Array<ChatMessage> { + private function hydrateMessages(accountId: String, rows: 
Iterator<{ stanza: String, timestamp: String, direction: MessageDirection, type: MessageType, status: MessageStatus, status_text: Null<String>, mam_id: String, mam_by: String, sort_id: String, sync_point: Int, sender_id: String, ?stanza_id: String, ?versions: String, ?version_times: String }>): Array<ChatMessage> { // TODO: Calls can "edit" from multiple senders, but the original direction and sender holds final accountJid = JID.parse(accountId); return { iterator: () -> rows }.map(row -> ChatMessage.fromStanza(Stanza.parse(row.stanza), accountJid, (builder, _) -> { @@ -887,6 +906,7 @@ class Sqlite implements Persistence implements KeyValueStore { builder.senderId = row.sender_id; builder.serverId = row.mam_id == "" ? null : row.mam_id; builder.serverIdBy = row.mam_by == "" ? null : row.mam_by; + builder.sortId = row.sort_id; if (builder.direction != row.direction) { builder.direction = row.direction; final replyTo = builder.replyTo; diff --git a/borogove/persistence/SqliteDriver.hx b/borogove/persistence/SqliteDriver.hx index 296440e..06baff8 100644 --- a/borogove/persistence/SqliteDriver.hx +++ b/borogove/persistence/SqliteDriver.hx @@ -39,6 +39,7 @@ class SqliteDriver { db = sys.db.Sqlite.open(dbfile); } var result = null; + if (qs.length > 1) db.request("BEGIN TRANSACTION"); for (q in qs) { final prepared = Sqlite.prepare(q); result = db.request(prepared); @@ -46,9 +47,11 @@ class SqliteDriver { // In testing, not copying to an array here caused BAD ACCESS sometimes // Though from sqlite docs it seems like it should be safe? 
final arr = { iterator: () -> result }.array(); + if (qs.length > 1) db.request("COMMIT"); dbs.push(db); resolve(arr.iterator()); } catch (e) { + if (qs.length > 1) db.request("ROLLBACK"); dbs.push(db); reject(e); } diff --git a/browserjs.hxml b/browserjs.hxml index 72e8b01..7e849e0 100644 --- a/browserjs.hxml +++ b/browserjs.hxml @@ -7,6 +7,7 @@ --library thenshim --library tink_http --library uuidv7 +--library fractional-indexing borogove.Client borogove.Register diff --git a/cpp.hxml b/cpp.hxml index b83d044..4b0954e 100644 --- a/cpp.hxml +++ b/cpp.hxml @@ -7,6 +7,7 @@ --library thenshim --library HtmlParser --library uuidv7 +--library fractional-indexing HaxeCBridge borogove.Client diff --git a/nodejs.hxml b/nodejs.hxml index f8de008..aca615d 100644 --- a/nodejs.hxml +++ b/nodejs.hxml @@ -8,6 +8,7 @@ --library thenshim --library tink_http --library uuidv7 +--library fractional-indexing borogove.Client borogove.Register diff --git a/optional-sqlite.awk b/optional-sqlite.awk index c59f117..9d8e763 100644 --- a/optional-sqlite.awk +++ b/optional-sqlite.awk @@ -65,5 +65,6 @@ skipping { END { print "export { borogove_Map }" >> "npm/browser-no-sqlite.js" + print "export { FractionalIndexing_between, FractionalIndexing_BASE_95_DIGITS }" >> "npm/browser-no-sqlite.js" print "export { $bind, $getIterator, Std, EReg, Type, Reflect, Lambda, haxe_io_Bytes, haxe_Timer, haxe_Exception, haxe_crypto_Base64, haxe_iterators_ArrayIterator, js_Boot, js_lib_HaxeIterator, thenshim_Promise, thenshim_PromiseTools }" >> "npm/browser-no-sqlite.js" } diff --git a/test.hxml b/test.hxml index 6534de8..2c7f27b 100644 --- a/test.hxml +++ b/test.hxml @@ -6,6 +6,7 @@ --library thenshim --library tink_http --library uuidv7 +--library fractional-indexing --library utest