git » sdk » main » tree

[main] / borogove / streams / XmppJsStream.hx

package borogove.streams;

import haxe.Json;
import haxe.io.Bytes;
import haxe.io.BytesData;
import js.lib.Promise;
using Lambda;

import borogove.FSM;
import borogove.GenericStream;
import borogove.Stanza;
import borogove.Util;

@:js.import(@default "@xmpp/sasl-scram-sha-1")
extern class XmppJsScramSha1 {
	@:selfCall
	function new(sasl: Dynamic);
}

@:js.import("@xmpp/client", "client")
extern class XmppJsClient {
	function new(options:Dynamic);
	function start():Promise<Dynamic>;
	function stop():Promise<Dynamic>;
	function on(eventName:String, callback:(Dynamic)->Void):Void;
	function send(stanza:XmppJsXml):Void;
	var jid:XmppJsJID;
	var streamFrom:Null<XmppJsJID>;
	var status: String;
	var iqCallee:{
		get: (String, String, ({stanza: XmppJsXml})->Any)->Void,
		set: (String, String, ({stanza: XmppJsXml})->Any)->Void,
	};
	var middleware: { use:(({stanza: XmppJsXml})->Void)->Void };
	var streamFeatures: { use:(String,String,({}, ()->Void, XmppJsXml)->Void)->Void };
	var streamManagement: {
		id:String,
		outbound: Int,
		inbound: Int,
		outbound_q: Array<{ stanza: XmppJsXml, stamp: String }>,
		enabled: Bool,
		allowResume: Bool,
		on: (String, Dynamic->Void)->Void
	};
	var saslFactory: Dynamic;
	var fast: {
		saveToken: ({ token: String, expiry: String, mechanism: String })->Promise<Any>
	};
}

@:js.import("@xmpp/jid", "jid")
extern class XmppJsJID {
	function new(jid:String);
	function toString():String;

	var local(default, set):String;
	var domain(default, set):String;
	var resource(default, set):String;
}

@:js.import(@default "@xmpp/debug")
extern class XmppJsDebug {
	@:selfCall
	function new(client:XmppJsClient, force:Bool):Void;
}

@:js.import(@default "@xmpp/xml")
extern class XmppJsXml {
	@:selfCall
	@:overload(function(tagName:String, ?attr:Dynamic):XmppJsXml { })
	function new();
	@:overload(function(textContent:String):Void { })
	function append(el:XmppJsXml):Void;
	function toString():String;

	var name:String;
	var attrs:Dynamic;
	var children:Array<Dynamic>;
}

@:js.import(@star "ltx") // The default XML library used by xmpp.js
extern class XmppJsLtx {
	static function isNode(el:Dynamic):Bool;
	static function isElement(el:Dynamic):Bool;
	static function isText(el:Dynamic):Bool;
	static function parse(input:String):XmppJsXml;
}

@:js.import(@default "@xmpp/id")
extern class XmppJsId {
	@:selfCall
	static function id():String;
}

@:js.import(@default "@xmpp/error")
extern class XmppJsError {
	public final name: String;
	public final condition: String;
	public final text: String;
	public final application: String;
}

class XmppJsStream extends GenericStream {
	private var client:XmppJsClient;
	private var jid:XmppJsJID;
	private var debug = true;
	private var state:FSM;
	private var pending:Array<XmppJsXml> = [];
	private var pendingOnIq:Array<{type:IqRequestType,tag:String,xmlns:String,handler:(Stanza)->IqResult}> = [];
	private var initialSM: Null<BytesData> = null;
	private var resumed = false;
	private var everConnected = false;

	override public function new() {
		super();
		state = new FSM({
			transitions: [
				{ name: "connect-requested", from: ["offline"], to: "connecting" },
				{ name: "connection-success", from: ["connecting"], to: "online" },
				{ name: "connection-error", from: ["connecting"], to: "offline" },
				{ name: "connection-closed", from: ["connecting", "online"], to: "offline" },
			],
			state_handlers: [
				"online" => this.onOnline,
				"offline" => this.onOffline,
			],
			transition_handlers: [
				"connection-error" => this.onError,
			],
		}, "offline");
	}

