Multi thread client

This commit is contained in:
jun7 2018-06-05 18:34:05 +09:00
parent 1bf0baaa4b
commit d8dccc05b7
3 changed files with 234 additions and 211 deletions

16
ab.c
View File

@ -37,7 +37,7 @@ static bool first = true;
static bool check(const char *requri, const char *pageuri)
{
char *uris = g_strconcat(requri, " ", pageuri, NULL);
char *ruri = wyebreq(EXE, uris);
char *ruri = wyebget(EXE, uris);
g_free(uris);
if (ruri && !*ruri) return false;
@ -62,10 +62,10 @@ static gboolean reqcb(WebKitWebPage *kp, WebKitURIRequest *req,
return true;
}
static gboolean untilcb(WebKitWebPage *kp)
static gboolean keepcb(WebKitWebPage *kp)
{
if (g_object_get_data(G_OBJECT(kp), "adblock") != (gpointer)'n')
wyebuntil(EXE, 30);
wyebkeep(EXE, 30);
return true;
}
@ -79,9 +79,9 @@ static void pageinit(WebKitWebExtension *ex, WebKitWebPage *kp)
g_object_set_data(G_OBJECT(kp), "wyebcheck", check);
untilcb(kp);
keepcb(kp);
g_object_weak_ref(G_OBJECT(kp), (GWeakNotify)g_source_remove,
GUINT_TO_POINTER(g_timeout_add(11 * 1000, (GSourceFunc)untilcb, kp)));
GUINT_TO_POINTER(g_timeout_add(11 * 1000, (GSourceFunc)keepcb, kp)));
}
G_MODULE_EXPORT void webkit_web_extension_initialize_with_user_data(
@ -164,9 +164,9 @@ static void init()
g_free(path);
}
static GMutex datam;
static char *datafunc(char *req)
{
static GMutex datam;
g_mutex_lock(&datam);
if (initt)
@ -219,8 +219,8 @@ int main(int argc, char **argv)
}
else
{
wyebuntil(argv[0], 30);
g_print("%s", wyebreq(argv[0], argv[1]));
wyebkeep(argv[0], 30);
g_print("%s", wyebget(argv[0], argv[1]));
}
exit(0);

417
wyebrun.c
View File

