git » sdk » commit 852a2c2

Better bulk store for messages with IDB

author Stephen Paul Weber
2026-06-04 02:56:40 UTC
committer Stephen Paul Weber
2026-06-04 03:11:54 UTC
parent 7a7c2a8b86aa799192d3cdfe6c9bf8fa9184f4e2

Better bulk store for messages with IDB

borogove/persistence/IDB.js +77 -69

diff --git a/borogove/persistence/IDB.js b/borogove/persistence/IDB.js
index 4101744..802ff9b 100644
--- a/borogove/persistence/IDB.js
+++ b/borogove/persistence/IDB.js
@@ -348,22 +348,24 @@ export default async (dbname, media, tokenize, stemmer) => {
 		return message.build();
 	}
 
-	async function hydrateMessage(value) {
+	async function hydrateMessage(value, store) {
 		if (!value) return null;
 
 		const message = hydrateMessageSync(value);
-		const tx = db.transaction(["messages"], "readonly");
-		const store = tx.objectStore("messages");
+		if (!store) {
+			const tx = db.transaction(["messages"], "readonly");
+			store = tx.objectStore("messages");
+		}
 		if (value.replyToMessage && !value.replyToMessage[2]) value.replyToMessage[2] = value.serverIdBy ?? value.chatId;
 		const range = value.replyToMessage && value.replyToMessage[1] !== message.serverId && value.replyToMessage[3] !== message.localId && (!value.replyToMessage[3] ?
 			IDBKeyRange.bound(value.replyToMessage.slice(0, 3), [...value.replyToMessage.slice(0, 3), []])
 			: IDBKeyRange.only(value.replyToMessage));
-		const replyToMessage = range && await hydrateMessage((await promisifyRequest(store.openCursor(range)))?.value);
+		const replyToMessage = range && await hydrateMessage((await promisifyRequest(store.openCursor(range)))?.value, store);
 
 		message.replyToMessage = replyToMessage;
 		message.versions = await Promise.all((value.versions || []).map(v => {
 			v.versions = []; // No need for nested versions...
-			return hydrateMessage(v);
+			return hydrateMessage(v, store);
 		}));
 		return message;
 	}
@@ -736,11 +738,13 @@ export default async (dbname, media, tokenize, stemmer) => {
 			}));
 		},
 
