| author | Stephen Paul Weber
<singpolyma@singpolyma.net> 2025-09-16 20:40:59 UTC |
| committer | Stephen Paul Weber
<singpolyma@singpolyma.net> 2025-09-16 20:40:59 UTC |
| parent | 02446795e2744c28169b634dc92007aa53e6765a |
| HaxeCBridge.hx | +5 | -1 |
| snikket/Chat.hx | +86 | -112 |
| snikket/Client.hx | +69 | -74 |
| snikket/Persistence.hx | +18 | -17 |
| snikket/Push.hx | +1 | -1 |
| snikket/jingle/PeerConnection.cpp.hx | +1 | -2 |
| snikket/jingle/PeerConnection.js.hx | +0 | -1 |
| snikket/jingle/Session.hx | +7 | -6 |
| snikket/persistence/Dummy.hx | +35 | -34 |
| snikket/persistence/IDB.js | +156 | -190 |
| snikket/persistence/KeyValueStore.hx | +4 | -2 |
| snikket/persistence/MediaStore.hx | +3 | -2 |
| snikket/persistence/MediaStoreCache.js | +12 | -15 |
| snikket/persistence/MediaStoreFS.hx | +14 | -18 |
| snikket/persistence/Sqlite.hx | +77 | -93 |
diff --git a/HaxeCBridge.hx b/HaxeCBridge.hx index aa17655..94dbcbe 100644 --- a/HaxeCBridge.hx +++ b/HaxeCBridge.hx @@ -251,7 +251,11 @@ class HaxeCBridge { final aargs = atype.args; args.push({name: "handler", type: TPath({name: "Callable", pack: ["cpp"], params: [TPType(TFunction(aargs.concat([TPath({name: "RawPointer", pack: ["cpp"], params: [TPType(TPath({ name: "Void", pack: ["cpp"] }))]})]), TPath({name: "Void", pack: []})))]})}); promisify.push(macro v); - promisifyE.push(macro null); + if (atype.retainType == null) { + promisifyE.push(macro false); + } else { + promisifyE.push(macro null); + } if (atype.retainType == "Array") { promisify.push(macro v.length); promisifyE.push(macro 0); diff --git a/snikket/Chat.hx b/snikket/Chat.hx index 2e5dd1b..58a24bc 100644 --- a/snikket/Chat.hx +++ b/snikket/Chat.hx @@ -125,26 +125,28 @@ abstract class Chat { **/ abstract public function getMessagesAround(aroundId:Null<String>, aroundTime:Null<String>):Promise<Array<ChatMessage>>; - private function fetchFromSync(sync: MessageSync, callback: (Array<ChatMessage>)->Void) { - sync.onMessages((messageList) -> { - final chatMessages = []; - for (m in messageList.messages) { - switch (m) { + private function fetchFromSync(sync: MessageSync): Promise<Array<ChatMessage>> { + return new thenshim.Promise((resolve, reject) -> { + sync.onMessages((messageList) -> { + final chatMessages = []; + for (m in messageList.messages) { + switch (m) { case ChatMessageStanza(message): chatMessages.push(message); case ReactionUpdateStanza(update): - persistence.storeReaction(client.accountId(), update, (m)->{}); + persistence.storeReaction(client.accountId(), update); case ModerateMessageStanza(action): client.moderateMessage(action); default: // ignore + } } - } - client.storeMessages(chatMessages, (chatMessages) -> { - callback(chatMessages.filter((m) -> m != null && m.chatId() == chatId)); + client.storeMessages(chatMessages).then((chatMessages) -> { + resolve(chatMessages.filter((m) -> m != null && m.chatId() == chatId)); + }); }); + sync.fetchNext(); }); - sync.fetchNext(); } /** @@ -645,7 +647,7 @@ abstract class Chat { readUpToId = upTo; readUpToBy = upToBy; persistence.storeChats(client.accountId(), [this]); - persistence.getMessagesBefore(client.accountId(), chatId, null, null, (messages) -> { + persistence.getMessagesBefore(client.accountId(), chatId, null, null).then((messages) -> { var i = messages.length; while (--i >= 0) { if (messages[i].serverId == readUpToId || !messages[i].isIncoming()) break; @@ -664,7 +666,7 @@ abstract class Chat { return; } - persistence.getMessage(client.accountId(), chatId, readUpTo(), null, (readMessage) -> { + persistence.getMessage(client.accountId(), chatId, readUpTo(), null).then((readMessage) -> { if (readMessage != null && Reflect.compare(message.timestamp, readMessage.timestamp) <= 0) return; markReadUpToId(message.serverId, message.serverIdBy, callback); @@ -758,52 +760,39 @@ class DirectChat extends Chat { @HaxeCBridge.noemit // on superclass as abstract public function getMessagesBefore(beforeId:Null<String>, beforeTime:Null<String>):Promise<Array<ChatMessage>> { - return new thenshim.Promise((resolve, reject) -> { - persistence.getMessagesBefore(client.accountId(), chatId, beforeId, beforeTime, (messages) -> { - if (messages.length > 0) { - resolve(messages); - } else { - var filter:MAMQueryParams = { with: this.chatId }; - if (beforeId != null) filter.page = { before: beforeId }; - var sync = new MessageSync(this.client, this.stream, filter); - fetchFromSync(sync, resolve); - } - }); - }); + return persistence.getMessagesBefore(client.accountId(), chatId, beforeId, beforeTime).then((messages) -> + if (messages.length > 0) { + Promise.resolve(messages); + } else { + var filter:MAMQueryParams = { with: this.chatId }; + if (beforeId != null) filter.page = { before: beforeId }; + var sync = new MessageSync(this.client, this.stream, filter); + fetchFromSync(sync); + } + ); } @HaxeCBridge.noemit // on superclass as abstract public function getMessagesAfter(afterId:Null<String>, afterTime:Null<String>):Promise<Array<ChatMessage>> { - return new thenshim.Promise((resolve, reject) -> { - if (afterId == lastMessageId() && !syncing()) { - resolve([]); - return; + if (afterId == lastMessageId() && !syncing()) { + return Promise.resolve([]); + } + return persistence.getMessagesAfter(client.accountId(), chatId, afterId, afterTime).then((messages) -> + if (messages.length > 0) { + Promise.resolve(messages); + } else { + var filter:MAMQueryParams = { with: this.chatId }; + if (afterId != null) filter.page = { after: afterId }; + var sync = new MessageSync(this.client, this.stream, filter); + fetchFromSync(sync); } - persistence.getMessagesAfter(client.accountId(), chatId, afterId, afterTime, (messages) -> { - if (messages.length > 0) { - resolve(messages); - } else { - var filter:MAMQueryParams = { with: this.chatId }; - if (afterId != null) filter.page = { after: afterId }; - var sync = new MessageSync(this.client, this.stream, filter); - fetchFromSync(sync, resolve); - } - }); - }); + ); } @HaxeCBridge.noemit // on superclass as abstract public function getMessagesAround(aroundId:Null<String>, aroundTime:Null<String>):Promise<Array<ChatMessage>> { - return new thenshim.Promise((resolve, reject) -> { - persistence.getMessagesAround(client.accountId(), chatId, aroundId, aroundTime, (messages) -> { - if (messages.length > 0) { - resolve(messages); - } else { - // TODO - resolve([]); - } - }); - }); + // TODO: fetch more from MAM if nothing locally? + return persistence.getMessagesAround(client.accountId(), chatId, aroundId, aroundTime); } @:allow(snikket) @@ -829,7 +818,7 @@ class DirectChat extends Chat { message = prepareOutgoingMessage(message); message.versions = [message.build()]; // This is a correction message.localId = localId; - client.storeMessages([message.build()], (corrected) -> { + client.storeMessages([message.build()]).then((corrected) -> { message.versions = corrected[0].versions[corrected[0].versions.length - 1]?.localId == localId ? cast corrected[0].versions : [message.build()]; message.localId = toSendId; for (recipient in message.recipients) { @@ -853,7 +842,7 @@ class DirectChat extends Chat { final fromStanza = Message.fromStanza(message.build().asStanza(), client.jid).parsed; switch (fromStanza) { case ChatMessageStanza(_): - client.storeMessages([message.build()], (stored) -> { + client.storeMessages([message.build()]).then((stored) -> { for (recipient in message.recipients) { message.to = recipient; final stanza = message.build().asStanza(); @@ -869,7 +858,7 @@ class DirectChat extends Chat { client.notifyMessageHandlers(stored[0], stored[0].versions.length > 1 ? CorrectionEvent : DeliveryEvent); }); case ReactionUpdateStanza(update): - persistence.storeReaction(client.accountId(), update, (stored) -> { + persistence.storeReaction(client.accountId(), update).then((stored) -> { for (recipient in message.recipients) { message.to = recipient; client.sendStanza(message.build().asStanza()); @@ -905,7 +894,7 @@ class DirectChat extends Chat { } } final update = new ReactionUpdate(ID.long(), null, null, m.localId, m.chatId(), client.accountId(), Date.format(std.Date.now()), reactions, EmojiReactions); - persistence.storeReaction(client.accountId(), update, (stored) -> { + persistence.storeReaction(client.accountId(), update).then((stored) -> { final stanza = update.asStanza(); for (recipient in counterparts()) { stanza.attr.set("to", recipient); @@ -1069,7 +1058,7 @@ class Channel extends Chat { return stanza; } ); - persistence.lastId(client.accountId(), chatId, doSync); + persistence.lastId(client.accountId(), chatId).then(doSync); } private function selfPingSuccess() { @@ -1080,7 +1069,7 @@ class Channel extends Chat { // We did a self ping to see if we were in the room and found we are // But we may have missed messages if we were disconnected in the middle inSync = false; - persistence.lastId(client.accountId(), chatId, doSync); + persistence.lastId(client.accountId(), chatId).then(doSync); } @:allow(snikket) @@ -1149,9 +1138,9 @@ class Channel extends Chat { } pageChatMessages.push(message); case ReactionUpdateStanza(update): - promises.push(new thenshim.Promise((resolve, reject) -> { - persistence.storeReaction(client.accountId(), update, (_) -> resolve(null)); - })); + promises.push( + persistence.storeReaction(client.accountId(), update).then(_ -> null) + ); case ModerateMessageStanza(action): promises.push(new thenshim.Promise((resolve, reject) -> { client.moderateMessage(action).then((_) -> resolve(null)); @@ -1160,9 +1149,7 @@ class Channel extends Chat { // ignore } } - promises.push(new thenshim.Promise((resolve, reject) -> { - client.storeMessages(pageChatMessages, resolve); - })); + promises.push(client.storeMessages(pageChatMessages)); thenshim.PromiseTools.all(promises).then((stored) -> { for (messages in stored) { if (messages != null) { @@ -1291,62 +1278,49 @@ class Channel extends Chat { @HaxeCBridge.noemit // on superclass as abstract public function getMessagesBefore(beforeId:Null<String>, beforeTime:Null<String>):Promise<Array<ChatMessage>> { - return new thenshim.Promise((resolve, reject) -> { - persistence.getMessagesBefore(client.accountId(), chatId, beforeId, beforeTime, (messages) -> { - if (messages.length > 0) { - resolve(messages); - } else { - var filter:MAMQueryParams = {}; - if (beforeId != null) filter.page = { before: beforeId }; - var sync = new MessageSync(this.client, this.stream, filter, chatId); - sync.addContext((builder, stanza) -> { - builder = prepareIncomingMessage(builder, stanza); - builder.syncPoint = false; - return builder; - }); - fetchFromSync(sync, resolve); - } - }); - }); + return persistence.getMessagesBefore(client.accountId(), chatId, beforeId, beforeTime).then((messages) -> + if (messages.length > 0) { + Promise.resolve(messages); + } else { + var filter:MAMQueryParams = {}; + if (beforeId != null) filter.page = { before: beforeId }; + var sync = new MessageSync(this.client, this.stream, filter, chatId); + sync.addContext((builder, stanza) -> { + builder = prepareIncomingMessage(builder, stanza); + builder.syncPoint = false; + return builder; + }); + fetchFromSync(sync); + } + ); } @HaxeCBridge.noemit // on superclass as abstract public function getMessagesAfter(afterId:Null<String>, afterTime:Null<String>):Promise<Array<ChatMessage>> { - return new thenshim.Promise((resolve, reject) -> { - if (afterId == lastMessageId() && !syncing()) { - resolve([]); - return; + if (afterId == lastMessageId() && !syncing()) { + return Promise.resolve([]); + } + return persistence.getMessagesAfter(client.accountId(), chatId, afterId, afterTime).then((messages) -> + if (messages.length > 0) { + Promise.resolve(messages); + } else { + var filter:MAMQueryParams = {}; + if (afterId != null) filter.page = { after: afterId }; + var sync = new MessageSync(this.client, this.stream, filter, chatId); + sync.addContext((builder, stanza) -> { + builder = prepareIncomingMessage(builder, stanza); + builder.syncPoint = false; + return builder; + }); + fetchFromSync(sync); } - persistence.getMessagesAfter(client.accountId(), chatId, afterId, afterTime, (messages) -> { - if (messages.length > 0) { - resolve(messages); - } else { - var filter:MAMQueryParams = {}; - if (afterId != null) filter.page = { after: afterId }; - var sync = new MessageSync(this.client, this.stream, filter, chatId); - sync.addContext((builder, stanza) -> { - builder = prepareIncomingMessage(builder, stanza); - builder.syncPoint = false; - return builder; - }); - fetchFromSync(sync, resolve); - } - }); - }); + ); } @HaxeCBridge.noemit // on superclass as abstract public function getMessagesAround(aroundId:Null<String>, aroundTime:Null<String>):Promise<Array<ChatMessage>> { - return new thenshim.Promise((resolve, reject) -> { - persistence.getMessagesAround(client.accountId(), chatId, aroundId, aroundTime, (messages) -> { - if (messages.length > 0) { - resolve(messages); - } else { - // TODO - resolve([]); - } - }); - }); + // TODO: fetch more from MAM if nothing locally + return persistence.getMessagesAround(client.accountId(), chatId, aroundId, aroundTime); } @:allow(snikket) @@ -1379,7 +1353,7 @@ class Channel extends Chat { message = prepareOutgoingMessage(message); message.versions = [message.build()]; // This is a correction message.localId = localId; - client.storeMessages([message.build()], (corrected) -> { + client.storeMessages([message.build()]).then((corrected) -> { message.versions = corrected[0].versions[0]?.localId == localId ? cast corrected[0].versions : [message.build()]; message.localId = toSendId; client.sendStanza(message.build().asStanza()); @@ -1408,14 +1382,14 @@ class Channel extends Chat { activeThread = message.threadId; stanza.tag("active", { xmlns: "http://jabber.org/protocol/chatstates" }).up(); } - client.storeMessages([message.build()], (stored) -> { + client.storeMessages([message.build()]).then((stored) -> { client.sendStanza(stanza); setLastMessage(stored[0]); client.notifyMessageHandlers(stored[0], stored[0].versions.length > 1 ? CorrectionEvent : DeliveryEvent); client.trigger("chats/update", [this]); }); case ReactionUpdateStanza(update): - persistence.storeReaction(client.accountId(), update, (stored) -> { + persistence.storeReaction(client.accountId(), update).then((stored) -> { client.sendStanza(stanza); if (stored != null) client.notifyMessageHandlers(stored, ReactionEvent); }); @@ -1446,7 +1420,7 @@ class Channel extends Chat { } } final update = new ReactionUpdate(ID.long(), m.serverId, m.chatId(), null, m.chatId(), getFullJid().asString(), Date.format(std.Date.now()), reactions, EmojiReactions); - persistence.storeReaction(client.accountId(), update, (stored) -> { + persistence.storeReaction(client.accountId(), update).then((stored) -> { final stanza = update.asStanza(); stanza.attr.set("to", chatId); client.sendStanza(stanza); diff --git a/snikket/Client.hx b/snikket/Client.hx index a5373dd..5663046 100644 --- a/snikket/Client.hx +++ b/snikket/Client.hx @@ -5,6 +5,7 @@ import sha.SHA256; import haxe.crypto.Base64; import haxe.io.Bytes; import haxe.io.BytesData; +import thenshim.Promise; import snikket.jingle.IceServer; import snikket.jingle.PeerConnection; import snikket.Caps; @@ -122,9 +123,8 @@ class Client extends EventEmitter { persistence.updateMessageStatus( accountId(), data.id, - MessageDeliveredToServer, - (m) -> notifyMessageHandlers(m, StatusEvent) - ); + MessageDeliveredToServer + ).then((m) -> notifyMessageHandlers(m, StatusEvent), _ -> null); return EventHandled; }); @@ -132,9 +132,8 @@ class Client extends EventEmitter { persistence.updateMessageStatus( accountId(), data.id, - MessageFailedToSend, - (m) -> notifyMessageHandlers(m, StatusEvent) - ); + MessageFailedToSend + ).then((m) -> notifyMessageHandlers(m, StatusEvent), _ -> null); return EventHandled; }); @@ -187,14 +186,14 @@ class Client extends EventEmitter { if (chatMessage.serverId == null) { updateChat(chatMessage); } else { - storeMessages([chatMessage], (stored) -> updateChat(stored[0])); + storeMessages([chatMessage]).then((stored) -> updateChat(stored[0])); } } case ReactionUpdateStanza(update): for (hash in update.inlineHashReferences()) { fetchMediaByHash([hash], [from]); } - persistence.storeReaction(accountId(), update, (stored) -> if (stored != null) notifyMessageHandlers(stored, ReactionEvent)); + persistence.storeReaction(accountId(), update).then((stored) -> if (stored != null) notifyMessageHandlers(stored, ReactionEvent)); case ModerateMessageStanza(action): moderateMessage(action).then((stored) -> if (stored != null) notifyMessageHandlers(stored, CorrectionEvent)); default: @@ -293,7 +292,7 @@ class Client extends EventEmitter { final chat = this.getDirectChat(JID.parse(pubsubEvent.getFrom()).asBare().asString(), false); chat.setAvatarSha1(avatarSha1); persistence.storeChats(accountId(), [chat]); - persistence.hasMedia("sha-1", avatarSha1, (has) -> { + persistence.hasMedia("sha-1", avatarSha1).then((has) -> { if (has) { this.trigger("chats/update", [chat]); } else { @@ -303,7 +302,7 @@ class Client extends EventEmitter { if (item == null) return; final dataNode = item.getChild("data", "urn:xmpp:avatar:data"); if (dataNode == null) return; - persistence.storeMedia(mime, Base64.decode(StringTools.replace(dataNode.getText(), "\n", "")).getData(), () -> { + persistence.storeMedia(mime, Base64.decode(StringTools.replace(dataNode.getText(), "\n", "")).getData()).then(_ -> { this.trigger("chats/update", [chat]); }); }); @@ -479,7 +478,7 @@ class Client extends EventEmitter { return chat; }; - persistence.getCaps(c.attr.get("ver"), (caps) -> { + persistence.getCaps(c.attr.get("ver")).then((caps) -> { if (caps == null) { final pending = pendingCaps.get(c.attr.get("ver")); if (pending == null) { @@ -511,7 +510,7 @@ class Client extends EventEmitter { final avatarSha1 = Hash.fromHex("sha-1", avatarSha1Hex)?.hash; chat.setAvatarSha1(avatarSha1); persistence.storeChats(accountId(), [chat]); - persistence.hasMedia("sha-1", avatarSha1, (has) -> { + persistence.hasMedia("sha-1", avatarSha1).then((has) -> { if (has) { if (chat.livePresence()) this.trigger("chats/update", [chat]); } else { @@ -519,7 +518,7 @@ class Client extends EventEmitter { vcardGet.onFinished(() -> { final vcard = vcardGet.getResult(); if (vcard.photo == null) return; - persistence.storeMedia(vcard.photo.mime, vcard.photo.data.getData(), () -> { + persistence.storeMedia(vcard.photo.mime, vcard.photo.data.getData()).then(_ -> { this.trigger("chats/update", [chat]); }); }); @@ -551,65 +550,65 @@ class Client extends EventEmitter { Start this client running and trying to connect to the server **/ public function start() { - startOffline(() -> { - persistence.getStreamManagement(accountId(), (sm) -> { - stream.on("auth/password-needed", (data) -> { - fastMechanism = data.mechanisms?.find((mech) -> mech.canFast)?.name; - if (token == null || (fastMechanism == null && data.mechanimsms != null)) { - this.trigger("auth/password-needed", { accountId: accountId() }); - } else { - this.stream.trigger("auth/password", { password: token, mechanism: fastMechanism, fastCount: fastCount }); - } - }); - stream.on("auth/fail", (data) -> { - if (token != null) { - token = null; - stream.connect(jid.asString(), sm); - } else { - stream.connect(jid.asString(), sm); - } - return EventHandled; - }); - stream.connect(jid.asString(), sm); + startOffline().then(_ -> + persistence.getStreamManagement(accountId()) + ).then((sm) -> { + stream.on("auth/password-needed", (data) -> { + fastMechanism = data.mechanisms?.find((mech) -> mech.canFast)?.name; + if (token == null || (fastMechanism == null && data.mechanimsms != null)) { + this.trigger("auth/password-needed", { accountId: accountId() }); + } else { + this.stream.trigger("auth/password", { password: token, mechanism: fastMechanism, fastCount: fastCount }); + } + }); + stream.on("auth/fail", (data) -> { + if (token != null) { + token = null; + stream.connect(jid.asString(), sm); + } else { + stream.connect(jid.asString(), sm); + } + return EventHandled; }); + stream.connect(jid.asString(), sm); }); } /** Gets the client ready to use but does not connect to the server **/ - public function startOffline(ready: ()->Void) { + public function startOffline(): Promise<Bool> { #if cpp // Do a big GC before starting a new client cpp.NativeGc.run(true); #end - persistence.getLogin(accountId(), (clientId, loadedToken, loadedFastCount, displayName) -> { - token = loadedToken; - fastCount = loadedFastCount; - stream.clientId = clientId ?? ID.long(); + return persistence.getLogin(accountId()).then(login -> { + token = login.token; + fastCount = login.fastCount; + stream.clientId = login.clientId ?? ID.long(); jid = jid.withResource(stream.clientId); - if (!updateDisplayName(displayName) && clientId == null) { + if (!updateDisplayName(login.displayName) && login.clientId == null) { persistence.storeLogin(jid.asBare().asString(), stream.clientId, this.displayName(), null); } - persistence.getChats(accountId(), (protoChats) -> { - var oneProtoChat = null; - while ((oneProtoChat = protoChats.pop()) != null) { - chats.push(oneProtoChat.toChat(this, stream, persistence)); + return persistence.getChats(accountId()); + }).then((protoChats) -> { + var oneProtoChat = null; + while ((oneProtoChat = protoChats.pop()) != null) { + chats.push(oneProtoChat.toChat(this, stream, persistence)); + } + return persistence.getChatsUnreadDetails(accountId(), chats); + }).then((details) -> { + for (detail in details) { + var chat = getChat(detail.chatId); + if (chat != null) { + chat.setLastMessage(detail.message); + chat.setUnreadCount(detail.unreadCount); } - persistence.getChatsUnreadDetails(accountId(), chats, (details) -> { - for (detail in details) { - var chat = getChat(detail.chatId); - if (chat != null) { - chat.setLastMessage(detail.message); - chat.setUnreadCount(detail.unreadCount); - } - } - sortChats(); - this.trigger("chats/update", chats); - ready(); - }); - }); + } + sortChats(); + this.trigger("chats/update", chats); + true; }); } @@ -713,7 +712,7 @@ class Client extends EventEmitter { trace("SYNC: details"); inSync = true; - persistence.getChatsUnreadDetails(accountId(), chats, (details) -> { + persistence.getChatsUnreadDetails(accountId(), chats).then((details) -> { for (detail in details) { var chat = getChat(detail.chatId) ?? getDirectChat(detail.chatId, false); final initialLastId = chat.lastMessageId(); @@ -751,7 +750,7 @@ class Client extends EventEmitter { Turn a file into a ChatAttachment for attaching to a ChatMessage **/ public function prepareAttachment(source: AttachmentSource, callback: (Null<ChatAttachment>)->Void) { - persistence.findServicesWithFeature(accountId(), "urn:xmpp:http:upload:0", (services) -> { + persistence.findServicesWithFeature(accountId(), "urn:xmpp:http:upload:0").then((services) -> { final sha256 = new sha.SHA256(); source.tinkSource().chunked().forEach((chunk) -> { sha256.update(chunk); @@ -928,7 +927,7 @@ class Client extends EventEmitter { @:allow(snikket) private function moderateMessage(action: ModerationAction): Promise<Null<ChatMessage>> { return new thenshim.Promise((resolve, reject) -> - persistence.getMessage(accountId(), action.chatId, action.moderateServerId, null, (moderateMessage) -> { + persistence.getMessage(accountId(), action.chatId, action.moderateServerId, null).then((moderateMessage) -> { if (moderateMessage == null) return resolve(null); for(attachment in moderateMessage.attachments) { for(hash in attachment.hashes) { @@ -1242,19 +1241,17 @@ class Client extends EventEmitter { private function fetchMediaByHashOneCounterpart(hashes: Array<Hash>, counterpart: JID) { if (hashes.length < 1) return thenshim.Promise.reject("no hashes left"); - return new thenshim.Promise((resolve, reject) -> - persistence.hasMedia(hashes[0].algorithm, hashes[0].hash, resolve) - ).then (has -> { - if (has) return thenshim.Promise.resolve(null); + return persistence.hasMedia(hashes[0].algorithm, hashes[0].hash).then (has -> { + if (has) return Promise.resolve(null); - return new thenshim.Promise((resolve, reject) -> { + return new Promise((resolve, reject) -> { final q = BoB.forHash(counterpart.asString(), hashes[0]); q.onFinished(() -> { final r = q.getResult(); if (r == null) { reject("bad or no result from BoB query"); } else { - persistence.storeMedia(r.type, r.bytes.getData(), () -> resolve(null)); + persistence.storeMedia(r.type, r.bytes.getData()).then(_ -> resolve(null)); } }); sendQuery(q); @@ -1288,8 +1285,8 @@ class Client extends EventEmitter { } @:allow(snikket) - private function storeMessages(messages: Array<ChatMessage>, ?callback: Null<(Array<ChatMessage>)->Void>) { - persistence.storeMessages(accountId(), messages, callback ?? (_)->{}); + private function storeMessages(messages: Array<ChatMessage>): Promise<Null<Array<ChatMessage>>> { + return persistence.storeMessages(accountId(), messages); } @:allow(snikket) @@ -1490,7 +1487,7 @@ class Client extends EventEmitter { if (Std.isOfType(persistence, snikket.persistence.Dummy)) { callback(true); // No reason to sync if we're not storing anyway } else { - persistence.lastId(accountId(), null, (lastId) -> doSync(callback, lastId)); + persistence.lastId(accountId(), null).then((lastId) -> doSync(callback, lastId)); } } @@ -1531,9 +1528,9 @@ class Client extends EventEmitter { chatMessages.push(message); if (message.type == MessageChat) chatIds[message.chatId()] = true; case ReactionUpdateStanza(update): - promises.push(new thenshim.Promise((resolve, reject) -> { - persistence.storeReaction(accountId(), update, (_) -> resolve(null)); - })); + promises.push( + persistence.storeReaction(accountId(), update).then(_ -> null) + ); case ModerateMessageStanza(action): promises.push(new thenshim.Promise((resolve, reject) -> { moderateMessage(action).then((_) -> resolve(null)); @@ -1542,9 +1539,7 @@ class Client extends EventEmitter { // ignore } } - promises.push(new thenshim.Promise((resolve, reject) -> { - persistence.storeMessages(accountId(), chatMessages, resolve); - })); + promises.push(persistence.storeMessages(accountId(), chatMessages)); trace("SYNC: MAM page wait for writes"); thenshim.PromiseTools.all(promises).then((results) -> { if (syncMessageHandlers.length > 0) { diff --git a/snikket/Persistence.hx b/snikket/Persistence.hx index 18b774b..5948867 100644 --- a/snikket/Persistence.hx +++ b/snikket/Persistence.hx @@ -4,36 +4,37 @@ import haxe.io.BytesData; import snikket.Chat; import snikket.ChatMessage; import snikket.Message; +import thenshim.Promise; #if cpp @:build(HaxeSwiftBridge.expose()) #end interface Persistence { - public function lastId(accountId: String, chatId: Null<String>, callback:(serverId:Null<String>)->Void):Void; + public function lastId(accountId: String, chatId: Null<String>): Promise<Null<String>>; public function storeChats(accountId: String, chats: Array<Chat>):Void; - public function getChats(accountId: String, callback: (chats:Array<SerializedChat>)->Void):Void; + public function getChats(accountId: String): Promise<Array<SerializedChat>>; @HaxeCBridge.noemit - public function getChatsUnreadDetails(accountId: String, chats: Array<Chat>, callback: (details:Array<{ chatId: String, message: ChatMessage, unreadCount: Int }>)->Void):Void; - public function storeReaction(accountId: String, update: ReactionUpdate, callback: (Null<ChatMessage>)->Void):Void; - public function storeMessages(accountId: String, message: Array<ChatMessage>, callback: (Array<ChatMessage>)->Void):Void; + public function getChatsUnreadDetails(accountId: String, chats: Array<Chat>): Promise<Array<{ chatId: String, message: ChatMessage, unreadCount: Int }>>; + public function storeReaction(accountId: String, update: ReactionUpdate): Promise<Null<ChatMessage>>; + public function storeMessages(accountId: String, message: Array<ChatMessage>): Promise<Array<ChatMessage>>; public function updateMessage(accountId: String, message: ChatMessage):Void; - public function updateMessageStatus(accountId: String, localId: String, status:MessageStatus, callback: (ChatMessage)->Void):Void; - public function getMessage(accountId: String, chatId: String, serverId: Null<String>, localId: Null<String>, callback: (Null<ChatMessage>)->Void):Void; - public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (messages:Array<ChatMessage>)->Void):Void; - public function getMessagesAfter(accountId: String, chatId: String, afterId: Null<String>, afterTime: Null<String>, callback: (messages:Array<ChatMessage>)->Void):Void; - public function getMessagesAround(accountId: String, chatId: String, aroundId: Null<String>, aroundTime: Null<String>, callback: (messages:Array<ChatMessage>)->Void):Void; - public function hasMedia(hashAlgorithm:String, hash:BytesData, callback: (has:Bool)->Void):Void; - public function storeMedia(mime:String, bytes:BytesData, callback: ()->Void):Void; + public function updateMessageStatus(accountId: String, localId: String, status:MessageStatus): Promise<ChatMessage>; + public function getMessage(accountId: String, chatId: String, serverId: Null<String>, localId: Null<String>): Promise<Null<ChatMessage>>; + public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>): Promise<Array<ChatMessage>>; + public function getMessagesAfter(accountId: String, chatId: String, afterId: Null<String>, afterTime: Null<String>): Promise<Array<ChatMessage>>; + public function getMessagesAround(accountId: String, chatId: String, aroundId: Null<String>, aroundTime: Null<String>): Promise<Array<ChatMessage>>; + public function hasMedia(hashAlgorithm:String, hash:BytesData): Promise<Bool>; + public function storeMedia(mime:String, bytes:BytesData): Promise<Bool>; public function removeMedia(hashAlgorithm:String, hash:BytesData):Void; public function storeCaps(caps:Caps):Void; - public function getCaps(ver:String, callback: (Null<Caps>)->Void):Void; + public function getCaps(ver:String): Promise<Null<Caps>>; public function storeLogin(login:String, clientId:String, displayName:String, token:Null<String>):Void; - public function getLogin(login:String, callback:(clientId:Null<String>, token:Null<String>, fastCount: Int, displayName:Null<String>)->Void):Void; + public function getLogin(login:String): Promise<{ clientId:Null<String>, token:Null<String>, fastCount: Int, displayName:Null<String> }>; public function removeAccount(accountId: String, completely:Bool):Void; - public function listAccounts(callback:(Array<String>)->Void):Void; + public function listAccounts(): Promise<Array<String>>; public function storeStreamManagement(accountId:String, data:Null<BytesData>):Void; - public function getStreamManagement(accountId:String, callback: (Null<BytesData>)->Void):Void; + public function getStreamManagement(accountId:String): Promise<Null<BytesData>>; public function storeService(accountId:String, serviceId:String, name:Null<String>, node:Null<String>, caps:Caps):Void; @HaxeCBridge.noemit - public function findServicesWithFeature(accountId:String, feature:String, callback:(Array<{serviceId:String, name:Null<String>, node:Null<String>, caps: Caps}>)->Void):Void; + public function findServicesWithFeature(accountId:String, feature:String): Promise<Array<{serviceId:String, name:Null<String>, node:Null<String>, caps: Caps}>>; } diff --git a/snikket/Push.hx b/snikket/Push.hx index 7b22cb9..f416100 100644 --- a/snikket/Push.hx +++ b/snikket/Push.hx @@ -31,7 +31,7 @@ class Push { // Assume incoming message final message = ChatMessage.fromStanza(stanza, JID.parse(stanza.attr.get("to")).asBare()); if (message != null) { - persistence.storeMessages(message.account(), [message], (_)->{}); + persistence.storeMessages(message.account(), [message]); return Notification.fromChatMessage(message); } else { return Notification.fromThinStanza(stanza); diff --git a/snikket/jingle/PeerConnection.cpp.hx b/snikket/jingle/PeerConnection.cpp.hx index 6c37e3c..14d8866 100644 --- a/snikket/jingle/PeerConnection.cpp.hx +++ b/snikket/jingle/PeerConnection.cpp.hx @@ -1,5 +1,6 @@ package snikket.jingle; +import thenshim.Promise; import snikket.ID; import HaxeCBridge; using Lambda; @@ -972,5 +973,3 @@ class PeerConnection { if (event == "connectionstatechange") stateChangeListeners.push(callback); } } - -typedef Promise<T> = thenshim.Promise<T>; diff --git a/snikket/jingle/PeerConnection.js.hx b/snikket/jingle/PeerConnection.js.hx index d6773d7..dd5f3d1 100644 --- a/snikket/jingle/PeerConnection.js.hx +++ b/snikket/jingle/PeerConnection.js.hx @@ -8,7 +8,6 @@ extern class PeerConnection extends js.html.rtc.PeerConnection { } typedef SdpType = js.html.rtc.SdpType; -typedef Promise<T> = js.lib.Promise<T>; typedef MediaStream = js.html.MediaStream; typedef MediaStreamTrack = js.html.MediaStreamTrack; typedef DTMFSender = js.html.rtc.DTMFSender; diff --git a/snikket/jingle/Session.hx b/snikket/jingle/Session.hx index f2ef059..2371c59 100644 --- a/snikket/jingle/Session.hx +++ b/snikket/jingle/Session.hx @@ -1,5 +1,6 @@ package snikket.jingle; +import thenshim.Promise; import snikket.ChatMessage; import snikket.Message; import snikket.ID; @@ -86,7 +87,7 @@ class IncomingProposedSession implements Session { // Store it for ourselves at least final event = new Stanza("ringing", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); final msg = mkCallMessage(from, client.jid, event); - client.storeMessages([msg], (stored) -> { + client.storeMessages([msg]).then((stored) -> { client.notifyMessageHandlers(stored[0], CorrectionEvent); }); client.trigger("call/ring", { session: this }); @@ -98,7 +99,7 @@ class IncomingProposedSession implements Session { // Store it for ourselves at least final event = new Stanza("reject", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); final msg = mkCallMessage(from, client.jid, event); - client.storeMessages([msg], (stored) -> { + client.storeMessages([msg]).then((stored) -> { client.notifyMessageHandlers(stored[0], CorrectionEvent); }); client.getDirectChat(from.asBare().asString(), false).jingleSessions.remove(sid); @@ -132,7 +133,7 @@ class IncomingProposedSession implements Session { client.sendPresence(from.asString()); final event = new Stanza("proceed", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); final msg = mkCallMessage(from, client.jid, event); - client.storeMessages([msg], (stored) -> { + client.storeMessages([msg]).then((stored) -> { client.notifyMessageHandlers(stored[0], CorrectionEvent); client.sendStanza( new Stanza("message", { to: from.asString(), type: "chat", id: msg.versions[0].localId }) @@ -203,7 +204,7 @@ class OutgoingProposedSession implements Session { event.tag("description", { xmlns: "urn:xmpp:jingle:apps:rtp:1", media: "video" }).up(); } final msg = mkCallMessage(to, client.jid, event); - client.storeMessages([msg], (stored) -> { + client.storeMessages([msg]).then((stored) -> { final stanza = new Stanza("message", { to: to.asString(), type: "chat", id: msg.localId }) .addChild(event) .tag("store", { xmlns: "urn:xmpp:hints" }); @@ -220,7 +221,7 @@ class OutgoingProposedSession implements Session { public function hangup() { final event = new Stanza("retract", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); final msg = mkCallMessage(to, client.jid, event); - client.storeMessages([msg], (stored) -> { + client.storeMessages([msg]).then((stored) -> { client.sendStanza( new Stanza("message", { to: to.asString(), type: "chat", id: msg.versions[0].localId }) .addChild(event) @@ -392,7 +393,7 @@ class InitiatedSession implements Session { final event = new Stanza("finish", { xmlns: "urn:xmpp:jingle-message:0", id: sid }); final msg = mkCallMessage(counterpart, client.jid, event); - client.storeMessages([msg], (stored) -> { + client.storeMessages([msg]).then((stored) -> { client.notifyMessageHandlers(stored[0], CorrectionEvent); client.sendStanza( new Stanza("message", { to: counterpart.asString(), type: "chat", id: msg.versions[0].localId }) diff --git a/snikket/persistence/Dummy.hx b/snikket/persistence/Dummy.hx index 39ba123..0bb991d 100644 --- a/snikket/persistence/Dummy.hx +++ b/snikket/persistence/Dummy.hx @@ -7,6 +7,7 @@ import haxe.io.BytesData; import snikket.Caps; import snikket.Chat; import snikket.Message; +import thenshim.Promise; // TODO: consider doing background threads for operations @@ -24,21 +25,21 @@ class Dummy implements Persistence { public function new() { } @HaxeCBridge.noemit - public function lastId(accountId: String, chatId: Null<String>, callback:(Null<String>)->Void):Void { - callback(null); + public function lastId(accountId: String, chatId: Null<String>): Promise<Null<String>> { + return Promise.resolve(null); } @HaxeCBridge.noemit public function storeChats(accountId: String, chat: Array<Chat>) { } @HaxeCBridge.noemit - public function getChats(accountId: String, callback: (Array<SerializedChat>)->Void) { - callback([]); + public function getChats(accountId: String): Promise<Array<SerializedChat>> { + return Promise.resolve([]); } @HaxeCBridge.noemit - public function storeMessages(accountId: String, messages: Array<ChatMessage>, callback: (Array<ChatMessage>)->Void) { - callback(messages); + public function storeMessages(accountId: String, messages: Array<ChatMessage>): Promise<Array<ChatMessage>> { + return Promise.resolve(messages); } @HaxeCBridge.noemit @@ -46,38 +47,38 @@ class Dummy implements Persistence { } @HaxeCBridge.noemit - public function getMessage(accountId: String, chatId: String, serverId: Null<String>, localId: Null<String>, callback: (Null<ChatMessage>)->Void) { - callback(null); + public function getMessage(accountId: String, chatId: String, serverId: Null<String>, localId: Null<String>): Promise<Null<ChatMessage>> { + return Promise.resolve(null); } @HaxeCBridge.noemit - public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (Array<ChatMessage>)->Void) { - callback([]); + public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>): Promise<Array<ChatMessage>> { + return Promise.resolve([]); } @HaxeCBridge.noemit - public function getMessagesAfter(accountId: String, chatId: String, afterId: Null<String>, afterTime: Null<String>, callback: (Array<ChatMessage>)->Void) { - callback([]); + public function getMessagesAfter(accountId: String, chatId: String, afterId: Null<String>, afterTime: Null<String>): Promise<Array<ChatMessage>> { + return Promise.resolve([]); } @HaxeCBridge.noemit - public function getMessagesAround(accountId: String, chatId: String, aroundId: Null<String>, aroundTime: Null<String>, callback: (Array<ChatMessage>)->Void) { - callback([]); + public function getMessagesAround(accountId: String, chatId: String, aroundId: Null<String>, aroundTime: Null<String>): Promise<Array<ChatMessage>> { + return Promise.resolve([]); } @HaxeCBridge.noemit - public function getChatsUnreadDetails(accountId: String, chats: Array<Chat>, callback: (Array<{ chatId: String, message: ChatMessage, unreadCount: Int }>)->Void) { - callback([]); + public function getChatsUnreadDetails(accountId: String, chats: Array<Chat>): Promise<Array<{ chatId: String, message: ChatMessage, unreadCount: Int }>> { + return Promise.resolve([]); } @HaxeCBridge.noemit - public function storeReaction(accountId: String, update: ReactionUpdate, callback: (Null<ChatMessage>)->Void) { - callback(null); + public function storeReaction(accountId: String, update: ReactionUpdate): Promise<Null<ChatMessage>> { + return Promise.resolve(null); } @HaxeCBridge.noemit - public function updateMessageStatus(accountId: String, localId: String, status:MessageStatus, callback: (ChatMessage)->Void) { - callback(null); + public function updateMessageStatus(accountId: String, localId: String, status:MessageStatus): Promise<ChatMessage> { + return Promise.reject("Dummy cannot updateMessageStatus"); } @HaxeCBridge.noemit @@ -86,13 +87,13 @@ class Dummy implements Persistence { } @HaxeCBridge.noemit - public function hasMedia(hashAlgorithm:String, hash:BytesData, callback: (Bool)->Void) { - callback(false); + public function hasMedia(hashAlgorithm:String, hash:BytesData): Promise<Bool> { + return Promise.resolve(false); } @HaxeCBridge.noemit - public function storeMedia(mime:String, bd:BytesData, callback: ()->Void) { - callback(); + public function storeMedia(mime:String, bd:BytesData): Promise<Bool> { + return Promise.resolve(false); } @HaxeCBridge.noemit @@ -103,39 +104,39 @@ class Dummy implements Persistence { public function storeCaps(caps:Caps) { } @HaxeCBridge.noemit - public function getCaps(ver:String, callback: (Caps)->Void) { - callback(null); + public function getCaps(ver:String): Promise<Caps> { + return Promise.resolve(null); } @HaxeCBridge.noemit public function storeLogin(login:String, clientId:String, displayName:String, token:Null<String>) { } @HaxeCBridge.noemit - public function getLogin(login:String, callback:(Null<String>, Null<String>, Int, Null<String>)->Void) { - callback(null, null, 0, null); + public function getLogin(login:String): Promise<{ clientId:Null<String>, token:Null<String>, fastCount: Int, displayName:Null<String> }> { + return Promise.resolve({ clientId: null, token: null, fastCount: 0, displayName: null }); } @HaxeCBridge.noemit public function removeAccount(accountId:String, completely:Bool) { } @HaxeCBridge.noemit - public function listAccounts(callback:(Array<String>)->Void) { - callback([]); + public function listAccounts(): Promise<Array<String>> { + return Promise.resolve([]); } @HaxeCBridge.noemit public function storeStreamManagement(accountId:String, sm:Null<BytesData>) { } @HaxeCBridge.noemit - public function getStreamManagement(accountId:String, callback: (Null<BytesData>)->Void) { - callback(null); + public function getStreamManagement(accountId:String): Promise<Null<BytesData>> { + return Promise.resolve(null); } @HaxeCBridge.noemit public function storeService(accountId:String, serviceId:String, name:Null<String>, node:Null<String>, caps:Caps) { } @HaxeCBridge.noemit - public function findServicesWithFeature(accountId:String, feature:String, callback:(Array<{serviceId:String, name:Null<String>, node:Null<String>, caps: Caps}>)->Void) { - callback([]); + public function findServicesWithFeature(accountId:String, feature:String): Promise<Array<{serviceId:String, name:Null<String>, node:Null<String>, caps: Caps}>> { + return Promise.resolve([]); } } diff --git a/snikket/persistence/IDB.js b/snikket/persistence/IDB.js index b64f137..80eabd9 100644 --- a/snikket/persistence/IDB.js +++ b/snikket/persistence/IDB.js @@ -195,7 +195,7 @@ export default (dbname, media, tokenize, stemmer) => { } const obj = { - lastId: function(account, jid, callback) { + lastId: async function(account, jid) { const tx = db.transaction(["messages"], "readonly"); const store = tx.objectStore("messages"); var cursor = null; @@ -210,17 +210,14 @@ export default (dbname, media, tokenize, stemmer) => { "prev" ); } - cursor.onsuccess = (event) => { - if (!event.target.result || (event.target.result.value.syncPoint && event.target.result.value.serverId && (jid || event.target.result.value.serverIdBy === account))) { - callback(event.target.result ? event.target.result.value.serverId : null); + while (true) { + const result = await promisifyRequest(cursor); + if (!result || (result.value.syncPoint && result.value.serverId && (jid || result.value.serverIdBy === account))) { + return result ? result.value.serverId : null; } else { - event.target.result.continue(); + result.continue(); } } - cursor.onerror = (event) => { - console.error(event); - callback(null); - } }, storeChats: function(account, chats) { @@ -247,35 +244,33 @@ export default (dbname, media, tokenize, stemmer) => { } }, - getChats: function(account, callback) { - (async () => { - const tx = db.transaction(["chats"], "readonly"); - const store = tx.objectStore("chats"); - const range = IDBKeyRange.bound([account], [account, []]); - const result = await promisifyRequest(store.getAll(range)); - return await Promise.all(result.map(async (r) => new snikket.SerializedChat( - r.chatId, - r.trusted, - r.avatarSha1, - new Map(await Promise.all((r.presence instanceof Map ? [...r.presence.entries()] : Object.entries(r.presence)).map( - async ([k, p]) => [k, new snikket.Presence(p.caps && await new Promise((resolve) => this.getCaps(p.caps, resolve)), p.mucUser && snikket.Stanza.parse(p.mucUser))] - ))), - r.displayName, - r.uiState, - r.isBlocked, - r.extensions, - r.readUpToId, - r.readUpToBy, - r.notificationSettings === undefined ? null : r.notificationSettings != null, - r.notificationSettings?.mention, - r.notificationSettings?.reply, - r.disco ? new snikket.Caps(r.disco.node, r.disco.identities, r.disco.features) : null, - r.class - ))); - })().then(callback); + getChats: async function(account) { + const tx = db.transaction(["chats"], "readonly"); + const store = tx.objectStore("chats"); + const range = IDBKeyRange.bound([account], [account, []]); + const result = await promisifyRequest(store.getAll(range)); + return await Promise.all(result.map(async (r) => new snikket.SerializedChat( + r.chatId, + r.trusted, + r.avatarSha1, + new Map(await Promise.all((r.presence instanceof Map ? [...r.presence.entries()] : Object.entries(r.presence)).map( + async ([k, p]) => [k, new snikket.Presence(p.caps && await this.getCaps(p.caps), p.mucUser && snikket.Stanza.parse(p.mucUser))] + ))), + r.displayName, + r.uiState, + r.isBlocked, + r.extensions, + r.readUpToId, + r.readUpToBy, + r.notificationSettings === undefined ? null : r.notificationSettings != null, + r.notificationSettings?.mention, + r.notificationSettings?.reply, + r.disco ? new snikket.Caps(r.disco.node, r.disco.identities, r.disco.features) : null, + r.class + ))); }, - getChatsUnreadDetails: function(account, chatsArray, callback) { + getChatsUnreadDetails: async function(account, chatsArray) { const tx = db.transaction(["messages"], "readonly"); const store = tx.objectStore("messages"); @@ -287,10 +282,11 @@ export default (dbname, media, tokenize, stemmer) => { chatsArray.forEach((chat) => chats[chat.chatId] = chat); const result = {}; var rowCount = 0; - cursor.onsuccess = (event) => { - if (event.target.result && rowCount < 40000) { + while (true) { + const cresult = await promisifyRequest(cursor); + if (cresult && rowCount < 40000) { rowCount++; - const value = event.target.result.value; + const value = cresult.value; if (result[value.chatId]) { result[value.chatId] = result[value.chatId].then((details) => { if (!details.foundAll) { @@ -308,63 +304,55 @@ export default (dbname, media, tokenize, stemmer) => { const haveRead = readUpTo === value.serverId || readUpTo === value.localId || value.direction == enums.MessageDirection.MessageSent; result[value.chatId] = hydrateMessage(value).then((m) => ({ chatId: value.chatId, message: m, unreadCount: haveRead ? 0 : 1, foundAll: haveRead })); } - event.target.result.continue(); + cresult.continue(); } else { - Promise.all(Object.values(result)).then(callback); + return await Promise.all(Object.values(result)); } } - cursor.onerror = (event) => { - console.error(event); - callback([]); - } }, - getMessage: function(account, chatId, serverId, localId, callback) { + getMessage: async function(account, chatId, serverId, localId) { const tx = db.transaction(["messages"], "readonly"); const store = tx.objectStore("messages"); - (async function() { - let result; - if (serverId) { - result = await promisifyRequest(store.openCursor(IDBKeyRange.bound([account, serverId], [account, serverId, []]))); - } else { - result = await promisifyRequest(store.index("localId").openCursor(IDBKeyRange.only([account, localId, chatId]))); - } - if (!result || !result.value) return null; - const message = result.value; - return await hydrateMessage(message); - })().then(callback); + let result; + if (serverId) { + result = await promisifyRequest(store.openCursor(IDBKeyRange.bound([account, serverId], [account, serverId, []]))); + } else { + result = await promisifyRequest(store.index("localId").openCursor(IDBKeyRange.only([account, localId, chatId]))); + } + if (!result || !result.value) return null; + const message = result.value; + return await hydrateMessage(message); }, - storeReaction: function(account, update, callback) { - (async function() { - const tx = db.transaction(["messages", "reactions"], "readwrite"); - const store = tx.objectStore("messages"); - const reactionStore = tx.objectStore("reactions"); - let result; - if (update.serverId) { - result = await promisifyRequest(store.openCursor(IDBKeyRange.bound([account, update.serverId, update.serverIdBy], [account, update.serverId, update.serverIdBy, []]))); - } else { - result = await promisifyRequest(store.index("localId").openCursor(IDBKeyRange.only([account, update.localId, update.chatId]))); - } - const lastFromSender = await promisifyRequest(reactionStore.index("senders").openCursor(IDBKeyRange.bound( - [account, update.chatId, update.serverId || update.localId, update.senderId], - [account, update.chatId, update.serverId || update.localId, update.senderId, []] - ), "prev")); - const reactions = update.getReactions(hydrateReactionsArray(lastFromSender?.value?.reactions)); - await promisifyRequest(reactionStore.put({...update, reactions: reactions, append: (update.kind === enums.ReactionUpdateKind.AppendReactions ? update.reactions : null), messageId: update.serverId || update.localId, timestamp: new Date(update.timestamp), account: account})); - if (!result || !result.value) return null; - if (lastFromSender?.value && lastFromSender.value.timestamp > new Date(update.timestamp)) return; - const message = result.value; - setReactions(message.reactions, update.senderId, reactions); - store.put(message); - return await hydrateMessage(message); - })().then(callback); + storeReaction: async function(account, update) { + const tx = db.transaction(["messages", "reactions"], "readwrite"); + const store = tx.objectStore("messages"); + const reactionStore = tx.objectStore("reactions"); + let result; + if (update.serverId) { + result = await promisifyRequest(store.openCursor(IDBKeyRange.bound([account, update.serverId, update.serverIdBy], [account, update.serverId, update.serverIdBy, []]))); + } else { + result = await promisifyRequest(store.index("localId").openCursor(IDBKeyRange.only([account, update.localId, update.chatId]))); + } + const lastFromSender = await promisifyRequest(reactionStore.index("senders").openCursor(IDBKeyRange.bound( + [account, update.chatId, update.serverId || update.localId, update.senderId], + [account, update.chatId, update.serverId || update.localId, update.senderId, []] + ), "prev")); + const reactions = update.getReactions(hydrateReactionsArray(lastFromSender?.value?.reactions)); + await promisifyRequest(reactionStore.put({...update, reactions: reactions, append: (update.kind === enums.ReactionUpdateKind.AppendReactions ? update.reactions : null), messageId: update.serverId || update.localId, timestamp: new Date(update.timestamp), account: account})); + if (!result || !result.value) return null; + if (lastFromSender?.value && lastFromSender.value.timestamp > new Date(update.timestamp)) return; + const message = result.value; + setReactions(message.reactions, update.senderId, reactions); + store.put(message); + return await hydrateMessage(message); }, storeMessages(account, messages, callback) { - Promise.all(messages.map(m => + return Promise.all(messages.map(m => new Promise(resolve => this.storeMessage(account, m, resolve)) - )).then(callback); + )); }, storeMessage: function(account, message, callback) { @@ -373,9 +361,9 @@ export default (dbname, media, tokenize, stemmer) => { if (!message.serverId && message.isIncoming()) throw "Cannot store an incoming message with no server id"; if (message.serverId && !message.serverIdBy) throw "Cannot store a message with a server id and no by"; - new Promise((resolve) => + ( // Hydrate reply stubs - message.replyToMessage && !message.replyToMessage.serverIdBy ? this.getMessage(account, message.chatId(), message.replyToMessage.serverId, message.replyToMessage.localId, resolve) : resolve(message.replyToMessage) + message.replyToMessage && !message.replyToMessage.serverIdBy ? this.getMessage(account, message.chatId(), message.replyToMessage.serverId, message.replyToMessage.localId) : Promise.resolve(message.replyToMessage) ).then((replyToMessage) => { message.replyToMessage = replyToMessage; const tx = db.transaction(["messages", "reactions"], "readwrite"); @@ -385,7 +373,7 @@ export default (dbname, media, tokenize, stemmer) => { promisifyRequest(tx.objectStore("reactions").openCursor(IDBKeyRange.only([account, message.chatId(), message.senderId, message.localId || ""]))) ]).then(([result, reactionResult]) => { if (reactionResult?.value?.append && message.html().trim() == "") { - this.getMessage(account, message.chatId(), reactionResult.value.serverId, reactionResult.value.localId, (reactToMessage) => { + this.getMessage(account, message.chatId(), reactionResult.value.serverId, reactionResult.value.localId).then((reactToMessage) => { const previouslyAppended = hydrateReactionsArray(reactionResult.value.append, reactionResult.value.senderId, reactionResult.value.timestamp).map(r => r.key); const reactions = []; for (const [k, reacts] of reactToMessage?.reactions || []) { @@ -441,19 +429,19 @@ export default (dbname, media, tokenize, stemmer) => { store.put(serializeMessage(account, message)); }, - updateMessageStatus: function(account, localId, status, callback) { + updateMessageStatus: async function(account, localId, status) { const tx = db.transaction(["messages"], "readwrite"); const store = tx.objectStore("messages"); - promisifyRequest(store.index("localId").openCursor(IDBKeyRange.bound([account, localId], [account, localId, []]))).then((result) => { - if (result?.value && result.value.direction === enums.MessageDirection.MessageSent && result.value.status !== enums.MessageStatus.MessageDeliveredToDevice) { - const newStatus = { ...result.value, status: status }; - result.update(newStatus); - hydrateMessage(newStatus).then(callback); - } - }); + const result = await promisifyRequest(store.index("localId").openCursor(IDBKeyRange.bound([account, localId], [account, localId, []]))); + if (result?.value && result.value.direction === enums.MessageDirection.MessageSent && result.value.status !== enums.MessageStatus.MessageDeliveredToDevice) { + const newStatus = { ...result.value, status: status }; + result.update(newStatus); + return await hydrateMessage(newStatus); + } + throw "Message not found: " + localId; }, - getMessagesBefore: function(account, chatId, beforeId, beforeTime, callback) { + getMessagesBefore: async function(account, chatId, beforeId, beforeTime) { // TODO: if beforeId is present but beforeTime is null, lookup time const bound = beforeTime ? new Date(beforeTime) : []; const tx = db.transaction(["messages"], "readonly"); @@ -462,10 +450,11 @@ export default (dbname, media, tokenize, stemmer) => { IDBKeyRange.bound([account, chatId], [account, chatId, bound]), "prev" ); - this.getMessagesFromCursor(cursor, beforeId, bound, (messages) => callback(messages.reverse())); + const messages = await this.getMessagesFromCursor(cursor, beforeId, bound); + return messages.reverse(); }, - getMessagesAfter: function(account, chatId, afterId, afterTime, callback) { + getMessagesAfter: async function(account, chatId, afterId, afterTime) { // TODO: if afterId is present but afterTime is null, lookup time const bound = afterTime ? [new Date(afterTime)] : []; const tx = db.transaction(["messages"], "readonly"); @@ -474,64 +463,49 @@ export default (dbname, media, tokenize, stemmer) => { IDBKeyRange.bound([account, chatId].concat(bound), [account, chatId, []]), "next" ); - this.getMessagesFromCursor(cursor, afterId, bound[0], callback); + return this.getMessagesFromCursor(cursor, afterId, bound[0]); }, - getMessagesAround: function(account, chatId, id, timeArg, callback) { + getMessagesAround: async function(account, chatId, id, timeArg) { if (!id && !timeArg) throw "Around what?"; - new Promise((resolve, reject) => { - if (timeArg) { - resolve(timeArg); - } else { - this.getMessage(account, chatId, id, null, (m) => { - m ? resolve(m.timestamp) : this.getMessage(account, chatId, null, id, (m2) => resolve(m2?.timestamp)); - }); - } - }).then((time) => { - if (!time) { - callback([]); - return; - } - const before = new Promise((resolve, reject) => - this.getMessagesBefore(account, chatId, id, time, resolve) - ); - const tx = db.transaction(["messages"], "readonly"); - const store = tx.objectStore("messages"); - const cursor = store.index("chats").openCursor( - IDBKeyRange.bound([account, chatId, new Date(time)], [account, chatId, []]), - "next" - ); - const aroundAndAfter = new Promise((resolve, reject) => - this.getMessagesFromCursor(cursor, null, null, resolve) - ); + const time = await ( + timeArg ? Promise.resolve(timeArg) : + this.getMessage(account, chatId, id, null).then((m) => + m ? m.timestamp : this.getMessage(account, chatId, null, id).then((m2) => m2?.timestamp) + ) + ); + if (!time) return []; - Promise.all([before, aroundAndAfter]).then((result) => { - callback(result.flat()); - }); - }); + const before = this.getMessagesBefore(account, chatId, id, time); + const tx = db.transaction(["messages"], "readonly"); + const store = tx.objectStore("messages"); + const cursor = store.index("chats").openCursor( + IDBKeyRange.bound([account, chatId, new Date(time)], [account, chatId, []]), + "next" + ); + const aroundAndAfter = this.getMessagesFromCursor(cursor, null, null); + + return Promise.all([before, aroundAndAfter]).then(result => result.flat()); }, - getMessagesFromCursor: function(cursor, id, bound, callback) { + getMessagesFromCursor: async function(cursor, id, bound) { const result = []; - cursor.onsuccess = (event) => { - if (event.target.result && result.length < 50) { - const value = event.target.result.value; + while (true) { + const cresult = await promisifyRequest(cursor); + if (cresult && result.length < 50) { + const value = cresult.value; if (value.serverId === id || value.localId === id || (value.timestamp && value.timestamp.getTime() === (bound instanceof Date && bound.getTime()))) { - event.target.result.continue(); - return; + cresult.continue(); + continue; } result.push(hydrateMessage(value)); - event.target.result.continue(); + cresult.continue(); } else { - Promise.all(result).then(callback); + return await Promise.all(result); } } - cursor.onerror = (event) => { - console.error(event); - callback([]); - } }, searchMessages: function(account, chatId, q, callback) { @@ -569,16 +543,16 @@ export default (dbname, media, tokenize, stemmer) => { } }, - hasMedia: function(hashAlgorithm, hash, callback) { - media.hasMedia(hashAlgorithm, hash, callback); + hasMedia: function(hashAlgorithm, hash) { + return media.hasMedia(hashAlgorithm, hash); }, removeMedia: function(hashAlgorithm, hash) { media.removeMedia(hashAlgorithm, hash); }, - storeMedia: function(mime, buffer, callback) { - media.storeMedia(mime, buffer, callback); + storeMedia: function(mime, buffer) { + return media.storeMedia(mime, buffer); }, storeCaps: function(caps) { @@ -587,17 +561,15 @@ export default (dbname, media, tokenize, stemmer) => { store.put(caps, "caps:" + caps.ver()).onerror = console.error; }, - getCaps: function(ver, callback) { - (async function() { - const tx = db.transaction(["keyvaluepairs"], "readonly"); - const store = tx.objectStore("keyvaluepairs"); - const raw = await promisifyRequest(store.get("caps:" + ver)); - if (raw) { - return (new snikket.Caps(raw.node, raw.identities.map((identity) => new snikket.Identity(identity.category, identity.type, identity.name)), raw.features)); - } + getCaps: async function(ver) { + const tx = db.transaction(["keyvaluepairs"], "readonly"); + const store = tx.objectStore("keyvaluepairs"); + const raw = await promisifyRequest(store.get("caps:" + ver)); + if (raw) { + return new snikket.Caps(raw.node, raw.identities.map((identity) => new snikket.Identity(identity.category, identity.type, identity.name)), raw.features); + } - return null; - })().then(callback); + return null; }, storeLogin: function(login, clientId, displayName, token) { @@ -622,30 +594,23 @@ export default (dbname, media, tokenize, stemmer) => { req.onerror = () => { console.error("storeStreamManagement", req.error.name, req.error.message); } }, - getStreamManagement: function(account, callback) { + async getStreamManagement(account) { const tx = db.transaction(["keyvaluepairs"], "readonly"); const store = tx.objectStore("keyvaluepairs"); - promisifyRequest(store.get("sm:" + account)).then( - (v) => { - if (v instanceof ArrayBuffer) { - callback(v); - } else if(!v) { - callback(null); - } else { - new Blob([JSON.stringify(v)], {type: "text/plain; charset=utf-8"}).arrayBuffer().then(callback); - } - }, - (e) => { - console.error(e); - callback(null); - } - ); + const v = await promisifyRequest(store.get("sm:" + account)); + if (v instanceof ArrayBuffer) { + return v; + } else if(!v) { + return null; + } else { + return new Blob([JSON.stringify(v)], {type: "text/plain; charset=utf-8"}).arrayBuffer(); + } }, getLogin: function(login, callback) { const tx = db.transaction(["keyvaluepairs"], "readwrite"); const store = tx.objectStore("keyvaluepairs"); - Promise.all([ + return Promise.all([ promisifyRequest(store.get("login:clientId:" + login)), promisifyRequest(store.get("login:token:" + login)), promisifyRequest(store.get("login:fastCount:" + login)), @@ -654,10 +619,7 @@ export default (dbname, media, tokenize, stemmer) => { if (result[1]) { store.put((result[2] || 0) + 1, "login:fastCount:" + login).onerror = console.error; } - callback(result[0], result[1], result[2] || 0, result[3]); - }).catch((e) => { - console.error(e); - callback(null, null, 0, null); + return { clientId: result[0], token: result[1], fastCount: result[2] || 0, displayName: result[3] }; }); }, @@ -709,6 +671,13 @@ export default (dbname, media, tokenize, stemmer) => { }; }, + async listAccounts() { + const tx = db.transaction(["keyvaluepairs"], "readonly"); + const store = tx.objectStore("keyvaluepairs"); + const keys = await promisifyRequest(store.getAllKeys(IDBKeyRange.bound("login:clientId:", "login:clientId:\uffff"))); + return keys.map(k => k.substring(15)); + }, + storeService(account, serviceId, name, node, caps) { this.storeCaps(caps); @@ -724,38 +693,35 @@ export default (dbname, media, tokenize, stemmer) => { }); }, - findServicesWithFeature(account, feature, callback) { + async findServicesWithFeature(account, feature) { const tx = db.transaction(["services"], "readonly"); const store = tx.objectStore("services"); // Almost full scan shouldn't be too expensive, how many services are we aware of? const cursor = store.openCursor(IDBKeyRange.bound([account], [account, []])); const result = []; - cursor.onsuccess = (event) => { - if (event.target.result) { - const value = event.target.result.value; - result.push(new Promise((resolve) => this.getCaps(value.caps, (caps) => resolve({ ...value, caps: caps })))); - event.target.result.continue(); + while (true) { + const cresult = await promisifyRequest(cursor); + if (cresult) { + const value = cresult.value; + result.push(this.getCaps(value.caps).then((caps) => ({ ...value, caps: caps }))); + cresult.continue(); } else { - Promise.all(result).then((items) => items.filter((item) => item.caps && item.caps.features.includes(feature))).then(callback); + return await Promise.all(result).then((items) => items.filter((item) => item.caps && item.caps.features.includes(feature))); } } - cursor.onerror = (event) => { - console.error(event); - callback([]); - } }, - get(k, callback) { + get(k) { const tx = db.transaction(["keyvaluepairs"], "readonly"); const store = tx.objectStore("keyvaluepairs"); - promisifyRequest(store.get(k)).then(callback); + return promisifyRequest(store.get(k)); }, - set(k, v, callback) { + set(k, v) { const tx = db.transaction(["keyvaluepairs"], "readwrite"); const store = tx.objectStore("keyvaluepairs"); - promisifyRequest(store.put(v, k)).then(callback); + return promisifyRequest(store.put(v, k)); } }; diff --git a/snikket/persistence/KeyValueStore.hx b/snikket/persistence/KeyValueStore.hx index 7b1a7e5..e134e1f 100644 --- a/snikket/persistence/KeyValueStore.hx +++ b/snikket/persistence/KeyValueStore.hx @@ -1,9 +1,11 @@ package snikket.persistence; +import thenshim.Promise; + #if cpp @:build(HaxeSwiftBridge.expose()) #end interface KeyValueStore { - public function get(k: String, callback: (Null<String>)->Void): Void; - public function set(k: String, v: Null<String>, callback: ()->Void): Void; + public function get(k: String): Promise<Null<String>>; + public function set(k: String, v: Null<String>): Promise<Bool>; } diff --git a/snikket/persistence/MediaStore.hx b/snikket/persistence/MediaStore.hx index c0d89bb..9f1bec8 100644 --- a/snikket/persistence/MediaStore.hx +++ b/snikket/persistence/MediaStore.hx @@ -1,14 +1,15 @@ package snikket.persistence; +import thenshim.Promise; import haxe.io.BytesData; #if cpp @:build(HaxeSwiftBridge.expose()) #end interface MediaStore { - public function hasMedia(hashAlgorithm:String, hash:BytesData, callback: (has:Bool)->Void):Void; + public function hasMedia(hashAlgorithm:String, hash:BytesData): Promise<Bool>; public function removeMedia(hashAlgorithm:String, hash:BytesData):Void; - public function storeMedia(mime:String, bytes:BytesData, callback: ()->Void):Void; + public function storeMedia(mime:String, bytes:BytesData): Promise<Bool>; @:allow(snikket) private function setKV(kv: KeyValueStore):Void; } diff --git a/snikket/persistence/MediaStoreCache.js b/snikket/persistence/MediaStoreCache.js index 7afdba9..d16e3c5 100644 --- a/snikket/persistence/MediaStoreCache.js +++ b/snikket/persistence/MediaStoreCache.js @@ -15,14 +15,13 @@ export default (cacheName) => { this.kv = kv; }, - storeMedia(mime, buffer, callback) { - (async () => { - const sha256 = await crypto.subtle.digest("SHA-256", buffer); - const sha1 = await crypto.subtle.digest("SHA-1", buffer); - const sha256NiUrl = mkNiUrl("sha-256", sha256); - await cache.put(sha256NiUrl, new Response(buffer, { headers: { "Content-Type": mime } })); - if (this.kv) await new Promise((resolve) => this.kv.set(mkNiUrl("sha-1", sha1), sha256NiUrl, resolve)); - })().then(callback); + async storeMedia(mime, buffer) { + const sha256 = await crypto.subtle.digest("SHA-256", buffer); + const sha1 = await crypto.subtle.digest("SHA-1", buffer); + const sha256NiUrl = mkNiUrl("sha-256", sha256); + await cache.put(sha256NiUrl, new Response(buffer, { headers: { "Content-Type": mime } })); + if (this.kv) await this.kv.set(mkNiUrl("sha-1", sha1), sha256NiUrl); + return true; }, removeMedia(hashAlgorithm, hash) { @@ -31,7 +30,7 @@ export default (cacheName) => { if (hashAlgorithm === "sha-256") { niUrl = mkNiUrl(hashAlgorithm, hash); } else { - niUrl = this.kv && await new Promise((resolve) => this.kv.get(mkNiUrl(hashAlgorithm, hash), resolve)); + niUrl = this.kv && await this.kv.get(mkNiUrl(hashAlgorithm, hash)); if (!niUrl) return; } @@ -61,7 +60,7 @@ export default (cacheName) => { if (uri.split("/")[3] === "sha-256") { niUrl = uri; } else { - niUrl = this.kv && await new Promise((resolve) => this.kv.get(uri, resolve)); + niUrl = this.kv && await this.kv.get(uri); if (!niUrl) { return null; } @@ -70,11 +69,9 @@ export default (cacheName) => { return await cache.match(niUrl); }, - hasMedia(hashAlgorithm, hash, callback) { - (async () => { - const response = await this.getMediaResponse(mkNiUrl(hashAlgorithm, hash)); - return !!response; - })().then(callback); + async hasMedia(hashAlgorithm, hash, callback) { + const response = await this.getMediaResponse(mkNiUrl(hashAlgorithm, hash)); + return !!response; } }; }; diff --git a/snikket/persistence/MediaStoreFS.hx b/snikket/persistence/MediaStoreFS.hx index af671a9..2f3c8da 100644 --- a/snikket/persistence/MediaStoreFS.hx +++ b/snikket/persistence/MediaStoreFS.hx @@ -26,66 +26,62 @@ class MediaStoreFS implements MediaStore { this.kv = kv; } - public function getMediaPath(uri: String, callback: (Null<String>)->Void) { + public function getMediaPath(uri: String): Promise<Null<String>> { final hash = Hash.fromUri(uri); if (hash.algorithm == "sha-256") { final path = blobpath + "/f" + hash.toHex(); if (FileSystem.exists(path)) { - callback(FileSystem.absolutePath(path)); + return Promise.resolve(FileSystem.absolutePath(path)); } else { - callback(null); + return Promise.resolve(null); } } else { - get(hash.serializeUri()).then(sha256uri -> { + return get(hash.serializeUri()).then(sha256uri -> { final sha256 = sha256uri == null ? null : Hash.fromUri(sha256uri); if (sha256 == null) { - callback(null); + return Promise.resolve(null); } else { - getMediaPath(sha256.toUri(), callback); + return getMediaPath(sha256.toUri()); } }); } } @HaxeCBridge.noemit - public function hasMedia(hashAlgorithm:String, hash:BytesData, callback: (Bool)->Void) { + public function hasMedia(hashAlgorithm:String, hash:BytesData): Promise<Bool> { final hash = new Hash(hashAlgorithm, hash); - getMediaPath(hash.toUri(), path -> callback(path != null)); + return getMediaPath(hash.toUri()).then(path -> path != null); } @HaxeCBridge.noemit public function removeMedia(hashAlgorithm: String, hash: BytesData) { final hash = new Hash(hashAlgorithm, hash); - getMediaPath(hash.toUri(), (path) -> { + getMediaPath(hash.toUri()).then((path) -> { if (path != null) FileSystem.deleteFile(path); }); } @HaxeCBridge.noemit - public function storeMedia(mime: String, bd: BytesData, callback: ()->Void) { + public function storeMedia(mime: String, bd: BytesData): Promise<Bool> { final bytes = Bytes.ofData(bd); final sha1 = Hash.sha1(bytes); final sha256 = Hash.sha256(bytes); File.saveBytes(blobpath + "/f" + sha256.toHex(), bytes); - thenshim.PromiseTools.all([ + return thenshim.PromiseTools.all([ set(sha1.serializeUri(), sha256.serializeUri()), set(sha256.serializeUri() + "#contentType", mime) - ]).then((_) -> callback()); + ]).then(_ -> true); } private function set(k: String, v: Null<String>) { if (kv == null) return Promise.resolve(null); - return new Promise((resolve, reject) -> - kv.set(k, v, () -> resolve(null)) - ); + return kv.set(k, v); } private function get(k: String): Promise<Null<String>> { if (kv == null) return Promise.resolve(null); - return new Promise((resolve, reject) -> - kv.get(k, resolve) - ); + return kv.get(k); } } diff --git a/snikket/persistence/Sqlite.hx b/snikket/persistence/Sqlite.hx index 3dcf096..d509761 100644 --- a/snikket/persistence/Sqlite.hx +++ b/snikket/persistence/Sqlite.hx @@ -129,31 +129,26 @@ class Sqlite implements Persistence implements KeyValueStore { } @HaxeCBridge.noemit - public function get(k: String, callback: (Null<String>)->Void) { - db.exec("SELECT v FROM keyvaluepairs WHERE k=? LIMIT 1", [k]).then(iter -> { + public function get(k: String): Promise<Null<String>> { + return db.exec("SELECT v FROM keyvaluepairs WHERE k=? LIMIT 1", [k]).then(iter -> { for (row in iter) { - callback(row.v); - return; + return row.v; } - callback(null); + return null; }); } @HaxeCBridge.noemit - public function set(k: String, v: Null<String>, callback: ()->Void) { - if (v == null) { - db.exec("DELETE FROM keyvaluepairs WHERE k=?", [k]).then(_ -> { - callback(); - }); + public function set(k: String, v: Null<String>): Promise<Bool> { + return if (v == null) { + db.exec("DELETE FROM keyvaluepairs WHERE k=?", [k]).then(_ -> true); } else { - db.exec("INSERT OR REPLACE INTO keyvaluepairs VALUES (?,?)", [k, v]).then(_ -> { - callback(); - }); + db.exec("INSERT OR REPLACE INTO keyvaluepairs VALUES (?,?)", [k, v]).then(_ -> true); } } @HaxeCBridge.noemit - public function lastId(accountId: String, chatId: Null<String>, callback:(Null<String>)->Void):Void { + public function lastId(accountId: String, chatId: Null<String>): Promise<Null<String>> { final params = [accountId]; var q = "SELECT mam_id FROM messages WHERE mam_id IS NOT NULL AND sync_point AND account_id=?"; if (chatId == null) { @@ -164,7 +159,7 @@ class Sqlite implements Persistence implements KeyValueStore { params.push(chatId); } q += " ORDER BY ROWID DESC LIMIT 1"; - db.exec(q, params).then(iter -> callback(iter.next()?.mam_id), (_) -> callback(null)); + return db.exec(q, params).then(iter -> cast (iter.next()?.mam_id, Null<String>)); } private final storeChatBuffer: Map<String, Chat> = []; @@ -227,8 +222,8 @@ class Sqlite implements Persistence implements KeyValueStore { } @HaxeCBridge.noemit - public function getChats(accountId: String, callback: (Array<SerializedChat>)->Void) { - db.exec( + public function getChats(accountId: String): Promise<Array<SerializedChat>> { + return db.exec( "SELECT chat_id, trusted, avatar_sha1, fn, ui_state, blocked, extensions, read_up_to_id, read_up_to_by, notifications_filtered, notify_mention, notify_reply, json(caps) AS caps, caps_ver, json(presence) AS presence, class FROM chats LEFT JOIN caps ON chats.caps_ver=caps.sha1 WHERE account_id=?", [accountId] ).then(result -> { @@ -272,14 +267,13 @@ class Sqlite implements Persistence implements KeyValueStore { chats.push(new SerializedChat(row.chat_id, row.trusted != 0, row.avatar_sha1, presenceMap, row.fn, row.ui_state, row.blocked != 0, row.extensions, row.read_up_to_id, row.read_up_to_by, row.notifications_filtered == null ? null : row.notifications_filtered != 0, row.notify_mention != 0, row.notify_reply != 0, row.capsObj, Reflect.field(row, "class"))); } return chats; - }).then(callback); + }); } @HaxeCBridge.noemit - public function storeMessages(accountId: String, messages: Array<ChatMessage>, callback: (Array<ChatMessage>)->Void) { + public function storeMessages(accountId: String, messages: Array<ChatMessage>): Promise<Array<ChatMessage>> { if (messages.length < 1) { - callback(messages); - return; + return Promise.resolve(messages); } final chatIds = []; @@ -301,7 +295,7 @@ class Sqlite implements Persistence implements KeyValueStore { } } - (if (chatIds.length > 0 && localIds.length > 0) { + return (if (chatIds.length > 0 && localIds.length > 0) { // Hmm, this loses the original timestamp though final q = new StringBuf(); q.add("DELETE FROM messages WHERE account_id=? AND direction=? AND chat_id IN ("); @@ -328,7 +322,7 @@ class Sqlite implements Persistence implements KeyValueStore { }) ) ).then(_ -> - hydrateReplyTo(accountId, messages, replyTos).then(ms -> hydrateReactions(accountId, ms)).then(callback) + hydrateReplyTo(accountId, messages, replyTos).then(ms -> hydrateReactions(accountId, ms)) ); // TODO: retract custom emoji? @@ -336,10 +330,10 @@ class Sqlite implements Persistence implements KeyValueStore { @HaxeCBridge.noemit public function updateMessage(accountId: String, message: ChatMessage) { - storeMessages(accountId, [message], (_)->{}); + storeMessages(accountId, [message]); } - public function getMessage(accountId: String, chatId: String, serverId: Null<String>, localId: Null<String>, callback: (Null<ChatMessage>)->Void) { + public function getMessage(accountId: String, chatId: String, serverId: Null<String>, localId: Null<String>): Promise<Null<ChatMessage>> { var q = "SELECT stanza, direction, type, status, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, mam_id, mam_by, sync_point FROM messages WHERE account_id=? AND chat_id=?"; final params = [accountId, chatId]; if (serverId != null) { @@ -350,17 +344,15 @@ class Sqlite implements Persistence implements KeyValueStore { params.push(localId); } q += "LIMIT 1"; - db.exec(q, params).then(result -> hydrateMessages(accountId, result)).then(messages -> { - for (message in messages) { + return db.exec(q, params).then(result -> hydrateMessages(accountId, result)).then(messages -> + thenshim.PromiseTools.all(messages.map(message -> (if (message.replyToMessage != null) { hydrateReplyTo(accountId, [message], [{ chatId: chatId, serverId: message.replyToMessage.serverId, localId: message.replyToMessage.localId }]); } else { Promise.resolve([message]); - }).then(messages -> hydrateReactions(accountId, messages)).then(hydrated -> callback(hydrated[0])); - return; - } - callback(null); - }); + }).then(messages -> hydrateReactions(accountId, messages)) + )).then(items -> items.flatten()).then(items -> items.length > 0 ? items[0] : null) + ); } private function getMessages(accountId: String, chatId: String, time: Null<String>, op: String): Promise<Array<ChatMessage>> { @@ -406,23 +398,23 @@ class Sqlite implements Persistence implements KeyValueStore { } @HaxeCBridge.noemit - public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (Array<ChatMessage>)->Void) { - getMessages(accountId, chatId, beforeTime, "<").then(callback); + public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>): Promise<Array<ChatMessage>> { + return getMessages(accountId, chatId, beforeTime, "<"); } @HaxeCBridge.noemit - public function getMessagesAfter(accountId: String, chatId: String, afterId: Null<String>, afterTime: Null<String>, callback: (Array<ChatMessage>)->Void) { - getMessages(accountId, chatId, afterTime, ">").then(callback); + public function getMessagesAfter(accountId: String, chatId: String, afterId: Null<String>, afterTime: Null<String>): Promise<Array<ChatMessage>> { + return getMessages(accountId, chatId, afterTime, ">"); } @HaxeCBridge.noemit - public function getMessagesAround(accountId: String, chatId: String, aroundId: Null<String>, aroundTime: Null<String>, callback: (Array<ChatMessage>)->Void) { - (if (aroundTime == null) { - new Promise((resolve, reject) -> getMessage(accountId, chatId, aroundId, null, resolve)).then(m -> + public function getMessagesAround(accountId: String, chatId: String, aroundId: Null<String>, aroundTime: Null<String>): Promise<Array<ChatMessage>> { + return (if (aroundTime == null) { + getMessage(accountId, chatId, aroundId, null).then(m -> if (m != null) { Promise.resolve(m.timestamp); } else { - new Promise((resolve, reject) -> getMessage(accountId, chatId, null, aroundId, resolve)).then(m -> m?.timestamp); + getMessage(accountId, chatId, null, aroundId).then(m -> m?.timestamp); } ); } else { @@ -432,19 +424,16 @@ class Sqlite implements Persistence implements KeyValueStore { getMessages(accountId, chatId, aroundTime, "<"), getMessages(accountId, chatId, aroundTime, ">=") ]) - ).then(results -> - callback(results.flatMap(arr -> arr)) - ); + ).then(results -> results.flatten()); } @HaxeCBridge.noemit - public function getChatsUnreadDetails(accountId: String, chats: Array<Chat>, callback: (Array<{ chatId: String, message: ChatMessage, unreadCount: Int }>)->Void) { + public function getChatsUnreadDetails(accountId: String, chats: Array<Chat>): Promise<Array<{ chatId: String, message: ChatMessage, unreadCount: Int }>> { if (chats == null || chats.length < 1) { - callback([]); - return; + return Promise.resolve([]); } - Promise.resolve(null).then(_ -> { + return Promise.resolve(null).then(_ -> { final params: Array<Dynamic> = [accountId]; // subq is first in final q, so subq params first final subq = new StringBuf(); @@ -482,7 +471,7 @@ class Sqlite implements Persistence implements KeyValueStore { return db.exec(q.toString(), params); }).then(result -> { iterator: () -> result }.array() - ).then((rows: Array<Dynamic>) -> { + ).then((rows: Array<Dynamic>) -> Promise.resolve(hydrateMessages(accountId, rows.iterator())).then(messages -> { final details = []; for (i => m in messages) { @@ -492,14 +481,14 @@ class Sqlite implements Persistence implements KeyValueStore { message: m }); } - callback(details); - }); - }); + return details; + }) + ); } @HaxeCBridge.noemit - public function storeReaction(accountId: String, update: ReactionUpdate, callback: (Null<ChatMessage>)->Void) { - db.exec( + public function storeReaction(accountId: String, update: ReactionUpdate): Promise<Null<ChatMessage>> { + return db.exec( "INSERT OR REPLACE INTO reactions VALUES (?,?,?,?,?,?,?,CAST(unixepoch(?, 'subsec') * 1000 AS INTEGER),jsonb(?),?)", [ accountId, update.updateId, update.serverId, update.serverIdBy, @@ -507,36 +496,34 @@ class Sqlite implements Persistence implements KeyValueStore { JsonPrinter.print(update.reactions), update.kind ] ).then(_ -> - this.getMessage(accountId, update.chatId, update.serverId, update.localId, callback) + this.getMessage(accountId, update.chatId, update.serverId, update.localId) ); } @HaxeCBridge.noemit - public function updateMessageStatus(accountId: String, localId: String, status:MessageStatus, callback: (ChatMessage)->Void) { - db.exec( + public function updateMessageStatus(accountId: String, localId: String, status:MessageStatus): Promise<ChatMessage> { + return db.exec( "UPDATE messages SET status=? WHERE account_id=? AND stanza_id=? AND direction=? AND status <> ?", [status, accountId, localId, MessageSent, MessageDeliveredToDevice] ).then(_ -> db.exec( - "SELECT stanza, direction, type, status, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, correction_id AS stanza_id, mam_id, mam_by, sync_point FROM messages WHERE account_id=? AND stanza_id=? AND direction=?", + "SELECT stanza, direction, type, status, strftime('%FT%H:%M:%fZ', created_at / 1000.0, 'unixepoch') AS timestamp, sender_id, correction_id AS stanza_id, mam_id, mam_by, sync_point FROM messages WHERE account_id=? AND stanza_id=? AND direction=? LIMIT 1", [accountId, localId, MessageSent] ) - ).then(result -> { - final messages = hydrateMessages(accountId, result); - for (message in messages) { + ).then(result -> + thenshim.PromiseTools.all(hydrateMessages(accountId, result).map(message -> (if (message.replyToMessage != null) { hydrateReplyTo(accountId, [message], [{ chatId: message.chatId(), serverId: message.replyToMessage.serverId, localId: message.replyToMessage.localId }]); } else { Promise.resolve([message]); - }).then(messages -> hydrateReactions(accountId, messages)).then(hydrated -> callback(hydrated[0])); - return; - } - }); + }).then(messages -> hydrateReactions(accountId, messages)) + )) + ).then(hydrated -> hydrated.flatten()).then(hydrated -> hydrated.length > 0 ? Promise.resolve(hydrated[0]) : Promise.reject("Message not found: " + localId)); } @HaxeCBridge.noemit - public function hasMedia(hashAlgorithm:String, hash:BytesData, callback: (Bool)->Void) { - media.hasMedia(hashAlgorithm, hash, callback); + public function hasMedia(hashAlgorithm:String, hash:BytesData): Promise<Bool> { + return media.hasMedia(hashAlgorithm, hash); } @HaxeCBridge.noemit @@ -545,8 +532,8 @@ class Sqlite implements Persistence implements KeyValueStore { } @HaxeCBridge.noemit - public function storeMedia(mime: String, bd: BytesData, callback: ()->Void) { - media.storeMedia(mime, bd, callback); + public function storeMedia(mime: String, bd: BytesData): Promise<Bool> { + return media.storeMedia(mime, bd); } @HaxeCBridge.noemit @@ -571,23 +558,21 @@ class Sqlite implements Persistence implements KeyValueStore { } @HaxeCBridge.noemit - public function getCaps(ver:String, callback: (Caps)->Void) { + public function getCaps(ver:String): Promise<Caps> { final verData = try { Base64.decode(ver).getData(); } catch (e) { - callback(null); - return; + return Promise.resolve(null); } - db.exec( + return db.exec( "SELECT json(caps) AS caps FROM caps WHERE sha1=? LIMIT 1", [verData] ).then(result -> { for (row in result) { final json = Json.parse(row.caps); - callback(new Caps(json.node, json.identities.map(i -> new Identity(i.category, i.type, i.name)), json.features, verData)); - return; + return new Caps(json.node, json.identities.map(i -> new Identity(i.category, i.type, i.name)), json.features, verData); } - callback(null); + return null; }); } @@ -618,20 +603,20 @@ class Sqlite implements Persistence implements KeyValueStore { } @HaxeCBridge.noemit - public function getLogin(accountId:String, callback:(Null<String>, Null<String>, Int, Null<String>)->Void) { - db.exec( - "SELECT client_id, display_name, token, fast_count FROM accounts WHERE account_id=? LIMIT 1", + public function getLogin(accountId: String): Promise<{ clientId:Null<String>, token:Null<String>, fastCount: Int, displayName:Null<String> }> { + return db.exec( + "SELECT client_id AS clientId, display_name AS displayName, token, COALESCE(fast_count, 0) AS fastCount FROM accounts WHERE account_id=? LIMIT 1", [accountId] ).then(result -> { for (row in result) { - if (row.token != null) { + final r: Dynamic = row; + if (r.token != null) { db.exec("UPDATE accounts SET fast_count=fast_count+1 WHERE account_id=?", [accountId]); } - callback(row.client_id, row.token, row.fast_count ?? 0, row.display_name); - return; + return r; } - callback(null, null, 0, null); + return { clientId: null, token: null, fastCount: 0, displayName: null }; }); } @@ -646,9 +631,9 @@ class Sqlite implements Persistence implements KeyValueStore { } - public function listAccounts(callback:(Array<String>)->Void) { - db.exec("SELECT account_id FROM accounts").then(result -> - callback(result == null ? [] : { iterator: () -> result }.map(row -> row.account_id)) + public function listAccounts(): Promise<Array<String>> { + return db.exec("SELECT account_id FROM accounts").then(result -> + result == null ? [] : { iterator: () -> result }.map(row -> row.account_id) ); } @@ -670,14 +655,13 @@ class Sqlite implements Persistence implements KeyValueStore { } @HaxeCBridge.noemit - public function getStreamManagement(accountId:String, callback: (Null<BytesData>)->Void) { - db.exec("SELECT sm_state FROM accounts WHERE account_id=?", [accountId]).then(result -> { + public function getStreamManagement(accountId:String): Promise<Null<BytesData>> { + return db.exec("SELECT sm_state FROM accounts WHERE account_id=?", [accountId]).then(result -> { for (row in result) { - callback(row.sm_state); - return; + return row.sm_state; } - callback(null); + return null; }); } @@ -692,9 +676,9 @@ class Sqlite implements Persistence implements KeyValueStore { } @HaxeCBridge.noemit - public function findServicesWithFeature(accountId:String, feature:String, callback:(Array<{serviceId:String, name:Null<String>, node:Null<String>, caps: Caps}>)->Void) { + public function findServicesWithFeature(accountId:String, feature:String): Promise<Array<{serviceId:String, name:Null<String>, node:Null<String>, caps: Caps}>> { // Almost full scan shouldn't be too expensive, how many services are we aware of? - db.exec( + return db.exec( "SELECT service_id, name, node, json(caps.caps) AS caps FROM services INNER JOIN caps ON services.caps=caps.sha1 WHERE account_id=?", [accountId] ).then(result -> { @@ -711,7 +695,7 @@ class Sqlite implements Persistence implements KeyValueStore { }); } } - callback(services); + return services; }); }