| author | Stephen Paul Weber
<singpolyma@singpolyma.net> 2023-11-15 03:58:23 UTC |
| committer | Stephen Paul Weber
<singpolyma@singpolyma.net> 2023-11-15 17:48:29 UTC |
| parent | cb1476ac4c24e60cbdc52cd5b94dc6bef2856c07 |
| Makefile | +1 | -1 |
| xmpp/ChatMessage.hx | +9 | -0 |
| xmpp/Client.hx | +29 | -0 |
| xmpp/Persistence.hx | +1 | -0 |
| xmpp/persistence/browser.js | +48 | -12 |
| xmpp/streams/XmppJsStream.hx | +25 | -6 |
diff --git a/Makefile b/Makefile index 8887228..ea10689 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,6 @@ run-nodejs: test.node.js browser.js: haxe browser.hxml echo "var exports = {};" > browser.js - sed -e 's/hxEnums\["xmpp.EventResult"\] = {/hxEnums["xmpp.EventResult"] = $$hx_exports.xmpp.EventResult = {/' < browser.haxe.js | sed -e 's/hxEnums\["xmpp.MessageDirection"\] = {/hxEnums["xmpp.MessageDirection"] = $$hx_exports.xmpp.MessageDirection = {/' | sed -e 's/hxEnums\["xmpp.UiState"\] = {/hxEnums["xmpp.UiState"] = $$hx_exports.xmpp.UiState = {/' >> browser.js + sed -e 's/hxEnums\["xmpp.EventResult"\] = {/hxEnums["xmpp.EventResult"] = $$hx_exports.xmpp.EventResult = {/' < browser.haxe.js | sed -e 's/hxEnums\["xmpp.MessageDirection"\] = {/hxEnums["xmpp.MessageDirection"] = $$hx_exports.xmpp.MessageDirection = {/' | sed -e 's/hxEnums\["xmpp.UiState"\] = {/hxEnums["xmpp.UiState"] = $$hx_exports.xmpp.UiState = {/' | sed -e 's/hxEnums\["xmpp.MessageStatus"\] = {/hxEnums["xmpp.MessageStatus"] = $$hx_exports.xmpp.MessageStatus = {/' >> browser.js cat xmpp/persistence/*.js >> browser.js echo "export const { xmpp } = exports;" >> browser.js diff --git a/xmpp/ChatMessage.hx b/xmpp/ChatMessage.hx index 4e9d83b..d67f587 100644 --- a/xmpp/ChatMessage.hx +++ b/xmpp/ChatMessage.hx @@ -10,6 +10,13 @@ enum MessageDirection { MessageSent; } +enum MessageStatus { + MessagePending; // Message is waiting in client for sending + MessageDeliveredToServer; // Server acknowledged receipt of the message + MessageDeliveredToDevice; //The message has been delivered to at least one client device + MessageFailedToSend; // There was an error sending this message +} + class ChatAttachment { public final uris: Array<String>; @@ -42,6 +49,7 @@ class ChatMessage { public var lang (default, null): Null<String> = null; public var direction: MessageDirection = MessageReceived; + public var status: MessageStatus = MessagePending; public var versions: Array<ChatMessage> = []; public function new() { } @@ -50,6 +58,7 @@ class ChatMessage { if (stanza.attr.get("type") == "error") return null; var msg = new ChatMessage(); + msg.status = MessageDeliveredToDevice; // Delivered to us, a device msg.timestamp = stanza.findText("{urn:xmpp:delay}delay@stamp") ?? Date.format(std.Date.now()); msg.lang = stanza.attr.get("xml:lang"); msg.text = stanza.getChildText("body"); diff --git a/xmpp/Client.hx b/xmpp/Client.hx index a56c89d..1a7b510 100644 --- a/xmpp/Client.hx +++ b/xmpp/Client.hx @@ -6,6 +6,7 @@ import haxe.io.BytesData; import js.html.rtc.IceServer; // only typedefs, should be portable import xmpp.Caps; import xmpp.Chat; +import xmpp.ChatMessage; import xmpp.EventEmitter; import xmpp.EventHandler; import xmpp.PubsubEvent; @@ -56,6 +57,34 @@ class Client extends xmpp.EventEmitter { return EventHandled; }); + stream.on("sm/ack", (data) -> { + persistence.updateMessageStatus( + accountId(), + data.id, + MessageDeliveredToServer, + (chatMessage) -> { + for (handler in chatMessageHandlers) { + handler(chatMessage); + } + } + ); + return EventHandled; + }); + + stream.on("sm/fail", (data) -> { + persistence.updateMessageStatus( + accountId(), + data.id, + MessageFailedToSend, + (chatMessage) -> { + for (handler in chatMessageHandlers) { + handler(chatMessage); + } + } + ); + return EventHandled; + }); + stream.on("message", function(event) { final stanza:Stanza = event.stanza; final from = stanza.attr.get("from") == null ? null : JID.parse(stanza.attr.get("from")); diff --git a/xmpp/Persistence.hx b/xmpp/Persistence.hx index febda3e..609061d 100644 --- a/xmpp/Persistence.hx +++ b/xmpp/Persistence.hx @@ -10,6 +10,7 @@ abstract class Persistence { abstract public function getChats(accountId: String, callback: (chats:Array<SerializedChat>)->Void):Void; abstract public function getChatsUnreadDetails(accountId: String, chats: Array<Chat>, callback: (details:Array<{ chatId: String, message: ChatMessage, unreadCount: Int }>)->Void):Void; abstract public function storeMessage(accountId: String, message: ChatMessage):Void; + abstract public function updateMessageStatus(accountId: String, localId: String, status:MessageStatus, callback: (ChatMessage)->Void):Void; abstract public function correctMessage(accountId: String, localId: String, message: ChatMessage, callback: (ChatMessage)->Void):Void; abstract public function getMessages(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (messages:Array<ChatMessage>)->Void):Void; abstract public function getMediaUri(hashAlgorithm:String, hash:BytesData, callback: (uri:Null<String>)->Void):Void; diff --git a/xmpp/persistence/browser.js b/xmpp/persistence/browser.js index 9c21d77..fb6ab76 100644 --- a/xmpp/persistence/browser.js +++ b/xmpp/persistence/browser.js @@ -12,12 +12,16 @@ exports.xmpp.persistence = { if (!db.objectStoreNames.contains("messages")) { const messages = upgradeDb.createObjectStore("messages", { keyPath: ["account", "serverIdBy", "serverId", "localId"] }); messages.createIndex("chats", ["account", "chatId", "timestamp"]); - messages.createIndex("localId", ["account", "chatId", "localId"]); + messages.createIndex("localId", ["account", "localId", "chatId"]); } const messages = event.target.transaction.objectStore("messages"); if (!messages.indexNames.contains("accounts")) { messages.createIndex("accounts", ["account", "timestamp"]); } + if (messages.index("localId").keyPath.toString() !== "account,localId,chatId") { + messages.deleteIndex("localId"); + messages.createIndex("localId", ["account", "localId", "chatId"]); + } if (!db.objectStoreNames.contains("keyvaluepairs")) { upgradeDb.createObjectStore("keyvaluepairs"); } @@ -38,6 +42,11 @@ exports.xmpp.persistence = { openDb(db.version + 1); return; } + if (tx.objectStore("messages").index("localId").keyPath.toString() !== "account,localId,chatId") { + db.close(); + openDb(db.version + 1); + return; + } }; } openDb(); @@ -74,6 +83,22 @@ exports.xmpp.persistence = { message.text = value.text; message.lang = value.lang; message.direction = value.direction == "MessageReceived" ? xmpp.MessageDirection.MessageReceived : xmpp.MessageDirection.MessageSent; + switch (value.status) { + case "MessagePending": + message.status = xmpp.MessageStatus.MessagePending; + break; + case "MessageDeliveredToServer": + message.status = xmpp.MessageStatus.MessageDeliveredToServer; + break; + case "MessageDeliveredToDevice": + message.status = xmpp.MessageStatus.MessageDeliveredToDevice; + break; + case "MessageFailedToSend": + message.status = xmpp.MessageStatus.MessageFailedToSend; + break; + default: + message.status = message.serverId ? xmpp.MessageStatus.MessageDeliveredToServer : xmpp.MessageStatus.MessagePending; + } message.versions = (value.versions || []).map(hydrateMessage); return message; } @@ -94,6 +119,7 @@ exports.xmpp.persistence = { replyTo: message.replyTo.map((r) => r.asString()), timestamp: new Date(message.timestamp), direction: message.direction.toString(), + status: message.status.toString(), versions: message.versions.map((m) => serializeMessage(account, m)), } } @@ -216,7 +242,7 @@ exports.xmpp.persistence = { if (!message.serverId && !message.localId) throw "Cannot store a message with no id"; if (!message.serverId && message.isIncoming()) throw "Cannot store an incoming message with no server id"; if (message.serverId && !message.serverIdBy) throw "Cannot store a message with a server id and no by"; - promisifyRequest(store.index("localId").openCursor(IDBKeyRange.only([account, message.chatId(), message.localId || []]))).then((result) => { + promisifyRequest(store.index("localId").openCursor(IDBKeyRange.only([account, message.localId || [], message.chatId()]))).then((result) => { if (result?.value && !message.isIncoming() && result?.value.direction === "MessageSent") { // Duplicate, we trust our own sent ids return promisifyRequest(result.delete()); @@ -226,26 +252,36 @@ exports.xmpp.persistence = { }); }, + updateMessageStatus: function(account, localId, status, callback) { + const tx = db.transaction(["messages"], "readwrite"); + const store = tx.objectStore("messages"); + promisifyRequest(store.index("localId").openCursor(IDBKeyRange.bound([account, localId], [account, localId, []]))).then((result) => { + if (result?.value && result.value.direction == "MessageSent") { + const newStatus = { ...result.value, status: status.toString() }; + result.update(newStatus); + callback(hydrateMessage(newStatus)); + } + }); + }, + correctMessage: function(account, localId, message, callback) { const tx = db.transaction(["messages"], "readwrite"); const store = tx.objectStore("messages"); - const cursor = store.index("localId").openCursor(IDBKeyRange.only([account, message.chatId(), localId])); - cursor.onsuccess = (event) => { - if (event.target.result?.value && event.target.result.value.sender == message.senderId()) { + promisifyRequest(store.index("localId").openCursor(IDBKeyRange.only([account, localId, message.chatId()]))).then((result) => { + if (result?.value && result.value.sender == message.senderId()) { // Note, this strategy loses the ids of the replacement messages const withAnnotation = serializeMessage(account, message); - withAnnotation.serverIdBy = event.target.result.value.serverIdBy; - withAnnotation.serverId = event.target.result.value.serverId; - withAnnotation.localId = event.target.result.value.localId; - withAnnotation.versions = [{ ...event.target.result.value, versions: [] }].concat(event.target.result.value.versions || []) - event.target.result.update(withAnnotation); + withAnnotation.serverIdBy = result.value.serverIdBy; + withAnnotation.serverId = result.value.serverId; + withAnnotation.localId = result.value.localId; + withAnnotation.versions = [{ ...result.value, versions: [] }].concat(result.value.versions || []) + result.update(withAnnotation); callback(hydrateMessage(withAnnotation)); } else { this.storeMessage(account, message); callback(message); } - }; - cursor.onerror = console.error; + }); }, getMessages: function(account, chatId, beforeId, beforeTime, callback) { diff --git a/xmpp/streams/XmppJsStream.hx b/xmpp/streams/XmppJsStream.hx index e3623ee..5638cbc 100644 --- a/xmpp/streams/XmppJsStream.hx +++ b/xmpp/streams/XmppJsStream.hx @@ -176,9 +176,17 @@ class XmppJsStream extends GenericStream { this.state.event("connection-success"); } this.onStanza(convertToStanza(stanza)); - if (xmpp.streamManagement.enabled && xmpp.streamManagement.allowResume) { - this.trigger("sm/update", xmpp.streamManagement); - } + triggerSMupdate(); + }); + + xmpp.on("stream-management/ack", (stanza) -> { + if (stanza.name == "message") this.trigger("sm/ack", { id: stanza.attrs.id }); + triggerSMupdate(); + }); + + xmpp.on("stream-management/fail", (stanza) -> { + if (stanza.name == "message") this.trigger("sm/fail", { id: stanza.attrs.id }); + triggerSMupdate(); }); resumed = false; @@ -232,9 +240,7 @@ class XmppJsStream extends GenericStream { pending.push(convertFromStanza(stanza)); } else { client.send(convertFromStanza(stanza)); - if (client.streamManagement.enabled && client.streamManagement.allowResume) { - this.trigger("sm/update", client.streamManagement); - } + triggerSMupdate(); } } @@ -242,6 +248,19 @@ class XmppJsStream extends GenericStream { return XmppJsId.id(); } + private function triggerSMupdate() { + if (!client.streamManagement.enabled || !client.streamManagement.allowResume) return; + this.trigger( + "sm/update", + { + id: client.streamManagement.id, + outbound: client.streamManagement.outbound, + inbound: client.streamManagement.inbound, + outbound_q: (client.streamManagement.outbound_q ?? []).map((stanza) -> stanza.toString()), + } + ); + } + private function fromIqResult(result: IqResult): Any { switch (result) { case IqResultElement(el): return convertFromStanza(el);