| author | Stephen Paul Weber
<singpolyma@singpolyma.net> 2025-03-04 15:42:26 UTC |
| committer | Stephen Paul Weber
<singpolyma@singpolyma.net> 2025-03-04 16:02:27 UTC |
| parent | 431d3f471880fb0363ac4f0d58c949bc1b875acf |
| .gitignore | +3 | -1 |
| Makefile | +4 | -2 |
| browserjs.hxml | +1 | -0 |
| cpp.hxml | +1 | -0 |
| nodejs.hxml | +1 | -0 |
| npm/index.ts | +6 | -3 |
| npm/package.json | +1 | -0 |
| snikket/Chat.hx | +36 | -33 |
| snikket/Client.hx | +39 | -29 |
| snikket/Persistence.hx | +2 | -2 |
| snikket/jingle/Session.hx | +12 | -12 |
| snikket/persistence/Dummy.hx | +3 | -3 |
| snikket/persistence/{browser.js => IDB.js} | +85 | -111 |
| snikket/persistence/KeyValueStore.hx | +6 | -0 |
| snikket/persistence/MediaStore.hx | +11 | -0 |
| snikket/persistence/MediaStoreCache.js | +80 | -0 |
| snikket/persistence/MediaStoreFS.hx | +89 | -0 |
| snikket/persistence/Sqlite.hx | +439 | -349 |
| snikket/persistence/SqliteDriver.hx | +46 | -0 |
| snikket/persistence/SqliteDriver.js.hx | +56 | -0 |
| snikket/persistence/sqlite-worker1.mjs | +38 | -0 |
diff --git a/.gitignore b/.gitignore index d5856fa..3f28a3a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ npm/package-lock.json npm/*.d.ts -npm/browser.js +npm/MediaStoreCache.js +npm/sqlite-worker1.mjs +npm/IDB.js npm/snikket.js npm/snikket-enums.ts npm/snikket-enums.js diff --git a/Makefile b/Makefile index 6e79179..ab8a9ad 100644 --- a/Makefile +++ b/Makefile @@ -37,8 +37,10 @@ npm/snikket.js: sed -i '1ivar exports = {};' npm/snikket.js echo "export const snikket = exports.snikket;" >> npm/snikket.js -npm: npm/snikket-browser.js npm/snikket.js snikket/persistence/browser.js - cp snikket/persistence/browser.js npm +npm: npm/snikket-browser.js npm/snikket.js snikket/persistence/IDB.js snikket/persistence/MediaStoreCache.js snikket/persistence/sqlite-worker1.mjs + cp snikket/persistence/IDB.js npm + cp snikket/persistence/MediaStoreCache.js npm + cp snikket/persistence/sqlite-worker1.mjs npm cd npm && npx tsc --esModuleInterop --lib esnext,dom --target esnext --preserveConstEnums -d index.ts sed -i '1iimport { snikket as enums } from "./snikket-enums.js";' npm/index.js diff --git a/browserjs.hxml b/browserjs.hxml index 77f23e1..2c14580 100644 --- a/browserjs.hxml +++ b/browserjs.hxml @@ -11,6 +11,7 @@ snikket.Client snikket.Push snikket.Version +snikket.persistence.Sqlite -D js-es=6 -D hxtsdgen_enums_ts diff --git a/cpp.hxml b/cpp.hxml index 5e436d5..5619ddc 100644 --- a/cpp.hxml +++ b/cpp.hxml @@ -11,6 +11,7 @@ snikket.Client snikket.Push snikket.persistence.Dummy snikket.persistence.Sqlite +snikket.persistence.MediaStoreFS --cpp cpp -D HXCPP_ALIGN_ALLOC diff --git a/nodejs.hxml b/nodejs.hxml index 7a6c171..8182b7d 100644 --- a/nodejs.hxml +++ b/nodejs.hxml @@ -12,6 +12,7 @@ snikket.Client snikket.Push snikket.Version +snikket.persistence.Sqlite -D js-es=6 -D hxtsdgen_enums_ts diff --git a/npm/index.ts b/npm/index.ts index c6c0009..f067e2d 100644 --- a/npm/index.ts +++ b/npm/index.ts @@ -1,4 +1,5 @@ -import browserp from "./browser.js"; +import IDBjs from "./IDB.js"; +import MediaStoreCachejs from "./MediaStoreCache.js"; import { snikket as enums } from "./snikket-enums.js"; import { snikket } from "./snikket.js"; @@ -33,6 +34,8 @@ export import UiState = enums.UiState; export import UserState = enums.UserState; export namespace persistence { - export import browser = browserp; - export import Dummy = snikket.persistence.Dummy; + export import IDB = IDBjs; + export import MediaStoreCache = MediaStoreCachejs; + export import Dummy = snikket.persistence.Dummy; + export import Sqlite = snikket.persistence.Sqlite; } diff --git a/npm/package.json b/npm/package.json index 01fdbdc..7baf45f 100644 --- a/npm/package.json +++ b/npm/package.json @@ -6,6 +6,7 @@ "type": "module", "files": [ "*.js", + "*.mjs", "*.ts" ], "browser": { diff --git a/snikket/Chat.hx b/snikket/Chat.hx index fc369b4..44742a6 100644 --- a/snikket/Chat.hx +++ b/snikket/Chat.hx @@ -46,6 +46,7 @@ abstract class Chat { private var persistence:Persistence; @:allow(snikket) private var avatarSha1:Null<BytesData> = null; + @:allow(snikket) private var presence:Map<String, Presence> = []; private var trusted:Bool = false; /** @@ -123,16 +124,12 @@ abstract class Chat { abstract public function getMessagesAround(aroundId:Null<String>, aroundTime:Null<String>, handler:(Array<ChatMessage>)->Void):Void; private function fetchFromSync(sync: MessageSync, callback: (Array<ChatMessage>)->Void) { - final promises = []; sync.onMessages((messageList) -> { final chatMessages = []; for (m in messageList.messages) { switch (m) { case ChatMessageStanza(message): - final chatMessage = prepareIncomingMessage(message, new Stanza("message", { from: message.senderId() })); - promises.push(new thenshim.Promise((resolve, reject) -> { - client.storeMessage(chatMessage, resolve); - })); + chatMessages.push(prepareIncomingMessage(message, new Stanza("message", { from: message.senderId() }))); case ReactionUpdateStanza(update): persistence.storeReaction(client.accountId(), update, (m)->{}); case ModerateMessageStanza(action): @@ -141,7 +138,7 @@ abstract class Chat { // ignore } } - thenshim.PromiseTools.all(promises).then((chatMessages) -> { + client.storeMessages(chatMessages, (chatMessages) -> { callback(chatMessages.filter((m) -> m != null && m.chatId() == chatId)); }); }); @@ -295,7 +292,7 @@ abstract class Chat { **/ public function togglePinned(): Void { uiState = uiState == Pinned ? Open : Pinned; - persistence.storeChat(client.accountId(), this); + persistence.storeChats(client.accountId(), [this]); client.sortChats(); client.trigger("chats/update", [this]); } @@ -307,7 +304,7 @@ abstract class Chat { if (reportSpam != null && !onServer) throw "Can't report SPAM if not sending to server"; isBlocked = true; if (uiState == Closed) { - persistence.storeChat(client.accountId(), this); + persistence.storeChats(client.accountId(), [this]); } else { close(); // close persists } @@ -330,7 +327,7 @@ abstract class Chat { public function unblock(onServer: Bool): Void { isBlocked = false; uiState = Open; - persistence.storeChat(client.accountId(), this); + persistence.storeChats(client.accountId(), [this]); client.trigger("chats/update", [this]); if (onServer) { stream.sendIq( @@ -592,7 +589,7 @@ abstract class Chat { readUpToId = upTo; readUpToBy = upToBy; - persistence.storeChat(client.accountId(), this); + persistence.storeChats(client.accountId(), [this]); persistence.getMessagesBefore(client.accountId(), chatId, null, null, (messages) -> { var i = messages.length; while (--i >= 0) { @@ -734,17 +731,17 @@ class DirectChat extends Chat { message.resetLocalId(); message.versions = [toSend]; // This is a correction message.localId = localId; - client.storeMessage(message, (corrected) -> { - toSend.versions = corrected.localId == localId ? corrected.versions : [message]; + client.storeMessages([message], (corrected) -> { + toSend.versions = corrected[0].localId == localId ? corrected[0].versions : [message]; for (recipient in message.recipients) { message.to = recipient; client.sendStanza(toSend.asStanza()); } if (localId == lastMessage?.localId) { - setLastMessage(corrected); + setLastMessage(corrected[0]); client.trigger("chats/update", [this]); } - client.notifyMessageHandlers(corrected, CorrectionEvent); + client.notifyMessageHandlers(corrected[0], CorrectionEvent); }); } @@ -756,7 +753,7 @@ class DirectChat extends Chat { final fromStanza = Message.fromStanza(message.asStanza(), client.jid).parsed; switch (fromStanza) { case ChatMessageStanza(_): - client.storeMessage(message, (stored) -> { + client.storeMessages([message], (stored) -> { for (recipient in message.recipients) { message.to = recipient; final stanza = message.asStanza(); @@ -769,7 +766,7 @@ class DirectChat extends Chat { } setLastMessage(message); client.trigger("chats/update", [this]); - client.notifyMessageHandlers(stored, stored.versions.length > 1 ? CorrectionEvent : DeliveryEvent); + client.notifyMessageHandlers(stored[0], stored[0].versions.length > 1 ? CorrectionEvent : DeliveryEvent); }); case ReactionUpdateStanza(update): persistence.storeReaction(client.accountId(), update, (stored) -> { @@ -890,7 +887,7 @@ class DirectChat extends Chat { if (typingTimer != null) typingTimer.stop(); // Should this remove from roster? uiState = Closed; - persistence.storeChat(client.accountId(), this); + persistence.storeChats(client.accountId(), [this]); sendChatState("gone", null); client.trigger("chats/update", [this]); } @@ -1019,15 +1016,14 @@ class Channel extends Chat { final chatMessages = []; sync.onMessages((messageList) -> { final promises = []; + final pageChatMessages = []; for (m in messageList.messages) { switch (m) { case ChatMessageStanza(message): for (hash in message.inlineHashReferences()) { client.fetchMediaByHash([hash], [message.from]); } - promises.push(new thenshim.Promise((resolve, reject) -> { - client.storeMessage(message, resolve); - })); + pageChatMessages.push(message); case ReactionUpdateStanza(update): promises.push(new thenshim.Promise((resolve, reject) -> { persistence.storeReaction(client.accountId(), update, (_) -> resolve(null)); @@ -1040,11 +1036,18 @@ class Channel extends Chat { // ignore } } + promises.push(new thenshim.Promise((resolve, reject) -> { + client.storeMessages(pageChatMessages, resolve); + })); thenshim.PromiseTools.all(promises).then((stored) -> { - for (message in stored) { - client.notifySyncMessageHandlers(message); - if (message != null && message.chatId() == chatId) chatMessages.push(message); - if (chatMessages.length > 1000) chatMessages.shift(); // Save some RAM + for (messages in stored) { + if (messages != null) { + for (message in messages) { + client.notifySyncMessageHandlers(message); + if (message != null && message.chatId() == chatId) chatMessages.push(message); + if (chatMessages.length > 1000) chatMessages.shift(); // Save some RAM + } + } } if (sync.hasMore()) { sync.fetchNext(); @@ -1098,7 +1101,7 @@ class Channel extends Chat { if (discoGet.getResult() != null) { disco = discoGet.getResult(); persistence.storeCaps(discoGet.getResult()); - persistence.storeChat(client.accountId(), this); + persistence.storeChats(client.accountId(), [this]); } if (callback != null) callback(); }); @@ -1233,14 +1236,14 @@ class Channel extends Chat { message.resetLocalId(); message.versions = [toSend]; // This is a correction message.localId = localId; - client.storeMessage(message, (corrected) -> { - toSend.versions = corrected.localId == localId ? corrected.versions : [message]; + client.storeMessages([message], (corrected) -> { + toSend.versions = corrected[0].localId == localId ? corrected[0].versions : [message]; client.sendStanza(toSend.asStanza()); + client.notifyMessageHandlers(corrected[0], CorrectionEvent); if (localId == lastMessage?.localId) { - setLastMessage(corrected); + setLastMessage(corrected[0]); client.trigger("chats/update", [this]); } - client.notifyMessageHandlers(corrected, CorrectionEvent); }); } @@ -1261,11 +1264,11 @@ class Channel extends Chat { activeThread = message.threadId; stanza.tag("active", { xmlns: "http://jabber.org/protocol/chatstates" }).up(); } - client.storeMessage(message, (stored) -> { + client.storeMessages([message], (stored) -> { client.sendStanza(stanza); - setLastMessage(stored); + setLastMessage(stored[0]); + client.notifyMessageHandlers(stored[0], stored[0].versions.length > 1 ? CorrectionEvent : DeliveryEvent); client.trigger("chats/update", [this]); - client.notifyMessageHandlers(stored, stored.versions.length > 1 ? CorrectionEvent : DeliveryEvent); }); case ReactionUpdateStanza(update): persistence.storeReaction(client.accountId(), update, (stored) -> { @@ -1400,7 +1403,7 @@ class Channel extends Chat { public function close() { if (typingTimer != null) typingTimer.stop(); uiState = Closed; - persistence.storeChat(client.accountId(), this); + persistence.storeChats(client.accountId(), [this]); selfPing(false); bookmark(); // TODO: what if not previously bookmarked? sendChatState("gone", null); diff --git a/snikket/Client.hx b/snikket/Client.hx index c6d205f..8213bf3 100644 --- a/snikket/Client.hx +++ b/snikket/Client.hx @@ -182,7 +182,7 @@ class Client extends EventEmitter { if (chatMessage.serverId == null) { updateChat(chatMessage); } else { - persistence.storeMessage(accountId(), chatMessage, updateChat); + storeMessages([chatMessage], (stored) -> updateChat(stored[0])); } } case ReactionUpdateStanza(update): @@ -287,7 +287,7 @@ class Client extends EventEmitter { if (avatarSha1 != null) { final chat = this.getDirectChat(JID.parse(pubsubEvent.getFrom()).asBare().asString(), false); chat.setAvatarSha1(avatarSha1); - persistence.storeChat(accountId(), chat); + persistence.storeChats(accountId(), [chat]); persistence.hasMedia("sha-1", avatarSha1, (has) -> { if (has) { this.trigger("chats/update", [chat]); @@ -321,7 +321,7 @@ class Client extends EventEmitter { startChatWith(item.attr.get("id"), (caps) -> Closed, (chat) -> chat.markReadUpToId(upTo.attr.get("id"), upTo.attr.get("by"))); } else { chat.markReadUpToId(upTo.attr.get("id"), upTo.attr.get("by"), () -> { - persistence.storeChat(accountId(), chat); + persistence.storeChats(accountId(), [chat]); this.trigger("chats/update", [chat]); }); } @@ -405,10 +405,10 @@ class Client extends EventEmitter { if (item.subscription != "remove") { final chat = getDirectChat(item.jid, false); chat.updateFromRoster(item); - persistence.storeChat(accountId(), chat); - chatsToUpdate.push(chat); + chatsToUpdate.push(cast (chat, Chat)); } } + persistence.storeChats(accountId(), chatsToUpdate); this.trigger("chats/update", chatsToUpdate); return IqResult; @@ -465,12 +465,12 @@ class Client extends EventEmitter { } if (c == null) { chat.setPresence(JID.parse(stanza.attr.get("from")).resource, new Presence(null, mucUser)); - persistence.storeChat(accountId(), chat); + persistence.storeChats(accountId(), [chat]); if (chat.livePresence()) this.trigger("chats/update", [chat]); } else { final handleCaps = (caps) -> { chat.setPresence(JID.parse(stanza.attr.get("from")).resource, new Presence(caps, mucUser)); - persistence.storeChat(accountId(), chat); + persistence.storeChats(accountId(), [chat]); return chat; }; @@ -506,7 +506,7 @@ class Client extends EventEmitter { if (avatarSha1Hex != null) { final avatarSha1 = Hash.fromHex("sha-1", avatarSha1Hex)?.hash; chat.setAvatarSha1(avatarSha1); - persistence.storeChat(accountId(), chat); + persistence.storeChats(accountId(), [chat]); persistence.hasMedia("sha-1", avatarSha1, (has) -> { if (has) { if (chat.livePresence()) this.trigger("chats/update", [chat]); @@ -535,7 +535,7 @@ class Client extends EventEmitter { } // Maybe in the future record it as offine rather than removing it chat.removePresence(JID.parse(stanza.attr.get("from")).resource); - persistence.storeChat(accountId(), chat); + persistence.storeChats(accountId(), [chat]); this.trigger("chats/update", [chat]); } @@ -874,7 +874,7 @@ class Client extends EventEmitter { } else { if (existingChat.uiState == Closed) existingChat.uiState = Open; channel?.selfPing(true); - persistence.storeChat(accountId(), existingChat); + persistence.storeChats(accountId(), [existingChat]); this.trigger("chats/update", [existingChat]); return existingChat; } @@ -888,7 +888,7 @@ class Client extends EventEmitter { } else { getDirectChat(availableChat.chatId, false); } - persistence.storeChat(accountId(), chat); + persistence.storeChats(accountId(), [chat]); this.trigger("chats/update", [chat]); return chat; } @@ -927,7 +927,7 @@ class Client extends EventEmitter { } } final chat = new DirectChat(this, this.stream, this.persistence, chatId); - persistence.storeChat(accountId(), chat); + persistence.storeChats(accountId(), [chat]); chats.unshift(chat); if (triggerIfNew) this.trigger("chats/update", [chat]); return chat; @@ -1178,15 +1178,15 @@ class Client extends EventEmitter { if (chat.isBlocked) return; // Don't notify blocked chats if (chat.uiState == Closed) { chat.uiState = Open; - persistence.storeChat(accountId(), chat); + persistence.storeChats(accountId(), [chat]); } final pinnedCount = chat.uiState == Pinned ? 0 : chats.fold((item, result) -> result + (item.uiState == Pinned ? 1 : 0), 0); var idx = chats.indexOf(chat); if (idx > pinnedCount) { chats.splice(idx, 1); chats.insert(pinnedCount, chat); - if (trigger) this.trigger("chats/update", [chat]); } + if (trigger) this.trigger("chats/update", [chat]); } @:allow(snikket) @@ -1199,8 +1199,8 @@ class Client extends EventEmitter { } @:allow(snikket) - private function storeMessage(message: ChatMessage, ?callback: Null<(ChatMessage)->Void>) { - persistence.storeMessage(accountId(), message, callback ?? (_)->{}); + private function storeMessages(messages: Array<ChatMessage>, ?callback: Null<(Array<ChatMessage>)->Void>) { + persistence.storeMessages(accountId(), messages, callback ?? (_)->{}); } @:allow(snikket) @@ -1293,9 +1293,9 @@ class Client extends EventEmitter { for (item in rosterGet.getResult()) { var chat = getDirectChat(item.jid, false); chat.updateFromRoster(item); - persistence.storeChat(accountId(), chat); - chatsToUpdate.push(chat); + chatsToUpdate.push(cast (chat, Chat)); } + persistence.storeChats(accountId(), chatsToUpdate); this.trigger("chats/update", chatsToUpdate); }); sendQuery(rosterGet); @@ -1310,7 +1310,7 @@ class Client extends EventEmitter { if (err == null || err?.name == "service-unavailable" || err?.name == "feature-not-implemented") { final chat = getDirectChat(jid, false); handleChat(chat); - persistence.storeChat(accountId(), chat); + persistence.storeChats(accountId(), [chat]); } } else { persistence.storeCaps(resultCaps); @@ -1319,11 +1319,11 @@ class Client extends EventEmitter { final chat = new Channel(this, this.stream, this.persistence, jid, uiState, false, null, resultCaps); handleChat(chat); chats.unshift(chat); - persistence.storeChat(accountId(), chat); + persistence.storeChats(accountId(), [chat]); } else { final chat = getDirectChat(jid, false); handleChat(chat); - persistence.storeChat(accountId(), chat); + persistence.storeChats(accountId(), [chat]); } } }); @@ -1347,6 +1347,7 @@ class Client extends EventEmitter { final mdsGet = new PubsubGet(null, "urn:xmpp:mds:displayed:0"); mdsGet.onFinished(() -> { + final chatsToUpdate = []; for (item in mdsGet.getResult()) { if (item.attr.get("id") != null) { final upTo = item.getChild("displayed", "urn:xmpp:mds:displayed:0")?.getChild("stanza-id", "urn:xmpp:sid:0"); @@ -1355,15 +1356,17 @@ class Client extends EventEmitter { startChatWith(item.attr.get("id"), (caps) -> Closed, (chat) -> chat.markReadUpToId(upTo.attr.get("id"), upTo.attr.get("by"))); } else { chat.markReadUpToId(upTo.attr.get("id"), upTo.attr.get("by")); - persistence.storeChat(accountId(), chat); + chatsToUpdate.push(chat); } } } + persistence.storeChats(accountId(), chatsToUpdate); }); sendQuery(mdsGet); final pubsubGet = new PubsubGet(null, "urn:xmpp:bookmarks:1"); pubsubGet.onFinished(() -> { + final chatsToUpdate = []; for (item in pubsubGet.getResult()) { if (item.attr.get("id") != null) { final chat = getChat(item.attr.get("id")); @@ -1384,10 +1387,11 @@ class Client extends EventEmitter { ); } else { chat.updateFromBookmark(item); - persistence.storeChat(accountId(), chat); + chatsToUpdate.push(chat); } } } + persistence.storeChats(accountId(), chatsToUpdate); callback(); }); sendQuery(pubsubGet); @@ -1426,12 +1430,11 @@ class Client extends EventEmitter { sync.setNewestPageFirst(false); sync.onMessages((messageList) -> { final promises = []; + final chatMessages = []; for (m in messageList.messages) { switch (m) { case ChatMessageStanza(message): - promises.push(new thenshim.Promise((resolve, reject) -> { - persistence.storeMessage(accountId(), message, resolve); - })); + chatMessages.push(message); case ReactionUpdateStanza(update): promises.push(new thenshim.Promise((resolve, reject) -> { persistence.storeReaction(accountId(), update, (_) -> resolve(null)); @@ -1444,11 +1447,18 @@ class Client extends EventEmitter { // ignore } } + promises.push(new thenshim.Promise((resolve, reject) -> { + persistence.storeMessages(accountId(), chatMessages, resolve); + })); trace("SYNC: MAM page wait for writes"); - thenshim.PromiseTools.all(promises).then((storedMessages) -> { + thenshim.PromiseTools.all(promises).then((results) -> { if (syncMessageHandlers.length > 0) { - for (message in storedMessages) { - notifySyncMessageHandlers(message); + for (messages in results) { + if (messages != null) { + for (message in messages) { + notifySyncMessageHandlers(message); + } + } } } diff --git a/snikket/Persistence.hx b/snikket/Persistence.hx index f63ea00..adebe48 100644 --- a/snikket/Persistence.hx +++ b/snikket/Persistence.hx @@ -10,12 +10,12 @@ import snikket.Message; #end interface Persistence { public function lastId(accountId: String, chatId: Null<String>, callback:(serverId:Null<String>)->Void):Void; - public function storeChat(accountId: String, chat: Chat):Void; + public function storeChats(accountId: String, chats: Array<Chat>):Void; public function getChats(accountId: String, callback: (chats:Array<SerializedChat>)->Void):Void; @HaxeCBridge.noemit public function getChatsUnreadDetails(accountId: String, chats: Array<Chat>, callback: (details:Array<{ chatId: String, message: ChatMessage, unreadCount: Int }>)->Void):Void; public function storeReaction(accountId: String, update: ReactionUpdate, callback: (Null<ChatMessage>)->Void):Void; - public function storeMessage(accountId: String, message: ChatMessage, callback: (ChatMessage)->Void):Void; + public function storeMessages(accountId: String, message: Array<ChatMessage>, callback: (Array<ChatMessage>)->Void):Void; public function updateMessage(accountId: String, message: ChatMessage):Void; public function updateMessageStatus(accountId: String, localId: String, status:MessageStatus, callback: (ChatMessage)->Void):Void; public function getMessage(accountId: String, chatId: String, serverId: Null<String>, localId: Null<String>, callback: (Null<ChatMessage>)->Void):Void; diff --git a/snikket/jingle/Session.hx b/snikket/jingle/Session.hx index 1483c9b..7ce4fcb 100644 --- a/snikket/jingle/Session.hx +++ b/snikket/jingle/Session.hx @@ -75,8 +75,8 @@ class IncomingProposedSession implements Session { // Store it for ourselves at least final event = new Stanza("ringing", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); final msg = mkCallMessage(from, client.jid, event); - client.storeMessage(msg, (stored) -> { - client.notifyMessageHandlers(stored, CorrectionEvent); + client.storeMessages([msg], (stored) -> { + client.notifyMessageHandlers(stored[0], CorrectionEvent); }); client.trigger("call/ring", { chatId: from.asBare().asString(), session: this }); } @@ -87,8 +87,8 @@ class IncomingProposedSession implements Session { // Store it for ourselves at least final event = new Stanza("reject", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); final msg = mkCallMessage(from, client.jid, event); - client.storeMessage(msg, (stored) -> { - client.notifyMessageHandlers(stored, CorrectionEvent); + client.storeMessages([msg], (stored) -> { + client.notifyMessageHandlers(stored[0], CorrectionEvent); }); client.getDirectChat(from.asBare().asString(), false).jingleSessions.remove(sid); } @@ -121,8 +121,8 @@ class IncomingProposedSession implements Session { client.sendPresence(from.asString()); final event = new Stanza("proceed", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); final msg = mkCallMessage(from, client.jid, event); - client.storeMessage(msg, (stored) -> { - client.notifyMessageHandlers(stored, CorrectionEvent); + client.storeMessages([msg], (stored) -> { + client.notifyMessageHandlers(stored[0], CorrectionEvent); client.sendStanza( new Stanza("message", { to: from.asString(), type: "chat", id: msg.versions[0].localId }) .addChild(event) @@ -186,12 +186,12 @@ class OutgoingProposedSession implements Session { event.tag("description", { xmlns: "urn:xmpp:jingle:apps:rtp:1", media: "video" }).up(); } final msg = mkCallMessage(to, client.jid, event); - client.storeMessage(msg, (stored) -> { + client.storeMessages([msg], (stored) -> { final stanza = new Stanza("message", { to: to.asString(), type: "chat", id: msg.localId }) .addChild(event) .tag("store", { xmlns: "urn:xmpp:hints" }); client.sendStanza(stanza); - client.notifyMessageHandlers(stored, DeliveryEvent); + client.notifyMessageHandlers(stored[0], DeliveryEvent); client.trigger("call/ringing", { chatId: to.asBare().asString() }); }); } @@ -203,13 +203,13 @@ class OutgoingProposedSession implements Session { public function hangup() { final event = new Stanza("retract", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); final msg = mkCallMessage(to, client.jid, event); - client.storeMessage(msg, (stored) -> { + client.storeMessages([msg], (stored) -> { client.sendStanza( new Stanza("message", { to: to.asString(), type: "chat", id: msg.versions[0].localId }) .addChild(event) .tag("store", { xmlns: "urn:xmpp:hints" }) ); - client.notifyMessageHandlers(stored, CorrectionEvent); + client.notifyMessageHandlers(stored[0], CorrectionEvent); }); client.getDirectChat(to.asBare().asString(), false).jingleSessions.remove(sid); } @@ -365,8 +365,8 @@ class InitiatedSession implements Session { final event = new Stanza("finish", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); final msg = mkCallMessage(counterpart, client.jid, event); - client.storeMessage(msg, (stored) -> { - client.notifyMessageHandlers(stored, CorrectionEvent); + client.storeMessages([msg], (stored) -> { + client.notifyMessageHandlers(stored[0], CorrectionEvent); client.sendStanza( new Stanza("message", { to: counterpart.asString(), type: "chat", id: msg.versions[0].localId }) .addChild(event) diff --git a/snikket/persistence/Dummy.hx b/snikket/persistence/Dummy.hx index 17e8093..1c1afbe 100644 --- a/snikket/persistence/Dummy.hx +++ b/snikket/persistence/Dummy.hx @@ -29,7 +29,7 @@ class Dummy implements Persistence { } @HaxeCBridge.noemit - public function storeChat(accountId: String, chat: Chat) { } + public function storeChats(accountId: String, chat: Array<Chat>) { } @HaxeCBridge.noemit public function getChats(accountId: String, callback: (Array<SerializedChat>)->Void) { @@ -37,8 +37,8 @@ class Dummy implements Persistence { } @HaxeCBridge.noemit - public function storeMessage(accountId: String, message: ChatMessage, callback: (ChatMessage)->Void) { - callback(message); + public function storeMessages(accountId: String, messages: Array<ChatMessage>, callback: (Array<ChatMessage>)->Void) { + callback(messages); } @HaxeCBridge.noemit diff --git a/snikket/persistence/browser.js b/snikket/persistence/IDB.js similarity index 86% rename from snikket/persistence/browser.js rename to snikket/persistence/IDB.js index 51c21ce..f88c938 100644 --- a/snikket/persistence/browser.js +++ b/snikket/persistence/IDB.js @@ -4,7 +4,7 @@ import { snikket as enums } from "./snikket-enums.js"; import { snikket } from "./snikket.js"; -const browser = (dbname, tokenize, stemmer) => { +export default (dbname, media, tokenize, stemmer) => { if (!tokenize) tokenize = function(s) { return s.split(" "); } if (!stemmer) stemmer = function(s) { return s; } @@ -36,6 +36,7 @@ const browser = (dbname, tokenize, stemmer) => { }; dbOpenReq.onsuccess = (event) => { db = event.target.result; + window.db = db; if (!db.objectStoreNames.contains("messages") || !db.objectStoreNames.contains("keyvaluepairs") || !db.objectStoreNames.contains("chats") || !db.objectStoreNames.contains("services") || !db.objectStoreNames.contains("reactions")) { db.close(); openDb(db.version + 1); @@ -45,14 +46,6 @@ const browser = (dbname, tokenize, stemmer) => { } openDb(); - var cache = null; - caches.open(dbname).then((c) => cache = c); - - function mkNiUrl(hashAlgorithm, hashBytes) { - const b64url = btoa(Array.from(new Uint8Array(hashBytes), (x) => String.fromCodePoint(x)).join("")).replace(/\+/g, "-").replace(/\//g, "_").replace(/=/g, ""); - return "/.well-known/ni/" + hashAlgorithm + "/" + b64url; - } - function promisifyRequest(request) { return new Promise((resolve, reject) => { request.oncomplete = request.onsuccess = () => resolve(request.result); @@ -197,7 +190,7 @@ const browser = (dbname, tokenize, stemmer) => { return reactionsMap; } - return { + const obj = { lastId: function(account, jid, callback) { const tx = db.transaction(["messages"], "readonly"); const store = tx.objectStore("messages"); @@ -226,25 +219,27 @@ const browser = (dbname, tokenize, stemmer) => { } }, - storeChat: function(account, chat) { + storeChats: function(account, chats) { const tx = db.transaction(["chats"], "readwrite"); const store = tx.objectStore("chats"); - store.put({ - account: account, - chatId: chat.chatId, - trusted: chat.trusted, - avatarSha1: chat.avatarSha1, - presence: new Map([...chat.presence.entries()].map(([k, p]) => [k, { caps: p.caps?.ver(), mucUser: p.mucUser?.toString() }])), - displayName: chat.displayName, - uiState: chat.uiState, - isBlocked: chat.isBlocked, - extensions: chat.extensions?.toString(), - readUpToId: chat.readUpToId, - readUpToBy: chat.readUpToBy, - disco: chat.disco, - class: chat instanceof snikket.DirectChat ? "DirectChat" : (chat instanceof snikket.Channel ? "Channel" : "Chat") - }); + for (const chat of chats) { + store.put({ + account: account, + chatId: chat.chatId, + trusted: chat.trusted, + avatarSha1: chat.avatarSha1, + presence: new Map([...chat.presence.entries()].map(([k, p]) => [k, { caps: p.caps?.ver(), mucUser: p.mucUser?.toString() }])), + displayName: chat.displayName, + uiState: chat.uiState, + isBlocked: chat.isBlocked, + extensions: chat.extensions?.toString(), + readUpToId: chat.readUpToId, + readUpToBy: chat.readUpToBy, + disco: chat.disco, + class: chat instanceof snikket.DirectChat ? "DirectChat" : (chat instanceof snikket.Channel ? "Channel" : "Chat") + }); + } }, getChats: function(account, callback) { @@ -358,6 +353,12 @@ const browser = (dbname, tokenize, stemmer) => { })().then(callback); }, + storeMessages(account, messages, callback) { + Promise.all(messages.map(m => + new Promise(resolve => this.storeMessage(account, m, resolve)) + )).then(callback); + }, + storeMessage: function(account, message, callback) { if (!message.chatId()) throw "Cannot store a message with no chatId"; if (!message.serverId && !message.localId) throw "Cannot store a message with no id"; @@ -378,7 +379,7 @@ const browser = (dbname, tokenize, stemmer) => { this.getMessage(account, message.chatId(), reactionResult.value.serverId, reactionResult.value.localId, (reactToMessage) => { const previouslyAppended = hydrateReactionsArray(reactionResult.value.append, reactionResult.value.senderId, reactionResult.value.timestamp).map(r => r.key); const reactions = []; - for (const [k, reacts] of reactToMessage.reactions) { + for (const [k, reacts] of reactToMessage?.reactions || []) { for (const react of reacts) { if (react.senderId === message.senderId() && !previouslyAppended.includes(k)) reactions.push(react); } @@ -409,7 +410,8 @@ const browser = (dbname, tokenize, stemmer) => { event.target.result.continue(); } else { message.reactions = reactions; - store.put(serializeMessage(account, message)); + const req = store.put(serializeMessage(account, message)); + req.onerror = () => { window.mylog.push("MSG STORE ERROR: " + req.error.name + " " + req.error.message); } callback(message); } }; @@ -461,25 +463,38 @@ const browser = (dbname, tokenize, stemmer) => { this.getMessagesFromCursor(cursor, afterId, bound[0], callback); }, - getMessagesAround: function(account, chatId, id, time, callback) { - // TODO: if id is present but time is null, lookup time - if (!id && !time) throw "Around what?"; - const before = new Promise((resolve, reject) => - this.getMessagesBefore(account, chatId, id, time, resolve) - ); + getMessagesAround: function(account, chatId, id, timeArg, callback) { + if (!id && !timeArg) throw "Around what?"; + new Promise((resolve, reject) => { + if (timeArg) { + resolve(timeArg); + } else { + this.getMessage(account, chatId, id, null, (m) => { + m ? resolve(m.timestamp) : this.getMessage(account, chatId, null, id, (m2) => resolve(m2?.timestamp)); + }); + } + }).then((time) => { + if (!time) { + callback([]); + return; + } + const before = new Promise((resolve, reject) => + this.getMessagesBefore(account, chatId, id, time, resolve) + ); - const tx = db.transaction(["messages"], "readonly"); - const store = tx.objectStore("messages"); - const cursor = store.index("chats").openCursor( - IDBKeyRange.bound([account, chatId, new Date(time)], [account, chatId, []]), - "next" - ); - const aroundAndAfter = new Promise((resolve, reject) => - this.getMessagesFromCursor(cursor, null, null, resolve) - ); + const tx = db.transaction(["messages"], "readonly"); + const store = tx.objectStore("messages"); + const cursor = store.index("chats").openCursor( + IDBKeyRange.bound([account, chatId, new Date(time)], [account, chatId, []]), + "next" + ); + const aroundAndAfter = new Promise((resolve, reject) => + this.getMessagesFromCursor(cursor, null, null, resolve) + ); - Promise.all([before, aroundAndAfter]).then((result) => { - callback(result.flat()); + Promise.all([before, aroundAndAfter]).then((result) => { + callback(result.flat()); + }); }); }, @@ -540,75 +555,16 @@ const browser = (dbname, tokenize, stemmer) => { } }, - routeHashPathSW: function() { - const waitForMedia = async (uri) => { - const r = await this.getMediaResponse(uri); - if (r) return r; - await new Promise(resolve => setTimeout(resolve, 5000)); - return await waitForMedia(uri); - }; - - addEventListener("fetch", (event) => { - const url = new URL(event.request.url); - if (url.pathname.startsWith("/.well-known/ni/")) { - event.respondWith(waitForMedia(url.pathname)); - } - }); - }, - - getMediaResponse: async function(uri) { - uri = uri.replace(/^ni:\/\/\//, "/.well-known/ni/").replace(/;/, "/"); - var niUrl; - if (uri.split("/")[3] === "sha-256") { - niUrl = uri; - } else { - const tx = db.transaction(["keyvaluepairs"], "readonly"); - const store = tx.objectStore("keyvaluepairs"); - niUrl = await promisifyRequest(store.get(uri)); - if (!niUrl) { - return null; - } - } - - return await cache.match(niUrl); - }, - hasMedia: function(hashAlgorithm, hash, callback) { - (async () => { - const response = await this.getMediaResponse(mkNiUrl(hashAlgorithm, hash)); - return !!response; - })().then(callback); + media.hasMedia(hashAlgorithm, hash, callback); }, removeMedia: function(hashAlgorithm, hash) { - (async () => { - var niUrl; - if (hashAlgorithm === "sha-256") { - niUrl = mkNiUrl(hashAlgorithm, hash); - } else { - const tx = db.transaction(["keyvaluepairs"], "readonly"); - const store = tx.objectStore("keyvaluepairs"); - niUrl = await promisifyRequest(store.get(mkNiUrl(hashAlgorithm, hash))); - if (!niUrl) return; - } - - return await cache.delete(niUrl); - })(); + media.removeMedia(hashAlgorithm, hash); }, storeMedia: function(mime, buffer, callback) { - (async function() { - const sha256 = await crypto.subtle.digest("SHA-256", buffer); - const sha512 = await crypto.subtle.digest("SHA-512", buffer); - const sha1 = await crypto.subtle.digest("SHA-1", buffer); - const sha256NiUrl = mkNiUrl("sha-256", sha256); - await cache.put(sha256NiUrl, new Response(buffer, { headers: { "Content-Type": mime } })); - - const tx = db.transaction(["keyvaluepairs"], "readwrite"); - const store = tx.objectStore("keyvaluepairs"); - await promisifyRequest(store.put(sha256NiUrl, mkNiUrl("sha-1", sha1))); - await promisifyRequest(store.put(sha256NiUrl, mkNiUrl("sha-512", sha512))); - })().then(callback); + media.storeMedia(mime, buffer, callback); }, storeCaps: function(caps) { @@ -642,9 +598,14 @@ const browser = (dbname, tokenize, stemmer) => { }, storeStreamManagement: function(account, sm) { + // Don't bother on ios, the indexeddb is too broken + // https://bugs.webkit.org/show_bug.cgi?id=287876 + if (navigator.userAgent.match(/(iPad|iPhone|iPod)/g)) return; + const tx = db.transaction(["keyvaluepairs"], "readwrite"); const store = tx.objectStore("keyvaluepairs"); - store.put(sm, "sm:" + account).onerror = console.error; + const req = store.put(sm, "sm:" + account); + req.onerror = () => { console.error("storeStreamManagement", req.error.name, req.error.message); } }, getStreamManagement: function(account, callback) { @@ -769,8 +730,21 @@ const browser = (dbname, tokenize, stemmer) => { console.error(event); callback([]); } + }, + + get(k, callback) { + const tx = db.transaction(["keyvaluepairs"], "readonly"); + const store = tx.objectStore("keyvaluepairs"); + promisifyRequest(store.get(k)).then(callback); + }, + + set(k, v, callback) { + const tx = db.transaction(["keyvaluepairs"], "readwrite"); + const store = tx.objectStore("keyvaluepairs"); + promisifyRequest(store.put(v, k)).then(callback); } - } -}; + }; -export default browser; + media.setKV(obj); + return obj; +}; diff --git a/snikket/persistence/KeyValueStore.hx b/snikket/persistence/KeyValueStore.hx new file mode 100644 index 0000000..3ee5ad2 --- /dev/null +++ b/snikket/persistence/KeyValueStore.hx @@ -0,0 +1,6 @@ +package snikket.persistence; + +interface KeyValueStore { + public function get(k: String, callback: (Null<String>)->Void): Void; + public function set(k: String, v: Null<String>, callback: ()->Void): Void; +} diff --git a/snikket/persistence/MediaStore.hx b/snikket/persistence/MediaStore.hx new file mode 100644 index 0000000..e6faf3c --- /dev/null +++ b/snikket/persistence/MediaStore.hx @@ -0,0 +1,11 @@ +package snikket.persistence; + +import haxe.io.BytesData; + +interface MediaStore { + public function hasMedia(hashAlgorithm:String, hash:BytesData, callback: (has:Bool)->Void):Void; + public function removeMedia(hashAlgorithm:String, hash:BytesData):Void; + public function storeMedia(mime:String, bytes:BytesData, callback: ()->Void):Void; + @:allow(snikket) + private function setKV(kv: KeyValueStore):Void; +} diff --git a/snikket/persistence/MediaStoreCache.js b/snikket/persistence/MediaStoreCache.js new file mode 100644 index 0000000..757fe67 --- /dev/null +++ b/snikket/persistence/MediaStoreCache.js @@ -0,0 +1,80 @@ +// This example MediaStore is written in JavaScript +// so that SDK users can easily see how to write their own + +export default (cacheName) => { + var cache = null; + caches.open(cacheName).then((c) => cache = c); + + function mkNiUrl(hashAlgorithm, hashBytes) { + const b64url = btoa(Array.from(new Uint8Array(hashBytes), (x) => String.fromCodePoint(x)).join("")).replace(/\+/g, "-").replace(/\//g, "_").replace(/=/g, ""); + return "/.well-known/ni/" + hashAlgorithm + "/" + b64url; + } + + return { + setKV(kv) { + this.kv = kv; + }, + + storeMedia(mime, buffer, callback) { + (async () => { + const sha256 = await crypto.subtle.digest("SHA-256", buffer); + const sha1 = await crypto.subtle.digest("SHA-1", buffer); + const sha256NiUrl = mkNiUrl("sha-256", sha256); + await cache.put(sha256NiUrl, new Response(buffer, { headers: { "Content-Type": mime } })); + if (this.kv) await new Promise((resolve) => this.kv.set(mkNiUrl("sha-1", sha1), sha256NiUrl, resolve)); + })().then(callback); + }, + + removeMedia(hashAlgorithm, hash) { + (async () => { + let niUrl; + if (hashAlgorithm === "sha-256") { + niUrl = mkNiUrl(hashAlgorithm, hash); + } else { + niUrl = this.kv && await new Promise((resolve) => this.kv.get(mkNiUrl(hashAlgorithm, hash), resolve)); + if (!niUrl) return; + } + + return await cache.delete(niUrl); + })(); + }, + + routeHashPathSW() { + const waitForMedia = async (uri) => { + const r = await this.getMediaResponse(uri); + if (r) return r; + await new Promise(resolve => setTimeout(resolve, 5000)); + return await waitForMedia(uri); + }; + + addEventListener("fetch", (event) => { + const url = new URL(event.request.url); + if (url.pathname.startsWith("/.well-known/ni/")) { + event.respondWith(waitForMedia(url.pathname)); + } + }); + }, + + async getMediaResponse(uri) { + uri = uri.replace(/^ni:\/\/\//, "/.well-known/ni/").replace(/;/, "/"); + var niUrl; + if (uri.split("/")[3] === "sha-256") { + niUrl = uri; + } else { + niUrl = this.kv && await new Promise((resolve) => this.kv.get(uri, resolve)); + if (!niUrl) { + return null; + } + } + + return await cache.match(niUrl); + }, + + hasMedia(hashAlgorithm, hash, callback) { + (async () => { + const response = await this.getMediaResponse(mkNiUrl(hashAlgorithm, hash)); + return !!response; + })().then(callback); + } + }; +}; diff --git a/snikket/persistence/MediaStoreFS.hx b/snikket/persistence/MediaStoreFS.hx new file mode 100644 index 0000000..25fcc30 --- /dev/null +++ b/snikket/persistence/MediaStoreFS.hx @@ -0,0 +1,89 @@ +package snikket.persistence; + +#if cpp +import HaxeCBridge; +#end +import haxe.io.Bytes; +import haxe.io.BytesData; +import sys.FileSystem; +import sys.io.File; +import thenshim.Promise; + +#if cpp +@:build(HaxeCBridge.expose()) +#end +class MediaStoreFS implements MediaStore { + private final blobpath: String; + private var kv: Null<KeyValueStore> = null; + + public function new(path: String) { + blobpath = path; + } + + @:allow(snikket) + private function setKV(kv: KeyValueStore) { + this.kv = kv; + } + + @HaxeCBridge.noemit + public function getMediaPath(hashAlgorithm: String, hash: BytesData, callback: (Null<String>)->Void) { + if (hashAlgorithm == "sha-256") { + final path = blobpath + "/f" + new Hash(hashAlgorithm, hash).toHex(); + if (FileSystem.exists(path)) { + callback(FileSystem.absolutePath(path)); + } else { + callback(null); + } + } else { + final hash = new Hash(hashAlgorithm, hash); + get(hash.serializeUri()).then(sha256uri -> { + final sha256 = sha256uri == null ? null : Hash.fromUri(sha256uri); + if (sha256 == null) { + callback(null); + } else { + getMediaPath(sha256.algorithm, sha256.hash, callback); + } + }); + } + } + + @HaxeCBridge.noemit + public function hasMedia(hashAlgorithm:String, hash:BytesData, callback: (Bool)->Void) { + getMediaPath(hashAlgorithm, hash, path -> callback(path != null)); + } + + @HaxeCBridge.noemit + public function removeMedia(hashAlgorithm: String, hash: BytesData) { + getMediaPath(hashAlgorithm, hash, (path) -> { + if (path != null) FileSystem.deleteFile(path); + }); + } + + @HaxeCBridge.noemit + public function storeMedia(mime: String, bd: BytesData, callback: ()->Void) { + final bytes = Bytes.ofData(bd); + final sha1 = Hash.sha1(bytes); + final sha256 = Hash.sha256(bytes); + File.saveBytes(blobpath + "/f" + sha256.toHex(), bytes); + thenshim.PromiseTools.all([ + set(sha1.serializeUri(), sha256.serializeUri()), + set(sha256.serializeUri() + "#contentType", mime) + ]).then((_) -> callback()); + } + + private function set(k: String, v: Null<String>) { + if (kv == null) return Promise.resolve(null); + + return new Promise((resolve, reject) -> + kv.set(k, v, () -> resolve(null)) + ); + } + + private function get(k: String): Promise<Null<String>> { + if (kv == null) return Promise.resolve(null); + + return new Promise((resolve, reject) -> + kv.get(k, resolve) + ); + } +} diff --git a/snikket/persistence/Sqlite.hx b/snikket/persistence/Sqlite.hx index 24a4e1a..93e9a47 100644 --- a/snikket/persistence/Sqlite.hx +++ b/snikket/persistence/Sqlite.hx @@ -3,257 +3,372 @@ package snikket.persistence; #if cpp import HaxeCBridge; #end -import datetime.DateTime; +import haxe.DynamicAccess; import haxe.Json; import haxe.crypto.Base64; -import haxe.crypto.Sha1; -import haxe.crypto.Sha256; import haxe.io.Bytes; import haxe.io.BytesData; -import sys.FileSystem; -import sys.db.Connection; -import sys.io.File; +import thenshim.Promise; import snikket.Caps; import snikket.Chat; import snikket.Message; - -// TODO: consider doing background threads for operations +using Lambda; @:expose #if cpp @:build(HaxeCBridge.expose()) @:build(HaxeSwiftBridge.expose()) #end -class Sqlite implements Persistence { - final db: Connection; - final blobpath: String; +class Sqlite implements Persistence implements KeyValueStore { + final db: SqliteDriver; + final media: MediaStore; /** Create a basic persistence layer based on sqlite @param dbfile path to sqlite database - @params blobpath path to directory for blob storage + @params media a MediaStore to use for media @returns new persistence layer **/ - public function new(dbfile: String, blobpath: String) { - this.blobpath = blobpath; - db = sys.db.Sqlite.open(dbfile); - final version = db.request("PRAGMA user_version;").getIntResult(0); - if (version < 1) { - db.request("CREATE TABLE messages ( - account_id TEXT NOT NULL, - mam_id TEXT, - mam_by TEXT, - stanza_id TEXT NOT NULL, - sync_point BOOLEAN NOT NULL, - chat_id TEXT NOT NULL, - created_at INTEGER NOT NULL, - stanza TEXT NOT NULL, - PRIMARY KEY (account_id, mam_id, mam_by) - );"); - db.request("CREATE TABLE chats ( - account_id TEXT NOT NULL, - chat_id TEXT NOT NULL, - trusted BOOLEAN NOT NULL, - avatar_sha1 BLOB, - fn TEXT, - ui_state TEXT NOT NULL, - blocked BOOLEAN NOT NULL, - extensions TEXT, - read_up_to_id TEXT, - read_up_to_by TEXT, - class TEXT NOT NULL, - PRIMARY KEY (account_id, chat_id) - );"); - db.request("CREATE TABLE media ( - sha256 BLOB NOT NULL PRIMARY KEY, - sha1 BLOB NOT NULL UNIQUE, - mime TEXT NOT NULL - );"); - db.request("CREATE TABLE caps ( - sha1 BLOB NOT NULL UNIQUE, - caps JSONB NOT NULL - );"); - db.request("CREATE TABLE services ( - account_id TEXT NOT NULL, - service_id TEXT NOT NULL, - name TEXT, - node TEXT, - caps BLOB NOT NULL, - PRIMARY KEY (account_id, service_id) - );"); - db.request("CREATE TABLE logins ( - login TEXT NOT NULL, - client_id TEXT NOT NULL, - display_name TEXT, - token TEXT, - fast_count INTEGER NOT NULL DEFAULT 0, - PRIMARY KEY (login) - );"); - db.request("PRAGMA user_version = 1;"); - } + public function new(dbfile: String, media: MediaStore) { + this.media = media; + media.setKV(this); + db = new SqliteDriver(dbfile); + final version = db.exec("PRAGMA user_version;").then(iter -> { + final version = Std.parseInt(iter.next()?.user_version) ?? 0; + return if (version < 1) { + // messages cannot be STRICT because mam_id may be NULL + db.exec("CREATE TABLE messages ( + account_id TEXT NOT NULL, + mam_id TEXT, + mam_by TEXT, + stanza_id TEXT, + sync_point INTEGER NOT NULL, + chat_id TEXT NOT NULL, + created_at INTEGER NOT NULL, + status INTEGER NOT NULL, + direction INTEGER NOT NULL, + stanza TEXT NOT NULL, + PRIMARY KEY (account_id, mam_id, mam_by, stanza_id) + ); + CREATE INDEX messages_created_at ON messages (account_id, chat_id, created_at); + CREATE TABLE chats ( + account_id TEXT NOT NULL, + chat_id TEXT NOT NULL, + trusted INTEGER NOT NULL, + avatar_sha1 BLOB, + fn TEXT, + ui_state INTEGER NOT NULL, + blocked INTEGER NOT NULL, + extensions TEXT, + read_up_to_id TEXT, + read_up_to_by TEXT, + caps_ver BLOB, + presence BLOB NOT NULL, + class TEXT NOT NULL, + PRIMARY KEY (account_id, chat_id) + ) STRICT; + CREATE TABLE keyvaluepairs ( + k TEXT NOT NULL PRIMARY KEY, + v TEXT NOT NULL + ) STRICT; + CREATE TABLE caps ( + sha1 BLOB NOT NULL PRIMARY KEY, + caps BLOB NOT NULL + ) STRICT; + CREATE TABLE services ( + account_id TEXT NOT NULL, + service_id TEXT NOT NULL, + name TEXT, + node TEXT, + caps BLOB NOT NULL, + PRIMARY KEY (account_id, service_id) + ) STRICT; + CREATE TABLE accounts ( + account_id TEXT NOT NULL, + client_id TEXT NOT NULL, + display_name TEXT, + token TEXT, + fast_count INTEGER NOT NULL DEFAULT 0, + sm_state BLOB, + PRIMARY KEY (account_id) + ) STRICT; + PRAGMA user_version = 1;"); + } + }); } @HaxeCBridge.noemit - public function lastId(accountId: String, chatId: Null<String>, callback:(Null<String>)->Void):Void { - final q = new StringBuf(); - q.add("SELECT mam_id FROM messages WHERE mam_id IS NOT NULL AND sync_point AND account_id="); - db.addValue(q, accountId); - if (chatId != null) { - q.add(" AND chat_id="); - db.addValue(q, chatId); - } - q.add(";"); - try { - callback(db.request(q.toString()).getResult(0)); - } catch (e) { + public function get(k: String, callback: (Null<String>)->Void) { + db.exec("SELECT v FROM keyvaluepairs WHERE k=? LIMIT 1", [k]).then(iter -> { + for (row in iter) { + callback(row.v); + return; + } callback(null); + }); + } + + @HaxeCBridge.noemit + public function set(k: String, v: Null<String>, callback: ()->Void) { + if (v == null) { + db.exec("DELETE FROM keyvaluepairs WHERE k=?", [k]).then(_ -> { + callback(); + }); + } else { + db.exec("INSERT OR REPLACE INTO keyvaluepairs VALUES (?,?)", [k, v]).then(_ -> { + callback(); + }); } } @HaxeCBridge.noemit - public function storeChat(accountId: String, chat: Chat) { - // TODO: presence - // TODO: disco - trace("storeChat"); - final q = new StringBuf(); - q.add("INSERT OR REPLACE INTO chats VALUES ("); - db.addValue(q, accountId); - q.add(","); - db.addValue(q, chat.chatId); - q.add(","); - db.addValue(q, chat.isTrusted()); - if (chat.avatarSha1 == null) { - q.add(",NULL"); + public function lastId(accountId: String, chatId: Null<String>, callback:(Null<String>)->Void):Void { + final params = [accountId]; + var q = "SELECT mam_id FROM messages WHERE mam_id IS NOT NULL AND sync_point AND account_id=?"; + if (chatId == null) { + q += " AND mam_by=?"; + params.push(accountId); } else { - q.add(",X"); - db.addValue(q, Bytes.ofData(chat.avatarSha1).toHex()); + q += " AND chat_id=?"; + params.push(chatId); } - q.add(","); - db.addValue(q, chat.getDisplayName()); - q.add(","); - db.addValue(q, chat.uiState); - q.add(","); - db.addValue(q, chat.isBlocked); - q.add(","); - db.addValue(q, chat.extensions); - q.add(","); - db.addValue(q, chat.readUpTo()); - q.add(","); - db.addValue(q, chat.readUpToBy); - q.add(","); - db.addValue(q, Type.getClassName(Type.getClass(chat)).split(".").pop()); - q.add(");"); - db.request(q.toString()); + q += " ORDER BY ROWID DESC LIMIT 1"; + db.exec(q, params).then(iter -> callback(iter.next()?.mam_id), (_) -> callback(null)); } + private final storeChatBuffer: Map<String, Chat> = []; + private var storeChatTimer = null; + @HaxeCBridge.noemit - public function getChats(accountId: String, callback: (Array<SerializedChat>)->Void) { - // TODO: presence - // TODO: disco - final q = new StringBuf(); - q.add("SELECT chat_id, trusted, avatar_sha1, fn, ui_state, blocked, extensions, read_up_to_id, read_up_to_by, class FROM chats WHERE account_id="); - db.addValue(q, accountId); - final result = db.request(q.toString()); - final chats = []; - for (row in result) { - chats.push(new SerializedChat(row.chat_id, row.trusted, row.avatar_sha1, [], row.fn, row.ui_state, row.blocked, row.extensions, row.read_up_to_id, row.read_up_to_by, null, Reflect.field(row, "class"))); + public function storeChats(accountId: String, chats: Array<Chat>) { + if (storeChatTimer != null) { + storeChatTimer.stop(); + } + + for (chat in chats) { + storeChatBuffer[accountId + "\n" + chat.chatId] = chat; } - callback(chats); + + storeChatTimer = haxe.Timer.delay(() -> { + final mapPresence = (chat: Chat) -> { + final storePresence: DynamicAccess<{ ?caps: String, ?mucUser: String }> = {}; + for (resource => presence in chat.presence) { + storePresence[resource] = {}; + if (presence.caps != null) { + storeCaps(presence.caps); + storePresence[resource].caps = presence.caps.ver(); + } + if (presence.mucUser != null) { + storePresence[resource].mucUser = presence.mucUser.toString(); + } + } + return storePresence; + }; + final q = new StringBuf(); + q.add("INSERT OR REPLACE INTO chats VALUES "); + var first = true; + for (_ in storeChatBuffer) { + if (!first) q.add(","); + first = false; + q.add("(?,?,?,?,?,?,?,?,?,?,?,jsonb(?),?)"); + } + db.exec( + q.toString(), + storeChatBuffer.flatMap(chat -> { + final channel = Std.downcast(chat, Channel); + if (channel != null) storeCaps(channel.disco); + final row: Array<Dynamic> = [ + accountId, chat.chatId, chat.isTrusted(), chat.avatarSha1, + chat.getDisplayName(), chat.uiState, chat.isBlocked, + chat.extensions.toString(), chat.readUpTo(), chat.readUpToBy, + channel?.disco?.verRaw().hash, Json.stringify(mapPresence(chat)), + Type.getClassName(Type.getClass(chat)).split(".").pop() + ]; + return row; + }) + ); + storeChatTimer = null; + storeChatBuffer.clear(); + }, 100); } @HaxeCBridge.noemit - public function storeMessage(accountId: String, message: ChatMessage, callback: (ChatMessage)->Void) { - final q = new StringBuf(); - q.add("INSERT OR REPLACE INTO messages VALUES ("); - db.addValue(q, accountId); - q.add(","); - db.addValue(q, message.serverId); - q.add(","); - db.addValue(q, message.serverIdBy); - q.add(","); - db.addValue(q, message.localId); - q.add(","); - db.addValue(q, message.syncPoint); - q.add(","); - db.addValue(q, message.chatId()); - q.add(","); - db.addValue(q, DateTime.fromString(message.timestamp).getTime()); - q.add(","); - db.addValue(q, message.asStanza().toString()); - q.add(");"); - db.request(q.toString()); + public function getChats(accountId: String, callback: (Array<SerializedChat>)->Void) { + db.exec( + "SELECT chat_id, trusted, avatar_sha1, fn, ui_state, blocked, extensions, read_up_to_id, read_up_to_by, json(caps) AS caps, json(presence) AS presence, class FROM chats LEFT JOIN caps ON chats.caps_ver=caps.sha1 WHERE account_id=?", + [accountId] + ).then(result -> { + final fetchCaps: Map<BytesData, Bool> = []; + final chats: Array<Dynamic> = []; + for (row in result) { + final capsJson = row.caps == null ? null : Json.parse(row.caps); + row.capsObj = capsJson == null ? null : new Caps(capsJson.node, capsJson.identities.map(i -> new Identity(i.category, i.type, i.name)), capsJson.features); + final presenceJson: DynamicAccess<Dynamic> = Json.parse(row.presence); + row.presenceJson = presenceJson; + for (resource => presence in presenceJson) { + if (presence.caps) fetchCaps[Base64.decode(presence.caps).getData()] = true; + } + chats.push(row); + } + final fetchCapsSha1s = { iterator: () -> fetchCaps.keys() }.array(); + return db.exec( + "SELECT sha1, json(caps) AS caps FROM caps WHERE sha1 IN (" + fetchCapsSha1s.map(_ -> "?").join(",") + ")", + fetchCapsSha1s + ).then(capsResult -> { chats: chats, caps: capsResult }); + }).then(result -> { + final capsMap: Map<String, Caps> = []; + for (row in result.caps) { + final json = Json.parse(row.caps); + capsMap[Base64.encode(Bytes.ofData(row.sha1))] = new Caps(json.node, json.identities.map(i -> new Identity(i.category, i.type, i.name)), json.features); + } + final chats = []; + for (row in result.chats) { + final presenceMap: Map<String, Presence> = []; + final presenceJson: DynamicAccess<Dynamic> = row.presenceJson; + for (resource => presence in presenceJson) { + presenceMap[resource] = new Presence( + presence.caps == null ? null : capsMap[presence.caps], + presence.mucUser == null ? null : Stanza.parse(presence.mucUser) + ); + } + chats.push(new SerializedChat(row.chat_id, row.trusted, row.avatar_sha1, presenceMap, row.fn, row.ui_state, row.blocked, row.extensions, row.read_up_to_id, row.read_up_to_by, row.capsObj, Reflect.field(row, "class"))); + } + callback(chats); + }); + } + + @HaxeCBridge.noemit + public function storeMessages(accountId: String, messages: Array<ChatMessage>, callback: (Array<ChatMessage>)->Void) { + if (messages.length < 1) { + callback(messages); + return; + } + + final chatIds = []; + final localIds = []; + for (message in messages) { + if (message.serverId == null && message.localId == null) throw "Cannot store a message with no id"; + if (message.serverId == null && message.isIncoming()) throw "Cannot store an incoming message with no server id"; + if (message.serverId != null && message.serverIdBy == null) throw "Cannot store a message with a server id and no by"; + + if (!message.isIncoming() && message.versions.length < 1) { + // Duplicate, we trust our own sent ids + // Ideally this would be in a transaction with the insert, but then we can't use bind with async api + chatIds.push(message.chatId()); + localIds.push(message.localId); + } + } + + (if (chatIds.length > 0 && localIds.length > 0) { + final q = new StringBuf(); + q.add("DELETE FROM messages WHERE account_id=? AND direction=? AND chat_id IN ("); + q.add(chatIds.map(_ -> "?").join(",")); + q.add(") AND stanza_id IN ("); + q.add(localIds.map(_ -> "?").join(",")); + q.add(")"); + db.exec(q.toString(), ([accountId, MessageSent] : Array<Dynamic>).concat(chatIds).concat(localIds)); + } else { + Promise.resolve(null); + }).then(_ -> + db.exec( + "INSERT OR REPLACE INTO messages VALUES " + messages.map(_ -> "(?,?,?,?,?,?,CAST(unixepoch(?, 'subsec') * 1000 AS INTEGER),?,?,?)").join(","), + messages.flatMap(message -> ([ + accountId, message.serverId, message.serverIdBy, + message.localId, message.syncPoint, message.chatId(), + message.timestamp, message.status, message.direction, + message.asStanza().toString() + ] : Array<Dynamic>)) + ).then(_ -> callback(messages)) + ); // TODO: hydrate reply to stubs? // TODO: corrections // TODO: fetch reactions? - callback(message); } @HaxeCBridge.noemit public function updateMessage(accountId: String, message: ChatMessage) { - storeMessage(accountId, message, (_)->{}); + storeMessages(accountId, [message], (_)->{}); } - public function getMessage(accountId: String, chatId: String, serverId: Null<String>, localId: Null<String>, callback: (Null<ChatMessage>)->Void) { - final q = new StringBuf(); - q.add("SELECT stanza FROM messages WHERE account_id="); - db.addValue(q, accountId); - q.add(" AND chat_id="); - db.addValue(q, chatId); + var q = "SELECT stanza, direction, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, mam_id, mam_by FROM messages WHERE account_id=? AND chat_id=?"; + final params = [accountId, chatId]; if (serverId != null) { - q.add(" AND mam_id="); - db.addValue(q, serverId); + q += " AND mam_id=?"; + params.push(serverId); } else if (localId != null) { - q.add(" AND stanza_id="); - db.addValue(q, localId); - } - q.add("LIMIT 1"); - final result = db.request(q.toString()); - final messages = []; - for (row in result) { - callback(ChatMessage.fromStanza(Stanza.parse(row.stanza), JID.parse(accountId))); // TODO - return; + q += " AND stanza_id=?"; + params.push(localId); } - callback(null); + q += "LIMIT 1"; + db.exec(q, params).then(result -> { + for (row in result) { + callback(hydrateMessage(accountId, row)); + return; + } + callback(null); + }); } - private function getMessages(accountId: String, chatId: String, time: String, op: String) { - final q = new StringBuf(); - q.add("SELECT stanza FROM messages WHERE account_id="); - db.addValue(q, accountId); - q.add(" AND chat_id="); - db.addValue(q, chatId); + private function getMessages(accountId: String, chatId: String, time: Null<String>, op: String): Promise<Iterator<ChatMessage>> { + var q = "SELECT stanza, direction, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, mam_id, mam_by FROM messages WHERE account_id=? AND chat_id=?"; + final params = [accountId, chatId]; if (time != null) { - q.add(" AND created_at " + op); - db.addValue(q, DateTime.fromString(time).getTime()); - } - q.add("LIMIT 50"); - final result = db.request(q.toString()); - final messages = []; - for (row in result) { - messages.push(ChatMessage.fromStanza(Stanza.parse(row.stanza), JID.parse(accountId))); // TODO + q += " AND created_at " + op + "CAST(unixepoch(?, 'subsec') * 1000 AS INTEGER)"; + params.push(time); } - return messages; + q += " ORDER BY created_at"; + if (op == "<" || op == "<=") q += " DESC"; + q += ", ROWID"; + if (op == "<" || op == "<=") q += " DESC"; + q += " LIMIT 50"; + return db.exec(q, params).then(result -> ({ + hasNext: result.hasNext, + next: () -> hydrateMessage(accountId, result.next()) + })).then(iter -> + if (op == "<" || op == "<=") { + final arr = { iterator: () -> iter }.array(); + arr.reverse(); + final reviter = arr.iterator(); + { hasNext: reviter.hasNext, next: reviter.next }; + } else { + iter; + } + ); } @HaxeCBridge.noemit public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (Array<ChatMessage>)->Void) { - callback(getMessages(accountId, chatId, beforeTime, "<")); + getMessages(accountId, chatId, beforeTime, "<").then(iter -> callback({ iterator: () -> iter }.array())); } @HaxeCBridge.noemit public function getMessagesAfter(accountId: String, chatId: String, afterId: Null<String>, afterTime: Null<String>, callback: (Array<ChatMessage>)->Void) { - callback(getMessages(accountId, chatId, afterTime, ">")); + getMessages(accountId, chatId, afterTime, ">").then(iter -> callback({ iterator: () -> iter }.array())); } @HaxeCBridge.noemit public function getMessagesAround(accountId: String, chatId: String, aroundId: Null<String>, aroundTime: Null<String>, callback: (Array<ChatMessage>)->Void) { - if (aroundTime == null) throw "Around what?"; - final before = getMessages(accountId, chatId, aroundTime, "<"); - final aroundAndAfter = getMessages(accountId, chatId, aroundTime, ">="); - callback(before.concat(aroundAndAfter)); + (if (aroundTime == null) { + new Promise((resolve, reject) -> getMessage(accountId, chatId, aroundId, null, resolve)).then(m -> + if (m != null) { + Promise.resolve(m.timestamp); + } else { + new Promise((resolve, reject) -> getMessage(accountId, chatId, null, aroundId, resolve)).then(m -> m?.timestamp); + } + ); + } else { + Promise.resolve(aroundTime); + }).then(aroundTime -> + thenshim.PromiseTools.all([ + getMessages(accountId, chatId, aroundTime, "<"), + getMessages(accountId, chatId, aroundTime, ">=") + ]) + ).then(results -> + callback(results.flatMap(iter -> { iterator: () -> iter }.array())) + ); } @HaxeCBridge.noemit @@ -263,53 +378,51 @@ class Sqlite implements Persistence { return; } + final params: Array<Dynamic> = [accountId]; // subq is first in final q, so subq params first + final subq = new StringBuf(); - subq.add("SELECT chat_id, MAX(ROWID) AS row FROM messages WHERE account_id="); - db.addValue(subq, accountId); + subq.add("SELECT chat_id, MAX(ROWID) AS row FROM messages WHERE account_id=?"); subq.add(" AND chat_id IN ("); for (i => chat in chats) { if (i != 0) subq.add(","); - db.addValue(subq, chat.chatId); + subq.add("?"); + params.push(chat.chatId); } subq.add(") AND (mam_id IN ("); var didOne = false; for (chat in chats) { if (chat.readUpTo() != null) { if (didOne) subq.add(","); - db.addValue(subq, chat.readUpTo()); + subq.add("?"); + params.push(chat.readUpTo()); didOne = true; } } - subq.add(") OR stanza_id IN ("); - didOne = false; - for (chat in chats) { - if (chat.readUpTo() != null) { - if (didOne) subq.add(","); - db.addValue(subq, chat.readUpTo()); - didOne = true; - } - } - subq.add(")) GROUP BY chat_id"); + subq.add(") OR direction=?) GROUP BY chat_id"); + params.push(MessageSent); final q = new StringBuf(); - q.add("SELECT chat_id as chatId, stanza, CASE WHEN subq.row IS NULL THEN COUNT(*) ELSE COUNT(*) - 1 END AS unreadCount, MAX(messages.created_at) "); - q.add("FROM messages LEFT JOIN ("); + q.add("SELECT chat_id AS chatId, stanza, direction, mam_id, mam_by, CASE WHEN subq.row IS NULL THEN COUNT(*) ELSE COUNT(*) - 1 END AS unreadCount, strftime('%FT%H:%M:%fZ', MAX(messages.created_at) / 1000.0, 'unixepoch') AS timestamp FROM messages LEFT JOIN ("); q.add(subq.toString()); - q.add(") subq USING (chat_id) WHERE account_id="); - db.addValue(q, accountId); - q.add(" AND chat_id IN ("); + q.add(") subq USING (chat_id) WHERE account_id=? AND chat_id IN ("); + params.push(accountId); for (i => chat in chats) { if (i != 0) q.add(","); - db.addValue(q, chat.chatId); + q.add("?"); + params.push(chat.chatId); } q.add(") AND (subq.row IS NULL OR messages.ROWID >= subq.row) GROUP BY chat_id;"); - final result = db.request(q.toString()); - final details = []; - for (row in result) { - row.message = ChatMessage.fromStanza(Stanza.parse(row.stanza), JID.parse(accountId)); // TODO - details.push(row); - } - callback(details); + db.exec(q.toString(), params).then(result -> { + final details = []; + for (row in result) { + details.push({ + unreadCount: row.unreadCount, + chatId: row.chatId, + message: hydrateMessage(accountId, row) + }); + } + callback(details); + }); } @HaxeCBridge.noemit @@ -319,211 +432,188 @@ class Sqlite implements Persistence { @HaxeCBridge.noemit public function updateMessageStatus(accountId: String, localId: String, status:MessageStatus, callback: (ChatMessage)->Void) { - callback(null); // TODO - } - - @HaxeCBridge.noemit - public function getMediaPath(hashAlgorithm:String, hash:BytesData) { - if (hashAlgorithm == "sha-256") { - final path = blobpath + "/f" + Bytes.ofData(hash).toHex(); - if (FileSystem.exists(path)) { - return FileSystem.absolutePath(path); - } else { - return null; - } - } else if (hashAlgorithm == "sha-1") { - final q = new StringBuf(); - q.add("SELECT sha256 FROM media WHERE sha1=X"); - db.addValue(q, Bytes.ofData(hash).toHex()); - q.add(" LIMIT 1"); - final result = db.request(q.toString()); + db.exec( + "UPDATE messages SET status=? WHERE account_id=? AND stanza_id=? AND direction=? AND status <> ?", + [status, accountId, localId, MessageSent, MessageDeliveredToDevice] + ).then(_ -> + db.exec( + "SELECT stanza, strftime('%FT%H:%M:%fZ') AS timestamp, mam_id, mam_by FROM messages WHERE account_id=? AND stanza_id=? AND direction=?", + [accountId, localId, MessageSent] + ) + ).then(result -> { for (row in result) { - return getMediaPath("sha-256", row.sha256); + callback(hydrateMessage(accountId, row)); + return; } - return null; - } else { - throw "Unknown hash algorithm: " + hashAlgorithm; - } + }); } @HaxeCBridge.noemit public function hasMedia(hashAlgorithm:String, hash:BytesData, callback: (Bool)->Void) { - callback(getMediaPath(hashAlgorithm, hash) != null); + media.hasMedia(hashAlgorithm, hash, callback); } @HaxeCBridge.noemit public function removeMedia(hashAlgorithm:String, hash:BytesData) { - final path = getMediaPath(hashAlgorithm, hash); - if (path != null) FileSystem.deleteFile(path); + media.removeMedia(hashAlgorithm, hash); } @HaxeCBridge.noemit - public function storeMedia(mime:String, bd:BytesData, callback: ()->Void) { - final bytes = Bytes.ofData(bd); - final sha256 = Sha256.make(bytes).toHex(); - final sha1 = Sha1.make(bytes).toHex(); - File.saveBytes(blobpath + "/f" + sha256, bytes); - - final q = new StringBuf(); - q.add("INSERT OR IGNORE INTO media VALUES (X"); - db.addValue(q, sha256); - q.add(",X"); - db.addValue(q, sha1); - q.add(","); - db.addValue(q, mime); - q.add(");"); - db.request(q.toString()); - - callback(); + public function storeMedia(mime: String, bd: BytesData, callback: ()->Void) { + media.storeMedia(mime, bd, callback); } @HaxeCBridge.noemit public function storeCaps(caps:Caps) { - final q = new StringBuf(); - q.add("INSERT OR IGNORE INTO caps VALUES (X"); - db.addValue(q, caps.verRaw().toHex()); - q.add(",jsonb("); - db.addValue(q, Json.stringify(caps)); - q.add("));"); - db.request(q.toString()); + db.exec( + "INSERT OR IGNORE INTO caps VALUES (?,jsonb(?))", + [caps.verRaw().hash, Json.stringify({ node: caps.node, identities: caps.identities, features: caps.features })] + ); } @HaxeCBridge.noemit public function getCaps(ver:String, callback: (Caps)->Void) { - final q = new StringBuf(); - q.add("SELECT json(caps) AS caps FROM caps WHERE sha1=X"); - db.addValue(q, Base64.decode(ver).toHex()); - q.add(" LIMIT 1"); - final result = db.request(q.toString()); - for (row in result) { - final json = Json.parse(row.caps); - callback(new Caps(json.node, json.identities.map(i -> new Identity(i.category, i.type, i.name)), json.features)); + final verData = try { + Base64.decode(ver).getData(); + } catch (e) { + callback(null); return; } - callback(null); + db.exec( + "SELECT json(caps) AS caps FROM caps WHERE sha1=? LIMIT 1", + [verData] + ).then(result -> { + for (row in result) { + final json = Json.parse(row.caps); + callback(new Caps(json.node, json.identities.map(i -> new Identity(i.category, i.type, i.name)), json.features)); + return; + } + callback(null); + }); } @HaxeCBridge.noemit - public function storeLogin(login:String, clientId:String, displayName:String, token:Null<String>) { + public function storeLogin(accountId:String, clientId:String, displayName:String, token:Null<String>) { + final params = [accountId, clientId, displayName]; final q = new StringBuf(); - q.add("INSERT INTO logins (login, client_id, display_name"); + q.add("INSERT INTO accounts (account_id, client_id, display_name"); if (token != null) { q.add(", token, fast_count"); } - q.add(") VALUES ("); - db.addValue(q, login); - q.add(","); - db.addValue(q, clientId); - q.add(","); - db.addValue(q, displayName); + q.add(") VALUES (?,?,?"); if (token != null) { - q.add(","); - db.addValue(q, token); + q.add(",?"); + params.push(token); q.add(",0"); // reset count to zero on new token } - q.add(") ON CONFLICT DO UPDATE SET client_id="); - db.addValue(q, clientId); - q.add(", display_name="); - db.addValue(q, displayName); + q.add(") ON CONFLICT DO UPDATE SET client_id=?"); + params.push(clientId); + q.add(", display_name=?"); + params.push(displayName); if (token != null) { - q.add(", token="); - db.addValue(q, token); + q.add(", token=?"); + params.push(token); q.add(", fast_count=0"); // reset count to zero on new token } - db.request(q.toString()); + db.exec(q.toString(), params); } @HaxeCBridge.noemit - public function getLogin(login:String, callback:(Null<String>, Null<String>, Int, Null<String>)->Void) { - final q = new StringBuf(); - q.add("SELECT client_id, display_name, token, fast_count FROM logins WHERE login="); - db.addValue(q, login); - q.add(" LIMIT 1"); - final result = db.request(q.toString()); - for (row in result) { - if (row.token != null) { - final update = new StringBuf(); - update.add("UPDATE logins SET fast_count=fast_count+1 WHERE login="); - db.addValue(update, login); - db.request(update.toString()); + public function getLogin(accountId:String, callback:(Null<String>, Null<String>, Int, Null<String>)->Void) { + db.exec( + "SELECT client_id, display_name, token, fast_count FROM accounts WHERE account_id=? LIMIT 1", + [accountId] + ).then(result -> { + for (row in result) { + if (row.token != null) { + db.exec("UPDATE accounts SET fast_count=fast_count+1 WHERE account_id=?", [accountId]); + } + callback(row.client_id, row.token, row.fast_count ?? 0, row.display_name); + return; } - callback(row.client_id, row.token, row.fast_count ?? 0, row.display_name); - return; - } - callback(null, null, 0, null); + callback(null, null, 0, null); + }); } @HaxeCBridge.noemit public function removeAccount(accountId:String, completely:Bool) { - var q = new StringBuf(); - q.add("DELETE FROM logins WHERE login="); - db.addValue(q, accountId); - db.request(q.toString()); - // TODO stream managemento + db.exec("DELETE FROM accounts WHERE account_id=?", [accountId]); if (!completely) return; - var q = new StringBuf(); - q.add("DELETE FROM messages WHERE account_id="); - db.addValue(q, accountId); - db.request(q.toString()); - - var q = new StringBuf(); - q.add("DELETE FROM chats WHERE account_id="); - db.addValue(q, accountId); - db.request(q.toString()); - - var q = new StringBuf(); - q.add("DELETE FROM services WHERE account_id="); - db.addValue(q, accountId); - db.request(q.toString()); + db.exec("DELETE FROM messages WHERE account_id=?", [accountId]); + db.exec("DELETE FROM chats WHERE account_id=?", [accountId]); + db.exec("DELETE FROM services WHERE account_id=?", [accountId]); } + private var smStoreInProgress = false; + private var smStoreNext: Null<BytesData> = null; @HaxeCBridge.noemit public function storeStreamManagement(accountId:String, sm:Null<BytesData>) { - // TODO + smStoreNext = sm; + if (!smStoreInProgress) { + smStoreInProgress = true; + db.exec( + "UPDATE accounts SET sm_state=? WHERE account_id=?", + [sm, accountId] + ).then(_ -> { + smStoreInProgress = false; + if (smStoreNext != sm) storeStreamManagement(accountId, sm); + }); + } } @HaxeCBridge.noemit public function getStreamManagement(accountId:String, callback: (Null<BytesData>)->Void) { - callback(null); // TODO + db.exec("SELECT sm_state FROM accounts WHERE account_id=?", [accountId]).then(result -> { + for (row in result) { + callback(row.sm_state); + return; + } + + callback(null); + }); } @HaxeCBridge.noemit public function storeService(accountId:String, serviceId:String, name:Null<String>, node:Null<String>, caps:Caps) { storeCaps(caps); - final q = new StringBuf(); - q.add("INSERT OR REPLACE INTO services VALUES ("); - db.addValue(q, accountId); - q.add(","); - db.addValue(q, serviceId); - q.add(","); - db.addValue(q, name); - q.add(","); - db.addValue(q, node); - q.add(",X"); - db.addValue(q, caps.verRaw().toHex()); - q.add(");"); - db.request(q.toString()); + db.exec( + "INSERT OR REPLACE INTO services VALUES (?,?,?,?,?)", + [accountId, serviceId, name, node, caps.verRaw().hash] + ); } @HaxeCBridge.noemit public function findServicesWithFeature(accountId:String, feature:String, callback:(Array<{serviceId:String, name:Null<String>, node:Null<String>, caps: Caps}>)->Void) { // Almost full scan shouldn't be too expensive, how many services are we aware of? - final q = new StringBuf(); - q.add("SELECT service_id, name, node, json(caps.caps) AS caps FROM services INNER JOIN caps ON services.caps=caps.sha1 WHERE account_id="); - db.addValue(q, accountId); - final result = db.request(q.toString()); - final services = []; - for (row in result) { - final json = Json.parse(row.caps); - if (json.features.contains(feature)) { - row.set("caps", new Caps(json.node, json.identities.map(i -> new Identity(i.category, i.type, i.name)), json.features)); - services.push(row); + db.exec( + "SELECT service_id, name, node, json(caps.caps) AS caps FROM services INNER JOIN caps ON services.caps=caps.sha1 WHERE account_id=?", + [accountId] + ).then(result -> { + final services = []; + for (row in result) { + final json = Json.parse(row.caps); + final features = json.features; + if (features.contains(feature)) { + row.set("caps", new Caps(json.node, json.identities.map(i -> new Identity(i.category, i.type, i.name)), features.array())); + services.push(row); + } } - } - callback(services); + callback(services); + }); + } + + private function hydrateMessage(accountId: String, row: { stanza: String, timestamp: String, direction: MessageDirection, mam_id: String, mam_by: String }) { + // TODO + final accountJid = JID.parse(accountId); + final x = ChatMessage.fromStanza(Stanza.parse(row.stanza), JID.parse(accountId)); + x.timestamp = row.timestamp; + x.direction = row.direction; + x.serverId = row.mam_id; + x.serverIdBy = row.mam_by; + return x; } } diff --git a/snikket/persistence/SqliteDriver.hx b/snikket/persistence/SqliteDriver.hx new file mode 100644 index 0000000..099d880 --- /dev/null +++ b/snikket/persistence/SqliteDriver.hx @@ -0,0 +1,46 @@ +package snikket.persistence; + +import haxe.io.Bytes; +import thenshim.Promise; +import sys.db.Connection; + +// TODO: consider doing background threads for operations +class SqliteDriver { + final db: Connection; + + public function new(dbfile: String) { + db = sys.db.Sqlite.open(dbfile); + } + + public function exec(sql: String, ?params: Array<Dynamic>) { + try { + final result = db.request(prepare(sql, params ?? [])); + return Promise.resolve(result); + } catch (e) { + return Promise.reject(e); + } + } + + private function prepare(sql:String, params: Array<Dynamic>): String { + return ~/\?/gm.map(sql, f -> { + var p = params.shift(); + return switch (Type.typeof(p)) { + case TClass(String): + db.quote(p); + case TBool: + p == true ? "1" : "0"; + case TFloat: + Std.string(p); + case TInt: + Std.string(p); + case TNull: + "NULL"; + case TClass(haxe.io.Bytes): + var bytes:Bytes = cast p; + "X'" + bytes.toHex() + "'"; + case _: + throw("UKNONWN: " + Type.typeof(p)); + } + }); + } +} diff --git a/snikket/persistence/SqliteDriver.js.hx b/snikket/persistence/SqliteDriver.js.hx new file mode 100644 index 0000000..0c185fb --- /dev/null +++ b/snikket/persistence/SqliteDriver.js.hx @@ -0,0 +1,56 @@ +package snikket.persistence; + +import haxe.io.Bytes; +import thenshim.Promise; + +typedef Promiser = (String, Dynamic) -> Promise<Dynamic>; + +@:js.import("@sqlite.org/sqlite-wasm", "sqlite3Worker1Promiser") +extern class Worker1 { + static var v2: ({ worker: () -> js.html.Worker }) -> Promise<Promiser>; +} + +class SqliteDriver { + private var sqlite: Promiser; + private var dbId: String; + + public function new(dbfile: String) { + Worker1.v2({ + worker: () -> new js.html.Worker( + untyped new js.html.URL("sqlite-worker1.mjs", untyped __js__('import.meta.url')), + untyped { type: "module" } + ) + }).then(promiser -> { + sqlite = promiser; + return sqlite("open", { filename: dbfile, vfs: "opfs-sahpool" }); + }).then(openResult -> { + dbId = openResult.dbId; + }); + } + + public function exec(sql: String, ?params: Array<Dynamic>): Promise<haxe.iterators.ArrayIterator<Dynamic>> { + if (sqlite == null || dbId == null) { + // Not ready yet + return new Promise((resolve, reject) -> haxe.Timer.delay(() -> resolve(null), 100)) + .then(_ -> exec(sql, params)); + } + + final items: Array<Dynamic> = []; + var signalAllDone; + final allDone = new Promise((resolve, reject) -> signalAllDone = resolve); + return sqlite('exec', { + dbId: dbId, + sql: sql, + bind: params, + rowMode: "object", + callback: (r) -> { + if (r.rowNumber == null) { + signalAllDone(null); + } else { + items.push(r.row); + } + null; + } + }).then(_ -> allDone).then(_ -> items.iterator()); + } +} diff --git a/snikket/persistence/sqlite-worker1.mjs b/snikket/persistence/sqlite-worker1.mjs new file mode 100644 index 0000000..b894a75 --- /dev/null +++ b/snikket/persistence/sqlite-worker1.mjs @@ -0,0 +1,38 @@ +/* + 2022-05-23 + + The author disclaims copyright to this source code. In place of a + legal notice, here is a blessing: + + * May you do good and not evil. + * May you find forgiveness for yourself and forgive others. + * May you share freely, never taking more than you give. + + *********************************************************************** + + This is a JS Worker file for the main sqlite3 api. It loads + sqlite3.js, initializes the module, and postMessage()'s a message + after the module is initialized: + + {type: 'sqlite3-api', result: 'worker1-ready'} + + This seemingly superfluous level of indirection is necessary when + loading sqlite3.js via a Worker. Instantiating a worker with new + Worker("sqlite.js") will not (cannot) call sqlite3InitModule() to + initialize the module due to a timing/order-of-operations conflict + (and that symbol is not exported in a way that a Worker loading it + that way can see it). Thus JS code wanting to load the sqlite3 + Worker-specific API needs to pass _this_ file (or equivalent) to the + Worker constructor and then listen for an event in the form shown + above in order to know when the module has completed initialization. + + This file accepts a URL arguments to adjust how it loads sqlite3.js: + + - `sqlite3.dir`, if set, treats the given directory name as the + directory from which `sqlite3.js` will be loaded. +*/ +import { default as sqlite3InitModule } from '@sqlite.org/sqlite-wasm'; +sqlite3InitModule().then(async (sqlite3) => { + await sqlite3.installOpfsSAHPoolVfs(); // enable opfs-sahpool + await sqlite3.initWorker1API(); +});