badvpn/system/BReactor_glib.c

525 lines
13 KiB
C

/**
* @file BReactor_glib.c
* @author Ambroz Bizjak <ambrop7@gmail.com>
*
* @section LICENSE
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the author nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include <misc/offset.h>
#include <base/BLog.h>
#include "BReactor_glib.h"
#include <generated/blog_channel_BReactor.h>
/**
 * GSource wrapper used for file descriptor sources.
 * Objects of this type are allocated by g_source_new() with
 * sizeof(struct fd_source), and the source callbacks cast the GSource
 * pointer they receive back to this type to reach the descriptor.
 */
struct fd_source {
GSource source; // embedded GLib source; kept as the first member so the casts work
BFileDescriptor *bfd; // file descriptor object this source was created for
};
/**
 * Sanity-checks the state of a timer via ASSERT.
 *
 * @param bt timer to check
 */
static void assert_timer (BSmallTimer *bt)
{
    // both flags must be strictly 0 or 1
    ASSERT(bt->is_small == !!bt->is_small)
    ASSERT(bt->active == !!bt->active)

    // reactor and source are only required to be set while the timer
    // is active, so test the active flag first
    ASSERT(!bt->active || bt->reactor)
    ASSERT(!bt->active || bt->source)
}
/**
 * Executes queued jobs one at a time until either the queue is empty
 * or the reactor has been flagged as exiting.
 *
 * @param o the reactor
 */
static void dispatch_pending (BReactor *o)
{
    for (;;) {
        // the exiting flag is re-checked before every job, since a job
        // may have requested termination
        if (o->exiting || !BPendingGroup_HasJobs(&o->pending_jobs)) {
            break;
        }

        BPendingGroup_ExecuteJob(&o->pending_jobs);
    }
}
/**
 * Resets every limit currently on the reactor's active limits list:
 * its count is set back to zero and it is unlinked from the list.
 *
 * @param o the reactor
 */
static void reset_limits (BReactor *o)
{
    for (;;) {
        LinkedList1Node *node = LinkedList1_GetFirst(&o->active_limits_list);
        if (!node) {
            break;
        }

        BReactorLimit *active = UPPER_OBJECT(node, BReactorLimit, active_limits_list_node);

        // only limits with a nonzero count are ever put on this list
        ASSERT(active->count > 0)

        active->count = 0;
        LinkedList1_Remove(&o->active_limits_list, &active->active_limits_list_node);
    }
}
/**
 * Translates a BREACTOR_READ/BREACTOR_WRITE mask into the GLib poll
 * event mask to wait for.
 *
 * @param ev mask of BREACTOR_READ and BREACTOR_WRITE
 * @return G_IO_ERR and G_IO_HUP, plus G_IO_IN if reading was requested
 *         and G_IO_OUT if writing was requested
 */
static gushort get_glib_wait_events (int ev)
{
    // error and hangup are always included
    gushort gev = G_IO_ERR | G_IO_HUP;

    gev |= ((ev & BREACTOR_READ) ? G_IO_IN : 0);
    gev |= ((ev & BREACTOR_WRITE) ? G_IO_OUT : 0);

    return gev;
}
/**
 * Computes which events should be reported to the handler of a file
 * descriptor, based on the returned poll events and the events the
 * descriptor is waiting for.
 *
 * @param bfd an active file descriptor object
 * @return mask of BREACTOR_READ, BREACTOR_WRITE, BREACTOR_ERROR and
 *         BREACTOR_HUP; zero if there is nothing to report
 */
static int get_fd_dispatchable_events (BFileDescriptor *bfd)
{
    ASSERT(bfd->active)

    int ev = 0;

    // read and write are only reported when they were asked for
    ev |= (((bfd->waitEvents & BREACTOR_READ) && (bfd->pollfd.revents & G_IO_IN)) ? BREACTOR_READ : 0);
    ev |= (((bfd->waitEvents & BREACTOR_WRITE) && (bfd->pollfd.revents & G_IO_OUT)) ? BREACTOR_WRITE : 0);

    // error and hangup are reported regardless of the wait events
    ev |= ((bfd->pollfd.revents & G_IO_ERR) ? BREACTOR_ERROR : 0);
    ev |= ((bfd->pollfd.revents & G_IO_HUP) ? BREACTOR_HUP : 0);

    return ev;
}
/**
 * GLib callback invoked when a timer's timeout source fires.
 * Unless the reactor is exiting, the timer is deactivated, its handler
 * is called, and then pending jobs are dispatched and limits reset.
 *
 * @param data the BSmallTimer that was registered as callback data
 * @return always FALSE
 */
