| author | Stephen Paul Weber
<singpolyma@singpolyma.net> 2025-11-12 16:51:15 UTC |
| committer | Stephen Paul Weber
<singpolyma@singpolyma.net> 2025-11-12 16:51:15 UTC |
| parent | 8052bd2655a765ef2770c0492e196ee455ddcf9c |
| borogove/Chat.hx | +22 | -8 |
| borogove/Outbox.hx | +56 | -0 |
diff --git a/borogove/Chat.hx b/borogove/Chat.hx index 573e4e3..c6d7feb 100644 --- a/borogove/Chat.hx +++ b/borogove/Chat.hx @@ -11,6 +11,7 @@ import borogove.GenericStream; import borogove.ID; import borogove.Message; import borogove.MessageSync; +import borogove.Outbox; import borogove.Reaction; #if !NO_JINGLE import borogove.calls.PeerConnection; @@ -98,7 +99,7 @@ abstract class Chat { private var isActive: Null<Bool> = null; private var activeThread: Null<String> = null; private var notificationSettings: Null<{reply: Bool, mention: Bool}> = null; - + private var outbox = new Outbox(); private var _encryptionMode: EncryptionMode = Unencrypted; @:allow(borogove) @@ -785,6 +786,7 @@ class DirectChat extends Chat { @:allow(borogove) private function new(client:Client, stream:GenericStream, persistence:Persistence, chatId:String, uiState = Open, isBlocked = false, extensions: Null<Stanza> = null, readUpToId: Null<String> = null, readUpToBy: Null<String> = null, omemoContactDeviceIDs: Array<Int> = null) { super(client, stream, persistence, chatId, uiState, isBlocked, extensions, readUpToId, readUpToBy, omemoContactDeviceIDs); + outbox.start(); } @HaxeCBridge.noemit // on superclass as abstract @@ -899,8 +901,9 @@ class DirectChat extends Chat { final fromStanza = Message.fromStanza(message.build().asStanza(), client.jid).parsed; switch (fromStanza) { case ChatMessageStanza(_): + var outboxItem = outbox.newItem(); client.storeMessages([message.build()]).then((stored) -> { - for (recipient in message.recipients) { + thenshim.PromiseTools.all(message.recipients.map(recipient -> { message.to = recipient; final stanza = message.build().asStanza(); if (isActive != null) { @@ -908,15 +911,20 @@ class DirectChat extends Chat { activeThread = message.threadId; stanza.tag("active", { xmlns: "http://jabber.org/protocol/chatstates" }).up(); } - // FIXME: Preserve ordering with a per-chat outbox of pending messages #if NO_OMEMO - client.sendStanza(stanza); + return Promise.resolve(stanza); #else - client.omemo.encryptMessage(recipient, stanza).then((encryptedStanza) -> { - client.sendStanza(encryptedStanza); + return client.omemo.encryptMessage(recipient, stanza).then((encryptedStanza) -> { + return Promise.resolve(stanza); }); #end - } + })).then(stanzas -> { + outboxItem.handle(() -> { + for (stanza in stanzas) { + client.sendStanza(stanza); + } + }); + }); setLastMessage(message.build()); client.notifyMessageHandlers(stored[0], stored[0].versions.length > 1 ? CorrectionEvent : DeliveryEvent); client.trigger("chats/update", [this]); @@ -1064,6 +1072,7 @@ class Channel extends Chat { if (!disco.features.contains("http://jabber.org/protocol/muc")) { // Not a MUC, what kind of channel is this? forceLive = true; + outbox.start(); } } } @@ -1085,6 +1094,7 @@ class Channel extends Chat { if (!disco.features.contains("http://jabber.org/protocol/muc")) { // Not a MUC, owhat kind of channel is this? forceLive = true; + outbox.start(); return; } stream.sendIq( @@ -1110,6 +1120,7 @@ class Channel extends Chat { private function join() { presence = []; // About to ask for a fresh set _nickInUse = null; + outbox.pause(); inSync = false; client.trigger("chats/update", [this]); final desiredFullJid = JID.parse(chatId).withResource(client.displayName()); @@ -1168,8 +1179,10 @@ class Channel extends Chat { final oneTen = presence?.mucUser?.allTags("status").find((status) -> status.attr.get("code") == "110"); if (oneTen != null) { _nickInUse = resource; + outbox.start(); } else if (resource == _nickInUse) { _nickInUse = null; + outbox.pause(); } if (presence != null && presence.mucUser != null && oneTen == null) { final existing = this.presence.get(resource); @@ -1474,8 +1487,9 @@ class Channel extends Chat { activeThread = message.threadId; stanza.tag("active", { xmlns: "http://jabber.org/protocol/chatstates" }).up(); } + final outboxItem = outbox.newItem(); client.storeMessages([message.build()]).then((stored) -> { - client.sendStanza(stanza); + outboxItem.handle(() -> client.sendStanza(stanza)); setLastMessage(stored[0]); client.notifyMessageHandlers(stored[0], stored[0].versions.length > 1 ? CorrectionEvent : DeliveryEvent); client.trigger("chats/update", [this]); diff --git a/borogove/Outbox.hx b/borogove/Outbox.hx new file mode 100644 index 0000000..3a9d415 --- /dev/null +++ b/borogove/Outbox.hx @@ -0,0 +1,56 @@ +package borogove; + +class Outbox { + private final items = []; + private var paused = true; + + public function new() { } + + public function newItem() { + final item = new OutboxItem(this); + items.push(item); + return item; + } + + @:allow(borogove.OutboxItem) + private function next() { + if (paused) return; + if (items.length < 1) return; + if (items[0].run()) { + items.shift(); + next(); + } + } + + public function pause() { + paused = true; + } + + public function start() { + paused = false; + next(); + } +} + +class OutboxItem { + private final outbox: Outbox; + private var _handle: Null<()->Void> = null; + + @:allow(borogove.Outbox) + private function new(outbox: Outbox) { + this.outbox = outbox; + } + + public function handle(f: ()->Void) { + _handle = f; + outbox.next(); + } + + @:allow(borogove.Outbox) + private function run() { + if (_handle == null) return false; + + _handle(); + return true; + } +}