@ -31,11 +31,11 @@ along with wyebrun. If not, see <http://www.gnu.org/licenses/>.
#include "wyebrun.h"
#define ROOTNAME "wyebrun"
#define CLIDIR "clients"
#define PREFIX WYEBPREFIX
#define KEEPS WYEBKEEPSEC
#define INPUT "wyebinput"
#define PING "wyebping"
#define DUNTIL WYEBDUNTIL
#define DPINGTIME 1000
#define P(f, ...) g_print(#f"\n", __VA_ARGS__);
@ -96,8 +96,8 @@ static char *preparepp(char *exe, char *name)
static bool ipcsend(char *exe, char *name,
Com type, char *caller, char *data)
{
static GMutex sendm;
g_mutex_lock(&sendm);
static GMutex m;
g_mutex_lock(&m);
//D(ipcsend exe:%s name:%s, exe, name)
char *path = preparepp(exe, name);
@ -115,19 +115,20 @@ static bool ipcsend(char *exe, char *name,
g_free(path);
g_mutex_unlock(&sendm);
g_mutex_unlock(&m);
return ret;
}
static gboolean ipccb(GIOChannel *ch, GIOCondition c, gpointer p);
static GSource *ipcwatch(char *exe, char *name, GMainContext *ctx) {
static GSource *ipcwatch(char *exe, char *name, GMainContext *ctx, gpointer p)
{
char *path = preparepp(exe, name);
GIOChannel *io = g_io_channel_new_file(path, "r+", NULL);
GSource *watch = g_io_create_watch(io, G_IO_IN);
g_io_channel_unref(io);
g_source_set_callback(watch, (GSourceFunc)ipccb, NULL, NULL);
g_source_set_callback(watch, (GSourceFunc)ipccb, p, NULL);
g_source_attach(watch, ctx);
g_free(path);
@ -159,17 +160,17 @@ static void until(int sec)
if (!sloop) return;
static guint last = 0;
static GMutex lastm;
g_mutex_lock(&lastm);
static GMutex m;
g_mutex_lock(&m);
if (last)
g_source_remove(last);
last = g_timeout_add_full(G_PRIORITY_LOW * 2, sec * 1000, quitif, NULL, NULL);
g_mutex_unlock(&lastm);
g_mutex_unlock(&m);
}
static gpointer pingt(gpointer p)
{
GMainContext *ctx = g_main_context_new();
ipcwatch(svrexe, PING, ctx);
ipcwatch(svrexe, PING, ctx, NULL);
g_main_loop_run(g_main_loop_new(ctx, true));
return NULL;
}
@ -179,12 +180,12 @@ void wyebwatch(char *exe, char *caller, wyebdataf func)
dataf = func;
orders = g_hash_table_new(g_str_hash, g_str_equal);
until(DUNTIL);
until(KEEPS);
g_thread_new("ping", pingt, NULL);
ipcwatch(exe, INPUT, g_main_context_default());
g_thread_unref(g_thread_new("ping", pingt, NULL));
ipcwatch(exe, INPUT, g_main_context_default(), NULL);
if (!ipcsend(exe, caller, CCwoke, "", NULL))
if (!ipcsend(CLIDIR, caller, CCwoke, "", NULL))
fatal(1);
}
@ -226,10 +227,9 @@ bool wyebsvr(int argc, char **argv, wyebdataf func)
typedef struct {
char *caller;
char *req;
int until;
char *data;
} Dataargs;
static gpointer getdata(gpointer p)
static void getdata(gpointer p, gpointer ap)
{
Dataargs *args = p;
@ -237,127 +237,147 @@ static gpointer getdata(gpointer p)
g_hash_table_add(orders, args->caller);
g_mutex_unlock(&ordersm);
char *data = dataf(args->req);
if (*args->caller && !ipcsend(svrexe, args->caller, CCret, "", data)) fatal(2);
char *data = dataf(args->data);
if (*args->caller && !ipcsend(CLIDIR, args->caller, CCret, "", data)) fatal(2);
g_free(data);
until(args->until);
g_mutex_lock(&ordersm);
g_hash_table_remove(orders, args->caller);
g_mutex_unlock(&ordersm);
g_free(args->caller);
g_free(args->req);
g_free(args->data);
g_free(args);
return NULL;
}
//@client
static GMutex retm;
static GMainLoop *cloop;
static GMainContext *wctx = NULL;
static char *retdata = NULL;
static char *pid()
typedef struct {
GMutex retm;
GMainContext *wctx;
GMainLoop *loop;
char *pid;
char *retdata;
char *pppath;
char *exe; //do not free. this is tmp
} Client;
static Client *makecli()
{
static char *pid = NULL;
if (!pid) pid = g_strdup_printf(PREFIX"%d", getpid());
return pid;
Client *cli = g_new0(Client, 1);
g_mutex_init(&cli->retm);
cli->pid = g_strdup_printf(PREFIX"%d-%d",
getpid(), GPOINTER_TO_INT(g_thread_self()));
return cli;
}
static void spawnsvr(char *exe)
static void freecli(Client *cli)
{
char **argv = g_new0(char*, 2);
argv[0] = exe;
argv[1] = pid();
GError *err = NULL;
if (!g_spawn_async(NULL, argv, NULL, G_SPAWN_SEARCH_PATH,
NULL, NULL, NULL, &err))
g_mutex_clear(&cli->retm);
g_main_loop_quit(cli->loop);
g_free(cli->pid);
g_free(cli->retdata);
remove(cli->pppath);
g_free(cli->pppath);
}
static Client *getcli()
{
static GMutex m;
g_mutex_lock(&m);
static GPrivate pc = G_PRIVATE_INIT((GDestroyNotify)freecli);
Client *cli = g_private_get(&pc);
if (!cli)
{
g_print("err %s", err->message);
g_error_free(err);
cli = makecli();
g_private_set(&pc, cli);
}
g_free(argv);
}
static gboolean pingloop(gpointer p)
{
if (!ipcsend((char *)p, PING, CSping, pid(), NULL))
g_mutex_unlock(&retm);
return true;
}
static char *pppath = NULL;
static void removepp()
{
remove(pppath);
}
static gpointer watcht(char *exe)
{
wctx = g_main_context_new();
cloop = g_main_loop_new(wctx, true);
GSource *watch = ipcwatch(exe, pid(), wctx);
if (!pppath)
{
pppath = ipcpath(exe, pid());
atexit(removepp);
}
g_mutex_unlock(&retm);
g_main_loop_run(cloop);
g_source_unref(watch);
g_main_context_unref(wctx);
g_main_loop_unref(cloop);
cloop = NULL;
return NULL;
}
static void watchstart(char *exe)
{
g_mutex_lock(&retm);
g_thread_new("watch", (GThreadFunc)watcht, exe);
g_mutex_lock(&retm);
g_mutex_unlock(&retm);
}
static gboolean timeout(gpointer p)
{
g_mutex_unlock(&retm);
return false;
g_mutex_unlock(&m);
return cli;
}
static GHashTable *lastsec = NULL;
static void reuntil(char *exe)
static GMutex lastm;
static void setsec(char *exe, int sec)
{
if (!lastsec) return;
int sec = GPOINTER_TO_INT(
g_hash_table_lookup(lastsec, exe));
g_mutex_lock(&lastm);
if (!lastsec) lastsec = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
g_hash_table_replace(lastsec, g_strdup(exe), g_strdup_printf("%d", sec));
g_mutex_unlock(&lastm);
}
static char *keepstr(char *exe)
{
g_mutex_lock(&lastm);
char *ret = g_hash_table_lookup(lastsec, exe);
g_mutex_unlock(&lastm);
if (sec)
wyebuntil(exe, sec);
#define Z(v) #v
return ret ?: Z(KEEPS);
#undef Z
}
static gboolean pingloop(Client *cli)
{
if (!ipcsend(cli->exe, PING, CSping, cli->pid, keepstr(cli->exe)))
g_mutex_unlock(&cli->retm);
return true;
}
static void removepp()
{
remove(getcli()->pppath);
}
static gpointer waitt(Client *cli)
{
if (cli->wctx) return NULL;
cli->wctx = g_main_context_new();
cli->loop = g_main_loop_new(cli->wctx, true);
GSource *watch = ipcwatch(CLIDIR, cli->pid, cli->wctx, cli);
cli->pppath = ipcpath(CLIDIR, cli->pid);
static bool reged = false;
if (!reged) atexit(removepp);
reged = true;
g_mutex_unlock(&cli->retm);
g_main_loop_run(cli->loop);
g_source_unref(watch);
g_main_loop_unref(cli->loop);
g_main_context_unref(cli->wctx);
return NULL;
}
static void makewaitt(Client *cli)
{
g_mutex_lock(&cli->retm);
g_thread_unref(g_thread_new("wait", (GThreadFunc)waitt, cli));
g_mutex_lock(&cli->retm);
g_mutex_unlock(&cli->retm);
}
static gboolean timeoutcb(Client *cli)
{
g_mutex_unlock(&cli->retm);
return false;
}
//don't free
static char *request(char *exe, Com type, char *caller, char *req)
static char *request(char *exe, Com type, bool caller, char *data)
{
g_free(retdata);
retdata = NULL;
Client *cli = getcli();
if (!cloop) watchstart(exe);
if (!cli->wctx) makewaitt(cli);
if (caller)
g_mutex_lock(&retm);
{
g_mutex_lock(&cli->retm);
g_free(cli->retdata);
cli->retdata = NULL;
}
if (!ipcsend(exe, INPUT, type, caller, req))
if (!ipcsend(exe, INPUT, type, caller ? cli->pid : NULL, data))
{ //svr is not running
char *path = ipcpath(exe, "lock");
if (!g_file_test(path, G_FILE_TEST_EXISTS))
@ -368,28 +388,41 @@ static char *request(char *exe, Com type, char *caller, char *req)
flock(lock, LOCK_EX);
//retry in single proc
if (!ipcsend(exe, INPUT, type, caller, req))
if (!ipcsend(exe, INPUT, type, caller ? cli->pid : NULL, data))
{
if (!caller)
g_mutex_lock(&retm);
g_mutex_lock(&cli->retm);
GSource *tout = g_timeout_source_new(DUNTIL * 1000);
g_source_set_callback(tout, timeout, NULL, NULL);
g_source_attach(tout, wctx);
GSource *tout = g_timeout_source_new(KEEPS * 1000);
g_source_set_callback(tout, (GSourceFunc)timeoutcb, cli, NULL);
g_source_attach(tout, cli->wctx);
spawnsvr(exe);
g_mutex_lock(&retm);
g_mutex_unlock(&retm);
char **argv = g_new0(char*, 3);
argv[0] = exe;
argv[1] = cli->pid;
GError *err = NULL;
if (!g_spawn_async(NULL, argv, NULL, G_SPAWN_SEARCH_PATH,
NULL, NULL, NULL, &err))
{
g_print("err %s", err->message);
g_error_free(err);
}
g_free(argv);
g_mutex_lock(&cli->retm);
g_mutex_unlock(&cli->retm);
g_source_destroy(tout);
g_source_unref(tout);
//wyebloop doesn't know svr quits
reuntil(exe);
wyebkeep(exe, 0);
if (caller)
g_mutex_lock(&retm);
if (!ipcsend(exe, INPUT, type, caller, req))
g_mutex_lock(&cli->retm);
if (!ipcsend(exe, INPUT, type, caller ? cli->pid : NULL, data))
fatal(3); //spawning svr failse is fatal
}
close(lock);
@ -398,76 +431,61 @@ static char *request(char *exe, Com type, char *caller, char *req)
if (caller)
{
GSource *ping = g_timeout_source_new(DPINGTIME);
g_source_set_callback(ping, (GSourceFunc)pingloop, exe, NULL);
g_source_attach(ping, wctx); //attach to ping thread
cli->exe = exe;
g_source_set_callback(ping, (GSourceFunc)pingloop, cli, NULL);
g_source_attach(ping, cli->wctx); //attach to ping thread
g_mutex_lock(&retm);
g_mutex_unlock(&retm);
g_mutex_lock(&cli->retm);
g_mutex_unlock(&cli->retm);
g_source_destroy(ping);
g_source_unref(ping);
}
return retdata;
return cli->retdata;
}
char *wyebreq(char *exe, char *req)
char *wyebget(char *exe, char *data)
{
return request(exe, CSdata, pid(), req);
return request(exe, CSdata, true, data);
}
void wyebsend(char *exe, char *req)
void wyebsend(char *exe, char *data)
{
request(exe, CSdata, NULL, req);
request(exe, CSdata, false, data);
}
typedef struct {
char *exe;
int sec;
} wyebsst;
static void wsfree(wyebsst *ss)
static gboolean keepcb(char *exe)
{
g_free(ss->exe);
g_free(ss);
}
static gboolean untilcb(wyebsst *ss)
{
if (!lastsec)
lastsec = g_hash_table_new(g_str_hash, g_str_equal);
g_hash_table_replace(lastsec, ss->exe, GINT_TO_POINTER(ss->sec));
char *str = g_strdup_printf("%d", ss->sec);
request(ss->exe, CSuntil, NULL, str);
g_free(str);
request(exe, CSuntil, false, keepstr(exe));
return false;
}
void wyebuntil(char *exe, int sec)
void wyebkeep(char *exe, int sec)
{
wyebsst *ss = g_new(wyebsst, 1);
ss->exe = g_strdup(exe);
ss->sec = sec;
g_idle_add_full(G_PRIORITY_DEFAULT, (GSourceFunc)untilcb,
ss, (GDestroyNotify)wsfree);
if (sec) setsec(exe, sec);
g_idle_add_full(G_PRIORITY_DEFAULT, (GSourceFunc)keepcb,
g_strdup(exe), g_free);
}
static gboolean loopcb(wyebsst *ss)
static gboolean loopcb(char *exe)
{
untilcb(ss);
keepcb(exe);
return true;
}
guint wyebloop(char *exe, int sec, int loopsec)
guint wyebloop(char *exe, int sec)
{
wyebsst *ss = g_new(wyebsst, 1);
ss->exe = g_strdup(exe);
ss->sec = sec;
loopcb(ss);
return g_timeout_add_full(G_PRIORITY_DEFAULT,
loopsec * 1000,
(GSourceFunc)loopcb,
ss, (GDestroyNotify)wsfree);
if (sec) setsec(exe, sec);
loopcb(exe);
return g_timeout_add_full(G_PRIORITY_DEFAULT, sec * 300,
(GSourceFunc)loopcb, g_strdup(exe), g_free);
}
#if DEBUG
static void testget(gpointer p, gpointer ap)
{
D(send %s ret %s, (char *)p, wyebget(ap, p))
g_free(p);
}
#endif
static gboolean tcinputcb(GIOChannel *ch, GIOCondition c, char *exe)
{
char *line;
@ -484,28 +502,31 @@ static gboolean tcinputcb(GIOChannel *ch, GIOCondition c, char *exe)
#if DEBUG
if (g_str_has_prefix(line, "l"))
{
GThreadPool *pool = g_thread_pool_new(testget, exe, 4, false, NULL);
start = g_get_monotonic_time();
for (int i = 0; i < 10000; i++)
for (int i = 0; i < 1000; i++)
{
char *is = g_strdup_printf("l%d", i);
//g_print("loop %d ret %s\n", i, wyebreq(exe, is));
//g_print("loop %d ret %s\n", i, wyebget(exe, is));
if (*(line + 1) == 's')
wyebsend(exe, is);
else if (*(line + 1) == 'p')
D(pint %s, is)
else
wyebreq(exe, is);
g_thread_pool_push(pool, g_strdup(is), NULL);
// wyebget(exe, is);
g_free(is);
}
g_thread_pool_free(pool, false, true);
gint64 now = g_get_monotonic_time();
char *time = g_strdup_printf("time %f", (now - start) / 1000000.0);
wyebsend(exe, time);
g_free(time);
D(time %f, (now - start) / 1000000.0)
}
else
g_print("RET is %s\n", wyebreq(exe, line));
g_print("RET is %s\n", wyebget(exe, line));
#else
g_print("%s\n", wyebreq(exe, line)); //don't free
g_print("%s\n", wyebget(exe, line)); //don't free
#endif
g_free(line);
@ -513,12 +534,10 @@ static gboolean tcinputcb(GIOChannel *ch, GIOCondition c, char *exe)
}
static gboolean tcinit(char *exe)
{
//wyebuntil(exe, 1);
//wyebloop(exe, 2, 1);
#if DEBUG
wyebloop(exe, 300, 200);
wyebloop(exe, 300);
#else
wyebloop(exe, 2, 1);
wyebloop(exe, 2);
#endif
GIOChannel *io = g_io_channel_unix_new(fileno(stdin));
@ -528,10 +547,9 @@ static gboolean tcinit(char *exe)
}
void wyebclient(char *exe)
{
//pid_t getpid(void);
sloop = g_main_loop_new(NULL, false);
GMainLoop *loop = g_main_loop_new(NULL, false);
g_idle_add((GSourceFunc)tcinit, exe);
g_main_loop_run(sloop);
g_main_loop_run(loop);
}
@ -549,46 +567,51 @@ gboolean ipccb(GIOChannel *ch, GIOCondition c, gpointer p)
Com type = *unesc;
char *id = unesc + 1;
char *arg = strchr(unesc, ':');
*arg++ = '\0';
char *data = strchr(unesc, ':');
*data++ = '\0';
#if DEBUG
static int i = 0;
D(%c ipccb%d %c/%s/%s;, svrexe ? 'S':'C', i++, type ,id ,arg)
// static int i = 0;
// D(%c ipccb%d %c/%s/%s;, svrexe ? 'S':'C', i++, type ,id ,data)
#endif
static int lastuntil = DUNTIL;
switch (type) {
//server
case CSuntil:
until(lastuntil = atoi(arg));
break;
case CSdata:
{
Dataargs *args = g_new(Dataargs, 1);
args->caller = g_strdup(id);
args->req = g_strdup(arg);
args->until = lastuntil;
g_thread_unref(g_thread_new("getdata", getdata, args));
args->data = g_strdup(data);
static GThreadPool *pool = NULL;
if (!pool) pool = g_thread_pool_new(getdata, NULL, -1, false, NULL);
g_thread_pool_push(pool, args, NULL);
break;
}
case CSping:
g_mutex_lock(&ordersm);
if (!g_hash_table_lookup(orders, id))
ipcsend(svrexe, id, CClost, NULL, NULL);
ipcsend(CLIDIR, id, CClost, NULL, NULL);
g_mutex_unlock(&ordersm);
case CSuntil:
until(atoi(data));
break;
//client
case CCret:
retdata = g_strdup(arg);
case CClost:
case CCwoke:
{
Client *cli = p;
if (type == CCret)
cli->retdata = g_strdup(data);
//for the case pinging at same time of ret
g_mutex_trylock(&retm);
g_mutex_unlock(&retm);
g_mutex_trylock(&cli->retm);
g_mutex_unlock(&cli->retm);
break;
}
}
g_free(unesc);
return true;
@ -598,13 +621,13 @@ gboolean ipccb(GIOChannel *ch, GIOCondition c, gpointer p)
//test
#if DEBUG
static char *testdata(char *req)
static char *testdata(char *data)
{
//sleep(9);
//g_free(req); //makes crash
//g_free(data); //makes crash
static int i = 0;
return g_strdup_printf("%d th test data. req is %s", ++i, req);
return g_strdup_printf("%d th test data. req is %s", ++i, data);
}

View File

@ -24,21 +24,21 @@ along with wyebrun. If not, see <http://www.gnu.org/licenses/>.
#include <glib/gstdio.h>
#define WYEBPREFIX "-wyeb"
#define WYEBDUNTIL 3
#define WYEBKEEPSEC 3
//client
//wyebrun spawns the exe if wyebsend failes
char *wyebreq( char *exe, char *req); //don't free the ret val
void wyebsend( char *exe, char *req);
void wyebuntil( char *exe, int sec); //keep alive. default is 3s
char *wyebget( char *exe, char *data); //don't free the ret val
void wyebsend(char *exe, char *data);
void wyebkeep(char *exe, int sec); //keep alive. default is 3s
//loop the wyebuntil. to stop, use g_source_remove
guint wyebloop( char *exe, int sec, int loopsec);
guint wyebloop(char *exe, int sec);
//send stdin to svr and ret data to stdout
//blank and enter exit it
void wyebclient(char *exe);
//server
typedef char *(*wyebdataf)(char *req);
typedef char *(*wyebdataf)(char *data);
//server is spawned with an arg the caller
bool wyebsvr(int argc, char **argv, wyebdataf func);
//or if there is own GMainLoop