g2_ircddb now uses std::async

pull/1/head
Tom Early 9 years ago
parent 3721812fdd
commit fd1a13d850

@ -46,9 +46,10 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netdb.h> #include <netdb.h>
#include <regex.h> #include <regex.h>
#include <pthread.h>
#include <atomic> #include <atomic>
#include <future>
#include <exception>
#include <string> #include <string>
#include <map> #include <map>
#include <libconfig.h++> #include <libconfig.h++>
@ -105,10 +106,8 @@ static int g2_sock = -1;
static unsigned char readBuffer2[2000]; /* 56 or 27, max is 56 */ static unsigned char readBuffer2[2000]; /* 56 or 27, max is 56 */
static struct sockaddr_in fromDst4; static struct sockaddr_in fromDst4;
/* // Incoming data from remote systems
Incoming data from remote systems // must be fed into our local repeater modules.
must be fed into our local repeater modules.
*/
static struct { static struct {
/* help with header re-generation */ /* help with header re-generation */
unsigned char saved_hdr[58]; /* repeater format */ unsigned char saved_hdr[58]; /* repeater format */
@ -165,7 +164,6 @@ static struct {
int num_dv_frames; int num_dv_frames;
int num_dv_silent_frames; int num_dv_silent_frames;
int num_bit_errors; int num_bit_errors;
} band_txt[3]; /* 0=A, 1=B, 2=C */ } band_txt[3]; /* 0=A, 1=B, 2=C */
/* Used to validate MYCALL input */ /* Used to validate MYCALL input */
@ -182,15 +180,15 @@ static pthread_mutex_t irc_data_mutex = PTHREAD_MUTEX_INITIALIZER;
static int open_port(const PORTIP &pip); static int open_port(const PORTIP &pip);
static void calcPFCS(unsigned char *packet, int len); static void calcPFCS(unsigned char *packet, int len);
static void *get_irc_data(void *arg); static void GetIRCDataThread();
static int get_yrcall_rptr_from_cache(char *call, char *arearp_cs, char *zonerp_cs, char *mod, char *ip, char RoU); static int get_yrcall_rptr_from_cache(char *call, char *arearp_cs, char *zonerp_cs, char *mod, char *ip, char RoU);
static bool get_yrcall_rptr(char *call, char *arearp_cs, char *zonerp_cs, char *mod, char *ip, char RoU); static bool get_yrcall_rptr(char *call, char *arearp_cs, char *zonerp_cs, char *mod, char *ip, char RoU);
static int read_config(char *); static int read_config(char *);
static void runit(); static void runit();
static void sigCatch(int signum); static void sigCatch(int signum);
static void *echotest(void *arg); static void PlayFileThread(char *file);
static void compute_aprs_hash(); static void compute_aprs_hash();
static void *send_aprs_beacon(void *arg); static void APRSBeaconThread();
/* aprs functions, borrowed from my retired IRLP node 4201 */ /* aprs functions, borrowed from my retired IRLP node 4201 */
static void gps_send(short int rptr_idx); static void gps_send(short int rptr_idx);
@ -521,7 +519,7 @@ static int open_port(const PORTIP &pip)
} }
/* receive data from the irc server and save it */ /* receive data from the irc server and save it */
static void *get_irc_data(void *arg) static void GetIRCDataThread()
{ {
struct timespec req; struct timespec req;
@ -537,32 +535,27 @@ static void *get_irc_data(void *arg)
short THRESHOLD_MAX = 100; short THRESHOLD_MAX = 100;
short last_status = 0; short last_status = 0;
arg = arg;
act.sa_handler = sigCatch; act.sa_handler = sigCatch;
sigemptyset(&act.sa_mask); sigemptyset(&act.sa_mask);
act.sa_flags = SA_RESTART; act.sa_flags = SA_RESTART;
if (sigaction(SIGTERM, &act, 0) != 0) { if (sigaction(SIGTERM, &act, 0) != 0) {
traceit("sigaction-TERM failed, error=%d\n", errno); traceit("GetIRCDataThread: sigaction-TERM failed, error=%d\n", errno);
traceit("get_irc_data thread exiting...\n");
keep_running = false; keep_running = false;
pthread_exit(NULL); return;
} }
if (sigaction(SIGINT, &act, 0) != 0) { if (sigaction(SIGINT, &act, 0) != 0) {
traceit("sigaction-INT failed, error=%d\n", errno); traceit("GetIRCDataThread: sigaction-INT failed, error=%d\n", errno);
traceit("get_irc_data thread exiting...\n");
keep_running = false; keep_running = false;
pthread_exit(NULL); return;
} }
if (sigaction(SIGPIPE, &act, 0) != 0) { if (sigaction(SIGPIPE, &act, 0) != 0) {
traceit("sigaction-PIPE failed, error=%d\n", errno); traceit("GetIRCDataThread: sigaction-PIPE failed, error=%d\n", errno);
traceit("get_irc_data thread exiting...\n");
keep_running = false; keep_running = false;
pthread_exit(NULL); return;
} }
while (keep_running) { while (keep_running) {
threshold ++; threshold++;
if (threshold >= THRESHOLD_MAX) { if (threshold >= THRESHOLD_MAX) {
rc = ii->getConnectionState(); rc = ii->getConnectionState();
if ((rc == 0) || (rc == 10)) { if ((rc == 0) || (rc == 10)) {
@ -643,8 +636,8 @@ static void *get_irc_data(void *arg)
req.tv_nsec = 500000000; // 500 milli req.tv_nsec = 500000000; // 500 milli
nanosleep(&req, NULL); nanosleep(&req, NULL);
} }
traceit("get_irc_data thread exiting...\n"); traceit("GetIRCDataThread exiting...\n");
pthread_exit(NULL); return;
} }
/* return codes: 0=OK(found it), 1=TRY AGAIN, 2=FAILED(bad data) */ /* return codes: 0=OK(found it), 1=TRY AGAIN, 2=FAILED(bad data) */
@ -764,16 +757,6 @@ static void sigCatch(int signum)
return; return;
} }
static int CreateThread(pthread_t *thread, void *(*start_sub)(void *), void *arg = (void *)0)
{
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int rc = pthread_create(thread, &attr, start_sub, arg);
pthread_attr_destroy(&attr);
return rc;
}
/* run the main loop for g2_ircddb */ /* run the main loop for g2_ircddb */
static void runit() static void runit()
{ {
@ -800,7 +783,7 @@ static void runit()
long num_recs = 0L; long num_recs = 0L;
short int rec_len = 56; short int rec_len = 56;
pthread_t aprs_beacon_thread, echo_thread, irc_data_thread; std::future<void> aprs_future, irc_data_future;
/* START: TEXT crap */ /* START: TEXT crap */
bool new_group[3] = { true, true, true }; bool new_group[3] = { true, true, true };
@ -826,16 +809,22 @@ static void runit()
/* start the beacon thread */ /* start the beacon thread */
if (bool_send_aprs) { if (bool_send_aprs) {
if (CreateThread(&aprs_beacon_thread, send_aprs_beacon)) try {
traceit("failed to start the aprs beacon thread\n"); aprs_future = std::async(std::launch::async, APRSBeaconThread);
else } catch (const std::exception &e) {
traceit("Failed to start the APRSBeaconThread. Exception: %s\n", e.what());
}
if (aprs_future.valid())
traceit("APRS beacon thread started\n"); traceit("APRS beacon thread started\n");
} }
if (CreateThread(&irc_data_thread, get_irc_data)) { try {
traceit("failed to start the get_irc_data thread\n"); irc_data_future = std::async(std::launch::async, GetIRCDataThread);
} catch (const std::exception &e) {
traceit("Failed to start GetIRCDataThread. Exception: %s\n", e.what());
keep_running = false; keep_running = false;
} else }
if (keep_running)
traceit("get_irc_data thread started\n"); traceit("get_irc_data thread started\n");
ii->kickWatchdog(IRCDDB_VERSION); ii->kickWatchdog(IRCDDB_VERSION);
@ -856,8 +845,10 @@ static void runit()
// traceit("Closed echotest audio file:[%s]\n", recd[i].file); // traceit("Closed echotest audio file:[%s]\n", recd[i].file);
/* START: echotest thread setup */ /* START: echotest thread setup */
if (CreateThread(&echo_thread, echotest, (void *)recd[i].file)) { try {
traceit("failed to start echotest thread\n"); std::async(std::launch::async, PlayFileThread, recd[i].file);
} catch (const std::exception &e) {
traceit("Failed to start echotest thread. Exception: %s\n", e.what());
// when the echotest thread runs, it deletes the file, // when the echotest thread runs, it deletes the file,
// Because the echotest thread did NOT start, we delete the file here // Because the echotest thread did NOT start, we delete the file here
unlink(recd[i].file); unlink(recd[i].file);
@ -1523,8 +1514,11 @@ static void runit()
if (i >= 0) { if (i >= 0) {
/* voicemail file is closed */ /* voicemail file is closed */
if ((vm[i].fd == -1) && (vm[i].file[0] != '\0')) { if ((vm[i].fd == -1) && (vm[i].file[0] != '\0')) {
if (CreateThread(&echo_thread, echotest, (void *)vm[i].file)) try {
traceit("failed to start playing back voicemail\n"); std::async(std::launch::async, PlayFileThread, vm[i].file);
} catch (const std::exception &e) {
traceit("Filed to start voicemail playback. Exception: %s\n", e.what());
}
} else } else
traceit("No voicemail to recall or still recording\n"); traceit("No voicemail to recall or still recording\n");
} }
@ -2182,12 +2176,12 @@ static void runit()
// traceit("Closed echotest audio file:[%s]\n", recd[i].file); // traceit("Closed echotest audio file:[%s]\n", recd[i].file);
/* we are in echotest mode, so play it back */ /* we are in echotest mode, so play it back */
if (CreateThread(&echo_thread, echotest, (void *)recd[i].file)) { try {
traceit("failed to start echotest thread\n"); std::async(std::launch::async, PlayFileThread, recd[i].file);
/* } catch (const std::exception &e) {
When the echotest thread runs, it deletes the file, traceit("failed to start PlayFileThread. Exception: %s\n", e.what());
Because the echotest thread did NOT start, we delete the file here // When the echotest thread runs, it deletes the file,
*/ // Because the echotest thread did NOT start, we delete the file here
unlink(recd[i].file); unlink(recd[i].file);
} }
} }
@ -2263,6 +2257,14 @@ static void runit()
FD_CLR (srv_sock,&fdset); FD_CLR (srv_sock,&fdset);
} }
} }
// thread clean-up
if (bool_send_aprs) {
if (aprs_future.valid())
aprs_future.get();
}
irc_data_future.get();
return;
} }
static void compute_aprs_hash() static void compute_aprs_hash()
@ -2294,7 +2296,7 @@ static void compute_aprs_hash()
return; return;
} }
void *send_aprs_beacon(void *arg) void APRSBeaconThread()
{ {
struct timespec req; struct timespec req;
@ -2324,25 +2326,20 @@ void *send_aprs_beacon(void *arg)
*/ */
short THRESHOLD_COUNTDOWN = 15; short THRESHOLD_COUNTDOWN = 15;
arg = arg;
act.sa_handler = sigCatch; act.sa_handler = sigCatch;
sigemptyset(&act.sa_mask); sigemptyset(&act.sa_mask);
act.sa_flags = SA_RESTART; act.sa_flags = SA_RESTART;
if (sigaction(SIGTERM, &act, 0) != 0) { if (sigaction(SIGTERM, &act, 0) != 0) {
traceit("sigaction-TERM failed, error=%d\n", errno); traceit("APRSBeaconThread: sigaction-TERM failed, error=%d\n", errno);
traceit("beacon thread exiting...\n"); return;
pthread_exit(NULL);
} }
if (sigaction(SIGINT, &act, 0) != 0) { if (sigaction(SIGINT, &act, 0) != 0) {
traceit("sigaction-INT failed, error=%d\n", errno); traceit("APRSBeaconThread: sigaction-INT failed, error=%d\n", errno);
traceit("beacon thread exiting...\n"); return;
pthread_exit(NULL);
} }
if (sigaction(SIGPIPE, &act, 0) != 0) { if (sigaction(SIGPIPE, &act, 0) != 0) {
traceit("sigaction-PIPE failed, error=%d\n", errno); traceit("APRSBeaconThread: sigaction-PIPE failed, error=%d\n", errno);
traceit("beacon thread exiting...\n"); return;
pthread_exit(NULL);
} }
time(&last_keepalive_time); time(&last_keepalive_time);
@ -2497,17 +2494,14 @@ void *send_aprs_beacon(void *arg)
} }
} }
traceit("APRS beacon thread exiting...\n"); traceit("APRS beacon thread exiting...\n");
pthread_exit(NULL); return;
} }
static void *echotest(void *arg) static void PlayFileThread(char *file)
{ {
char *file = (char *)arg;
struct timespec req; struct timespec req;
FILE *fp = NULL;
unsigned short rlen = 0; unsigned short rlen = 0;
size_t nread = 0;
unsigned char dstar_buf[56]; unsigned char dstar_buf[56];
unsigned char rptr_buf[58]; unsigned char rptr_buf[58];
short int i = 0; short int i = 0;
@ -2518,39 +2512,36 @@ static void *echotest(void *arg)
act.sa_flags = SA_RESTART; act.sa_flags = SA_RESTART;
if (sigaction(SIGTERM, &act, 0) != 0) { if (sigaction(SIGTERM, &act, 0) != 0) {
traceit("sigaction-TERM failed, error=%d\n", errno); traceit("sigaction-TERM failed, error=%d\n", errno);
traceit("echotest thread exiting...\n"); return;
pthread_exit(NULL);
} }
if (sigaction(SIGINT, &act, 0) != 0) { if (sigaction(SIGINT, &act, 0) != 0) {
traceit("sigaction-INT failed, error=%d\n", errno); traceit("sigaction-INT failed, error=%d\n", errno);
traceit("echotest thread exiting...\n"); return;
pthread_exit(NULL);
} }
if (sigaction(SIGPIPE, &act, 0) != 0) { if (sigaction(SIGPIPE, &act, 0) != 0) {
traceit("sigaction-PIPE failed, error=%d\n", errno); traceit("sigaction-PIPE failed, error=%d\n", errno);
traceit("echotest thread exiting...\n"); return;
pthread_exit(NULL);
} }
traceit("File to playback:[%s]\n", file); traceit("File to playback:[%s]\n", file);
fp = fopen(file, "rb"); FILE *fp = fopen(file, "rb");
if (!fp) { if (!fp) {
traceit("Failed to open file %s\n", file); traceit("Failed to open file %s\n", file);
pthread_exit(NULL); return;
} }
nread = fread(dstar_buf, 10, 1, fp); size_t nread = fread(dstar_buf, 10, 1, fp);
if (nread != 1) { if (nread != 1) {
traceit("Cant read first 10 bytes in %s\n", file); traceit("Cant read first 10 bytes in %s\n", file);
fclose(fp); fclose(fp);
pthread_exit(NULL); return;
} }
if (memcmp(dstar_buf, "DVTOOL", 6) != 0) { if (memcmp(dstar_buf, "DVTOOL", 6) != 0) {
traceit("DVTOOL keyword not found in %s\n", file); traceit("DVTOOL keyword not found in %s\n", file);
fclose(fp); fclose(fp);
pthread_exit(NULL); return;
} }
sleep(play_wait); sleep(play_wait);
@ -2623,7 +2614,7 @@ static void *echotest(void *arg)
if (!strstr(file, "voicemail.dat")) if (!strstr(file, "voicemail.dat"))
unlink(file); unlink(file);
traceit("Finished playing\n"); traceit("Finished playing\n");
pthread_exit(NULL); return;
} }
static void qrgs_and_maps() static void qrgs_and_maps()

Loading…
Cancel
Save

Powered by TurnKey Linux.