diff --git a/event-internal.h b/event-internal.h index d1620c5e..e52c129b 100644 --- a/event-internal.h +++ b/event-internal.h @@ -216,9 +216,6 @@ struct event_base { /** The total size of common_timeout_queues. */ int n_common_timeouts_allocated; - /** The event whose callback is executing right now */ - struct event *current_event; - /** List of defered_cb that are active. We run these after the active * events. */ struct deferred_cb_queue defer_queue; @@ -247,9 +244,13 @@ struct event_base { unsigned long th_owner_id; /** A lock to prevent conflicting accesses to this event_base */ void *th_base_lock; - /** A lock to prevent event_del from deleting an event while its - * callback is executing. */ - void *current_event_lock; + /** The event whose callback is executing right now */ + struct event *current_event; + /** A condition that gets signalled when we're done processing an + * event with waiters on it. */ + void *current_event_cond; + /** Number of threads blocking on current_event_cond. */ + int current_event_waiters; #endif #ifdef WIN32 diff --git a/event.c b/event.c index ce0f8b5c..dc1079f2 100644 --- a/event.c +++ b/event.c @@ -604,8 +604,7 @@ event_base_new_with_config(const struct event_config *cfg) EVTHREAD_ALLOC_LOCK(base->th_base_lock, EVTHREAD_LOCKTYPE_RECURSIVE); base->defer_queue.lock = base->th_base_lock; - EVTHREAD_ALLOC_LOCK(base->current_event_lock, - EVTHREAD_LOCKTYPE_RECURSIVE); + EVTHREAD_ALLOC_COND(base->current_event_cond); r = evthread_make_base_notifiable(base); if (r<0) { event_base_free(base); @@ -731,8 +730,7 @@ event_base_free(struct event_base *base) event_changelist_freemem(&base->changelist); EVTHREAD_FREE_LOCK(base->th_base_lock, EVTHREAD_LOCKTYPE_RECURSIVE); - EVTHREAD_FREE_LOCK(base->current_event_lock, - EVTHREAD_LOCKTYPE_RECURSIVE); + EVTHREAD_FREE_COND(base->current_event_cond); mm_free(base); } @@ -1217,9 +1215,10 @@ event_process_active_single_queue(struct event_base *base, ev->ev_res & EV_WRITE ? "EV_WRITE " : " ", ev->ev_callback)); +#ifndef _EVENT_DISABLE_THREAD_SUPPORT base->current_event = ev; - - EVBASE_ACQUIRE_LOCK(base, current_event_lock); + base->current_event_waiters = 0; +#endif switch (ev->ev_closure) { case EV_CLOSURE_SIGNAL: @@ -1236,9 +1235,14 @@ event_process_active_single_queue(struct event_base *base, break; } - EVBASE_RELEASE_LOCK(base, current_event_lock); EVBASE_ACQUIRE_LOCK(base, th_base_lock); +#ifndef _EVENT_DISABLE_THREAD_SUPPORT base->current_event = NULL; + if (base->current_event_waiters) { + base->current_event_waiters = 0; + EVTHREAD_COND_BROADCAST(base->current_event_cond); + } +#endif if (base->event_break) return -1; @@ -1879,7 +1883,6 @@ event_add_internal(struct event *ev, const struct timeval *tv, struct event_base *base = ev->ev_base; int res = 0; int notify = 0; - int need_cur_lock; EVENT_BASE_ASSERT_LOCKED(base); _event_debug_assert_is_setup(ev); @@ -1908,10 +1911,13 @@ event_add_internal(struct event *ev, const struct timeval *tv, * callback, and we are not the main thread, then we want to wait * until the callback is done before we mess with the event, or else * we can race on ev_ncalls and ev_pncalls below. */ - need_cur_lock = (base->current_event == ev) && - (ev->ev_events & EV_SIGNAL); - if (need_cur_lock) - EVBASE_ACQUIRE_LOCK(base, current_event_lock); +#ifndef _EVENT_DISABLE_THREAD_SUPPORT + if (base->current_event == ev && (ev->ev_events & EV_SIGNAL) + && !EVBASE_IN_THREAD(base)) { + ++base->current_event_waiters; + EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); + } +#endif if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) && !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { @@ -2016,9 +2022,6 @@ event_add_internal(struct event *ev, const struct timeval *tv, _event_debug_note_add(ev); - if (need_cur_lock) - EVBASE_RELEASE_LOCK(base, current_event_lock); - return (res); } @@ -2042,7 +2045,6 @@ event_del_internal(struct event *ev) { struct event_base *base; int res = 0, notify = 0; - int need_cur_lock; event_debug(("event_del: %p, callback %p", ev, ev->ev_callback)); @@ -2059,9 +2061,12 @@ event_del_internal(struct event *ev) * when this function returns, it will be safe to free the * user-supplied argument. */ base = ev->ev_base; - need_cur_lock = (base->current_event == ev); - if (need_cur_lock) - EVBASE_ACQUIRE_LOCK(base, current_event_lock); +#ifndef _EVENT_DISABLE_THREAD_SUPPORT + if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { + ++base->current_event_waiters; + EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); + } +#endif EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL)); @@ -2104,9 +2109,6 @@ event_del_internal(struct event *ev) if (res != -1 && notify && !EVBASE_IN_THREAD(base)) evthread_notify_base(base); - if (need_cur_lock) - EVBASE_RELEASE_LOCK(base, current_event_lock); - _event_debug_note_del(ev); return (res); @@ -2129,7 +2131,6 @@ void event_active_nolock(struct event *ev, int res, short ncalls) { struct event_base *base; - int need_cur_lock = 0; /* We get different kinds of events, add them together */ if (ev->ev_flags & EVLIST_ACTIVE) { @@ -2144,16 +2145,17 @@ event_active_nolock(struct event *ev, int res, short ncalls) ev->ev_res = res; if (ev->ev_events & EV_SIGNAL) { - need_cur_lock = (base->current_event == ev); - if (need_cur_lock) - EVBASE_ACQUIRE_LOCK(base, current_event_lock); +#ifndef _EVENT_DISABLE_THREAD_SUPPORT + if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { + ++base->current_event_waiters; + EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); + } +#endif ev->ev_ncalls = ncalls; ev->ev_pncalls = NULL; } event_queue_insert(base, ev, EVLIST_ACTIVE); - if (need_cur_lock) - EVBASE_RELEASE_LOCK(base, current_event_lock); } void