From 7a90f953fd5833d01f4427974b6d74d15518db53 Mon Sep 17 00:00:00 2001 From: Bryan Biedenkapp Date: Tue, 5 Dec 2023 13:15:13 -0500 Subject: [PATCH] add proper mutex locking to prevent thread clobbering during clocking (i.e. attempts to make modem and protocol clocking thread safe); split frame read nad write operations into their own threads; --- src/host/Host.cpp | 388 ++++++++++++++++++++++++++++------------------ 1 file changed, 233 insertions(+), 155 deletions(-) diff --git a/src/host/Host.cpp b/src/host/Host.cpp index d69545c4..a0d4e0ea 100644 --- a/src/host/Host.cpp +++ b/src/host/Host.cpp @@ -55,6 +55,7 @@ using namespace lookups; #include #include #include +#include #include #include @@ -755,6 +756,7 @@ int Host::run() } bool hasTxShutdown = false; + std::mutex clockingMutex; // Macro to start DMR duplex idle transmission (or beacon) #define START_DMR_DUPLEX_IDLE(x) \ @@ -767,94 +769,117 @@ int Host::run() // setup protocol processor threads /** Digital Mobile Radio */ - ThreadFunc dmrProcessThread([&, this]() { + ThreadFunc dmrFrameReadThread([&, this]() { #if defined(ENABLE_DMR) if (dmr != nullptr) { - LogDebug(LOG_HOST, "DMR, started frame processor"); + LogDebug(LOG_HOST, "DMR, started frame processor (modem read)"); while (!g_killed) { - // ------------------------------------------------------ - // -- Write to Modem Processing -- - // ------------------------------------------------------ - - // write DMR slot 1 frames to modem - writeFramesDMR1(dmr.get(), [&, this]() { - // if there is a P25 CC running; halt the CC - if (p25 != nullptr) { - if (p25->getCCRunning() && !p25->getCCHalted()) { - this->interruptP25Control(p25.get()); - } - } + clockingMutex.lock(); + { + // ------------------------------------------------------ + // -- Read from Modem Processing -- + // ------------------------------------------------------ - // if there is a NXDN CC running; halt the CC - if (nxdn != nullptr) { - if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { - this->interruptNXDNControl(nxdn.get()); + // read DMR slot 1 frames from modem + readFramesDMR1(dmr.get(), [&, this]() { + if (dmr != nullptr) { + this->interruptDMRBeacon(dmr.get()); } - } - }); - - // write DMR slot 2 frames to modem - writeFramesDMR2(dmr.get(), [&, this]() { - // if there is a P25 CC running; halt the CC - if (p25 != nullptr) { - if (p25->getCCRunning() && !p25->getCCHalted()) { - this->interruptP25Control(p25.get()); - } - } - // if there is a NXDN CC running; halt the CC - if (nxdn != nullptr) { - if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { - this->interruptNXDNControl(nxdn.get()); + // if there is a P25 CC running; halt the CC + if (p25 != nullptr) { + if (p25->getCCRunning() && !p25->getCCHalted()) { + this->interruptP25Control(p25.get()); + } } - } - }); - // ------------------------------------------------------ - // -- Read from Modem Processing -- - // ------------------------------------------------------ + // if there is a NXDN CC running; halt the CC + if (nxdn != nullptr) { + if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { + this->interruptNXDNControl(nxdn.get()); + } + } + }); - // read DMR slot 1 frames from modem - readFramesDMR1(dmr.get(), [&, this]() { - if (dmr != nullptr) { - this->interruptDMRBeacon(dmr.get()); - } + // read DMR slot 2 frames from modem + readFramesDMR2(dmr.get(), [&, this]() { + if (dmr != nullptr) { + this->interruptDMRBeacon(dmr.get()); + } - // if there is a P25 CC running; halt the CC - if (p25 != nullptr) { - if (p25->getCCRunning() && !p25->getCCHalted()) { - this->interruptP25Control(p25.get()); + // if there is a P25 CC running; halt the CC + if (p25 != nullptr) { + if (p25->getCCRunning() && !p25->getCCHalted()) { + this->interruptP25Control(p25.get()); + } } - } - // if there is a NXDN CC running; halt the CC - if (nxdn != nullptr) { - if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { - this->interruptNXDNControl(nxdn.get()); + // if there is a NXDN CC running; halt the CC + if (nxdn != nullptr) { + if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { + this->interruptNXDNControl(nxdn.get()); + } } - } - }); + }); + } + clockingMutex.unlock(); - // read DMR slot 2 frames from modem - readFramesDMR2(dmr.get(), [&, this]() { - if (dmr != nullptr) { - this->interruptDMRBeacon(dmr.get()); - } + if (m_state != STATE_IDLE) + Thread::sleep(m_activeTickDelay); + if (m_state == STATE_IDLE) + Thread::sleep(m_idleTickDelay); + } + } +#endif // defined(ENABLE_DMR) + }); + dmrFrameReadThread.run(); + + ThreadFunc dmrFrameWriteThread([&, this]() { +#if defined(ENABLE_DMR) + if (dmr != nullptr) { + LogDebug(LOG_HOST, "DMR, started frame processor (modem write)"); + while (!g_killed) { + clockingMutex.lock(); + { + // ------------------------------------------------------ + // -- Write to Modem Processing -- + // ------------------------------------------------------ + + // write DMR slot 1 frames to modem + writeFramesDMR1(dmr.get(), [&, this]() { + // if there is a P25 CC running; halt the CC + if (p25 != nullptr) { + if (p25->getCCRunning() && !p25->getCCHalted()) { + this->interruptP25Control(p25.get()); + } + } - // if there is a P25 CC running; halt the CC - if (p25 != nullptr) { - if (p25->getCCRunning() && !p25->getCCHalted()) { - this->interruptP25Control(p25.get()); + // if there is a NXDN CC running; halt the CC + if (nxdn != nullptr) { + if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { + this->interruptNXDNControl(nxdn.get()); + } + } + }); + + // write DMR slot 2 frames to modem + writeFramesDMR2(dmr.get(), [&, this]() { + // if there is a P25 CC running; halt the CC + if (p25 != nullptr) { + if (p25->getCCRunning() && !p25->getCCHalted()) { + this->interruptP25Control(p25.get()); + } } - } - // if there is a NXDN CC running; halt the CC - if (nxdn != nullptr) { - if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { - this->interruptNXDNControl(nxdn.get()); + // if there is a NXDN CC running; halt the CC + if (nxdn != nullptr) { + if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { + this->interruptNXDNControl(nxdn.get()); + } } - } - }); + }); + } + clockingMutex.unlock(); if (m_state != STATE_IDLE) Thread::sleep(m_activeTickDelay); @@ -864,49 +889,72 @@ int Host::run() } #endif // defined(ENABLE_DMR) }); - dmrProcessThread.run(); + dmrFrameWriteThread.run(); /** Project 25 */ - ThreadFunc p25ProcessThread([&, this]() { + ThreadFunc p25FrameReadThread([&, this]() { #if defined(ENABLE_P25) if (p25 != nullptr) { - LogDebug(LOG_HOST, "P25, started frame processor"); + LogDebug(LOG_HOST, "P25, started frame processor (modem read)"); while (!g_killed) { - // ------------------------------------------------------ - // -- Write to Modem Processing -- - // ------------------------------------------------------ - - // write P25 frames to modem - writeFramesP25(p25.get(), [&, this]() { - if (dmr != nullptr) { - this->interruptDMRBeacon(dmr.get()); - } + clockingMutex.lock(); + { + // ------------------------------------------------------ + // -- Read from Modem Processing -- + // ------------------------------------------------------ - // if there is a NXDN CC running; halt the CC - if (nxdn != nullptr) { - if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { - this->interruptNXDNControl(nxdn.get()); + // read P25 frames from modem + readFramesP25(p25.get(), [&, this]() { + if (dmr != nullptr) { + this->interruptDMRBeacon(dmr.get()); } - } - }); - // ------------------------------------------------------ - // -- Read from Modem Processing -- - // ------------------------------------------------------ + // if there is a NXDN CC running; halt the CC + if (nxdn != nullptr) { + if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { + this->interruptNXDNControl(nxdn.get()); + } + } + }); + } + clockingMutex.unlock(); - // read P25 frames from modem - readFramesP25(p25.get(), [&, this]() { - if (dmr != nullptr) { - this->interruptDMRBeacon(dmr.get()); - } + if (m_state != STATE_IDLE) + Thread::sleep(m_activeTickDelay); + if (m_state == STATE_IDLE) + Thread::sleep(m_idleTickDelay); + } + } +#endif // defined(ENABLE_P25) + }); + p25FrameReadThread.run(); - // if there is a NXDN CC running; halt the CC - if (nxdn != nullptr) { - if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { - this->interruptNXDNControl(nxdn.get()); + ThreadFunc p25FrameWriteThread([&, this]() { +#if defined(ENABLE_P25) + if (p25 != nullptr) { + LogDebug(LOG_HOST, "P25, started frame processor (modem write)"); + while (!g_killed) { + clockingMutex.lock(); + { + // ------------------------------------------------------ + // -- Write to Modem Processing -- + // ------------------------------------------------------ + + // write P25 frames to modem + writeFramesP25(p25.get(), [&, this]() { + if (dmr != nullptr) { + this->interruptDMRBeacon(dmr.get()); } - } - }); + + // if there is a NXDN CC running; halt the CC + if (nxdn != nullptr) { + if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { + this->interruptNXDNControl(nxdn.get()); + } + } + }); + } + clockingMutex.unlock(); if (m_state != STATE_IDLE) Thread::sleep(m_activeTickDelay); @@ -916,49 +964,72 @@ int Host::run() } #endif // defined(ENABLE_P25) }); - p25ProcessThread.run(); + p25FrameWriteThread.run(); /** Next Generation Digital Narrowband */ - ThreadFunc nxdnProcessThread([&, this]() { + ThreadFunc nxdnFrameReadThread([&, this]() { #if defined(ENABLE_NXDN) if (nxdn != nullptr) { - LogDebug(LOG_HOST, "NXDN, started frame processor"); + LogDebug(LOG_HOST, "NXDN, started frame processor (modem read)"); while (!g_killed) { - // ------------------------------------------------------ - // -- Write to Modem Processing -- - // ------------------------------------------------------ - - // write NXDN frames to modem - writeFramesNXDN(nxdn.get(), [&, this]() { - if (dmr != nullptr) { - this->interruptDMRBeacon(dmr.get()); - } + clockingMutex.lock(); + { + // ------------------------------------------------------ + // -- Read from Modem Processing -- + // ------------------------------------------------------ - // if there is a P25 CC running; halt the CC - if (p25 != nullptr) { - if (p25->getCCRunning() && !p25->getCCHalted()) { - this->interruptP25Control(p25.get()); + // read NXDN frames from modem + readFramesNXDN(nxdn.get(), [&, this]() { + if (dmr != nullptr) { + this->interruptDMRBeacon(dmr.get()); } - } - }); - // ------------------------------------------------------ - // -- Read from Modem Processing -- - // ------------------------------------------------------ + // if there is a P25 CC running; halt the CC + if (p25 != nullptr) { + if (p25->getCCRunning() && !p25->getCCHalted()) { + this->interruptP25Control(p25.get()); + } + } + }); + } + clockingMutex.unlock(); - // read NXDN frames from modem - readFramesNXDN(nxdn.get(), [&, this]() { - if (dmr != nullptr) { - this->interruptDMRBeacon(dmr.get()); - } + if (m_state != STATE_IDLE) + Thread::sleep(m_activeTickDelay); + if (m_state == STATE_IDLE) + Thread::sleep(m_idleTickDelay); + } + } +#endif // defined(ENABLE_NXDN) + }); + nxdnFrameReadThread.run(); + + ThreadFunc nxdnFrameWriteThread([&, this]() { +#if defined(ENABLE_NXDN) + if (nxdn != nullptr) { + LogDebug(LOG_HOST, "NXDN, started frame processor (modem write)"); + while (!g_killed) { + clockingMutex.lock(); + { + // ------------------------------------------------------ + // -- Write to Modem Processing -- + // ------------------------------------------------------ - // if there is a P25 CC running; halt the CC - if (p25 != nullptr) { - if (p25->getCCRunning() && !p25->getCCHalted()) { - this->interruptP25Control(p25.get()); + // write NXDN frames to modem + writeFramesNXDN(nxdn.get(), [&, this]() { + if (dmr != nullptr) { + this->interruptDMRBeacon(dmr.get()); } - } - }); + + // if there is a P25 CC running; halt the CC + if (p25 != nullptr) { + if (p25->getCCRunning() && !p25->getCCHalted()) { + this->interruptP25Control(p25.get()); + } + } + }); + } + clockingMutex.unlock(); if (m_state != STATE_IDLE) Thread::sleep(m_activeTickDelay); @@ -968,7 +1039,7 @@ int Host::run() } #endif // defined(ENABLE_NXDN) }); - nxdnProcessThread.run(); + nxdnFrameWriteThread.run(); // main execution loop while (!killed) { @@ -1007,34 +1078,38 @@ int Host::run() } } - // ------------------------------------------------------ - // -- Modem Clocking -- - // ------------------------------------------------------ + clockingMutex.lock(); + { + // ------------------------------------------------------ + // -- Modem Clocking -- + // ------------------------------------------------------ - ms = stopWatch.elapsed(); - stopWatch.start(); + ms = stopWatch.elapsed(); + stopWatch.start(); - m_modem->clock(ms); + m_modem->clock(ms); - // ------------------------------------------------------ - // -- Network, DMR, and P25 Clocking -- - // ------------------------------------------------------ + // ------------------------------------------------------ + // -- Network, DMR, and P25 Clocking -- + // ------------------------------------------------------ - if (m_network != nullptr) - m_network->clock(ms); + if (m_network != nullptr) + m_network->clock(ms); #if defined(ENABLE_DMR) - if (dmr != nullptr) - dmr->clock(ms); + if (dmr != nullptr) + dmr->clock(ms); #endif // defined(ENABLE_DMR) #if defined(ENABLE_P25) - if (p25 != nullptr) - p25->clock(ms); + if (p25 != nullptr) + p25->clock(ms); #endif // defined(ENABLE_P25) #if defined(ENABLE_NXDN) - if (nxdn != nullptr) - nxdn->clock(ms); + if (nxdn != nullptr) + nxdn->clock(ms); #endif // defined(ENABLE_NXDN) + } + clockingMutex.unlock(); // ------------------------------------------------------ // -- Timer Clocking -- @@ -1292,9 +1367,12 @@ int Host::run() if (g_killed) { // shutdown reader threads - dmrProcessThread.wait(); - p25ProcessThread.wait(); - nxdnProcessThread.wait(); + dmrFrameReadThread.wait(); + dmrFrameWriteThread.wait(); + p25FrameReadThread.wait(); + p25FrameWriteThread.wait(); + nxdnFrameReadThread.wait(); + nxdnFrameWriteThread.wait(); #if defined(ENABLE_DMR) if (dmr != nullptr) {