| author | Stephen Paul Weber
<singpolyma@singpolyma.net> 2026-05-10 18:35:48 UTC |
| committer | Stephen Paul Weber
<singpolyma@singpolyma.net> 2026-05-10 18:38:39 UTC |
| parent | beb577753571811cd60385bbd52df371c6ee378e |
| borogove/Client.hx | +6 | -5 |
| borogove/Persistence.hx | +3 | -2 |
| borogove/Push.hx | +5 | -2 |
| borogove/persistence/Dummy.hx | +3 | -3 |
| borogove/persistence/IDB.js | +10 | -9 |
| borogove/persistence/Sqlite.hx | +16 | -8 |
diff --git a/borogove/Client.hx b/borogove/Client.hx index 77723e1..8c896e1 100644 --- a/borogove/Client.hx +++ b/borogove/Client.hx @@ -166,7 +166,7 @@ class Client extends EventEmitter { }); stream.on("sm/update", (data) -> { - persistence.storeStreamManagement(this.accountId(), stream.emitSMupdates ? data.sm : null); + persistence.storeStreamManagement(this.accountId(), stream.emitSMupdates ? data.sm : null, sortId); return EventHandled; }); @@ -713,7 +713,8 @@ class Client extends EventEmitter { stream.emitSMupdates = false; // We don't care until after sync startOffline().then(_ -> persistence.getStreamManagement(accountId()) - ).then((sm) -> { + ).then((data) -> { + if (data.sortId > sortId) sortId = data.sortId; stream.on("auth/password-needed", (data: { ?mechanisms: Array<{ name: String, canFast: Bool, canOther: Bool }> }) -> { fastMechanism = data.mechanisms?.find((mech) -> mech.canFast)?.name; if (token == null || (fastMechanism == null && data.mechanisms != null)) { @@ -725,14 +726,14 @@ class Client extends EventEmitter { stream.on("auth/fail", (data) -> { if (token != null) { token = null; - stream.connect(jid.asString(), sm); + stream.connect(jid.asString(), data.sm); } else { - stream.connect(jid.asString(), sm); + stream.connect(jid.asString(), data.sm); } return EventHandled; }); firstSync = true; - stream.connect(jid.asString(), sm); + stream.connect(jid.asString(), data.sm); }); } diff --git a/borogove/Persistence.hx b/borogove/Persistence.hx index 0799ed4..febb2b3 100644 --- a/borogove/Persistence.hx +++ b/borogove/Persistence.hx @@ -219,9 +219,10 @@ interface Persistence { @param accountId the account to store resumption data for @param data stream management payload, or null to clear it + @param sortId highest sortId ever seen by this stream **/ @HaxeCBridge.noemit - public function storeStreamManagement(accountId:String, data:Null<BytesData>):Void; + public function storeStreamManagement(accountId:String, data:Null<BytesData>, sortId: String):Void; /** Load stream management resumption data for an account @@ -230,7 +231,7 @@ interface Persistence { @returns Promise resolving to stored resumption data or null **/ @HaxeCBridge.noemit - public function getStreamManagement(accountId:String): Promise<Null<BytesData>>; + public function getStreamManagement(accountId:String): Promise<{ sm: Null<BytesData>, sortId: String }>; /** Store metadata about a discovered service diff --git a/borogove/Push.hx b/borogove/Push.hx index 4eb2c7f..e814a71 100644 --- a/borogove/Push.hx +++ b/borogove/Push.hx @@ -39,8 +39,11 @@ class Push { final message = ChatMessage.fromStanza(stanza, JID.parse(stanza.attr.get("to")).asBare()); if (message != null) { // TODO: this puts every push at the same sortId until the next sync - persistence.syncPoint(message.account(), message.type == MessageChannel ? message.chatId() : null).then(point -> { - final sortId = FractionalIndexing.between(point?.sortId, null, FractionalIndexing.BASE_95_DIGITS); + (message.type == MessageChannel ? + persistence.syncPoint(message.account(), message.chatId()).then(point -> point?.sortId ?? "a ") : + persistence.getStreamManagement(message.account()).then(data -> data.sortId) + ).then(sortId -> { + final sortId = FractionalIndexing.between(sortId, null, FractionalIndexing.BASE_95_DIGITS); final toStore = ChatMessage.fromStanza( stanza, JID.parse(stanza.attr.get("to")).asBare(), diff --git a/borogove/persistence/Dummy.hx b/borogove/persistence/Dummy.hx index 636bb59..425df28 100644 --- a/borogove/persistence/Dummy.hx +++ b/borogove/persistence/Dummy.hx @@ -128,11 +128,11 @@ class Dummy implements Persistence { } @HaxeCBridge.noemit - public function storeStreamManagement(accountId:String, sm:Null<BytesData>) { } + public function storeStreamManagement(accountId:String, sm:Null<BytesData>, sortId: String) { } @HaxeCBridge.noemit - public function getStreamManagement(accountId:String): Promise<Null<BytesData>> { - return Promise.resolve(null); + public function getStreamManagement(accountId:String): Promise<{ sm: Null<BytesData>, sortId: String }> { + return Promise.resolve({ sm: null, sortId: "a " }); } @HaxeCBridge.noemit diff --git a/borogove/persistence/IDB.js b/borogove/persistence/IDB.js index 86d93d0..fa69d80 100644 --- a/borogove/persistence/IDB.js +++ b/borogove/persistence/IDB.js @@ -992,27 +992,28 @@ export default async (dbname, media, tokenize, stemmer) => { }; }, - storeStreamManagement: function(account, sm) { + async storeStreamManagement(account, sm, sortId) { // Don't bother on ios, the indexeddb is too broken // https://bugs.webkit.org/show_bug.cgi?id=287876 if (navigator.userAgent.match(/(iPad|iPhone|iPod)/g)) return; const tx = db.transaction(["keyvaluepairs"], "readwrite"); const store = tx.objectStore("keyvaluepairs"); - const req = store.put(sm, "sm:" + account); - req.onerror = () => { console.error("storeStreamManagement", req.error.name, req.error.message); } + await Promise.all([ + await promisifyRequest(store.put(sm, "sm:" + account)), + await promisifyRequest(store.put(sortId, "sortId:" + account)) + ]); }, async getStreamManagement(account) { const tx = db.transaction(["keyvaluepairs"], "readonly"); const store = tx.objectStore("keyvaluepairs"); - const v = await promisifyRequest(store.get("sm:" + account)); - if (v instanceof ArrayBuffer) { - return v; - } else if(!v) { - return null; + const sm = await promisifyRequest(store.get("sm:" + account)) || null; + const sortId = await promisifyRequest(store.get("sortId:" + account)) ?? "a "; + if (sm instanceof ArrayBuffer || !sm) { + return { sm, sortId }; } else { - return new Blob([JSON.stringify(v)], {type: "text/plain; charset=utf-8"}).arrayBuffer(); + return { sm: new Blob([JSON.stringify(sm)], {type: "text/plain; charset=utf-8"}).arrayBuffer(), sortId }; } }, diff --git a/borogove/persistence/Sqlite.hx b/borogove/persistence/Sqlite.hx index 126b045..848d712 100644 --- a/borogove/persistence/Sqlite.hx +++ b/borogove/persistence/Sqlite.hx @@ -228,6 +228,12 @@ class Sqlite implements Persistence implements KeyValueStore { "PRAGMA user_version = 9"]); } return Promise.resolve(null); + }).then(_ -> { + if (version < 10) { + return exec(["ALTER TABLE accounts ADD COLUMN sort_id TEXT NOT NULL DEFAULT 'a '", + "PRAGMA user_version = 10"]); + } + return Promise.resolve(null); }); }); }); @@ -771,29 +777,31 @@ class Sqlite implements Persistence implements KeyValueStore { private var smStoreInProgress = false; private var smStoreNext: Null<BytesData> = null; + private var smStoreIdNext: String = "a "; @HaxeCBridge.noemit - public function storeStreamManagement(accountId:String, sm:Null<BytesData>) { + public function storeStreamManagement(accountId:String, sm:Null<BytesData>, sortId:String) { smStoreNext = sm; + smStoreIdNext = sortId; if (!smStoreInProgress) { smStoreInProgress = true; db.exec( - "UPDATE accounts SET sm_state=? WHERE account_id=?", - [sm, accountId] + "UPDATE accounts SET sm_state=?, sort_id=? WHERE account_id=?", + [sm, sortId, accountId] ).then(_ -> { smStoreInProgress = false; - if (smStoreNext != sm) storeStreamManagement(accountId, sm); + if (smStoreNext != sm || smStoreIdNext != sortId) storeStreamManagement(accountId, sm, sortId); }); } } @HaxeCBridge.noemit - public function getStreamManagement(accountId:String): Promise<Null<BytesData>> { - return db.exec("SELECT sm_state FROM accounts WHERE account_id=?", [accountId]).then(result -> { + public function getStreamManagement(accountId:String): Promise<{ sm: Null<BytesData>, sortId: String }> { + return db.exec("SELECT sm_state, sort_id FROM accounts WHERE account_id=?", [accountId]).then(result -> { for (row in result) { - return row.sm_state; + return { sm: row.sm_state, sortId: row.sort_id }; } - return null; + return { sm: null, sortId: "a " }; }); }