| author | Stephen Paul Weber
<singpolyma@singpolyma.net> 2024-09-21 02:00:51 UTC |
| committer | Stephen Paul Weber
<singpolyma@singpolyma.net> 2024-09-24 03:01:46 UTC |
| parent | 0f601cdcaa2ca8ca2f9cfa7d366b2168e64c7144 |
| snikket/jingle/PeerConnection.cpp.hx | +36 | -5 |
diff --git a/snikket/jingle/PeerConnection.cpp.hx b/snikket/jingle/PeerConnection.cpp.hx index 7849222..45e950a 100644 --- a/snikket/jingle/PeerConnection.cpp.hx +++ b/snikket/jingle/PeerConnection.cpp.hx @@ -344,7 +344,11 @@ class MediaStreamTrack { private var opusEncoder: cpp.Struct<OpusEncoder>; private var rtpPacketizationConfig: SharedPtr<RtpPacketizationConfig>; private final eventLoop: sys.thread.EventLoop; + private var timer: haxe.Timer; + private var audioQ: Array<{stamp: Float, channels: Int, payloadType: cpp.UInt8, clockRate: Int, payload: Array<cpp.UInt8>}> = []; private var alive = true; + private var waitForQ = false; + private var bufferSizeInSeconds = 0.0; @:allow(snikket) private var media(get, default): StdOptional<DescriptionMedia>; @@ -361,7 +365,21 @@ class MediaStreamTrack { @:allow(snikket) private function new() { - eventLoop = sys.thread.Thread.createWithEventLoop(() -> while(alive) { sys.thread.Thread.processEvents(); sys.thread.Thread.current().events.wait(); }).events; + eventLoop = sys.thread.Thread.createWithEventLoop(() -> { + timer = new haxe.Timer(10); // This timer will stop when the audioloop for this track stops + timer.run = () -> { + if (audioQ.length > 0 && audioQ[audioQ.length - 1].stamp <= haxe.Timer.stamp()) { + final packet = audioQ.pop(); + write(packet.payload, packet.payloadType, packet.clockRate); + advanceTimestamp(Std.int(packet.payload.length / packet.channels)); + } + if (waitForQ && audioQ.length < (50+50*bufferSizeInSeconds)) { + waitForQ = false; + notifyReadyForData(false); + } + }; + while(alive) { sys.thread.Thread.processEvents(); sys.thread.Thread.current().events.wait(); } + }).events; } private function get_media() { @@ -497,7 +515,13 @@ class MediaStreamTrack { private function notifyReadyForData(fromCPP: Bool) { untyped __cpp__("if (fromCPP) { int base = 0; hx::SetTopOfStack(&base, true); }"); // allow running haxe code on foreign thread if (readyForPCMCallback != null) { - eventLoop.run(() -> readyForPCMCallback()); + eventLoop.run(() -> { + if (audioQ.length > (50+50*bufferSizeInSeconds)) { + waitForQ = true; + } else { + readyForPCMCallback(); + } + }); } untyped __cpp__("if (fromCPP) { hx::SetTopOfStack((int*)0, true); }"); // unregister with GC } @@ -514,8 +538,15 @@ class MediaStreamTrack { if (format == null) throw "Unsupported audo format: " + clockRate + "/" + channels; eventLoop.run(() -> { if (track.ref.isClosed()) return; + final stamp = if (audioQ.length < 1) { + bufferSizeInSeconds = Math.max(bufferSizeInSeconds, bufferSizeInSeconds + 0.1); + haxe.Timer.stamp() + bufferSizeInSeconds; + } else { + audioQ[0].stamp + (pcm.length / (clockRate / 1000)) / 1000.0; + } if (format.format == "PCMU") { - write(pcm.map(pcmToUlaw), format.payloadType, clockRate); + final packet = { channels: channels, payloadType: format.payloadType, clockRate: clockRate, payload: pcm.map(pcmToUlaw), stamp: stamp }; + audioQ.unshift(packet); } else if (format.format == "opus") { if (untyped __cpp__("!{0}", opusEncoder)) { opusEncoder = OpusEncoder.create(clockRate, channels, untyped __cpp__("OPUS_APPLICATION_VOIP"), null); // assume only one opus clockRate+channels for this track @@ -526,11 +557,11 @@ class MediaStreamTrack { final rawOpus = new haxe.ds.Vector(pcm.length * 2).toData(); // Shoudn't be bigger than the input final encoded = OpusEncoder.encode(opusEncoder, cpp.Pointer.ofArray(pcm), Std.int(pcm.length / channels), cpp.Pointer.ofArray(rawOpus), rawOpus.length); rawOpus.resize(encoded); - write(rawOpus, format.payloadType, clockRate); + final packet = { channels: channels, payloadType: format.payloadType, clockRate: clockRate, payload: rawOpus, stamp: stamp }; + audioQ.unshift(packet); } else { trace("Ignoring audio meant to go out as", format.format, format.clockRate, format.channels); } - advanceTimestamp(Std.int(pcm.length / channels)); notifyReadyForData(false); }); }