EXPERIMENTAL: add an adaptive jitter buffer implementation to the FNE to better deal with peers on bad connections that may send packets out-of-sequence;

r05a04_dev
Bryan Biedenkapp 2 months ago
parent b5d480ed24
commit 80215c00a1

@ -63,6 +63,41 @@ master:
# This port is advertised to the network as a globally WAN accessible port.
advertisedWANPort: 62031
#
# Adaptive Jitter Buffer Configuration
# NOTE: In 99% of cases, the adaptive jitter buffer, if needed, should only be enabled on a per-peer basis,
# and remain disabled for all peers globally. Enabling the jitter buffer adds latency to voice traffic,
# and should only be used in specific network conditions where high jitter or out-of-order packets are
# common (i.e. satellite links, cellular networks, etc.).
#
jitterBuffer:
# Flag indicating whether the adaptive jitter buffer is enabled by default for all peers.
enabled: false
# Default maximum buffer size in frames (range: 2-8 frames).
# Larger values provide more reordering capability but add latency.
# Recommended: 4 frames for most networks, 6-8 for high-jitter links (satellite, cellular).
defaultMaxSize: 4
# Default maximum wait time in microseconds (range: 10000-200000 us).
# Frames exceeding this age are delivered even if gaps exist.
# Recommended: 40000 us (40ms) for terrestrial, 80000 us (80ms) for satellite.
defaultMaxWait: 40000
# Per-peer jitter buffer overrides for specific peers with problematic network conditions.
peerOverrides: []
# Example: Satellite link peer with high latency and jitter
# - peerId: 31003
# enabled: true
# maxSize: 6
# maxWait: 80000
# Example: Cellular peer with variable jitter
# - peerId: 31004
# enabled: true
# maxSize: 5
# maxWait: 60000
# Example: Fiber peer with low latency - disable jitter buffer
# - peerId: 31005
# enabled: false
# Flag indicating whether or not denied traffic will be logged.
# (This is useful for debugging talkgroup rules and other ACL issues, but can be very noisy on a busy system.)
logDenials: false