static gboolean timer_source_handler (gpointer data)
{
    BSmallTimer *timer = (BSmallTimer *)data;
    assert_timer(timer);
    ASSERT(timer->active)

    BReactor *r = timer->reactor;

    if (!r->exiting) {
        // release the source and mark the timer inactive before calling
        // the handler
        g_source_destroy(timer->source);
        g_source_unref(timer->source);
        timer->active = 0;
        DebugCounter_Decrement(&r->d_timers_ctr);

        // call the handler matching the timer flavor
        if (timer->is_small) {
            timer->handler.smalll(timer);
        } else {
            BTimer *heavy = UPPER_OBJECT(timer, BTimer, base);
            timer->handler.heavy(heavy->user);
        }

        // the timer is not touched past this point; only the reactor
        // pointer saved above is used
        dispatch_pending(r);
        reset_limits(r);
    }

    return FALSE;
}
/**
 * 'prepare' function of the file descriptor source.
 *
 * @param source the source, actually a struct fd_source
 * @param timeout set to -1
 * @return always FALSE
 */
static gboolean fd_source_func_prepare (GSource *source, gint *timeout)
{
    struct fd_source *fds = (struct fd_source *)source;
    BFileDescriptor *bfd = fds->bfd;
    ASSERT(bfd->active)
    ASSERT(bfd->source == source)

    // never ready before polling, and no timeout of our own
    *timeout = -1;

    return FALSE;
}
/**
 * 'check' function of the file descriptor source.
 *
 * @param source the source, actually a struct fd_source
 * @return TRUE if there is at least one event to report to the
 *         descriptor's handler, FALSE otherwise
 */
static gboolean fd_source_func_check (GSource *source)
{
    struct fd_source *fds = (struct fd_source *)source;
    BFileDescriptor *bfd = fds->bfd;
    ASSERT(bfd->active)
    ASSERT(bfd->source == source)

    if (get_fd_dispatchable_events(bfd)) {
        return TRUE;
    }

    return FALSE;
}
/**
 * 'dispatch' function of the file descriptor source.
 * Unless the reactor is exiting, calls the descriptor's handler with
 * the events to report (if any), then dispatches pending jobs and
 * resets limits.
 *
 * @param source the source, actually a struct fd_source
 * @param callback unused
 * @param user_data unused
 * @return always TRUE
 */
static gboolean fd_source_func_dispatch (GSource *source, GSourceFunc callback, gpointer user_data)
{
    struct fd_source *fds = (struct fd_source *)source;
    BFileDescriptor *bfd = fds->bfd;

    // remember the reactor now; the descriptor is not accessed after
    // its handler has been called
    BReactor *reactor = bfd->reactor;
    ASSERT(bfd->active)
    ASSERT(bfd->source == source)

    if (!reactor->exiting) {
        int events = get_fd_dispatchable_events(bfd);

        if (events) {
            bfd->handler(bfd->user, events);

            dispatch_pending(reactor);
            reset_limits(reactor);
        }
    }

    return TRUE;
}
/**
 * Initializes a small timer in the inactive state.
 *
 * @param bt the timer
 * @param handler function to call with the timer itself when it expires
 */
void BSmallTimer_Init (BSmallTimer *bt, BSmallTimer_handler handler)
{
    // mark as the small flavor and not running
    bt->is_small = 1;
    bt->active = 0;

    // remember the handler
    bt->handler.smalll = handler;
}
/**
 * Reports whether a timer is currently set.
 *
 * @param bt the timer
 * @return the timer's active flag (1 if running, 0 if not)
 */
