diff --git a/event-internal.h b/event-internal.h index e52c129b..122195b3 100644 --- a/event-internal.h +++ b/event-internal.h @@ -262,6 +262,9 @@ struct event_base { enum event_base_config_flag flags; /* Notify main thread to wake up break, etc. */ + /** True if the base already has a pending notify, and we don't need + * to add any more. */ + int is_notify_pending; /** A socketpair used by some th_notify functions to wake up the main * thread. */ int th_notify_fd[2]; diff --git a/event.c b/event.c index dbf1c2f3..902aed54 100644 --- a/event.c +++ b/event.c @@ -471,7 +471,7 @@ static void notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr) { struct event_base *base = baseptr; - if (!EVBASE_IN_THREAD(base)) + if (EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); } @@ -742,6 +742,7 @@ event_reinit(struct event_base *base) const struct eventop *evsel; int res = 0; struct event *ev; + int was_notifiable = 0; EVBASE_ACQUIRE_LOCK(base, th_base_lock); @@ -769,6 +770,23 @@ event_reinit(struct event_base *base) EVLIST_ACTIVE); base->sig.ev_signal_added = 0; } + if (base->th_notify_fd[0] != -1) { + /* we cannot call event_del here because the base has + * not been reinitialized yet. */ + was_notifiable = 1; + event_queue_remove(base, &base->th_notify, + EVLIST_INSERTED); + if (base->th_notify.ev_flags & EVLIST_ACTIVE) + event_queue_remove(base, &base->th_notify, + EVLIST_ACTIVE); + base->sig.ev_signal_added = 0; + EVUTIL_CLOSESOCKET(base->th_notify_fd[0]); + if (base->th_notify_fd[1] != -1) + EVUTIL_CLOSESOCKET(base->th_notify_fd[1]); + base->th_notify_fd[0] = -1; + base->th_notify_fd[1] = -1; + event_debug_unassign(&base->th_notify); + } if (base->evsel->dealloc != NULL) base->evsel->dealloc(base); @@ -794,6 +812,9 @@ event_reinit(struct event_base *base) } } + if (was_notifiable && res == 0) + res = evthread_make_base_notifiable(base); + done: EVBASE_RELEASE_LOCK(base, th_base_lock); return (res); @@ -1369,18 +1390,20 @@ event_loopbreak(void) int event_base_loopbreak(struct event_base *event_base) { + int r = 0; if (event_base == NULL) return (-1); EVBASE_ACQUIRE_LOCK(event_base, th_base_lock); event_base->event_break = 1; - EVBASE_RELEASE_LOCK(event_base, th_base_lock); - if (!EVBASE_IN_THREAD(event_base)) { - return evthread_notify_base(event_base); + if (EVBASE_NEED_NOTIFY(event_base)) { + r = evthread_notify_base(event_base); } else { - return (0); + r = (0); } + EVBASE_RELEASE_LOCK(event_base, th_base_lock); + return r; } int @@ -1879,8 +1902,12 @@ evthread_notify_base_eventfd(struct event_base *base) static int evthread_notify_base(struct event_base *base) { + EVENT_BASE_ASSERT_LOCKED(base); if (!base->th_notify_fn) return -1; + if (base->is_notify_pending) + return 0; + base->is_notify_pending = 1; return base->th_notify_fn(base); } @@ -2029,7 +2056,7 @@ event_add_internal(struct event *ev, const struct timeval *tv, } /* if we are not in the right thread, we need to wake up the loop */ - if (res != -1 && notify && !EVBASE_IN_THREAD(base)) + if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); _event_debug_note_add(ev); @@ -2123,7 +2150,7 @@ event_del_internal(struct event *ev) } /* if we are not in the right thread, we need to wake up the loop */ - if (res != -1 && notify && !EVBASE_IN_THREAD(base)) + if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); _event_debug_note_del(ev); @@ -2224,8 +2251,6 @@ event_deferred_cb_schedule(struct deferred_cb_queue *queue, cb->queued = 1; TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next); ++queue->active_count; - /* XXXX Can we get away with doing this only when adding - * the first active deferred_cb to the queue? */ if (queue->notify_fn) queue->notify_fn(queue, queue->notify_arg); } @@ -2571,11 +2596,15 @@ evthread_notify_drain_eventfd(evutil_socket_t fd, short what, void *arg) { ev_uint64_t msg; ev_ssize_t r; + struct event_base *base = arg; r = read(fd, (void*) &msg, sizeof(msg)); if (r<0 && errno != EAGAIN) { event_sock_warn(fd, "Error reading from eventfd"); } + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + base->is_notify_pending = 0; + EVBASE_RELEASE_LOCK(base, th_base_lock); } #endif @@ -2583,6 +2612,7 @@ static void evthread_notify_drain_default(evutil_socket_t fd, short what, void *arg) { unsigned char buf[1024]; + struct event_base *base = arg; #ifdef WIN32 while (recv(fd, (char*)buf, sizeof(buf), 0) > 0) ; @@ -2590,6 +2620,10 @@ evthread_notify_drain_default(evutil_socket_t fd, short what, void *arg) while (read(fd, (char*)buf, sizeof(buf)) > 0) ; #endif + + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + base->is_notify_pending = 0; + EVBASE_RELEASE_LOCK(base, th_base_lock); } int diff --git a/evthread-internal.h b/evthread-internal.h index b768a753..c6a8787e 100644 --- a/evthread-internal.h +++ b/evthread-internal.h @@ -56,12 +56,20 @@ extern int _evthread_lock_debugging_enabled; #define EVTHREAD_GET_ID() \ (_evthread_id_fn ? _evthread_id_fn() : 1) -/** Return true iff we're in the thread that is currently running a given - * event_base's loop. */ +/** Return true iff we're in the thread that is currently (or most recently) + * running a given event_base's loop. Requires lock. */ #define EVBASE_IN_THREAD(base) \ (_evthread_id_fn == NULL || \ (base)->th_owner_id == _evthread_id_fn()) +/** Return true iff we need to notify the base's main thread about changes to + * its state, because it's currently running the main loop in another + * thread. Requires lock. */ +#define EVBASE_NEED_NOTIFY(base) \ + (_evthread_id_fn != NULL && \ + (base)->running_loop && \ + (base)->th_owner_id != _evthread_id_fn()) + /** Allocate a new lock, and store it in lockvar, a void*. Sets lockvar to NULL if locking is not enabled. */ #define EVTHREAD_ALLOC_LOCK(lockvar, locktype) \ @@ -281,6 +289,7 @@ EVLOCK_TRY_LOCK(void *lock) #define EVLOCK_UNLOCK2(lock1,lock2,mode1,mode2) _EVUTIL_NIL_STMT #define EVBASE_IN_THREAD(base) 1 +#define EVBASE_NEED_NOTIFY(base) 0 #define EVBASE_ACQUIRE_LOCK(base, lock) _EVUTIL_NIL_STMT #define EVBASE_RELEASE_LOCK(base, lock) _EVUTIL_NIL_STMT #define EVLOCK_ASSERT_LOCKED(lock) _EVUTIL_NIL_STMT diff --git a/test/regress.c b/test/regress.c index 8ce68e77..a36f6d51 100644 --- a/test/regress.c +++ b/test/regress.c @@ -62,6 +62,7 @@ #include "event2/buffer_compat.h" #include "event2/util.h" #include "event-internal.h" +#include "evthread-internal.h" #include "util-internal.h" #include "log-internal.h" @@ -805,6 +806,9 @@ test_fork(void) setup_test("After fork: "); + tt_assert(current_base); + evthread_make_base_notifiable(current_base); + write(pair[0], TEST1, strlen(TEST1)+1); event_set(&ev, pair[1], EV_READ, simple_read_cb, &ev); @@ -863,6 +867,7 @@ test_fork(void) evsignal_del(&sig_ev); + end: cleanup_test(); } diff --git a/test/regress_thread.c b/test/regress_thread.c index 34cf64b1..88b0e167 100644 --- a/test/regress_thread.c +++ b/test/regress_thread.c @@ -40,12 +40,18 @@ #include #endif #include +#ifdef _EVENT_HAVE_UNISTD_H +#include +#endif + +#include "sys/queue.h" #include "event2/util.h" #include "event2/event.h" #include "event2/event_struct.h" #include "event2/thread.h" #include "evthread-internal.h" +#include "event-internal.h" #include "regress.h" #include "tinytest_macros.h" @@ -131,7 +137,7 @@ basic_thread(void *arg) /* exit the loop only if all threads fired all timeouts */ EVLOCK_LOCK(count_lock, 0); - if (count >= NUM_THREADS * 100) + if (count >= NUM_THREADS * NUM_ITERATIONS) event_base_loopexit(base, NULL); EVLOCK_UNLOCK(count_lock, 0); @@ -141,6 +147,27 @@ basic_thread(void *arg) THREAD_RETURN(); } +static int got_sigchld = 0; +static void +sigchld_cb(evutil_socket_t fd, short event, void *arg) +{ + struct timeval tv; + struct event_base *base = arg; + + got_sigchld++; + tv.tv_usec = 100000; + tv.tv_sec = 0; + event_base_loopexit(base, &tv); +} + + +static int notification_fd_used = 0; +static void +notify_fd_cb(evutil_socket_t fd, short event, void *arg) +{ + ++notification_fd_used; +} + static void thread_basic(void *arg) { @@ -151,6 +178,10 @@ thread_basic(void *arg) struct basic_test_data *data = arg; struct event_base *base = data->base; + int forking = data->setup_data && !strcmp(data->setup_data, "forking"); + struct event *notification_event = NULL; + struct event *sigchld_event = NULL; + EVTHREAD_ALLOC_LOCK(count_lock, 0); tt_assert(count_lock); @@ -159,6 +190,42 @@ thread_basic(void *arg) tt_abort_msg("Couldn't make base notifiable!"); } +#ifndef WIN32 + if (forking) { + pid_t pid; + int status; + sigchld_event = evsignal_new(base, SIGCHLD, sigchld_cb, base); + /* This piggybacks on the th_notify_fd weirdly, and looks + * inside libevent internals. Not a good idea in non-testing + * code! */ + notification_event = event_new(base, + base->th_notify_fd[0], EV_READ|EV_PERSIST, notify_fd_cb, + NULL); + event_add(sigchld_event, NULL); + event_add(notification_event, NULL); + + if ((pid = fork()) == 0) { + if (event_reinit(base) < 0) { + TT_FAIL(("reinit")); + exit(1); + } + goto child; + } + + event_base_dispatch(base); + + if (waitpid(pid, &status, 0) == -1) + tt_abort_perror("waitpid"); + TT_BLATHER(("Waitpid okay\n")); + + tt_assert(got_sigchld); + tt_int_op(notification_fd_used, ==, 0); + + goto end; + } + +child: +#endif for (i = 0; i < NUM_THREADS; ++i) THREAD_START(threads[i], basic_thread, base); @@ -177,8 +244,15 @@ thread_basic(void *arg) tt_int_op(count, ==, NUM_THREADS * NUM_ITERATIONS); EVTHREAD_FREE_LOCK(count_lock, 0); + + TT_BLATHER(("notifiations==%d", notification_fd_used)); + end: - ; + + if (notification_event) + event_free(notification_event); + if (sigchld_event) + event_free(sigchld_event); } #undef NUM_THREADS @@ -313,10 +387,16 @@ end: } #define TEST(name) \ - { #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, \ - &basic_setup, NULL } + { #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, \ + &basic_setup, NULL } + struct testcase_t thread_testcases[] = { - TEST(basic), + { "basic", thread_basic, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, + &basic_setup, NULL }, +#ifndef WIN32 + { "forking", thread_basic, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, + &basic_setup, (char*)"forking" }, +#endif TEST(conditions_simple), END_OF_TESTCASES };