-		getMessage: async function(account, chatId, serverId, localId) {
+		getMessage: async function(account, chatId, serverId, localId, store) {
 			if (!serverId && !localId) throw "Can't getMessage by no id";
 
-			const tx = db.transaction(["messages"], "readonly");
-			const store = tx.objectStore("messages");
+			if (!store) {
+				const tx = db.transaction(["messages"], "readonly");
+				store = tx.objectStore("messages");
+			}
 			let result = null;
 			if (serverId) {
 				const cursor = store.openCursor(IDBKeyRange.bound([account, serverId], [account, serverId, []]));
@@ -758,11 +762,13 @@ export default async (dbname, media, tokenize, stemmer) => {
 			}
 			if (!result || !result.value) return null;
 			const message = result.value;
-			return await hydrateMessage(message);
+			return await hydrateMessage(message, store);
 		},
 
-		storeReaction: async function(account, update) {
-			const tx = db.transaction(["messages", "reactions"], "readwrite");
+		async storeReaction(account, update, tx) {
+			if (!tx) {
+				tx = db.transaction(["messages", "reactions"], "readwrite");
+			}
 			const store = tx.objectStore("messages");
 			const reactionStore = tx.objectStore("reactions");
 			let result;
@@ -782,77 +788,79 @@ export default async (dbname, media, tokenize, stemmer) => {
 			const message = result.value;
 			setReactions(message.reactions, update.senderId, reactions);
 			store.put(message);
-			return await hydrateMessage(message);
+			return await hydrateMessage(message, store);
 		},
 
 		async storeMessages(account, messages) {
-			return Promise.all(messages.map(m =>
-				new Promise(resolve => this.storeMessage(account, m, resolve))
-			));
+			const tx = db.transaction(["messages", "reactions"], "readwrite");
+			const store = tx.objectStore("messages");
+			const promises = [];
+tx.onerror = console.error;
+			for (const [index, m] of messages.entries()) {
+				const isLast = index + 1 >= messages.length;
+				promises.push(this.storeMessage(tx, store, account, m, isLast));
+			}
+
+			return await Promise.all(promises);
 		},
 
-		storeMessage: function(account, message, callback) {
+		async storeMessage(tx, store, account, message, wait) {
 			if (!message.chatId()) throw "Cannot store a message with no chatId";
 			if (!message.sortId) throw "Cannot store a message with no sortId";
 			if (!message.serverId && !message.localId) throw "Cannot store a message with no id";
 			if (!message.serverId && message.isIncoming()) throw "Cannot store an incoming message with no server id";
 			if (message.serverId && !message.serverIdBy) throw "Cannot store a message with a server id and no by";
 
-			(
-				// Hydrate reply stubs
-				message.replyToMessage && !message.replyToMessage.stanza ?
-					this.getMessage(account, message.chatId(), message.replyToMessage.serverId, message.replyToMessage.localId) :
-					Promise.resolve(message.replyToMessage)
-			).then((replyToMessage) => {
-				message.replyToMessage = replyToMessage ?? message.replyToMessage;
-				const tx = db.transaction(["messages", "reactions"], "readwrite");
-				const store = tx.objectStore("messages");
-				return Promise.all([
-					promisifyRequest(store.index("localId").openCursor(IDBKeyRange.only([account, message.localId || [], message.chatId()]))),
-					promisifyRequest(tx.objectStore("reactions").openCursor(IDBKeyRange.only([account, message.chatId(), message.senderId, message.localId || ""])))
-				]).then(([result, reactionResult]) => {
-					if (reactionResult?.value?.append && message.html().trim() == "") {
-						this.getMessage(account, message.chatId(), reactionResult.value.serverId, reactionResult.value.localId).then((reactToMessage) => {
-							const previouslyAppended = hydrateReactionsArray(reactionResult.value.append, reactionResult.value.senderId, reactionResult.value.timestamp).map(r => r.key);
-							const reactions = [];
-							for (const [k, reacts] of reactToMessage?.reactions || []) {
-								for (const react of reacts) {
-									if (react.senderId === message.senderId && !previouslyAppended.includes(k)) reactions.push(react);
-								}
-							}
-							this.storeReaction(account, new borogove_ReactionUpdate(message.localId, reactionResult.value.serverId, reactionResult.value.serverIdBy, reactionResult.value.localId, message.chatId(), message.senderId, message.timestamp, reactions, enums.borogove_ReactionUpdateKind.CompleteReactions), callback);
-						});
-						return true;
-					} else if (result?.value && !message.isIncoming() && result?.value.direction === enums.borogove_MessageDirection.MessageSent && message.versions.length < 1) {
-						// Duplicate, we trust our own sent ids
-						return promisifyRequest(result.delete());
-					} else if (result?.value && (result.value.senderId == message.senderId || result.value.type == enums.borogove_MessageType.MessageCall) && (message.versions.length > 0 || (result.value.versions || []).length > 0)) {
-						hydrateMessage(correctMessage(account, message, result)).then(callback);
-						return true;
+			// Hydrate reply stubs
+			const replyToMessage = message.replyToMessage && !message.replyToMessage.stanza ?
+				await this.getMessage(account, message.chatId(), message.replyToMessage.serverId, message.replyToMessage.localId, store) :
+				message.replyToMessage;
+			message.replyToMessage = replyToMessage ?? message.replyToMessage;
+
+			const result = await promisifyRequest(store.index("localId").openCursor(IDBKeyRange.only([account, message.localId || [], message.chatId()])));
+			const reactionResult = await promisifyRequest(tx.objectStore("reactions").openCursor(IDBKeyRange.only([account, message.chatId(), message.senderId, message.localId || ""])));
+			if (reactionResult?.value?.append && message.html().trim() == "") {
+				const reactToMesssage = await this.getMessage(account, message.chatId(), reactionResult.value.serverId, reactionResult.value.localId, store);
+				const previouslyAppended = hydrateReactionsArray(reactionResult.value.append, reactionResult.value.senderId, reactionResult.value.timestamp).map(r => r.key);
+				const reactions = [];
+				for (const [k, reacts] of reactToMessage?.reactions || []) {
+					for (const react of reacts) {
+						if (react.senderId === message.senderId && !previouslyAppended.includes(k)) reactions.push(react);
 					}
-				}).then((done) => {
-					if (!done) {
-						// There may be reactions already if we are paging backwards
-						const cursor = tx.objectStore("reactions").index("senders").openCursor(IDBKeyRange.bound([account, message.chatId(), message.getReplyId() || ""], [account, message.chatId(), message.getReplyId() || "", []]), "prev");
-						const reactions = new Map();
-						const reactionTimes = new Map();
-						cursor.onsuccess = (event) => {
-							if (event.target.result && event.target.result.value) {
-								const time = reactionTimes.get(event.target.result.senderId);
-								if (!time || time < event.target.result.value.timestamp) {
-									setReactions(reactions, event.target.result.value.senderId, hydrateReactionsArray(event.target.result.value.reactions, event.target.result.senderId, event.target.result.timestamp));
-									reactionTimes.set(event.target.result.value.senderId, event.target.result.value.timestamp);
-								}
-								event.target.result.continue();
-							} else {
-								message.reactions = reactions;
-								promisifyRequest(store.put(serializeMessage(account, message))).then(() => callback(message));
-							}
-						};
-						cursor.onerror = console.error;
+				}
+				return await this.storeReaction(account, new borogove_ReactionUpdate(message.localId, reactionResult.value.serverId, reactionResult.value.serverIdBy, reactionResult.value.localId, message.chatId(), message.senderId, message.timestamp, reactions, enums.borogove_ReactionUpdateKind.CompleteReactions), tx);
+			} else if (result?.value && !message.isIncoming() && result?.value.direction === enums.borogove_MessageDirection.MessageSent && message.versions.length < 1) {
+				// Duplicate, we trust our own sent ids
+				await promisifyRequest(result.delete());
+			} else if (result?.value && (result.value.senderId == message.senderId || result.value.type == enums.borogove_MessageType.MessageCall) && (message.versions.length > 0 || (result.value.versions || []).length > 0)) {
+				return await hydrateMessage(correctMessage(account, message, result), store);
+			}
+
+			// There may be reactions already if we are paging backwards
+			const cursor = tx.objectStore("reactions").index("senders").openCursor(IDBKeyRange.bound([account, message.chatId(), message.getReplyId() || ""], [account, message.chatId(), message.getReplyId() || "", []]), "prev");
+			const reactions = new Map();
+			const reactionTimes = new Map();
+			while (true) {
+				const cresult = await promisifyRequest(cursor);
+				if (!cresult?.value) {
+					message.reactions = reactions;
+					const toPut = serializeMessage(account, message);
+					if (wait) {
+						await promisifyRequest(store.put(toPut));
+					} else {
+						store.put(toPut).onerror = console.error;
 					}
-				});
-			});
+					return message;
+				}
+
+				const time = reactionTimes.get(cresult.value.senderId);
+				if (!time || time < cresult.value.timestamp) {
+					setReactions(reactions, cresult.value.senderId, hydrateReactionsArray(cresult.value.reactions, cresult.senderId, cresult.timestamp));
+					reactionTimes.set(cresult.value.senderId, cresult.value.timestamp);
+				}
+
+				cresult.continue();
+			}
 		},
 
 		updateMessage: function(account, message) {