int BSmallTimer_IsRunning (BSmallTimer *bt)
{
    assert_timer(bt);

    int running = bt->active;
    return running;
}
/**
 * Initializes a timer in the inactive state.
 *
 * @param bt the timer
 * @param msTime time stored in the timer; BReactor_SetTimer uses it as
 *               the relative expiration time
 * @param handler function to call with the user argument when the
 *                timer expires
 * @param user argument passed to the handler
 */
void BTimer_Init (BTimer *bt, btime_t msTime, BTimer_handler handler, void *user)
{
    // timer-specific members
    bt->msTime = msTime;
    bt->user = user;

    // base part: not the small flavor, not running
    bt->base.is_small = 0;
    bt->base.active = 0;
    bt->base.handler.heavy = handler;
}
/**
 * Reports whether a timer is currently set.
 *
 * @param bt the timer
 * @return result of BSmallTimer_IsRunning on the timer's base part
 */
int BTimer_IsRunning (BTimer *bt)
{
    // the running state lives in the embedded base timer
    return BSmallTimer_IsRunning(
        &bt->base
    );
}
/**
 * Initializes a file descriptor object in the inactive state.
 *
 * @param bs the object
 * @param fd the file descriptor to represent
 * @param handler function called with the user argument and the event
 *                mask when events are dispatched
 * @param user argument passed to the handler
 */
void BFileDescriptor_Init (BFileDescriptor *bs, int fd, BFileDescriptor_handler handler, void *user)
{
    // not registered with any reactor yet
    bs->active = 0;

    // remember arguments
    bs->user = user;
    bs->handler = handler;
    bs->fd = fd;
}
/**
 * Initializes the reactor on top of a newly created GMainLoop.
 *
 * @param bsys the reactor
 * @return result of BReactor_InitFromExistingGMainLoop
 */
int BReactor_Init (BReactor *bsys)
{
    // the loop is created here, so the final argument asks for it to be
    // unreferenced when the reactor is freed
    return BReactor_InitFromExistingGMainLoop(
        bsys,
        g_main_loop_new(NULL, FALSE),
        1
    );
}
/**
 * Frees the reactor.
 * Asserts that no jobs are pending and that no limits are active.
 *
 * @param bsys the reactor
 */
void BReactor_Free (BReactor *bsys)
{
    DebugObject_Free(&bsys->d_obj);
    DebugCounter_Free(&bsys->d_timers_ctr);
    DebugCounter_Free(&bsys->d_limits_ctr);
    DebugCounter_Free(&bsys->d_fds_counter);
    ASSERT(!BPendingGroup_HasJobs(&bsys->pending_jobs))
    ASSERT(LinkedList1_IsEmpty(&bsys->active_limits_list))

    // release the job queue
    BPendingGroup_Free(&bsys->pending_jobs);

    // the main loop is left alone unless we were told to unref it
    if (!bsys->unref_gloop_on_free) {
        return;
    }

    g_main_loop_unref(bsys->gloop);
}
/**
 * Runs the reactor until it is asked to quit.
 *
 * @param bsys the reactor
 * @return the exit code stored in the reactor
 */
int BReactor_Exec (BReactor *bsys)
{
    DebugObject_Access(&bsys->d_obj);

    // run jobs queued before the loop starts (stops early when
    // exiting), then reset limits
    dispatch_pending(bsys);
    reset_limits(bsys);

    // only enter the glib loop if none of those jobs requested exit
    if (!bsys->exiting) {
        g_main_loop_run(bsys->gloop);

        ASSERT(bsys->exiting)
    }

    return bsys->exit_code;
}
/**
 * Requests termination of the reactor.
 *
 * @param bsys the reactor
 * @param code exit code to store; BReactor_Exec returns it
 */
void BReactor_Quit (BReactor *bsys, int code)
{
    DebugObject_Access(&bsys->d_obj);

    // record the exit code and flag the reactor as exiting
    bsys->exit_code = code;
    bsys->exiting = 1;

    // ask the glib loop to stop
    g_main_loop_quit(bsys->gloop);
}
/**
 * Starts a timer, restarting it if it is already running.
 *
 * @param bsys the reactor
 * @param bt the timer
 * @param mode if BTIMER_SET_ABSOLUTE, 'time' is an absolute time on the
 *             btime_gettime() clock; otherwise it is a delay from now
 * @param time expiration time, interpreted according to 'mode'.
 *             The resulting delay is clamped to the range GLib accepts:
 *             negative delays expire immediately and delays above
 *             UINT_MAX milliseconds are shortened to UINT_MAX.
 */
