Files
moonlight-common-c/src/AudioStream.c
T
Andy Grundman 82ee2d6590 Improve support for high-resolution stats
* This patch adds a new microsecond-resolution function call, LiGetMicroseconds(), to complement
the existing LiGetMillis(). Many variables used by stats have been updated to work at this
higher resolution and now provide better results when displaying e.g. sub-millisecond frametime stats.
To try and avoid confusion, variables that now contain microseconds have been renamed with a suffix
of 'Us', and those ending in 'Ms' contain milliseconds. I originally experimented with nanoseconds but it
felt like overkill for our needs.

Public API in Limelight.h:
uint64_t LiGetMicroseconds(void);
uint64_t LiGetMillis(void);
const RTP_AUDIO_STATS* LiGetRTPAudioStats(void);  // provides access to RTP data for the overlay stats
const RTP_VIDEO_STATS* LiGetRTPVideoStats(void);

Note: Users of this library may need to make changes. If using LiGetMillis() to track the duration of
something that is shown to the user, consider switching to LiGetMicroseconds(). Remember to divide by
1000 at time of display to show in milliseconds.
2025-11-06 20:27:01 -06:00

483 lines
16 KiB
C

#include "Limelight-internal.h"
static SOCKET rtpSocket = INVALID_SOCKET;
static LINKED_BLOCKING_QUEUE packetQueue;
static RTP_AUDIO_QUEUE rtpAudioQueue;
static PLT_THREAD udpPingThread;
static PLT_THREAD receiveThread;
static PLT_THREAD decoderThread;
static PPLT_CRYPTO_CONTEXT audioDecryptionCtx;
static uint32_t avRiKeyId;
static unsigned short lastSeq;
static bool pingThreadStarted;
static bool receivedDataFromPeer;
static uint64_t firstReceiveTime;
#ifdef LC_DEBUG
#define INVALID_OPUS_HEADER 0x00
static uint8_t opusHeaderByte;
#endif
#define MAX_PACKET_SIZE 1400
typedef struct _QUEUE_AUDIO_PACKET_HEADER {
LINKED_BLOCKING_QUEUE_ENTRY lentry;
int size;
} QUEUED_AUDIO_PACKET_HEADER, *PQUEUED_AUDIO_PACKET_HEADER;
typedef struct _QUEUED_AUDIO_PACKET {
QUEUED_AUDIO_PACKET_HEADER header;
char data[MAX_PACKET_SIZE];
} QUEUED_AUDIO_PACKET, *PQUEUED_AUDIO_PACKET;
static void AudioPingThreadProc(void* context) {
char legacyPingData[] = { 0x50, 0x49, 0x4E, 0x47 };
LC_SOCKADDR saddr;
LC_ASSERT(AudioPortNumber != 0);
memcpy(&saddr, &RemoteAddr, sizeof(saddr));
SET_PORT(&saddr, AudioPortNumber);
// We do not check for errors here. Socket errors will be handled
// on the read-side in ReceiveThreadProc(). This avoids potential
// issues related to receiving ICMP port unreachable messages due
// to sending a packet prior to the host PC binding to that port.
int pingCount = 0;
while (!PltIsThreadInterrupted(&udpPingThread)) {
if (AudioPingPayload.payload[0] != 0) {
pingCount++;
AudioPingPayload.sequenceNumber = BE32(pingCount);
sendto(rtpSocket, (char*)&AudioPingPayload, sizeof(AudioPingPayload), 0, (struct sockaddr*)&saddr, AddrLen);
}
else {
sendto(rtpSocket, legacyPingData, sizeof(legacyPingData), 0, (struct sockaddr*)&saddr, AddrLen);
}
PltSleepMsInterruptible(&udpPingThread, 500);
}
}
// Initialize the audio stream and start
int initializeAudioStream(void) {
LbqInitializeLinkedBlockingQueue(&packetQueue, 30);
RtpaInitializeQueue(&rtpAudioQueue);
lastSeq = 0;
receivedDataFromPeer = false;
pingThreadStarted = false;
firstReceiveTime = 0;
audioDecryptionCtx = PltCreateCryptoContext();
#ifdef LC_DEBUG
opusHeaderByte = INVALID_OPUS_HEADER;
#endif
// Copy and byte-swap the AV RI key ID used for the audio encryption IV
memcpy(&avRiKeyId, StreamConfig.remoteInputAesIv, sizeof(avRiKeyId));
avRiKeyId = BE32(avRiKeyId);
return 0;
}
// This is called when the RTSP SETUP message is parsed and the audio port
// number is parsed out of it. Alternatively, it's also called if parsing fails
// and will use the well known audio port instead.
int notifyAudioPortNegotiationComplete(void) {
LC_ASSERT(!pingThreadStarted);
LC_ASSERT(AudioPortNumber != 0);
// For GFE 3.22 compatibility, we must start the audio ping thread before the RTSP handshake.
// It will not reply to our RTSP PLAY request until the audio ping has been received.
rtpSocket = bindUdpSocket(RemoteAddr.ss_family, &LocalAddr, AddrLen, 0, SOCK_QOS_TYPE_AUDIO);
if (rtpSocket == INVALID_SOCKET) {
return LastSocketFail();
}
// We may receive audio before our threads are started, but that's okay. We'll
// drop the first 1 second of audio packets to catch up with the backlog.
int err = PltCreateThread("AudioPing", AudioPingThreadProc, NULL, &udpPingThread);
if (err != 0) {
return err;
}
pingThreadStarted = true;
return 0;
}
static void freePacketList(PLINKED_BLOCKING_QUEUE_ENTRY entry) {
PLINKED_BLOCKING_QUEUE_ENTRY nextEntry;
while (entry != NULL) {
nextEntry = entry->flink;
// The entry is stored within the data allocation
free(entry->data);
entry = nextEntry;
}
}
// Tear down the audio stream once we're done with it
void destroyAudioStream(void) {
if (rtpSocket != INVALID_SOCKET) {
if (pingThreadStarted) {
PltInterruptThread(&udpPingThread);
PltJoinThread(&udpPingThread);
}
closeSocket(rtpSocket);
rtpSocket = INVALID_SOCKET;
}
PltDestroyCryptoContext(audioDecryptionCtx);
freePacketList(LbqDestroyLinkedBlockingQueue(&packetQueue));
RtpaCleanupQueue(&rtpAudioQueue);
}
static bool queuePacketToLbq(PQUEUED_AUDIO_PACKET* packet) {
int err;
do {
err = LbqOfferQueueItem(&packetQueue, *packet, &(*packet)->header.lentry);
if (err == LBQ_SUCCESS) {
// The LBQ owns the buffer now
*packet = NULL;
}
else if (err == LBQ_BOUND_EXCEEDED) {
Limelog("Audio packet queue overflow\n");
// The audio queue is full, so free all existing items and try again
freePacketList(LbqFlushQueueItems(&packetQueue));
}
} while (err == LBQ_BOUND_EXCEEDED);
return err == LBQ_SUCCESS;
}
static void decodeInputData(PQUEUED_AUDIO_PACKET packet) {
// If the packet size is zero, this is a placeholder for a missing
// packet. Trigger packet loss concealment logic in libopus by
// invoking the decoder with a NULL buffer.
if (packet->header.size == 0) {
AudioCallbacks.decodeAndPlaySample(NULL, 0);
return;
}
PRTP_PACKET rtp = (PRTP_PACKET)&packet->data[0];
if (lastSeq != 0 && (unsigned short)(lastSeq + 1) != rtp->sequenceNumber) {
Limelog("Network dropped audio data (expected %d, but received %d)\n", lastSeq + 1, rtp->sequenceNumber);
}
lastSeq = rtp->sequenceNumber;
if (AudioEncryptionEnabled) {
// We must have room for the AES padding which may be written to the buffer
unsigned char decryptedOpusData[ROUND_TO_PKCS7_PADDED_LEN(MAX_PACKET_SIZE)];
unsigned char iv[16] = { 0 };
int dataLength = packet->header.size - sizeof(*rtp);
LC_ASSERT(dataLength <= MAX_PACKET_SIZE);
// The IV is the avkeyid (equivalent to the rikeyid) +
// the RTP sequence number, in big endian.
uint32_t ivSeq = BE32(avRiKeyId + rtp->sequenceNumber);
memcpy(iv, &ivSeq, sizeof(ivSeq));
if (!PltDecryptMessage(audioDecryptionCtx, ALGORITHM_AES_CBC, CIPHER_FLAG_RESET_IV | CIPHER_FLAG_FINISH,
(unsigned char*)StreamConfig.remoteInputAesKey, sizeof(StreamConfig.remoteInputAesKey),
iv, sizeof(iv),
NULL, 0,
(unsigned char*)(rtp + 1), dataLength,
decryptedOpusData, &dataLength)) {
Limelog("Failed to decrypt audio packet (sequence number: %u)\n", rtp->sequenceNumber);
LC_ASSERT_VT(false);
return;
}
#ifdef LC_DEBUG
if (opusHeaderByte == INVALID_OPUS_HEADER) {
opusHeaderByte = decryptedOpusData[0];
LC_ASSERT_VT(opusHeaderByte != INVALID_OPUS_HEADER);
}
else {
// Opus header should stay constant for the entire stream.
// If it doesn't, it may indicate that the RtpAudioQueue
// incorrectly recovered a data shard or the decryption
// of the audio packet failed. Sunshine violates this for
// surround sound in some cases, so just ignore it.
LC_ASSERT_VT(decryptedOpusData[0] == opusHeaderByte || IS_SUNSHINE());
}
#endif
AudioCallbacks.decodeAndPlaySample((char*)decryptedOpusData, dataLength);
}
else {
#ifdef LC_DEBUG
if (opusHeaderByte == INVALID_OPUS_HEADER) {
opusHeaderByte = ((uint8_t*)(rtp + 1))[0];
LC_ASSERT_VT(opusHeaderByte != INVALID_OPUS_HEADER);
}
else {
// Opus header should stay constant for the entire stream.
// If it doesn't, it may indicate that the RtpAudioQueue
// incorrectly recovered a data shard. Sunshine violates
// this for surround sound in some cases, so just ignore it.
LC_ASSERT_VT(((uint8_t*)(rtp + 1))[0] == opusHeaderByte || IS_SUNSHINE());
}
#endif
AudioCallbacks.decodeAndPlaySample((char*)(rtp + 1), packet->header.size - sizeof(*rtp));
}
}
static void AudioReceiveThreadProc(void* context) {
PRTP_PACKET rtp;
PQUEUED_AUDIO_PACKET packet;
int queueStatus;
bool useSelect;
uint32_t packetsToDrop;
int waitingForAudioMs;
packet = NULL;
packetsToDrop = 500 / AudioPacketDuration;
if (setNonFatalRecvTimeoutMs(rtpSocket, UDP_RECV_POLL_TIMEOUT_MS) < 0) {
// SO_RCVTIMEO failed, so use select() to wait
useSelect = true;
}
else {
// SO_RCVTIMEO timeout set for recv()
useSelect = false;
}
waitingForAudioMs = 0;
while (!PltIsThreadInterrupted(&receiveThread)) {
if (packet == NULL) {
packet = (PQUEUED_AUDIO_PACKET)malloc(sizeof(*packet));
if (packet == NULL) {
Limelog("Audio Receive: malloc() failed\n");
ListenerCallbacks.connectionTerminated(-1);
break;
}
}
packet->header.size = recvUdpSocket(rtpSocket, &packet->data[0], MAX_PACKET_SIZE, useSelect);
if (packet->header.size < 0) {
Limelog("Audio Receive: recvUdpSocket() failed: %d\n", (int)LastSocketError());
ListenerCallbacks.connectionTerminated(LastSocketFail());
break;
}
else if (packet->header.size == 0) {
// Receive timed out; try again
if (!receivedDataFromPeer) {
waitingForAudioMs += UDP_RECV_POLL_TIMEOUT_MS;
}
else {
// If we hit this path, there are no queued audio packets on the host PC,
// so we don't need to drop anything.
packetsToDrop = 0;
}
continue;
}
if (packet->header.size < (int)sizeof(RTP_PACKET)) {
// Runt packet
continue;
}
rtp = (PRTP_PACKET)&packet->data[0];
if (!receivedDataFromPeer) {
receivedDataFromPeer = true;
Limelog("Received first audio packet after %d ms\n", waitingForAudioMs);
if (firstReceiveTime != 0) {
// XXX firstReceiveTime is never set here...
// We're already dropping 500ms of audio so this probably doesn't matter
packetsToDrop += (uint32_t)(PltGetMillis() - firstReceiveTime) / AudioPacketDuration;
}
Limelog("Initial audio resync period: %d milliseconds\n", packetsToDrop * AudioPacketDuration);
}
// GFE accumulates audio samples before we are ready to receive them, so
// we will drop the ones that arrived before the receive thread was ready.
if (packetsToDrop > 0) {
// Only count actual audio data (not FEC) in the packets to drop calculation
if (rtp->packetType == 97) {
packetsToDrop--;
}
continue;
}
// Convert fields to host byte-order
rtp->sequenceNumber = BE16(rtp->sequenceNumber);
rtp->timestamp = BE32(rtp->timestamp);
rtp->ssrc = BE32(rtp->ssrc);
queueStatus = RtpaAddPacket(&rtpAudioQueue, (PRTP_PACKET)&packet->data[0], (uint16_t)packet->header.size);
if (RTPQ_HANDLE_NOW(queueStatus)) {
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
if (!queuePacketToLbq(&packet)) {
// An exit signal was received
break;
}
else {
// Ownership should have been taken by the LBQ
LC_ASSERT(packet == NULL);
}
}
else {
decodeInputData(packet);
}
}
else {
if (RTPQ_PACKET_CONSUMED(queueStatus)) {
// The queue consumed our packet, so we must allocate a new one
packet = NULL;
}
if (RTPQ_PACKET_READY(queueStatus)) {
// If packets are ready, pull them and send them to the decoder
uint16_t length;
PQUEUED_AUDIO_PACKET queuedPacket;
while ((queuedPacket = (PQUEUED_AUDIO_PACKET)RtpaGetQueuedPacket(&rtpAudioQueue, sizeof(QUEUED_AUDIO_PACKET_HEADER), &length)) != NULL) {
// Populate header data (not preserved in queued packets)
queuedPacket->header.size = length;
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
if (!queuePacketToLbq(&queuedPacket)) {
// An exit signal was received
free(queuedPacket);
break;
}
else {
// Ownership should have been taken by the LBQ
LC_ASSERT(queuedPacket == NULL);
}
}
else {
decodeInputData(queuedPacket);
free(queuedPacket);
}
}
// Break on exit
if (queuedPacket != NULL) {
break;
}
}
}
}
if (packet != NULL) {
free(packet);
}
}
static void AudioDecoderThreadProc(void* context) {
int err;
PQUEUED_AUDIO_PACKET packet;
while (!PltIsThreadInterrupted(&decoderThread)) {
err = LbqWaitForQueueElement(&packetQueue, (void**)&packet);
if (err != LBQ_SUCCESS) {
// An exit signal was received
return;
}
decodeInputData(packet);
free(packet);
}
}
void stopAudioStream(void) {
if (!receivedDataFromPeer) {
Limelog("No audio traffic was ever received from the host!\n");
}
AudioCallbacks.stop();
PltInterruptThread(&receiveThread);
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
// Signal threads waiting on the LBQ
LbqSignalQueueShutdown(&packetQueue);
PltInterruptThread(&decoderThread);
}
PltJoinThread(&receiveThread);
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
PltJoinThread(&decoderThread);
}
AudioCallbacks.cleanup();
}
int startAudioStream(void* audioContext, int arFlags) {
int err;
OPUS_MULTISTREAM_CONFIGURATION chosenConfig;
if (HighQualitySurroundEnabled) {
LC_ASSERT(HighQualitySurroundSupported);
LC_ASSERT(HighQualityOpusConfig.channelCount != 0);
LC_ASSERT(HighQualityOpusConfig.streams != 0);
chosenConfig = HighQualityOpusConfig;
}
else {
LC_ASSERT(NormalQualityOpusConfig.channelCount != 0);
LC_ASSERT(NormalQualityOpusConfig.streams != 0);
chosenConfig = NormalQualityOpusConfig;
}
chosenConfig.samplesPerFrame = 48 * AudioPacketDuration;
err = AudioCallbacks.init(StreamConfig.audioConfiguration, &chosenConfig, audioContext, arFlags);
if (err != 0) {
return err;
}
AudioCallbacks.start();
err = PltCreateThread("AudioRecv", AudioReceiveThreadProc, NULL, &receiveThread);
if (err != 0) {
AudioCallbacks.stop();
closeSocket(rtpSocket);
AudioCallbacks.cleanup();
return err;
}
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
err = PltCreateThread("AudioDec", AudioDecoderThreadProc, NULL, &decoderThread);
if (err != 0) {
AudioCallbacks.stop();
PltInterruptThread(&receiveThread);
PltJoinThread(&receiveThread);
closeSocket(rtpSocket);
AudioCallbacks.cleanup();
return err;
}
}
return 0;
}
int LiGetPendingAudioFrames(void) {
return LbqGetItemCount(&packetQueue);
}
int LiGetPendingAudioDuration(void) {
return LiGetPendingAudioFrames() * AudioPacketDuration;
}
const RTP_AUDIO_STATS* LiGetRTPAudioStats(void) {
return &rtpAudioQueue.stats;
}