From d8dccc05b7bb330750ac6ac6da5c3a05f4bdfdc6 Mon Sep 17 00:00:00 2001 From: jun7 Date: Tue, 5 Jun 2018 18:34:05 +0900 Subject: [PATCH] Multi thread client --- ab.c | 16 +-- wyebrun.c | 417 ++++++++++++++++++++++++++++-------------------------- wyebrun.h | 12 +- 3 files changed, 234 insertions(+), 211 deletions(-) diff --git a/ab.c b/ab.c index d346b5e..041dea7 100644 --- a/ab.c +++ b/ab.c @@ -37,7 +37,7 @@ static bool first = true; static bool check(const char *requri, const char *pageuri) { char *uris = g_strconcat(requri, " ", pageuri, NULL); - char *ruri = wyebreq(EXE, uris); + char *ruri = wyebget(EXE, uris); g_free(uris); if (ruri && !*ruri) return false; @@ -62,10 +62,10 @@ static gboolean reqcb(WebKitWebPage *kp, WebKitURIRequest *req, return true; } -static gboolean untilcb(WebKitWebPage *kp) +static gboolean keepcb(WebKitWebPage *kp) { if (g_object_get_data(G_OBJECT(kp), "adblock") != (gpointer)'n') - wyebuntil(EXE, 30); + wyebkeep(EXE, 30); return true; } @@ -79,9 +79,9 @@ static void pageinit(WebKitWebExtension *ex, WebKitWebPage *kp) g_object_set_data(G_OBJECT(kp), "wyebcheck", check); - untilcb(kp); + keepcb(kp); g_object_weak_ref(G_OBJECT(kp), (GWeakNotify)g_source_remove, - GUINT_TO_POINTER(g_timeout_add(11 * 1000, (GSourceFunc)untilcb, kp))); + GUINT_TO_POINTER(g_timeout_add(11 * 1000, (GSourceFunc)keepcb, kp))); } G_MODULE_EXPORT void webkit_web_extension_initialize_with_user_data( @@ -164,9 +164,9 @@ static void init() g_free(path); } -static GMutex datam; static char *datafunc(char *req) { + static GMutex datam; g_mutex_lock(&datam); if (initt) @@ -219,8 +219,8 @@ int main(int argc, char **argv) } else { - wyebuntil(argv[0], 30); - g_print("%s", wyebreq(argv[0], argv[1])); + wyebkeep(argv[0], 30); + g_print("%s", wyebget(argv[0], argv[1])); } exit(0); diff --git a/wyebrun.c b/wyebrun.c index 602c811..95d193f 100644 --- a/wyebrun.c +++ b/wyebrun.c @@ -31,11 +31,11 @@ along with wyebrun. If not, see . #include "wyebrun.h" #define ROOTNAME "wyebrun" +#define CLIDIR "clients" #define PREFIX WYEBPREFIX +#define KEEPS WYEBKEEPSEC #define INPUT "wyebinput" #define PING "wyebping" - -#define DUNTIL WYEBDUNTIL #define DPINGTIME 1000 #define P(f, ...) g_print(#f"\n", __VA_ARGS__); @@ -96,8 +96,8 @@ static char *preparepp(char *exe, char *name) static bool ipcsend(char *exe, char *name, Com type, char *caller, char *data) { - static GMutex sendm; - g_mutex_lock(&sendm); + static GMutex m; + g_mutex_lock(&m); //D(ipcsend exe:%s name:%s, exe, name) char *path = preparepp(exe, name); @@ -115,19 +115,20 @@ static bool ipcsend(char *exe, char *name, g_free(path); - g_mutex_unlock(&sendm); + g_mutex_unlock(&m); return ret; } static gboolean ipccb(GIOChannel *ch, GIOCondition c, gpointer p); -static GSource *ipcwatch(char *exe, char *name, GMainContext *ctx) { +static GSource *ipcwatch(char *exe, char *name, GMainContext *ctx, gpointer p) +{ char *path = preparepp(exe, name); GIOChannel *io = g_io_channel_new_file(path, "r+", NULL); GSource *watch = g_io_create_watch(io, G_IO_IN); g_io_channel_unref(io); - g_source_set_callback(watch, (GSourceFunc)ipccb, NULL, NULL); + g_source_set_callback(watch, (GSourceFunc)ipccb, p, NULL); g_source_attach(watch, ctx); g_free(path); @@ -159,17 +160,17 @@ static void until(int sec) if (!sloop) return; static guint last = 0; - static GMutex lastm; - g_mutex_lock(&lastm); + static GMutex m; + g_mutex_lock(&m); if (last) g_source_remove(last); last = g_timeout_add_full(G_PRIORITY_LOW * 2, sec * 1000, quitif, NULL, NULL); - g_mutex_unlock(&lastm); + g_mutex_unlock(&m); } static gpointer pingt(gpointer p) { GMainContext *ctx = g_main_context_new(); - ipcwatch(svrexe, PING, ctx); + ipcwatch(svrexe, PING, ctx, NULL); g_main_loop_run(g_main_loop_new(ctx, true)); return NULL; } @@ -179,12 +180,12 @@ void wyebwatch(char *exe, char *caller, wyebdataf func) dataf = func; orders = g_hash_table_new(g_str_hash, g_str_equal); - until(DUNTIL); + until(KEEPS); - g_thread_new("ping", pingt, NULL); - ipcwatch(exe, INPUT, g_main_context_default()); + g_thread_unref(g_thread_new("ping", pingt, NULL)); + ipcwatch(exe, INPUT, g_main_context_default(), NULL); - if (!ipcsend(exe, caller, CCwoke, "", NULL)) + if (!ipcsend(CLIDIR, caller, CCwoke, "", NULL)) fatal(1); } @@ -226,10 +227,9 @@ bool wyebsvr(int argc, char **argv, wyebdataf func) typedef struct { char *caller; - char *req; - int until; + char *data; } Dataargs; -static gpointer getdata(gpointer p) +static void getdata(gpointer p, gpointer ap) { Dataargs *args = p; @@ -237,127 +237,147 @@ static gpointer getdata(gpointer p) g_hash_table_add(orders, args->caller); g_mutex_unlock(&ordersm); - char *data = dataf(args->req); - if (*args->caller && !ipcsend(svrexe, args->caller, CCret, "", data)) fatal(2); + char *data = dataf(args->data); + if (*args->caller && !ipcsend(CLIDIR, args->caller, CCret, "", data)) fatal(2); g_free(data); - until(args->until); - g_mutex_lock(&ordersm); g_hash_table_remove(orders, args->caller); g_mutex_unlock(&ordersm); - g_free(args->caller); - g_free(args->req); + g_free(args->data); g_free(args); - - return NULL; } //@client -static GMutex retm; -static GMainLoop *cloop; -static GMainContext *wctx = NULL; -static char *retdata = NULL; -static char *pid() +typedef struct { + GMutex retm; + GMainContext *wctx; + GMainLoop *loop; + char *pid; + char *retdata; + char *pppath; + char *exe; //do not free. this is tmp +} Client; +static Client *makecli() { - static char *pid = NULL; - if (!pid) pid = g_strdup_printf(PREFIX"%d", getpid()); - return pid; + Client *cli = g_new0(Client, 1); + g_mutex_init(&cli->retm); + cli->pid = g_strdup_printf(PREFIX"%d-%d", + getpid(), GPOINTER_TO_INT(g_thread_self())); + return cli; } - -static void spawnsvr(char *exe) +static void freecli(Client *cli) { - char **argv = g_new0(char*, 2); - argv[0] = exe; - argv[1] = pid(); - GError *err = NULL; - if (!g_spawn_async(NULL, argv, NULL, G_SPAWN_SEARCH_PATH, - NULL, NULL, NULL, &err)) + g_mutex_clear(&cli->retm); + g_main_loop_quit(cli->loop); + g_free(cli->pid); + g_free(cli->retdata); + remove(cli->pppath); + g_free(cli->pppath); +} +static Client *getcli() +{ + static GMutex m; + g_mutex_lock(&m); + static GPrivate pc = G_PRIVATE_INIT((GDestroyNotify)freecli); + Client *cli = g_private_get(&pc); + if (!cli) { - g_print("err %s", err->message); - g_error_free(err); + cli = makecli(); + g_private_set(&pc, cli); } - - g_free(argv); -} -static gboolean pingloop(gpointer p) -{ - if (!ipcsend((char *)p, PING, CSping, pid(), NULL)) - g_mutex_unlock(&retm); - - return true; -} -static char *pppath = NULL; -static void removepp() -{ - remove(pppath); -} -static gpointer watcht(char *exe) -{ - wctx = g_main_context_new(); - cloop = g_main_loop_new(wctx, true); - - GSource *watch = ipcwatch(exe, pid(), wctx); - - if (!pppath) - { - pppath = ipcpath(exe, pid()); - atexit(removepp); - } - - g_mutex_unlock(&retm); - g_main_loop_run(cloop); - - g_source_unref(watch); - g_main_context_unref(wctx); - g_main_loop_unref(cloop); - cloop = NULL; - - return NULL; -} -static void watchstart(char *exe) -{ - g_mutex_lock(&retm); - - g_thread_new("watch", (GThreadFunc)watcht, exe); - - g_mutex_lock(&retm); - g_mutex_unlock(&retm); -} -static gboolean timeout(gpointer p) -{ - g_mutex_unlock(&retm); - return false; + g_mutex_unlock(&m); + return cli; } static GHashTable *lastsec = NULL; -static void reuntil(char *exe) +static GMutex lastm; +static void setsec(char *exe, int sec) { - if (!lastsec) return; - int sec = GPOINTER_TO_INT( - g_hash_table_lookup(lastsec, exe)); + g_mutex_lock(&lastm); + if (!lastsec) lastsec = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free); + g_hash_table_replace(lastsec, g_strdup(exe), g_strdup_printf("%d", sec)); + g_mutex_unlock(&lastm); +} +static char *keepstr(char *exe) +{ + g_mutex_lock(&lastm); + char *ret = g_hash_table_lookup(lastsec, exe); + g_mutex_unlock(&lastm); - if (sec) - wyebuntil(exe, sec); +#define Z(v) #v + return ret ?: Z(KEEPS); +#undef Z +} + +static gboolean pingloop(Client *cli) +{ + if (!ipcsend(cli->exe, PING, CSping, cli->pid, keepstr(cli->exe))) + g_mutex_unlock(&cli->retm); + + return true; +} +static void removepp() +{ + remove(getcli()->pppath); +} +static gpointer waitt(Client *cli) +{ + if (cli->wctx) return NULL; + + cli->wctx = g_main_context_new(); + cli->loop = g_main_loop_new(cli->wctx, true); + GSource *watch = ipcwatch(CLIDIR, cli->pid, cli->wctx, cli); + + cli->pppath = ipcpath(CLIDIR, cli->pid); + static bool reged = false; + if (!reged) atexit(removepp); + reged = true; + + g_mutex_unlock(&cli->retm); + g_main_loop_run(cli->loop); + + g_source_unref(watch); + g_main_loop_unref(cli->loop); + g_main_context_unref(cli->wctx); + + return NULL; +} +static void makewaitt(Client *cli) +{ + g_mutex_lock(&cli->retm); + + g_thread_unref(g_thread_new("wait", (GThreadFunc)waitt, cli)); + + g_mutex_lock(&cli->retm); + g_mutex_unlock(&cli->retm); +} +static gboolean timeoutcb(Client *cli) +{ + g_mutex_unlock(&cli->retm); + return false; } //don't free -static char *request(char *exe, Com type, char *caller, char *req) +static char *request(char *exe, Com type, bool caller, char *data) { - g_free(retdata); - retdata = NULL; + Client *cli = getcli(); - if (!cloop) watchstart(exe); + if (!cli->wctx) makewaitt(cli); if (caller) - g_mutex_lock(&retm); + { + g_mutex_lock(&cli->retm); + g_free(cli->retdata); + cli->retdata = NULL; + } - if (!ipcsend(exe, INPUT, type, caller, req)) + if (!ipcsend(exe, INPUT, type, caller ? cli->pid : NULL, data)) { //svr is not running char *path = ipcpath(exe, "lock"); if (!g_file_test(path, G_FILE_TEST_EXISTS)) @@ -368,28 +388,41 @@ static char *request(char *exe, Com type, char *caller, char *req) flock(lock, LOCK_EX); //retry in single proc - if (!ipcsend(exe, INPUT, type, caller, req)) + if (!ipcsend(exe, INPUT, type, caller ? cli->pid : NULL, data)) { if (!caller) - g_mutex_lock(&retm); + g_mutex_lock(&cli->retm); - GSource *tout = g_timeout_source_new(DUNTIL * 1000); - g_source_set_callback(tout, timeout, NULL, NULL); - g_source_attach(tout, wctx); + GSource *tout = g_timeout_source_new(KEEPS * 1000); + g_source_set_callback(tout, (GSourceFunc)timeoutcb, cli, NULL); + g_source_attach(tout, cli->wctx); - spawnsvr(exe); - g_mutex_lock(&retm); - g_mutex_unlock(&retm); + + char **argv = g_new0(char*, 3); + argv[0] = exe; + argv[1] = cli->pid; + GError *err = NULL; + if (!g_spawn_async(NULL, argv, NULL, G_SPAWN_SEARCH_PATH, + NULL, NULL, NULL, &err)) + { + g_print("err %s", err->message); + g_error_free(err); + } + g_free(argv); + + + g_mutex_lock(&cli->retm); + g_mutex_unlock(&cli->retm); g_source_destroy(tout); g_source_unref(tout); //wyebloop doesn't know svr quits - reuntil(exe); + wyebkeep(exe, 0); if (caller) - g_mutex_lock(&retm); - if (!ipcsend(exe, INPUT, type, caller, req)) + g_mutex_lock(&cli->retm); + if (!ipcsend(exe, INPUT, type, caller ? cli->pid : NULL, data)) fatal(3); //spawning svr failse is fatal } close(lock); @@ -398,76 +431,61 @@ static char *request(char *exe, Com type, char *caller, char *req) if (caller) { GSource *ping = g_timeout_source_new(DPINGTIME); - g_source_set_callback(ping, (GSourceFunc)pingloop, exe, NULL); - g_source_attach(ping, wctx); //attach to ping thread + cli->exe = exe; + g_source_set_callback(ping, (GSourceFunc)pingloop, cli, NULL); + g_source_attach(ping, cli->wctx); //attach to ping thread - g_mutex_lock(&retm); - g_mutex_unlock(&retm); + g_mutex_lock(&cli->retm); + g_mutex_unlock(&cli->retm); g_source_destroy(ping); g_source_unref(ping); } - return retdata; + return cli->retdata; } -char *wyebreq(char *exe, char *req) +char *wyebget(char *exe, char *data) { - return request(exe, CSdata, pid(), req); + return request(exe, CSdata, true, data); } -void wyebsend(char *exe, char *req) +void wyebsend(char *exe, char *data) { - request(exe, CSdata, NULL, req); + request(exe, CSdata, false, data); } -typedef struct { - char *exe; - int sec; -} wyebsst; -static void wsfree(wyebsst *ss) +static gboolean keepcb(char *exe) { - g_free(ss->exe); - g_free(ss); -} -static gboolean untilcb(wyebsst *ss) -{ - if (!lastsec) - lastsec = g_hash_table_new(g_str_hash, g_str_equal); - - g_hash_table_replace(lastsec, ss->exe, GINT_TO_POINTER(ss->sec)); - - char *str = g_strdup_printf("%d", ss->sec); - request(ss->exe, CSuntil, NULL, str); - g_free(str); - + request(exe, CSuntil, false, keepstr(exe)); return false; } -void wyebuntil(char *exe, int sec) +void wyebkeep(char *exe, int sec) { - wyebsst *ss = g_new(wyebsst, 1); - ss->exe = g_strdup(exe); - ss->sec = sec; - g_idle_add_full(G_PRIORITY_DEFAULT, (GSourceFunc)untilcb, - ss, (GDestroyNotify)wsfree); + if (sec) setsec(exe, sec); + g_idle_add_full(G_PRIORITY_DEFAULT, (GSourceFunc)keepcb, + g_strdup(exe), g_free); } -static gboolean loopcb(wyebsst *ss) +static gboolean loopcb(char *exe) { - untilcb(ss); + keepcb(exe); return true; } -guint wyebloop(char *exe, int sec, int loopsec) +guint wyebloop(char *exe, int sec) { - wyebsst *ss = g_new(wyebsst, 1); - ss->exe = g_strdup(exe); - ss->sec = sec; - - loopcb(ss); - return g_timeout_add_full(G_PRIORITY_DEFAULT, - loopsec * 1000, - (GSourceFunc)loopcb, - ss, (GDestroyNotify)wsfree); + if (sec) setsec(exe, sec); + loopcb(exe); + return g_timeout_add_full(G_PRIORITY_DEFAULT, sec * 300, + (GSourceFunc)loopcb, g_strdup(exe), g_free); } + +#if DEBUG +static void testget(gpointer p, gpointer ap) +{ + D(send %s ret %s, (char *)p, wyebget(ap, p)) + g_free(p); +} +#endif static gboolean tcinputcb(GIOChannel *ch, GIOCondition c, char *exe) { char *line; @@ -484,28 +502,31 @@ static gboolean tcinputcb(GIOChannel *ch, GIOCondition c, char *exe) #if DEBUG if (g_str_has_prefix(line, "l")) { + GThreadPool *pool = g_thread_pool_new(testget, exe, 4, false, NULL); + start = g_get_monotonic_time(); - for (int i = 0; i < 10000; i++) + for (int i = 0; i < 1000; i++) { char *is = g_strdup_printf("l%d", i); - //g_print("loop %d ret %s\n", i, wyebreq(exe, is)); + //g_print("loop %d ret %s\n", i, wyebget(exe, is)); if (*(line + 1) == 's') wyebsend(exe, is); else if (*(line + 1) == 'p') D(pint %s, is) else - wyebreq(exe, is); + g_thread_pool_push(pool, g_strdup(is), NULL); +// wyebget(exe, is); g_free(is); } + g_thread_pool_free(pool, false, true); + gint64 now = g_get_monotonic_time(); - char *time = g_strdup_printf("time %f", (now - start) / 1000000.0); - wyebsend(exe, time); - g_free(time); + D(time %f, (now - start) / 1000000.0) } else - g_print("RET is %s\n", wyebreq(exe, line)); + g_print("RET is %s\n", wyebget(exe, line)); #else - g_print("%s\n", wyebreq(exe, line)); //don't free + g_print("%s\n", wyebget(exe, line)); //don't free #endif g_free(line); @@ -513,12 +534,10 @@ static gboolean tcinputcb(GIOChannel *ch, GIOCondition c, char *exe) } static gboolean tcinit(char *exe) { - //wyebuntil(exe, 1); - //wyebloop(exe, 2, 1); #if DEBUG - wyebloop(exe, 300, 200); + wyebloop(exe, 300); #else - wyebloop(exe, 2, 1); + wyebloop(exe, 2); #endif GIOChannel *io = g_io_channel_unix_new(fileno(stdin)); @@ -528,10 +547,9 @@ static gboolean tcinit(char *exe) } void wyebclient(char *exe) { - //pid_t getpid(void); - sloop = g_main_loop_new(NULL, false); + GMainLoop *loop = g_main_loop_new(NULL, false); g_idle_add((GSourceFunc)tcinit, exe); - g_main_loop_run(sloop); + g_main_loop_run(loop); } @@ -549,46 +567,51 @@ gboolean ipccb(GIOChannel *ch, GIOCondition c, gpointer p) Com type = *unesc; char *id = unesc + 1; - char *arg = strchr(unesc, ':'); - *arg++ = '\0'; + char *data = strchr(unesc, ':'); + *data++ = '\0'; #if DEBUG - static int i = 0; - D(%c ipccb%d %c/%s/%s;, svrexe ? 'S':'C', i++, type ,id ,arg) +// static int i = 0; +// D(%c ipccb%d %c/%s/%s;, svrexe ? 'S':'C', i++, type ,id ,data) #endif - static int lastuntil = DUNTIL; switch (type) { //server - case CSuntil: - until(lastuntil = atoi(arg)); - break; case CSdata: { Dataargs *args = g_new(Dataargs, 1); args->caller = g_strdup(id); - args->req = g_strdup(arg); - args->until = lastuntil; - g_thread_unref(g_thread_new("getdata", getdata, args)); + args->data = g_strdup(data); + + static GThreadPool *pool = NULL; + if (!pool) pool = g_thread_pool_new(getdata, NULL, -1, false, NULL); + g_thread_pool_push(pool, args, NULL); break; } case CSping: g_mutex_lock(&ordersm); if (!g_hash_table_lookup(orders, id)) - ipcsend(svrexe, id, CClost, NULL, NULL); + ipcsend(CLIDIR, id, CClost, NULL, NULL); g_mutex_unlock(&ordersm); + case CSuntil: + until(atoi(data)); break; //client case CCret: - retdata = g_strdup(arg); case CClost: case CCwoke: + { + Client *cli = p; + if (type == CCret) + cli->retdata = g_strdup(data); + //for the case pinging at same time of ret - g_mutex_trylock(&retm); - g_mutex_unlock(&retm); + g_mutex_trylock(&cli->retm); + g_mutex_unlock(&cli->retm); break; } + } g_free(unesc); return true; @@ -598,13 +621,13 @@ gboolean ipccb(GIOChannel *ch, GIOCondition c, gpointer p) //test #if DEBUG -static char *testdata(char *req) +static char *testdata(char *data) { //sleep(9); - //g_free(req); //makes crash + //g_free(data); //makes crash static int i = 0; - return g_strdup_printf("%d th test data. req is %s", ++i, req); + return g_strdup_printf("%d th test data. req is %s", ++i, data); } diff --git a/wyebrun.h b/wyebrun.h index 9694981..b6cf0fa 100644 --- a/wyebrun.h +++ b/wyebrun.h @@ -24,21 +24,21 @@ along with wyebrun. If not, see . #include #define WYEBPREFIX "-wyeb" -#define WYEBDUNTIL 3 +#define WYEBKEEPSEC 3 //client //wyebrun spawns the exe if wyebsend failes -char *wyebreq( char *exe, char *req); //don't free the ret val -void wyebsend( char *exe, char *req); -void wyebuntil( char *exe, int sec); //keep alive. default is 3s +char *wyebget( char *exe, char *data); //don't free the ret val +void wyebsend(char *exe, char *data); +void wyebkeep(char *exe, int sec); //keep alive. default is 3s //loop the wyebuntil. to stop, use g_source_remove -guint wyebloop( char *exe, int sec, int loopsec); +guint wyebloop(char *exe, int sec); //send stdin to svr and ret data to stdout //blank and enter exit it void wyebclient(char *exe); //server -typedef char *(*wyebdataf)(char *req); +typedef char *(*wyebdataf)(char *data); //server is spawned with an arg the caller bool wyebsvr(int argc, char **argv, wyebdataf func); //or if there is own GMainLoop