	public function connect(jidS:String, sm:Null<BytesData>) {
		this.state.event("connect-requested");
		this.jid = new XmppJsJID(jidS);
		this.initialSM = sm;

		final waitForCreds = new js.lib.Promise((resolve, reject) -> {
			this.on("auth/password", (event: Dynamic) -> {
				if (event.username == null) event.username = jid.local;
				resolve(event);
				return EventHandled;
			});
		});

		final clientId = jid.resource;
		final xmpp = new XmppJsClient({
			service: jid.domain,
			resource: jid.resource,
			credentials: (callback, mechanisms: Dynamic, fast: Null<{mechanism: String}>) -> {
				everConnected = true;
				this.clientId = Std.is(mechanisms, Array) ? clientId : null;
				final mechs: Array<{name: String, canFast: Bool, canOther: Bool}> =
					(fast == null ? [] : [{ name: fast.mechanism, canFast: true, canOther: false }]).concat(
						(Std.is(mechanisms, Array) ? mechanisms : [mechanisms]).map((m: String) -> { name: m, canFast: false, canOther: true })
					);
				final mech = mechs.find((m) -> m.canOther)?.name;
				this.trigger("auth/password-needed", { mechanisms: mechs });
				return waitForCreds.then((creds) -> {
					creds.username = jid.local;
					// xmpp.js doesn't support fastCount for now, and expects the cred to be called token when using FAST
					if (creds.fastCount != null) {
						try {
							creds = { username: jid.local, token: Json.parse(creds.password), mechanism: null };
						} catch (e) {
							// JSON parse error, so just proceed and let auth fail
							// token of empty string causes exceptions so don't do that
							creds = { password: null, fastCount: null, username: jid.local, token: { token: "fail", mechanism: creds.mechanism }, mechanism: null };
						}
					}
					return callback(creds, creds.mechanism ?? mech, new XmppJsXml("user-agent", { id: clientId }));
				});
			}
		});
		new XmppJsScramSha1(xmpp.saslFactory);
		xmpp.jid = this.jid;

		xmpp.streamFeatures.use("csi", "urn:xmpp:csi:0", (ctx, next, feature) -> {
			csi = true;
			return next();
		});

		if(this.debug) {
			new XmppJsDebug(xmpp, true);
		}

		if (initialSM != null) {
			final parsedSM = Json.parse(Bytes.ofData(initialSM).toString());
			final parsedPending: Null<Array<String>> = parsedSM.pending;
			if (parsedPending != null) {
				for (item in parsedPending) {
					pending.push(XmppJsLtx.parse(item));
				}
			}
			xmpp.streamManagement.id = parsedSM.id;
			xmpp.streamManagement.outbound = parsedSM.outbound;
			xmpp.streamManagement.inbound = parsedSM.inbound;
			xmpp.streamManagement.outbound_q = (parsedSM.outbound_q ?? []).map(
				item -> { stanza: XmppJsLtx.parse(item.stanza), stamp: item.stamp }
			);
			initialSM = null;
		}

		this.client = xmpp;
		processPendingOnIq();

		xmpp.on("online", function (jid) {
			resumed = false;
			this.jid = jid;
			this.state.event("connection-success");
		});

		xmpp.on("offline", function (data) {
			this.state.event("connection-closed");
		});

		xmpp.streamManagement.on("resumed", (_) -> {
			resumed = true;
			if (xmpp.jid == null) {
				xmpp.jid = this.jid;
			} else {
				this.jid = xmpp.jid;
			}
			this.state.event("connection-success");
		});

		xmpp.on("stanza", function (stanza) {
			triggerSMupdate();
			this.onStanza(convertToStanza(stanza));
		});

		xmpp.streamManagement.on("ack", (stanza) -> {
			if (stanza?.name == "message" && stanza?.attrs?.id != null) this.trigger("sm/ack", { id: stanza.attrs.id });
			triggerSMupdate();
		});

		xmpp.streamManagement.on("fail", (stanza) -> {
			if (stanza.name == "message" && stanza.attrs.id != null) this.trigger("sm/fail", { id: stanza.attrs.id });
			triggerSMupdate();
		});

		xmpp.fast.saveToken = (token) -> {
			token.token = Json.stringify(token);
			this.trigger("fast-token", token);
			return Promise.resolve(null);
		};

		xmpp.on("status", (status) -> {
			if (status == "disconnect") {
				if (this.state.can("connection-closed")) this.state.event("connection-closed");
			} else if(status == "connecting") {
				if (this.state.can("connect-requested")) this.state.event("connect-requested");
			}
		});

		resumed = false;
		xmpp.start().catchError(function (err) {
			if (this.state.can("connection-error")) this.state.event("connection-error");
			final xmppError = Std.downcast(err, XmppJsError);
			if (xmppError?.name == "SASLError") {
				this.trigger("auth/fail", xmppError);
			} else {
				trace(err);
			}
		});
	}

