Merge branch 'th_notify_fd_reinit'

This commit is contained in:
Nick Mathewson 2010-09-08 14:00:45 -04:00
commit 9580e282d7
5 changed files with 147 additions and 16 deletions

View File

@ -262,6 +262,9 @@ struct event_base {
enum event_base_config_flag flags;
/* Notify main thread to wake up break, etc. */
/** True if the base already has a pending notify, and we don't need
* to add any more. */
int is_notify_pending;
/** A socketpair used by some th_notify functions to wake up the main
* thread. */
int th_notify_fd[2];

52
event.c
View File

@ -471,7 +471,7 @@ static void
notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr)
{
struct event_base *base = baseptr;
if (!EVBASE_IN_THREAD(base))
if (EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
}
@ -742,6 +742,7 @@ event_reinit(struct event_base *base)
const struct eventop *evsel;
int res = 0;
struct event *ev;
int was_notifiable = 0;
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
@ -769,6 +770,23 @@ event_reinit(struct event_base *base)
EVLIST_ACTIVE);
base->sig.ev_signal_added = 0;
}
if (base->th_notify_fd[0] != -1) {
/* we cannot call event_del here because the base has
* not been reinitialized yet. */
was_notifiable = 1;
event_queue_remove(base, &base->th_notify,
EVLIST_INSERTED);
if (base->th_notify.ev_flags & EVLIST_ACTIVE)
event_queue_remove(base, &base->th_notify,
EVLIST_ACTIVE);
base->sig.ev_signal_added = 0;
EVUTIL_CLOSESOCKET(base->th_notify_fd[0]);
if (base->th_notify_fd[1] != -1)
EVUTIL_CLOSESOCKET(base->th_notify_fd[1]);
base->th_notify_fd[0] = -1;
base->th_notify_fd[1] = -1;
event_debug_unassign(&base->th_notify);
}
if (base->evsel->dealloc != NULL)
base->evsel->dealloc(base);
@ -794,6 +812,9 @@ event_reinit(struct event_base *base)
}
}
if (was_notifiable && res == 0)
res = evthread_make_base_notifiable(base);
done:
EVBASE_RELEASE_LOCK(base, th_base_lock);
return (res);
@ -1369,18 +1390,20 @@ event_loopbreak(void)
int
event_base_loopbreak(struct event_base *event_base)
{
int r = 0;
if (event_base == NULL)
return (-1);
EVBASE_ACQUIRE_LOCK(event_base, th_base_lock);
event_base->event_break = 1;
EVBASE_RELEASE_LOCK(event_base, th_base_lock);
if (!EVBASE_IN_THREAD(event_base)) {
return evthread_notify_base(event_base);
if (EVBASE_NEED_NOTIFY(event_base)) {
r = evthread_notify_base(event_base);
} else {
return (0);
r = (0);
}
EVBASE_RELEASE_LOCK(event_base, th_base_lock);
return r;
}
int
@ -1879,8 +1902,12 @@ evthread_notify_base_eventfd(struct event_base *base)
static int
evthread_notify_base(struct event_base *base)
{
EVENT_BASE_ASSERT_LOCKED(base);
if (!base->th_notify_fn)
return -1;
if (base->is_notify_pending)
return 0;
base->is_notify_pending = 1;
return base->th_notify_fn(base);
}
@ -2029,7 +2056,7 @@ event_add_internal(struct event *ev, const struct timeval *tv,
}
/* if we are not in the right thread, we need to wake up the loop */
if (res != -1 && notify && !EVBASE_IN_THREAD(base))
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
_event_debug_note_add(ev);
@ -2123,7 +2150,7 @@ event_del_internal(struct event *ev)
}
/* if we are not in the right thread, we need to wake up the loop */
if (res != -1 && notify && !EVBASE_IN_THREAD(base))
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
_event_debug_note_del(ev);
@ -2224,8 +2251,6 @@ event_deferred_cb_schedule(struct deferred_cb_queue *queue,
cb->queued = 1;
TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next);
++queue->active_count;
/* XXXX Can we get away with doing this only when adding
* the first active deferred_cb to the queue? */
if (queue->notify_fn)
queue->notify_fn(queue, queue->notify_arg);
}
@ -2571,11 +2596,15 @@ evthread_notify_drain_eventfd(evutil_socket_t fd, short what, void *arg)
{
ev_uint64_t msg;
ev_ssize_t r;
struct event_base *base = arg;
r = read(fd, (void*) &msg, sizeof(msg));
if (r<0 && errno != EAGAIN) {
event_sock_warn(fd, "Error reading from eventfd");
}
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
base->is_notify_pending = 0;
EVBASE_RELEASE_LOCK(base, th_base_lock);
}
#endif
@ -2583,6 +2612,7 @@ static void
evthread_notify_drain_default(evutil_socket_t fd, short what, void *arg)
{
unsigned char buf[1024];
struct event_base *base = arg;
#ifdef WIN32
while (recv(fd, (char*)buf, sizeof(buf), 0) > 0)
;
@ -2590,6 +2620,10 @@ evthread_notify_drain_default(evutil_socket_t fd, short what, void *arg)
while (read(fd, (char*)buf, sizeof(buf)) > 0)
;
#endif
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
base->is_notify_pending = 0;
EVBASE_RELEASE_LOCK(base, th_base_lock);
}
int

View File