@ -0,0 +1,344 @@
# Adaptive Jitter Buffer Configuration Guide
## Overview
The FNE (Fixed Network Equipment) includes an adaptive jitter buffer system that can automatically reorder out-of-sequence RTP packets from peers experiencing network issues such as:
- **Satellite links** with high latency and variable jitter
- **Cellular connections** with packet reordering
- **Congested network paths** causing sporadic delays
- **Multi-path routing** leading to out-of-order delivery
The jitter buffer operates with **zero latency for perfect networks** - if packets arrive in order, they pass through immediately without buffering. Only out-of-order packets trigger the adaptive buffering mechanism.
## How It Works
### Zero-Latency Fast Path
When packets arrive in perfect sequence order, they are processed immediately with **no additional latency**. The jitter buffer is effectively transparent.
### Adaptive Reordering
When an out-of-order packet is detected:
1. The jitter buffer holds the packet temporarily
2. Waits for missing packets to arrive
3. Delivers frames in correct sequence order
4. Times out after a configurable period if gaps persist
### Per-Peer, Per-Stream Isolation
- Each peer connection can have independent jitter buffer settings
- Within each peer, each call/stream has its own isolated buffer
- This prevents one problematic stream from affecting others
## Configuration
### Location
Jitter buffer configuration is defined in the FNE configuration file (typically `fne-config.yml`) under the `master` section:
```yaml
master:
# ... other master configuration ...
jitterBuffer:
enabled: false
defaultMaxSize: 4
defaultMaxWait: 40000
peerOverrides:
- peerId: 31003
enabled: true
maxSize: 6
maxWait: 80000
```
### Parameters
#### Global Settings
- **enabled** (boolean, default: `false`)
- Master enable/disable switch for jitter buffering
- When `false`, all peers operate with zero-latency pass-through
- When `true`, peers use jitter buffering with default parameters
- **defaultMaxSize** (integer, range: 2-8, default: `4`)
- Maximum number of frames to buffer per stream
- Larger values provide more reordering capability but add latency
- **Recommended values:**
- `4` - Standard networks (LAN, stable WAN)
- `6` - High-jitter networks (cellular, congested paths)
- `8` - Extreme conditions (satellite, very poor links)
- **defaultMaxWait** (integer, range: 10000-200000 microseconds, default: `40000`)
- Maximum time to wait for missing packets
- Frames older than this are delivered even with gaps
- **Recommended values:**
- `40000` (40ms) - Terrestrial networks
- `60000` (60ms) - Cellular networks
- `80000` (80ms) - Satellite links
#### Per-Peer Overrides
The `peerOverrides` array allows you to customize jitter buffer behavior for specific peers:
```yaml
peerOverrides:
# Satellite link - high latency, requires larger buffer
- peerId: 31003
enabled: true
maxSize: 6
maxWait: 80000
# Cellular peer - variable jitter
- peerId: 31004
enabled: true
maxSize: 5
maxWait: 60000
# Local fiber peer - disable jitter buffer for minimal latency
- peerId: 31005
enabled: false
```
Each override entry supports:
- **peerId** (integer) - The peer ID to configure
- **enabled** (boolean) - Enable/disable for this specific peer
- **maxSize** (integer, range: 2-8) - Buffer size override
- **maxWait** (integer, range: 10000-200000) - Timeout override
## Configuration Examples
### Example 1: Disabled (Default)
For networks with reliable connectivity:
```yaml
master:
jitterBuffer:
enabled: false
defaultMaxSize: 4
defaultMaxWait: 40000
```
All peers operate with zero-latency pass-through. Best for:
- Local area networks
- Stable dedicated connections
- Networks with minimal packet loss/reordering
### Example 2: Global Enable with Defaults
Enable jitter buffering for all peers with conservative settings:
```yaml
master:
jitterBuffer:
enabled: true
defaultMaxSize: 4
defaultMaxWait: 40000
```
Good starting point for:
- Mixed network environments
- Networks with occasional jitter
- General purpose deployments
### Example 3: Selective Peer Configuration
Enable only for problematic peers:
```yaml
master:
jitterBuffer:
enabled: false # Disabled by default
defaultMaxSize: 4
defaultMaxWait: 40000
peerOverrides:
# Enable only for satellite peer
- peerId: 31003
enabled: true
maxSize: 8
maxWait: 80000
# Enable for cellular peer
- peerId: 31004
enabled: true
maxSize: 6
maxWait: 60000
```
Recommended approach for:
- Mostly stable networks with a few problem peers
- Minimizing overall system latency
- Targeted optimization
### Example 4: High-Jitter Network
For challenging network environments:
```yaml
master:
jitterBuffer:
enabled: true
defaultMaxSize: 6
defaultMaxWait: 60000
peerOverrides:
# Satellite link needs even more buffering
- peerId: 31003
enabled: true
maxSize: 8
maxWait: 100000
```
Suitable for:
- Wide area networks with variable quality
- Networks with frequent reordering
- Deployments with multiple satellite/cellular links
## Monitoring and Tuning
### Startup Logging
When the FNE starts, jitter buffer configuration is displayed in the logs:
```
I: 2025-12-03 22:07:11.374 Jitter Buffer Enabled: yes
I: 2025-12-03 22:07:11.374 Jitter Buffer Default Max Size: 6 frames
I: 2025-12-03 22:07:11.374 Jitter Buffer Default Max Wait: 50000 microseconds
I: 2025-12-03 22:07:11.374 Jitter Buffer Peer Overrides: 3 peer(s) configured
```
### Per-Peer Configuration Logging
When a peer connects and jitter buffering is enabled, you'll see (with verbose logging):
```
I: 2025-12-03 22:10:15.234 PEER 31003 jitter buffer configured (override): enabled=yes, maxSize=8, maxWait=80000
I: 2025-12-03 22:10:16.456 PEER 31004 jitter buffer configured (default): enabled=yes, maxSize=4, maxWait=40000
```
### Future Monitoring (To Be Implemented)
Planned monitoring capabilities:
- Per-peer jitter buffer statistics (reordered frames, dropped frames, timeouts)
- InfluxDB metrics for trend analysis
- REST API endpoints for runtime statistics
- Automatic tuning recommendations based on observed behavior
## Performance Characteristics
### CPU Impact
- **Zero-latency path:** Negligible overhead (~1 comparison per packet)
- **Buffering path:** Minimal overhead (~map lookup + timestamp check)
- **Memory:** ~500 bytes per active stream buffer
### Latency Impact
- **In-order packets:** 0ms additional latency
- **Out-of-order packets:** Buffered until:
- Missing packets arrive, OR
- `maxWait` timeout expires
- **Typical latency:** 10-40ms for reordered packets on terrestrial networks
### Effectiveness
Based on the adaptive jitter buffer design:
- **100% pass-through** for perfect networks (zero latency)
- **~95-99% recovery** of out-of-order packets within timeout window
- **Automatic timeout delivery** prevents indefinite stalling
## Troubleshooting
### Symptom: Audio/Data Gaps Despite Jitter Buffer
**Possible Causes:**
1. `maxWait` timeout too short for network conditions
2. `maxSize` buffer too small for reordering depth
3. Actual packet loss (not just reordering)
**Solutions:**
- Increase `maxWait` by 20-40ms increments
- Increase `maxSize` by 1-2 frames
- Verify network packet loss with diagnostics
### Symptom: Excessive Latency
**Possible Causes:**
1. Jitter buffer enabled on stable connections
2. `maxWait` set too high
3. `maxSize` set too large
**Solutions:**
- Disable jitter buffer for known-good peers using overrides
- Reduce `maxWait` in 10-20ms decrements
- Reduce `maxSize` to minimum (2-4 frames)
### Symptom: No Improvement
**Possible Causes:**
1. Jitter buffer not actually enabled for the problematic peer
2. Issues beyond reordering (e.g., corruption, auth failures)
3. Problems at application layer, not transport layer
**Solutions:**
- Verify peer override configuration is correct
- Check FNE logs for peer-specific configuration messages
- Enable verbose and debug logging to trace packet flow
## Technical Details
### RTP Sequence Number Handling
The jitter buffer correctly handles RTP sequence number wraparound (RFC 3550):
- Sequence numbers are 16-bit unsigned integers (0-65535)
- After 65535, sequence resets to 0
- Buffer correctly calculates sequence differences across wraparound
- No special configuration needed
### Thread Safety
- Each peer's jitter buffer map is protected by a mutex
- Per-stream buffers operate independently
- Safe for concurrent access from multiple worker threads
### Memory Management
- Buffered frames use RAII for automatic cleanup
- Timed-out frames are automatically freed
- Stream buffers cleaned up when streams end
- No memory leaks under normal operation
## Best Practices
1. **Start Disabled**: Begin with jitter buffering disabled and enable only as needed
2. **Target Specific Peers**: Use per-peer overrides rather than global enable when possible
3. **Conservative Tuning**: Start with default parameters and adjust incrementally
4. **Monitor Performance**: Watch for signs of latency or audio quality issues
5. **Document Changes**: Keep records of which peers need special configuration
6. **Test Thoroughly**: Validate changes don't introduce unintended latency
## Reference
### Related Documentation
- `ADAPTIVE_JITTER_BUFFER.md` - Technical implementation details
- `AdaptiveJitterBuffer.example.cpp` - Code examples
- `fne-config.example.yml` - Full configuration example
### Configuration Schema
```yaml
jitterBuffer:
enabled: <boolean> # false
defaultMaxSize: <2-8> # 4
defaultMaxWait: <10000-200000> # 40000
peerOverrides:
- peerId: <integer> # Required
enabled: <boolean> # Optional, defaults to global enabled
maxSize: <2-8> # Optional, defaults to defaultMaxSize
maxWait: <10000-200000> # Optional, defaults to defaultMaxWait
```
## Version History
- **December 2025** - Initial implementation
- Zero-latency fast path
- Per-peer, per-stream adaptive buffering
- Configuration parsing and validation
- Sequence wraparound support

