#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "TCPacketDef.h" // Specialized thread-safe queue for STCPacket by value, avoiding template conflict class CTCPacketQueue { std::queue q; std::mutex m; std::condition_variable cv; public: void Push(const STCPacket& p) { std::lock_guard l(m); q.push(p); cv.notify_one(); } bool Pop(STCPacket& p, int ms) { std::unique_lock l(m); // Wait up to ms if queue is empty if (q.empty()) { if (ms <= 0) return false; // wait_for returns false if timeout, true if predicate is true if (!cv.wait_for(l, std::chrono::milliseconds(ms), [this]{ return !q.empty(); })) { return false; // timeout } } p = q.front(); q.pop(); return true; } }; class CTCSocket { public: CTCSocket(); virtual ~CTCSocket(); virtual bool Open(const std::string &address, const std::string &modules, uint16_t port) = 0; void Close(); void Close(char module); bool Send(const STCPacket *packet); bool IsConnected(char module) const; int GetFD(char module) const; // Legacy compat: returns 1 if connected, -1 if not std::string GetAndClearStats(); protected: nng_socket m_Sock; std::thread m_Thread; std::atomic m_Running; std::atomic m_Connected; std::string m_Modules; // Per-module input queues std::map> m_Queues; // Client queue (receives all) // Client queue (receives all) std::shared_ptr m_ClientQueue; // Track seen modules for logging std::set m_SeenModules; // Packet counters std::map m_PacketCounts; std::mutex m_StatsMutex; void Dispatcher(); }; class CTCServer : public CTCSocket { public: CTCServer() : CTCSocket() {} ~CTCServer() {} bool Open(const std::string &address, const std::string &modules, uint16_t port); bool Receive(char module, STCPacket *packet, int ms); bool AnyAreClosed() const; bool Accept(); // Checks NNG state }; class CTCClient : public CTCSocket { public: CTCClient() : CTCSocket() {} ~CTCClient() {} bool Open(const std::string &address, const std::string &modules, uint16_t port); void Receive(std::queue> &queue, int ms); void ReConnect(); // No-op in NNG };