From 0374ed2f54199eaa8e5a7e028be880f20b399607 Mon Sep 17 00:00:00 2001 From: Bryan Biedenkapp Date: Wed, 28 Jun 2023 14:07:32 -0400 Subject: [PATCH] correct bad implementation of RTP SSRC, SSRC should be the *sending* peer ID; --- src/network/FNENetwork.cpp | 8 ++++---- src/network/FrameQueue.cpp | 27 ++++++++++++++++++++------- src/network/FrameQueue.h | 3 +++ src/network/Network.cpp | 11 ++++++++++- src/network/Network.h | 2 ++ 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/src/network/FNENetwork.cpp b/src/network/FNENetwork.cpp index cff5bd0c..1b8985d0 100644 --- a/src/network/FNENetwork.cpp +++ b/src/network/FNENetwork.cpp @@ -913,7 +913,7 @@ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const sockaddr_storage addr = m_peers[peerId].socketStorage(); uint32_t addrLen = m_peers[peerId].sockStorageLen(); - m_frameQueue->enqueueMessage(data, length, streamId, peerId, opcode, pktSeq, addr, addrLen); + m_frameQueue->enqueueMessage(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen); if (queueOnly) return true; return m_frameQueue->flushQueue(); @@ -1041,7 +1041,7 @@ bool FNENetwork::writePeerNAK(uint32_t peerId, const char* tag, sockaddr_storage LogWarning(LOG_NET, "%s from unauth PEER %u", tag, peerId); - m_frameQueue->enqueueMessage(buffer, 10U, createStreamId(), peerId, + m_frameQueue->enqueueMessage(buffer, 10U, createStreamId(), peerId, m_peerId, { NET_FUNC_NAK, NET_SUBFUNC_NOP }, 0U, addr, addrLen); return m_frameQueue->flushQueue(); } @@ -1060,7 +1060,7 @@ void FNENetwork::writePeers(FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t addrLen = peer.second.sockStorageLen(); uint16_t pktSeq = peer.second.pktLastSeq(); - m_frameQueue->enqueueMessage(data, length, streamId, peerId, opcode, pktSeq, addr, addrLen); + m_frameQueue->enqueueMessage(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen); } m_frameQueue->flushQueue(); @@ -1080,7 +1080,7 @@ void FNENetwork::writePeers(FrameQueue::OpcodePair opcode, const uint8_t* data, sockaddr_storage addr = peer.second.socketStorage(); uint32_t addrLen = peer.second.sockStorageLen(); - m_frameQueue->enqueueMessage(data, length, streamId, peerId, opcode, pktSeq, addr, addrLen); + m_frameQueue->enqueueMessage(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen); } m_frameQueue->flushQueue(); diff --git a/src/network/FrameQueue.cpp b/src/network/FrameQueue.cpp index 5a889120..032e3801 100644 --- a/src/network/FrameQueue.cpp +++ b/src/network/FrameQueue.cpp @@ -133,11 +133,6 @@ UInt8Array FrameQueue::read(int& messageLength, sockaddr_storage& address, uint3 *fneHeader = _fneHeader; } - // ensure the RTP synchronization source ID matches the FNE stream ID - if (_rtpHeader.getSSRC() != _fneHeader.getStreamId()) { - LogWarning(LOG_NET, "FrameQueue::read(), RTP header and FNE header do not agree on stream ID? %u != %u", _rtpHeader.getSSRC(), _fneHeader.getStreamId()); - } - // copy message messageLength = _fneHeader.getMessageLength(); __UNIQUE_UINT8_ARRAY(message, messageLength); @@ -171,6 +166,25 @@ UInt8Array FrameQueue::read(int& messageLength, sockaddr_storage& address, uint3 /// void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen) +{ + enqueueMessage(message, length, streamId, peerId, peerId, opcode, rtpSeq, addr, addrLen); +} + +/// +/// Cache "message" to frame queue. +/// +/// Message buffer to frame and queue. +/// Length of message. +/// Message stream ID. +/// Peer ID. +/// RTP SSRC ID. +/// Opcode. +/// RTP Sequence. +/// IP address to write data to. +/// +/// +void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId, + uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen) { assert(message != nullptr); assert(length > 0U); @@ -184,13 +198,12 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_ header.setPayloadType(DVM_RTP_PAYLOAD_TYPE); header.setSequence(rtpSeq); - header.setSSRC(streamId); + header.setSSRC(ssrc); // properly flag control opcodes if ((opcode.first == NET_FUNC_TRANSFER) || (opcode.first == NET_FUNC_GRANT)) { header.setPayloadType(DVM_CTRL_RTP_PAYLOAD_TYPE); header.setSequence(0U); - header.setSSRC(0U); } header.encode(buffer); diff --git a/src/network/FrameQueue.h b/src/network/FrameQueue.h index fccd99f3..37de81d4 100644 --- a/src/network/FrameQueue.h +++ b/src/network/FrameQueue.h @@ -63,6 +63,9 @@ namespace network /// Cache "message" to frame queue. void enqueueMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen); + /// Cache "message" to frame queue. + void enqueueMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId, + uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen); /// Flush the message queue. bool flushQueue(); diff --git a/src/network/Network.cpp b/src/network/Network.cpp index a39260e5..c27d7864 100644 --- a/src/network/Network.cpp +++ b/src/network/Network.cpp @@ -98,7 +98,8 @@ Network::Network(const std::string& address, uint16_t port, uint16_t localPort, m_height(0), m_location(), m_restApiPassword(), - m_restApiPort(0) + m_restApiPort(0), + m_remotePeerId(0U) { assert(!address.empty()); assert(port > 0U); @@ -267,6 +268,12 @@ void Network::clock(uint32_t ms) fneHeader.getStreamId(), fneHeader.getFunction(), fneHeader.getSubFunction()); } + // ensure the RTP synchronization source ID matches the FNE peer ID + if (m_remotePeerId != 0U && rtpHeader.getSSRC() != m_remotePeerId) { + LogWarning(LOG_NET, "RTP header and traffic session do not agree on remote peer ID? %u != %u", rtpHeader.getSSRC(), m_remotePeerId); + // should this be a fatal error? + } + // is this RTP packet destined for us? uint32_t peerId = fneHeader.getPeerId(); if (m_peerId != peerId) { @@ -507,6 +514,7 @@ void Network::clock(uint32_t ms) case NET_STAT_WAITING_CONFIG: LogMessage(LOG_NET, "Logged into the master successfully"); m_loginStreamId = 0U; + m_remotePeerId = rtpHeader.getSSRC(); pktSeq(true); m_status = NET_STAT_RUNNING; m_timeoutTimer.start(); @@ -641,6 +649,7 @@ bool Network::writeLogin() Utils::dump(1U, "Network Message, Login", buffer, 8U); m_loginStreamId = createStreamId(); + m_remotePeerId = 0U; m_frameQueue->enqueueMessage(buffer, 8U, m_loginStreamId, m_peerId, { NET_FUNC_RPTL, NET_SUBFUNC_NOP }, pktSeq(true), m_addr, m_addrLen); return m_frameQueue->flushQueue(); diff --git a/src/network/Network.h b/src/network/Network.h index 453dea94..eaedce47 100644 --- a/src/network/Network.h +++ b/src/network/Network.h @@ -133,6 +133,8 @@ namespace network std::string m_restApiPassword; uint16_t m_restApiPort; + uint32_t m_remotePeerId; + /// Writes login request to the network. bool writeLogin(); /// Writes network authentication challenge.