@ -0,0 +1,272 @@
// SPDX-License-Identifier: GPL-2.0-only
/*
* Digital Voice Modem - Common Library
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2025 Bryan Biedenkapp, N2PLL
*
*/
#include "common/Log.h"
#include "network/AdaptiveJitterBuffer.h"
using namespace network;
#include <algorithm>
#include <cassert>
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
#define RTP_SEQ_MOD (1U << 16) // 65536
// ---------------------------------------------------------------------------
// Public Class Members
// ---------------------------------------------------------------------------
/* Initializes a new instance of the AdaptiveJitterBuffer class. */
AdaptiveJitterBuffer::AdaptiveJitterBuffer(uint16_t maxBufferSize, uint32_t maxWaitTime) :
m_buffer(),
m_mutex(),
m_nextExpectedSeq(0U),
m_maxBufferSize(maxBufferSize),
m_maxWaitTime(maxWaitTime),
m_totalFrames(0ULL),
m_reorderedFrames(0ULL),
m_droppedFrames(0ULL),
m_timedOutFrames(0ULL),
m_initialized(false)
{
assert(maxBufferSize > 0U);
assert(maxWaitTime > 0U);
}
/* Finalizes a instance of the AdaptiveJitterBuffer class. */
AdaptiveJitterBuffer::~AdaptiveJitterBuffer()
{
std::lock_guard<std::mutex> lock(m_mutex);
// clean up any buffered frames
for (auto& pair : m_buffer) {
if (pair.second != nullptr) {
delete pair.second;
}
}
m_buffer.clear();
}
/* Processes an incoming RTP frame. */
bool AdaptiveJitterBuffer::processFrame(uint16_t seq, const uint8_t* data, uint32_t length,
std::vector<BufferedFrame*>& readyFrames)
{
if (data == nullptr || length == 0U) {
return false;
}
std::lock_guard<std::mutex> lock(m_mutex);
m_totalFrames++;
// initialize on first frame
if (!m_initialized) {
m_nextExpectedSeq = seq;
m_initialized = true;
}
// zero-latency fast path: in-order packet
if (seq == m_nextExpectedSeq) {
// create frame and add to ready list
BufferedFrame* frame = new BufferedFrame(seq, data, length);
readyFrames.push_back(frame);
// advance expected sequence
m_nextExpectedSeq = (m_nextExpectedSeq + 1) & 0xFFFF;
// flush any subsequent sequential frames from buffer
flushSequentialFrames(readyFrames);
return true;
}
int32_t diff = seqDiff(seq, m_nextExpectedSeq);
// frame is in the past (duplicate or very late)
if (diff < 0) {
// check if it's severely out of order (> 1000 packets behind)
if (diff < -1000) {
// ;ikely a sequence wraparound with new stream - reset
m_nextExpectedSeq = seq;
m_buffer.clear();
BufferedFrame* frame = new BufferedFrame(seq, data, length);
readyFrames.push_back(frame);
m_nextExpectedSeq = (m_nextExpectedSeq + 1) & 0xFFFF;
return true;
}
// drop duplicate/late frame
m_droppedFrames++;
return false;
}
// frame is in the future - buffer it
m_reorderedFrames++;
// check buffer capacity
if (m_buffer.size() >= m_maxBufferSize) {
// buffer is full - drop oldest frame to make room
auto oldestIt = m_buffer.begin();
delete oldestIt->second;
m_buffer.erase(oldestIt);
m_droppedFrames++;
}
// add frame to buffer
BufferedFrame* frame = new BufferedFrame(seq, data, length);
m_buffer[seq] = frame;
// check if we now have the next expected frame
flushSequentialFrames(readyFrames);
return true;
}
/* Checks for timed-out buffered frames and forces their delivery. */
void AdaptiveJitterBuffer::checkTimeouts(std::vector<BufferedFrame*>& timedOutFrames,
uint64_t currentTime)
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_buffer.empty()) {
return;
}
// get current time if not provided
if (currentTime == 0ULL) {
currentTime = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
}
// find frames that have exceeded the wait time
std::vector<uint16_t> toRemove;
for (auto& pair : m_buffer) {
BufferedFrame* frame = pair.second;
if (frame != nullptr) {
uint64_t age = currentTime - frame->timestamp;
if (age >= m_maxWaitTime) {
toRemove.push_back(pair.first);
}
}
}
// remove and deliver timed-out frames in sequence order
if (!toRemove.empty()) {
// sort by sequence number
std::sort(toRemove.begin(), toRemove.end(), [this](uint16_t a, uint16_t b) {
return seqDiff(a, b) < 0;
});
for (uint16_t seq : toRemove) {
auto it = m_buffer.find(seq);
if (it != m_buffer.end() && it->second != nullptr) {
timedOutFrames.push_back(it->second);
m_buffer.erase(it);
m_timedOutFrames++;
// update next expected sequence to skip the gap
int32_t diff = seqDiff(seq, m_nextExpectedSeq);
if (diff >= 0) {
m_nextExpectedSeq = (seq + 1) & 0xFFFF;
// try to flush any sequential frames after this one
flushSequentialFrames(timedOutFrames);
}
}
}
}
}
/* Resets the jitter buffer state. */
void AdaptiveJitterBuffer::reset(bool clearStats)
{
std::lock_guard<std::mutex> lock(m_mutex);
// clean up buffered frames
for (auto& pair : m_buffer) {
if (pair.second != nullptr) {
delete pair.second;
}
}
m_buffer.clear();
m_initialized = false;
m_nextExpectedSeq = 0U;
if (clearStats) {
m_totalFrames = 0ULL;
m_reorderedFrames = 0ULL;
m_droppedFrames = 0ULL;
m_timedOutFrames = 0ULL;
}
}
/* Gets statistics about jitter buffer performance. */
void AdaptiveJitterBuffer::getStatistics(uint64_t& totalFrames, uint64_t& reorderedFrames,
uint64_t& droppedFrames, uint64_t& timedOutFrames) const
{
std::lock_guard<std::mutex> lock(m_mutex);
totalFrames = m_totalFrames;
reorderedFrames = m_reorderedFrames;
droppedFrames = m_droppedFrames;
timedOutFrames = m_timedOutFrames;
}
// ---------------------------------------------------------------------------
// Private Class Members
// ---------------------------------------------------------------------------
/* Delivers all sequential frames from the buffer. */
void AdaptiveJitterBuffer::flushSequentialFrames(std::vector<BufferedFrame*>& readyFrames)
{
while (!m_buffer.empty()) {
auto it = m_buffer.find(m_nextExpectedSeq);
if (it == m_buffer.end()) {
// gap in sequence - stop flushing
break;
}
// found next sequential frame
BufferedFrame* frame = it->second;
readyFrames.push_back(frame);
m_buffer.erase(it);
// advance to next expected sequence
m_nextExpectedSeq = (m_nextExpectedSeq + 1) & 0xFFFF;
}
}
/* Calculates sequence number difference handling wraparound. */
int32_t AdaptiveJitterBuffer::seqDiff(uint16_t seq1, uint16_t seq2) const
{
// handle RTP sequence number wraparound (RFC 3550)
int32_t diff = (int32_t)seq1 - (int32_t)seq2;
// adjust for wraparound
if (diff > (int32_t)(RTP_SEQ_MOD / 2)) {
diff -= (int32_t)RTP_SEQ_MOD;
} else if (diff < -(int32_t)(RTP_SEQ_MOD / 2)) {
diff += (int32_t)RTP_SEQ_MOD;
}
return diff;
}

