git » sdk » commit 0dad51e

Store message status and update based on SM

author Stephen Paul Weber
2023-11-15 03:58:23 UTC
committer Stephen Paul Weber
2023-11-15 17:48:29 UTC
parent cb1476ac4c24e60cbdc52cd5b94dc6bef2856c07

Store message status and update based on SM

Makefile +1 -1
xmpp/ChatMessage.hx +9 -0
xmpp/Client.hx +29 -0
xmpp/Persistence.hx +1 -0
xmpp/persistence/browser.js +48 -12
xmpp/streams/XmppJsStream.hx +25 -6

diff --git a/Makefile b/Makefile
index 8887228..ea10689 100644
--- a/Makefile
+++ b/Makefile
@@ -13,6 +13,6 @@ run-nodejs: test.node.js
 browser.js:
 	haxe browser.hxml
 	echo "var exports = {};" > browser.js
-	sed -e 's/hxEnums\["xmpp.EventResult"\] = {/hxEnums["xmpp.EventResult"] = $$hx_exports.xmpp.EventResult = {/' < browser.haxe.js | sed -e 's/hxEnums\["xmpp.MessageDirection"\] = {/hxEnums["xmpp.MessageDirection"] = $$hx_exports.xmpp.MessageDirection = {/' | sed -e 's/hxEnums\["xmpp.UiState"\] = {/hxEnums["xmpp.UiState"] = $$hx_exports.xmpp.UiState = {/' >> browser.js
+	sed -e 's/hxEnums\["xmpp.EventResult"\] = {/hxEnums["xmpp.EventResult"] = $$hx_exports.xmpp.EventResult = {/' < browser.haxe.js | sed -e 's/hxEnums\["xmpp.MessageDirection"\] = {/hxEnums["xmpp.MessageDirection"] = $$hx_exports.xmpp.MessageDirection = {/' | sed -e 's/hxEnums\["xmpp.UiState"\] = {/hxEnums["xmpp.UiState"] = $$hx_exports.xmpp.UiState = {/' | sed -e 's/hxEnums\["xmpp.MessageStatus"\] = {/hxEnums["xmpp.MessageStatus"] = $$hx_exports.xmpp.MessageStatus = {/' >> browser.js
 	cat xmpp/persistence/*.js >> browser.js
 	echo "export const { xmpp } = exports;" >> browser.js
diff --git a/xmpp/ChatMessage.hx b/xmpp/ChatMessage.hx
index 4e9d83b..d67f587 100644
--- a/xmpp/ChatMessage.hx
+++ b/xmpp/ChatMessage.hx
@@ -10,6 +10,13 @@ enum MessageDirection {
 	MessageSent;
 }
 
+enum MessageStatus {
+	MessagePending; // Message is waiting in client for sending
+	MessageDeliveredToServer; // Server acknowledged receipt of the message
+	MessageDeliveredToDevice; // The message has been delivered to at least one client device
+	MessageFailedToSend; // There was an error sending this message
+}
+
 class ChatAttachment {
 	public final uris: Array<String>;
 
@@ -42,6 +49,7 @@ class ChatMessage {
 	public var lang (default, null): Null<String> = null;
 
 	public var direction: MessageDirection = MessageReceived;
+	public var status: MessageStatus = MessagePending;
 	public var versions: Array<ChatMessage> = [];
 
 	public function new() { }
@@ -50,6 +58,7 @@ class ChatMessage {
 		if (stanza.attr.get("type") == "error") return null;
 
 		var msg = new ChatMessage();
+		msg.status = MessageDeliveredToDevice; // Delivered to us, a device
 		msg.timestamp = stanza.findText("{urn:xmpp:delay}delay@stamp") ?? Date.format(std.Date.now());
 		msg.lang = stanza.attr.get("xml:lang");
 		msg.text = stanza.getChildText("body");
diff --git a/xmpp/Client.hx b/xmpp/Client.hx
index a56c89d..1a7b510 100644
--- a/xmpp/Client.hx
+++ b/xmpp/Client.hx
@@ -6,6 +6,7 @@ import haxe.io.BytesData;
 import js.html.rtc.IceServer; // only typedefs, should be portable
 import xmpp.Caps;
 import xmpp.Chat;
+import xmpp.ChatMessage;
 import xmpp.EventEmitter;
 import xmpp.EventHandler;
 import xmpp.PubsubEvent;
@@ -56,6 +57,34 @@ class Client extends xmpp.EventEmitter {
 			return EventHandled;
 		});
 
+		stream.on("sm/ack", (data) -> {
+			persistence.updateMessageStatus(
+				accountId(),
+				data.id,
+				MessageDeliveredToServer,
+				(chatMessage) -> {
+					for (handler in chatMessageHandlers) {
+						handler(chatMessage);
+					}
+				}
+			);
+			return EventHandled;
+		});
+
+		stream.on("sm/fail", (data) -> {
+			persistence.updateMessageStatus(
+				accountId(),
+				data.id,
+				MessageFailedToSend,
+				(chatMessage) -> {
+					for (handler in chatMessageHandlers) {
+						handler(chatMessage);
+					}
+				}
+			);
+			return EventHandled;
+		});
+
 		stream.on("message", function(event) {
 			final stanza:Stanza = event.stanza;
 			final from = stanza.attr.get("from") == null ? null : JID.parse(stanza.attr.get("from"));
diff --git a/xmpp/Persistence.hx b/xmpp/Persistence.hx
index febda3e..609061d 100644
--- a/xmpp/Persistence.hx
+++ b/xmpp/Persistence.hx
@@ -10,6 +10,7 @@ abstract class Persistence {
 	abstract public function getChats(accountId: String, callback: (chats:Array<SerializedChat>)->Void):Void;
 	abstract public function getChatsUnreadDetails(accountId: String, chats: Array<Chat>, callback: (details:Array<{ chatId: String, message: ChatMessage, unreadCount: Int }>)->Void):Void;
 	abstract public function storeMessage(accountId: String, message: ChatMessage):Void;
+	abstract public function updateMessageStatus(accountId: String, localId: String, status:MessageStatus, callback: (ChatMessage)->Void):Void;
 	abstract public function correctMessage(accountId: String, localId: String, message: ChatMessage, callback: (ChatMessage)->Void):Void;
 	abstract public function getMessages(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (messages:Array<ChatMessage>)->Void):Void;
 	abstract public function getMediaUri(hashAlgorithm:String, hash:BytesData, callback: (uri:Null<String>)->Void):Void;
diff --git a/xmpp/persistence/browser.js b/xmpp/persistence/browser.js
index 9c21d77..fb6ab76 100644
--- a/xmpp/persistence/browser.js
+++ b/xmpp/persistence/browser.js
@@ -12,12 +12,16 @@ exports.xmpp.persistence = {
 				if (!db.objectStoreNames.contains("messages")) {
 					const messages = upgradeDb.createObjectStore("messages", { keyPath: ["account", "serverIdBy", "serverId", "localId"] });
 					messages.createIndex("chats", ["account", "chatId", "timestamp"]);
-					messages.createIndex("localId", ["account", "chatId", "localId"]);
+					messages.createIndex("localId", ["account", "localId", "chatId"]);
 				}
 				const messages = event.target.transaction.objectStore("messages");
 				if (!messages.indexNames.contains("accounts")) {
 					messages.createIndex("accounts", ["account", "timestamp"]);
 				}
+				if (messages.index("localId").keyPath.toString() !== "account,localId,chatId") {
+					messages.deleteIndex("localId");
+					messages.createIndex("localId", ["account", "localId", "chatId"]);
+				}
 				if (!db.objectStoreNames.contains("keyvaluepairs")) {
 					upgradeDb.createObjectStore("keyvaluepairs");
 				}
@@ -38,6 +42,11 @@ exports.xmpp.persistence = {
 					openDb(db.version + 1);
 					return;
 				}
+				if (tx.objectStore("messages").index("localId").keyPath.toString() !== "account,localId,chatId") {
+					db.close();
+					openDb(db.version + 1);
+					return;
+				}
 			};
 		}
 		openDb();
@@ -74,6 +83,22 @@ exports.xmpp.persistence = {
 			message.text = value.text;
 			message.lang = value.lang;
 			message.direction = value.direction == "MessageReceived" ? xmpp.MessageDirection.MessageReceived : xmpp.MessageDirection.MessageSent;
+			switch (value.status) {
+				case "MessagePending":
+					message.status = xmpp.MessageStatus.MessagePending;
+					break;
+				case "MessageDeliveredToServer":
+					message.status = xmpp.MessageStatus.MessageDeliveredToServer;
+					break;
+				case "MessageDeliveredToDevice":
+					message.status = xmpp.MessageStatus.MessageDeliveredToDevice;
+					break;
+				case "MessageFailedToSend":
+					message.status = xmpp.MessageStatus.MessageFailedToSend;
+					break;
+				default:
+					message.status = message.serverId ? xmpp.MessageStatus.MessageDeliveredToServer : xmpp.MessageStatus.MessagePending;
+			}
 			message.versions = (value.versions || []).map(hydrateMessage);
 			return message;
 		}
@@ -94,6 +119,7 @@ exports.xmpp.persistence = {
 				replyTo: message.replyTo.map((r) => r.asString()),
 				timestamp: new Date(message.timestamp),
 				direction: message.direction.toString(),
+				status: message.status.toString(),
 				versions: message.versions.map((m) => serializeMessage(account, m)),
 			}
 		}
@@ -216,7 +242,7 @@ exports.xmpp.persistence = {
 				if (!message.serverId && !message.localId) throw "Cannot store a message with no id";
 				if (!message.serverId && message.isIncoming()) throw "Cannot store an incoming message with no server id";
 				if (message.serverId && !message.serverIdBy) throw "Cannot store a message with a server id and no by";
-				promisifyRequest(store.index("localId").openCursor(IDBKeyRange.only([account, message.chatId(), message.localId || []]))).then((result) => {
+				promisifyRequest(store.index("localId").openCursor(IDBKeyRange.only([account, message.localId || [], message.chatId()]))).then((result) => {
 					if (result?.value && !message.isIncoming() && result?.value.direction === "MessageSent") {
 						// Duplicate, we trust our own sent ids
 						return promisifyRequest(result.delete());
@@ -226,26 +252,36 @@ exports.xmpp.persistence = {
 				});
 			},
 
+			updateMessageStatus: function(account, localId, status, callback) { // Sets `status` on the sent message with this localId; callback receives the updated message
+				const tx = db.transaction(["messages"], "readwrite");
+				const store = tx.objectStore("messages");
+				promisifyRequest(store.index("localId").openCursor(IDBKeyRange.bound([account, localId], [account, localId, []]))).then((result) => { // range covers any chatId under [account, localId]
+					if (result?.value && result.value.direction === "MessageSent") {
+						const newStatus = { ...result.value, status: status.toString() };
+						result.update(newStatus);
+						callback(hydrateMessage(newStatus));
+					}
+				}).catch(console.error); // log IndexedDB failures instead of leaving the rejection unhandled
+			},
+
 			correctMessage: function(account, localId, message, callback) {
 				const tx = db.transaction(["messages"], "readwrite");
 				const store = tx.objectStore("messages");
-				const cursor = store.index("localId").openCursor(IDBKeyRange.only([account, message.chatId(), localId]));
-				cursor.onsuccess = (event) => {
-					if (event.target.result?.value && event.target.result.value.sender == message.senderId()) {
+				promisifyRequest(store.index("localId").openCursor(IDBKeyRange.only([account, localId, message.chatId()]))).then((result) => {
+					if (result?.value && result.value.sender == message.senderId()) {
 						// Note, this strategy loses the ids of the replacement messages
 						const withAnnotation = serializeMessage(account, message);
-						withAnnotation.serverIdBy = event.target.result.value.serverIdBy;
-						withAnnotation.serverId = event.target.result.value.serverId;
-						withAnnotation.localId = event.target.result.value.localId;
-						withAnnotation.versions = [{ ...event.target.result.value, versions: [] }].concat(event.target.result.value.versions || [])
-						event.target.result.update(withAnnotation);
+						withAnnotation.serverIdBy = result.value.serverIdBy;
+						withAnnotation.serverId = result.value.serverId;
+						withAnnotation.localId = result.value.localId;
+						withAnnotation.versions = [{ ...result.value, versions: [] }].concat(result.value.versions || [])
+						result.update(withAnnotation);
 						callback(hydrateMessage(withAnnotation));
 					} else {
 						this.storeMessage(account, message);
 						callback(message);
 					}
-				};
-				cursor.onerror = console.error;
+				}).catch(console.error); // keep the error logging that the removed cursor.onerror provided
 			},
 
 			getMessages: function(account, chatId, beforeId, beforeTime, callback) {
diff --git a/xmpp/streams/XmppJsStream.hx b/xmpp/streams/XmppJsStream.hx
index e3623ee..5638cbc 100644
--- a/xmpp/streams/XmppJsStream.hx
+++ b/xmpp/streams/XmppJsStream.hx
@@ -176,9 +176,17 @@ class XmppJsStream extends GenericStream {
 					this.state.event("connection-success");
 				}
 				this.onStanza(convertToStanza(stanza));
-				if (xmpp.streamManagement.enabled && xmpp.streamManagement.allowResume) {
-					this.trigger("sm/update", xmpp.streamManagement);
-				}
+				triggerSMupdate();
+			});
+
+			xmpp.on("stream-management/ack", (stanza) -> {
+				if (stanza.name == "message") this.trigger("sm/ack", { id: stanza.attrs.id });
+				triggerSMupdate();
+			});
+
+			xmpp.on("stream-management/fail", (stanza) -> {
+				if (stanza.name == "message") this.trigger("sm/fail", { id: stanza.attrs.id });
+				triggerSMupdate();
 			});
 
 			resumed = false;
@@ -232,9 +240,7 @@ class XmppJsStream extends GenericStream {
 			pending.push(convertFromStanza(stanza));
 		} else {
 			client.send(convertFromStanza(stanza));
-			if (client.streamManagement.enabled && client.streamManagement.allowResume) {
-				this.trigger("sm/update", client.streamManagement);
-			}
+			triggerSMupdate();
 		}
 	}
 
@@ -242,6 +248,19 @@ class XmppJsStream extends GenericStream {
 		return XmppJsId.id();
 	}
 
+	private function triggerSMupdate() { // Emits "sm/update" with a snapshot of the client's stream-management state
+		if (!client.streamManagement.enabled || !client.streamManagement.allowResume) return; // emit only when SM is enabled and resumption is allowed
+		this.trigger(
+			"sm/update",
+			{
+				id: client.streamManagement.id,
+				outbound: client.streamManagement.outbound,
+				inbound: client.streamManagement.inbound,
+				outbound_q: (client.streamManagement.outbound_q ?? []).map((stanza) -> stanza.toString()), // each queued stanza converted via toString(); empty array when the queue is null
+			}
+		);
+	}
+
 	private function fromIqResult(result: IqResult): Any {
 		switch (result) {
 		case IqResultElement(el): return convertFromStanza(el);