void BReactor_SetSmallTimer (BReactor *bsys, BSmallTimer *bt, int mode, btime_t time)
{
    DebugObject_Access(&bsys->d_obj);
    assert_timer(bt);

    // remove timer if it's already set
    BReactor_RemoveSmallTimer(bsys, bt);

    // if mode is absolute, subtract current time
    if (mode == BTIMER_SET_ABSOLUTE) {
        btime_t now = btime_gettime();
        time = (time < now ? 0 : time - now);
    }

    // g_timeout_source_new() takes the interval as an unsigned int
    // (guint) number of milliseconds. Clamp first, so that a negative
    // or very large delay is not silently wrapped by the conversion.
    // NOTE(review): assumes btime_t is a signed type at least as wide
    // as unsigned int - confirm against its definition.
    if (time < 0) {
        time = 0;
    } else if (time > (btime_t)UINT_MAX) {
        time = (btime_t)UINT_MAX;
    }

    // set active and reactor
    bt->active = 1;
    bt->reactor = bsys;

    // init source
    bt->source = g_timeout_source_new(time);
    g_source_set_callback(bt->source, timer_source_handler, bt, NULL);
    g_source_attach(bt->source, g_main_loop_get_context(bsys->gloop));

    DebugCounter_Increment(&bsys->d_timers_ctr);
}
/**
 * Stops a timer. Does nothing if the timer is not running.
 *
 * @param bsys the reactor
 * @param bt the timer
 */
void BReactor_RemoveSmallTimer (BReactor *bsys, BSmallTimer *bt)
{
    DebugObject_Access(&bsys->d_obj);
    assert_timer(bt);

    if (bt->active) {
        // detach the source from its context and drop our reference
        g_source_destroy(bt->source);
        g_source_unref(bt->source);

        // the timer is no longer running
        bt->active = 0;

        DebugCounter_Decrement(&bsys->d_timers_ctr);
    }
}
/**
 * Starts a timer to expire after the time stored in it by BTimer_Init.
 *
 * @param bsys the reactor
 * @param bt the timer
 */
void BReactor_SetTimer (BReactor *bsys, BTimer *bt)
{
    // relative to now, using the timer's own msTime
    BReactor_SetSmallTimer(
        bsys,
        &bt->base,
        BTIMER_SET_RELATIVE,
        bt->msTime
    );
}
/**
 * Starts a timer to expire after the given relative time.
 *
 * @param bsys the reactor
 * @param bt the timer
 * @param after relative expiration time
 */
void BReactor_SetTimerAfter (BReactor *bsys, BTimer *bt, btime_t after)
{
    // relative to now, ignoring the timer's stored msTime
    BReactor_SetSmallTimer(
        bsys,
        &bt->base,
        BTIMER_SET_RELATIVE,
        after
    );
}
/**
 * Starts a timer to expire at the given absolute time.
 *
 * @param bsys the reactor
 * @param bt the timer
 * @param time absolute expiration time
 */
void BReactor_SetTimerAbsolute (BReactor *bsys, BTimer *bt, btime_t time)
{
    // absolute mode; conversion to a delay happens in SetSmallTimer
    BReactor_SetSmallTimer(
        bsys,
        &bt->base,
        BTIMER_SET_ABSOLUTE,
        time
    );
}
/**
 * Stops a timer. Does nothing if the timer is not running.
 *
 * @param bsys the reactor
 * @param bt the timer
 */
void BReactor_RemoveTimer (BReactor *bsys, BTimer *bt)
{
    // plain call: ISO C does not allow 'return' with an expression in a
    // function returning void
    BReactor_RemoveSmallTimer(bsys, &bt->base);
}
/**
 * Returns the reactor's job queue.
 *
 * @param bsys the reactor
 * @return pointer to the pending group embedded in the reactor
 */