	public function disconnect() {
		if (client == null) return;
		client.stop();
	}

	public static function convertFromStanza(el:Stanza):XmppJsXml {
		var xml = new XmppJsXml(el.name, el.attr);
		if(el.children.length > 0) {
			for(child in el.children) {
				switch(child) {
					case Element(stanza): xml.append(convertFromStanza(stanza));
					case CData(text): xml.append(text.content);
				};
			}
		}
		return xml;
	}

	private static function convertToStanza(el:XmppJsXml):Stanza {
		var stanza = new Stanza(el.name, el.attrs);
		for (child in el.children) {
			if(XmppJsLtx.isText(child)) {
				stanza.text(cast(child, String));
			} else {
				stanza.addChild(convertToStanza(child));
			}
		}
		return stanza;
	}

	public static function parseStanza(input:String):Stanza {
		return convertToStanza(XmppJsLtx.parse(input));
	}

	public function sendStanza(stanza:Stanza) {
		if (client == null || client.status != "online") {
			pending.push(convertFromStanza(stanza));
		} else {
			client.send(convertFromStanza(stanza));
		}
		triggerSMupdate();
	}

	public function newId():String {
		return XmppJsId.id();
	}

	private function triggerSMupdate() {
		if (client == null || !client.streamManagement.enabled || !emitSMupdates) return;
		this.trigger(
			"sm/update",
			{
				sm: bytesOfString(Json.stringify({
					id: client.streamManagement.id,
					outbound: client.streamManagement.outbound,
					inbound: client.streamManagement.inbound,
					outbound_q: (client.streamManagement.outbound_q ?? []).map((item) -> { stanza: item.stanza.toString(), stamp: item.stamp }),
					pending: pending.map((stanza) -> stanza.toString())
				})).getData()
			}
		);
	}

	private function fromIqResult(result: IqResult): Any {
		switch (result) {
		case IqResultElement(el): return convertFromStanza(el);
		case IqResult: return true;
		case IqNoResult: return false;
		}
	}

	public function onIq(type:IqRequestType, tag:String, xmlns:String, handler:(Stanza)->IqResult) {
		if (client == null) {
			pendingOnIq.push({ type: type, tag: tag, xmlns: xmlns, handler: handler });
		} else {
			switch (type) {
			case Get:
				client.iqCallee.get(xmlns, tag, (el) -> fromIqResult(handler(convertToStanza(el.stanza))));
			case Set:
				client.iqCallee.set(xmlns, tag, (el) -> fromIqResult(handler(convertToStanza(el.stanza))));
			}
		}
	}

	private function processPendingOnIq() {
		var item;
		while ((item = pendingOnIq.shift()) != null) {
			onIq(item.type, item.tag, item.xmlns, item.handler);
		}
	}

	/* State handlers */

	private function onOnline(event) {
		everConnected = true;
		var item;
		while ((item = pending.shift()) != null) {
			client.send(item);
		}
		triggerSMupdate();
		trigger("status/online", { jid: jid.toString(), resumed: resumed });
	}

	private function onOffline(event) {
		trigger("status/offline", {});
	}

	private function onError(event) {
		if (!everConnected) trigger("status/error", {});
		// If everConnected then we are retrying so not fatal
		return true;
	}
}