git » sdk » commit 5e0f193

Move timer to the main thread

author Stephen Paul Weber
2024-10-17 03:56:49 UTC
committer Stephen Paul Weber
2024-10-17 03:56:49 UTC
parent 805e6643f7b76eb76c724226757781520d7f7d25

Move timer to the main thread

That way it can't get blocked by the PCM thread being slow, which would
defeat the purpose of having the timer.

snikket/jingle/PeerConnection.cpp.hx +35 -15

diff --git a/snikket/jingle/PeerConnection.cpp.hx b/snikket/jingle/PeerConnection.cpp.hx
index 01b13f4..9536b41 100644
--- a/snikket/jingle/PeerConnection.cpp.hx
+++ b/snikket/jingle/PeerConnection.cpp.hx
@@ -349,6 +349,7 @@ class MediaStreamTrack {
 	private var alive = true;
 	private var waitForQ = false;
 	private var bufferSizeInSeconds = 0.0;
+	private var mutex = new sys.thread.Mutex();
 
 	@:allow(snikket)
 	private var media(get, default): StdOptional<DescriptionMedia>;
@@ -366,23 +367,31 @@ class MediaStreamTrack {
 	@:allow(snikket)
 	private function new() {
 		eventLoop = sys.thread.Thread.createWithEventLoop(() -> {
-			timer = new haxe.Timer(10); // This timer will stop when the audioloop for this track stops
-			timer.run = () -> {
-				if (untyped __cpp__("!_gthis->track")) return;
-				if (!alive || !track.ref.isOpen()) return;
-
-				if (audioQ.length > 0 && audioQ[audioQ.length - 1].stamp <= haxe.Timer.stamp()) {
-					final packet = audioQ.pop();
-					write(packet.payload, packet.payloadType, packet.clockRate);
-					advanceTimestamp(Std.int(packet.payload.length / packet.channels));
-				}
-				if (waitForQ && audioQ.length < (50+50*bufferSizeInSeconds)) {
-					waitForQ = false;
-					notifyReadyForData(false);
-				}
-			};
 			while(alive) { sys.thread.Thread.processEvents(); sys.thread.Thread.current().events.wait(); }
 		}).events;
+
+		timer = new haxe.Timer(10);
+		timer.run = () -> {
+			mutex.acquire();
+			if (untyped __cpp__("!_gthis->track")) {
+				mutex.release();
+				return;
+			}
+			if (!alive || !track.ref.isOpen()) {
+				mutex.release();
+				return;
+			}
+			if (audioQ.length > 0 && audioQ[audioQ.length - 1].stamp <= haxe.Timer.stamp()) {
+				final packet = audioQ.pop();
+				write(packet.payload, packet.payloadType, packet.clockRate);
+				advanceTimestamp(Std.int(packet.payload.length / packet.channels));
+			}
+			if (waitForQ && audioQ.length < (50+50*bufferSizeInSeconds)) {
+				waitForQ = false;
+				notifyReadyForData(false);
+			}
+			mutex.release();
+		};
 	}
 
 	private function get_media() {
@@ -520,7 +529,9 @@ class MediaStreamTrack {
 		if (readyForPCMCallback != null) {
 			eventLoop.run(() -> {
 				if (audioQ.length > (50+50*bufferSizeInSeconds)) {
+					mutex.acquire();
 					waitForQ = true;
+					mutex.release();
 				} else {
 					readyForPCMCallback();
 				}
@@ -540,15 +551,19 @@ class MediaStreamTrack {
 		final format = Lambda.find(supportedAudioFormats, format -> format.clockRate == clockRate && format.channels == channels);
 		if (format == null) throw "Unsupported audo format: " + clockRate + "/" + channels;
 		eventLoop.run(() -> {
+			mutex.acquire();
 			final stamp = if (audioQ.length < 1) {
 				bufferSizeInSeconds = Math.max(bufferSizeInSeconds, bufferSizeInSeconds + 0.1);
 				haxe.Timer.stamp() + bufferSizeInSeconds;
 			} else {
 				audioQ[0].stamp + (pcm.length / (clockRate / 1000)) / 1000.0;
 			}
+			mutex.release();
 			if (format.format == "PCMU") {
 				final packet = { channels: channels, payloadType: format.payloadType, clockRate: clockRate, payload: pcm.map(pcmToUlaw), stamp: stamp };
+				mutex.acquire();
 				audioQ.unshift(packet);
+				mutex.release();
 			} else if (format.format == "opus") {
 				if (untyped __cpp__("!{0}", opusEncoder)) {
 					opusEncoder = OpusEncoder.create(clockRate, channels, untyped __cpp__("OPUS_APPLICATION_VOIP"), null); // assume only one opus clockRate+channels for this track
@@ -560,7 +575,9 @@ class MediaStreamTrack {
 				final encoded = OpusEncoder.encode(opusEncoder, cpp.Pointer.ofArray(pcm), Std.int(pcm.length / channels), cpp.Pointer.ofArray(rawOpus), rawOpus.length);
 				rawOpus.resize(encoded);
 				final packet = { channels: channels, payloadType: format.payloadType, clockRate: clockRate, payload: rawOpus, stamp: stamp };
+				mutex.acquire();
 				audioQ.unshift(packet);
+				mutex.release();
 			} else {
 				trace("Ignoring audio meant to go out as", format.format, format.clockRate, format.channels);
 			}
@@ -590,12 +607,15 @@ class MediaStreamTrack {
 	}
 
 	public function stop() {
+		timer.stop();
+		mutex.acquire();
 		alive = false;
 		if (track.ref.isOpen()) track.ref.close();
 		if (untyped __cpp__("opus")) {
 			OpusDecoder.destroy(opus);
 			opus = null;
 		}
+		mutex.release();
 	}
 }