git » sdk » commit e6b81f1

Force ordered delivery of stanzas while live

author Stephen Paul Weber
2025-11-12 16:51:15 UTC
committer Stephen Paul Weber
2025-11-12 16:51:15 UTC
parent 8052bd2655a765ef2770c0492e196ee455ddcf9c

Force ordered delivery of stanzas while live

There can be async prep work to do before sending a stanza. Force
stanzas to go in order even if their prep work takes different amounts
of time.

The outbox can also be paused so that stanzas can be delayed, e.g. while
a MUC is still joining and it is not yet safe to send a message.

borogove/Chat.hx +22 -8
borogove/Outbox.hx +56 -0

diff --git a/borogove/Chat.hx b/borogove/Chat.hx
index 573e4e3..c6d7feb 100644
--- a/borogove/Chat.hx
+++ b/borogove/Chat.hx
@@ -11,6 +11,7 @@ import borogove.GenericStream;
 import borogove.ID;
 import borogove.Message;
 import borogove.MessageSync;
+import borogove.Outbox;
 import borogove.Reaction;
 #if !NO_JINGLE
 import borogove.calls.PeerConnection;
@@ -98,7 +99,7 @@ abstract class Chat {
 	private var isActive: Null<Bool> = null;
 	private var activeThread: Null<String> = null;
 	private var notificationSettings: Null<{reply: Bool, mention: Bool}> = null;
-
+	private var outbox = new Outbox();
 	private var _encryptionMode: EncryptionMode = Unencrypted;
 
 	@:allow(borogove)
@@ -785,6 +786,7 @@ class DirectChat extends Chat {
 	@:allow(borogove)
 	private function new(client:Client, stream:GenericStream, persistence:Persistence, chatId:String, uiState = Open, isBlocked = false, extensions: Null<Stanza> = null, readUpToId: Null<String> = null, readUpToBy: Null<String> = null, omemoContactDeviceIDs: Array<Int> = null) {
 		super(client, stream, persistence, chatId, uiState, isBlocked, extensions, readUpToId, readUpToBy, omemoContactDeviceIDs);
+		outbox.start();
 	}
 
 	@HaxeCBridge.noemit // on superclass as abstract
@@ -899,8 +901,9 @@ class DirectChat extends Chat {
 		final fromStanza = Message.fromStanza(message.build().asStanza(), client.jid).parsed;
 		switch (fromStanza) {
 			case ChatMessageStanza(_):
+				final outboxItem = outbox.newItem(); // reserve our place in the send order before any async work starts
 				client.storeMessages([message.build()]).then((stored) -> {
-					for (recipient in message.recipients) {
+					thenshim.PromiseTools.all(message.recipients.map(recipient -> {
 						message.to = recipient;
 						final stanza = message.build().asStanza();
 						if (isActive != null) {
@@ -908,15 +911,20 @@ class DirectChat extends Chat {
 							activeThread = message.threadId;
 							stanza.tag("active", { xmlns: "http://jabber.org/protocol/chatstates" }).up();
 						}
-						// FIXME: Preserve ordering with a per-chat outbox of pending messages
 						#if NO_OMEMO
-						client.sendStanza(stanza);
+						return Promise.resolve(stanza);
 						#else
-						client.omemo.encryptMessage(recipient, stanza).then((encryptedStanza) -> {
-							client.sendStanza(encryptedStanza);
+						return client.omemo.encryptMessage(recipient, stanza).then((encryptedStanza) -> {
+							return Promise.resolve(encryptedStanza); // queue what encryptMessage produced, not the pre-encryption stanza
 						});
 						#end
-					}
+					})).then(stanzas -> {
+						outboxItem.handle(() -> {
+							for (stanza in stanzas) {
+								client.sendStanza(stanza);
+							}
+						});
+					});
 					setLastMessage(message.build());
 					client.notifyMessageHandlers(stored[0], stored[0].versions.length > 1 ? CorrectionEvent : DeliveryEvent);
 					client.trigger("chats/update", [this]);
@@ -1064,6 +1072,7 @@ class Channel extends Chat {
 			if (!disco.features.contains("http://jabber.org/protocol/muc")) {
 				// Not a MUC, what kind of channel is this?
 				forceLive = true;
+				outbox.start();
 			}
 		}
 	}
@@ -1085,6 +1094,7 @@ class Channel extends Chat {
 			if (!disco.features.contains("http://jabber.org/protocol/muc")) {
 				// Not a MUC, owhat kind of channel is this?
 				forceLive = true;
+				outbox.start();
 				return;
 			}
 			stream.sendIq(
@@ -1110,6 +1120,7 @@ class Channel extends Chat {
 	private function join() {
 		presence = []; // About to ask for a fresh set
 		_nickInUse = null;
+		outbox.pause();
 		inSync = false;
 		client.trigger("chats/update", [this]);
 		final desiredFullJid = JID.parse(chatId).withResource(client.displayName());
@@ -1168,8 +1179,10 @@ class Channel extends Chat {
 		final oneTen = presence?.mucUser?.allTags("status").find((status) -> status.attr.get("code") == "110");
 		if (oneTen != null) {
 			_nickInUse = resource;
+			outbox.start();
 		} else if (resource == _nickInUse) {
 			_nickInUse = null;
+			outbox.pause();
 		}
 		if (presence != null && presence.mucUser != null && oneTen == null) {
 			final existing = this.presence.get(resource);
@@ -1474,8 +1487,9 @@ class Channel extends Chat {
 					activeThread = message.threadId;
 					stanza.tag("active", { xmlns: "http://jabber.org/protocol/chatstates" }).up();
 				}
+				final outboxItem = outbox.newItem();
 				client.storeMessages([message.build()]).then((stored) -> {
-					client.sendStanza(stanza);
+					outboxItem.handle(() -> client.sendStanza(stanza));
 					setLastMessage(stored[0]);
 					client.notifyMessageHandlers(stored[0], stored[0].versions.length > 1 ? CorrectionEvent : DeliveryEvent);
 					client.trigger("chats/update", [this]);
diff --git a/borogove/Outbox.hx b/borogove/Outbox.hx
new file mode 100644
index 0000000..3a9d415
--- /dev/null
+++ b/borogove/Outbox.hx
@@ -0,0 +1,100 @@
+package borogove;
+
+/**
+	Ordered queue of deferred actions. Items are released strictly in the
+	order they were reserved, even when they become ready out of order.
+
+	Starts paused: nothing is released until start() is called.
+**/
+class Outbox {
+	private final items: Array<OutboxItem> = [];
+	private var paused = true;
+
+	public function new() { }
+
+	/**
+		Reserve the next position in the release order.
+
+		@returns an item that keeps its position, and holds back every
+		         later item, until its handle() is called and it has run
+	**/
+	public function newItem() {
+		final item = new OutboxItem(this);
+		items.push(item);
+		return item;
+	}
+
+	/**
+		Release items from the head of the queue, in order, stopping when
+		paused or at the first item whose action has not been supplied yet.
+	**/
+	@:allow(borogove.OutboxItem)
+	private function next() {
+		// Iterate rather than recurse so a long backlog cannot grow the stack.
+		// `paused` is re-checked every pass because an action may call pause().
+		while (!paused && items.length > 0 && items[0].isReady()) {
+			// Dequeue before running: if the action throws or re-enters
+			// next(), the item is already gone, so it can neither run twice
+			// nor keep blocking the items queued behind it.
+			final item = items.shift();
+			item.run();
+		}
+	}
+
+	/** Stop releasing items. Queued items stay queued. **/
+	public function pause() {
+		paused = true;
+	}
+
+	/** Resume releasing items and immediately release any that are ready. **/
+	public function start() {
+		paused = false;
+		next();
+	}
+}
+
+/**
+	One reserved position in an Outbox. Created by Outbox.newItem().
+**/
+class OutboxItem {
+	private final outbox: Outbox;
+	private var _handle: Null<()->Void> = null;
+
+	@:allow(borogove.Outbox)
+	private function new(outbox: Outbox) {
+		this.outbox = outbox;
+	}
+
+	/**
+		Supply the action for this position and ask the outbox to release
+		whatever is now ready.
+
+		@param f called when the outbox releases this item
+	**/
+	public function handle(f: ()->Void) {
+		_handle = f;
+		outbox.next();
+	}
+
+	/** Whether handle() has supplied an action that has not run yet. **/
+	@:allow(borogove.Outbox)
+	private function isReady(): Bool {
+		return _handle != null;
+	}
+
+	/**
+		Run the pending action, if any.
+
+		@returns false if no action was pending, true after it has run
+	**/
+	@:allow(borogove.Outbox)
+	private function run() {
+		final f = _handle;
+		if (f == null) return false;
+
+		// Clear first so the same action can never be invoked a second time.
+		_handle = null;
+		f();
+		return true;
+	}
+}