From 495ed66705d5790de031f9591b47afcbe4fc1156 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 1 Sep 2010 16:36:30 -0400 Subject: [PATCH 1/4] Close th_notify_fds and open a new pair on reinit After a fork, you want subthreads to wake up the event_base in the child process, not to have the child process and the main process fight over who wakes up whom. Related to a problem found by Nicholas Marriott while debugging 3048812. --- event.c | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/event.c b/event.c index dbf1c2f3..ee741e7b 100644 --- a/event.c +++ b/event.c @@ -742,6 +742,7 @@ event_reinit(struct event_base *base) const struct eventop *evsel; int res = 0; struct event *ev; + int was_notifiable = 0; EVBASE_ACQUIRE_LOCK(base, th_base_lock); @@ -769,6 +770,23 @@ event_reinit(struct event_base *base) EVLIST_ACTIVE); base->sig.ev_signal_added = 0; } + if (base->th_notify_fd[0] != -1) { + /* we cannot call event_del here because the base has + * not been reinitialized yet. */ + was_notifiable = 1; + event_queue_remove(base, &base->th_notify, + EVLIST_INSERTED); + if (base->th_notify.ev_flags & EVLIST_ACTIVE) + event_queue_remove(base, &base->th_notify, + EVLIST_ACTIVE); + base->sig.ev_signal_added = 0; + EVUTIL_CLOSESOCKET(base->th_notify_fd[0]); + if (base->th_notify_fd[1] != -1) + EVUTIL_CLOSESOCKET(base->th_notify_fd[1]); + base->th_notify_fd[0] = -1; + base->th_notify_fd[1] = -1; + event_debug_unassign(&base->th_notify); + } if (base->evsel->dealloc != NULL) base->evsel->dealloc(base); @@ -794,6 +812,9 @@ event_reinit(struct event_base *base) } } + if (was_notifiable && res == 0) + res = evthread_make_base_notifiable(base); + done: EVBASE_RELEASE_LOCK(base, th_base_lock); return (res); From c7a06bfaee7f96327779972214e8b744170531ff Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 8 Sep 2010 13:02:58 -0400 Subject: [PATCH 2/4] Avoid needlessly calling evthread_notify_base() when the loop is not running Also make sure that we always hold the base lock when calling evthread_notify_base. --- event.c | 16 +++++++++------- evthread-internal.h | 13 +++++++++++-- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/event.c b/event.c index ee741e7b..0c8238d7 100644 --- a/event.c +++ b/event.c @@ -471,7 +471,7 @@ static void notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr) { struct event_base *base = baseptr; - if (!EVBASE_IN_THREAD(base)) + if (EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); } @@ -1390,18 +1390,20 @@ event_loopbreak(void) int event_base_loopbreak(struct event_base *event_base) { + int r = 0; if (event_base == NULL) return (-1); EVBASE_ACQUIRE_LOCK(event_base, th_base_lock); event_base->event_break = 1; - EVBASE_RELEASE_LOCK(event_base, th_base_lock); - if (!EVBASE_IN_THREAD(event_base)) { - return evthread_notify_base(event_base); + if (EVBASE_NEED_NOTIFY(event_base)) { + r = evthread_notify_base(event_base); } else { - return (0); + r = (0); } + EVBASE_RELEASE_LOCK(event_base, th_base_lock); + return r; } int @@ -2050,7 +2052,7 @@ event_add_internal(struct event *ev, const struct timeval *tv, } /* if we are not in the right thread, we need to wake up the loop */ - if (res != -1 && notify && !EVBASE_IN_THREAD(base)) + if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); _event_debug_note_add(ev); @@ -2144,7 +2146,7 @@ event_del_internal(struct event *ev) } /* if we are not in the right thread, we need to wake up the loop */ - if (res != -1 && notify && !EVBASE_IN_THREAD(base)) + if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); _event_debug_note_del(ev); diff --git a/evthread-internal.h b/evthread-internal.h index d710939a..b7286ded 100644 --- a/evthread-internal.h +++ b/evthread-internal.h @@ -48,12 +48,20 @@ extern int _evthread_lock_debugging_enabled; #define EVTHREAD_GET_ID() \ (_evthread_id_fn ? _evthread_id_fn() : 1) -/** Return true iff we're in the thread that is currently running a given - * event_base's loop. */ +/** Return true iff we're in the thread that is currently (or most recently) + * running a given event_base's loop. Requires lock. */ #define EVBASE_IN_THREAD(base) \ (_evthread_id_fn == NULL || \ (base)->th_owner_id == _evthread_id_fn()) +/** Return true iff we need to notify the base's main thread about changes to + * its state, because it's currently running the main loop in another + * thread. Requires lock. */ +#define EVBASE_NEED_NOTIFY(base) \ + (_evthread_id_fn != NULL && \ + (base)->running_loop && \ + (base)->th_owner_id != _evthread_id_fn()) + /** Allocate a new lock, and store it in lockvar, a void*. Sets lockvar to NULL if locking is not enabled. */ #define EVTHREAD_ALLOC_LOCK(lockvar, locktype) \ @@ -196,6 +204,7 @@ EVLOCK_TRY_LOCK(void *lock) #define EVLOCK_UNLOCK2(lock1,lock2,mode1,mode2) _EVUTIL_NIL_STMT #define EVBASE_IN_THREAD(base) 1 +#define EVBASE_NEED_NOTIFY(base) 0 #define EVBASE_ACQUIRE_LOCK(base, lock) _EVUTIL_NIL_STMT #define EVBASE_RELEASE_LOCK(base, lock) _EVUTIL_NIL_STMT #define EVLOCK_ASSERT_LOCKED(lock) _EVUTIL_NIL_STMT From 4632b78e01a312ae716bc17322c462be53182eca Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 8 Sep 2010 13:22:55 -0400 Subject: [PATCH 3/4] Minimize calls to base_notify implementation functions, thereby avoiding needless syscalls The trick here is that if we already told the base to wake up, and it hasn't woken up yet, we don't need to tell it to wake up again. This should help lots with inherently multithreaded code like IOCP. --- event-internal.h | 3 +++ event.c | 15 +++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/event-internal.h b/event-internal.h index e52c129b..122195b3 100644 --- a/event-internal.h +++ b/event-internal.h @@ -262,6 +262,9 @@ struct event_base { enum event_base_config_flag flags; /* Notify main thread to wake up break, etc. */ + /** True if the base already has a pending notify, and we don't need + * to add any more. */ + int is_notify_pending; /** A socketpair used by some th_notify functions to wake up the main * thread. */ int th_notify_fd[2]; diff --git a/event.c b/event.c index 0c8238d7..902aed54 100644 --- a/event.c +++ b/event.c @@ -1902,8 +1902,12 @@ evthread_notify_base_eventfd(struct event_base *base) static int evthread_notify_base(struct event_base *base) { + EVENT_BASE_ASSERT_LOCKED(base); if (!base->th_notify_fn) return -1; + if (base->is_notify_pending) + return 0; + base->is_notify_pending = 1; return base->th_notify_fn(base); } @@ -2247,8 +2251,6 @@ event_deferred_cb_schedule(struct deferred_cb_queue *queue, cb->queued = 1; TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next); ++queue->active_count; - /* XXXX Can we get away with doing this only when adding - * the first active deferred_cb to the queue? */ if (queue->notify_fn) queue->notify_fn(queue, queue->notify_arg); } @@ -2594,11 +2596,15 @@ evthread_notify_drain_eventfd(evutil_socket_t fd, short what, void *arg) { ev_uint64_t msg; ev_ssize_t r; + struct event_base *base = arg; r = read(fd, (void*) &msg, sizeof(msg)); if (r<0 && errno != EAGAIN) { event_sock_warn(fd, "Error reading from eventfd"); } + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + base->is_notify_pending = 0; + EVBASE_RELEASE_LOCK(base, th_base_lock); } #endif @@ -2606,6 +2612,7 @@ static void evthread_notify_drain_default(evutil_socket_t fd, short what, void *arg) { unsigned char buf[1024]; + struct event_base *base = arg; #ifdef WIN32 while (recv(fd, (char*)buf, sizeof(buf), 0) > 0) ; @@ -2613,6 +2620,10 @@ evthread_notify_drain_default(evutil_socket_t fd, short what, void *arg) while (read(fd, (char*)buf, sizeof(buf)) > 0) ; #endif + + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + base->is_notify_pending = 0; + EVBASE_RELEASE_LOCK(base, th_base_lock); } int From ce85280beb56c4b1c7e69c542878a8f1fe647f28 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 8 Sep 2010 13:29:06 -0400 Subject: [PATCH 4/4] Improve testing of when thread-notification occurs --- test/regress.c | 5 +++ test/regress_thread.c | 90 ++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 90 insertions(+), 5 deletions(-) diff --git a/test/regress.c b/test/regress.c index 8ce68e77..a36f6d51 100644 --- a/test/regress.c +++ b/test/regress.c @@ -62,6 +62,7 @@ #include "event2/buffer_compat.h" #include "event2/util.h" #include "event-internal.h" +#include "evthread-internal.h" #include "util-internal.h" #include "log-internal.h" @@ -805,6 +806,9 @@ test_fork(void) setup_test("After fork: "); + tt_assert(current_base); + evthread_make_base_notifiable(current_base); + write(pair[0], TEST1, strlen(TEST1)+1); event_set(&ev, pair[1], EV_READ, simple_read_cb, &ev); @@ -863,6 +867,7 @@ test_fork(void) evsignal_del(&sig_ev); + end: cleanup_test(); } diff --git a/test/regress_thread.c b/test/regress_thread.c index 34cf64b1..88b0e167 100644 --- a/test/regress_thread.c +++ b/test/regress_thread.c @@ -40,12 +40,18 @@ #include #endif #include +#ifdef _EVENT_HAVE_UNISTD_H +#include +#endif + +#include "sys/queue.h" #include "event2/util.h" #include "event2/event.h" #include "event2/event_struct.h" #include "event2/thread.h" #include "evthread-internal.h" +#include "event-internal.h" #include "regress.h" #include "tinytest_macros.h" @@ -131,7 +137,7 @@ basic_thread(void *arg) /* exit the loop only if all threads fired all timeouts */ EVLOCK_LOCK(count_lock, 0); - if (count >= NUM_THREADS * 100) + if (count >= NUM_THREADS * NUM_ITERATIONS) event_base_loopexit(base, NULL); EVLOCK_UNLOCK(count_lock, 0); @@ -141,6 +147,27 @@ basic_thread(void *arg) THREAD_RETURN(); } +static int got_sigchld = 0; +static void +sigchld_cb(evutil_socket_t fd, short event, void *arg) +{ + struct timeval tv; + struct event_base *base = arg; + + got_sigchld++; + tv.tv_usec = 100000; + tv.tv_sec = 0; + event_base_loopexit(base, &tv); +} + + +static int notification_fd_used = 0; +static void +notify_fd_cb(evutil_socket_t fd, short event, void *arg) +{ + ++notification_fd_used; +} + static void thread_basic(void *arg) { @@ -151,6 +178,10 @@ thread_basic(void *arg) struct basic_test_data *data = arg; struct event_base *base = data->base; + int forking = data->setup_data && !strcmp(data->setup_data, "forking"); + struct event *notification_event = NULL; + struct event *sigchld_event = NULL; + EVTHREAD_ALLOC_LOCK(count_lock, 0); tt_assert(count_lock); @@ -159,6 +190,42 @@ thread_basic(void *arg) tt_abort_msg("Couldn't make base notifiable!"); } +#ifndef WIN32 + if (forking) { + pid_t pid; + int status; + sigchld_event = evsignal_new(base, SIGCHLD, sigchld_cb, base); + /* This piggybacks on the th_notify_fd weirdly, and looks + * inside libevent internals. Not a good idea in non-testing + * code! */ + notification_event = event_new(base, + base->th_notify_fd[0], EV_READ|EV_PERSIST, notify_fd_cb, + NULL); + event_add(sigchld_event, NULL); + event_add(notification_event, NULL); + + if ((pid = fork()) == 0) { + if (event_reinit(base) < 0) { + TT_FAIL(("reinit")); + exit(1); + } + goto child; + } + + event_base_dispatch(base); + + if (waitpid(pid, &status, 0) == -1) + tt_abort_perror("waitpid"); + TT_BLATHER(("Waitpid okay\n")); + + tt_assert(got_sigchld); + tt_int_op(notification_fd_used, ==, 0); + + goto end; + } + +child: +#endif for (i = 0; i < NUM_THREADS; ++i) THREAD_START(threads[i], basic_thread, base); @@ -177,8 +244,15 @@ thread_basic(void *arg) tt_int_op(count, ==, NUM_THREADS * NUM_ITERATIONS); EVTHREAD_FREE_LOCK(count_lock, 0); + + TT_BLATHER(("notifiations==%d", notification_fd_used)); + end: - ; + + if (notification_event) + event_free(notification_event); + if (sigchld_event) + event_free(sigchld_event); } #undef NUM_THREADS @@ -313,10 +387,16 @@ end: } #define TEST(name) \ - { #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, \ - &basic_setup, NULL } + { #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, \ + &basic_setup, NULL } + struct testcase_t thread_testcases[] = { - TEST(basic), + { "basic", thread_basic, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, + &basic_setup, NULL }, +#ifndef WIN32 + { "forking", thread_basic, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, + &basic_setup, (char*)"forking" }, +#endif TEST(conditions_simple), END_OF_TESTCASES };