BPendingGroup * BReactor_PendingGroup (BReactor *bsys)
{
    DebugObject_Access(&bsys->d_obj);

    BPendingGroup *group = &bsys->pending_jobs;
    return group;
}
/**
 * Executes pending jobs until the given job is the next one to run.
 * The job itself is not executed. A job must always be available while
 * looping; this is asserted.
 *
 * @param bsys the reactor
 * @param ref the job to synchronize to; must not be NULL
 * @return 1 if the job was reached, 0 if the reactor started exiting
 *         before that
 */
int BReactor_Synchronize (BReactor *bsys, BSmallPending *ref)
{
    DebugObject_Access(&bsys->d_obj);
    ASSERT(ref)

    for (;;) {
        // give up as soon as the reactor is exiting
        if (bsys->exiting) {
            return 0;
        }

        ASSERT(BPendingGroup_HasJobs(&bsys->pending_jobs))

        // stop right before the reference job
        if (BPendingGroup_PeekJob(&bsys->pending_jobs) == ref) {
            return 1;
        }

        BPendingGroup_ExecuteJob(&bsys->pending_jobs);
    }
}
/**
 * Registers a file descriptor object with the reactor.
 * The object starts out with no wait events.
 *
 * @param bsys the reactor
 * @param bs the file descriptor object; must not be active
 * @return always 1
 */
int BReactor_AddFileDescriptor (BReactor *bsys, BFileDescriptor *bs)
{
    DebugObject_Access(&bsys->d_obj);
    ASSERT(!bs->active)

    // bind to this reactor, active, waiting for nothing yet
    bs->reactor = bsys;
    bs->active = 1;
    bs->waitEvents = 0;

    // prepare the poll record for our fd
    bs->pollfd.fd = bs->fd;
    bs->pollfd.events = get_glib_wait_events(bs->waitEvents);
    bs->pollfd.revents = 0;

    // allocate the source and point it back at the descriptor
    bs->source = g_source_new(&bsys->fd_source_funcs, sizeof(struct fd_source));
    struct fd_source *fds = (struct fd_source *)bs->source;
    fds->bfd = bs;

    // hook the poll record to the source and the source to the loop
    g_source_add_poll(bs->source, &bs->pollfd);
    g_source_attach(bs->source, g_main_loop_get_context(bsys->gloop));

    DebugCounter_Increment(&bsys->d_fds_counter);

    return 1;
}
/**
 * Unregisters a file descriptor object from the reactor.
 *
 * @param bsys the reactor
 * @param bs the file descriptor object; must be active
 */
void BReactor_RemoveFileDescriptor (BReactor *bsys, BFileDescriptor *bs)
{
    DebugObject_Access(&bsys->d_obj);
    DebugCounter_Decrement(&bsys->d_fds_counter);
    ASSERT(bs->active)

    // detach the source from its context, then drop our reference
    g_source_destroy(bs->source);
    g_source_unref(bs->source);

    // the object may now be added again
    bs->active = 0;
}
/**
 * Sets the events a registered file descriptor object waits for.
 *
 * @param bsys the reactor
 * @param bs the file descriptor object; must be active
 * @param events mask containing only BREACTOR_READ and/or BREACTOR_WRITE
 */
void BReactor_SetFileDescriptorEvents (BReactor *bsys, BFileDescriptor *bs, int events)
{
    DebugObject_Access(&bsys->d_obj);
    ASSERT(bs->active)
    ASSERT((events & ~(BREACTOR_READ | BREACTOR_WRITE)) == 0)

    // remember what is being waited for
    bs->waitEvents = events;

    // mirror it into the poll record
    bs->pollfd.events = get_glib_wait_events(bs->waitEvents);
}
/**
 * Initializes the reactor on top of an existing GMainLoop.
 *
 * @param bsys the reactor
 * @param gloop the main loop to use; must not be NULL
 * @param unref_gloop_on_free 1 to have BReactor_Free unref the loop,
 *                            0 to leave it alone
 * @return always 1
 */
