Multi thread

This commit is contained in:
jun7 2018-06-05 05:54:27 +09:00
parent a6485d7dfa
commit 1bf0baaa4b
2 changed files with 70 additions and 23 deletions

4
ab.c
View File

@ -164,8 +164,11 @@ static void init()
g_free(path); g_free(path);
} }
static GMutex datam;
static char *datafunc(char *req) static char *datafunc(char *req)
{ {
g_mutex_lock(&datam);
if (initt) if (initt)
{ {
g_thread_join(initt); g_thread_join(initt);
@ -187,6 +190,7 @@ static char *datafunc(char *req)
D(BLOCKED %s, req) D(BLOCKED %s, req)
#endif #endif
g_mutex_unlock(&datam);
return ret; return ret;
} }

View File

@ -96,6 +96,9 @@ static char *preparepp(char *exe, char *name)
static bool ipcsend(char *exe, char *name, static bool ipcsend(char *exe, char *name,
Com type, char *caller, char *data) Com type, char *caller, char *data)
{ {
static GMutex sendm;
g_mutex_lock(&sendm);
//D(ipcsend exe:%s name:%s, exe, name) //D(ipcsend exe:%s name:%s, exe, name)
char *path = preparepp(exe, name); char *path = preparepp(exe, name);
@ -111,6 +114,8 @@ static bool ipcsend(char *exe, char *name,
close(pp); close(pp);
g_free(path); g_free(path);
g_mutex_unlock(&sendm);
return ret; return ret;
} }
@ -136,21 +141,30 @@ static char *svrexe = NULL;
static GMainLoop *sloop = NULL; static GMainLoop *sloop = NULL;
static wyebdataf dataf = NULL; static wyebdataf dataf = NULL;
static GHashTable *orders = NULL; static GHashTable *orders = NULL;
static GMutex ordersm;
static gboolean quit(gpointer p) static gboolean quitif(gpointer p)
{ {
g_mutex_lock(&ordersm);
if (!g_hash_table_size(orders))
{
DD(SVR QUITS\n) DD(SVR QUITS\n)
g_main_loop_quit(sloop); g_main_loop_quit(sloop);
return false; }
g_mutex_unlock(&ordersm);
return true;
} }
static void until(int sec) static void until(int sec)
{ {
if (!sloop) return; if (!sloop) return;
static guint last = 0; static guint last = 0;
static GMutex lastm;
g_mutex_lock(&lastm);
if (last) if (last)
g_source_remove(last); g_source_remove(last);
last = g_timeout_add_full(G_PRIORITY_LOW * 2, sec * 1000, quit, NULL, NULL); last = g_timeout_add_full(G_PRIORITY_LOW * 2, sec * 1000, quitif, NULL, NULL);
g_mutex_unlock(&lastm);
} }
static gpointer pingt(gpointer p) static gpointer pingt(gpointer p)
{ {
@ -174,12 +188,6 @@ void wyebwatch(char *exe, char *caller, wyebdataf func)
fatal(1); fatal(1);
} }
static gboolean quitif(gpointer p)
{
static int cnt = 0;
if (cnt++ > 30 /* 3 secs */ || !g_hash_table_size(orders)) quit(NULL);
return true;
}
static void monitorcb(GFileMonitor *m, GFile *f, GFile *o, GFileMonitorEvent e, static void monitorcb(GFileMonitor *m, GFile *f, GFile *o, GFileMonitorEvent e,
gpointer p) gpointer p)
{ {
@ -215,13 +223,36 @@ bool wyebsvr(int argc, char **argv, wyebdataf func)
return true; return true;
} }
static void getdata(char *caller, char *req)
{
char *data = dataf(req);
if (*caller && !ipcsend(svrexe, caller, CCret, "", data))
fatal(2);
typedef struct {
char *caller;
char *req;
int until;
} Dataargs;
static gpointer getdata(gpointer p)
{
Dataargs *args = p;
g_mutex_lock(&ordersm);
g_hash_table_add(orders, args->caller);
g_mutex_unlock(&ordersm);
char *data = dataf(args->req);
if (*args->caller && !ipcsend(svrexe, args->caller, CCret, "", data)) fatal(2);
g_free(data); g_free(data);
until(args->until);
g_mutex_lock(&ordersm);
g_hash_table_remove(orders, args->caller);
g_mutex_unlock(&ordersm);
g_free(args->caller);
g_free(args->req);
g_free(args);
return NULL;
} }
@ -310,6 +341,7 @@ static void reuntil(char *exe)
if (!lastsec) return; if (!lastsec) return;
int sec = GPOINTER_TO_INT( int sec = GPOINTER_TO_INT(
g_hash_table_lookup(lastsec, exe)); g_hash_table_lookup(lastsec, exe));
if (sec) if (sec)
wyebuntil(exe, sec); wyebuntil(exe, sec);
} }
@ -453,7 +485,7 @@ static gboolean tcinputcb(GIOChannel *ch, GIOCondition c, char *exe)
if (g_str_has_prefix(line, "l")) if (g_str_has_prefix(line, "l"))
{ {
start = g_get_monotonic_time(); start = g_get_monotonic_time();
for (int i = 0; i < 1000; i++) for (int i = 0; i < 10000; i++)
{ {
char *is = g_strdup_printf("l%d", i); char *is = g_strdup_printf("l%d", i);
//g_print("loop %d ret %s\n", i, wyebreq(exe, is)); //g_print("loop %d ret %s\n", i, wyebreq(exe, is));
@ -482,7 +514,12 @@ static gboolean tcinputcb(GIOChannel *ch, GIOCondition c, char *exe)
static gboolean tcinit(char *exe) static gboolean tcinit(char *exe)
{ {
//wyebuntil(exe, 1); //wyebuntil(exe, 1);
//wyebloop(exe, 2, 1);
#if DEBUG
wyebloop(exe, 300, 200);
#else
wyebloop(exe, 2, 1); wyebloop(exe, 2, 1);
#endif
GIOChannel *io = g_io_channel_unix_new(fileno(stdin)); GIOChannel *io = g_io_channel_unix_new(fileno(stdin));
g_io_add_watch(io, G_IO_IN | G_IO_HUP, (GIOFunc)tcinputcb, exe); g_io_add_watch(io, G_IO_IN | G_IO_HUP, (GIOFunc)tcinputcb, exe);
@ -527,14 +564,19 @@ gboolean ipccb(GIOChannel *ch, GIOCondition c, gpointer p)
until(lastuntil = atoi(arg)); until(lastuntil = atoi(arg));
break; break;
case CSdata: case CSdata:
g_hash_table_add(orders, id); {
getdata(id, arg); Dataargs *args = g_new(Dataargs, 1);
g_hash_table_remove(orders, id); args->caller = g_strdup(id);
until(lastuntil); args->req = g_strdup(arg);
args->until = lastuntil;
g_thread_unref(g_thread_new("getdata", getdata, args));
break; break;
}
case CSping: case CSping:
g_mutex_lock(&ordersm);
if (!g_hash_table_lookup(orders, id)) if (!g_hash_table_lookup(orders, id))
ipcsend(svrexe, id, CClost, NULL, NULL); ipcsend(svrexe, id, CClost, NULL, NULL);
g_mutex_unlock(&ordersm);
break; break;
//client //client
@ -542,7 +584,8 @@ gboolean ipccb(GIOChannel *ch, GIOCondition c, gpointer p)
retdata = g_strdup(arg); retdata = g_strdup(arg);
case CClost: case CClost:
case CCwoke: case CCwoke:
//g_main_loop_quit(cloop); //for the case pinging at same time of ret
g_mutex_trylock(&retm);
g_mutex_unlock(&retm); g_mutex_unlock(&retm);
break; break;
} }
@ -561,7 +604,7 @@ static char *testdata(char *req)
//g_free(req); //makes crash //g_free(req); //makes crash
static int i = 0; static int i = 0;
return g_strdup_printf("%d th dummy data. req is %s", ++i, req); return g_strdup_printf("%d th test data. req is %s", ++i, req);
} }