| author | Stephen Paul Weber
<singpolyma@singpolyma.net> 2023-11-08 21:15:46 UTC |
| committer | Stephen Paul Weber
<singpolyma@singpolyma.net> 2023-11-09 02:49:05 UTC |
| parent | 5ac5fa533aece884f9b69c24660320c937dd97f6 |
| xmpp/Chat.hx | +3 | -3 |
| xmpp/Client.hx | +60 | -51 |
| xmpp/GenericStream.hx | +1 | -1 |
| xmpp/MessageSync.hx | +2 | -2 |
| xmpp/Persistence.hx | +2 | -0 |
| xmpp/persistence/browser.js | +20 | -0 |
| xmpp/streams/XmppJsStream.hx | +54 | -11 |
diff --git a/xmpp/Chat.hx b/xmpp/Chat.hx index 68ce6c5..4817fcd 100644 --- a/xmpp/Chat.hx +++ b/xmpp/Chat.hx @@ -224,7 +224,7 @@ abstract class Chat { final from = JID.parse(stanza.attr.get("from")); if (from.asBare() != JID.parse(this.chatId)) return EventUnhandled; - final chatMessage = ChatMessage.fromStanza(stanza, this.client.jid); + final chatMessage = ChatMessage.fromStanza(stanza, client.jid); if (chatMessage != null) handler(chatMessage); return EventUnhandled; // Allow others to get this event as well @@ -253,7 +253,7 @@ class DirectChat extends Chat { var sync = new MessageSync(this.client, this.stream, filter); sync.onMessages((messages) -> { for (message in messages.messages) { - persistence.storeMessage(client.jid, message); + persistence.storeMessage(client.accountId(), message); } handler(messages.messages.filter((m) -> m.chatId() == chatId)); }); @@ -493,7 +493,7 @@ class Channel extends Chat { sync.onMessages((messages) -> { for (message in messages.messages) { message = prepareIncomingMessage(message, new Stanza("message", { from: message.senderId() })); - persistence.storeMessage(client.jid, message); + persistence.storeMessage(client.accountId(), message); } handler(messages.messages.filter((m) -> m.chatId() == chatId)); }); diff --git a/xmpp/Client.hx b/xmpp/Client.hx index c8b77fd..f0d0eaa 100644 --- a/xmpp/Client.hx +++ b/xmpp/Client.hx @@ -50,54 +50,12 @@ class Client extends xmpp.EventEmitter { this.persistence = persistence; stream = new Stream(); stream.on("status/online", this.onConnected); - } - - public function accountId() { - return JID.parse(jid).asBare().asString(); - } - - public function displayName() { - return JID.parse(jid).node; - } - - public function start() { - persistence.getChats(jid, (protoChats) -> { - for (protoChat in protoChats) { - chats.push(protoChat.toChat(this, stream, persistence)); - } - persistence.getChatsUnreadDetails(accountId(), chats, (details) -> { - for (detail in details) { - var chat = getChat(detail.chatId); - if (chat != null) { - chat.setLastMessage(detail.message); - chat.setUnreadCount(detail.unreadCount); - } - } - chats.sort((a, b) -> -Reflect.compare(a.lastMessageTimestamp() ?? "0", b.lastMessageTimestamp() ?? "0")); - this.trigger("chats/update", chats); - - persistence.getLogin(jid, (login) -> { - if (login.token == null) { - stream.on("auth/password-needed", (data)->this.trigger("auth/password-needed", { jid: this.jid })); - } else { - stream.on("auth/password-needed", (data)->this.stream.trigger("auth/password", { password: login.token })); - } - stream.connect(login.clientId == null ? jid : jid + "/" + login.clientId); - }); - }); + stream.on("sm/update", (data) -> { + persistence.storeStreamManagement(accountId(), data.id, data.outbound, data.inbound); + return EventHandled; }); - } - public function addChatMessageListener(handler:ChatMessage->Void):Void { - chatMessageHandlers.push(handler); - } - - private function onConnected(data) { - if (data != null && data.jid != null) { - final jidp = JID.parse(data.jid); - if (!jidp.isBare()) persistence.storeLogin(jidp.asBare().asString(), jidp.resource, null); - } - this.stream.on("message", function(event) { + stream.on("message", function(event) { final stanza:Stanza = event.stanza; final from = stanza.attr.get("from") == null ? null : JID.parse(stanza.attr.get("from")); @@ -199,7 +157,7 @@ class Client extends xmpp.EventEmitter { return EventUnhandled; // Allow others to get this event as well }); - this.stream.onIq(Set, "jingle", "urn:xmpp:jingle:1", (stanza) -> { + stream.onIq(Set, "jingle", "urn:xmpp:jingle:1", (stanza) -> { final from = stanza.attr.get("from") == null ? null : JID.parse(stanza.attr.get("from")); final jingle = stanza.getChild("jingle", "urn:xmpp:jingle:1"); final chat = getDirectChat(from.asBare().asString()); @@ -249,12 +207,11 @@ class Client extends xmpp.EventEmitter { return IqResult; }); - - this.stream.onIq(Get, "query", "http://jabber.org/protocol/disco#info", (stanza) -> { + stream.onIq(Get, "query", "http://jabber.org/protocol/disco#info", (stanza) -> { return IqResultElement(caps.discoReply()); }); - this.stream.onIq(Set, "query", "jabber:iq:roster", (stanza) -> { + stream.onIq(Set, "query", "jabber:iq:roster", (stanza) -> { if ( stanza.attr.get("from") != null && stanza.attr.get("from") != JID.parse(jid).domain @@ -278,7 +235,7 @@ class Client extends xmpp.EventEmitter { return IqResult; }); - this.stream.on("presence", function(event) { + stream.on("presence", function(event) { final stanza:Stanza = event.stanza; final c = stanza.getChild("c", "http://jabber.org/protocol/caps"); if (stanza.attr.get("from") != null && stanza.attr.get("type") == null) { @@ -326,6 +283,58 @@ class Client extends xmpp.EventEmitter { return EventUnhandled; }); + } + + public function accountId() { + return JID.parse(jid).asBare().asString(); + } + + public function displayName() { + return JID.parse(jid).node; + } + + public function start() { + persistence.getChats(jid, (protoChats) -> { + for (protoChat in protoChats) { + chats.push(protoChat.toChat(this, stream, persistence)); + } + persistence.getChatsUnreadDetails(accountId(), chats, (details) -> { + for (detail in details) { + var chat = getChat(detail.chatId); + if (chat != null) { + chat.setLastMessage(detail.message); + chat.setUnreadCount(detail.unreadCount); + } + } + chats.sort((a, b) -> -Reflect.compare(a.lastMessageTimestamp() ?? "0", b.lastMessageTimestamp() ?? "0")); + this.trigger("chats/update", chats); + + persistence.getStreamManagement(accountId(), (smId, smOut, smIn) -> { + persistence.getLogin(jid, (login) -> { + if (login.clientId != null) jid = JID.parse(jid).asBare().asString() + "/" + login.clientId; + if (login.token == null) { + stream.on("auth/password-needed", (data)->this.trigger("auth/password-needed", { jid: this.jid })); + } else { + stream.on("auth/password-needed", (data)->this.stream.trigger("auth/password", { password: login.token })); + } + stream.connect(jid, smId == null || smId == "" ? null : { id: smId, outbound: smOut, inbound: smIn }); + }); + }); + }); + }); + } + + public function addChatMessageListener(handler:ChatMessage->Void):Void { + chatMessageHandlers.push(handler); + } + + private function onConnected(data) { // Fired on connect or reconnect + if (data != null && data.jid != null) { + final jidp = JID.parse(data.jid); + if (!jidp.isBare()) persistence.storeLogin(jidp.asBare().asString(), jidp.resource, null); + } + + if (data.resumed) return EventHandled; // Enable carbons sendStanza( diff --git a/xmpp/GenericStream.hx b/xmpp/GenericStream.hx index f025879..c7e1f9d 100644 --- a/xmpp/GenericStream.hx +++ b/xmpp/GenericStream.hx @@ -17,7 +17,7 @@ abstract class GenericStream extends EventEmitter { /* Connections and streams */ - abstract public function connect(jid:String):Void; + abstract public function connect(jid:String, sm:Null<{id:String,outbound:Int,inbound:Int}>):Void; abstract public function sendStanza(stanza:Stanza):Void; abstract public function newId():String; abstract public function onIq(type:IqRequestType, tag:String, xmlns:String, handler:(Stanza)->IqResult):Void; diff --git a/xmpp/MessageSync.hx b/xmpp/MessageSync.hx index 906ce96..ec39db3 100644 --- a/xmpp/MessageSync.hx +++ b/xmpp/MessageSync.hx @@ -34,7 +34,7 @@ class MessageSync { this.client = client; this.stream = stream; this.filter = Reflect.copy(filter); - this.serviceJID = serviceJID != null ? serviceJID : client.jid; + this.serviceJID = serviceJID != null ? serviceJID : client.accountId(); } public function fetchNext():Void { @@ -61,7 +61,7 @@ class MessageSync { var query = new MAMQuery(filter, serviceJID); var resultHandler = stream.on("message", function (event) { var message:Stanza = event.stanza; - var from = message.attr.exists("from") ? message.attr.get("from") : client.jid; + var from = message.attr.exists("from") ? message.attr.get("from") : client.accountId(); if (from != serviceJID) { // Only listen for results from the JID we queried return EventUnhandled; } diff --git a/xmpp/Persistence.hx b/xmpp/Persistence.hx index 8dc9d14..a252aee 100644 --- a/xmpp/Persistence.hx +++ b/xmpp/Persistence.hx @@ -17,4 +17,6 @@ abstract class Persistence { abstract public function getCaps(ver:String, callback: (Caps)->Void):Void; abstract public function storeLogin(login:String, clientId:String, token:Null<String>):Void; abstract public function getLogin(login:String, callback:({ ?clientId: String, ?token: String })->Void):Void; + abstract public function storeStreamManagement(accountId:String, smId:String, outboundCount:Int, inboundCount:Int):Void; + abstract public function getStreamManagement(accountId:String, callback: (smId:String, outboundCount:Int, inboundCount:Int)->Void):Void; } diff --git a/xmpp/persistence/browser.js b/xmpp/persistence/browser.js index 2111115..1ba4e84 100644 --- a/xmpp/persistence/browser.js +++ b/xmpp/persistence/browser.js @@ -299,6 +299,26 @@ exports.xmpp.persistence = { if (token != null) store.put(token, "login:token:" + login).onerror = console.error; }, + storeStreamManagement: function(account, id, outbound, inbound) { + const tx = db.transaction(["keyvaluepairs"], "readwrite"); + const store = tx.objectStore("keyvaluepairs"); + store.put({ id: id, outbound: outbound, inbound: inbound }, "sm:" + account).onerror = console.error; + }, + + getStreamManagement: function(account, callback) { + const tx = db.transaction(["keyvaluepairs"], "readonly"); + const store = tx.objectStore("keyvaluepairs"); + promisifyRequest(store.get("sm:" + account)).then( + (v) => { + callback(v?.id, v?.outbound, v?.inbound); + }, + (e) => { + console.error(e); + callback(null, -1, -1); + } + ); + }, + getLogin: function(login, callback) { const tx = db.transaction(["keyvaluepairs"], "readonly"); const store = tx.objectStore("keyvaluepairs"); diff --git a/xmpp/streams/XmppJsStream.hx b/xmpp/streams/XmppJsStream.hx index b597e52..e3623ee 100644 --- a/xmpp/streams/XmppJsStream.hx +++ b/xmpp/streams/XmppJsStream.hx @@ -14,10 +14,13 @@ extern class XmppJsClient { function start():Promise<Dynamic>; function on(eventName:String, callback:(Dynamic)->Void):Void; function send(stanza:XmppJsXml):Void; + var jid:XmppJsJID; + var status: String; var iqCallee:{ get: (String, String, ({stanza: XmppJsXml})->Any)->Void, set: (String, String, ({stanza: XmppJsXml})->Any)->Void, }; + var streamManagement: { id:String, outbound: Int, inbound: Int, enabled: Bool, allowResume: Bool }; } @:jsRequire("@xmpp/jid", "jid") @@ -78,6 +81,9 @@ class XmppJsStream extends GenericStream { private var debug = true; private var state:FSM; private var pending:Array<XmppJsXml> = []; + private var pendingOnIq:Array<{type:IqRequestType,tag:String,xmlns:String,handler:(Stanza)->IqResult}> = []; + private var initialSM: Null<{id:String,outbound:Int,inbound:Int}> = null; + private var resumed = false; override public function new() { super(); @@ -139,15 +145,20 @@ class XmppJsStream extends GenericStream { new XmppJsDebug(xmpp, true); } + if (initialSM != null) { + xmpp.streamManagement.id = initialSM.id; + xmpp.streamManagement.outbound = initialSM.outbound; + xmpp.streamManagement.inbound = initialSM.inbound; + initialSM = null; + } + this.client = xmpp; + processPendingOnIq(); xmpp.on("online", function (jid) { + resumed = false; this.jid = jid; this.state.event("connection-success"); - var item; - while ((item = pending.shift()) != null) { - client.send(item); - } }); xmpp.on("offline", function (data) { @@ -155,9 +166,22 @@ class XmppJsStream extends GenericStream { }); xmpp.on("stanza", function (stanza) { + if (xmpp.status == "online" && this.state.can("connection-success")) { + resumed = xmpp.streamManagement.enabled && xmpp.streamManagement.id != null && xmpp.streamManagement.id != ""; + if (xmpp.jid == null) { + xmpp.jid = this.jid; + } else { + this.jid = xmpp.jid; + } + this.state.event("connection-success"); + } this.onStanza(convertToStanza(stanza)); + if (xmpp.streamManagement.enabled && xmpp.streamManagement.allowResume) { + this.trigger("sm/update", xmpp.streamManagement); + } }); + resumed = false; xmpp.start().catchError(function (err) { trace(err); }); @@ -166,9 +190,10 @@ class XmppJsStream extends GenericStream { this.trigger("auth/password-needed", {}); } - public function connect(jid:String) { + public function connect(jid:String, sm:Null<{id:String,outbound:Int,inbound:Int}>) { this.state.event("connect-requested"); this.jid = new XmppJsJID(jid); + this.initialSM = sm; resolveConnectionURI(this.jid.domain, this.connectWithURI); } @@ -207,6 +232,9 @@ class XmppJsStream extends GenericStream { pending.push(convertFromStanza(stanza)); } else { client.send(convertFromStanza(stanza)); + if (client.streamManagement.enabled && client.streamManagement.allowResume) { + this.trigger("sm/update", client.streamManagement); + } } } @@ -223,18 +251,33 @@ class XmppJsStream extends GenericStream { } public function onIq(type:IqRequestType, tag:String, xmlns:String, handler:(Stanza)->IqResult) { - switch (type) { - case Get: - client.iqCallee.get(xmlns, tag, (el) -> fromIqResult(handler(convertToStanza(el.stanza)))); - case Set: - client.iqCallee.set(xmlns, tag, (el) -> fromIqResult(handler(convertToStanza(el.stanza)))); + if (client == null) { + pendingOnIq.push({ type: type, tag: tag, xmlns: xmlns, handler: handler }); + } else { + switch (type) { + case Get: + client.iqCallee.get(xmlns, tag, (el) -> fromIqResult(handler(convertToStanza(el.stanza)))); + case Set: + client.iqCallee.set(xmlns, tag, (el) -> fromIqResult(handler(convertToStanza(el.stanza)))); + } + } + } + + private function processPendingOnIq() { + var item; + while ((item = pendingOnIq.shift()) != null) { + onIq(item.type, item.tag, item.xmlns, item.handler); } } /* State handlers */ private function onOnline(event) { - trigger("status/online", { jid: jid.toString() }); + var item; + while ((item = pending.shift()) != null) { + client.send(item); + } + trigger("status/online", { jid: jid.toString(), resumed: resumed }); } private function onOffline(event) {