badvpn/threadwork/BThreadWork.c
2012-09-01 01:54:52 +00:00

452 lines
13 KiB
C

/**
* @file BThreadWork.c
* @author Ambroz Bizjak <ambrop7@gmail.com>
*
* @section LICENSE
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the author nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdint.h>
#include <stddef.h>
#ifdef BADVPN_THREADWORK_USE_PTHREAD
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#endif
#include <misc/offset.h>
#include <base/BLog.h>
#include <generated/blog_channel_BThreadWork.h>
#include <threadwork/BThreadWork.h>
#ifdef BADVPN_THREADWORK_USE_PTHREAD
static void * dispatcher_thread (struct BThreadWorkDispatcher_thread *t)
{
BThreadWorkDispatcher *o = t->d;
ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
while (1) {
// exit if requested
if (o->cancel) {
break;
}
if (LinkedList1_IsEmpty(&o->pending_list)) {
// wait for event
ASSERT_FORCE(pthread_cond_wait(&t->new_cond, &o->mutex) == 0)
continue;
}
// grab the work
BThreadWork *w = UPPER_OBJECT(LinkedList1_GetFirst(&o->pending_list), BThreadWork, list_node);
ASSERT(w->state == BTHREADWORK_STATE_PENDING)
LinkedList1_Remove(&o->pending_list, &w->list_node);
t->running_work = w;
w->state = BTHREADWORK_STATE_RUNNING;
// do the work
ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
w->work_func(w->work_func_user);
ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
// release the work
t->running_work = NULL;
LinkedList1_Append(&o->finished_list, &w->list_node);
w->state = BTHREADWORK_STATE_FINISHED;
ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
// write to pipe
uint8_t b = 0;
int res = write(o->pipe[1], &b, sizeof(b));
if (res < 0) {
int error = errno;
ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
}
}
ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
return NULL;
}
static void dispatch_job (BThreadWorkDispatcher *o)
{
ASSERT(o->num_threads > 0)
// lock
ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
// check for finished job
if (LinkedList1_IsEmpty(&o->finished_list)) {
ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
return;
}
// grab finished job
BThreadWork *w = UPPER_OBJECT(LinkedList1_GetFirst(&o->finished_list), BThreadWork, list_node);
ASSERT(w->state == BTHREADWORK_STATE_FINISHED)
LinkedList1_Remove(&o->finished_list, &w->list_node);
// schedule more
if (!LinkedList1_IsEmpty(&o->finished_list)) {
BPending_Set(&o->more_job);
}
// set state forgotten
w->state = BTHREADWORK_STATE_FORGOTTEN;
// unlock
ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
// call handler
w->handler_done(w->user);
return;
}
static void pipe_fd_handler (BThreadWorkDispatcher *o, int events)
{
ASSERT(o->num_threads > 0)
DebugObject_Access(&o->d_obj);
// read data from pipe
uint8_t b[64];
int res = read(o->pipe[0], b, sizeof(b));
if (res < 0) {
int error = errno;
ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
} else {
ASSERT(res > 0)
}
dispatch_job(o);
return;
}
static void more_job_handler (BThreadWorkDispatcher *o)
{
ASSERT(o->num_threads > 0)
DebugObject_Access(&o->d_obj);
dispatch_job(o);
return;
}
static void stop_threads (BThreadWorkDispatcher *o)
{
// set cancelling
ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
o->cancel = 1;
ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
while (o->num_threads > 0) {
struct BThreadWorkDispatcher_thread *t = &o->threads[o->num_threads - 1];
// wake up thread
ASSERT_FORCE(pthread_cond_signal(&t->new_cond) == 0)
// wait for thread to exit
ASSERT_FORCE(pthread_join(t->thread, NULL) == 0)
// free condition variable
ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
o->num_threads--;
}
}
#endif
static void work_job_handler (BThreadWork *o)
{
#ifdef BADVPN_THREADWORK_USE_PTHREAD
ASSERT(o->d->num_threads == 0)
#endif
DebugObject_Access(&o->d_obj);
// do the work
o->work_func(o->work_func_user);
// call handler
o->handler_done(o->user);
return;
}
int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int num_threads_hint)
{
// init arguments
o->reactor = reactor;
if (num_threads_hint < 0) {
num_threads_hint = 2;
}
if (num_threads_hint > BTHREADWORK_MAX_THREADS) {
num_threads_hint = BTHREADWORK_MAX_THREADS;
}
#ifdef BADVPN_THREADWORK_USE_PTHREAD
if (num_threads_hint > 0) {
// init pending list
LinkedList1_Init(&o->pending_list);
// init finished list
LinkedList1_Init(&o->finished_list);
// init mutex
if (pthread_mutex_init(&o->mutex, NULL) != 0) {
BLog(BLOG_ERROR, "pthread_mutex_init failed");
goto fail0;
}
// init pipe
if (pipe(o->pipe) < 0) {
BLog(BLOG_ERROR, "pipe failed");
goto fail1;
}
// set read end non-blocking
if (fcntl(o->pipe[0], F_SETFL, O_NONBLOCK) < 0) {
BLog(BLOG_ERROR, "fcntl failed");
goto fail2;
}
// set write end non-blocking
if (fcntl(o->pipe[1], F_SETFL, O_NONBLOCK) < 0) {
BLog(BLOG_ERROR, "fcntl failed");
goto fail2;
}
// init BFileDescriptor
BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o);
if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
goto fail2;
}
BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
// init more job
BPending_Init(&o->more_job, BReactor_PendingGroup(o->reactor), (BPending_handler)more_job_handler, o);
// set not cancelling
o->cancel = 0;
// init threads
o->num_threads = 0;
for (int i = 0; i < num_threads_hint; i++) {
struct BThreadWorkDispatcher_thread *t = &o->threads[i];
// set parent pointer
t->d = o;
// set no running work
t->running_work = NULL;
// init condition variable
if (pthread_cond_init(&t->new_cond, NULL) != 0) {
BLog(BLOG_ERROR, "pthread_cond_init failed");
goto fail3;
}
// init thread
if (pthread_create(&t->thread, NULL, (void * (*) (void *))dispatcher_thread, t) != 0) {
BLog(BLOG_ERROR, "pthread_create failed");
ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
goto fail3;
}
o->num_threads++;
}
}
#endif
DebugObject_Init(&o->d_obj);
DebugCounter_Init(&o->d_ctr);
return 1;
#ifdef BADVPN_THREADWORK_USE_PTHREAD
fail3:
stop_threads(o);
BPending_Free(&o->more_job);
BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
fail2:
ASSERT_FORCE(close(o->pipe[0]) == 0)
ASSERT_FORCE(close(o->pipe[1]) == 0)
fail1:
ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
fail0:
return 0;
#endif
}
void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
{
#ifdef BADVPN_THREADWORK_USE_PTHREAD
if (o->num_threads > 0) {
ASSERT(LinkedList1_IsEmpty(&o->pending_list))
for (int i = 0; i < o->num_threads; i++) { ASSERT(!o->threads[i].running_work) }
ASSERT(LinkedList1_IsEmpty(&o->finished_list))
}
#endif
DebugObject_Free(&o->d_obj);
DebugCounter_Free(&o->d_ctr);
#ifdef BADVPN_THREADWORK_USE_PTHREAD
if (o->num_threads > 0) {
// stop threads
stop_threads(o);
// free more job
BPending_Free(&o->more_job);
// free BFileDescriptor
BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
// free pipe
ASSERT_FORCE(close(o->pipe[0]) == 0)
ASSERT_FORCE(close(o->pipe[1]) == 0)
// free mutex
ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
}
#endif
}
int BThreadWorkDispatcher_UsingThreads (BThreadWorkDispatcher *o)
{
#ifdef BADVPN_THREADWORK_USE_PTHREAD
return (o->num_threads > 0);
#else
return 0;
#endif
}
void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_handler_done handler_done, void *user, BThreadWork_work_func work_func, void *work_func_user)
{
DebugObject_Access(&d->d_obj);
// init arguments
o->d = d;
o->handler_done = handler_done;
o->user = user;
o->work_func = work_func;
o->work_func_user = work_func_user;
#ifdef BADVPN_THREADWORK_USE_PTHREAD
if (d->num_threads > 0) {
// set state
o->state = BTHREADWORK_STATE_PENDING;
// init finished semaphore
ASSERT_FORCE(sem_init(&o->finished_sem, 0, 0) == 0)
// post work
ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
LinkedList1_Append(&d->pending_list, &o->list_node);
for (int i = 0; i < d->num_threads; i++) {
if (!d->threads[i].running_work) {
ASSERT_FORCE(pthread_cond_signal(&d->threads[i].new_cond) == 0)
break;
}
}
ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
} else {
#endif
// schedule job
BPending_Init(&o->job, BReactor_PendingGroup(d->reactor), (BPending_handler)work_job_handler, o);
BPending_Set(&o->job);
#ifdef BADVPN_THREADWORK_USE_PTHREAD
}
#endif
DebugObject_Init(&o->d_obj);
DebugCounter_Increment(&d->d_ctr);
}
void BThreadWork_Free (BThreadWork *o)
{
BThreadWorkDispatcher *d = o->d;
DebugObject_Free(&o->d_obj);
DebugCounter_Decrement(&d->d_ctr);
#ifdef BADVPN_THREADWORK_USE_PTHREAD
if (d->num_threads > 0) {
ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
switch (o->state) {
case BTHREADWORK_STATE_PENDING: {
BLog(BLOG_DEBUG, "remove pending work");
// remove from pending list
LinkedList1_Remove(&d->pending_list, &o->list_node);
} break;
case BTHREADWORK_STATE_RUNNING: {
BLog(BLOG_DEBUG, "remove running work");
// wait for the work to finish running
ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
ASSERT_FORCE(sem_wait(&o->finished_sem) == 0)
ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
ASSERT(o->state == BTHREADWORK_STATE_FINISHED)
// remove from finished list
LinkedList1_Remove(&d->finished_list, &o->list_node);
} break;
case BTHREADWORK_STATE_FINISHED: {
BLog(BLOG_DEBUG, "remove finished work");
// remove from finished list
LinkedList1_Remove(&d->finished_list, &o->list_node);
} break;
case BTHREADWORK_STATE_FORGOTTEN: {
BLog(BLOG_DEBUG, "remove forgotten work");
} break;
default:
ASSERT(0);
}
ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
// free finished semaphore
ASSERT_FORCE(sem_destroy(&o->finished_sem) == 0)
} else {
#endif
BPending_Free(&o->job);
#ifdef BADVPN_THREADWORK_USE_PTHREAD
}
#endif
}