add proper mutex locking to prevent thread clobbering during clocking (i.e. attempts to make modem and protocol clocking thread safe); split frame read nad write operations into their own threads;

pull/42/head
Bryan Biedenkapp 2 years ago
parent 629c2fe75e
commit 7a90f953fd

@ -55,6 +55,7 @@ using namespace lookups;
#include <algorithm> #include <algorithm>
#include <functional> #include <functional>
#include <vector> #include <vector>
#include <mutex>
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
@ -755,6 +756,7 @@ int Host::run()
} }
bool hasTxShutdown = false; bool hasTxShutdown = false;
std::mutex clockingMutex;
// Macro to start DMR duplex idle transmission (or beacon) // Macro to start DMR duplex idle transmission (or beacon)
#define START_DMR_DUPLEX_IDLE(x) \ #define START_DMR_DUPLEX_IDLE(x) \
@ -767,94 +769,117 @@ int Host::run()
// setup protocol processor threads // setup protocol processor threads
/** Digital Mobile Radio */ /** Digital Mobile Radio */
ThreadFunc dmrProcessThread([&, this]() { ThreadFunc dmrFrameReadThread([&, this]() {
#if defined(ENABLE_DMR) #if defined(ENABLE_DMR)
if (dmr != nullptr) { if (dmr != nullptr) {
LogDebug(LOG_HOST, "DMR, started frame processor"); LogDebug(LOG_HOST, "DMR, started frame processor (modem read)");
while (!g_killed) { while (!g_killed) {
// ------------------------------------------------------ clockingMutex.lock();
// -- Write to Modem Processing -- {
// ------------------------------------------------------ // ------------------------------------------------------
// -- Read from Modem Processing --
// write DMR slot 1 frames to modem // ------------------------------------------------------
writeFramesDMR1(dmr.get(), [&, this]() {
// if there is a P25 CC running; halt the CC
if (p25 != nullptr) {
if (p25->getCCRunning() && !p25->getCCHalted()) {
this->interruptP25Control(p25.get());
}
}
// if there is a NXDN CC running; halt the CC // read DMR slot 1 frames from modem
if (nxdn != nullptr) { readFramesDMR1(dmr.get(), [&, this]() {
if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { if (dmr != nullptr) {
this->interruptNXDNControl(nxdn.get()); this->interruptDMRBeacon(dmr.get());
} }
}
});
// write DMR slot 2 frames to modem
writeFramesDMR2(dmr.get(), [&, this]() {
// if there is a P25 CC running; halt the CC
if (p25 != nullptr) {
if (p25->getCCRunning() && !p25->getCCHalted()) {
this->interruptP25Control(p25.get());
}
}
// if there is a NXDN CC running; halt the CC // if there is a P25 CC running; halt the CC
if (nxdn != nullptr) { if (p25 != nullptr) {
if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { if (p25->getCCRunning() && !p25->getCCHalted()) {
this->interruptNXDNControl(nxdn.get()); this->interruptP25Control(p25.get());
}
} }
}
});
// ------------------------------------------------------ // if there is a NXDN CC running; halt the CC
// -- Read from Modem Processing -- if (nxdn != nullptr) {
// ------------------------------------------------------ if (nxdn->getCCRunning() && !nxdn->getCCHalted()) {
this->interruptNXDNControl(nxdn.get());
}
}
});
// read DMR slot 1 frames from modem // read DMR slot 2 frames from modem
readFramesDMR1(dmr.get(), [&, this]() { readFramesDMR2(dmr.get(), [&, this]() {
if (dmr != nullptr) { if (dmr != nullptr) {
this->interruptDMRBeacon(dmr.get()); this->interruptDMRBeacon(dmr.get());
} }
// if there is a P25 CC running; halt the CC // if there is a P25 CC running; halt the CC
if (p25 != nullptr) { if (p25 != nullptr) {
if (p25->getCCRunning() && !p25->getCCHalted()) { if (p25->getCCRunning() && !p25->getCCHalted()) {
this->interruptP25Control(p25.get()); this->interruptP25Control(p25.get());
}
} }
}
// if there is a NXDN CC running; halt the CC // if there is a NXDN CC running; halt the CC
if (nxdn != nullptr) { if (nxdn != nullptr) {
if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { if (nxdn->getCCRunning() && !nxdn->getCCHalted()) {
this->interruptNXDNControl(nxdn.get()); this->interruptNXDNControl(nxdn.get());
}
} }
} });
}); }
clockingMutex.unlock();
// read DMR slot 2 frames from modem if (m_state != STATE_IDLE)
readFramesDMR2(dmr.get(), [&, this]() { Thread::sleep(m_activeTickDelay);
if (dmr != nullptr) { if (m_state == STATE_IDLE)
this->interruptDMRBeacon(dmr.get()); Thread::sleep(m_idleTickDelay);
} }
}
#endif // defined(ENABLE_DMR)
});
dmrFrameReadThread.run();
ThreadFunc dmrFrameWriteThread([&, this]() {
#if defined(ENABLE_DMR)
if (dmr != nullptr) {
LogDebug(LOG_HOST, "DMR, started frame processor (modem write)");
while (!g_killed) {
clockingMutex.lock();
{
// ------------------------------------------------------
// -- Write to Modem Processing --
// ------------------------------------------------------
// write DMR slot 1 frames to modem
writeFramesDMR1(dmr.get(), [&, this]() {
// if there is a P25 CC running; halt the CC
if (p25 != nullptr) {
if (p25->getCCRunning() && !p25->getCCHalted()) {
this->interruptP25Control(p25.get());
}
}
// if there is a P25 CC running; halt the CC // if there is a NXDN CC running; halt the CC
if (p25 != nullptr) { if (nxdn != nullptr) {
if (p25->getCCRunning() && !p25->getCCHalted()) { if (nxdn->getCCRunning() && !nxdn->getCCHalted()) {
this->interruptP25Control(p25.get()); this->interruptNXDNControl(nxdn.get());
}
}
});
// write DMR slot 2 frames to modem
writeFramesDMR2(dmr.get(), [&, this]() {
// if there is a P25 CC running; halt the CC
if (p25 != nullptr) {
if (p25->getCCRunning() && !p25->getCCHalted()) {
this->interruptP25Control(p25.get());
}
} }
}
// if there is a NXDN CC running; halt the CC // if there is a NXDN CC running; halt the CC
if (nxdn != nullptr) { if (nxdn != nullptr) {
if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { if (nxdn->getCCRunning() && !nxdn->getCCHalted()) {
this->interruptNXDNControl(nxdn.get()); this->interruptNXDNControl(nxdn.get());
}
} }
} });
}); }
clockingMutex.unlock();
if (m_state != STATE_IDLE) if (m_state != STATE_IDLE)
Thread::sleep(m_activeTickDelay); Thread::sleep(m_activeTickDelay);
@ -864,49 +889,72 @@ int Host::run()
} }
#endif // defined(ENABLE_DMR) #endif // defined(ENABLE_DMR)
}); });
dmrProcessThread.run(); dmrFrameWriteThread.run();
/** Project 25 */ /** Project 25 */
ThreadFunc p25ProcessThread([&, this]() { ThreadFunc p25FrameReadThread([&, this]() {
#if defined(ENABLE_P25) #if defined(ENABLE_P25)
if (p25 != nullptr) { if (p25 != nullptr) {
LogDebug(LOG_HOST, "P25, started frame processor"); LogDebug(LOG_HOST, "P25, started frame processor (modem read)");
while (!g_killed) { while (!g_killed) {
// ------------------------------------------------------ clockingMutex.lock();
// -- Write to Modem Processing -- {
// ------------------------------------------------------ // ------------------------------------------------------
// -- Read from Modem Processing --
// write P25 frames to modem // ------------------------------------------------------
writeFramesP25(p25.get(), [&, this]() {
if (dmr != nullptr) {
this->interruptDMRBeacon(dmr.get());
}
// if there is a NXDN CC running; halt the CC // read P25 frames from modem
if (nxdn != nullptr) { readFramesP25(p25.get(), [&, this]() {
if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { if (dmr != nullptr) {
this->interruptNXDNControl(nxdn.get()); this->interruptDMRBeacon(dmr.get());
} }
}
});
// ------------------------------------------------------ // if there is a NXDN CC running; halt the CC
// -- Read from Modem Processing -- if (nxdn != nullptr) {
// ------------------------------------------------------ if (nxdn->getCCRunning() && !nxdn->getCCHalted()) {
this->interruptNXDNControl(nxdn.get());
}
}
});
}
clockingMutex.unlock();
// read P25 frames from modem if (m_state != STATE_IDLE)
readFramesP25(p25.get(), [&, this]() { Thread::sleep(m_activeTickDelay);
if (dmr != nullptr) { if (m_state == STATE_IDLE)
this->interruptDMRBeacon(dmr.get()); Thread::sleep(m_idleTickDelay);
} }
}
#endif // defined(ENABLE_P25)
});
p25FrameReadThread.run();
// if there is a NXDN CC running; halt the CC ThreadFunc p25FrameWriteThread([&, this]() {
if (nxdn != nullptr) { #if defined(ENABLE_P25)
if (nxdn->getCCRunning() && !nxdn->getCCHalted()) { if (p25 != nullptr) {
this->interruptNXDNControl(nxdn.get()); LogDebug(LOG_HOST, "P25, started frame processor (modem write)");
while (!g_killed) {
clockingMutex.lock();
{
// ------------------------------------------------------
// -- Write to Modem Processing --
// ------------------------------------------------------
// write P25 frames to modem
writeFramesP25(p25.get(), [&, this]() {
if (dmr != nullptr) {
this->interruptDMRBeacon(dmr.get());
} }
}
}); // if there is a NXDN CC running; halt the CC
if (nxdn != nullptr) {
if (nxdn->getCCRunning() && !nxdn->getCCHalted()) {
this->interruptNXDNControl(nxdn.get());
}
}
});
}
clockingMutex.unlock();
if (m_state != STATE_IDLE) if (m_state != STATE_IDLE)
Thread::sleep(m_activeTickDelay); Thread::sleep(m_activeTickDelay);
@ -916,49 +964,72 @@ int Host::run()
} }
#endif // defined(ENABLE_P25) #endif // defined(ENABLE_P25)
}); });
p25ProcessThread.run(); p25FrameWriteThread.run();
/** Next Generation Digital Narrowband */ /** Next Generation Digital Narrowband */
ThreadFunc nxdnProcessThread([&, this]() { ThreadFunc nxdnFrameReadThread([&, this]() {
#if defined(ENABLE_NXDN) #if defined(ENABLE_NXDN)
if (nxdn != nullptr) { if (nxdn != nullptr) {
LogDebug(LOG_HOST, "NXDN, started frame processor"); LogDebug(LOG_HOST, "NXDN, started frame processor (modem read)");
while (!g_killed) { while (!g_killed) {
// ------------------------------------------------------ clockingMutex.lock();
// -- Write to Modem Processing -- {
// ------------------------------------------------------ // ------------------------------------------------------
// -- Read from Modem Processing --
// write NXDN frames to modem // ------------------------------------------------------
writeFramesNXDN(nxdn.get(), [&, this]() {
if (dmr != nullptr) {
this->interruptDMRBeacon(dmr.get());
}
// if there is a P25 CC running; halt the CC // read NXDN frames from modem
if (p25 != nullptr) { readFramesNXDN(nxdn.get(), [&, this]() {
if (p25->getCCRunning() && !p25->getCCHalted()) { if (dmr != nullptr) {
this->interruptP25Control(p25.get()); this->interruptDMRBeacon(dmr.get());
} }
}
});
// ------------------------------------------------------ // if there is a P25 CC running; halt the CC
// -- Read from Modem Processing -- if (p25 != nullptr) {
// ------------------------------------------------------ if (p25->getCCRunning() && !p25->getCCHalted()) {
this->interruptP25Control(p25.get());
}
}
});
}
clockingMutex.unlock();
// read NXDN frames from modem if (m_state != STATE_IDLE)
readFramesNXDN(nxdn.get(), [&, this]() { Thread::sleep(m_activeTickDelay);
if (dmr != nullptr) { if (m_state == STATE_IDLE)
this->interruptDMRBeacon(dmr.get()); Thread::sleep(m_idleTickDelay);
} }
}
#endif // defined(ENABLE_NXDN)
});
nxdnFrameReadThread.run();
ThreadFunc nxdnFrameWriteThread([&, this]() {
#if defined(ENABLE_NXDN)
if (nxdn != nullptr) {
LogDebug(LOG_HOST, "NXDN, started frame processor (modem write)");
while (!g_killed) {
clockingMutex.lock();
{
// ------------------------------------------------------
// -- Write to Modem Processing --
// ------------------------------------------------------
// if there is a P25 CC running; halt the CC // write NXDN frames to modem
if (p25 != nullptr) { writeFramesNXDN(nxdn.get(), [&, this]() {
if (p25->getCCRunning() && !p25->getCCHalted()) { if (dmr != nullptr) {
this->interruptP25Control(p25.get()); this->interruptDMRBeacon(dmr.get());
} }
}
}); // if there is a P25 CC running; halt the CC
if (p25 != nullptr) {
if (p25->getCCRunning() && !p25->getCCHalted()) {
this->interruptP25Control(p25.get());
}
}
});
}
clockingMutex.unlock();
if (m_state != STATE_IDLE) if (m_state != STATE_IDLE)
Thread::sleep(m_activeTickDelay); Thread::sleep(m_activeTickDelay);
@ -968,7 +1039,7 @@ int Host::run()
} }
#endif // defined(ENABLE_NXDN) #endif // defined(ENABLE_NXDN)
}); });
nxdnProcessThread.run(); nxdnFrameWriteThread.run();
// main execution loop // main execution loop
while (!killed) { while (!killed) {
@ -1007,34 +1078,38 @@ int Host::run()
} }
} }
// ------------------------------------------------------ clockingMutex.lock();
// -- Modem Clocking -- {
// ------------------------------------------------------ // ------------------------------------------------------
// -- Modem Clocking --
// ------------------------------------------------------
ms = stopWatch.elapsed(); ms = stopWatch.elapsed();
stopWatch.start(); stopWatch.start();
m_modem->clock(ms); m_modem->clock(ms);
// ------------------------------------------------------ // ------------------------------------------------------
// -- Network, DMR, and P25 Clocking -- // -- Network, DMR, and P25 Clocking --
// ------------------------------------------------------ // ------------------------------------------------------
if (m_network != nullptr) if (m_network != nullptr)
m_network->clock(ms); m_network->clock(ms);
#if defined(ENABLE_DMR) #if defined(ENABLE_DMR)
if (dmr != nullptr) if (dmr != nullptr)
dmr->clock(ms); dmr->clock(ms);
#endif // defined(ENABLE_DMR) #endif // defined(ENABLE_DMR)
#if defined(ENABLE_P25) #if defined(ENABLE_P25)
if (p25 != nullptr) if (p25 != nullptr)
p25->clock(ms); p25->clock(ms);
#endif // defined(ENABLE_P25) #endif // defined(ENABLE_P25)
#if defined(ENABLE_NXDN) #if defined(ENABLE_NXDN)
if (nxdn != nullptr) if (nxdn != nullptr)
nxdn->clock(ms); nxdn->clock(ms);
#endif // defined(ENABLE_NXDN) #endif // defined(ENABLE_NXDN)
}
clockingMutex.unlock();
// ------------------------------------------------------ // ------------------------------------------------------
// -- Timer Clocking -- // -- Timer Clocking --
@ -1292,9 +1367,12 @@ int Host::run()
if (g_killed) { if (g_killed) {
// shutdown reader threads // shutdown reader threads
dmrProcessThread.wait(); dmrFrameReadThread.wait();
p25ProcessThread.wait(); dmrFrameWriteThread.wait();
nxdnProcessThread.wait(); p25FrameReadThread.wait();
p25FrameWriteThread.wait();
nxdnFrameReadThread.wait();
nxdnFrameWriteThread.wait();
#if defined(ENABLE_DMR) #if defined(ENABLE_DMR)
if (dmr != nullptr) { if (dmr != nullptr) {

Loading…
Cancel
Save

Powered by TurnKey Linux.