aiortc: Recording video on the server stops after a few seconds

I made changes to the aiortc server example to save video on the server.

I am trying to record the received video track on the server, but it only works for a few seconds, after which nothing more is recorded: I get a video file that is just 3-7 seconds long, even though the webcam stream stays on for much longer. Recording the audio track works just fine.

I'm seeing some warnings and suspect they may be causing this anomaly:

[libx264 @ 0x10c310600] non-strictly-monotonic PTS
[mp4 @ 0x12e05bca0] Application provided invalid, non monotonically increasing dts to muxer in stream 0: 50176 >= 50176
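
If I read these warnings right, the recorder's H.264 encoder and MP4 muxer are being handed video frames whose timestamps do not strictly increase (note the "50176 >= 50176"), which I assume makes the muxer drop or reject those frames. To illustrate what "strictly monotonic PTS" would require, here is a hypothetical wrapper track (my own untested sketch, not part of the code below) that bumps any non-increasing timestamp before frames reach a recorder:

from aiortc import MediaStreamTrack


class MonotonicPTSTrack(MediaStreamTrack):
    """Untested sketch: enforce strictly increasing PTS on a video track."""

    kind = "video"

    def __init__(self, source):
        super().__init__()
        self._source = source  # the track whose frames we re-timestamp
        self._last_pts = None

    async def recv(self):
        frame = await self._source.recv()
        # if the timestamp does not advance, bump it by one tick
        if self._last_pts is not None and frame.pts is not None and frame.pts <= self._last_pts:
            frame.pts = self._last_pts + 1
        self._last_pts = frame.pts
        return frame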

The server.py has been modified as follows:

import argparse
import asyncio
import json
import logging
import os
import ssl
import uuid

from aiohttp import web
from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaBlackhole, MediaPlayer, MediaRecorder, MediaRelay
from aiortc.mediastreams import MediaStreamError
from av import AudioFrame
import fractions
import time
from av.frame import Frame


AUDIO_PTIME = 0.020  # 20ms audio packetization
VIDEO_CLOCK_RATE = 90000
VIDEO_PTIME = 1 / 30  # 30fps
VIDEO_TIME_BASE = fractions.Fraction(1, VIDEO_CLOCK_RATE)
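# at 30 fps on the 90 kHz clock, consecutive frames should advance pts by 90000 / 30 = 3000 ticks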

ROOT = os.path.dirname(__file__)
logger = logging.getLogger("pc")
pcs = set()
relay = MediaRelay()

class EmptyAudioTrack(MediaStreamTrack):
    """
    A dummy audio track which reads silence.
    """

    kind = "audio"

    _start: float
    _timestamp: int

    async def recv(self) -> Frame:
        """
        Receive the next :class:`~av.audio.frame.AudioFrame`.

        The base implementation just reads silence, subclass
        :class:`AudioStreamTrack` to provide a useful implementation.
        """
        if self.readyState != "live":
            raise MediaStreamError

        sample_rate = 8000
        samples = int(AUDIO_PTIME * sample_rate)

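        # pace delivery in real time: sleep until this frame's wall-clock due time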
        if hasattr(self, "_timestamp"):
            self._timestamp += samples
            wait = self._start + (self._timestamp / sample_rate) - time.time()
            await asyncio.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0

        frame = AudioFrame(format="s16", layout="mono", samples=samples)
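        # zero-filled planes are digital silence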
        for p in frame.planes:
            p.update(bytes(p.buffer_size))
        frame.pts = self._timestamp
        frame.sample_rate = sample_rate
        frame.time_base = fractions.Fraction(1, sample_rate)
        return frame

async def index(request):
    content = open(os.path.join(ROOT, "index.html"), "r").read()
    return web.Response(content_type="text/html", text=content)


async def javascript(request):
    content = open(os.path.join(ROOT, "client.js"), "r").read()
    return web.Response(content_type="application/javascript", text=content)


async def offer(request):
    params = await request.json()
    offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

    pc = RTCPeerConnection()
    pc_id = "PeerConnection(%s)" % uuid.uuid4()
    pcs.add(pc)

    # Audio Recorder for incoming audio track
    audio_rec = None

    # Video Recorder for incoming video track
    video_rec = None

    # All the incoming audio tracks
    incoming_audio_tracks = [] 

    def log_info(msg, *args):
        logger.info(pc_id + " " + msg, *args)

    log_info("Created for %s", request.remote)

    #if args.record_to:
    if True: 
        # Record the incoming audio track to an mp3 file locally
        audio_rec = MediaRecorder(os.path.join(ROOT, "recording-step-1.mp3"))
    else:
        audio_rec = MediaBlackhole()

    # Initialise mediarecorder
    video_rec = MediaRecorder(
        os.path.join(ROOT, "video-recording.mp4"))

    @pc.on("datachannel")
    async def on_datachannel(channel):
        @channel.on("message")
        async def on_message(message):
            nonlocal audio_rec
            nonlocal video_rec
            print(f'Got message on datachannel {message}')
            if isinstance(message, str) and message.startswith("ping"):
                print("Got ping")
                # Uncomment below to see back-and-forth communication of "ping" and "pong"
                #channel.send("pong" + message[4:])
            elif isinstance(message, str) and message.startswith("trackended"):
                """
                This is received when 'Stop' is clicked on the browser.
                
                The following is the current state:
                (a) Incoming track (Browser -> Server) was sending the microphone audio stream
                    This stream is getting 'recorded' on the server to an mp3 file (recording-step-1.mp3)
                (b) Outgoing track (Server -> Browser) is sending a silence stream

                The state should now change to:
                (a) The existing recording of the incoming stream should be stopped
                    The recording-step-1.mp3 file is now ready
                (b) The incoming stream is still sending the audio from the microphone
                    We need to 'purge' this stream (via MediaBlackhole), as we don't need it temporarily, because
                    the Browser will now play the audio that the server will stream (demo-instruct.wav) - see point (c)
                (c) The outgoing track which was sending silence now needs to start streaming 'demo-instruct.wav'

                """
                # Stop the recording; the player audio is attached further below
                await audio_rec.stop()

                # Now start discarding the incoming track 
                audio_rec = MediaBlackhole()
                for track in incoming_audio_tracks:
                    audio_rec.addTrack(track)
                await audio_rec.start()

                # Find the sender associated with the outgoing track - where we're sending silence
                silence_sender = next((sender for sender in pc.getSenders() if isinstance(sender.track, EmptyAudioTrack)), None)

                if silence_sender:
                    # Start sending 'demo-instruct.wav' stream now on sender track by replacing track
                    # from silence to mediaplayer which streams wav file
                    # create a MediaPlayer with the WAV file
                    player = MediaPlayer(os.path.join(ROOT, "demo-instruct.wav"))
                    # Replace the silence track with the actual audio track
                    silence_sender.replaceTrack(player.audio)
            elif isinstance(message, str) and message.startswith("trackstart"):
                """
                This is received when 'Restart' is clicked on the browser.

                The following is the current state:
                (a) Incoming track stream is still sending the client microphone audio 
                    which is going into blackhole recorder and being purged/ignored
                (b) Outgoing track stream is sending demo-instruct.wav

                The state should change to:
                (a) Incoming track stream should get saved into recording-step-2.mp3
                (b) Outgoing track stream should start sending silence audio stream (aka empty audio track)
                """
                # Switch the outgoing track back from the MediaPlayer stream to an empty (silence) track
                empty_track = EmptyAudioTrack()
                mediaplayer_sender = next((sender for sender in pc.getSenders() if isinstance(sender.track, MediaStreamTrack)), None)
                # Replace the actual audio track with empty track
                mediaplayer_sender.replaceTrack(empty_track)

                # Stop the previous recorder, which was the blackhole
                await audio_rec.stop()

                # Start a new recording and add the incoming audio tracks
                new_recorder = MediaRecorder(os.path.join(ROOT, "recording-step-2.mp3"))
                for track in incoming_audio_tracks:
                    new_recorder.addTrack(track)
                audio_rec = new_recorder
                await audio_rec.start()
                

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        log_info("Connection state is %s", pc.connectionState)
        if pc.connectionState == "failed":
            await pc.close()    
            pcs.discard(pc)

    @pc.on("track")
    def on_track(track):
        log_info("Track %s received", track.kind)

        if track.kind == "audio":
            incoming_audio_tracks.append(track)
            
            # Start to send silence empty audio in outgoing track stream
            empty_track = EmptyAudioTrack()
            pc.addTrack(empty_track)
            # Start recording the incoming microphone audio stream into the mp3 file
            audio_rec.addTrack(track)

        elif track.kind == "video":
            #if args.record_to:
            if True:
                video_rec.addTrack(relay.subscribe(track))

        @track.on("ended")
        async def on_ended():
            log_info("Track %s ended", track.kind)
            if audio_rec:
                await audio_rec.stop()
            if video_rec:
                await video_rec.stop()

    # handle offer
    await pc.setRemoteDescription(offer)
    if audio_rec:
        await audio_rec.start()
    if video_rec:
        await video_rec.start()

    # send answer
    answer = await pc.createAnswer()
    await pc.setLocalDescription(answer)

    return web.Response(
        content_type="application/json",
        text=json.dumps(
            {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
        ),
    )


async def on_shutdown(app):
    # close peer connections
    coros = [pc.close() for pc in pcs]
    await asyncio.gather(*coros)
    pcs.clear()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="WebRTC audio / video / data-channels demo"
    )
    parser.add_argument("--cert-file", help="SSL certificate file (for HTTPS)")
    parser.add_argument("--key-file", help="SSL key file (for HTTPS)")
    parser.add_argument(
        "--host", default="0.0.0.0", help="Host for HTTP server (default: 0.0.0.0)"
    )
    parser.add_argument(
        "--port", type=int, default=8080, help="Port for HTTP server (default: 8080)"
    )
    parser.add_argument("--record-to", help="Write received media to a file.")
    parser.add_argument("--verbose", "-v", action="count")
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    if args.cert_file:
        ssl_context = ssl.SSLContext()
        ssl_context.load_cert_chain(args.cert_file, args.key_file)
    else:
        ssl_context = None

    app = web.Application()
    app.on_shutdown.append(on_shutdown)
    app.router.add_get("/", index)
    app.router.add_get("/client.js", javascript)
    app.router.add_post("/offer", offer)
    web.run_app(
        app, access_log=None, host=args.host, port=args.port, ssl_context=ssl_context
    )
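
Stripped to its essentials, the video path in offer() above amounts to this (a condensed sketch of my code above; the helper name is mine):

from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaRecorder, MediaRelay


async def answer_and_record(pc: RTCPeerConnection, offer: RTCSessionDescription) -> MediaRecorder:
    relay = MediaRelay()
    recorder = MediaRecorder("video-recording.mp4")

    @pc.on("track")
    def on_track(track):
        if track.kind == "video":
            # record a relayed copy so the original track stays usable
            recorder.addTrack(relay.subscribe(track))

    # "track" fires while the remote description is applied,
    # so the recorder has its track before start() is called
    await pc.setRemoteDescription(offer)
    await recorder.start()
    await pc.setLocalDescription(await pc.createAnswer())
    return recorder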

The client (client.js) is as follows:

// get DOM elements
var dataChannelLog = document.getElementById('data-channel'),
    iceConnectionLog = document.getElementById('ice-connection-state'),
    iceGatheringLog = document.getElementById('ice-gathering-state'),
    signalingLog = document.getElementById('signaling-state');

// peer connection
var pc = null;

// data channel
var dc = null, dcInterval = null;

function createPeerConnection() {
    var config = {
        sdpSemantics: 'unified-plan'
    };

    if (document.getElementById('use-stun').checked) {
        config.iceServers = [{ urls: ['stun:stun.l.google.com:19302'] }];
    }

    pc = new RTCPeerConnection(config);

    // register some listeners to help debugging
    pc.addEventListener('icegatheringstatechange', () => {
        iceGatheringLog.textContent += ' -> ' + pc.iceGatheringState;
    }, false);
    iceGatheringLog.textContent = pc.iceGatheringState;

    pc.addEventListener('iceconnectionstatechange', () => {
        iceConnectionLog.textContent += ' -> ' + pc.iceConnectionState;
    }, false);
    iceConnectionLog.textContent = pc.iceConnectionState;

    pc.addEventListener('signalingstatechange', () => {
        signalingLog.textContent += ' -> ' + pc.signalingState;
    }, false);
    signalingLog.textContent = pc.signalingState;

    // connect audio / video
    pc.addEventListener('track', (evt) => {
        console.log("Came in event listener for track event");
        if (evt.track.kind == 'audio') {
            document.getElementById('audio').srcObject = evt.streams[0];
            console.log("Added remote audio stream");
        }
    });

    return pc;
}

function enumerateInputDevices() {
    const populateSelect = (select, devices) => {
        let counter = 1;
        devices.forEach((device) => {
            const option = document.createElement('option');
            option.value = device.deviceId;
            option.text = device.label || ('Device #' + counter);
            select.appendChild(option);
            counter += 1;
        });
    };

    navigator.mediaDevices.enumerateDevices().then((devices) => {
        populateSelect(
            document.getElementById('audio-input'),
            devices.filter((device) => device.kind == 'audioinput')
        );
        populateSelect(
            document.getElementById('video-input'),
            devices.filter((device) => device.kind == 'videoinput')
        );
    }).catch((e) => {
        alert(e);
    });
}

function negotiate() {
    return pc.createOffer().then((offer) => {
        return pc.setLocalDescription(offer);
    }).then(() => {
        // wait for ICE gathering to complete
        return new Promise((resolve) => {
            if (pc.iceGatheringState === 'complete') {
                resolve();
            } else {
                function checkState() {
                    if (pc.iceGatheringState === 'complete') {
                        pc.removeEventListener('icegatheringstatechange', checkState);
                        resolve();
                    }
                }
                pc.addEventListener('icegatheringstatechange', checkState);
            }
        });
    }).then(() => {
        var offer = pc.localDescription;
        var codec;

        codec = document.getElementById('audio-codec').value;
        if (codec !== 'default') {
            offer.sdp = sdpFilterCodec('audio', codec, offer.sdp);
        }

        codec = document.getElementById('video-codec').value;
        if (codec !== 'default') {
            offer.sdp = sdpFilterCodec('video', codec, offer.sdp);
        }

        document.getElementById('offer-sdp').textContent = offer.sdp;
        return fetch('/offer', {
            body: JSON.stringify({
                sdp: offer.sdp,
                type: offer.type,
                video_transform: document.getElementById('video-transform').value
            }),
            headers: {
                'Content-Type': 'application/json'
            },
            method: 'POST'
        });
    }).then((response) => {
        return response.json();
    }).then((answer) => {
        document.getElementById('answer-sdp').textContent = answer.sdp;
        return pc.setRemoteDescription(answer);
    }).catch((e) => {
        alert(e);
    });
}

function start() {
    document.getElementById('start').style.display = 'none';

    pc = createPeerConnection();

    var time_start = null;

    const current_stamp = () => {
        if (time_start === null) {
            time_start = new Date().getTime();
            return 0;
        } else {
            return new Date().getTime() - time_start;
        }
    };

    if (document.getElementById('use-datachannel').checked) {
        var parameters = JSON.parse(document.getElementById('datachannel-parameters').value);

        dc = pc.createDataChannel('chat', parameters);
        dc.addEventListener('close', () => {
            clearInterval(dcInterval);
            dataChannelLog.textContent += '- close\n';
        });
        dc.addEventListener('open', () => {
            dataChannelLog.textContent += '- open\n';
            dcInterval = setInterval(() => {
                var message = 'ping ' + current_stamp();
                dataChannelLog.textContent += '> ' + message + '\n';
                dc.send(message);
            }, 1000);
        });
        dc.addEventListener('message', (evt) => {
            dataChannelLog.textContent += '< ' + evt.data + '\n';

            if (evt.data.substring(0, 4) === 'pong') {
                var elapsed_ms = current_stamp() - parseInt(evt.data.substring(5), 10);
                dataChannelLog.textContent += ' RTT ' + elapsed_ms + ' ms\n';
            }

            if (evt.data.substring(0, 10) === 'trackended') {
                // This is received from python when the audio wav file streamed
                // is completed
                        
            }
        });
    }

    // Build media constraints.

    const constraints = {
        audio: false,
        video: false
    };

    if (document.getElementById('use-audio').checked) {
        const audioConstraints = {};

        const device = document.getElementById('audio-input').value;
        if (device) {
            audioConstraints.deviceId = { exact: device };
        }

        constraints.audio = Object.keys(audioConstraints).length ? audioConstraints : true;
    }

    if (document.getElementById('use-video').checked) {
        const videoConstraints = {};

        const device = document.getElementById('video-input').value;
        if (device) {
            videoConstraints.deviceId = { exact: device };
        }

        const resolution = document.getElementById('video-resolution').value;
        if (resolution) {
            const dimensions = resolution.split('x');
            videoConstraints.width = parseInt(dimensions[0], 10);
            videoConstraints.height = parseInt(dimensions[1], 10);
        }

        constraints.video = Object.keys(videoConstraints).length ? videoConstraints : true;
    }

    // Acquire media and start negotiation.

    if (constraints.audio || constraints.video) {
        if (constraints.video) {
            document.getElementById('media').style.display = 'block';
        }
        navigator.mediaDevices.getUserMedia(constraints).then((stream) => {

            document.getElementById('video').srcObject = stream;

            stream.getTracks().forEach((track) => {
                pc.addTrack(track, stream);
            });
            return negotiate();
        }, (err) => {
            alert('Could not acquire media: ' + err);
        });
    } else {
        negotiate();
    }

    document.getElementById('stop').style.display = 'inline-block';
}

function restart() {
    document.getElementById('restart').style.display = 'none';

    // close local audio / video
    pc.getSenders().forEach((sender) => {
        console.log("Stopping a sender");
        //sender.track.dispatchEvent(new Event("ended"));
        //sender.track.stop();
        //sender.track.enabled = false;
    });

    // close data channel
    if (dc) {
        var message = 'trackstart';
        dc.send(message);
        console.log('sending trackstart to datachannel');
        //dc.close();
    }

    document.getElementById('stop').style.display = 'inline-block';
}

function stop() {
    document.getElementById('stop').style.display = 'none';


    // close transceivers
    //if (pc.getTransceivers) {
    //    pc.getTransceivers().forEach((transceiver) => {
    //        if (transceiver.stop) {
    //            transceiver.stop();
    //        }
    //    });
    //}

    // close local audio / video
    pc.getSenders().forEach((sender) => {
        console.log("Stopping a sender");
        //sender.track.dispatchEvent(new Event("ended"));
        //sender.track.stop();
        //sender.track.enabled = false;
    });

    // close data channel
    if (dc) {
        var message = 'trackended';
        dc.send(message);
        console.log('sending trackended to datachannel');
        //dc.close();
    }

    // close peer connection
    /*
    setTimeout(() => {
        // close transceivers
        if (pc.getTransceivers) {
            pc.getTransceivers().forEach((transceiver) => {
                if (transceiver.stop) {
                    transceiver.stop();
                }
            });
    }
        setTimeout(() => {
            pc.close();
        }, 500);
    }, 5000);
    */
    document.getElementById('restart').style.display = 'inline-block';
}

function sdpFilterCodec(kind, codec, realSdp) {
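    // pass 1: collect the payload types for the requested codec,
    // plus any RTX payloads whose apt= points at them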
    var allowed = []
    var rtxRegex = new RegExp('a=fmtp:(\\d+) apt=(\\d+)\r$');
    var codecRegex = new RegExp('a=rtpmap:([0-9]+) ' + escapeRegExp(codec))
    var videoRegex = new RegExp('(m=' + kind + ' .*?)( ([0-9]+))*\\s*$')

    var lines = realSdp.split('\n');

    var isKind = false;
    for (var i = 0; i < lines.length; i++) {
        if (lines[i].startsWith('m=' + kind + ' ')) {
            isKind = true;
        } else if (lines[i].startsWith('m=')) {
            isKind = false;
        }

        if (isKind) {
            var match = lines[i].match(codecRegex);
            if (match) {
                allowed.push(parseInt(match[1]));
            }

            match = lines[i].match(rtxRegex);
            if (match && allowed.includes(parseInt(match[2]))) {
                allowed.push(parseInt(match[1]));
            }
        }
    }

    var skipRegex = 'a=(fmtp|rtcp-fb|rtpmap):([0-9]+)';
    var sdp = '';

    isKind = false;
    for (var i = 0; i < lines.length; i++) {
        if (lines[i].startsWith('m=' + kind + ' ')) {
            isKind = true;
        } else if (lines[i].startsWith('m=')) {
            isKind = false;
        }

        if (isKind) {
            var skipMatch = lines[i].match(skipRegex);
            if (skipMatch && !allowed.includes(parseInt(skipMatch[2]))) {
                continue;
            } else if (lines[i].match(videoRegex)) {
                sdp += lines[i].replace(videoRegex, '$1 ' + allowed.join(' ')) + '\n';
            } else {
                sdp += lines[i] + '\n';
            }
        } else {
            sdp += lines[i] + '\n';
        }
    }

    return sdp;
}

function escapeRegExp(string) {
    return string.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); // $& means the whole matched string
}

enumerateInputDevices();

And the index.html:

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8"/>
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>WebRTC demo</title>
    <style>
    button {
        padding: 8px 16px;
    }

    pre {
        overflow-x: hidden;
        overflow-y: auto;
    }

    video {
        width: 100%;
    }

    .option {
        margin-bottom: 8px;
    }

    #media {
        max-width: 1280px;
    }
    </style>
</head>
<body>

<h2>Options</h2>
<div class="option">
    <input id="use-datachannel" checked="checked" type="checkbox"/>
    <label for="use-datachannel">Use datachannel</label>
    <select id="datachannel-parameters">
        <option value='{"ordered": true}'>Ordered, reliable</option>
        <option value='{"ordered": false, "maxRetransmits": 0}'>Unordered, no retransmissions</option>
        <option value='{"ordered": false, "maxPacketLifetime": 500}'>Unordered, 500ms lifetime</option>
    </select>
</div>
<div class="option">
    <input id="use-audio" checked="checked" type="checkbox"/>
    <label for="use-audio">Use audio</label>
    <select id="audio-input">
        <option value="" selected>Default device</option>
    </select>
    <select id="audio-codec">
        <option value="default" selected>Default codecs</option>
        <option value="opus/48000/2">Opus</option>
        <option value="PCMU/8000">PCMU</option>
        <option value="PCMA/8000">PCMA</option>
    </select>
</div>
<div class="option">
    <input id="use-video" type="checkbox"/>
    <label for="use-video">Use video</label>
    <select id="video-input">
        <option value="" selected>Default device</option>
    </select>
    <select id="video-resolution">
        <option value="" selected>Default resolution</option>
        <option value="320x240">320x240</option>
        <option value="640x480">640x480</option>
        <option value="960x540">960x540</option>
        <option value="1280x720">1280x720</option>
    </select>
    <select id="video-transform">
        <option value="none" selected>No transform</option>
        <option value="edges">Edge detection</option>
        <option value="cartoon">Cartoon effect</option>
        <option value="rotate">Rotate</option>
    </select>
    <select id="video-codec">
        <option value="default" selected>Default codecs</option>
        <option value="VP8/90000">VP8</option>
        <option value="H264/90000">H264</option>
    </select>
</div>
<div class="option">
    <input id="use-stun" type="checkbox"/>
    <label for="use-stun">Use STUN server</label>
</div>

<button id="start" onclick="start()">Start</button>
<button id="stop" style="display: none" onclick="stop()">Stop</button>
<button id="restart" style="display: none" onclick="restart()">Restart</button>

<h2>State</h2>
<p>
    ICE gathering state: <span id="ice-gathering-state"></span>
</p>
<p>
    ICE connection state: <span id="ice-connection-state"></span>
</p>
<p>
    Signaling state: <span id="signaling-state"></span>
</p>

<div id="media" style="display: none">
    <h2>Media</h2>

    <audio id="audio" autoplay="true"></audio>
    <video id="video" autoplay="true" playsinline muted="true"></video>
</div>

<h2>Data channel</h2>
<pre id="data-channel" style="height: 200px;"></pre>

<h2>SDP</h2>

<h3>Offer</h3>
<pre id="offer-sdp"></pre>

<h3>Answer</h3>
<pre id="answer-sdp"></pre>

<script src="client.js"></script>
</body>
</html>
