git » sdk » commit f3a4086

Store highest sortId seen for an account

author Stephen Paul Weber
2026-05-10 18:35:48 UTC
committer Stephen Paul Weber
2026-05-10 18:38:39 UTC
parent beb577753571811cd60385bbd52df371c6ee378e

Store highest sortId seen for an account

borogove/Client.hx +6 -5
borogove/Persistence.hx +3 -2
borogove/Push.hx +5 -2
borogove/persistence/Dummy.hx +3 -3
borogove/persistence/IDB.js +10 -9
borogove/persistence/Sqlite.hx +16 -8

diff --git a/borogove/Client.hx b/borogove/Client.hx
index 77723e1..8c896e1 100644
--- a/borogove/Client.hx
+++ b/borogove/Client.hx
@@ -166,7 +166,7 @@ class Client extends EventEmitter {
 		});
 
 		stream.on("sm/update", (data) -> {
-			persistence.storeStreamManagement(this.accountId(), stream.emitSMupdates ? data.sm : null);
+			persistence.storeStreamManagement(this.accountId(), stream.emitSMupdates ? data.sm : null, sortId);
 			return EventHandled;
 		});
 
@@ -713,7 +713,8 @@ class Client extends EventEmitter {
 		stream.emitSMupdates = false; // We don't care until after sync
 		startOffline().then(_ ->
 			persistence.getStreamManagement(accountId())
-		).then((sm) -> {
+		).then((persisted) -> { // not `data`: the nested auth handlers bind their own `data` and would shadow it
+			if (persisted.sortId > sortId) sortId = persisted.sortId;
 			stream.on("auth/password-needed", (data: { ?mechanisms: Array<{ name: String, canFast: Bool, canOther: Bool }> }) -> {
 				fastMechanism = data.mechanisms?.find((mech) -> mech.canFast)?.name;
 				if (token == null || (fastMechanism == null && data.mechanisms != null)) {
@@ -725,14 +726,14 @@ class Client extends EventEmitter {
 			stream.on("auth/fail", (data) -> {
 				if (token != null) {
 					token = null;
-					stream.connect(jid.asString(), sm);
+					stream.connect(jid.asString(), persisted.sm);
 				} else {
-					stream.connect(jid.asString(), sm);
+					stream.connect(jid.asString(), persisted.sm);
 				}
 				return EventHandled;
 			});
 			firstSync = true;
-			stream.connect(jid.asString(), sm);
+			stream.connect(jid.asString(), persisted.sm);
 		});
 	}
 
diff --git a/borogove/Persistence.hx b/borogove/Persistence.hx
index 0799ed4..febb2b3 100644
--- a/borogove/Persistence.hx
+++ b/borogove/Persistence.hx
@@ -219,9 +219,10 @@ interface Persistence {
 
 		@param accountId the account to store resumption data for
 		@param data stream management payload, or null to clear it
+		@param sortId highest sortId ever seen by this stream
 	**/
 	@HaxeCBridge.noemit
-	public function storeStreamManagement(accountId:String, data:Null<BytesData>):Void;
+	public function storeStreamManagement(accountId:String, data:Null<BytesData>, sortId: String):Void;
 
 	/**
 		Load stream management resumption data for an account
@@ -230,7 +231,7 @@ interface Persistence {
 		@returns Promise resolving to stored resumption data or null
 	**/
 	@HaxeCBridge.noemit
-	public function getStreamManagement(accountId:String): Promise<Null<BytesData>>;
+	public function getStreamManagement(accountId:String): Promise<{ sm: Null<BytesData>, sortId: String }>;
 
 	/**
 		Store metadata about a discovered service
diff --git a/borogove/Push.hx b/borogove/Push.hx
index 4eb2c7f..e814a71 100644
--- a/borogove/Push.hx
+++ b/borogove/Push.hx
@@ -39,8 +39,11 @@ class Push {
 		final message = ChatMessage.fromStanza(stanza, JID.parse(stanza.attr.get("to")).asBare());
 		if (message != null) {
 			// TODO: this puts every push at the same sortId until the next sync
-			persistence.syncPoint(message.account(), message.type == MessageChannel ? message.chatId() : null).then(point -> {
-				final sortId = FractionalIndexing.between(point?.sortId, null, FractionalIndexing.BASE_95_DIGITS);
+			(message.type == MessageChannel ?
+				persistence.syncPoint(message.account(), message.chatId()).then(point -> point?.sortId ?? "a ") :
+				persistence.getStreamManagement(message.account()).then(data -> data.sortId)
+			).then(sortId -> {
+				final sortId = FractionalIndexing.between(sortId, null, FractionalIndexing.BASE_95_DIGITS);
 				final toStore = ChatMessage.fromStanza(
 					stanza,
 					JID.parse(stanza.attr.get("to")).asBare(),
diff --git a/borogove/persistence/Dummy.hx b/borogove/persistence/Dummy.hx
index 636bb59..425df28 100644
--- a/borogove/persistence/Dummy.hx
+++ b/borogove/persistence/Dummy.hx
@@ -128,11 +128,11 @@ class Dummy implements Persistence {
 	}
 
 	@HaxeCBridge.noemit
-	public function storeStreamManagement(accountId:String, sm:Null<BytesData>) { }
+	public function storeStreamManagement(accountId:String, sm:Null<BytesData>, sortId: String) { }
 
 	@HaxeCBridge.noemit
-	public function getStreamManagement(accountId:String): Promise<Null<BytesData>> {
-		return Promise.resolve(null);
+	public function getStreamManagement(accountId:String): Promise<{ sm: Null<BytesData>, sortId: String }> {
+		return Promise.resolve({ sm: null, sortId: "a " });
 	}
 
 	@HaxeCBridge.noemit
diff --git a/borogove/persistence/IDB.js b/borogove/persistence/IDB.js
index 86d93d0..fa69d80 100644
--- a/borogove/persistence/IDB.js
+++ b/borogove/persistence/IDB.js
@@ -992,27 +992,28 @@ export default async (dbname, media, tokenize, stemmer) => {
 			};
 		},
 
-		storeStreamManagement: function(account, sm) {
+		async storeStreamManagement(account, sm, sortId) {
 			// Don't bother on ios, the indexeddb is too broken
 			// https://bugs.webkit.org/show_bug.cgi?id=287876
 			if (navigator.userAgent.match(/(iPad|iPhone|iPod)/g)) return;
 
 			const tx = db.transaction(["keyvaluepairs"], "readwrite");
 			const store = tx.objectStore("keyvaluepairs");
-			const req = store.put(sm, "sm:" + account);
-			req.onerror = () => { console.error("storeStreamManagement", req.error.name, req.error.message); }
+			await Promise.all([ // issue both puts synchronously so they share the transaction while it is active
+				promisifyRequest(store.put(sm, "sm:" + account)),
+				promisifyRequest(store.put(sortId, "sortId:" + account))
+			]).catch((e) => { console.error("storeStreamManagement", e); }); // callers do not await: log rather than reject
 		},
 
 		async getStreamManagement(account) {
 			const tx = db.transaction(["keyvaluepairs"], "readonly");
 			const store = tx.objectStore("keyvaluepairs");
-			const v = await promisifyRequest(store.get("sm:" + account));
-			if (v instanceof ArrayBuffer) {
-				return v;
-			} else if(!v) {
-				return null;
+			const sm = await promisifyRequest(store.get("sm:" + account)) || null;
+			const sortId = await promisifyRequest(store.get("sortId:" + account)) ?? "a ";
+			if (sm instanceof ArrayBuffer || !sm) {
+				return { sm, sortId };
 			} else {
-				return new Blob([JSON.stringify(v)], {type: "text/plain; charset=utf-8"}).arrayBuffer();
+				return { sm: await new Blob([JSON.stringify(sm)], {type: "text/plain; charset=utf-8"}).arrayBuffer(), sortId }; // arrayBuffer() is a Promise; await it so sm is the buffer itself
 			}
 		},
 
diff --git a/borogove/persistence/Sqlite.hx b/borogove/persistence/Sqlite.hx
index 126b045..848d712 100644
--- a/borogove/persistence/Sqlite.hx
+++ b/borogove/persistence/Sqlite.hx
@@ -228,6 +228,12 @@ class Sqlite implements Persistence implements KeyValueStore {
 						"PRAGMA user_version = 9"]);
 					}
 					return Promise.resolve(null);
+				}).then(_ -> {
+					if (version < 10) {
+						return exec(["ALTER TABLE accounts ADD COLUMN sort_id TEXT NOT NULL DEFAULT 'a '",
+						"PRAGMA user_version = 10"]);
+					}
+					return Promise.resolve(null);
 				});
 			});
 		});
@@ -771,29 +777,31 @@ class Sqlite implements Persistence implements KeyValueStore {
 
 	private var smStoreInProgress = false;
 	private var smStoreNext: Null<BytesData> = null;
+	private var smStoreIdNext: String = "a ";
 	@HaxeCBridge.noemit
-	public function storeStreamManagement(accountId:String, sm:Null<BytesData>) {
+	public function storeStreamManagement(accountId:String, sm:Null<BytesData>, sortId:String) {
 		smStoreNext = sm;
+		smStoreIdNext = sortId;
 		if (!smStoreInProgress) {
 			smStoreInProgress = true;
 			db.exec(
-				"UPDATE accounts SET sm_state=? WHERE account_id=?",
-				[sm, accountId]
+				"UPDATE accounts SET sm_state=?, sort_id=? WHERE account_id=?",
+				[sm, sortId, accountId]
 			).then(_ -> {
 				smStoreInProgress = false;
-				if (smStoreNext != sm) storeStreamManagement(accountId, sm);
+				if (smStoreNext != sm || smStoreIdNext != sortId) storeStreamManagement(accountId, smStoreNext, smStoreIdNext); // flush the newest queued values, not the ones just written
 			});
 		}
 	}
 
 	@HaxeCBridge.noemit
-	public function getStreamManagement(accountId:String): Promise<Null<BytesData>> {
-		return db.exec("SELECT sm_state FROM accounts  WHERE account_id=?", [accountId]).then(result -> {
+	public function getStreamManagement(accountId:String): Promise<{ sm: Null<BytesData>, sortId: String }> {
+		return db.exec("SELECT sm_state, sort_id FROM accounts  WHERE account_id=?", [accountId]).then(result -> {
 			for (row in result) {
-				return row.sm_state;
+				return { sm: row.sm_state, sortId: row.sort_id };
 			}
 
-			return null;
+			return { sm: null, sortId: "a " };
 		});
 	}