git » sdk » commit 4eeff63

Store and resume session management

author Stephen Paul Weber
2023-11-08 21:15:46 UTC
committer Stephen Paul Weber
2023-11-09 02:49:05 UTC
parent 5ac5fa533aece884f9b69c24660320c937dd97f6

Store and resume session management

xmpp/Chat.hx +3 -3
xmpp/Client.hx +60 -51
xmpp/GenericStream.hx +1 -1
xmpp/MessageSync.hx +2 -2
xmpp/Persistence.hx +2 -0
xmpp/persistence/browser.js +20 -0
xmpp/streams/XmppJsStream.hx +54 -11

diff --git a/xmpp/Chat.hx b/xmpp/Chat.hx
index 68ce6c5..4817fcd 100644
--- a/xmpp/Chat.hx
+++ b/xmpp/Chat.hx
@@ -224,7 +224,7 @@ abstract class Chat {
 			final from = JID.parse(stanza.attr.get("from"));
 			if (from.asBare() != JID.parse(this.chatId)) return EventUnhandled;
 
-			final chatMessage = ChatMessage.fromStanza(stanza, this.client.jid);
+			final chatMessage = ChatMessage.fromStanza(stanza, client.jid);
 			if (chatMessage != null) handler(chatMessage);
 
 			return EventUnhandled; // Allow others to get this event as well
@@ -253,7 +253,7 @@ class DirectChat extends Chat {
 				var sync = new MessageSync(this.client, this.stream, filter);
 				sync.onMessages((messages) -> {
 					for (message in messages.messages) {
-						persistence.storeMessage(client.jid, message);
+						persistence.storeMessage(client.accountId(), message);
 					}
 					handler(messages.messages.filter((m) -> m.chatId() == chatId));
 				});
@@ -493,7 +493,7 @@ class Channel extends Chat {
 				sync.onMessages((messages) -> {
 					for (message in messages.messages) {
 						message = prepareIncomingMessage(message, new Stanza("message", { from: message.senderId() }));
-						persistence.storeMessage(client.jid, message);
+						persistence.storeMessage(client.accountId(), message);
 					}
 					handler(messages.messages.filter((m) -> m.chatId() == chatId));
 				});
diff --git a/xmpp/Client.hx b/xmpp/Client.hx
index c8b77fd..f0d0eaa 100644
--- a/xmpp/Client.hx
+++ b/xmpp/Client.hx
@@ -50,54 +50,12 @@ class Client extends xmpp.EventEmitter {
 		this.persistence = persistence;
 		stream = new Stream();
 		stream.on("status/online", this.onConnected);
-	}
-
-	public function accountId() {
-		return JID.parse(jid).asBare().asString();
-	}
-
-	public function displayName() {
-		return JID.parse(jid).node;
-	}
-
-	public function start() {
-		persistence.getChats(jid, (protoChats) -> {
-			for (protoChat in protoChats) {
-				chats.push(protoChat.toChat(this, stream, persistence));
-			}
-			persistence.getChatsUnreadDetails(accountId(), chats, (details) -> {
-				for (detail in details) {
-					var chat = getChat(detail.chatId);
-					if (chat != null) {
-						chat.setLastMessage(detail.message);
-						chat.setUnreadCount(detail.unreadCount);
-					}
-				}
-				chats.sort((a, b) -> -Reflect.compare(a.lastMessageTimestamp() ?? "0", b.lastMessageTimestamp() ?? "0"));
-				this.trigger("chats/update", chats);
-
-				persistence.getLogin(jid, (login) -> {
-					if (login.token == null) {
-						stream.on("auth/password-needed", (data)->this.trigger("auth/password-needed", { jid: this.jid }));
-					} else {
-						stream.on("auth/password-needed", (data)->this.stream.trigger("auth/password", { password: login.token }));
-					}
-					stream.connect(login.clientId == null ? jid : jid + "/" + login.clientId);
-				});
-			});
+		stream.on("sm/update", (data) -> {
+			persistence.storeStreamManagement(accountId(), data.id, data.outbound, data.inbound);
+			return EventHandled;
 		});
-	}
 
-	public function addChatMessageListener(handler:ChatMessage->Void):Void {
-		chatMessageHandlers.push(handler);
-	}
-
-	private function onConnected(data) {
-		if (data != null && data.jid != null) {
-			final jidp = JID.parse(data.jid);
-			if (!jidp.isBare()) persistence.storeLogin(jidp.asBare().asString(), jidp.resource, null);
-		}
-		this.stream.on("message", function(event) {
+		stream.on("message", function(event) {
 			final stanza:Stanza = event.stanza;
 			final from = stanza.attr.get("from") == null ? null : JID.parse(stanza.attr.get("from"));
 
@@ -199,7 +157,7 @@ class Client extends xmpp.EventEmitter {
 			return EventUnhandled; // Allow others to get this event as well
 		});
 
-		this.stream.onIq(Set, "jingle", "urn:xmpp:jingle:1", (stanza) -> {
+		stream.onIq(Set, "jingle", "urn:xmpp:jingle:1", (stanza) -> {
 			final from = stanza.attr.get("from") == null ? null : JID.parse(stanza.attr.get("from"));
 			final jingle = stanza.getChild("jingle", "urn:xmpp:jingle:1");
 			final chat = getDirectChat(from.asBare().asString());
@@ -249,12 +207,11 @@ class Client extends xmpp.EventEmitter {
 			return IqResult;
 		});
 
-
-		this.stream.onIq(Get, "query", "http://jabber.org/protocol/disco#info", (stanza) -> {
+		stream.onIq(Get, "query", "http://jabber.org/protocol/disco#info", (stanza) -> {
 			return IqResultElement(caps.discoReply());
 		});
 
-		this.stream.onIq(Set, "query", "jabber:iq:roster", (stanza) -> {
+		stream.onIq(Set, "query", "jabber:iq:roster", (stanza) -> {
 			if (
 				stanza.attr.get("from") != null &&
 				stanza.attr.get("from") != JID.parse(jid).domain
@@ -278,7 +235,7 @@ class Client extends xmpp.EventEmitter {
 			return IqResult;
 		});
 
-		this.stream.on("presence", function(event) {
+		stream.on("presence", function(event) {
 			final stanza:Stanza = event.stanza;
 			final c = stanza.getChild("c", "http://jabber.org/protocol/caps");
 			if (stanza.attr.get("from") != null && stanza.attr.get("type") == null) {
@@ -326,6 +283,58 @@ class Client extends xmpp.EventEmitter {
 
 			return EventUnhandled;
 		});
+	}
+
+	public function accountId() {
+		return JID.parse(jid).asBare().asString();
+	}
+
+	public function displayName() {
+		return JID.parse(jid).node;
+	}
+
+	public function start() {
+		persistence.getChats(jid, (protoChats) -> {
+			for (protoChat in protoChats) {
+				chats.push(protoChat.toChat(this, stream, persistence));
+			}
+			persistence.getChatsUnreadDetails(accountId(), chats, (details) -> {
+				for (detail in details) {
+					var chat = getChat(detail.chatId);
+					if (chat != null) {
+						chat.setLastMessage(detail.message);
+						chat.setUnreadCount(detail.unreadCount);
+					}
+				}
+				chats.sort((a, b) -> -Reflect.compare(a.lastMessageTimestamp() ?? "0", b.lastMessageTimestamp() ?? "0"));
+				this.trigger("chats/update", chats);
+
+				persistence.getStreamManagement(accountId(), (smId, smOut, smIn) -> {
+					persistence.getLogin(jid, (login) -> {
+						if (login.clientId != null) jid = JID.parse(jid).asBare().asString() + "/" + login.clientId;
+						if (login.token == null) {
+							stream.on("auth/password-needed", (data)->this.trigger("auth/password-needed", { jid: this.jid }));
+						} else {
+							stream.on("auth/password-needed", (data)->this.stream.trigger("auth/password", { password: login.token }));
+						}
+						stream.connect(jid, smId == null || smId == "" ? null : { id: smId, outbound: smOut, inbound: smIn });
+					});
+				});
+			});
+		});
+	}
+
+	public function addChatMessageListener(handler:ChatMessage->Void):Void {
+		chatMessageHandlers.push(handler);
+	}
+
+	private function onConnected(data) { // Fired on connect or reconnect
+		if (data != null && data.jid != null) {
+			final jidp = JID.parse(data.jid);
+			if (!jidp.isBare()) persistence.storeLogin(jidp.asBare().asString(), jidp.resource, null);
+		}
+
+		if (data.resumed) return EventHandled;
 
 		// Enable carbons
 		sendStanza(
diff --git a/xmpp/GenericStream.hx b/xmpp/GenericStream.hx
index f025879..c7e1f9d 100644
--- a/xmpp/GenericStream.hx
+++ b/xmpp/GenericStream.hx
@@ -17,7 +17,7 @@ abstract class GenericStream extends EventEmitter {
 	
 	/* Connections and streams */
 
-	abstract public function connect(jid:String):Void;
+	abstract public function connect(jid:String, sm:Null<{id:String,outbound:Int,inbound:Int}>):Void;
 	abstract public function sendStanza(stanza:Stanza):Void;
 	abstract public function newId():String;
 	abstract public function onIq(type:IqRequestType, tag:String, xmlns:String, handler:(Stanza)->IqResult):Void;
diff --git a/xmpp/MessageSync.hx b/xmpp/MessageSync.hx
index 906ce96..ec39db3 100644
--- a/xmpp/MessageSync.hx
+++ b/xmpp/MessageSync.hx
@@ -34,7 +34,7 @@ class MessageSync {
 		this.client = client;
 		this.stream = stream;
 		this.filter = Reflect.copy(filter);
-		this.serviceJID = serviceJID != null ? serviceJID : client.jid;
+		this.serviceJID = serviceJID != null ? serviceJID : client.accountId();
 	}
 
 	public function fetchNext():Void {
@@ -61,7 +61,7 @@ class MessageSync {
 		var query = new MAMQuery(filter, serviceJID);
 		var resultHandler = stream.on("message", function (event) {
 			var message:Stanza = event.stanza;
-			var from = message.attr.exists("from") ? message.attr.get("from") : client.jid;
+			var from = message.attr.exists("from") ? message.attr.get("from") : client.accountId();
 			if (from != serviceJID) { // Only listen for results from the JID we queried
 				return EventUnhandled;
 			}
diff --git a/xmpp/Persistence.hx b/xmpp/Persistence.hx
index 8dc9d14..a252aee 100644
--- a/xmpp/Persistence.hx
+++ b/xmpp/Persistence.hx
@@ -17,4 +17,6 @@ abstract class Persistence {
 	abstract public function getCaps(ver:String, callback: (Caps)->Void):Void;
 	abstract public function storeLogin(login:String, clientId:String, token:Null<String>):Void;
 	abstract public function getLogin(login:String, callback:({ ?clientId: String, ?token: String })->Void):Void;
+	abstract public function storeStreamManagement(accountId:String, smId:String, outboundCount:Int, inboundCount:Int):Void;
+	abstract public function getStreamManagement(accountId:String, callback: (smId:String, outboundCount:Int, inboundCount:Int)->Void):Void;
 }
diff --git a/xmpp/persistence/browser.js b/xmpp/persistence/browser.js
index 2111115..1ba4e84 100644
--- a/xmpp/persistence/browser.js
+++ b/xmpp/persistence/browser.js
@@ -299,6 +299,26 @@ exports.xmpp.persistence = {
 				if (token != null) store.put(token, "login:token:" + login).onerror = console.error;
 			},
 
+			storeStreamManagement: function(account, id, outbound, inbound) {
+				const tx = db.transaction(["keyvaluepairs"], "readwrite");
+				const store = tx.objectStore("keyvaluepairs");
+				store.put({ id: id, outbound: outbound, inbound: inbound }, "sm:" + account).onerror = console.error;
+			},
+
+			getStreamManagement: function(account, callback) {
+				const tx = db.transaction(["keyvaluepairs"], "readonly");
+				const store = tx.objectStore("keyvaluepairs");
+				promisifyRequest(store.get("sm:" + account)).then(
+					(v) => {
+						callback(v?.id, v?.outbound, v?.inbound);
+					},
+					(e) => {
+						console.error(e);
+						callback(null, -1, -1);
+					}
+				);
+			},
+
 			getLogin: function(login, callback) {
 				const tx = db.transaction(["keyvaluepairs"], "readonly");
 				const store = tx.objectStore("keyvaluepairs");
diff --git a/xmpp/streams/XmppJsStream.hx b/xmpp/streams/XmppJsStream.hx
index b597e52..e3623ee 100644
--- a/xmpp/streams/XmppJsStream.hx
+++ b/xmpp/streams/XmppJsStream.hx
@@ -14,10 +14,13 @@ extern class XmppJsClient {
 	function start():Promise<Dynamic>;
 	function on(eventName:String, callback:(Dynamic)->Void):Void;
 	function send(stanza:XmppJsXml):Void;
+	var jid:XmppJsJID;
+	var status: String;
 	var iqCallee:{
 		get: (String, String, ({stanza: XmppJsXml})->Any)->Void,
 		set: (String, String, ({stanza: XmppJsXml})->Any)->Void,
 	};
+	var streamManagement: { id:String, outbound: Int, inbound: Int, enabled: Bool, allowResume: Bool };
 }
 
 @:jsRequire("@xmpp/jid", "jid")
@@ -78,6 +81,9 @@ class XmppJsStream extends GenericStream {
 	private var debug = true;
 	private var state:FSM;
 	private var pending:Array<XmppJsXml> = [];
+	private var pendingOnIq:Array<{type:IqRequestType,tag:String,xmlns:String,handler:(Stanza)->IqResult}> = [];
+	private var initialSM: Null<{id:String,outbound:Int,inbound:Int}> = null;
+	private var resumed = false;
 
 	override public function new() {
 		super();
@@ -139,15 +145,20 @@ class XmppJsStream extends GenericStream {
 				new XmppJsDebug(xmpp, true);
 			}
 
+			if (initialSM != null) {
+				xmpp.streamManagement.id = initialSM.id;
+				xmpp.streamManagement.outbound = initialSM.outbound;
+				xmpp.streamManagement.inbound = initialSM.inbound;
+				initialSM = null;
+			}
+
 			this.client = xmpp;
+			processPendingOnIq();
 
 			xmpp.on("online", function (jid) {
+				resumed = false;
 				this.jid = jid;
 				this.state.event("connection-success");
-				var item;
-				while ((item = pending.shift()) != null) {
-					client.send(item);
-				}
 			});
 
 			xmpp.on("offline", function (data) {
@@ -155,9 +166,22 @@ class XmppJsStream extends GenericStream {
 			});
 
 			xmpp.on("stanza", function (stanza) {
+				if (xmpp.status == "online" && this.state.can("connection-success")) {
+					resumed = xmpp.streamManagement.enabled && xmpp.streamManagement.id != null && xmpp.streamManagement.id != "";
+					if (xmpp.jid == null) {
+						xmpp.jid = this.jid;
+					} else {
+						this.jid = xmpp.jid;
+					}
+					this.state.event("connection-success");
+				}
 				this.onStanza(convertToStanza(stanza));
+				if (xmpp.streamManagement.enabled && xmpp.streamManagement.allowResume) {
+					this.trigger("sm/update", xmpp.streamManagement);
+				}
 			});
 
+			resumed = false;
 			xmpp.start().catchError(function (err) {
 				trace(err);
 			});
@@ -166,9 +190,10 @@ class XmppJsStream extends GenericStream {
 		this.trigger("auth/password-needed", {});
 	}
 
-	public function connect(jid:String) {
+	public function connect(jid:String, sm:Null<{id:String,outbound:Int,inbound:Int}>) {
 		this.state.event("connect-requested");
 		this.jid = new XmppJsJID(jid);
+		this.initialSM = sm;
 
 		resolveConnectionURI(this.jid.domain, this.connectWithURI);
 	}
@@ -207,6 +232,9 @@ class XmppJsStream extends GenericStream {
 			pending.push(convertFromStanza(stanza));
 		} else {
 			client.send(convertFromStanza(stanza));
+			if (client.streamManagement.enabled && client.streamManagement.allowResume) {
+				this.trigger("sm/update", client.streamManagement);
+			}
 		}
 	}
 
@@ -223,18 +251,33 @@ class XmppJsStream extends GenericStream {
 	}
 
 	public function onIq(type:IqRequestType, tag:String, xmlns:String, handler:(Stanza)->IqResult) {
-		switch (type) {
-		case Get:
-			client.iqCallee.get(xmlns, tag, (el) -> fromIqResult(handler(convertToStanza(el.stanza))));
-		case Set:
-			client.iqCallee.set(xmlns, tag, (el) -> fromIqResult(handler(convertToStanza(el.stanza))));
+		if (client == null) {
+			pendingOnIq.push({ type: type, tag: tag, xmlns: xmlns, handler: handler });
+		} else {
+			switch (type) {
+			case Get:
+				client.iqCallee.get(xmlns, tag, (el) -> fromIqResult(handler(convertToStanza(el.stanza))));
+			case Set:
+				client.iqCallee.set(xmlns, tag, (el) -> fromIqResult(handler(convertToStanza(el.stanza))));
+			}
+		}
+	}
+
+	private function processPendingOnIq() {
+		var item;
+		while ((item = pendingOnIq.shift()) != null) {
+			onIq(item.type, item.tag, item.xmlns, item.handler);
 		}
 	}
 
 	/* State handlers */
 
 	private function onOnline(event) {
-		trigger("status/online", { jid: jid.toString() });
+		var item;
+		while ((item = pending.shift()) != null) {
+			client.send(item);
+		}
+		trigger("status/online", { jid: jid.toString(), resumed: resumed });
 	}
 
 	private function onOffline(event) {