git » sdk » commit 5ac5fa5

Sync MUC MAM and data model fixes needed for that

author Stephen Paul Weber
2023-11-08 18:14:49 UTC
committer Stephen Paul Weber
2023-11-08 21:16:35 UTC
parent c5ed3368b664b6f493466d380097708373d3b53d

Sync MUC MAM and data model fixes needed for that

xmpp/Chat.hx +48 -1
xmpp/ChatMessage.hx +1 -0
xmpp/MessageSync.hx +2 -0
xmpp/persistence/browser.js +25 -5

diff --git a/xmpp/Chat.hx b/xmpp/Chat.hx
index 8caf254..68ce6c5 100644
--- a/xmpp/Chat.hx
+++ b/xmpp/Chat.hx
@@ -115,8 +115,9 @@ abstract class Chat {
 		final presence = presence.get(resource);
 		if (presence != null) {
 			presence.caps = caps;
+			setPresence(resource, presence);
 		} else {
-			this.presence.set(resource, new Presence(caps));
+			setPresence(resource, new Presence(caps));
 		}
 	}
 
@@ -262,6 +263,7 @@ class DirectChat extends Chat {
 	}
 
 	public function prepareIncomingMessage(message:ChatMessage, stanza:Stanza) {
+		message.syncPoint = true; // TODO: only do this once the client has finished its initial MAM sync; right now it always has
 		return message;
 	}
 
