Files
moonlight-common-c/src/VideoStream.c
T
Cameron Gutman c13f4a323f Use connected datagram sockets for audio and video
This ensures unwanted data from other sources is dropped and that our own source address doesn't change while streaming.
2023-09-12 23:29:05 -05:00

363 lines
12 KiB
C

#include "Limelight-internal.h"
#define FIRST_FRAME_MAX 1500
#define FIRST_FRAME_TIMEOUT_SEC 10
#define FIRST_FRAME_PORT 47996
static RTP_VIDEO_QUEUE rtpQueue;
static SOCKET rtpSocket = INVALID_SOCKET;
static SOCKET firstFrameSocket = INVALID_SOCKET;
static PLT_THREAD udpPingThread;
static PLT_THREAD receiveThread;
static PLT_THREAD decoderThread;
static bool receivedDataFromPeer;
static uint64_t firstDataTimeMs;
static bool receivedFullFrame;
// We can't request an IDR frame until the depacketizer knows
// that a packet was lost. This timeout bounds the time that
// the RTP queue will wait for missing/reordered packets.
#define RTP_QUEUE_DELAY 10
// This is the desired number of video packets that can be
// stored in the socket's receive buffer. 2048 is chosen
// because it should be large enough for all reasonable
// frame sizes (probably 2 or 3 frames) without using too
// much kernel memory with larger packet sizes. It also
// can smooth over transient pauses in network traffic
// and subsequent packet/frame bursts that follow.
#define RTP_RECV_PACKETS_BUFFERED 2048
// Prepare the video stream state for a new session.
// Brings up the video depacketizer (passing it StreamConfig.packetSize) and
// the RTP queue, then resets the flags that track whether any traffic or a
// complete frame has been seen yet.
void initializeVideoStream(void) {
    initializeVideoDepacketizer(StreamConfig.packetSize);
    RtpvInitializeQueue(&rtpQueue);

    // Nothing has been received yet
    receivedFullFrame = false;
    receivedDataFromPeer = false;
    firstDataTimeMs = 0;
}
// Release the resources set up by initializeVideoStream():
// the video depacketizer first, then the RTP queue.
void destroyVideoStream(void) {
    destroyVideoDepacketizer();
    RtpvCleanupQueue(&rtpQueue);
}
// UDP ping thread.
// Sends a ping on rtpSocket every 500 ms until the thread is interrupted.
// When VideoPingPayload carries a payload (first byte non-zero), that
// structure is sent with an incrementing big-endian sequence number;
// otherwise the 4-byte legacy "PING" datagram is sent.
// The context argument is unused.
static void VideoPingThreadProc(void* context) {
    // ASCII "PING" (0x50 0x49 0x4E 0x47), deliberately not NUL-terminated
    char legacyPing[] = { 'P', 'I', 'N', 'G' };
    int pingsSent = 0;

    // send() results are intentionally ignored here. Socket errors are
    // handled on the read side in the receive thread, which avoids problems
    // with ICMP port unreachable messages caused by pinging before the host
    // PC has bound the port.
    while (!PltIsThreadInterrupted(&udpPingThread)) {
        if (VideoPingPayload.payload[0] == 0) {
            send(rtpSocket, legacyPing, sizeof(legacyPing), 0);
        }
        else {
            pingsSent += 1;
            VideoPingPayload.sequenceNumber = BE32(pingsSent);
            send(rtpSocket, (char*)&VideoPingPayload, sizeof(VideoPingPayload), 0);
        }

        PltSleepMsInterruptible(&udpPingThread, 500);
    }
}
// Video receive thread.
// Reads datagrams from rtpSocket and hands them to the RTP queue until the
// thread is interrupted or the connection is terminated. The connection is
// terminated (via ListenerCallbacks.connectionTerminated) when:
//   - a receive buffer cannot be allocated,
//   - recvUdpSocket() reports an error,
//   - no packet at all arrives within FIRST_FRAME_TIMEOUT_SEC, or
//   - packets arrive but no full frame is flagged within
//     FIRST_FRAME_TIMEOUT_SEC of the first packet.
// The context argument is unused.
static void VideoReceiveThreadProc(void* context) {
    // Each allocation holds one packet followed by its RTPV_QUEUE_ENTRY
    const int receiveSize = StreamConfig.packetSize + MAX_RTP_HEADER_SIZE;
    const int bufferSize = receiveSize + sizeof(RTPV_QUEUE_ENTRY);
    char* recvBuffer = NULL;

    // If SO_RCVTIMEO can't be set on the socket, fall back to select() for waiting
    const bool useSelect = setNonFatalRecvTimeoutMs(rtpSocket, UDP_RECV_POLL_TIMEOUT_MS) < 0;

    // Accumulated poll timeouts while still waiting for the very first packet
    int elapsedWaitMs = 0;

    while (!PltIsThreadInterrupted(&receiveThread)) {
        PRTP_PACKET rtpPacket;
        int bytesReceived;

        // A new buffer is needed whenever the previous one was handed to the queue
        if (recvBuffer == NULL) {
            recvBuffer = (char*)malloc(bufferSize);
            if (recvBuffer == NULL) {
                Limelog("Video Receive: malloc() failed\n");
                ListenerCallbacks.connectionTerminated(-1);
                return;
            }
        }

        bytesReceived = recvUdpSocket(rtpSocket, recvBuffer, receiveSize, useSelect);
        if (bytesReceived < 0) {
            Limelog("Video Receive: recvUdpSocket() failed: %d\n", (int)LastSocketError());
            ListenerCallbacks.connectionTerminated(LastSocketFail());
            break;
        }

        if (bytesReceived == 0) {
            // Receive timed out. Once traffic has been seen, just poll again.
            if (receivedDataFromPeer) {
                continue;
            }

            // Still waiting for the first packet: give up after many seconds
            // of silence, since something is presumably broken.
            elapsedWaitMs += UDP_RECV_POLL_TIMEOUT_MS;
            if (elapsedWaitMs < FIRST_FRAME_TIMEOUT_SEC * 1000) {
                continue;
            }

            Limelog("Terminating connection due to lack of video traffic\n");
            ListenerCallbacks.connectionTerminated(ML_ERROR_NO_VIDEO_TRAFFIC);
            break;
        }

        // Record the arrival of the first packet
        if (!receivedDataFromPeer) {
            receivedDataFromPeer = true;
            Limelog("Received first video packet after %d ms\n", elapsedWaitMs);
            firstDataTimeMs = PltGetMillis();
        }

        // Packets are flowing, but bail out if no full frame shows up in time
        if (!receivedFullFrame) {
            uint64_t currentMs = PltGetMillis();
            if (currentMs - firstDataTimeMs >= FIRST_FRAME_TIMEOUT_SEC * 1000) {
                Limelog("Terminating connection due to lack of a successful video frame\n");
                ListenerCallbacks.connectionTerminated(ML_ERROR_NO_VIDEO_FRAME);
                break;
            }
        }

        // Drop runt packets that can't even hold an RTP header
        if (bytesReceived < (int)sizeof(RTP_PACKET)) {
            continue;
        }

        // Convert header fields to host byte order in place
        rtpPacket = (PRTP_PACKET)&recvBuffer[0];
        rtpPacket->sequenceNumber = BE16(rtpPacket->sequenceNumber);
        rtpPacket->timestamp = BE32(rtpPacket->timestamp);
        rtpPacket->ssrc = BE32(rtpPacket->ssrc);

        // The queue entry lives in the tail of the same allocation
        if (RtpvAddPacket(&rtpQueue, rtpPacket, bytesReceived,
                          (PRTPV_QUEUE_ENTRY)&recvBuffer[receiveSize]) == RTPF_RET_QUEUED) {
            // The queue owns the buffer now; allocate a new one next iteration
            recvBuffer = NULL;
        }
    }

    // Release the buffer if the queue didn't take ownership (free(NULL) is a no-op)
    free(recvBuffer);
}
// Records that a full frame was received successfully.
// The receive thread reads this flag to stop enforcing its
// "no successful video frame" timeout.
void notifyKeyFrameReceived(void) {
    receivedFullFrame = true;
}
// Decoder thread.
// Repeatedly waits for the next video frame, submits its decode unit to
// VideoCallbacks.submitDecodeUnit() and completes the frame with the result.
// Exits when the thread is interrupted or LiWaitForNextVideoFrame() fails.
// The context argument is unused.
static void VideoDecoderThreadProc(void* context) {
    VIDEO_FRAME_HANDLE frameHandle;
    PDECODE_UNIT decodeUnit;

    // The interrupt check runs first; the wait is only attempted if it passes
    while (!PltIsThreadInterrupted(&decoderThread) &&
           LiWaitForNextVideoFrame(&frameHandle, &decodeUnit)) {
        LiCompleteVideoFrame(frameHandle, VideoCallbacks.submitDecodeUnit(decodeUnit));
    }
}
// "Reads" the first frame of the video stream.
// No data is actually read: closing the first-frame socket is all that
// matters, as that is what starts the flow of video on Gen 3 servers.
// Always returns 0.
int readFirstFrame(void) {
    closeSocket(firstFrameSocket);
    firstFrameSocket = INVALID_SOCKET;

    return 0;
}
// Terminate the video stream
void stopVideoStream(void) {
if (!receivedDataFromPeer) {
Limelog("No video traffic was ever received from the host!\n");
}
VideoCallbacks.stop();
// Wake up client code that may be waiting on the decode unit queue
stopVideoDepacketizer();
PltInterruptThread(&udpPingThread);
PltInterruptThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltInterruptThread(&decoderThread);
}
if (firstFrameSocket != INVALID_SOCKET) {
shutdownTcpSocket(firstFrameSocket);
}
PltJoinThread(&udpPingThread);
PltJoinThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltJoinThread(&decoderThread);
}
PltCloseThread(&udpPingThread);
PltCloseThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltCloseThread(&decoderThread);
}
if (firstFrameSocket != INVALID_SOCKET) {
closeSocket(firstFrameSocket);
firstFrameSocket = INVALID_SOCKET;
}
if (rtpSocket != INVALID_SOCKET) {
closeSocket(rtpSocket);
rtpSocket = INVALID_SOCKET;
}
VideoCallbacks.cleanup();
}
// Start the video stream
int startVideoStream(void* rendererContext, int drFlags) {
int err;
firstFrameSocket = INVALID_SOCKET;
// This must be called before the decoder thread starts submitting
// decode units
LC_ASSERT(NegotiatedVideoFormat != 0);
err = VideoCallbacks.setup(NegotiatedVideoFormat, StreamConfig.width,
StreamConfig.height, StreamConfig.fps, rendererContext, drFlags);
if (err != 0) {
return err;
}
rtpSocket = bindUdpSocket(RemoteAddr.ss_family, RTP_RECV_PACKETS_BUFFERED * (StreamConfig.packetSize + MAX_RTP_HEADER_SIZE));
if (rtpSocket == INVALID_SOCKET) {
VideoCallbacks.cleanup();
return LastSocketError();
}
// Connect our video socket to the target address and port
LC_ASSERT(VideoPortNumber != 0);
err = connectUdpSocket(rtpSocket, &RemoteAddr, RemoteAddrLen, VideoPortNumber);
if (err != 0) {
VideoCallbacks.cleanup();
closeSocket(rtpSocket);
return err;
}
VideoCallbacks.start();
err = PltCreateThread("VideoRecv", VideoReceiveThreadProc, NULL, &receiveThread);
if (err != 0) {
VideoCallbacks.stop();
closeSocket(rtpSocket);
VideoCallbacks.cleanup();
return err;
}
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
err = PltCreateThread("VideoDec", VideoDecoderThreadProc, NULL, &decoderThread);
if (err != 0) {
VideoCallbacks.stop();
PltInterruptThread(&receiveThread);
PltJoinThread(&receiveThread);
PltCloseThread(&receiveThread);
closeSocket(rtpSocket);
VideoCallbacks.cleanup();
return err;
}
}
if (AppVersionQuad[0] == 3) {
// Connect this socket to open port 47998 for our ping thread
firstFrameSocket = connectTcpSocket(&RemoteAddr, RemoteAddrLen,
FIRST_FRAME_PORT, FIRST_FRAME_TIMEOUT_SEC);
if (firstFrameSocket == INVALID_SOCKET) {
VideoCallbacks.stop();
stopVideoDepacketizer();
PltInterruptThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltInterruptThread(&decoderThread);
}
PltJoinThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltJoinThread(&decoderThread);
}
PltCloseThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltCloseThread(&decoderThread);
}
closeSocket(rtpSocket);
VideoCallbacks.cleanup();
return LastSocketError();
}
}
// Start pinging before reading the first frame so GFE knows where
// to send UDP data
err = PltCreateThread("VideoPing", VideoPingThreadProc, NULL, &udpPingThread);
if (err != 0) {
VideoCallbacks.stop();
stopVideoDepacketizer();
PltInterruptThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltInterruptThread(&decoderThread);
}
PltJoinThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltJoinThread(&decoderThread);
}
PltCloseThread(&receiveThread);
if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
PltCloseThread(&decoderThread);
}
closeSocket(rtpSocket);
if (firstFrameSocket != INVALID_SOCKET) {
closeSocket(firstFrameSocket);
firstFrameSocket = INVALID_SOCKET;
}
VideoCallbacks.cleanup();
return err;
}
if (AppVersionQuad[0] == 3) {
// Read the first frame to start the flow of video
err = readFirstFrame();
if (err != 0) {
stopVideoStream();
return err;
}
}
return 0;
}