git » sdk » commit 6bd4e8b

Queue outgoing audio packets

author Stephen Paul Weber
2024-09-21 02:00:51 UTC
committer Stephen Paul Weber
2024-09-24 03:01:46 UTC
parent 0f601cdcaa2ca8ca2f9cfa7d366b2168e64c7144

Queue outgoing audio packets

With RTP, sending too fast or too slow can cause playback issues for the
receiver. So we queue up a bit (starting with a 100ms buffer and growing
to a max of 5000ms, depending on whether we ever run out) and send it out
at a constant rate with a timer, to smooth over any timing bumps in the
production of any single packet.

If packets come too fast we stop asking for them until the queue shrinks
again.

If some packets come too slowly, we keep sending out of the queue for as
long as we have any. If every single packet takes longer than its own
duration to produce, you'll be forever behind and still get weird lag
issues.

snikket/jingle/PeerConnection.cpp.hx +36 -5

diff --git a/snikket/jingle/PeerConnection.cpp.hx b/snikket/jingle/PeerConnection.cpp.hx
index 7849222..45e950a 100644
--- a/snikket/jingle/PeerConnection.cpp.hx
+++ b/snikket/jingle/PeerConnection.cpp.hx
@@ -344,7 +344,11 @@ class MediaStreamTrack {
 	private var opusEncoder: cpp.Struct<OpusEncoder>;
 	private var rtpPacketizationConfig: SharedPtr<RtpPacketizationConfig>;
 	private final eventLoop: sys.thread.EventLoop;
+	private var timer: haxe.Timer;
+	private var audioQ: Array<{stamp: Float, channels: Int, payloadType: cpp.UInt8, clockRate: Int, payload: Array<cpp.UInt8>}> = [];
 	private var alive = true;
+	private var waitForQ = false;
+	private var bufferSizeInSeconds = 0.0;
 
 	@:allow(snikket)
 	private var media(get, default): StdOptional<DescriptionMedia>;
@@ -361,7 +365,21 @@ class MediaStreamTrack {
 
 	@:allow(snikket)
 	private function new() {
-		eventLoop = sys.thread.Thread.createWithEventLoop(() -> while(alive) { sys.thread.Thread.processEvents(); sys.thread.Thread.current().events.wait(); }).events;
+		eventLoop = sys.thread.Thread.createWithEventLoop(() -> {
+			timer = new haxe.Timer(10); // This timer will stop when the audioloop for this track stops
+			timer.run = () -> {
+				if (audioQ.length > 0 && audioQ[audioQ.length - 1].stamp <= haxe.Timer.stamp()) {
+					final packet = audioQ.pop();
+					write(packet.payload, packet.payloadType, packet.clockRate);
+					advanceTimestamp(Std.int(packet.payload.length / packet.channels));
+				}
+				if (waitForQ && audioQ.length < (50+50*bufferSizeInSeconds)) {
+					waitForQ = false;
+					notifyReadyForData(false);
+				}
+			};
+			while(alive) { sys.thread.Thread.processEvents(); sys.thread.Thread.current().events.wait(); }
+		}).events;
 	}
 
 	private function get_media() {
@@ -497,7 +515,13 @@ class MediaStreamTrack {
 	private function notifyReadyForData(fromCPP: Bool) {
 		untyped __cpp__("if (fromCPP) { int base = 0; hx::SetTopOfStack(&base, true); }"); // allow running haxe code on foreign thread
 		if (readyForPCMCallback != null) {
-			eventLoop.run(() -> readyForPCMCallback());
+			eventLoop.run(() -> {
+				if (audioQ.length > (50+50*bufferSizeInSeconds)) {
+					waitForQ = true;
+				} else {
+					readyForPCMCallback();
+				}
+			});
 		}
 		untyped __cpp__("if (fromCPP) { hx::SetTopOfStack((int*)0, true); }"); // unregister with GC
 	}
@@ -514,8 +538,15 @@ class MediaStreamTrack {
 		if (format == null) throw "Unsupported audo format: " + clockRate + "/" + channels;
 		eventLoop.run(() -> {
 			if (track.ref.isClosed()) return;
+			final stamp = if (audioQ.length < 1) {
+				bufferSizeInSeconds = Math.max(bufferSizeInSeconds, bufferSizeInSeconds + 0.1);
+				haxe.Timer.stamp() + bufferSizeInSeconds;
+			} else {
+				audioQ[0].stamp + (pcm.length / (clockRate / 1000)) / 1000.0;
+			}
 			if (format.format == "PCMU") {
-				write(pcm.map(pcmToUlaw), format.payloadType, clockRate);
+				final packet = { channels: channels, payloadType: format.payloadType, clockRate: clockRate, payload: pcm.map(pcmToUlaw), stamp: stamp };
+				audioQ.unshift(packet);
 			} else if (format.format == "opus") {
 				if (untyped __cpp__("!{0}", opusEncoder)) {
 					opusEncoder = OpusEncoder.create(clockRate, channels, untyped __cpp__("OPUS_APPLICATION_VOIP"), null); // assume only one opus clockRate+channels for this track
@@ -526,11 +557,11 @@ class MediaStreamTrack {
 				final rawOpus = new haxe.ds.Vector(pcm.length * 2).toData(); // Shoudn't be bigger than the input
 				final encoded = OpusEncoder.encode(opusEncoder, cpp.Pointer.ofArray(pcm), Std.int(pcm.length / channels), cpp.Pointer.ofArray(rawOpus), rawOpus.length);
 				rawOpus.resize(encoded);
-				write(rawOpus, format.payloadType, clockRate);
+				final packet = { channels: channels, payloadType: format.payloadType, clockRate: clockRate, payload: rawOpus, stamp: stamp };
+				audioQ.unshift(packet);
 			} else {
 				trace("Ignoring audio meant to go out as", format.format, format.clockRate, format.channels);
 			}
-			advanceTimestamp(Std.int(pcm.length / channels));
 			notifyReadyForData(false);
 		});
 	}