@@ -351,6 +353,7 @@ class DirectChat extends Chat {
 @:expose
 class Channel extends Chat {
 	public var disco: Caps = new Caps("", [], ["http://jabber.org/protocol/muc"]);
+	private var inSync = true;
 
 	public function new(client:Client, stream:GenericStream, persistence:Persistence, chatId:String, uiState = Open, extensions = null, ?disco: Caps) {
 		super(client, stream, persistence, chatId, uiState, extensions);
@@ -381,6 +384,7 @@ class Channel extends Chat {
 					if (err.name == "item-not-found") return; // Nick was changed?
 					(shouldRefreshDisco ? refreshDisco : (cb)->cb())(() -> {
 						presence = {}; // About to ask for a fresh set
+						inSync = false;
 						client.sendPresence(
 							getFullJid().asString(),
 							(stanza) -> {
@@ -397,6 +401,48 @@ class Channel extends Chat {
 		);
 	}
 
+	override public function setPresence(resource:String, presence:Presence) { // Store the presence via super, then start doSync if we are out of sync and resource equals client.displayName()
+		super.setPresence(resource, presence);
+		if (!inSync && resource == client.displayName()) { // presumably a resource equal to our display name is our own occupant entry — TODO confirm
+			persistence.lastId(client.accountId(), chatId, doSync); // doSync is passed as the callback for the looked-up id
+		}
+	}
+
+	private function doSync(lastId: Null<String>) { // Page through the archive after lastId; when lastId is null, start from three days ago instead
+		var threeDaysAgo = Date.format(
+			DateTools.delta(std.Date.now(), DateTools.days(-3))
+		);
+		var sync = new MessageSync(
+			client,
+			stream,
+			lastId == null ? { startTime: threeDaysAgo } : { page: { after: lastId } },
+			chatId
+		);
+		sync.setNewestPageFirst(false);
+		sync.onMessages((messageList) -> {
+			for (message in messageList.messages) {
+				persistence.storeMessage(client.accountId(), message);
+			}
+			if (sync.hasMore()) {
+				sync.fetchNext();
+			} else {
+				inSync = true; // Last page handled, so the sync is considered complete
+				final lastFromSync = messageList.messages[messageList.messages.length - 1];
+				if (lastFromSync != null && Reflect.compare(lastFromSync.timestamp, lastMessageTimestamp()) > 0) {
+					setLastMessage(lastFromSync);
+					client.trigger("chats/update", [this]);
+				}
+			}
+		});
+		sync.onError((stanza) -> {
+			if (lastId != null) {
+				// Gap in sync, our newest message has expired from server
+				doSync(null);
+			}
+		});
+		sync.fetchNext();
+	}
+
 	public function refreshDisco(?callback: ()->Void) {
 		final discoGet = new DiscoInfoGet(chatId);
 		discoGet.onFinished(() -> {
@@ -458,6 +504,7 @@ class Channel extends Chat {
 
 	public function prepareIncomingMessage(message:ChatMessage, stanza:Stanza) {
 		// TODO: mark type!=groupchat as whisper somehow
+		message.syncPoint = inSync;
 		message.sender = JID.parse(stanza.attr.get("from")); // MUC always needs full JIDs
 		if (message.senderId() == getFullJid().asString()) {
 			message.recipients = message.replyTo;
diff --git a/xmpp/ChatMessage.hx b/xmpp/ChatMessage.hx
index 5247f54..097c683 100644
--- a/xmpp/ChatMessage.hx
+++ b/xmpp/ChatMessage.hx
@@ -21,6 +21,7 @@ class ChatMessage {
 	public var localId (default, set) : Null<String> = null;
 	public var serverId (default, set) : Null<String> = null;
 	public var serverIdBy : Null<String> = null;
+	public var syncPoint : Bool = false;
 
 	public var timestamp (default, set) : Null<String> = null;
 
diff --git a/xmpp/MessageSync.hx b/xmpp/MessageSync.hx
index d899d3f..906ce96 100644
--- a/xmpp/MessageSync.hx
+++ b/xmpp/MessageSync.hx
@@ -84,6 +84,8 @@ class MessageSync {
 			if (msg == null) return EventHandled;
 
 			msg.serverId = result.attr.get("id");
+			msg.serverIdBy = serviceJID;
+			msg.syncPoint = true;
 			msg.timestamp = timestamp;
 
 			messages.push(msg);
diff --git a/xmpp/persistence/browser.js b/xmpp/persistence/browser.js
index d26443b..2111115 100644
--- a/xmpp/persistence/browser.js
+++ b/xmpp/persistence/browser.js
@@ -14,6 +14,10 @@ exports.xmpp.persistence = {
 					messages.createIndex("chats", ["account", "chatId", "timestamp"]);
 					messages.createIndex("localId", ["account", "chatId", "localId"]);
 				}
+				const messages = event.target.transaction.objectStore("messages");
+				if (!messages.indexNames.contains("accounts")) {
+					messages.createIndex("accounts", ["account", "timestamp"]);
+				}
 				if (!db.objectStoreNames.contains("keyvaluepairs")) {
 					upgradeDb.createObjectStore("keyvaluepairs");
 				}
@@ -26,6 +30,13 @@ exports.xmpp.persistence = {
 				if (!db.objectStoreNames.contains("messages") || !db.objectStoreNames.contains("keyvaluepairs") || !db.objectStoreNames.contains("chats")) {
 					db.close();
 					openDb(db.version + 1);
+					return;
+				}
+				const tx = db.transaction(["messages"], "readonly");
+				if (!tx.objectStore("messages").indexNames.contains("accounts")) {
+					db.close();
+					openDb(db.version + 1);
+					return;
 				}
 			};
 		}
@@ -51,6 +62,7 @@ exports.xmpp.persistence = {
 			message.localId = value.localId ? value.localId : null;
 			message.serverId = value.serverId ? value.serverId : null;
 			message.serverIdBy = value.serverIdBy ? value.serverIdBy : null;
+			message.syncPoint = !!value.syncPoint;
 			message.timestamp = value.timestamp && value.timestamp.toISOString();
 			message.to = value.to && xmpp.JID.parse(value.to);
 			message.from = value.from && xmpp.JID.parse(value.from);
@@ -66,13 +78,19 @@ exports.xmpp.persistence = {
 		}
 
 		return {
+			test: function() { // NOTE(review): looks like a debugging helper that fetches one hard-coded record — confirm whether it should ship
+				//messages = upgradeDb.createObjectStore("messages", { keyPath: ["account", "serverIdBy", "serverId", "localId"] });
+				const tx = db.transaction(["messages"], "readonly");
+				const store = tx.objectStore("messages");
+				return promisifyRequest(store.get(["singpolyma@singpolyma.net", "singpolyma@singpolyma.net", "5a6398a2-d560-43b1-988c-8852edc59521", ""])); // presumably ordered like the keyPath in the commented-out line above — verify
+			},
 			lastId: function(account, jid, callback) {
 				const tx = db.transaction(["messages"], "readonly");
 				const store = tx.objectStore("messages");
 				var cursor = null;
 				if (jid === null) {
-					cursor = store.index("chats").openCursor(
-						IDBKeyRange.bound([account], [account, [], []]),
+					cursor = store.index("accounts").openCursor(
+						IDBKeyRange.bound([account], [account, []]),
 						"prev"
 					);
 				} else {
@@ -82,7 +100,7 @@ exports.xmpp.persistence = {
 					);
 				}
 				cursor.onsuccess = (event) => {
-					if (!event.target.result || (event.target.result.value.serverId && (jid || event.target.result.value.serverIdBy === account))) {
+					if (!event.target.result || (event.target.result.value.syncPoint && event.target.result.value.serverId && (jid || event.target.result.value.serverIdBy === account))) {
 						callback(event.target.result ? event.target.result.value.serverId : null);
 					} else {
 						event.target.result.continue();
@@ -133,8 +151,8 @@ exports.xmpp.persistence = {
 				const tx = db.transaction(["messages"], "readonly");
 				const store = tx.objectStore("messages");
 
-				const cursor = store.index("chats").openCursor(
-					IDBKeyRange.bound([account], [account, [], []]),
+				const cursor = store.index("accounts").openCursor(
+					IDBKeyRange.bound([account], [account, []]),
 					"prev"
 				);
 				const chats = {};
@@ -175,6 +193,7 @@ exports.xmpp.persistence = {
 				const store = tx.objectStore("messages");
 				if (!message.serverId && !message.localId) throw "Cannot store a message with no id";
 				if (!message.serverId && message.isIncoming()) throw "Cannot store an incoming message with no server id";
+				if (message.serverId && !message.serverIdBy) throw "Cannot store a message with a server id and no by";
 				promisifyRequest(store.index("localId").get([account, message.chatId(), message.localId || []])).then((result) => {
 					if (result && !message.isIncoming() && result.direction === "MessageSent") return; // duplicate, we trust our own stanza ids
 
@@ -183,6 +202,7 @@ exports.xmpp.persistence = {
 						serverId: message.serverId || "",
 						serverIdBy: message.serverIdBy || "",
 						localId: message.localId || "",
+						syncPoint: !!message.syncPoint,
 						account: account,
 						chatId: message.chatId(),
 						to: message.to?.asString(),