@ -56,12 +56,20 @@ extern int _evthread_lock_debugging_enabled;
#define EVTHREAD_GET_ID() \
(_evthread_id_fn ? _evthread_id_fn() : 1)
/** Return true iff we're in the thread that is currently running a given
* event_base's loop. */
/** Return true iff we're in the thread that is currently (or most recently)
* running a given event_base's loop. Requires lock. */
#define EVBASE_IN_THREAD(base) \
(_evthread_id_fn == NULL || \
(base)->th_owner_id == _evthread_id_fn())
/** Return true iff we need to notify the base's main thread about changes to
* its state, because it's currently running the main loop in another
* thread. Requires lock. */
#define EVBASE_NEED_NOTIFY(base) \
(_evthread_id_fn != NULL && \
(base)->running_loop && \
(base)->th_owner_id != _evthread_id_fn())
/** Allocate a new lock, and store it in lockvar, a void*. Sets lockvar to
NULL if locking is not enabled. */
#define EVTHREAD_ALLOC_LOCK(lockvar, locktype) \
@ -281,6 +289,7 @@ EVLOCK_TRY_LOCK(void *lock)
#define EVLOCK_UNLOCK2(lock1,lock2,mode1,mode2) _EVUTIL_NIL_STMT
#define EVBASE_IN_THREAD(base) 1
#define EVBASE_NEED_NOTIFY(base) 0
#define EVBASE_ACQUIRE_LOCK(base, lock) _EVUTIL_NIL_STMT
#define EVBASE_RELEASE_LOCK(base, lock) _EVUTIL_NIL_STMT
#define EVLOCK_ASSERT_LOCKED(lock) _EVUTIL_NIL_STMT

View File

@ -62,6 +62,7 @@
#include "event2/buffer_compat.h"
#include "event2/util.h"
#include "event-internal.h"
#include "evthread-internal.h"
#include "util-internal.h"
#include "log-internal.h"
@ -805,6 +806,9 @@ test_fork(void)
setup_test("After fork: ");
tt_assert(current_base);
evthread_make_base_notifiable(current_base);
write(pair[0], TEST1, strlen(TEST1)+1);
event_set(&ev, pair[1], EV_READ, simple_read_cb, &ev);
@ -863,6 +867,7 @@ test_fork(void)
evsignal_del(&sig_ev);
end:
cleanup_test();
}

View File

@ -40,12 +40,18 @@
#include <process.h>
#endif
#include <assert.h>
#ifdef _EVENT_HAVE_UNISTD_H
#include <unistd.h>
#endif
#include "sys/queue.h"
#include "event2/util.h"
#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/thread.h"
#include "evthread-internal.h"
#include "event-internal.h"
#include "regress.h"
#include "tinytest_macros.h"
@ -131,7 +137,7 @@ basic_thread(void *arg)
/* exit the loop only if all threads fired all timeouts */
EVLOCK_LOCK(count_lock, 0);
if (count >= NUM_THREADS * 100)
if (count >= NUM_THREADS * NUM_ITERATIONS)
event_base_loopexit(base, NULL);
EVLOCK_UNLOCK(count_lock, 0);
@ -141,6 +147,27 @@ basic_thread(void *arg)
THREAD_RETURN();
}
static int got_sigchld = 0;
static void
sigchld_cb(evutil_socket_t fd, short event, void *arg)
{
struct timeval tv;
struct event_base *base = arg;
got_sigchld++;
tv.tv_usec = 100000;
tv.tv_sec = 0;
event_base_loopexit(base, &tv);
}
static int notification_fd_used = 0;
static void
notify_fd_cb(evutil_socket_t fd, short event, void *arg)
{
++notification_fd_used;
}
static void
thread_basic(void *arg)
{
@ -151,6 +178,10 @@ thread_basic(void *arg)
struct basic_test_data *data = arg;
struct event_base *base = data->base;
int forking = data->setup_data && !strcmp(data->setup_data, "forking");
struct event *notification_event = NULL;
struct event *sigchld_event = NULL;
EVTHREAD_ALLOC_LOCK(count_lock, 0);
tt_assert(count_lock);
@ -159,6 +190,42 @@ thread_basic(void *arg)
tt_abort_msg("Couldn't make base notifiable!");
}
#ifndef WIN32
if (forking) {
pid_t pid;
int status;
sigchld_event = evsignal_new(base, SIGCHLD, sigchld_cb, base);
/* This piggybacks on the th_notify_fd weirdly, and looks
* inside libevent internals. Not a good idea in non-testing
* code! */
notification_event = event_new(base,
base->th_notify_fd[0], EV_READ|EV_PERSIST, notify_fd_cb,
NULL);
event_add(sigchld_event, NULL);
event_add(notification_event, NULL);
if ((pid = fork()) == 0) {
if (event_reinit(base) < 0) {
TT_FAIL(("reinit"));
exit(1);
}
goto child;
}
event_base_dispatch(base);
if (waitpid(pid, &status, 0) == -1)
tt_abort_perror("waitpid");
TT_BLATHER(("Waitpid okay\n"));
tt_assert(got_sigchld);
tt_int_op(notification_fd_used, ==, 0);
goto end;
}
child:
#endif
for (i = 0; i < NUM_THREADS; ++i)
THREAD_START(threads[i], basic_thread, base);
@ -177,8 +244,15 @@ thread_basic(void *arg)
tt_int_op(count, ==, NUM_THREADS * NUM_ITERATIONS);
EVTHREAD_FREE_LOCK(count_lock, 0);
TT_BLATHER(("notifiations==%d", notification_fd_used));
end:
;
if (notification_event)
event_free(notification_event);
if (sigchld_event)
event_free(sigchld_event);
}
#undef NUM_THREADS
@ -313,10 +387,16 @@ end:
}
#define TEST(name) \
{ #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, \
&basic_setup, NULL }
{ #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, \
&basic_setup, NULL }
struct testcase_t thread_testcases[] = {
TEST(basic),
{ "basic", thread_basic, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE,
&basic_setup, NULL },
#ifndef WIN32
{ "forking", thread_basic, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE,
&basic_setup, (char*)"forking" },
#endif
TEST(conditions_simple),
END_OF_TESTCASES
};