int BReactor_InitFromExistingGMainLoop (BReactor *bsys, GMainLoop *gloop, int unref_gloop_on_free)
{
    ASSERT(gloop)
    ASSERT(unref_gloop_on_free == !!unref_gloop_on_free)

    // remember the loop and whether we are responsible for unreffing it
    bsys->unref_gloop_on_free = unref_gloop_on_free;
    bsys->gloop = gloop;

    // not exiting yet
    bsys->exiting = 0;

    // build the callback table shared by all fd sources; entries not
    // set below stay zeroed
    memset(&bsys->fd_source_funcs, 0, sizeof(bsys->fd_source_funcs));
    bsys->fd_source_funcs.finalize = NULL;
    bsys->fd_source_funcs.dispatch = fd_source_func_dispatch;
    bsys->fd_source_funcs.check = fd_source_func_check;
    bsys->fd_source_funcs.prepare = fd_source_func_prepare;

    // empty job queue
    BPendingGroup_Init(&bsys->pending_jobs);

    // empty list of active limits
    LinkedList1_Init(&bsys->active_limits_list);

    DebugCounter_Init(&bsys->d_fds_counter);
    DebugCounter_Init(&bsys->d_limits_ctr);
    DebugCounter_Init(&bsys->d_timers_ctr);
    DebugObject_Init(&bsys->d_obj);

    return 1;
}
/**
 * Returns the GMainLoop the reactor runs on.
 *
 * @param bsys the reactor
 * @return the main loop stored in the reactor
 */
GMainLoop * BReactor_GetGMainLoop (BReactor *bsys)
{
    DebugObject_Access(&bsys->d_obj);

    GMainLoop *loop = bsys->gloop;
    return loop;
}
/**
 * Executes pending jobs until none are left or the reactor is exiting.
 *
 * @param bsys the reactor
 * @return 1 if the reactor is not exiting afterwards, 0 if it is
 */
int BReactor_SynchronizeAll (BReactor *bsys)
{
    DebugObject_Access(&bsys->d_obj);

    dispatch_pending(bsys);

    return (bsys->exiting ? 0 : 1);
}
/**
 * Initializes a limit object with a count of zero.
 *
 * @param o the object
 * @param reactor the reactor the limit belongs to
 * @param limit maximum count; must be greater than zero
 */
void BReactorLimit_Init (BReactorLimit *o, BReactor *reactor, int limit)
{
    DebugObject_Access(&reactor->d_obj);
    ASSERT(limit > 0)

    // nothing counted yet
    o->count = 0;

    // remember arguments
    o->limit = limit;
    o->reactor = reactor;

    DebugCounter_Increment(&reactor->d_limits_ctr);
    DebugObject_Init(&o->d_obj);
}
/**
 * Frees a limit object, unlinking it from the reactor's active limits
 * list if its count is nonzero.
 *
 * @param o the object
 */
void BReactorLimit_Free (BReactorLimit *o)
{
    BReactor *owner = o->reactor;
    DebugObject_Free(&o->d_obj);
    DebugCounter_Decrement(&owner->d_limits_ctr);

    // a limit is only on the active list while its count is nonzero
    if (o->count <= 0) {
        return;
    }

    LinkedList1_Remove(&owner->active_limits_list, &o->active_limits_list_node);
}
/**
 * Tries to count one more event against the limit.
 *
 * @param o the object
 * @return 1 if the count was below the limit and has been incremented,
 *         0 if the limit was already reached (count is left unchanged)
 */
int BReactorLimit_Increment (BReactorLimit *o)
{
    BReactor *owner = o->reactor;
    DebugObject_Access(&o->d_obj);

    // refuse once the limit has been reached
    if (o->count >= o->limit) {
        return 0;
    }

    // count the event; if the count was zero before, the limit has just
    // become active and goes on the reactor's list so it gets reset
    if (++o->count == 1) {
        LinkedList1_Append(&owner->active_limits_list, &o->active_limits_list_node);
    }

    return 1;
}
/**
 * Changes the maximum count of a limit object.
 * The current count is not modified.
 *
 * @param o the object
 * @param limit new maximum count; must be greater than zero
 */
void BReactorLimit_SetLimit (BReactorLimit *o, int limit)
{
    DebugObject_Access(&o->d_obj);
    ASSERT(limit > 0)

    // store the new maximum
    o->limit = limit;
}