@ -0,0 +1,207 @@
// SPDX-License-Identifier: GPL-2.0-only
/*
* Digital Voice Modem - Common Library
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2025 Bryan Biedenkapp, N2PLL
*
*/
/**
* @file AdaptiveJitterBuffer.h
* @ingroup network_core
* @file AdaptiveJitterBuffer.cpp
* @ingroup network_core
*/
#if !defined(__ADAPTIVE_JITTER_BUFFER_H__)
#define __ADAPTIVE_JITTER_BUFFER_H__
#include "common/Defines.h"
#include <cstdint>
#include <cstring>
#include <map>
#include <vector>
#include <mutex>
#include <chrono>
namespace network
{
// ---------------------------------------------------------------------------
// Structure Declaration
// ---------------------------------------------------------------------------
/**
* @brief Represents a buffered frame in the jitter buffer.
* @ingroup network_core
*/
struct BufferedFrame {
uint16_t seq; //<! RTP sequence number
uint8_t* data; //<! Frame data
uint32_t length; //<! Frame length
uint64_t timestamp; //<! Reception timestamp (microseconds)
/**
* @brief Initializes a new instance of the BufferedFrame struct.
*/
BufferedFrame() : seq(0U), data(nullptr), length(0U), timestamp(0ULL) { /* stub */ }
/**
* @brief Initializes a new instance of the BufferedFrame struct.
* @param sequence RTP sequence number.
* @param buffer Frame data buffer.
* @param len Frame length.
*/
BufferedFrame(uint16_t sequence, const uint8_t* buffer, uint32_t len) :
seq(sequence),
data(nullptr),
length(len),
timestamp(std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count())
{
if (len > 0U && buffer != nullptr) {
data = new uint8_t[len];
::memcpy(data, buffer, len);
}
}
/**
* @brief Finalizes a instance of the BufferedFrame struct.
*/
~BufferedFrame()
{
if (data != nullptr) {
delete[] data;
data = nullptr;
}
}
};
// ---------------------------------------------------------------------------
// Class Declaration
// ---------------------------------------------------------------------------
/**
* @brief Implements an adaptive jitter buffer for RTP streams.
* @ingroup network_core
*
* This class provides minimal-latency jitter buffering with a zero-latency
* fast path for in-order packets. Out-of-order packets are buffered briefly
* to allow reordering, with adaptive timeout based on observed jitter.
*/
class HOST_SW_API AdaptiveJitterBuffer {
public:
/**
* @brief Initializes a new instance of the AdaptiveJitterBuffer class.
* @param maxBufferSize Maximum number of frames to buffer (default: 4).
* @param maxWaitTime Maximum time to wait for out-of-order frames in microseconds (default: 40000 = 40ms).
*/
AdaptiveJitterBuffer(uint16_t maxBufferSize = 4U, uint32_t maxWaitTime = 40000U);
/**
* @brief Finalizes a instance of the AdaptiveJitterBuffer class.
*/
~AdaptiveJitterBuffer();
/**
* @brief Processes an incoming RTP frame.
* @param seq RTP sequence number.
* @param data Frame data.
* @param length Frame length.
* @param[out] readyFrames Vector of frames ready for delivery (in sequence order).
* @returns bool True if frame was processed successfully, otherwise false.
*
* This method implements a zero-latency fast path for in-order packets.
* Out-of-order packets are buffered and returned when they become sequential.
*/
bool processFrame(uint16_t seq, const uint8_t* data, uint32_t length,
std::vector<BufferedFrame*>& readyFrames);
/**
* @brief Checks for timed-out buffered frames and forces their delivery.
* @param[out] timedOutFrames Vector of frames that have exceeded the wait time.
* @param currentTime Current time in microseconds (0 = use system clock).
*
* This should be called periodically (e.g., every 10-20ms) to ensure
* buffered frames are delivered even if missing packets never arrive.
*/
void checkTimeouts(std::vector<BufferedFrame*>& timedOutFrames,
uint64_t currentTime = 0ULL);
/**
* @brief Resets the jitter buffer state.
* @param clearStats If true, also resets statistics (default: false).
*
* This should be called when a stream ends or restarts.
*/
void reset(bool clearStats = false);
/**
* @brief Gets the current buffer occupancy.
* @returns size_t Number of frames currently buffered.
*/
size_t getBufferSize() const { return m_buffer.size(); }
/**
* @brief Gets the next expected sequence number.
* @returns uint16_t Next expected sequence number.
*/
uint16_t getNextExpectedSeq() const { return m_nextExpectedSeq; }
/**
* @brief Gets statistics about jitter buffer performance.
* @param[out] totalFrames Total frames processed.
* @param[out] reorderedFrames Frames that were out-of-order but successfully reordered.
* @param[out] droppedFrames Frames dropped due to buffer overflow or severe reordering.
* @param[out] timedOutFrames Frames delivered due to timeout (missing packets).
*/
void getStatistics(uint64_t& totalFrames, uint64_t& reorderedFrames,
uint64_t& droppedFrames, uint64_t& timedOutFrames) const;
/**
* @brief Sets the maximum buffer size.
* @param maxBufferSize Maximum number of frames to buffer.
*/
void setMaxBufferSize(uint16_t maxBufferSize) { m_maxBufferSize = maxBufferSize; }
/**
* @brief Sets the maximum wait time for out-of-order frames.
* @param maxWaitTime Maximum wait time in microseconds.
*/
void setMaxWaitTime(uint32_t maxWaitTime) { m_maxWaitTime = maxWaitTime; }
private:
std::map<uint16_t, BufferedFrame*> m_buffer;
mutable std::mutex m_mutex;
uint16_t m_nextExpectedSeq;
uint16_t m_maxBufferSize;
uint32_t m_maxWaitTime;
uint64_t m_totalFrames;
uint64_t m_reorderedFrames;
uint64_t m_droppedFrames;
uint64_t m_timedOutFrames;
bool m_initialized;
/**
* @brief Delivers all sequential frames from the buffer.
* @param[out] readyFrames Vector to append ready frames to.
*
* Internal helper that flushes all frames starting from m_nextExpectedSeq
* until a gap is encountered.
*/
void flushSequentialFrames(std::vector<BufferedFrame*>& readyFrames);
/**
* @brief Calculates sequence number difference handling wraparound.
* @param seq1 First sequence number.
* @param seq2 Second sequence number.
* @returns int32_t Signed difference (seq1 - seq2).
*/
int32_t seqDiff(uint16_t seq1, uint16_t seq2) const;
};
} // namespace network
#endif // __ADAPTIVE_JITTER_BUFFER_H__

