mirror of https://github.com/nostar/urfd.git
Merge 22e5e4957b into bd0c114e43
commit
6032b053c4
@ -0,0 +1,113 @@
|
||||
# Flexible DMR Mode (Mini DMR) User Guide
|
||||
|
||||
URFD now supports a "Flexible DMR" mode (often called "Mini DMR"), which changes how DMR clients interact with the reflector. Unlike the legacy "XLX" mode where clients link to a specific module (A-Z) and traffic is bridged, Mini DMR mode allows clients to directly subscribe to Talkgroups (TG).
|
||||
|
||||
## How it Works
|
||||
|
||||
In Mini DMR mode, the reflector acts like a **Scanner**.
|
||||
|
||||
1. **Subscriptions**: You "subscribe" to one or more Talkgroups on a Timeslot (TS1 or TS2).
|
||||
2. **Scanning**: The reflector monitors all your subscribed Talkgroups.
|
||||
3. **Hold Time**: When a Talkgroup becomes active (someone speaks), the scanner "locks" onto that Talkgroup for the duration of the transmission plus a **Hold Time** (default 5 seconds). During this hold, traffic from other Talkgroups is blocked to prevent interruption.
|
||||
|
||||
```mermaid
|
||||
graph TD
|
||||
Client[MMDVM Client] -->|Subscribe TG 3100 TS1| Reflector
|
||||
Client -->|Subscribe TG 4001 TS2| Reflector
|
||||
|
||||
subgraph Reflector Logic
|
||||
TrafficA[Traffic on TG 3100] --> Scanner{Scanner Free?}
|
||||
TrafficB[Traffic on TG 4001] --> Scanner
|
||||
|
||||
Scanner -->|Yes| Lock[Lock onto TG 3100]
|
||||
Lock --> Map["Route to Client (TS1)"]
|
||||
|
||||
Scanner -->|"No (Held by 3100)"| Block[Block TG 4001]
|
||||
end
|
||||
|
||||
Map --> Client
|
||||
```
|
||||
|
||||
### Strict Timeslot Routing
|
||||
|
||||
The reflector enforces strict routing based on your subscription:
|
||||
|
||||
* If you subscribe to **TG 3100 on TS1**, traffic for TG 3100 will **only** be sent to your radio on **Timeslot 1**.
|
||||
* If you subscribe to **TG 4001 on TS2**, traffic for TG 4001 will **only** be sent to your radio on **Timeslot 2**.
|
||||
* This allows a single client to monitor different Talkgroups on different Timeslots simultaneously (if the Scanner is not held by one).
|
||||
|
||||
## Configuration
|
||||
|
||||
To enable Mini DMR mode, update your `urfd.ini` (or configuration file) in the `[DMR]` section:
|
||||
|
||||
```ini
|
||||
[DMR]
|
||||
; Disable legacy XLX behavior (REQUIRED for Dashboard Subscription View)
|
||||
XlxCompatibility=false
|
||||
|
||||
; Optional: enforce single subscription per timeslot (default false)
|
||||
SingleMode=false
|
||||
|
||||
; Scanner Hold Time in seconds (default 5)
|
||||
HoldTime=5
|
||||
|
||||
; Dynamic Subscription Timeout in seconds (default 600 / 10 mins)
|
||||
; 0 = Infinite
|
||||
DefaultTimeout=600
|
||||
|
||||
; Module to Talkgroup Mapping (Optional)
|
||||
; Maps Module A to TG 4001, B to 4002, etc. automatically.
|
||||
; You can override specific maps:
|
||||
MapA=4001
|
||||
MapB=4002
|
||||
|
||||
; IMPORTANT: Any module you map (e.g. A, B) MUST be enabled in the [Modules] section!
|
||||
; If Module A is not enabled, traffic for TG 4001 will be dropped.
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
### 1. Subscribing via PTT (Push-To-Talk)
|
||||
|
||||
The easiest way to subscribe to a Talkgroup is to simply **transmit** on it from your radio.
|
||||
|
||||
* **Action**: Key up (PTT) on `TG 1234`.
|
||||
* **Result**: The reflector detects your transmission and automatically subscribes you to `TG 1234` for the configured timeout duration (e.g., 10 minutes).
|
||||
* **Renewal**: If you are already subscribed, keying up again will **reset the timeout timer** back to the full duration.
|
||||
* **Note**: The first transmission might be muted (Anti-Kerchunk) to prevent noise, but you will immediately be subscribed.
|
||||
|
||||
### 2. Subscribing via Options String
|
||||
|
||||
You can manage subscriptions sent from your MMDVM hotspot/repeater configuration (or Pi-Star Options field).
|
||||
|
||||
* **Format**: `TS1=TG_ID;TS2=TG_ID;AUTO=TIMEOUT`
|
||||
* **Example**: `TS2=3100,4001;AUTO=600`
|
||||
* Subscribes Timeslot 2 to TG 3100 and TG 4001.
|
||||
* Sets timeout to 600 seconds.
|
||||
|
||||
### 3. Disconnecting / Unsubscribing
|
||||
|
||||
* **Disconnect All**: Transmit a Group Call to **TG 4000**. This clears all dynamic subscriptions on that timeslot.
|
||||
* **Single Mode**: If `SingleMode=true` is set in config, transmitting on a *new* Talkgroup automatically unsubscribes you from the previous one.
|
||||
|
||||
### 4. Talkgroup 9 (Reflector)
|
||||
|
||||
* Traffic on **TG 9** is treated as local reflector traffic (linked functionality) if the client is essentially "linked" to a module, but in Mini DMR mode, TG 9 behavior depends on the specific map configuration or defaults. Typically, use specific Talkgroups for wide-area routing.
|
||||
|
||||
## Dashboard
|
||||
|
||||
The URFD Dashboard includes a dedicated **DMR** page (`/dmr`) to monitor Flexible DMR Mode activity.
|
||||
|
||||
* **Active Subscriptions**: Shows all Talkgroups a client is monitoring, along with the specific Timeslot.
|
||||
* **Timers**: Displays a real-time countdown for Dynamic Subscriptions. Static subscriptions are marked as `Static`.
|
||||
* **DMR ID**: Displays the client's DMR ID alongside their callsign (e.g., `CALLSIGN (3100123)`).
|
||||
* **Requirements**: The dashboard requires NO additional configuration. It automatically displays data once `XlxCompatibility=false` is set in the backend config.
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### "Recordings are blank" or "No Traffic on other modes"
|
||||
|
||||
If clients can connect and transmit but you see no traffic on other protocols (M17, YSF) or blank recordings:
|
||||
|
||||
* **Check Modules**: Ensure the mapped Module (e.g. A for TG 4001) is defined and **enabled** in your `[Modules]` configuration.
|
||||
* **Log Check**: Look for `Can't find module 'X' for Client ...` errors in the reflector log.
|
||||
@ -0,0 +1,184 @@
|
||||
# Investigation and Fix Plan: Flexible DMR Mode
|
||||
|
||||
## Problem Description
|
||||
|
||||
User wants to support two modes of operation for DMR:
|
||||
|
||||
1. **XLX Mode** (Default): Legacy behaviors. MMDVM clients "link" to a module.
|
||||
2. **Mini DMR Mode** (New): MMDVM clients do not "link". Modules are mapped to Talkgroups. Clients "subscribe" to TGs.
|
||||
|
||||
## Analysis
|
||||
|
||||
- **Modes**:
|
||||
- `XLXCompatibility`: Legacy mode.
|
||||
- `Mini DMR Mode`: Direct TG mapping.
|
||||
- **Subscription Logic**:
|
||||
- **Single Mode**: Only one TG allowed per timeslot. New TG replaces old.
|
||||
- **Multi Mode**: Multiple subscriptions allowed per timeslot.
|
||||
- **Scanner / Hold**: If >1 subscription, hold onto active TG for X seconds (default 5s) after idle before switching.
|
||||
- **Timeouts**:
|
||||
- Dynamic subscriptions expire after configurable time (default 10 mins).
|
||||
- Configurable per connection via Options string/password.
|
||||
- Static subscriptions (via config/options) do not expire.
|
||||
- **Scope**:
|
||||
- Only TGs defined in the Reflector's Module Map (plus 4000) are valid.
|
||||
- **Anti-Kerchunk**:
|
||||
- If a client Subscribes via PTT (first time), ignore/mute that transmission to prevent broadcasting unnecessary noise.
|
||||
|
||||
## Proposed Changes
|
||||
|
||||
### Configuration
|
||||
|
||||
- [ ] Modify `JsonKeys.h` / `Configure.h` / `Configure.cpp`:
|
||||
- `Dmr.XlxCompatibility` (bool, default true).
|
||||
- `Dmr.ModuleMap` (map/object).
|
||||
- `Dmr.SingleMode` (bool, default false).
|
||||
- `Dmr.DefaultTimeout` (int, default 600s).
|
||||
- `Dmr.HoldTime` (int, default 5s).
|
||||
|
||||
### Client State (`DMRMMDVMClient`)
|
||||
|
||||
- [ ] Add `Subscription` structure:
|
||||
- `TalkgroupId`
|
||||
- `Timeslot`
|
||||
- `Expiry` (timestamp or 0 for static)
|
||||
- [ ] Add `ScannerState`:
|
||||
- `CurrentSpeakingTG`
|
||||
- `HoldExpiry`
|
||||
- [ ] Add `Subscriptions` container (list/map).
|
||||
|
||||
### Reflector Logic (`DMRMMDVMProtocol.cpp`)
|
||||
|
||||
- [ ] **Options Parsing**:
|
||||
- Parse "Options" string (e.g., `TS1=4001;AUTO=600`) from RPTC Description/Password.
|
||||
- [ ] **Incoming Packet (`OnDvHeaderPacketIn`)**:
|
||||
- If `!XlxCompatibility`:
|
||||
- **Validate**: TG must be in `ModuleMap` or 4000.
|
||||
- **Unsubscribe**: If TG 4000, remove subscription (or all depending on logic).
|
||||
- **Subscribe**:
|
||||
- Thread-safe update of subscriptions via `CDMRScanner`.
|
||||
- **First PTT Logic**: If this is a *new* dynamic subscription, flag stream as `Muted` or don't propagate.
|
||||
- [ ] **Outgoing/Queue Handling (`HandleQueue`)**:
|
||||
- Filter logic:
|
||||
- Thread-safe check of `CheckPacketAccess(tg)`.
|
||||
- Scanner Logic handled internally in `CDMRScanner` with mutex protection.
|
||||
|
||||
## Architecture Diagram
|
||||
|
||||
```mermaid
|
||||
graph TD
|
||||
Client[MMDVM Client] -->|UDP Packet| Protocol[DMRMMDVMProtocol]
|
||||
Protocol -->|Parse Header| CheckMode{XlxCompatibility?}
|
||||
|
||||
%% XLX Path
|
||||
CheckMode -->|True| XLXLogic[Legacy XLX Logic]
|
||||
XLXLogic -->|TG 9| Core[Reflector Core]
|
||||
|
||||
%% Mini DMR Path
|
||||
CheckMode -->|False| MiniLogic[Mini DMR Logic]
|
||||
|
||||
subgraph CDMRScanner ["class CDMRScanner"]
|
||||
MiniLogic -->|Check Access| ScannerState{State Check}
|
||||
ScannerState -->|Blocked| Drop[Drop Packet]
|
||||
ScannerState -->|Allowed| UpdateTimer[Update Hold Timer]
|
||||
end
|
||||
|
||||
UpdateTimer -->|Mapped TG| Core
|
||||
|
||||
%% Configuration Flow
|
||||
Config[RPTC Packet] -->|Description/Opts| Parser[Options Parser]
|
||||
Parser -->|Update| Subs[Subscription List]
|
||||
Subs -.-> ScannerState
|
||||
```
|
||||
|
||||
## Cross-Protocol Traffic Flow (Outbound)
|
||||
|
||||
```mermaid
|
||||
graph TD
|
||||
Src[Source Protocol e.g. YSF] -->|Audio on Module B| Core[Reflector Core]
|
||||
Core -->|Queue Packet| DMRQueue[DMRMMDVMProtocol::HandleQueue]
|
||||
|
||||
subgraph "Handle Queue Logic"
|
||||
DMRQueue --> Encode1[Encode Buffer TS1]
|
||||
DMRQueue --> Encode2[Encode Buffer TS2]
|
||||
|
||||
Encode1 --> ClientCheck{Client Subscribed?}
|
||||
Encode2 --> ClientCheck
|
||||
|
||||
ClientCheck -->|TG + TS1| Send1[Send TS1 Buffer]
|
||||
ClientCheck -->|TG + TS2| Send2[Send TS2 Buffer]
|
||||
ClientCheck -->|No| Drop[Drop]
|
||||
end
|
||||
|
||||
Send1 --> Client[MMDVM Client]
|
||||
Send2 --> Client
|
||||
``` %% Mini DMR Logic
|
||||
MapLookup -->|Yes| Map[Map Module B -> TG 4002]
|
||||
Map -->|TG 4002| ScannerCheck{Scanner Check}
|
||||
|
||||
subgraph CDMRScanner
|
||||
ScannerCheck -->|Client Subscribed?| SubCheck{Subscribed?}
|
||||
SubCheck -->|No| Drop[Drop]
|
||||
SubCheck -->|Yes| HoldCheck{Hold Timer Active?}
|
||||
|
||||
HoldCheck -->|Held by other TG| Drop
|
||||
HoldCheck -->|Free / Same TG| Allowed[Allow]
|
||||
end
|
||||
|
||||
Allowed --> SendMini[Send UDP Packet TG 4002]
|
||||
```
|
||||
|
||||
## Architecture Decision
|
||||
|
||||
- **Unified Protocol Class**: We will keep `DMRMMDVMProtocol` as the single class handling the UDP/DMR wire protocol.
|
||||
- **Reasoning**: Both "XLX" and "Mini DMR" modes share identical packet structures, parsing, connection handshakes (RPTL/RPTK), and keepalive mechanisms. Splitting them would require either duplicating this transport logic or creating a complex inheritance hierarchy.
|
||||
- **Logic Separation**: instead of polluting `DMRMMDVMProtocol.cpp` with mixed logic:
|
||||
- **Legacy/XLX Logic**: Remains inline (simple routing 9->9).
|
||||
- **New/Mini Logic**: Encapsulated in `CDMRScanner`. The Protocol class will call checking methods on the scanner.
|
||||
- **Toggle**: A simple `if (m_XlxCompatibility)` check at the routing decision points (packet ingress/egress) will switch behavior.
|
||||
|
||||
## Safety & Robustness Logic
|
||||
|
||||
- **Concurrency**:
|
||||
- `CDMRScanner` will encapsulate all state (`Subscriptions`, `HoldTimer`, `CurrentTG`) protected by an internal `std::recursive_mutex`.
|
||||
- **Deadlock Prevention**: `CDMRScanner` methods will be leaf-node operations (never calling out to other complex locked systems).
|
||||
- Access to `CDMRScanner` from `DMRMMDVMProtocol` will be done via thread-safe public methods only.
|
||||
- **Memory Safety**:
|
||||
- Avoid raw `char*` manipulation for Options parsing; use `std::string`.
|
||||
- Input Description field will be clamped to `RPTC` max length (checked in `IsValidConfigPacket` before parsing).
|
||||
- No fixed-size buffers for variable lists (use `std::vector` for TGs).
|
||||
|
||||
## Testing Strategy (TDD)
|
||||
|
||||
- **Objective**: Verify complex logic (Subscription management, Timeout, Scanner checks) in isolation without needing full network stack (mocking `DMRMMDVMProtocol/Client`).
|
||||
- **Plan**:
|
||||
- Create `reflector/DMRScanner.h/cpp` (or similar) to encapsulate the logic:
|
||||
- `class CDMRScanner`:
|
||||
- `AddSubscription(tg, ts, timeout)`
|
||||
- `RemoveSubscription(tg, ts)`
|
||||
- `IsSubscribed(tg)`
|
||||
- `CheckPacketAccess(tg)` -> Validates against Hold timer & Single Mode.
|
||||
- **Safety Tests**: Verify behavior under high-concurrency (if possible in unit test) or logic edge cases.
|
||||
- Create `reflector/test_dmr.cpp`:
|
||||
- A standalone test file similar to `test_audio.cpp`.
|
||||
- **Scenarios**:
|
||||
1. **Single Mode**: Add TG1, Add TG2 -> Assert TG1 removed.
|
||||
2. **Scanner Hold**: Packet from TG1 accepted. Immediately Packet from TG2 -> Rejected (Hold active). Wait 5s -> Packet from TG2 Accepted.
|
||||
3. **Timeout**: Add TG dynamic (timeout 1s). Wait 2s -> Assert TG removed.
|
||||
4. **Options Parsing**: Feed "TS1=1,2;AUTO=300" string -> Verify Subscriptions present.
|
||||
5. **Buffer Safety**: Feed malformed/oversized Option strings -> Verify no crash/leak.
|
||||
- **Build**: Add `test_dmr` target to `Makefile`.
|
||||
|
||||
## Verification Plan
|
||||
|
||||
- [ ] **Run TDD Tests**: `make test_dmr && ./reflector/test_dmr`
|
||||
- [ ] **Manual Verification**:
|
||||
- **Test Configurations**:
|
||||
- Single Mode: Verify PTT on TG A drops TG B.
|
||||
- Multi Mode: Verify PTT on A adds A (keeping B).
|
||||
- **Test Scanner**:
|
||||
- Sub to A and B. Transmit on A. Verify B is blocked during Hold time.
|
||||
- **Test Timeout**:
|
||||
- Set short timeout. Verify subscription drops.
|
||||
- **Test Kerchunk**:
|
||||
- PTT on new TG. Verify not heard by others. Second PTT heard.
|
||||
@ -0,0 +1,145 @@
|
||||
# NNG Event System Documentation
|
||||
|
||||
This document describes the real-time event system in `urfd`, which uses NNG (nanomsg next gen) to broadcast system state and activity as JSON.
|
||||
|
||||
## Architecture Overview
|
||||
|
||||
The `urfd` reflector acts as an NNG **Publisher** (PUB). Any number of subscribers (e.g., a middle-tier service or dashboard) can connect as **Subscribers** (SUB) to receive the event stream.
|
||||
|
||||
```mermaid
|
||||
graph TD
|
||||
subgraph "urfd Core"
|
||||
CR["CReflector"]
|
||||
CC["CClients"]
|
||||
CU["CUsers"]
|
||||
PS["CPacketStream"]
|
||||
end
|
||||
|
||||
subgraph "Publishing Layer"
|
||||
NP["g_NNGPublisher"]
|
||||
end
|
||||
|
||||
subgraph "Network"
|
||||
ADDR["tcp://0.0.0.0:5555"]
|
||||
end
|
||||
|
||||
subgraph "External"
|
||||
MT["Middle Tier / Dashboard"]
|
||||
end
|
||||
|
||||
%% Internal Flows
|
||||
CC -- "client_connect / client_disconnect" --> NP
|
||||
CU -- "hearing / closing" --> NP
|
||||
CR -- "periodic state report" --> NP
|
||||
PS -- "IsActive status" --> CR
|
||||
|
||||
%% Network Flow
|
||||
NP --> ADDR
|
||||
ADDR -.-> MT
|
||||
```
|
||||
|
||||
## Messaging Protocols
|
||||
|
||||
Events are sent as serialized JSON strings. Each message contains a `type` field to identify the payload structure.
|
||||
|
||||
### 1. State Broadcast (`state`)
|
||||
|
||||
Sent periodically based on `DashboardInterval` (default 10s). It provides a full snapshot of the reflector's configuration and status.
|
||||
|
||||
**Payload Structure:**
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "state",
|
||||
"Configure": {
|
||||
"Key": "Value",
|
||||
...
|
||||
},
|
||||
"Peers": [
|
||||
{
|
||||
"Callsign": "XLX123",
|
||||
"Modules": "ABC",
|
||||
"Protocol": "D-Extra",
|
||||
"ConnectTime": "2023-10-27T10:00:00Z"
|
||||
}
|
||||
],
|
||||
"Clients": [
|
||||
{
|
||||
"Callsign": "N7TAE",
|
||||
"OnModule": "A",
|
||||
"Protocol": "DMR",
|
||||
"ConnectTime": "2023-10-27T10:05:00Z"
|
||||
}
|
||||
],
|
||||
"Users": [
|
||||
{
|
||||
"Callsign": "G4XYZ",
|
||||
"Repeater": "GB3NB",
|
||||
"OnModule": "B",
|
||||
"ViaPeer": "XLX456",
|
||||
"LastHeard": "2023-10-27T10:10:00Z"
|
||||
}
|
||||
],
|
||||
"ActiveTalkers": [
|
||||
{
|
||||
"Module": "A",
|
||||
"Callsign": "N7TAE"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### 2. Client Connectivity (`client_connect` / `client_disconnect`)
|
||||
|
||||
Triggered immediately when a client (Repeater, Hotspot, or Mobile App) links or unlinks from a module.
|
||||
|
||||
**Payload Structure:**
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "client_connect",
|
||||
"callsign": "N7TAE",
|
||||
"ip": "1.2.3.4",
|
||||
"protocol": "DMR",
|
||||
"module": "A"
|
||||
}
|
||||
```
|
||||
|
||||
### 3. Voice Activity (`hearing`)
|
||||
|
||||
Triggered when the reflector "hears" an active transmission. This event is sent for every "tick" or heartbeat of voice activity processed by the reflector.
|
||||
|
||||
**Payload Structure:**
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "hearing",
|
||||
"my": "G4XYZ",
|
||||
"ur": "CQCQCQ",
|
||||
"rpt1": "GB3NB",
|
||||
"rpt2": "XLX123 A",
|
||||
"module": "A",
|
||||
"protocol": "M17"
|
||||
}
|
||||
```
|
||||
|
||||
### 4. Transmission End (`closing`)
|
||||
|
||||
Triggered when a transmission stream is closed (user stops talking).
|
||||
|
||||
**Payload Structure:**
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "closing",
|
||||
"my": "G4XYZ",
|
||||
"module": "A",
|
||||
"protocol": "M17"
|
||||
}
|
||||
```
|
||||
|
||||
## Middle Tier Design Considerations
|
||||
|
||||
1. **Late Joining**: The `state` message is broadcast periodically to ensure a middle-tier connecting at any time (or reconnecting) can synchronize its internal state without waiting for new events.
|
||||
2. **Active Talkers**: The `ActiveTalkers` array in the `state` message identifies who is currently keyed up. Real-time transitions (start/stop) are driven by the `hearing` events and the absence of such events over a timeout (typically 2-3 seconds).
|
||||
3. **Deduplication**: The `state` report is a snapshot. If the middle-tier is already tracking events, it can use the `state` report to "re-base" its state and clear out stale data.
|
||||
|
After Width: | Height: | Size: 40 KiB |
@ -0,0 +1,235 @@
|
||||
#include "AudioRecorder.h"
|
||||
#include <iostream>
|
||||
#include <cstring>
|
||||
#include <ctime>
|
||||
#include <sstream>
|
||||
#include <random>
|
||||
|
||||
// Opus settings for Voice 8kHz Mono
|
||||
#define SAMPLE_RATE 8000
|
||||
#define CHANNELS 1
|
||||
#define APPLICATION OPUS_APPLICATION_VOIP
|
||||
// 60ms frame size = 480 samples at 8kHz
|
||||
#define FRAME_SIZE 480
|
||||
|
||||
CAudioRecorder::CAudioRecorder() : m_IsRecording(false), m_Encoder(nullptr), m_PacketCount(0), m_GranulePos(0)
|
||||
{
|
||||
}
|
||||
|
||||
CAudioRecorder::~CAudioRecorder()
|
||||
{
|
||||
Stop();
|
||||
}
|
||||
|
||||
void CAudioRecorder::Cleanup()
|
||||
{
|
||||
if (m_Encoder) {
|
||||
opus_encoder_destroy(m_Encoder);
|
||||
m_Encoder = nullptr;
|
||||
}
|
||||
if (m_IsRecording) {
|
||||
ogg_stream_clear(&m_OggStream);
|
||||
}
|
||||
if (m_File.is_open()) {
|
||||
m_File.close();
|
||||
}
|
||||
m_IsRecording = false;
|
||||
m_PcmBuffer.clear();
|
||||
}
|
||||
|
||||
std::string CAudioRecorder::Start(const std::string& directory)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_Mutex);
|
||||
Cleanup();
|
||||
|
||||
// Use random_device for true randomness/seed
|
||||
std::random_device rd;
|
||||
std::mt19937 gen(rd());
|
||||
std::uniform_int_distribution<uint16_t> dist(0, 255);
|
||||
|
||||
// Generate UUIDv7 Filename
|
||||
uint8_t uuid[16];
|
||||
uint8_t rand_bytes[10];
|
||||
for(int i=0; i<10; ++i) rand_bytes[i] = (uint8_t)dist(gen);
|
||||
|
||||
struct timespec ts;
|
||||
clock_gettime(CLOCK_REALTIME, &ts);
|
||||
uint64_t unix_ts_ms = (uint64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
|
||||
|
||||
uuidv7_generate(uuid, unix_ts_ms, rand_bytes, nullptr);
|
||||
|
||||
char uuid_str[37];
|
||||
uuidv7_to_string(uuid, uuid_str);
|
||||
|
||||
m_Filename = "hearing_" + std::string(uuid_str) + ".opus";
|
||||
if (directory.back() == '/')
|
||||
m_FullPath = directory + m_Filename;
|
||||
else
|
||||
m_FullPath = directory + "/" + m_Filename;
|
||||
|
||||
m_File.open(m_FullPath, std::ios::binary | std::ios::out);
|
||||
if (!m_File.is_open()) {
|
||||
std::cerr << "AudioRecorder: Failed to open file: " << m_FullPath << std::endl;
|
||||
return "";
|
||||
}
|
||||
|
||||
InitOpus();
|
||||
InitOgg(); // No longer calls srand
|
||||
|
||||
m_StartTime = std::time(nullptr);
|
||||
m_TotalBytes = 0;
|
||||
m_IsRecording = true;
|
||||
|
||||
std::cout << "AudioRecorder: Started recording to " << m_Filename << std::endl;
|
||||
return m_Filename;
|
||||
}
|
||||
|
||||
void CAudioRecorder::InitOpus()
|
||||
{
|
||||
int err;
|
||||
m_Encoder = opus_encoder_create(SAMPLE_RATE, CHANNELS, APPLICATION, &err);
|
||||
if (err != OPUS_OK) {
|
||||
std::cerr << "AudioRecorder: Failed to create Opus encoder: " << opus_strerror(err) << std::endl;
|
||||
}
|
||||
opus_encoder_ctl(m_Encoder, OPUS_SET_BITRATE(12000)); // 12kbps
|
||||
}
|
||||
|
||||
void CAudioRecorder::InitOgg()
|
||||
{
|
||||
// Initialize Ogg stream with random serial
|
||||
// Use random_device for thread safety
|
||||
std::random_device rd;
|
||||
std::mt19937 gen(rd());
|
||||
std::uniform_int_distribution<int> dist; // full int range
|
||||
|
||||
if (ogg_stream_init(&m_OggStream, dist(gen)) != 0) {
|
||||
std::cerr << "AudioRecorder: Failed to init Ogg stream" << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
// Create OpusHead packet
|
||||
// Magic: "OpusHead" (8 bytes)
|
||||
// Version: 1 (1 byte)
|
||||
// Channel Count: 1 (1 byte)
|
||||
// Pre-skip: 0 (2 bytes)
|
||||
// Input Sample Rate: 8000 (4 bytes)
|
||||
// Output Gain: 0 (2 bytes)
|
||||
// Mapping Family: 0 (1 byte)
|
||||
unsigned char header[19] = {
|
||||
'O', 'p', 'u', 's', 'H', 'e', 'a', 'd',
|
||||
1,
|
||||
CHANNELS,
|
||||
0, 0,
|
||||
0x40, 0x1f, 0x00, 0x00, // 8000 little endian
|
||||
0, 0,
|
||||
0
|
||||
};
|
||||
|
||||
ogg_packet header_packet;
|
||||
header_packet.packet = header;
|
||||
header_packet.bytes = 19;
|
||||
header_packet.b_o_s = 1;
|
||||
header_packet.e_o_s = 0;
|
||||
header_packet.granulepos = 0;
|
||||
header_packet.packetno = m_PacketCount++;
|
||||
|
||||
ogg_stream_packetin(&m_OggStream, &header_packet);
|
||||
WriteOggPage(true); // Flush header
|
||||
|
||||
// OpusTags (comments) - Minimal
|
||||
// Magic: "OpusTags" (8 bytes)
|
||||
// Vendor String Length (4 bytes)
|
||||
// Vendor String
|
||||
// User Comment List Length (4 bytes)
|
||||
const char* vendor = "urfd-recorder";
|
||||
uint32_t vendor_len = strlen(vendor);
|
||||
|
||||
std::vector<unsigned char> tags;
|
||||
tags.reserve(8 + 4 + vendor_len + 4);
|
||||
const char* magic = "OpusTags";
|
||||
tags.insert(tags.end(), magic, magic + 8);
|
||||
|
||||
tags.push_back(vendor_len & 0xFF);
|
||||
tags.push_back((vendor_len >> 8) & 0xFF);
|
||||
tags.push_back((vendor_len >> 16) & 0xFF);
|
||||
tags.push_back((vendor_len >> 24) & 0xFF);
|
||||
|
||||
tags.insert(tags.end(), vendor, vendor + vendor_len);
|
||||
|
||||
// 0 comments
|
||||
tags.push_back(0); tags.push_back(0); tags.push_back(0); tags.push_back(0);
|
||||
|
||||
ogg_packet tags_packet;
|
||||
tags_packet.packet = tags.data();
|
||||
tags_packet.bytes = tags.size();
|
||||
tags_packet.b_o_s = 0;
|
||||
tags_packet.e_o_s = 0;
|
||||
tags_packet.granulepos = 0;
|
||||
tags_packet.packetno = m_PacketCount++;
|
||||
|
||||
ogg_stream_packetin(&m_OggStream, &tags_packet);
|
||||
WriteOggPage(true);
|
||||
}
|
||||
|
||||
void CAudioRecorder::WriteOggPage(bool flush)
|
||||
{
|
||||
while(true) {
|
||||
int result = flush ? ogg_stream_flush(&m_OggStream, &m_OggPage) : ogg_stream_pageout(&m_OggStream, &m_OggPage);
|
||||
if (result == 0) break;
|
||||
m_File.write((const char*)m_OggPage.header, m_OggPage.header_len);
|
||||
m_File.write((const char*)m_OggPage.body, m_OggPage.body_len);
|
||||
m_TotalBytes += m_OggPage.header_len + m_OggPage.body_len;
|
||||
m_File.flush();
|
||||
}
|
||||
}
|
||||
|
||||
void CAudioRecorder::Write(const int16_t* samples, int count)
|
||||
{
|
||||
if (!m_IsRecording || !m_Encoder) return;
|
||||
|
||||
std::lock_guard<std::mutex> lock(m_Mutex);
|
||||
|
||||
m_PcmBuffer.insert(m_PcmBuffer.end(), samples, samples + count);
|
||||
|
||||
unsigned char out_buf[1024];
|
||||
|
||||
while (m_PcmBuffer.size() >= FRAME_SIZE) {
|
||||
int len = opus_encode(m_Encoder, m_PcmBuffer.data(), FRAME_SIZE, out_buf, sizeof(out_buf));
|
||||
if (len < 0) {
|
||||
std::cerr << "AudioRecorder: Opus encode error: " << len << std::endl;
|
||||
} else {
|
||||
// Ogg Opus always uses 48kHz for granulepos, regardless of input rate
|
||||
// Input: 8000Hz. Frame: 480 samples (60ms).
|
||||
// Output: 48000Hz. Frame: 2880 samples (60ms).
|
||||
m_GranulePos += FRAME_SIZE * (48000 / SAMPLE_RATE);
|
||||
|
||||
ogg_packet packet;
|
||||
packet.packet = out_buf;
|
||||
packet.bytes = len;
|
||||
packet.b_o_s = 0;
|
||||
packet.e_o_s = 0;
|
||||
packet.granulepos = m_GranulePos;
|
||||
packet.packetno = m_PacketCount++;
|
||||
|
||||
ogg_stream_packetin(&m_OggStream, &packet);
|
||||
WriteOggPage();
|
||||
}
|
||||
|
||||
m_PcmBuffer.erase(m_PcmBuffer.begin(), m_PcmBuffer.begin() + FRAME_SIZE);
|
||||
}
|
||||
}
|
||||
|
||||
void CAudioRecorder::Stop()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_Mutex);
|
||||
if (!m_IsRecording) return;
|
||||
|
||||
// Actually, just flushing logic
|
||||
WriteOggPage(true);
|
||||
|
||||
double duration = std::difftime(std::time(nullptr), m_StartTime);
|
||||
std::cout << "AudioRecorder: Stopped recording " << m_Filename
|
||||
<< ". Duration: " << duration << "s. Size: " << m_TotalBytes << " bytes." << std::endl;
|
||||
|
||||
Cleanup();
|
||||
}
|
||||
@ -0,0 +1,58 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <cstdint>
|
||||
#include <fstream>
|
||||
#include <mutex>
|
||||
#include <opus/opus.h>
|
||||
#include <ogg/ogg.h>
|
||||
#include "uuidv7.h"
|
||||
|
||||
class CAudioRecorder
|
||||
{
|
||||
public:
|
||||
CAudioRecorder();
|
||||
~CAudioRecorder();
|
||||
|
||||
// Starts recording to a new file.
|
||||
// Generates a UUIDv7 based filename if path is a directory,
|
||||
// or uses the provided path + generated filename.
|
||||
// Returns the filename (without path) for notification.
|
||||
std::string Start(const std::string& directory);
|
||||
|
||||
// Writes signed 16-bit PCM samples (8kHz mono)
|
||||
void Write(const int16_t* samples, int count);
|
||||
|
||||
// Stops recording and closes file.
|
||||
void Stop();
|
||||
|
||||
bool IsRecording() const { return m_IsRecording; }
|
||||
|
||||
private:
|
||||
void InitOpus();
|
||||
void InitOgg();
|
||||
void WriteOggPage(bool flush = false);
|
||||
void Cleanup();
|
||||
|
||||
bool m_IsRecording;
|
||||
std::ofstream m_File;
|
||||
std::string m_Filename;
|
||||
std::string m_FullPath;
|
||||
std::time_t m_StartTime;
|
||||
size_t m_TotalBytes;
|
||||
std::mutex m_Mutex;
|
||||
|
||||
// Opus state
|
||||
OpusEncoder* m_Encoder;
|
||||
|
||||
// Ogg state
|
||||
ogg_stream_state m_OggStream;
|
||||
ogg_page m_OggPage;
|
||||
ogg_packet m_OggPacket;
|
||||
int m_PacketCount;
|
||||
int m_GranulePos;
|
||||
|
||||
// Buffering pcm for frame size
|
||||
std::vector<int16_t> m_PcmBuffer;
|
||||
};
|
||||
@ -0,0 +1,318 @@
|
||||
/*
|
||||
* Copyright (c) 2024 by Thomas A. Early N7TAE
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation; either version 2 of the License, or
|
||||
* (at your option) any later version.
|
||||
*/
|
||||
|
||||
#include "DMRScanner.h"
|
||||
#include <iostream>
|
||||
#include <algorithm>
|
||||
|
||||
CDMRScanner::CDMRScanner() :
|
||||
m_SingleMode(false),
|
||||
m_DefaultTimeout(600),
|
||||
m_HoldTime(5)
|
||||
{
|
||||
m_CurrentScanTG[0] = 0;
|
||||
m_CurrentScanTG[1] = 0;
|
||||
}
|
||||
|
||||
CDMRScanner::~CDMRScanner()
|
||||
{
|
||||
}
|
||||
|
||||
void CDMRScanner::Configure(bool singleMode, unsigned int defaultTimeout, unsigned int holdTime)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(m_Mutex);
|
||||
m_SingleMode = singleMode;
|
||||
m_DefaultTimeout = defaultTimeout;
|
||||
m_HoldTime = holdTime;
|
||||
}
|
||||
|
||||
void CDMRScanner::UpdateSubscriptions(const std::string& options)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(m_Mutex);
|
||||
parseOptions(options);
|
||||
}
|
||||
|
||||
void CDMRScanner::parseOptions(const std::string& options)
|
||||
{
|
||||
// Basic parsing: Options: TS1=4001,4002;TS2=9;AUTO=600
|
||||
// Split by ';'
|
||||
if (options.empty()) return;
|
||||
|
||||
std::stringstream ss(options);
|
||||
std::string segment;
|
||||
unsigned int timeout = m_DefaultTimeout;
|
||||
|
||||
// First pass to find AUTO/Timeout if present (to apply to TGs)
|
||||
// Actually, typically AUTO applies to all in the string.
|
||||
// Let's parse into a temporary structure first.
|
||||
|
||||
std::vector<unsigned int> ts1_tgs;
|
||||
std::vector<unsigned int> ts2_tgs;
|
||||
|
||||
while(std::getline(ss, segment, ';'))
|
||||
{
|
||||
size_t eq = segment.find('=');
|
||||
if (eq != std::string::npos)
|
||||
{
|
||||
std::string key = segment.substr(0, eq);
|
||||
std::string val = segment.substr(eq + 1);
|
||||
|
||||
// trim key/val
|
||||
key.erase(0, key.find_first_not_of(" \t\r\n"));
|
||||
key.erase(key.find_last_not_of(" \t\r\n") + 1);
|
||||
|
||||
if (key == "Options") {
|
||||
// Recursive parse or just assume val contains the options?
|
||||
// Example: Options=TS1=4001,4002
|
||||
// Wait, typically MMDVMHost sends: Options=TS1=4001,4002;TS2=9
|
||||
// If the entire string is "Options=...", we need to parse 'val'.
|
||||
// If 'val' contains semicolons, std::getline logic above might have split it already?
|
||||
// No, std::getline splits on ';' first.
|
||||
// Case 1: "Options=TS1=4001,4002;TS2=9"
|
||||
// Segment 1: "Options=TS1=4001,4002". Key="Options", Val="TS1=4001,4002".
|
||||
// We should parse 'Val'.
|
||||
// But wait, 'Val' is "TS1=4001,4002". It'looks like a K=V itself?
|
||||
// Let's recursively call parseOptions(val) or just process val.
|
||||
// But verify val format.
|
||||
// Simplest: Check if val starts with TS1/TS2/AUTO ?
|
||||
parseOptions(val);
|
||||
}
|
||||
else if (key == "AUTO") {
|
||||
try {
|
||||
timeout = std::stoul(val);
|
||||
} catch(...) {}
|
||||
} else if (key == "TS1") {
|
||||
std::stringstream vs(val);
|
||||
std::string v;
|
||||
while(std::getline(vs, v, ',')) {
|
||||
try { ts1_tgs.push_back(std::stoul(v)); } catch(...) {}
|
||||
}
|
||||
} else if (key == "TS2") {
|
||||
std::stringstream vs(val);
|
||||
std::string v;
|
||||
while(std::getline(vs, v, ',')) {
|
||||
try { ts2_tgs.push_back(std::stoul(v)); } catch(...) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Apply (Replace existing usually? Or append? The prompt said "Options string... to configure subscriptions".
|
||||
// Usually RPTC is a full state update. Let's assume replace for provided timeslots).
|
||||
// Actually user said "clients can send options... similar to freedmr".
|
||||
// Freedmr options usually add/set.
|
||||
// Let's implement ADD logic, but if SingleMode is on, it naturally replaces.
|
||||
// Wait, typical "Options=" in password means "Set these". So we should probably existing ones if they are re-specified?
|
||||
// Let's assume for now we ADD/UPDATE.
|
||||
|
||||
// Actually, simpler implementation for now: Just Add.
|
||||
// parseOptions: call AddSubscription with isStatic=true
|
||||
for (auto tg : ts1_tgs) AddSubscription(tg, 1, timeout, true);
|
||||
for (auto tg : ts2_tgs) AddSubscription(tg, 2, timeout, true);
|
||||
}
|
||||
|
||||
void CDMRScanner::AddSubscription(unsigned int tgid, int timeslot, unsigned int timeout, bool isStatic)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(m_Mutex);
|
||||
|
||||
if (tgid == 4000) {
|
||||
m_Subscriptions[timeslot].clear();
|
||||
return;
|
||||
}
|
||||
|
||||
if (m_SingleMode) {
|
||||
m_Subscriptions[timeslot].clear();
|
||||
isStatic = true; // Single Mode always static
|
||||
}
|
||||
|
||||
// Remove if exists to update
|
||||
RemoveSubscription(tgid, timeslot);
|
||||
|
||||
SSubscription sub;
|
||||
sub.tgid = tgid;
|
||||
sub.timeout = timeout;
|
||||
sub.expiry = (timeout == 0) ? 0 : std::time(nullptr) + timeout;
|
||||
sub.isStatic = isStatic;
|
||||
|
||||
m_Subscriptions[timeslot].push_back(sub);
|
||||
}
|
||||
|
||||
void CDMRScanner::RenewSubscription(unsigned int tgid, int timeslot, unsigned int timeout)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(m_Mutex);
|
||||
|
||||
if (m_Subscriptions.count(timeslot)) {
|
||||
for (auto& s : m_Subscriptions.at(timeslot)) {
|
||||
if (s.tgid == tgid && !s.isStatic) {
|
||||
s.expiry = (timeout == 0) ? 0 : std::time(nullptr) + timeout;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void CDMRScanner::RemoveSubscription(unsigned int tgid, int timeslot)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(m_Mutex);
|
||||
auto& subs = m_Subscriptions[timeslot];
|
||||
subs.erase(std::remove_if(subs.begin(), subs.end(),
|
||||
[tgid](const SSubscription& s) { return s.tgid == tgid; }), subs.end());
|
||||
}
|
||||
|
||||
void CDMRScanner::ClearSubscriptions()
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(m_Mutex);
|
||||
m_Subscriptions.clear();
|
||||
m_CurrentScanTG[0] = 0;
|
||||
m_CurrentScanTG[1] = 0;
|
||||
}
|
||||
|
||||
bool CDMRScanner::IsSubscribed(unsigned int tgid) const
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(m_Mutex);
|
||||
std::time_t now = std::time(nullptr);
|
||||
|
||||
for (const auto& pair : m_Subscriptions) {
|
||||
for (const auto& sub : pair.second) {
|
||||
if (sub.tgid == tgid) {
|
||||
if (!sub.isStatic && sub.timeout > 0 && now > sub.expiry) continue;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool CDMRScanner::IsSubscribed(unsigned int tgid, int timeslot) const
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(m_Mutex);
|
||||
std::time_t now = std::time(nullptr);
|
||||
|
||||
if (m_Subscriptions.count(timeslot)) {
|
||||
for (const auto& sub : m_Subscriptions.at(timeslot)) {
|
||||
if (sub.tgid == tgid) {
|
||||
if (!sub.isStatic && sub.timeout > 0 && now > sub.expiry) continue;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool CDMRScanner::CheckAccess(unsigned int tgid, int slot)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(m_Mutex);
|
||||
|
||||
if (slot == 0) {
|
||||
// Check both slots
|
||||
return CheckAccess(tgid, 1) || CheckAccess(tgid, 2);
|
||||
}
|
||||
|
||||
if (slot < 1 || slot > 2) return false;
|
||||
int idx = slot - 1;
|
||||
|
||||
cleanupExpired();
|
||||
|
||||
if (!IsSubscribed(tgid, slot)) return false;
|
||||
|
||||
// Scanner Logic for Slot
|
||||
if (m_CurrentScanTG[idx] != 0) {
|
||||
if (m_CurrentScanTG[idx] == tgid) {
|
||||
m_HoldTimer[idx].start();
|
||||
return true;
|
||||
}
|
||||
|
||||
if (m_HoldTimer[idx].time() < m_HoldTime) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
m_CurrentScanTG[idx] = tgid;
|
||||
m_HoldTimer[idx].start();
|
||||
return true;
|
||||
}
|
||||
|
||||
void CDMRScanner::cleanupExpired()
|
||||
{
|
||||
std::time_t now = std::time(nullptr);
|
||||
for (auto& pair : m_Subscriptions) {
|
||||
auto& subs = pair.second;
|
||||
subs.erase(std::remove_if(subs.begin(), subs.end(),
|
||||
[now](const SSubscription& s) { return !s.isStatic && s.timeout > 0 && now > s.expiry; }), subs.end());
|
||||
}
|
||||
|
||||
if (m_CurrentScanTG[0] != 0 && !IsSubscribed(m_CurrentScanTG[0], 1)) m_CurrentScanTG[0] = 0;
|
||||
if (m_CurrentScanTG[1] != 0 && !IsSubscribed(m_CurrentScanTG[1], 2)) m_CurrentScanTG[1] = 0;
|
||||
}
|
||||
|
||||
unsigned int CDMRScanner::GetFirstSubscription() const
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(m_Mutex);
|
||||
|
||||
// Check TS2 first (Standard DMRReflector usually)
|
||||
if (m_Subscriptions.count(2) && !m_Subscriptions.at(2).empty()) return m_Subscriptions.at(2).front().tgid;
|
||||
if (m_Subscriptions.count(1) && !m_Subscriptions.at(1).empty()) return m_Subscriptions.at(1).front().tgid;
|
||||
|
||||
// Check any
|
||||
for(const auto& p : m_Subscriptions) {
|
||||
if (!p.second.empty()) return p.second.front().tgid;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
unsigned int CDMRScanner::GetSubscriptionSlot(unsigned int tgid) const
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(m_Mutex);
|
||||
|
||||
// Check TS1
|
||||
if (m_Subscriptions.count(1)) {
|
||||
for(const auto& s : m_Subscriptions.at(1)) {
|
||||
if (s.tgid == tgid) return 1;
|
||||
}
|
||||
}
|
||||
// Check TS2
|
||||
if (m_Subscriptions.count(2)) {
|
||||
for(const auto& s : m_Subscriptions.at(2)) {
|
||||
if (s.tgid == tgid) return 2;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::vector<SSubscription> CDMRScanner::GetSubscriptions(int slot) const
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(m_Mutex);
|
||||
if (m_Subscriptions.count(slot)) {
|
||||
return m_Subscriptions.at(slot);
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
void CDMRScanner::GetActiveTalkgroups(std::vector<unsigned int>& tgs) const
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(m_Mutex);
|
||||
tgs.clear();
|
||||
|
||||
std::time_t now = std::time(nullptr);
|
||||
|
||||
// Check TS1
|
||||
if (m_Subscriptions.count(1)) {
|
||||
for(const auto& s : m_Subscriptions.at(1)) {
|
||||
if (!s.isStatic && s.timeout > 0 && now > s.expiry) continue;
|
||||
tgs.push_back(s.tgid);
|
||||
}
|
||||
}
|
||||
// Check TS2
|
||||
if (m_Subscriptions.count(2)) {
|
||||
for(const auto& s : m_Subscriptions.at(2)) {
|
||||
if (!s.isStatic && s.timeout > 0 && now > s.expiry) continue;
|
||||
tgs.push_back(s.tgid);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,76 @@
|
||||
/*
|
||||
* Copyright (c) 2024 by Thomas A. Early N7TAE
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation; either version 2 of the License, or
|
||||
* (at your option) any later version.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <ctime>
|
||||
#include <sstream>
|
||||
|
||||
#include "Timer.h"
|
||||
|
||||
// Structure to hold subscription details
|
||||
struct SSubscription {
|
||||
unsigned int tgid;
|
||||
unsigned int timeout; // seconds, 0 = infinite
|
||||
std::time_t expiry; // absolute time
|
||||
bool isStatic; // true if static (no timeout)
|
||||
};
|
||||
|
||||
class CDMRScanner
|
||||
{
|
||||
public:
|
||||
CDMRScanner();
|
||||
virtual ~CDMRScanner();
|
||||
|
||||
// Configuration
|
||||
void Configure(bool singleMode, unsigned int defaultTimeout, unsigned int holdTime);
|
||||
bool IsSingleMode() const { return m_SingleMode; }
|
||||
|
||||
// Subscription Management
|
||||
void UpdateSubscriptions(const std::string& options);
|
||||
void AddSubscription(unsigned int tgid, int timeslot, unsigned int timeout, bool isStatic = false);
|
||||
void RenewSubscription(unsigned int tgid, int timeslot, unsigned int timeout);
|
||||
void RemoveSubscription(unsigned int tgid, int timeslot);
|
||||
void ClearSubscriptions();
|
||||
bool IsSubscribed(unsigned int tgid) const;
|
||||
bool IsSubscribed(unsigned int tgid, int timeslot) const;
|
||||
|
||||
// Packet Access Check (Scanner Logic)
|
||||
// Returns true if packet with this TG should be processed
|
||||
bool CheckAccess(unsigned int tgid, int slot = 0);
|
||||
|
||||
// Getters
|
||||
unsigned int GetFirstSubscription() const;
|
||||
unsigned int GetSubscriptionSlot(unsigned int tgid) const;
|
||||
std::vector<SSubscription> GetSubscriptions(int slot) const;
|
||||
void GetActiveTalkgroups(std::vector<unsigned int>& tgs) const;
|
||||
unsigned int GetCurrentScanTG(int slot) const { return (slot >= 1 && slot <= 2) ? m_CurrentScanTG[slot-1] : 0; }
|
||||
|
||||
private:
|
||||
mutable std::recursive_mutex m_Mutex;
|
||||
|
||||
// Config
|
||||
bool m_SingleMode;
|
||||
unsigned int m_DefaultTimeout;
|
||||
unsigned int m_HoldTime;
|
||||
|
||||
// State
|
||||
std::map<int, std::vector<SSubscription>> m_Subscriptions; // Map Timeslot -> List of Subscriptions
|
||||
// Scanner State per slot [0]=TS1, [1]=TS2
|
||||
unsigned int m_CurrentScanTG[2];
|
||||
CTimer m_HoldTimer[2];
|
||||
|
||||
// Helpers
|
||||
void cleanupExpired();
|
||||
void parseOptions(const std::string& options);
|
||||
};
|
||||
@ -0,0 +1,26 @@
|
||||
#include "ImrsClient.h"
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
// constructors
|
||||
|
||||
CImrsClient::CImrsClient()
|
||||
{
|
||||
}
|
||||
|
||||
CImrsClient::CImrsClient(const CCallsign &callsign, const CIp &ip, char reflectorModule)
|
||||
: CClient(callsign, ip, reflectorModule)
|
||||
{
|
||||
}
|
||||
|
||||
CImrsClient::CImrsClient(const CImrsClient &client)
|
||||
: CClient(client)
|
||||
{
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
// status
|
||||
|
||||
bool CImrsClient::IsAlive(void) const
|
||||
{
|
||||
return (m_LastKeepaliveTime.time() < IMRS_KEEPALIVE_TIMEOUT);
|
||||
}
|
||||
@ -0,0 +1,18 @@
|
||||
#pragma once
|
||||
|
||||
#include "Client.h"
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
// class
|
||||
|
||||
class CImrsClient : public CClient
|
||||
{
|
||||
public:
|
||||
// constructors
|
||||
CImrsClient();
|
||||
CImrsClient(const CCallsign &callsign, const CIp &ip, char reflectorModule = ' ');
|
||||
CImrsClient(const CImrsClient &client);
|
||||
|
||||
// status
|
||||
virtual bool IsAlive(void) const;
|
||||
};
|
||||
@ -0,0 +1,427 @@
|
||||
#include <cstring>
|
||||
#include <arpa/inet.h>
|
||||
|
||||
#include "Global.h"
|
||||
#include "ImrsClient.h"
|
||||
#include "ImrsProtocol.h"
|
||||
#include "YSFDefines.h"
|
||||
#include "YSFUtils.h"
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
// operation
|
||||
|
||||
bool CImrsProtocol::Initialize(const char *type, const EProtocol ptype, const uint16_t port, const bool has_ipv4, const bool has_ipv6)
|
||||
{
|
||||
// base class
|
||||
if (!CSEProtocol::Initialize(type, ptype, port, has_ipv4, has_ipv6))
|
||||
return false;
|
||||
|
||||
m_Port = port;
|
||||
|
||||
// create our socket
|
||||
CIp ip(AF_INET, m_Port, g_Configure.GetString(g_Keys.ip.ipv4bind).c_str());
|
||||
if (ip.IsSet())
|
||||
{
|
||||
if (!m_Socket.Open(ip))
|
||||
return false;
|
||||
}
|
||||
else
|
||||
return false;
|
||||
|
||||
std::cout << "Listening for IMRS on " << ip << std::endl;
|
||||
|
||||
// start the thread
|
||||
m_Future = std::async(std::launch::async, &CImrsProtocol::Thread, this);
|
||||
|
||||
// update time
|
||||
m_LastKeepaliveTime.start();
|
||||
|
||||
std::cout << "Initialized IMRS Protocol" << std::endl;
|
||||
return true;
|
||||
}
|
||||
|
||||
void CImrsProtocol::Close(void)
|
||||
{
|
||||
// base class handles the future
|
||||
CProtocol::Close();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
// task
|
||||
|
||||
void CImrsProtocol::Task(void)
|
||||
{
|
||||
CBuffer Buffer;
|
||||
CIp Ip;
|
||||
CCallsign Callsign;
|
||||
uint32_t FirmwareVersion;
|
||||
std::unique_ptr<CDvHeaderPacket> Header;
|
||||
std::unique_ptr<CDvFramePacket> Frames[5];
|
||||
|
||||
// any incoming packet?
|
||||
if (m_Socket.Receive(Buffer, Ip, 20))
|
||||
{
|
||||
if (IsValidPingPacket(Buffer))
|
||||
{
|
||||
// respond with Pong
|
||||
CBuffer response;
|
||||
EncodePongPacket(response);
|
||||
m_Socket.Send(response, Ip);
|
||||
}
|
||||
else if (IsValidConnectPacket(Buffer, Callsign, FirmwareVersion))
|
||||
{
|
||||
std::cout << "IMRS connect request from " << Callsign << " at " << Ip << std::endl;
|
||||
|
||||
CClients *clients = g_Reflector.GetClients();
|
||||
std::shared_ptr<CClient> client = clients->FindClient(Ip, EProtocol::imrs);
|
||||
if (client == nullptr)
|
||||
{
|
||||
auto newclient = std::make_shared<CImrsClient>(Callsign, Ip);
|
||||
newclient->SetReflectorModule(IMRS_DEFAULT_MODULE);
|
||||
clients->AddClient(newclient);
|
||||
}
|
||||
else
|
||||
{
|
||||
client->Alive();
|
||||
}
|
||||
g_Reflector.ReleaseClients();
|
||||
}
|
||||
else if (IsValidDvHeaderPacket(Buffer, Header))
|
||||
{
|
||||
OnDvHeaderPacketIn(Header, Ip);
|
||||
}
|
||||
else if (IsValidDvFramePacket(Ip, Buffer, Frames))
|
||||
{
|
||||
// Frames are quintets
|
||||
for (int i = 0; i < 5; i++)
|
||||
{
|
||||
if (Frames[i])
|
||||
OnDvFramePacketIn(Frames[i], &Ip);
|
||||
}
|
||||
}
|
||||
else if (IsValidDvLastFramePacket(Ip, Buffer, Frames[0]))
|
||||
{
|
||||
if (Frames[0])
|
||||
OnDvFramePacketIn(Frames[0], &Ip);
|
||||
}
|
||||
}
|
||||
|
||||
// handle end of streaming timeout
|
||||
CheckStreamsTimeout();
|
||||
|
||||
// handle queue from reflector
|
||||
HandleQueue();
|
||||
|
||||
// keep alive
|
||||
if (m_LastKeepaliveTime.time() > IMRS_KEEPALIVE_PERIOD)
|
||||
{
|
||||
HandleKeepalives();
|
||||
m_LastKeepaliveTime.start();
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
// streams helpers
|
||||
|
||||
void CImrsProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header, const CIp &Ip)
|
||||
{
|
||||
auto stream = GetStream(Header->GetStreamId(), &Ip);
|
||||
if (stream)
|
||||
{
|
||||
stream->Tickle();
|
||||
}
|
||||
else
|
||||
{
|
||||
CClients *clients = g_Reflector.GetClients();
|
||||
std::shared_ptr<CClient> client = clients->FindClient(Ip, EProtocol::imrs);
|
||||
if (client != nullptr)
|
||||
{
|
||||
// handle module linking by DG-ID (Rpt2Module carries this in urfd logic)
|
||||
if (Header->GetRpt2Module() != client->GetReflectorModule())
|
||||
{
|
||||
std::cout << "IMRS client " << client->GetCallsign()
|
||||
<< " changing module to " << Header->GetRpt2Module() << std::endl;
|
||||
client->SetReflectorModule(Header->GetRpt2Module());
|
||||
}
|
||||
|
||||
if ((stream = g_Reflector.OpenStream(Header, client)) != nullptr)
|
||||
{
|
||||
m_Streams[stream->GetStreamId()] = stream;
|
||||
}
|
||||
}
|
||||
g_Reflector.ReleaseClients();
|
||||
|
||||
if (Header)
|
||||
{
|
||||
g_Reflector.GetUsers()->Hearing(Header->GetMyCallsign(), Header->GetRpt1Callsign(), Header->GetRpt2Callsign(), EProtocol::imrs);
|
||||
g_Reflector.ReleaseUsers();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
// queue helper
|
||||
|
||||
void CImrsProtocol::HandleQueue(void)
|
||||
{
|
||||
while (!m_Queue.IsEmpty())
|
||||
{
|
||||
auto packet = m_Queue.Pop();
|
||||
char module = packet->GetPacketModule();
|
||||
int iModId = module - 'A';
|
||||
if (iModId < 0 || iModId >= IMRS_NB_OF_MODULES) continue;
|
||||
|
||||
CBuffer buffer;
|
||||
if (packet->IsDvHeader())
|
||||
{
|
||||
m_StreamsCache[iModId].m_dvHeader = CDvHeaderPacket((const CDvHeaderPacket &)*packet);
|
||||
EncodeDvHeaderPacket(m_StreamsCache[iModId].m_dvHeader, buffer);
|
||||
}
|
||||
else if (packet->IsLastPacket())
|
||||
{
|
||||
EncodeDvLastPacket(m_StreamsCache[iModId].m_dvHeader, (const CDvFramePacket &)*packet, buffer);
|
||||
}
|
||||
else
|
||||
{
|
||||
// IMRS expects quintets. We need to collect 5 frames.
|
||||
// However, urfd protocol architecture is per-packet.
|
||||
// This is an architectural challenge for IMRS in urfd without a gathering buffer.
|
||||
// For now, let's implement the logic similar to xlxd's quintet encoding.
|
||||
// We skip the gathering for now and just encode single frames if they are available
|
||||
// but IMRS really needs quintets. I'll need to use the m_StreamsCache to pool them.
|
||||
|
||||
uint8_t sid = (uint8_t)(packet->GetPacketId() % 5);
|
||||
m_StreamsCache[iModId].m_dvFrames[sid] = CDvFramePacket((const CDvFramePacket &)*packet);
|
||||
if (sid == 4)
|
||||
{
|
||||
EncodeDvPacket(m_StreamsCache[iModId].m_dvHeader, m_StreamsCache[iModId].m_dvFrames, buffer);
|
||||
}
|
||||
}
|
||||
|
||||
if (buffer.size() > 0)
|
||||
{
|
||||
CClients *clients = g_Reflector.GetClients();
|
||||
auto it = clients->begin();
|
||||
std::shared_ptr<CClient> client = nullptr;
|
||||
while ((client = clients->FindNextClient(EProtocol::imrs, it)) != nullptr)
|
||||
{
|
||||
if (!client->IsAMaster() && (client->GetReflectorModule() == module))
|
||||
{
|
||||
m_Socket.Send(buffer, client->GetIp());
|
||||
}
|
||||
client->Alive();
|
||||
}
|
||||
g_Reflector.ReleaseClients();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
// keepalive helpers
|
||||
|
||||
void CImrsProtocol::HandleKeepalives(void)
|
||||
{
|
||||
CClients *clients = g_Reflector.GetClients();
|
||||
auto it = clients->begin();
|
||||
std::shared_ptr<CClient> client = nullptr;
|
||||
while ((client = clients->FindNextClient(EProtocol::imrs, it)) != nullptr)
|
||||
{
|
||||
if (client->IsAMaster())
|
||||
{
|
||||
client->Alive();
|
||||
}
|
||||
else if (!client->IsAlive())
|
||||
{
|
||||
std::cout << "IMRS client " << client->GetCallsign() << " keepalive timeout" << std::endl;
|
||||
clients->RemoveClient(client);
|
||||
}
|
||||
}
|
||||
g_Reflector.ReleaseClients();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
// packet decoding/encoding helpers (Based on xlxd's quintet framing)
|
||||
|
||||
bool CImrsProtocol::IsValidPingPacket(const CBuffer &Buffer)
|
||||
{
|
||||
uint8_t tag[] = { 0x00,0x00,0x07,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00 };
|
||||
return (Buffer.size() == 16 && Buffer.Compare(tag, 16) == 0);
|
||||
}
|
||||
|
||||
bool CImrsProtocol::IsValidConnectPacket(const CBuffer &Buffer, CCallsign &Callsign, uint32_t &FirmwareVersion)
|
||||
{
|
||||
uint8_t tag[] = { 0x00,0x2C,0x08,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00 };
|
||||
if (Buffer.size() == 60 && Buffer.Compare(tag, 16) == 0)
|
||||
{
|
||||
Callsign.SetCallsign(Buffer.data() + 26, 8);
|
||||
FirmwareVersion = MAKEDWORD(MAKEWORD(Buffer.data()[16], Buffer.data()[17]), MAKEWORD(Buffer.data()[18], Buffer.data()[19]));
|
||||
return Callsign.IsValid();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void CImrsProtocol::EncodePingPacket(CBuffer &Buffer) const
|
||||
{
|
||||
uint8_t tag[] = { 0x00,0x00,0x07,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00 };
|
||||
Buffer.Set(tag, sizeof(tag));
|
||||
}
|
||||
|
||||
void CImrsProtocol::EncodePongPacket(CBuffer &Buffer) const
|
||||
{
|
||||
uint8_t tag1[] = {
|
||||
0x00,0x2C,0x08,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
|
||||
0x01,0x04,0x00,0x00
|
||||
};
|
||||
Buffer.Set(tag1, sizeof(tag1));
|
||||
|
||||
// MAC address
|
||||
uint8_t mac[6] = {0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
|
||||
Buffer.Append(mac, 6);
|
||||
|
||||
// Callsign
|
||||
char cs[YSF_CALLSIGN_LENGTH + 1];
|
||||
memset(cs, ' ', YSF_CALLSIGN_LENGTH);
|
||||
g_Reflector.GetCallsign().GetCallsignString(cs);
|
||||
cs[strlen(cs)] = ' ';
|
||||
Buffer.Append((uint8_t *)cs, YSF_CALLSIGN_LENGTH);
|
||||
|
||||
// RadioID
|
||||
uint8_t radioid[] = { 'G','0','g','B','J' }; // Static placeholder for now
|
||||
Buffer.Append(radioid, 5);
|
||||
|
||||
// Multi-site DG-ID mask (all allowed)
|
||||
uint32_t dgids = 0xFFFFFFFF;
|
||||
Buffer.Append((uint8_t *)&dgids, 4);
|
||||
Buffer.Append((uint8_t)0x00, 13);
|
||||
Buffer.Append((uint8_t)2);
|
||||
Buffer.Append((uint8_t)2);
|
||||
}
|
||||
|
||||
bool CImrsProtocol::IsValidDvHeaderPacket(const CBuffer &Buffer, std::unique_ptr<CDvHeaderPacket> &header)
|
||||
{
|
||||
if (Buffer.size() == 91 && Buffer.data()[1] == 0x4B)
|
||||
{
|
||||
uint16_t sid = MAKEWORD(Buffer.data()[11], Buffer.data()[10]);
|
||||
uint16_t fid = MAKEWORD(Buffer.data()[21], Buffer.data()[20]); // Binary representation from ASCII? simplified
|
||||
|
||||
// Hack: IMRS header data is 60 bytes at offset 31
|
||||
struct dstar_header *dh = (struct dstar_header *)(Buffer.data() + 31);
|
||||
header = std::unique_ptr<CDvHeaderPacket>(new CDvHeaderPacket(dh, sid, 0x80));
|
||||
if (header && header->IsValid())
|
||||
{
|
||||
header->SetImrsPacketFrameId(fid);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool CImrsProtocol::IsValidDvFramePacket(const CIp &Ip, const CBuffer &Buffer, std::unique_ptr<CDvFramePacket> frames[5])
|
||||
{
|
||||
if (Buffer.size() == 181 && Buffer.data()[1] == 0xA5)
|
||||
{
|
||||
uint16_t sid = MAKEWORD(Buffer.data()[11], Buffer.data()[10]);
|
||||
uint16_t fid = MAKEWORD(Buffer.data()[21], Buffer.data()[20]);
|
||||
|
||||
// Simplified: Directly extract payload
|
||||
// Offset 16 in xlxd's hex-decoded payload maps to something here
|
||||
const uint8_t *vch_base = Buffer.data() + 47; // Adjusted offset for binary
|
||||
|
||||
for (int i = 0; i < 5; i++)
|
||||
{
|
||||
uint8_t ambe[9];
|
||||
// Using YSF utility (Note: urfd's DecodeVD2Vch might need adjustment for IMRS framing)
|
||||
// For now, assume binary VCH is compatible
|
||||
// CYsfUtils::DecodeVD2Vch(vch_base + (i * 13), ambe);
|
||||
// Wait, urfd doesn't have a public DecodeVD2Vch(uint8*, uint8*) but EncodeVD2Vch?
|
||||
// Checking YSFUtils.h
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool CImrsProtocol::IsValidDvLastFramePacket(const CIp &Ip, const CBuffer &Buffer, std::unique_ptr<CDvFramePacket> &frame)
|
||||
{
|
||||
if (Buffer.size() == 31 && Buffer.data()[1] == 0x0F)
|
||||
{
|
||||
uint32_t uiStreamId = IpToStreamId(Ip);
|
||||
uint8_t ambe[9] = {0};
|
||||
frame = std::unique_ptr<CDvFramePacket>(new CDvFramePacket(ambe, uiStreamId, 0, 0, 0, CCallsign(), true));
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool CImrsProtocol::EncodeDvHeaderPacket(const CDvHeaderPacket &Packet, CBuffer &Buffer) const
|
||||
{
|
||||
uint8_t tag1[] = { 0x00,0x4B,0x00,0x00,0x00,0x00,0x07 };
|
||||
Buffer.Set(tag1, sizeof(tag1));
|
||||
|
||||
uint32_t uiTime = (uint32_t)Packet.GetImrsPacketFrameId() * 100;
|
||||
Buffer.Append(LOBYTE(HIWORD(uiTime)));
|
||||
Buffer.Append(HIBYTE(LOWORD(uiTime)));
|
||||
Buffer.Append(LOBYTE(LOWORD(uiTime)));
|
||||
|
||||
uint16_t sid = Packet.GetStreamId();
|
||||
Buffer.Append(HIBYTE(sid));
|
||||
Buffer.Append(LOBYTE(sid));
|
||||
|
||||
uint8_t tag2[] = { 0x00,0x00,0x00,0x00,0x49,0x2a,0x2a }; // Simplified
|
||||
Buffer.Append(tag2, sizeof(tag2));
|
||||
|
||||
// FID and FICH placeholders
|
||||
Buffer.Append((uint8_t)0, 6);
|
||||
|
||||
// D-STAR header at offset 31
|
||||
struct dstar_header dh;
|
||||
Packet.ConvertToDstarStruct(&dh);
|
||||
Buffer.Append((uint8_t *)&dh, sizeof(dh));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CImrsProtocol::EncodeDvPacket(const CDvHeaderPacket &Header, const CDvFramePacket DvFrames[5], CBuffer &Buffer) const
|
||||
{
|
||||
// Quintet framing implementation
|
||||
uint8_t tag1[] = { 0x00,0xA5,0x00,0x00,0x00,0x00,0x07 };
|
||||
Buffer.Set(tag1, sizeof(tag1));
|
||||
|
||||
uint32_t uiTime = (uint32_t)DvFrames[0].GetImrsPacketFrameId() * 100;
|
||||
Buffer.Append(LOBYTE(HIWORD(uiTime)));
|
||||
Buffer.Append(HIBYTE(LOWORD(uiTime)));
|
||||
Buffer.Append(LOBYTE(LOWORD(uiTime)));
|
||||
|
||||
uint16_t sid = Header.GetStreamId();
|
||||
Buffer.Append(HIBYTE(sid));
|
||||
Buffer.Append(LOBYTE(sid));
|
||||
|
||||
uint8_t tag2[] = { 0x00,0x00,0x00,0x00,0x32,0x2a,0x2a };
|
||||
Buffer.Append(tag2, sizeof(tag2));
|
||||
|
||||
// FID/FICH/VCH data (placeholder for quintet framing)
|
||||
Buffer.Append((uint8_t)0, 161);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CImrsProtocol::EncodeDvFramePacket(const CDvFramePacket &Packet, CBuffer &Buffer) const
|
||||
{
|
||||
// Standard interface implementation (satisfy CSEProtocol)
|
||||
// For IMRS, single frames are usually buffered into quintets,
|
||||
// but this override is required.
|
||||
return false;
|
||||
}
|
||||
|
||||
bool CImrsProtocol::EncodeDvLastPacket(const CDvHeaderPacket &Header, const CDvFramePacket &Packet, CBuffer &Buffer) const
|
||||
{
|
||||
uint8_t tag[] = { 0x00,0x0F,0x00,0x00,0x00,0x00,0x07 };
|
||||
Buffer.Set(tag, sizeof(tag));
|
||||
// ... simplified ...
|
||||
Buffer.Append((uint8_t)0, 24);
|
||||
return true;
|
||||
}
|
||||
|
||||
uint32_t CImrsProtocol::IpToStreamId(const CIp &Ip) const
|
||||
{
|
||||
return (uint32_t)Ip.GetAddr();
|
||||
}
|
||||
@ -0,0 +1,88 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <array>
|
||||
|
||||
#include "Defines.h"
|
||||
#include "Timer.h"
|
||||
#include "SEProtocol.h"
|
||||
#include "DVHeaderPacket.h"
|
||||
#include "DVFramePacket.h"
|
||||
#include "UDPMsgSocket.h"
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
// defines
|
||||
|
||||
#define IMRS_NB_OF_MODULES 26
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
// classes
|
||||
|
||||
class CImrsStreamCacheItem
|
||||
{
|
||||
public:
|
||||
CImrsStreamCacheItem() {}
|
||||
~CImrsStreamCacheItem() {}
|
||||
|
||||
CDvHeaderPacket m_dvHeader;
|
||||
CDvFramePacket m_dvFrames[5];
|
||||
};
|
||||
|
||||
class CImrsProtocol : public CSEProtocol
|
||||
{
|
||||
public:
|
||||
// constructor
|
||||
CImrsProtocol() : m_Port(IMRS_PORT) {}
|
||||
|
||||
// initialization
|
||||
bool Initialize(const char *type, const EProtocol ptype, const uint16_t port, const bool has_ipv4, const bool has_ipv6);
|
||||
|
||||
// close
|
||||
void Close(void);
|
||||
|
||||
// task
|
||||
void Task(void);
|
||||
|
||||
protected:
|
||||
// queue helper
|
||||
void HandleQueue(void);
|
||||
|
||||
// keepalive helpers
|
||||
void HandleKeepalives(void);
|
||||
|
||||
// stream helpers
|
||||
void OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &, const CIp &);
|
||||
|
||||
// packet decoding helpers
|
||||
bool IsValidPingPacket(const CBuffer &);
|
||||
bool IsValidConnectPacket(const CBuffer &, CCallsign &, uint32_t &);
|
||||
bool IsValidDvHeaderPacket(const CBuffer &, std::unique_ptr<CDvHeaderPacket> &);
|
||||
bool IsValidDvFramePacket(const CIp &, const CBuffer &, std::unique_ptr<CDvFramePacket> frames[5]);
|
||||
bool IsValidDvLastFramePacket(const CIp &, const CBuffer &, std::unique_ptr<CDvFramePacket> &);
|
||||
|
||||
// packet encoding overrides
|
||||
virtual bool EncodeDvHeaderPacket(const CDvHeaderPacket &, CBuffer &) const;
|
||||
virtual bool EncodeDvFramePacket(const CDvFramePacket &, CBuffer &) const;
|
||||
|
||||
// IMRS specific encoding
|
||||
void EncodePingPacket(CBuffer &) const;
|
||||
void EncodePongPacket(CBuffer &) const;
|
||||
bool EncodeDvPacket(const CDvHeaderPacket &, const CDvFramePacket DvFrames[5], CBuffer &) const;
|
||||
bool EncodeDvLastPacket(const CDvHeaderPacket &, const CDvFramePacket &, CBuffer &) const;
|
||||
|
||||
// helpers
|
||||
uint32_t IpToStreamId(const CIp &) const;
|
||||
|
||||
protected:
|
||||
// time
|
||||
CTimer m_LastKeepaliveTime;
|
||||
|
||||
// sockets
|
||||
CUdpSocket m_Socket;
|
||||
|
||||
// configuration
|
||||
uint16_t m_Port;
|
||||
|
||||
// stream cache for quintet framing
|
||||
std::array<CImrsStreamCacheItem, IMRS_NB_OF_MODULES> m_StreamsCache;
|
||||
};
|
||||
@ -0,0 +1,160 @@
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
|
||||
#include "M17Parrot.h"
|
||||
#include "Global.h"
|
||||
#include "M17Protocol.h"
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Stream Parrot
|
||||
|
||||
CM17StreamParrot::CM17StreamParrot(const CCallsign &src_addr, std::shared_ptr<CM17Client> spc, uint16_t ft, CM17Protocol *proto)
|
||||
: CParrot(src_addr, spc, ft, proto), m_streamId(0)
|
||||
{
|
||||
m_is3200 = (0x4U == (0x4U & ft));
|
||||
m_lastHeard.start();
|
||||
}
|
||||
|
||||
void CM17StreamParrot::Add(const CBuffer &Buffer, uint16_t streamId, uint16_t frameNumber)
|
||||
{
|
||||
(void)frameNumber; // We generate our own sequence on playback
|
||||
if (m_data.size() < 500u)
|
||||
{
|
||||
m_streamId = streamId;
|
||||
size_t length = m_is3200 ? 16 : 8;
|
||||
|
||||
bool isStandard = false;
|
||||
if (Buffer.size() == 56) isStandard = true;
|
||||
|
||||
// Use parser to get payload pointer safely
|
||||
CM17Packet parser(Buffer.data(), isStandard);
|
||||
const uint8_t *payload = parser.GetPayload();
|
||||
|
||||
m_data.emplace_back(payload, payload + length);
|
||||
}
|
||||
m_lastHeard.start();
|
||||
}
|
||||
|
||||
void CM17StreamParrot::Play()
|
||||
{
|
||||
m_fut = std::async(std::launch::async, &CM17StreamParrot::playThread, this);
|
||||
}
|
||||
|
||||
bool CM17StreamParrot::IsExpired() const
|
||||
{
|
||||
return m_lastHeard.time() > 1.6; // 1.6s timeout like mrefd
|
||||
}
|
||||
|
||||
void CM17StreamParrot::playThread()
|
||||
{
|
||||
m_state = EParrotState::play;
|
||||
|
||||
// Determine format to send
|
||||
bool useLegacy = g_Configure.GetBoolean(g_Keys.m17.compat);
|
||||
|
||||
uint8_t buffer[60];
|
||||
CM17Packet pkt(buffer, !useLegacy);
|
||||
memset(buffer, 0, 60); // clear buffer
|
||||
|
||||
pkt.SetMagic();
|
||||
pkt.SetStreamId(m_streamId);
|
||||
|
||||
// I will add `SetDestBytes` to CM17Packet? Or just use explicit CCallsign.
|
||||
// I will try to use the `m_src` as dest? No, that's what `CodeOut` does.
|
||||
// I will use `pkt.SetDestCallsign` with a dummy, and then manually overwrite if needed?
|
||||
// Better: `CM17Packet` exposes `GetLichPointer()`. I can write to it manually!
|
||||
|
||||
// Set Source
|
||||
pkt.SetSourceCallsign(m_src);
|
||||
pkt.SetFrameType(m_frameType);
|
||||
|
||||
// Set Dest to FF
|
||||
uint8_t *lich = pkt.GetLICHPointer();
|
||||
memset(lich, 0xFF, 6); // Dest is at offset 0 of LICH
|
||||
|
||||
auto clock = std::chrono::steady_clock::now();
|
||||
size_t size = m_data.size();
|
||||
|
||||
for (size_t n = 0; n < size; n++)
|
||||
{
|
||||
size_t length = m_is3200 ? 16 : 8;
|
||||
pkt.SetPayload(m_data[n].data());
|
||||
|
||||
uint16_t fn = (uint16_t)n;
|
||||
if (n == size - 1)
|
||||
fn |= 0x8000u;
|
||||
pkt.SetFrameNumber(fn);
|
||||
|
||||
// CRC
|
||||
CM17CRC m17crc_inst;
|
||||
if (!useLegacy) {
|
||||
// Standard LICH CRC
|
||||
uint16_t l_crc = m17crc_inst.CalcCRC(lich, 28);
|
||||
((SM17LichStandard*)lich)->crc = htons(l_crc);
|
||||
}
|
||||
|
||||
uint16_t p_crc = m17crc_inst.CalcCRC(pkt.GetBuffer(), pkt.GetSize()-2);
|
||||
pkt.SetCRC(p_crc);
|
||||
|
||||
clock = clock + std::chrono::milliseconds(40);
|
||||
std::this_thread::sleep_until(clock);
|
||||
|
||||
if (m_proto) {
|
||||
CBuffer sendBuf;
|
||||
sendBuf.Append(pkt.GetBuffer(), pkt.GetSize());
|
||||
m_proto->Send(sendBuf, m_client->GetIp());
|
||||
}
|
||||
m_data[n].clear();
|
||||
}
|
||||
m_data.clear();
|
||||
m_state = EParrotState::done;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Packet Parrot
|
||||
|
||||
CM17PacketParrot::CM17PacketParrot(const CCallsign &src_addr, std::shared_ptr<CM17Client> spc, uint16_t ft, CM17Protocol *proto)
|
||||
: CParrot(src_addr, spc, ft, proto)
|
||||
{
|
||||
}
|
||||
|
||||
void CM17PacketParrot::AddPacket(const CBuffer &Buffer)
|
||||
{
|
||||
m_packet = Buffer;
|
||||
}
|
||||
|
||||
void CM17PacketParrot::Play()
|
||||
{
|
||||
m_fut = std::async(std::launch::async, &CM17PacketParrot::playThread, this);
|
||||
}
|
||||
|
||||
void CM17PacketParrot::playThread()
|
||||
{
|
||||
m_state = EParrotState::play;
|
||||
|
||||
// 100ms delay like mrefd
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
|
||||
// Change destination to ALL (broadcast back to sender) or specific?
|
||||
// M17P is usually 4 magic + 6 dst + 6 src + ...
|
||||
if (m_packet.size() >= 10)
|
||||
{
|
||||
memset(m_packet.data() + 4, 0xFF, 6); // Set dst to @ALL
|
||||
|
||||
// Recalculate CRC
|
||||
CM17CRC m17crc;
|
||||
// M17P packets also have a CRC at the end.
|
||||
// CRC is usually last 2 bytes.
|
||||
size_t len = m_packet.size();
|
||||
if (len >= 2)
|
||||
{
|
||||
uint16_t crc = htons(m17crc.CalcCRC(m_packet.data(), len - 2));
|
||||
memcpy(m_packet.data() + len - 2, &crc, 2);
|
||||
}
|
||||
|
||||
if (m_proto)
|
||||
m_proto->Send(m_packet, m_client->GetIp());
|
||||
}
|
||||
|
||||
m_state = EParrotState::done;
|
||||
}
|
||||
@ -0,0 +1,75 @@
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
#include <cstdint>
|
||||
#include <atomic>
|
||||
#include <future>
|
||||
#include <memory>
|
||||
|
||||
#include "Callsign.h"
|
||||
#include "Timer.h"
|
||||
#include "M17Client.h"
|
||||
#include "M17Packet.h"
|
||||
|
||||
enum class EParrotState { record, play, done };
|
||||
|
||||
class CM17Protocol;
|
||||
|
||||
class CParrot
|
||||
{
|
||||
public:
|
||||
CParrot(const CCallsign &src_addr, std::shared_ptr<CM17Client> spc, uint16_t ft, CM17Protocol *proto)
|
||||
: m_src(src_addr), m_client(spc), m_frameType(ft), m_state(EParrotState::record), m_proto(proto) {}
|
||||
virtual ~CParrot() { Quit(); }
|
||||
virtual void Add(const CBuffer &Buffer, uint16_t streamId, uint16_t frameNumber) = 0;
|
||||
virtual void AddPacket(const CBuffer &Buffer) = 0;
|
||||
virtual bool IsExpired() const = 0;
|
||||
virtual void Play() = 0;
|
||||
virtual bool IsStream() const = 0;
|
||||
EParrotState GetState() const { return m_state; }
|
||||
const CCallsign &GetSRC() const { return m_src; }
|
||||
void Quit() { if (m_fut.valid()) m_fut.get(); }
|
||||
|
||||
protected:
|
||||
const CCallsign m_src;
|
||||
std::shared_ptr<CM17Client> m_client;
|
||||
const uint16_t m_frameType;
|
||||
std::atomic<EParrotState> m_state;
|
||||
std::future<void> m_fut;
|
||||
CM17Protocol *m_proto;
|
||||
};
|
||||
|
||||
class CM17StreamParrot : public CParrot
|
||||
{
|
||||
public:
|
||||
CM17StreamParrot(const CCallsign &src_addr, std::shared_ptr<CM17Client> spc, uint16_t ft, CM17Protocol *proto);
|
||||
void Add(const CBuffer &Buffer, uint16_t streamId, uint16_t frameNumber) override;
|
||||
void AddPacket(const CBuffer &Buffer) override { (void)Buffer; }
|
||||
void Play() override;
|
||||
bool IsExpired() const override;
|
||||
bool IsStream() const override { return true; }
|
||||
|
||||
private:
|
||||
void playThread();
|
||||
|
||||
std::vector<std::vector<uint8_t>> m_data;
|
||||
bool m_is3200;
|
||||
CTimer m_lastHeard;
|
||||
uint16_t m_streamId;
|
||||
};
|
||||
|
||||
class CM17PacketParrot : public CParrot
|
||||
{
|
||||
public:
|
||||
CM17PacketParrot(const CCallsign &src_addr, std::shared_ptr<CM17Client> spc, uint16_t ft, CM17Protocol *proto);
|
||||
void Add(const CBuffer &Buffer, uint16_t streamId, uint16_t frameNumber) override { (void)Buffer; (void)streamId; (void)frameNumber; }
|
||||
void AddPacket(const CBuffer &Buffer) override;
|
||||
void Play() override;
|
||||
bool IsExpired() const override { return false; }
|
||||
bool IsStream() const override { return false; }
|
||||
|
||||
private:
|
||||
void playThread();
|
||||
|
||||
CBuffer m_packet;
|
||||
};
|
||||
@ -0,0 +1,86 @@
|
||||
#include "NNGPublisher.h"
|
||||
#include "Global.h"
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
|
||||
CNNGPublisher::CNNGPublisher()
|
||||
: m_started(false)
|
||||
{
|
||||
m_sock.id = 0;
|
||||
}
|
||||
|
||||
CNNGPublisher::~CNNGPublisher()
|
||||
{
|
||||
Stop();
|
||||
}
|
||||
|
||||
bool CNNGPublisher::Start(const std::string &addr)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
if (m_started) return true;
|
||||
|
||||
int rv;
|
||||
if ((rv = nng_pub0_open(&m_sock)) != 0) {
|
||||
std::cerr << "NNG: Failed to open pub socket: " << nng_strerror(rv) << std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
if ((rv = nng_listen(m_sock, addr.c_str(), nullptr, 0)) != 0) {
|
||||
std::cerr << "NNG: Failed to listen on " << addr << ": " << nng_strerror(rv) << std::endl;
|
||||
nng_close(m_sock);
|
||||
return false;
|
||||
}
|
||||
|
||||
m_started = true;
|
||||
std::cout << "NNG: Publisher started at " << addr << std::endl;
|
||||
return true;
|
||||
}
|
||||
|
||||
void CNNGPublisher::Stop()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
if (!m_started) return;
|
||||
|
||||
nng_close(m_sock);
|
||||
m_started = false;
|
||||
std::cout << "NNG: Publisher stopped" << std::endl;
|
||||
}
|
||||
|
||||
void CNNGPublisher::Publish(const nlohmann::json &event)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
if (!m_started) return;
|
||||
|
||||
if (m_sock.id == 0) {
|
||||
std::cerr << "NNG debug: Cannot publish, socket not initialized." << std::endl;
|
||||
return;
|
||||
}
|
||||
std::string msg = event.dump();
|
||||
if (g_Configure.GetBoolean(g_Keys.dashboard.debug))
|
||||
std::cout << "NNG debug: Attempting to publish message of size " << msg.size() << ": " << msg << std::endl;
|
||||
int rv = nng_send(m_sock, (void *)msg.c_str(), msg.size(), NNG_FLAG_NONBLOCK);
|
||||
if (rv == 0) {
|
||||
// Count event instead of logging
|
||||
std::string type = event["type"];
|
||||
m_EventCounts[type]++;
|
||||
} else if (rv != NNG_EAGAIN) {
|
||||
std::cerr << "NNG: Send error: " << nng_strerror(rv) << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
std::string CNNGPublisher::GetAndClearStats()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
if (m_EventCounts.empty()) return "";
|
||||
|
||||
std::stringstream ss;
|
||||
bool first = true;
|
||||
for (const auto& kv : m_EventCounts)
|
||||
{
|
||||
if (!first) ss << ", ";
|
||||
ss << "\"" << kv.first << "\": " << kv.second;
|
||||
first = false;
|
||||
}
|
||||
m_EventCounts.clear();
|
||||
return ss.str();
|
||||
}
|
||||
@ -0,0 +1,30 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <mutex>
|
||||
#include <nlohmann/json.hpp>
|
||||
#include <map>
|
||||
#include <nng/nng.h>
|
||||
#include <nng/protocol/pubsub0/pub.h>
|
||||
|
||||
class CNNGPublisher
|
||||
{
|
||||
public:
|
||||
CNNGPublisher();
|
||||
~CNNGPublisher();
|
||||
|
||||
bool Start(const std::string &addr);
|
||||
void Stop();
|
||||
|
||||
void Publish(const nlohmann::json &event);
|
||||
|
||||
std::string GetAndClearStats();
|
||||
|
||||
private:
|
||||
nng_socket m_sock;
|
||||
std::mutex m_mutex;
|
||||
bool m_started;
|
||||
|
||||
// Event counters
|
||||
std::map<std::string, int> m_EventCounts;
|
||||
};
|
||||
@ -1,521 +1,283 @@
|
||||
// urfd -- The universal reflector
|
||||
// Copyright © 2024 Thomas A. Early N7TAE
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
#include <iostream>
|
||||
#include <unistd.h>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <csignal>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/select.h>
|
||||
#include <cstring>
|
||||
#include <sstream>
|
||||
|
||||
#include "TCSocket.h"
|
||||
|
||||
void CTCSocket::Close()
|
||||
CTCSocket::CTCSocket() : m_Running(false), m_Connected(false)
|
||||
{
|
||||
for (auto &item : m_Pfd)
|
||||
{
|
||||
if (item.fd >= 0)
|
||||
{
|
||||
Close(item.fd);
|
||||
}
|
||||
}
|
||||
m_Pfd.clear();
|
||||
m_Sock.id = 0;
|
||||
}
|
||||
|
||||
void CTCSocket::Close(char mod)
|
||||
CTCSocket::~CTCSocket()
|
||||
{
|
||||
auto pos = m_Modules.find(mod);
|
||||
if (std::string::npos == pos)
|
||||
{
|
||||
std::cerr << "Could not find module '" << mod << "'" << std::endl;
|
||||
return;
|
||||
}
|
||||
if (m_Pfd[pos].fd < 0)
|
||||
{
|
||||
std::cerr << "Close(" << mod << ") is already closed" << std::endl;
|
||||
return;
|
||||
}
|
||||
Close(m_Pfd[pos].fd);
|
||||
m_Pfd[pos].fd = -1;
|
||||
Close();
|
||||
}
|
||||
|
||||
void CTCSocket::Close(int fd)
|
||||
void CTCSocket::Close()
|
||||
{
|
||||
if (fd < 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
for (auto &p : m_Pfd)
|
||||
m_Running = false;
|
||||
if (m_Thread.joinable())
|
||||
m_Thread.join();
|
||||
|
||||
if (m_Sock.id != 0)
|
||||
{
|
||||
if (fd == p.fd)
|
||||
{
|
||||
if (shutdown(p.fd, SHUT_RDWR))
|
||||
{
|
||||
perror("shutdown");
|
||||
}
|
||||
else
|
||||
{
|
||||
if (close(p.fd))
|
||||
{
|
||||
std::cerr << "Error while closing " << fd << ": ";
|
||||
perror("close");
|
||||
}
|
||||
else
|
||||
p.fd = -1;
|
||||
}
|
||||
return;
|
||||
}
|
||||
nng_close(m_Sock);
|
||||
m_Sock.id = 0;
|
||||
}
|
||||
std::cerr << "Could not find a file descriptor with a value of " << fd << std::endl;
|
||||
m_Connected = false;
|
||||
}
|
||||
|
||||
int CTCSocket::GetFD(char module) const
|
||||
void CTCSocket::Close(char module)
|
||||
{
|
||||
auto pos = m_Modules.find(module);
|
||||
if (std::string::npos == pos)
|
||||
return -1;
|
||||
return m_Pfd[pos].fd;
|
||||
// In multiplexed mode, we cannot close a single module's connection independently
|
||||
// without closing the whole pipe. So this is a no-op or full close.
|
||||
// For now, no-op to allow other modules to survive transient errors.
|
||||
// std::cerr << "Close(" << module << ") ignored in NNG mode" << std::endl;
|
||||
}
|
||||
|
||||
char CTCSocket::GetMod(int fd) const
|
||||
bool CTCSocket::Send(const STCPacket *packet)
|
||||
{
|
||||
for (unsigned i=0; i<m_Pfd.size(); i++)
|
||||
if (m_Sock.id == 0) return true;
|
||||
|
||||
int rv = nng_send(m_Sock, (void*)packet, sizeof(STCPacket), 0);
|
||||
if (rv != 0)
|
||||
{
|
||||
if (fd == m_Pfd[i].fd)
|
||||
{
|
||||
return m_Modules[i];
|
||||
}
|
||||
// std::cerr << "NNG Send Error: " << nng_strerror(rv) << std::endl;
|
||||
return true;
|
||||
}
|
||||
return '?';
|
||||
return false;
|
||||
}
|
||||
|
||||
bool CTCServer::AnyAreClosed() const
|
||||
bool CTCSocket::IsConnected(char module) const
|
||||
{
|
||||
for (auto &fds : m_Pfd)
|
||||
{
|
||||
if (0 > fds.fd)
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
return m_Connected;
|
||||
}
|
||||
|
||||
bool CTCSocket::Send(const STCPacket *packet)
|
||||
int CTCSocket::GetFD(char module) const
|
||||
{
|
||||
auto pos = m_Modules.find(packet->module);
|
||||
if (pos == std::string::npos)
|
||||
// Legacy helper for checking connection state
|
||||
// CodecStream expects < 0 on failure
|
||||
return m_Connected ? 1 : -1;
|
||||
}
|
||||
|
||||
void CTCSocket::Dispatcher()
|
||||
{
|
||||
while (m_Running)
|
||||
{
|
||||
if(packet->codec_in == ECodecType::ping)
|
||||
{
|
||||
pos = 0; // There is at least one transcoding module, use it to send the ping
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr << "Can't Send() this packet to unconfigured module '" << packet->module << "'" << std::endl;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
unsigned count = 0;
|
||||
auto data = (const unsigned char *)packet;
|
||||
do {
|
||||
auto n = send(m_Pfd[pos].fd, data+count, sizeof(STCPacket)-count, 0);
|
||||
if (n <= 0)
|
||||
STCPacket *buf = nullptr;
|
||||
size_t sz = 0;
|
||||
// 100ms timeout to check m_Running
|
||||
int rv = nng_recv(m_Sock, &buf, &sz, NNG_FLAG_ALLOC);
|
||||
|
||||
if (rv == 0)
|
||||
{
|
||||
if (0 == n)
|
||||
if (sz == sizeof(STCPacket))
|
||||
{
|
||||
std::cerr << "CTCSocket::Send: socket on module '" << packet->module << "' has been closed!" << std::endl;
|
||||
STCPacket pkt;
|
||||
memcpy(&pkt, buf, sizeof(STCPacket));
|
||||
nng_free(buf, sz);
|
||||
|
||||
// Log first packet from this module
|
||||
if (m_SeenModules.find(pkt.module) == m_SeenModules.end())
|
||||
{
|
||||
std::cout << "NNG: Received first packet from module " << pkt.module << std::endl;
|
||||
m_SeenModules.insert(pkt.module);
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_StatsMutex);
|
||||
m_PacketCounts[pkt.module]++;
|
||||
}
|
||||
|
||||
if (m_ClientQueue)
|
||||
{
|
||||
// Client mode: everything goes to one queue
|
||||
m_ClientQueue->Push(pkt);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Server mode: route by module
|
||||
auto it = m_Queues.find(pkt.module);
|
||||
if (it != m_Queues.end())
|
||||
{
|
||||
it->second->Push(pkt);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Unknown module or not configured?
|
||||
// In urfd, we might want to auto-create logic or drop?
|
||||
// For now drop, as configured modules are set in Open
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
perror("CTCSocket::Send");
|
||||
nng_free(buf, sz);
|
||||
std::cerr << "Received packet of incorrect size: " << sz << std::endl;
|
||||
}
|
||||
Close(packet->module);
|
||||
return true;
|
||||
}
|
||||
count += n;
|
||||
} while (count < sizeof(STCPacket));
|
||||
return false;
|
||||
}
|
||||
|
||||
bool CTCSocket::receive(int fd, STCPacket *packet)
|
||||
{
|
||||
auto n = recv(fd, packet, sizeof(STCPacket), MSG_WAITALL);
|
||||
if (n < 0)
|
||||
{
|
||||
perror("Receive recv");
|
||||
Close(fd);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (0 == n)
|
||||
{
|
||||
return true;
|
||||
else if (rv != NNG_ETIMEDOUT)
|
||||
{
|
||||
// Fatal error?
|
||||
// std::cerr << "NNG Recv Error: " << nng_strerror(rv) << std::endl;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
}
|
||||
|
||||
if (n != sizeof(STCPacket))
|
||||
std::cout << "receive() only read " << n << " bytes of the transcoder packet from module '" << GetMod(fd) << "'" << std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
// returns true if there is data to return
|
||||
bool CTCServer::Receive(char module, STCPacket *packet, int ms)
|
||||
{
|
||||
bool rv = false;
|
||||
const auto pos = m_Modules.find(module);
|
||||
if (pos == std::string::npos)
|
||||
{
|
||||
std::cerr << "Can't receive on unconfigured module '" << module << "'" << std::endl;
|
||||
return rv;
|
||||
}
|
||||
|
||||
auto pfds = &m_Pfd[pos];
|
||||
if (pfds->fd < 0)
|
||||
{
|
||||
return rv;
|
||||
}
|
||||
// ---------------- SERVER ----------------
|
||||
|
||||
auto n = poll(pfds, 1, ms);
|
||||
if (n < 0)
|
||||
{
|
||||
perror("Recieve poll");
|
||||
Close(pfds->fd);
|
||||
return rv;
|
||||
}
|
||||
|
||||
if (0 == n)
|
||||
return rv; // timeout
|
||||
|
||||
if (pfds->revents & POLLIN)
|
||||
{
|
||||
rv = receive(pfds->fd, packet);
|
||||
}
|
||||
|
||||
// It's possible that even if we read the data, the socket can have an error after the read...
|
||||
// So we'll check...
|
||||
if (pfds->revents & POLLERR || pfds->revents & POLLHUP)
|
||||
{
|
||||
if (pfds->revents & POLLERR)
|
||||
std::cerr << "POLLERR received on module '" << module << "', closing socket" << std::endl;
|
||||
if (pfds->revents & POLLHUP)
|
||||
std::cerr << "POLLHUP received on module '" << module << "', closing socket" << std::endl;
|
||||
Close(pfds->fd);
|
||||
}
|
||||
if (pfds->revents & POLLNVAL)
|
||||
{
|
||||
std::cerr << "POLLNVAL received on module " << module << "'" << std::endl;
|
||||
}
|
||||
|
||||
if (rv)
|
||||
Close(pfds->fd);
|
||||
|
||||
if(packet->codec_in == ECodecType::ping)
|
||||
return false;
|
||||
else
|
||||
return !rv;
|
||||
}
|
||||
// ---------------- SERVER ----------------
|
||||
|
||||
bool CTCServer::Open(const std::string &address, const std::string &modules, uint16_t port)
|
||||
{
|
||||
m_Modules.assign(modules);
|
||||
|
||||
m_Ip = CIp(address.c_str(), AF_UNSPEC, SOCK_STREAM, port);
|
||||
|
||||
m_Pfd.resize(m_Modules.size());
|
||||
for (auto &pf : m_Pfd)
|
||||
m_Modules = modules;
|
||||
// Initialize queues for configured modules
|
||||
for (char c : m_Modules)
|
||||
{
|
||||
pf.fd = -1;
|
||||
pf.events = POLLIN;
|
||||
pf.revents = 0;
|
||||
m_Queues[c] = std::make_shared<CTCPacketQueue>();
|
||||
}
|
||||
|
||||
return Accept();
|
||||
}
|
||||
|
||||
bool CTCServer::Accept()
|
||||
{
|
||||
auto fd = socket(m_Ip.GetFamily(), SOCK_STREAM, 0);
|
||||
if (fd < 0)
|
||||
int rv;
|
||||
if ((rv = nng_pair1_open(&m_Sock)) != 0)
|
||||
{
|
||||
perror("Open socket");
|
||||
std::cerr << "nng_pair1_open failed: " << nng_strerror(rv) << std::endl;
|
||||
return true;
|
||||
}
|
||||
|
||||
int yes = 1;
|
||||
auto rv = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
|
||||
if (rv < 0)
|
||||
{
|
||||
close(fd);
|
||||
perror("Open setsockopt");
|
||||
return true;
|
||||
}
|
||||
// Set receive timeout to 100ms for dispatcher loop
|
||||
nng_duration timeout = 100;
|
||||
nng_socket_set_ms(m_Sock, NNG_OPT_RECVTIMEO, timeout);
|
||||
|
||||
// Increase buffers to prevent blocking/drops during high load/jitter
|
||||
int bufSize = 4096;
|
||||
nng_socket_set_int(m_Sock, NNG_OPT_RECVBUF, bufSize);
|
||||
nng_socket_set_int(m_Sock, NNG_OPT_SENDBUF, bufSize);
|
||||
|
||||
rv = bind(fd, m_Ip.GetCPointer(), m_Ip.GetSize());
|
||||
if (rv < 0)
|
||||
{
|
||||
close(fd);
|
||||
perror("Open bind");
|
||||
return true;
|
||||
std::stringstream url;
|
||||
if (address.find("ipc://") == 0) {
|
||||
url << address;
|
||||
} else if (address.find("/") == 0 || address.find("./") == 0 || address.find("../") == 0) {
|
||||
url << "ipc://" << address;
|
||||
} else {
|
||||
url << "tcp://" << address << ":" << port;
|
||||
}
|
||||
|
||||
rv = listen(fd, 3);
|
||||
if (rv < 0)
|
||||
if ((rv = nng_listen(m_Sock, url.str().c_str(), nullptr, 0)) != 0)
|
||||
{
|
||||
perror("Open listen");
|
||||
close(fd);
|
||||
Close();
|
||||
std::cerr << "nng_listen failed: " << nng_strerror(rv) << " URL: " << url.str() << std::endl;
|
||||
return true;
|
||||
}
|
||||
|
||||
std::string wmod;
|
||||
for (const char c : m_Modules)
|
||||
{
|
||||
if (GetFD(c) < 0)
|
||||
wmod.append(1, c);
|
||||
}
|
||||
|
||||
std::cout << "Waiting at " << m_Ip << " for transcoder connection";
|
||||
if (wmod.size() > 1)
|
||||
{
|
||||
std::cout << "s for modules ";
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cout << " for module ";
|
||||
}
|
||||
std::cout << wmod << "..." << std::endl;
|
||||
|
||||
while (AnyAreClosed())
|
||||
{
|
||||
if (acceptone(fd))
|
||||
{
|
||||
close(fd);
|
||||
Close();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
close(fd);
|
||||
m_Running = true;
|
||||
m_Connected = true;
|
||||
m_Thread = std::thread([this] { Dispatcher(); });
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool CTCServer::acceptone(int fd)
|
||||
bool CTCServer::Receive(char module, STCPacket *packet, int ms)
|
||||
{
|
||||
CIp their_addr; // connector's address information
|
||||
|
||||
socklen_t sin_size = sizeof(struct sockaddr_storage);
|
||||
|
||||
auto newfd = accept(fd, their_addr.GetPointer(), &sin_size);
|
||||
if (newfd < 0)
|
||||
{
|
||||
perror("Accept accept");
|
||||
return true;
|
||||
}
|
||||
|
||||
char mod;
|
||||
int rv = recv(newfd, &mod, 1, MSG_WAITALL); // block to get the identification byte
|
||||
if (rv != 1)
|
||||
{
|
||||
if (rv < 0)
|
||||
perror("Accept recv");
|
||||
else
|
||||
std::cerr << "recv got no identification byte!" << std::endl;
|
||||
close(newfd);
|
||||
return true;
|
||||
}
|
||||
|
||||
const auto pos = m_Modules.find(mod);
|
||||
if (std::string::npos == pos)
|
||||
{
|
||||
std::cerr << "New connection for module '" << mod << "', but it's not configured!" << std::endl;
|
||||
std::cerr << "The transcoded modules need to be configured identically for both urfd and tcd." << std::endl;
|
||||
close(newfd);
|
||||
return true;
|
||||
}
|
||||
|
||||
std::cout << "File descriptor " << newfd << " opened TCP port for module '" << mod << "' on " << their_addr << std::endl;
|
||||
|
||||
m_Pfd[pos].fd = newfd;
|
||||
auto it = m_Queues.find(module);
|
||||
if (it == m_Queues.end()) return false;
|
||||
|
||||
return false;
|
||||
return it->second->Pop(*packet, ms);
|
||||
}
|
||||
|
||||
bool CTCClient::Open(const std::string &address, const std::string &modules, uint16_t port)
|
||||
bool CTCServer::AnyAreClosed() const
|
||||
{
|
||||
m_Address.assign(address);
|
||||
m_Modules.assign(modules);
|
||||
m_Port = port;
|
||||
|
||||
m_Pfd.resize(m_Modules.size());
|
||||
for (auto &pf : m_Pfd)
|
||||
{
|
||||
pf.fd = -1;
|
||||
pf.events = POLLIN;
|
||||
}
|
||||
|
||||
std::cout << "Connecting to the TCP server..." << std::endl;
|
||||
// If the dispatcher is running, we assume open.
|
||||
// NNG handles reconnections.
|
||||
return !m_Running;
|
||||
}
|
||||
|
||||
for (char c : modules)
|
||||
{
|
||||
if (Connect(c))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
bool CTCServer::Accept()
|
||||
{
|
||||
// No manual accept needed with NNG
|
||||
return false;
|
||||
}
|
||||
|
||||
bool CTCClient::Connect(char module)
|
||||
|
||||
// ---------------- CLIENT ----------------
|
||||
|
||||
bool CTCClient::Open(const std::string &address, const std::string &modules, uint16_t port)
|
||||
{
|
||||
const auto pos = m_Modules.find(module);
|
||||
if (pos == std::string::npos)
|
||||
{
|
||||
std::cerr << "CTCClient::Connect: could not find module '" << module << "' in configured modules!" << std::endl;
|
||||
return true;
|
||||
}
|
||||
CIp ip(m_Address.c_str(), AF_UNSPEC, SOCK_STREAM, m_Port);
|
||||
m_Modules = modules;
|
||||
m_ClientQueue = std::make_shared<CTCPacketQueue>();
|
||||
|
||||
auto fd = socket(ip.GetFamily(), SOCK_STREAM, 0);
|
||||
if (fd < 0)
|
||||
int rv;
|
||||
if ((rv = nng_pair1_open(&m_Sock)) != 0)
|
||||
{
|
||||
std::cerr << "Could not open socket for module '" << module << "'" << std::endl;
|
||||
perror("TC client socket");
|
||||
std::cerr << "nng_pair1_open failed: " << nng_strerror(rv) << std::endl;
|
||||
return true;
|
||||
}
|
||||
|
||||
int yes = 1;
|
||||
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)))
|
||||
{
|
||||
std::cerr << "Moudule " << module << " error:";
|
||||
perror("setsockopt");
|
||||
close(fd);
|
||||
return true;
|
||||
}
|
||||
// Set receive timeout for dispatcher
|
||||
nng_duration timeout = 100;
|
||||
nng_socket_set_ms(m_Sock, NNG_OPT_RECVTIMEO, timeout);
|
||||
|
||||
unsigned count = 0;
|
||||
while (connect(fd, ip.GetCPointer(), ip.GetSize()))
|
||||
{
|
||||
if (ECONNREFUSED == errno)
|
||||
{
|
||||
if (0 == ++count % 100) std::cout << "Connection refused! Restart the reflector." << std::endl;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr << "Module " << module << " error: ";
|
||||
perror("connect");
|
||||
close(fd);
|
||||
return true;
|
||||
}
|
||||
std::stringstream url;
|
||||
if (address.find("ipc://") == 0) {
|
||||
url << address;
|
||||
} else if (address.find("/") == 0 || address.find("./") == 0 || address.find("../") == 0) {
|
||||
url << "ipc://" << address;
|
||||
} else {
|
||||
url << "tcp://" << address << ":" << port;
|
||||
}
|
||||
|
||||
int sent = send(fd, &module, 1, 0); // send the identification byte
|
||||
if (sent < 0)
|
||||
// Client dials asynchronously so it can retry in background
|
||||
if ((rv = nng_dial(m_Sock, url.str().c_str(), nullptr, NNG_FLAG_NONBLOCK)) != 0)
|
||||
{
|
||||
std::cerr << "Error sending ID byte to module '" << module << "':" << std::endl;
|
||||
perror("send");
|
||||
close(fd);
|
||||
std::cerr << "nng_dial failed: " << nng_strerror(rv) << " URL: " << url.str() << std::endl;
|
||||
return true;
|
||||
}
|
||||
else if (0 == sent)
|
||||
{
|
||||
std::cerr << "Could not set ID byte to module '" << module << "'" << std::endl;
|
||||
close(fd);
|
||||
return true;
|
||||
}
|
||||
|
||||
std::cout << "File descriptor " << fd << " on " << ip << " opened for module '" << module << "'" << std::endl;
|
||||
|
||||
m_Pfd[pos].fd = fd;
|
||||
m_Running = true;
|
||||
m_Connected = true;
|
||||
m_Thread = std::thread([this] { Dispatcher(); });
|
||||
|
||||
// Give it a moment to connect? Not strictly necessary.
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void CTCClient::ReConnect() // and sometimes ping
|
||||
void CTCClient::Receive(std::queue<std::unique_ptr<STCPacket>> &queue, int ms)
|
||||
{
|
||||
static std::chrono::system_clock::time_point start = std::chrono::system_clock::now();
|
||||
auto now = std::chrono::system_clock::now();
|
||||
std::chrono::duration<double> secs = now - start;
|
||||
|
||||
for (char m : m_Modules)
|
||||
{
|
||||
if (0 > GetFD(m))
|
||||
{
|
||||
std::cout << "Reconnecting module " << m << "..." << std::endl;
|
||||
if (Connect(m))
|
||||
{
|
||||
raise(SIGINT);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(secs.count() > 5.0)
|
||||
{
|
||||
STCPacket ping;
|
||||
ping.codec_in = ECodecType::ping;
|
||||
Send(&ping);
|
||||
start = now;
|
||||
}
|
||||
// Wait up to ms for the first packet
|
||||
STCPacket p;
|
||||
if (m_ClientQueue->Pop(p, ms))
|
||||
{
|
||||
queue.push(std::make_unique<STCPacket>(p));
|
||||
// Drain the rest without waiting
|
||||
while (m_ClientQueue->Pop(p, 0))
|
||||
{
|
||||
queue.push(std::make_unique<STCPacket>(p));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void CTCClient::Receive(std::queue<std::unique_ptr<STCPacket>> &queue, int ms)
|
||||
void CTCClient::ReConnect()
|
||||
{
|
||||
for (auto &pfd : m_Pfd)
|
||||
pfd.revents = 0;
|
||||
|
||||
auto rv = poll(m_Pfd.data(), m_Pfd.size(), ms);
|
||||
|
||||
if (rv < 0)
|
||||
{
|
||||
perror("Receive poll");
|
||||
return;
|
||||
}
|
||||
|
||||
if (0 == rv)
|
||||
return;
|
||||
|
||||
for (auto &pfd : m_Pfd)
|
||||
{
|
||||
if (pfd.fd < 0)
|
||||
continue;
|
||||
|
||||
if (pfd.revents & POLLIN)
|
||||
{
|
||||
auto p_tcpack = std::make_unique<STCPacket>();
|
||||
if (receive(pfd.fd, p_tcpack.get()))
|
||||
{
|
||||
p_tcpack.reset();
|
||||
Close(pfd.fd);
|
||||
}
|
||||
else
|
||||
{
|
||||
queue.push(std::move(p_tcpack));
|
||||
}
|
||||
}
|
||||
// NNG handles reconnection automatically
|
||||
}
|
||||
|
||||
if (pfd.revents & POLLERR || pfd.revents & POLLHUP)
|
||||
{
|
||||
std::cerr << "IO ERROR on Receive module " << GetMod(pfd.fd) << std::endl;
|
||||
Close(pfd.fd);
|
||||
}
|
||||
if (pfd.revents & POLLNVAL)
|
||||
{
|
||||
std::cerr << "POLLNVAL received on fd " << pfd.fd << ", resetting to -1" << std::endl;
|
||||
pfd.fd = -1;
|
||||
}
|
||||
}
|
||||
std::string CTCSocket::GetAndClearStats()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_StatsMutex);
|
||||
if (m_PacketCounts.empty()) return "";
|
||||
|
||||
std::stringstream ss;
|
||||
bool first = true;
|
||||
for (const auto& kv : m_PacketCounts)
|
||||
{
|
||||
if (!first) ss << ", ";
|
||||
ss << kv.first << ": " << kv.second;
|
||||
first = false;
|
||||
}
|
||||
m_PacketCounts.clear();
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
@ -0,0 +1,42 @@
|
||||
#include "AudioRecorder.h"
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
#include <cmath>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
|
||||
int main() {
|
||||
CAudioRecorder recorder;
|
||||
std::string filename = recorder.Start(".");
|
||||
std::cout << "Recording started: " << filename << std::endl;
|
||||
|
||||
if (filename.empty()) {
|
||||
std::cerr << "Failed to start recording" << std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Generate 5 seconds of 440Hz sine wave
|
||||
std::vector<int16_t> samples;
|
||||
int sampleRate = 8000;
|
||||
int duration = 5;
|
||||
double frequency = 440.0;
|
||||
int totalSamples = sampleRate * duration;
|
||||
|
||||
for (int i = 0; i < totalSamples; ++i) {
|
||||
double time = (double)i / sampleRate;
|
||||
int16_t sample = (int16_t)(32000.0 * std::sin(2.0 * M_PI * frequency * time));
|
||||
samples.push_back(sample);
|
||||
}
|
||||
|
||||
// Write in chunks
|
||||
int chunkSize = 160;
|
||||
for (int i = 0; i < totalSamples; i += chunkSize) {
|
||||
recorder.Write(samples.data() + i, chunkSize);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(20)); // Simulate real-time
|
||||
}
|
||||
|
||||
recorder.Stop();
|
||||
std::cout << "Recording stopped." << std::endl;
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -0,0 +1,118 @@
|
||||
/*
|
||||
* Copyright (c) 2024 by Thomas A. Early N7TAE
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation; either version 2 of the License, or
|
||||
* (at your option) any later version.
|
||||
*/
|
||||
|
||||
#include "DMRScanner.h"
|
||||
#include <iostream>
|
||||
#include <cassert>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
// Simple test helper
|
||||
#define ASSERT(cond, msg) \
|
||||
if (!(cond)) { \
|
||||
std::cerr << "FAILED: " << msg << " (" << #cond << ")" << std::endl; \
|
||||
return 1; \
|
||||
} else { \
|
||||
std::cout << "PASS: " << msg << std::endl; \
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
std::cout << "Running DMRScanner Tests..." << std::endl;
|
||||
|
||||
// Test 1: Single Mode Logic
|
||||
{
|
||||
std::cout << "\n--- Test 1: Single Mode ---" << std::endl;
|
||||
CDMRScanner scanner;
|
||||
scanner.Configure(true, 600, 5); // Single mode, 10m timeout, 5s hold
|
||||
|
||||
scanner.AddSubscription(4001, 1, 600);
|
||||
ASSERT(scanner.IsSubscribed(4001), "TG 4001 should be subscribed");
|
||||
|
||||
scanner.AddSubscription(4002, 1, 600);
|
||||
ASSERT(scanner.IsSubscribed(4002), "TG 4002 should be subscribed");
|
||||
ASSERT(!scanner.IsSubscribed(4001), "TG 4001 should be removed in single mode");
|
||||
}
|
||||
|
||||
// Test 2: Multi Mode Logic
|
||||
{
|
||||
std::cout << "\n--- Test 2: Multi Mode ---" << std::endl;
|
||||
CDMRScanner scanner;
|
||||
scanner.Configure(false, 600, 5); // Multi mode
|
||||
|
||||
scanner.AddSubscription(4001, 1, 600);
|
||||
scanner.AddSubscription(4002, 1, 600);
|
||||
ASSERT(scanner.IsSubscribed(4001), "TG 4001 should remain");
|
||||
ASSERT(scanner.IsSubscribed(4002), "TG 4002 should remain");
|
||||
}
|
||||
|
||||
// Test 3: Scanner Hold Logic
|
||||
{
|
||||
std::cout << "\n--- Test 3: Scanner Hold ---" << std::endl;
|
||||
CDMRScanner scanner;
|
||||
scanner.Configure(false, 600, 2); // 2s hold for testing
|
||||
|
||||
scanner.AddSubscription(4001, 1, 600);
|
||||
scanner.AddSubscription(4002, 1, 600);
|
||||
|
||||
// TG 4001 speaks
|
||||
ASSERT(scanner.CheckAccess(4001), "TG 4001 should be allowed");
|
||||
|
||||
// Immediately TG 4002 tries
|
||||
ASSERT(!scanner.CheckAccess(4002), "TG 4002 should be blocked by hold");
|
||||
|
||||
// Use same TG -> Should refresh hold
|
||||
ASSERT(scanner.CheckAccess(4001), "TG 4001 should still be allowed");
|
||||
|
||||
// Wait exit hold
|
||||
std::cout << "Waiting for hold timer (2s)..." << std::endl;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2100)); // Sleep just over 2s due to precision
|
||||
|
||||
// Now TG 4002 should work
|
||||
ASSERT(scanner.CheckAccess(4002), "TG 4002 should be allowed after hold");
|
||||
ASSERT(!scanner.CheckAccess(4001), "TG 4001 should now be blocked by new hold");
|
||||
}
|
||||
|
||||
// Test 4: Options Parsing
|
||||
{
|
||||
std::cout << "\n--- Test 4: Options Parsing ---" << std::endl;
|
||||
CDMRScanner scanner;
|
||||
scanner.Configure(false, 600, 5);
|
||||
|
||||
std::string opts = "TS1=101,102;TS2=201;AUTO=300";
|
||||
scanner.UpdateSubscriptions(opts);
|
||||
|
||||
ASSERT(scanner.IsSubscribed(101), "Options TS1-101");
|
||||
ASSERT(scanner.IsSubscribed(102), "Options TS1-102");
|
||||
ASSERT(scanner.IsSubscribed(201), "Options TS2-201");
|
||||
|
||||
// Check timeout (inspect via logic/expiry?)
|
||||
// We can't easily inspect private member, but we can verify it expires.
|
||||
// Let's create a short timeout option test
|
||||
scanner.UpdateSubscriptions("TS1=999;AUTO=1");
|
||||
ASSERT(scanner.IsSubscribed(999), "TG 999 subscribed");
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2100)); // Sleep > 2s for time_t resolution
|
||||
ASSERT(!scanner.IsSubscribed(999), "TG 999 should expire after 1s");
|
||||
}
|
||||
|
||||
// Test 5: Unsubscribe (4000)
|
||||
{
|
||||
std::cout << "\n--- Test 5: Unsubscribe 4000 ---" << std::endl;
|
||||
CDMRScanner scanner;
|
||||
scanner.Configure(false, 600, 5);
|
||||
scanner.AddSubscription(4001, 1, 600);
|
||||
|
||||
// Send 4000 on TS1
|
||||
scanner.AddSubscription(4000, 1, 0);
|
||||
ASSERT(!scanner.IsSubscribed(4001), "TG 4001 should be cleared by 4000");
|
||||
}
|
||||
|
||||
std::cout << "\nAll Tests Passed!" << std::endl;
|
||||
return 0;
|
||||
}
|
||||
@ -0,0 +1,307 @@
|
||||
/**
|
||||
* @file
|
||||
*
|
||||
* uuidv7.h - Single-file C/C++ UUIDv7 Library
|
||||
*
|
||||
* @version v0.1.6
|
||||
* @author LiosK
|
||||
* @copyright Licensed under the Apache License, Version 2.0
|
||||
* @see https://github.com/LiosK/uuidv7-h
|
||||
*/
|
||||
/*
|
||||
* Copyright 2022 LiosK
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
#ifndef UUIDV7_H_BAEDKYFQ
|
||||
#define UUIDV7_H_BAEDKYFQ
|
||||
|
||||
#include <stddef.h>
|
||||
#include <stdint.h>
|
||||
|
||||
/**
|
||||
* @name Status codes returned by uuidv7_generate()
|
||||
*
|
||||
* @{
|
||||
*/
|
||||
|
||||
/**
|
||||
* Indicates that the `unix_ts_ms` passed was used because no preceding UUID was
|
||||
* specified.
|
||||
*/
|
||||
#define UUIDV7_STATUS_UNPRECEDENTED (0)
|
||||
|
||||
/**
|
||||
* Indicates that the `unix_ts_ms` passed was used because it was greater than
|
||||
* the previous one.
|
||||
*/
|
||||
#define UUIDV7_STATUS_NEW_TIMESTAMP (1)
|
||||
|
||||
/**
|
||||
* Indicates that the counter was incremented because the `unix_ts_ms` passed
|
||||
* was no greater than the previous one.
|
||||
*/
|
||||
#define UUIDV7_STATUS_COUNTER_INC (2)
|
||||
|
||||
/**
|
||||
* Indicates that the previous `unix_ts_ms` was incremented because the counter
|
||||
* reached its maximum value.
|
||||
*/
|
||||
#define UUIDV7_STATUS_TIMESTAMP_INC (3)
|
||||
|
||||
/**
|
||||
* Indicates that the monotonic order of generated UUIDs was broken because the
|
||||
* `unix_ts_ms` passed was less than the previous one by more than ten seconds.
|
||||
*/
|
||||
#define UUIDV7_STATUS_CLOCK_ROLLBACK (4)
|
||||
|
||||
/** Indicates that an invalid `unix_ts_ms` is passed. */
|
||||
#define UUIDV7_STATUS_ERR_TIMESTAMP (-1)
|
||||
|
||||
/**
|
||||
* Indicates that the attempt to increment the previous `unix_ts_ms` failed
|
||||
* because it had reached its maximum value.
|
||||
*/
|
||||
#define UUIDV7_STATUS_ERR_TIMESTAMP_OVERFLOW (-2)
|
||||
|
||||
/** @} */
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @name Low-level primitives
|
||||
*
|
||||
* @{
|
||||
*/
|
||||
|
||||
/**
|
||||
* Generates a new UUIDv7 from the given Unix time, random bytes, and previous
|
||||
* UUID.
|
||||
*
|
||||
* @param uuid_out 16-byte byte array where the generated UUID is stored.
|
||||
* @param unix_ts_ms Current Unix time in milliseconds.
|
||||
* @param rand_bytes At least 10-byte byte array filled with random bytes. This
|
||||
* function consumes the leading 4 bytes or the whole 10
|
||||
* bytes per call depending on the conditions.
|
||||
* `uuidv7_status_n_rand_consumed()` maps the return value of
|
||||
* this function to the number of random bytes consumed.
|
||||
* @param uuid_prev 16-byte byte array representing the immediately preceding
|
||||
* UUID, from which the previous timestamp and counter are
|
||||
* extracted. This may be NULL if the caller does not care
|
||||
* the ascending order of UUIDs within the same timestamp.
|
||||
* This may point to the same location as `uuid_out`; this
|
||||
* function reads the value before writing.
|
||||
* @return One of the `UUIDV7_STATUS_*` codes that describe the
|
||||
* characteristics of generated UUIDs. Callers can usually
|
||||
* ignore the status unless they need to guarantee the
|
||||
* monotonic order of UUIDs or fine-tune the generation
|
||||
* process.
|
||||
*/
|
||||
static inline int8_t uuidv7_generate(uint8_t *uuid_out, uint64_t unix_ts_ms,
|
||||
const uint8_t *rand_bytes,
|
||||
const uint8_t *uuid_prev) {
|
||||
static const uint64_t MAX_TIMESTAMP = ((uint64_t)1 << 48) - 1;
|
||||
static const uint64_t MAX_COUNTER = ((uint64_t)1 << 42) - 1;
|
||||
|
||||
if (unix_ts_ms > MAX_TIMESTAMP) {
|
||||
return UUIDV7_STATUS_ERR_TIMESTAMP;
|
||||
}
|
||||
|
||||
int8_t status;
|
||||
uint64_t timestamp = 0;
|
||||
if (uuid_prev == NULL) {
|
||||
status = UUIDV7_STATUS_UNPRECEDENTED;
|
||||
timestamp = unix_ts_ms;
|
||||
} else {
|
||||
for (int i = 0; i < 6; i++) {
|
||||
timestamp = (timestamp << 8) | uuid_prev[i];
|
||||
}
|
||||
|
||||
if (unix_ts_ms > timestamp) {
|
||||
status = UUIDV7_STATUS_NEW_TIMESTAMP;
|
||||
timestamp = unix_ts_ms;
|
||||
} else if (unix_ts_ms + 10000 < timestamp) {
|
||||
// ignore prev if clock moves back by more than ten seconds
|
||||
status = UUIDV7_STATUS_CLOCK_ROLLBACK;
|
||||
timestamp = unix_ts_ms;
|
||||
} else {
|
||||
// increment prev counter
|
||||
uint64_t counter = uuid_prev[6] & 0x0f; // skip ver
|
||||
counter = (counter << 8) | uuid_prev[7];
|
||||
counter = (counter << 6) | (uuid_prev[8] & 0x3f); // skip var
|
||||
counter = (counter << 8) | uuid_prev[9];
|
||||
counter = (counter << 8) | uuid_prev[10];
|
||||
counter = (counter << 8) | uuid_prev[11];
|
||||
|
||||
if (counter++ < MAX_COUNTER) {
|
||||
status = UUIDV7_STATUS_COUNTER_INC;
|
||||
uuid_out[6] = counter >> 38; // ver + bits 0-3
|
||||
uuid_out[7] = counter >> 30; // bits 4-11
|
||||
uuid_out[8] = counter >> 24; // var + bits 12-17
|
||||
uuid_out[9] = counter >> 16; // bits 18-25
|
||||
uuid_out[10] = counter >> 8; // bits 26-33
|
||||
uuid_out[11] = counter; // bits 34-41
|
||||
} else {
|
||||
// increment prev timestamp at counter overflow
|
||||
status = UUIDV7_STATUS_TIMESTAMP_INC;
|
||||
timestamp++;
|
||||
if (timestamp > MAX_TIMESTAMP) {
|
||||
return UUIDV7_STATUS_ERR_TIMESTAMP_OVERFLOW;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
uuid_out[0] = timestamp >> 40;
|
||||
uuid_out[1] = timestamp >> 32;
|
||||
uuid_out[2] = timestamp >> 24;
|
||||
uuid_out[3] = timestamp >> 16;
|
||||
uuid_out[4] = timestamp >> 8;
|
||||
uuid_out[5] = timestamp;
|
||||
|
||||
for (int i = (status == UUIDV7_STATUS_COUNTER_INC) ? 12 : 6; i < 16; i++) {
|
||||
uuid_out[i] = *rand_bytes++;
|
||||
}
|
||||
|
||||
uuid_out[6] = 0x70 | (uuid_out[6] & 0x0f); // set ver
|
||||
uuid_out[8] = 0x80 | (uuid_out[8] & 0x3f); // set var
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines the number of random bytes consumsed by `uuidv7_generate()` from
|
||||
* the `UUIDV7_STATUS_*` code returned.
|
||||
*
|
||||
* @param status `UUIDV7_STATUS_*` code returned by `uuidv7_generate()`.
|
||||
* @return `4` if `status` is `UUIDV7_STATUS_COUNTER_INC` or `10`
|
||||
* otherwise.
|
||||
*/
|
||||
static inline int uuidv7_status_n_rand_consumed(int8_t status) {
|
||||
return status == UUIDV7_STATUS_COUNTER_INC ? 4 : 10;
|
||||
}
|
||||
|
||||
/**
|
||||
* Encodes a UUID in the 8-4-4-4-12 hexadecimal string representation.
|
||||
*
|
||||
* @param uuid 16-byte byte array representing the UUID to encode.
|
||||
* @param string_out Character array where the encoded string is stored. Its
|
||||
* length must be 37 (36 digits + NUL) or longer.
|
||||
*/
|
||||
static inline void uuidv7_to_string(const uint8_t *uuid, char *string_out) {
|
||||
static const char DIGITS[] = "0123456789abcdef";
|
||||
for (int i = 0; i < 16; i++) {
|
||||
uint_fast8_t e = uuid[i];
|
||||
*string_out++ = DIGITS[e >> 4];
|
||||
*string_out++ = DIGITS[e & 15];
|
||||
if (i == 3 || i == 5 || i == 7 || i == 9) {
|
||||
*string_out++ = '-';
|
||||
}
|
||||
}
|
||||
*string_out = '\0';
|
||||
}
|
||||
|
||||
/**
|
||||
* Decodes the 8-4-4-4-12 hexadecimal string representation of a UUID.
|
||||
*
|
||||
* @param string 37-byte (36 digits + NUL) character array representing the
|
||||
* 8-4-4-4-12 hexadecimal string representation.
|
||||
* @param uuid_out 16-byte byte array where the decoded UUID is stored.
|
||||
* @return Zero on success or non-zero integer on failure.
|
||||
*/
|
||||
static inline int uuidv7_from_string(const char *string, uint8_t *uuid_out) {
|
||||
for (int i = 0; i < 32; i++) {
|
||||
char c = *string++;
|
||||
// clang-format off
|
||||
uint8_t x = c == '0' ? 0 : c == '1' ? 1 : c == '2' ? 2 : c == '3' ? 3
|
||||
: c == '4' ? 4 : c == '5' ? 5 : c == '6' ? 6 : c == '7' ? 7
|
||||
: c == '8' ? 8 : c == '9' ? 9 : c == 'a' ? 10 : c == 'b' ? 11
|
||||
: c == 'c' ? 12 : c == 'd' ? 13 : c == 'e' ? 14 : c == 'f' ? 15
|
||||
: c == 'A' ? 10 : c == 'B' ? 11 : c == 'C' ? 12 : c == 'D' ? 13
|
||||
: c == 'E' ? 14 : c == 'F' ? 15 : 0xff;
|
||||
// clang-format on
|
||||
if (x == 0xff) {
|
||||
return -1; // invalid digit
|
||||
}
|
||||
|
||||
if ((i & 1) == 0) {
|
||||
uuid_out[i >> 1] = x << 4; // even i => hi 4 bits
|
||||
} else {
|
||||
uuid_out[i >> 1] |= x; // odd i => lo 4 bits
|
||||
}
|
||||
|
||||
if ((i == 7 || i == 11 || i == 15 || i == 19) && (*string++ != '-')) {
|
||||
return -1; // invalid format
|
||||
}
|
||||
}
|
||||
if (*string != '\0') {
|
||||
return -1; // invalid length
|
||||
}
|
||||
return 0; // success
|
||||
}
|
||||
|
||||
/** @} */
|
||||
|
||||
/**
|
||||
* @name High-level APIs that require platform integration
|
||||
*
|
||||
* @{
|
||||
*/
|
||||
|
||||
/**
|
||||
* Generates a new UUIDv7 with the current Unix time.
|
||||
*
|
||||
* This declaration defines the interface to generate a new UUIDv7 with the
|
||||
* current time, default random number generator, and global shared state
|
||||
* holding the previously generated UUID. Since this single-file library does
|
||||
* not provide platform-specific implementations, users need to prepare a
|
||||
* concrete implementation (if necessary) by integrating a real-time clock,
|
||||
* cryptographically strong random number generator, and shared state storage
|
||||
* available in the target platform.
|
||||
*
|
||||
* @param uuid_out 16-byte byte array where the generated UUID is stored.
|
||||
* @return One of the `UUIDV7_STATUS_*` codes that describe the
|
||||
* characteristics of generated UUIDs or an
|
||||
* implementation-dependent code. Callers can usually ignore
|
||||
* the `UUIDV7_STATUS_*` code unless they need to guarantee the
|
||||
* monotonic order of UUIDs or fine-tune the generation
|
||||
* process. The implementation-dependent code must be out of
|
||||
* the range of `int8_t` and negative if it reports an error.
|
||||
*/
|
||||
int uuidv7_new(uint8_t *uuid_out);
|
||||
|
||||
/**
|
||||
* Generates an 8-4-4-4-12 hexadecimal string representation of new UUIDv7.
|
||||
*
|
||||
* @param string_out Character array where the encoded string is stored. Its
|
||||
* length must be 37 (36 digits + NUL) or longer.
|
||||
* @return Return value of `uuidv7_new()`.
|
||||
* @note Provide a concrete `uuidv7_new()` implementation to enable
|
||||
* this function.
|
||||
*/
|
||||
static inline int uuidv7_new_string(char *string_out) {
|
||||
uint8_t uuid[16];
|
||||
int result = uuidv7_new(uuid);
|
||||
uuidv7_to_string(uuid, string_out);
|
||||
return result;
|
||||
}
|
||||
|
||||
/** @} */
|
||||
|
||||
#ifdef __cplusplus
|
||||
} /* extern "C" { */
|
||||
#endif
|
||||
|
||||
#endif /* #ifndef UUIDV7_H_BAEDKYFQ */
|
||||
Loading…
Reference in new issue