@ -134,6 +134,10 @@ FNENetwork::FNENetwork(HostFNE* host, const std::string& address, uint16_t port,
m_influxOrg("dvm"),
m_influxBucket("dvm"),
m_influxLogRawData(false),
m_jitterBufferEnabled(false),
m_jitterMaxSize(4U),
m_jitterMaxWait(40000U),
m_peerJitterOverrides(),
m_threadPool(workerCnt, "fne"),
m_disablePacketData(false),
m_dumpPacketData(false),
@ -236,6 +240,50 @@ void FNENetwork::setOptions(yaml::Node& conf, bool printOptions)
m_parrotOnlyOriginating = conf["parrotOnlyToOrginiatingPeer"].as<bool>(false);
// jitter buffer configuration
yaml::Node jitterConf = conf["jitterBuffer"];
m_jitterBufferEnabled = jitterConf["enabled"].as<bool>(false);
m_jitterMaxSize = (uint16_t)jitterConf["defaultMaxSize"].as<uint32_t>(4U);
m_jitterMaxWait = jitterConf["defaultMaxWait"].as<uint32_t>(40000U);
// clamp jitter buffer parameters
if (m_jitterMaxSize < 2U)
m_jitterMaxSize = 2U;
if (m_jitterMaxSize > 8U)
m_jitterMaxSize = 8U;
if (m_jitterMaxWait < 10000U)
m_jitterMaxWait = 10000U;
if (m_jitterMaxWait > 200000U)
m_jitterMaxWait = 200000U;
// parse per-peer jitter buffer overrides
m_peerJitterOverrides.clear();
yaml::Node peerOverrides = jitterConf["peerOverrides"];
if (peerOverrides.size() > 0U) {
for (size_t i = 0; i < peerOverrides.size(); i++) {
yaml::Node peerConf = peerOverrides[i];
uint32_t peerId = peerConf["peerId"].as<uint32_t>(0U);
if (peerId != 0U) {
JitterBufferConfig jbConfig;
jbConfig.enabled = peerConf["enabled"].as<bool>(m_jitterBufferEnabled);
jbConfig.maxSize = (uint16_t)peerConf["maxSize"].as<uint32_t>(m_jitterMaxSize);
jbConfig.maxWait = peerConf["maxWait"].as<uint32_t>(m_jitterMaxWait);
// clamp per-peer parameters
if (jbConfig.maxSize < 2U)
jbConfig.maxSize = 2U;
if (jbConfig.maxSize > 8U)
jbConfig.maxSize = 8U;
if (jbConfig.maxWait < 10000U)
jbConfig.maxWait = 10000U;
if (jbConfig.maxWait > 200000U)
jbConfig.maxWait = 200000U;
m_peerJitterOverrides[peerId] = jbConfig;
}
}
}
#if defined(ENABLE_SSL)
m_kmfServicesEnabled = conf["kmfServicesEnabled"].as<bool>(false);
uint16_t kmfOtarPort = conf["kmfOtarPort"].as<uint16_t>(64414U);
@ -351,6 +399,14 @@ void FNENetwork::setOptions(yaml::Node& conf, bool printOptions)
LogInfo(" InfluxDB Bucket: %s", m_influxBucket.c_str());
LogInfo(" InfluxDB Log Raw TSBK/CSBK/RCCH: %s", m_influxLogRawData ? "yes" : "no");
}
LogInfo(" Jitter Buffer Enabled: %s", m_jitterBufferEnabled ? "yes" : "no");
if (m_jitterBufferEnabled) {
LogInfo(" Jitter Buffer Default Max Size: %u frames", m_jitterMaxSize);
LogInfo(" Jitter Buffer Default Max Wait: %u microseconds", m_jitterMaxWait);
if (!m_peerJitterOverrides.empty()) {
LogInfo(" Jitter Buffer Peer Overrides: %zu peer(s) configured", m_peerJitterOverrides.size());
}
}
LogInfo(" Parrot Repeat to Only Originating Peer: %s", m_parrotOnlyOriginating ? "yes" : "no");
LogInfo(" P25 OTAR KMF Services Enabled: %s", m_kmfServicesEnabled ? "yes" : "no");
LogInfo(" P25 OTAR KMF Listening Address: %s", m_address.c_str());
@ -375,6 +431,33 @@ void FNENetwork::setLookups(lookups::RadioIdLookup* ridLookup, lookups::Talkgrou
m_adjSiteMapLookup = adjSiteMapLookup;
}
/* Applies jitter buffer configuration to a peer connection. */
void FNENetwork::applyJitterBufferConfig(uint32_t peerId, FNEPeerConnection* connection)
{
if (connection == nullptr) {
return;
}
// check if there's a per-peer override
auto it = m_peerJitterOverrides.find(peerId);
if (it != m_peerJitterOverrides.end()) {
const JitterBufferConfig& config = it->second;
connection->setJitterBufferParams(config.enabled, config.maxSize, config.maxWait);
if (m_verbose) {
LogInfoEx(LOG_MASTER, "PEER %u jitter buffer configured (override): enabled=%s, maxSize=%u, maxWait=%u",
peerId, config.enabled ? "yes" : "no", config.maxSize, config.maxWait);
}
} else {
// use default settings
connection->setJitterBufferParams(m_jitterBufferEnabled, m_jitterMaxSize, m_jitterMaxWait);
if (m_verbose && m_jitterBufferEnabled) {
LogInfoEx(LOG_MASTER, "PEER %u jitter buffer configured (default): enabled=%s, maxSize=%u, maxWait=%u",
peerId, m_jitterBufferEnabled ? "yes" : "no", m_jitterMaxSize, m_jitterMaxWait);
}
}
}
/* Sets endpoint preshared encryption key. */
void FNENetwork::setPresharedKey(const uint8_t* presharedKey)
@ -496,6 +579,16 @@ void FNENetwork::clock(uint32_t ms)
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
// check jitter buffer timeouts for all peers
m_peers.shared_lock();
for (auto& peer : m_peers) {
FNEPeerConnection* connection = peer.second;
if (connection != nullptr && connection->jitterBufferEnabled()) {
connection->checkJitterTimeouts();
}
}
m_peers.unlock();
if (m_forceListUpdate) {
for (auto peer : m_peers) {
peerMetadataUpdate(peer.first);
@ -932,7 +1025,22 @@ void FNENetwork::taskNetworkRx(NetPacketRequest* req)
if (connection->connected() && connection->address() == ip) {
if (network->m_dmrEnabled) {
if (network->m_tagDMR != nullptr) {
network->m_tagDMR->processFrame(req->buffer, req->length, peerId, ssrc, req->rtpHeader.getSequence(), streamId);
// check if jitter buffer is enabled for this peer
if (connection->jitterBufferEnabled()) {
AdaptiveJitterBuffer* buffer = connection->getOrCreateJitterBuffer(streamId);
std::vector<BufferedFrame*> readyFrames;
buffer->processFrame(req->rtpHeader.getSequence(), req->buffer, req->length, readyFrames);
// process all frames that are now ready (in sequence order)
for (BufferedFrame* frame : readyFrames) {
network->m_tagDMR->processFrame(frame->data, frame->length, peerId, ssrc, frame->seq, streamId);
delete frame;
}
} else {
// zero-latency fast path: no jitter buffer
network->m_tagDMR->processFrame(req->buffer, req->length, peerId, ssrc, req->rtpHeader.getSequence(), streamId);
}
}
} else {
network->writePeerNAK(peerId, streamId, TAG_DMR_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
@ -958,7 +1066,22 @@ void FNENetwork::taskNetworkRx(NetPacketRequest* req)
if (connection->connected() && connection->address() == ip) {
if (network->m_p25Enabled) {
if (network->m_tagP25 != nullptr) {
network->m_tagP25->processFrame(req->buffer, req->length, peerId, ssrc, req->rtpHeader.getSequence(), streamId);
// check if jitter buffer is enabled for this peer
if (connection->jitterBufferEnabled()) {
AdaptiveJitterBuffer* buffer = connection->getOrCreateJitterBuffer(streamId);
std::vector<BufferedFrame*> readyFrames;
buffer->processFrame(req->rtpHeader.getSequence(), req->buffer, req->length, readyFrames);
// process all frames that are now ready (in sequence order)
for (BufferedFrame* frame : readyFrames) {
network->m_tagP25->processFrame(frame->data, frame->length, peerId, ssrc, frame->seq, streamId);
delete frame;
}
} else {
// zero-latency fast path: no jitter buffer
network->m_tagP25->processFrame(req->buffer, req->length, peerId, ssrc, req->rtpHeader.getSequence(), streamId);
}
}
} else {
network->writePeerNAK(peerId, streamId, TAG_P25_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
@ -984,7 +1107,22 @@ void FNENetwork::taskNetworkRx(NetPacketRequest* req)
if (connection->connected() && connection->address() == ip) {
if (network->m_nxdnEnabled) {
if (network->m_tagNXDN != nullptr) {
network->m_tagNXDN->processFrame(req->buffer, req->length, peerId, ssrc, req->rtpHeader.getSequence(), streamId);
// check if jitter buffer is enabled for this peer
if (connection->jitterBufferEnabled()) {
AdaptiveJitterBuffer* buffer = connection->getOrCreateJitterBuffer(streamId);
std::vector<BufferedFrame*> readyFrames;
buffer->processFrame(req->rtpHeader.getSequence(), req->buffer, req->length, readyFrames);
// process all frames that are now ready (in sequence order)
for (BufferedFrame* frame : readyFrames) {
network->m_tagNXDN->processFrame(frame->data, frame->length, peerId, ssrc, frame->seq, streamId);
delete frame;
}
} else {
// zero-latency fast path: no jitter buffer
network->m_tagNXDN->processFrame(req->buffer, req->length, peerId, ssrc, req->rtpHeader.getSequence(), streamId);
}
}
} else {
network->writePeerNAK(peerId, streamId, TAG_NXDN_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
@ -1010,7 +1148,22 @@ void FNENetwork::taskNetworkRx(NetPacketRequest* req)
if (connection->connected() && connection->address() == ip) {
if (network->m_analogEnabled) {
if (network->m_tagAnalog != nullptr) {
network->m_tagAnalog->processFrame(req->buffer, req->length, peerId, ssrc, req->rtpHeader.getSequence(), streamId);
// check if jitter buffer is enabled for this peer
if (connection->jitterBufferEnabled()) {
AdaptiveJitterBuffer* buffer = connection->getOrCreateJitterBuffer(streamId);
std::vector<BufferedFrame*> readyFrames;
buffer->processFrame(req->rtpHeader.getSequence(), req->buffer, req->length, readyFrames);
// process all frames that are now ready (in sequence order)
for (BufferedFrame* frame : readyFrames) {
network->m_tagAnalog->processFrame(frame->data, frame->length, peerId, ssrc, frame->seq, streamId);
delete frame;
}
} else {
// zero-latency fast path: no jitter buffer
network->m_tagAnalog->processFrame(req->buffer, req->length, peerId, ssrc, req->rtpHeader.getSequence(), streamId);
}
}
} else {
network->writePeerNAK(peerId, streamId, TAG_ANALOG_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
@ -1049,6 +1202,7 @@ void FNENetwork::taskNetworkRx(NetPacketRequest* req)
FNEPeerConnection* connection = new FNEPeerConnection(peerId, req->address, req->addrLen);
connection->lastPing(now);
network->applyJitterBufferConfig(peerId, connection);
network->setupRepeaterLogin(peerId, streamId, connection);
// check if the peer is in the peer ACL list
@ -1079,6 +1233,7 @@ void FNENetwork::taskNetworkRx(NetPacketRequest* req)
connection = new FNEPeerConnection(peerId, req->address, req->addrLen);
connection->lastPing(now);
network->applyJitterBufferConfig(peerId, connection);
network->erasePeerAffiliations(peerId);
network->setupRepeaterLogin(peerId, streamId, connection);

@ -393,6 +393,20 @@ namespace network
bool m_influxLogRawData;
influxdb::ServerInfo m_influxServer;
/**
* @brief Structure containing jitter buffer configuration for a peer.
*/
struct JitterBufferConfig {
bool enabled; //!< Jitter buffer enabled flag
uint16_t maxSize; //!< Maximum buffer size in frames
uint32_t maxWait; //!< Maximum wait time in microseconds
};
bool m_jitterBufferEnabled;
uint16_t m_jitterMaxSize;
uint32_t m_jitterMaxWait;
std::unordered_map<uint32_t, JitterBufferConfig> m_peerJitterOverrides;
ThreadPool m_threadPool;
bool m_disablePacketData;
@ -432,6 +446,13 @@ namespace network
*/
void logSpanningTree(FNEPeerConnection* connection = nullptr);
/**
* @brief Applies jitter buffer configuration to a peer connection.
* @param peerId Peer ID.
* @param connection Instance of the FNEPeerConnection class.
*/
void applyJitterBufferConfig(uint32_t peerId, FNEPeerConnection* connection);
/**
* @brief Erases a stream ID from the given peer ID connection.
* @param peerId Peer ID.

@ -0,0 +1,70 @@
// SPDX-License-Identifier: GPL-2.0-only
/*
* Digital Voice Modem - Converged FNE Software
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2025 Bryan Biedenkapp, N2PLL
*
*/
#include "common/Log.h"
#include "network/FNEPeerConnection.h"
using namespace network;
// ---------------------------------------------------------------------------
// Public Class Members
// ---------------------------------------------------------------------------
/* Gets or creates a jitter buffer for the specified stream. */
AdaptiveJitterBuffer* FNEPeerConnection::getOrCreateJitterBuffer(uint64_t streamId)
{
std::lock_guard<std::mutex> lock(m_jitterMutex);
if (m_jitterBuffers.find(streamId) == m_jitterBuffers.end()) {
m_jitterBuffers[streamId] = new AdaptiveJitterBuffer(m_jitterMaxSize, m_jitterMaxWait);
}
return m_jitterBuffers[streamId];
}
/* Cleans up jitter buffer for the specified stream. */
void FNEPeerConnection::cleanupJitterBuffer(uint64_t streamId)
{
std::lock_guard<std::mutex> lock(m_jitterMutex);
auto it = m_jitterBuffers.find(streamId);
if (it != m_jitterBuffers.end()) {
delete it->second;
m_jitterBuffers.erase(it);
}
}
/* Checks for timed-out buffered frames across all streams. */
void FNEPeerConnection::checkJitterTimeouts()
{
if (!m_jitterBufferEnabled) {
return;
}
std::lock_guard<std::mutex> lock(m_jitterMutex);
// check timeouts for all active jitter buffers
for (auto& pair : m_jitterBuffers) {
AdaptiveJitterBuffer* buffer = pair.second;
if (buffer != nullptr) {
std::vector<BufferedFrame*> timedOutFrames;
buffer->checkTimeouts(timedOutFrames);
// note: timed-out frames are handled by the calling context
// this method just ensures the buffers are checked periodically
// the frames themselves are cleaned up by the caller
for (BufferedFrame* frame : timedOutFrames) {
delete frame;
}
}
}
}

@ -10,14 +10,18 @@
/**
* @file FNEPeerConnection.h
* @ingroup fne_network
* @file FNEPeerConnection.cpp
* @ingroup fne_network
*/
#if !defined(__FNE_PEER_CONNECTION_H__)
#define __FNE_PEER_CONNECTION_H__
#include "fne/Defines.h"
#include "common/network/BaseNetwork.h"
#include "common/network/AdaptiveJitterBuffer.h"
#include <string>
#include <map>
#include <shared_mutex>
namespace network
@ -60,7 +64,12 @@ namespace network
m_isConventionalPeer(false),
m_isSysView(false),
m_config(),
m_peerLockMtx()
m_peerLockMtx(),
m_jitterBuffers(),
m_jitterMutex(),
m_jitterBufferEnabled(false),
m_jitterMaxSize(4U),
m_jitterMaxWait(40000U)
{
/* stub */
}
@ -91,7 +100,12 @@ namespace network
m_isConventionalPeer(false),
m_isSysView(false),
m_config(),
m_peerLockMtx()
m_peerLockMtx(),
m_jitterBuffers(),
m_jitterMutex(),
m_jitterBufferEnabled(false),
m_jitterMaxSize(4U),
m_jitterMaxWait(40000U)
{
assert(id > 0U);
assert(sockStorageLen > 0U);
@ -124,6 +138,43 @@ namespace network
*/
inline void unlock() const { m_peerLockMtx.unlock(); }
/**
* @brief Gets or creates a jitter buffer for the specified stream.
* @param streamId Stream ID.
* @returns AdaptiveJitterBuffer* Jitter buffer instance.
*/
AdaptiveJitterBuffer* getOrCreateJitterBuffer(uint64_t streamId);
/**
* @brief Cleans up jitter buffer for the specified stream.
* @param streamId Stream ID.
*/
void cleanupJitterBuffer(uint64_t streamId);
/**
* @brief Checks for timed-out buffered frames across all streams.
*/
void checkJitterTimeouts();
/**
* @brief Gets jitter buffer enabled state.
* @returns bool True if jitter buffer is enabled.
*/
bool jitterBufferEnabled() const { return m_jitterBufferEnabled; }
/**
* @brief Sets jitter buffer parameters.
* @param enabled Enable/disable jitter buffer.
* @param maxSize Maximum buffer size in frames.
* @param maxWait Maximum wait time in microseconds.
*/
void setJitterBufferParams(bool enabled, uint16_t maxSize = 4U, uint32_t maxWait = 40000U)
{
m_jitterBufferEnabled = enabled;
m_jitterMaxSize = maxSize;
m_jitterMaxWait = maxWait;
}
public:
/**
* @brief Peer ID.
@ -218,6 +269,13 @@ namespace network
private:
mutable std::mutex m_peerLockMtx;
std::map<uint64_t, AdaptiveJitterBuffer*> m_jitterBuffers;
mutable std::mutex m_jitterMutex;
bool m_jitterBufferEnabled;
uint16_t m_jitterMaxSize;
uint32_t m_jitterMaxWait;
};
} // namespace network

Loading…
Cancel
Save

Powered by TurnKey Linux.