From cba59e53253bb396f192e40a002c6dd9835df51c Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Thu, 5 Apr 2012 12:38:18 -0400 Subject: [PATCH 01/12] Refactor the callback part of an event into its own event_callback type This shouldn't have any visible effect, but it's necessary or advisible for a few changes and cleanups I would like to make, including: * Replacing the deferred queue with a type that works more as if it were an event. * Introducing a useful "activate this on the next round through the event loop" state for events and deferreds. * Adding an "on until further notice" status for events, to allow a saner win32-hybrid approach. * Eventually, making all user callbacks first-class things with event-like semantics. --- event-internal.h | 26 +++-- event.c | 199 +++++++++++++++++++++------------- include/event2/event_struct.h | 19 ++-- 3 files changed, 153 insertions(+), 91 deletions(-) diff --git a/event-internal.h b/event-internal.h index 60b58c6c..e4025cf6 100644 --- a/event-internal.h +++ b/event-internal.h @@ -53,10 +53,16 @@ extern "C" { #define ev_ncalls ev_.ev_signal.ev_ncalls #define ev_pncalls ev_.ev_signal.ev_pncalls -/* Possible values for ev_closure in struct event. */ -#define EV_CLOSURE_NONE 0 -#define EV_CLOSURE_SIGNAL 1 -#define EV_CLOSURE_PERSIST 2 +#define ev_pri ev_evcallback.evcb_pri +#define ev_flags ev_evcallback.evcb_flags +#define ev_closure ev_evcallback.evcb_closure +#define ev_callback ev_evcallback.evcb_callback +#define ev_arg ev_evcallback.evcb_arg + +/* Possible values for evcb_closure in struct event_callback */ +#define EV_CLOSURE_EVENT 0 +#define EV_CLOSURE_EVENT_SIGNAL 1 +#define EV_CLOSURE_EVENT_PERSIST 2 /** Structure to define the backend of a given event_base. */ struct eventop { @@ -170,6 +176,8 @@ extern int event_debug_mode_on_; #define EVENT_DEBUG_MODE_IS_ON() (0) #endif +TAILQ_HEAD(evcallback_list, event_callback); + struct event_base { /** Function pointers and other data to describe this event_base's * backend. */ @@ -210,11 +218,11 @@ struct event_base { int running_loop; /* Active event management. */ - /** An array of nactivequeues queues for active events (ones that - * have triggered, and whose callbacks need to be called). Low + /** An array of nactivequeues queues for active event_callbacks (ones + * that have triggered, and whose callbacks need to be called). Low * priority numbers are more important, and stall higher ones. */ - struct event_list *activequeues; + struct evcallback_list *activequeues; /** The length of the activequeues array */ int nactivequeues; @@ -266,7 +274,7 @@ struct event_base { int current_event_waiters; #endif /** The event whose callback is executing right now */ - struct event *current_event; + struct event_callback *current_event; #ifdef _WIN32 /** IOCP support structure, if IOCP is enabled. */ @@ -355,6 +363,8 @@ int evsig_restore_handler_(struct event_base *base, int evsignal); void event_active_nolock_(struct event *ev, int res, short count); +void event_callback_activate_nolock_(struct event_base *, struct event_callback *); + /* FIXME document. 
*/ void event_base_add_virtual_(struct event_base *base); diff --git a/event.c b/event.c index c5c118cc..3307db1b 100644 --- a/event.c +++ b/event.c @@ -135,10 +135,10 @@ static inline int event_add_internal(struct event *ev, const struct timeval *tv, int tv_is_absolute); static inline int event_del_internal(struct event *ev); -static void event_queue_insert_active(struct event_base *, struct event *); +static void event_queue_insert_active(struct event_base *, struct event_callback *); static void event_queue_insert_timeout(struct event_base *, struct event *); static void event_queue_insert_inserted(struct event_base *, struct event *); -static void event_queue_remove_active(struct event_base *, struct event *); +static void event_queue_remove_active(struct event_base *, struct event_callback *); static void event_queue_remove_timeout(struct event_base *, struct event *); static void event_queue_remove_inserted(struct event_base *, struct event *); #ifdef USE_REINSERT_TIMEOUT @@ -424,6 +424,19 @@ event_base_update_cache_time(struct event_base *base) return 0; } +static inline struct event * +event_callback_to_event(struct event_callback *evcb) +{ + EVUTIL_ASSERT((evcb->evcb_flags & EVLIST_INIT)); + return EVUTIL_UPCAST(evcb, struct event, ev_evcallback); +} + +static inline struct event_callback * +event_to_event_callback(struct event *ev) +{ + return &ev->ev_evcallback; +} + struct event_base * event_init(void) { @@ -777,13 +790,19 @@ event_base_free(struct event_base *base) mm_free(base->common_timeout_queues); for (i = 0; i < base->nactivequeues; ++i) { - for (ev = TAILQ_FIRST(&base->activequeues[i]); ev; ) { - struct event *next = TAILQ_NEXT(ev, ev_active_next); - if (!(ev->ev_flags & EVLIST_INTERNAL)) { - event_del(ev); - ++n_deleted; + struct event_callback *evcb, *next; + for (evcb = TAILQ_FIRST(&base->activequeues[i]); evcb; ) { + next = TAILQ_NEXT(evcb, evcb_active_next); + if (evcb->evcb_flags & EVLIST_INIT) { + ev = event_callback_to_event(evcb); + if (!(ev->ev_flags & EVLIST_INTERNAL)) { + event_del(ev); + ++n_deleted; + } + } else { + event_queue_remove_active(base, evcb); } - ev = next; + evcb = next; } } @@ -1093,8 +1112,8 @@ event_base_priority_init(struct event_base *base, int npriorities) } /* Allocate our priority queues */ - base->activequeues = (struct event_list *) - mm_calloc(npriorities, sizeof(struct event_list)); + base->activequeues = (struct evcallback_list *) + mm_calloc(npriorities, sizeof(struct evcallback_list)); if (base->activequeues == NULL) { event_warn("%s: calloc", __func__); goto err; @@ -1393,51 +1412,63 @@ event_persist_closure(struct event_base *base, struct event *ev) releasing the lock as we go. This function requires that the lock be held when it's invoked. Returns -1 if we get a signal or an event_break that means we should stop processing any active events now. Otherwise returns - the number of non-internal events that we processed. + the number of non-internal event_callbacks that we processed. 
*/ static int event_process_active_single_queue(struct event_base *base, - struct event_list *activeq, + struct evcallback_list *activeq, int max_to_process, const struct timeval *endtime) { - struct event *ev; + struct event_callback *evcb; int count = 0; EVUTIL_ASSERT(activeq != NULL); - for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) { - if (ev->ev_events & EV_PERSIST) - event_queue_remove_active(base, ev); - else - event_del_internal(ev); - if (!(ev->ev_flags & EVLIST_INTERNAL)) + for (evcb = TAILQ_FIRST(activeq); evcb; evcb = TAILQ_FIRST(activeq)) { + struct event *ev=NULL; + if (evcb->evcb_flags & EVLIST_INIT) { + ev = event_callback_to_event(evcb); + + if (ev->ev_events & EV_PERSIST) + event_queue_remove_active(base, evcb); + else + event_del_internal(ev); + event_debug(( + "event_process_active: event: %p, %s%scall %p", + ev, + ev->ev_res & EV_READ ? "EV_READ " : " ", + ev->ev_res & EV_WRITE ? "EV_WRITE " : " ", + ev->ev_callback)); + } else { + event_queue_remove_active(base, evcb); + event_debug(("event_process_active: event_callback %p, " + "closure %d, call %p", + evcb, evcb->evcb_closure, evcb->evcb_callback)); + } + + if (!(evcb->evcb_flags & EVLIST_INTERNAL)) ++count; - event_debug(( - "event_process_active: event: %p, %s%scall %p", - ev, - ev->ev_res & EV_READ ? "EV_READ " : " ", - ev->ev_res & EV_WRITE ? "EV_WRITE " : " ", - ev->ev_callback)); - base->current_event = ev; + base->current_event = evcb; #ifndef EVENT__DISABLE_THREAD_SUPPORT base->current_event_waiters = 0; #endif - switch (ev->ev_closure) { - case EV_CLOSURE_SIGNAL: + switch (evcb->evcb_closure) { + case EV_CLOSURE_EVENT_SIGNAL: event_signal_closure(base, ev); break; - case EV_CLOSURE_PERSIST: + case EV_CLOSURE_EVENT_PERSIST: event_persist_closure(base, ev); break; - default: - case EV_CLOSURE_NONE: + case EV_CLOSURE_EVENT: EVBASE_RELEASE_LOCK(base, th_base_lock); (*ev->ev_callback)( ev->ev_fd, ev->ev_res, ev->ev_arg); break; + default: + EVUTIL_ASSERT(0); } EVBASE_ACQUIRE_LOCK(base, th_base_lock); @@ -1517,7 +1548,7 @@ static int event_process_active(struct event_base *base) { /* Caller must hold th_base_lock */ - struct event_list *activeq = NULL; + struct evcallback_list *activeq = NULL; int i, c = 0; const struct timeval *endtime; struct timeval tv; @@ -1865,13 +1896,13 @@ event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, shor "EV_READ or EV_WRITE", __func__); return -1; } - ev->ev_closure = EV_CLOSURE_SIGNAL; + ev->ev_closure = EV_CLOSURE_EVENT_SIGNAL; } else { if (events & EV_PERSIST) { evutil_timerclear(&ev->ev_io_timeout); - ev->ev_closure = EV_CLOSURE_PERSIST; + ev->ev_closure = EV_CLOSURE_EVENT_PERSIST; } else { - ev->ev_closure = EV_CLOSURE_NONE; + ev->ev_closure = EV_CLOSURE_EVENT; } } @@ -1922,8 +1953,11 @@ event_base_get_running_event(struct event_base *base) { struct event *ev = NULL; EVBASE_ACQUIRE_LOCK(base, th_base_lock); - if (EVBASE_IN_THREAD(base)) - ev = base->current_event; + if (EVBASE_IN_THREAD(base)) { + struct event_callback *evcb = base->current_event; + if (evcb->evcb_flags & EVLIST_INIT) + ev = event_callback_to_event(evcb); + } EVBASE_RELEASE_LOCK(base, th_base_lock); return ev; } @@ -2192,7 +2226,8 @@ event_add_internal(struct event *ev, const struct timeval *tv, * until the callback is done before we mess with the event, or else * we can race on ev_ncalls and ev_pncalls below. 
*/ #ifndef EVENT__DISABLE_THREAD_SUPPORT - if (base->current_event == ev && (ev->ev_events & EV_SIGNAL) + if (base->current_event == event_to_event_callback(ev) && + (ev->ev_events & EV_SIGNAL) && !EVBASE_IN_THREAD(base)) { ++base->current_event_waiters; EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); @@ -2232,7 +2267,7 @@ event_add_internal(struct event *ev, const struct timeval *tv, * * If tv_is_absolute, this was already set. */ - if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute) + if (ev->ev_closure == EV_CLOSURE_EVENT_PERSIST && !tv_is_absolute) ev->ev_io_timeout = *tv; #ifndef USE_REINSERT_TIMEOUT @@ -2256,7 +2291,7 @@ event_add_internal(struct event *ev, const struct timeval *tv, } } - event_queue_remove_active(base, ev); + event_queue_remove_active(base, event_to_event_callback(ev)); } gettime(base, &now); @@ -2356,7 +2391,8 @@ event_del_internal(struct event *ev) * user-supplied argument. */ base = ev->ev_base; #ifndef EVENT__DISABLE_THREAD_SUPPORT - if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { + if (base->current_event == event_to_event_callback(ev) && + !EVBASE_IN_THREAD(base)) { ++base->current_event_waiters; EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); } @@ -2384,7 +2420,7 @@ event_del_internal(struct event *ev) } if (ev->ev_flags & EVLIST_ACTIVE) - event_queue_remove_active(base, ev); + event_queue_remove_active(base, event_to_event_callback(ev)); if (ev->ev_flags & EVLIST_INSERTED) { event_queue_remove_inserted(base, ev); @@ -2452,7 +2488,8 @@ event_active_nolock_(struct event *ev, int res, short ncalls) if (ev->ev_events & EV_SIGNAL) { #ifndef EVENT__DISABLE_THREAD_SUPPORT - if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { + if (base->current_event == event_to_event_callback(ev) && + !EVBASE_IN_THREAD(base)) { ++base->current_event_waiters; EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); } @@ -2461,7 +2498,14 @@ event_active_nolock_(struct event *ev, int res, short ncalls) ev->ev_pncalls = NULL; } - event_queue_insert_active(base, ev); + event_callback_activate_nolock_(base, event_to_event_callback(ev)); +} + +void +event_callback_activate_nolock_(struct event_base *base, + struct event_callback *evcb) +{ + event_queue_insert_active(base, evcb); if (EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); @@ -2585,13 +2629,13 @@ timeout_process(struct event_base *base) #error "Mismatch for value of EVLIST_INTERNAL" #endif /* These are a fancy way to spell - if (~ev->ev_flags & EVLIST_INTERNAL) + if (flags & EVLIST_INTERNAL) base->event_count--/++; */ -#define DECR_EVENT_COUNT(base,ev) \ - ((base)->event_count -= (~((ev)->ev_flags >> 4) & 1)) -#define INCR_EVENT_COUNT(base, ev) \ - ((base)->event_count += (~((ev)->ev_flags >> 4) & 1)) +#define DECR_EVENT_COUNT(base,flags) \ + ((base)->event_count -= (~((flags) >> 4) & 1)) +#define INCR_EVENT_COUNT(base,flags) \ + ((base)->event_count += (~((flags) >> 4) & 1)) static void event_queue_remove_inserted(struct event_base *base, struct event *ev) @@ -2602,23 +2646,24 @@ event_queue_remove_inserted(struct event_base *base, struct event *ev) ev, ev->ev_fd, EVLIST_INSERTED); return; } - DECR_EVENT_COUNT(base, ev); + DECR_EVENT_COUNT(base, ev->ev_flags); ev->ev_flags &= ~EVLIST_INSERTED; } static void -event_queue_remove_active(struct event_base *base, struct event *ev) +event_queue_remove_active(struct event_base *base, struct event_callback *evcb) { EVENT_BASE_ASSERT_LOCKED(base); - if (EVUTIL_FAILURE_CHECK(!(ev->ev_flags & EVLIST_ACTIVE))) { 
- event_errx(1, "%s: %p(fd %d) not on queue %x", __func__, - ev, ev->ev_fd, EVLIST_ACTIVE); + if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE))) { + event_errx(1, "%s: %p not on queue %x", __func__, + evcb, EVLIST_ACTIVE); return; } - DECR_EVENT_COUNT(base, ev); - ev->ev_flags &= ~EVLIST_ACTIVE; + DECR_EVENT_COUNT(base, evcb->evcb_flags); + evcb->evcb_flags &= ~EVLIST_ACTIVE; base->event_count_active--; - TAILQ_REMOVE(&base->activequeues[ev->ev_pri], - ev, ev_active_next); + + TAILQ_REMOVE(&base->activequeues[evcb->evcb_pri], + evcb, evcb_active_next); } static void event_queue_remove_timeout(struct event_base *base, struct event *ev) @@ -2629,7 +2674,7 @@ event_queue_remove_timeout(struct event_base *base, struct event *ev) ev, ev->ev_fd, EVLIST_TIMEOUT); return; } - DECR_EVENT_COUNT(base, ev); + DECR_EVENT_COUNT(base, ev->ev_flags); ev->ev_flags &= ~EVLIST_TIMEOUT; if (is_common_timeout(&ev->ev_timeout, base)) { @@ -2725,28 +2770,28 @@ event_queue_insert_inserted(struct event_base *base, struct event *ev) return; } - INCR_EVENT_COUNT(base, ev); + INCR_EVENT_COUNT(base, ev->ev_flags); ev->ev_flags |= EVLIST_INSERTED; } static void -event_queue_insert_active(struct event_base *base, struct event *ev) +event_queue_insert_active(struct event_base *base, struct event_callback *evcb) { EVENT_BASE_ASSERT_LOCKED(base); - if (ev->ev_flags & EVLIST_ACTIVE) { + if (evcb->evcb_flags & EVLIST_ACTIVE) { /* Double insertion is possible for active events */ return; } - INCR_EVENT_COUNT(base, ev); + INCR_EVENT_COUNT(base, evcb->evcb_flags); - ev->ev_flags |= EVLIST_ACTIVE; + evcb->evcb_flags |= EVLIST_ACTIVE; base->event_count_active++; - TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri], - ev,ev_active_next); + TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], + evcb, evcb_active_next); } static void @@ -2760,7 +2805,7 @@ event_queue_insert_timeout(struct event_base *base, struct event *ev) return; } - INCR_EVENT_COUNT(base, ev); + INCR_EVENT_COUNT(base, ev->ev_flags); ev->ev_flags |= EVLIST_TIMEOUT; @@ -3034,11 +3079,15 @@ event_base_foreach_event_(struct event_base *base, /* Finally, we deal wit all the active events that we haven't touched * yet. */ for (i = 0; i < base->nactivequeues; ++i) { - TAILQ_FOREACH(ev, &base->activequeues[i], ev_active_next) { - if (ev->ev_flags & (EVLIST_INSERTED|EVLIST_TIMEOUT)) { - /* we already processed this one */ + struct event_callback *evcb; + TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) { + if ((evcb->evcb_flags & (EVLIST_INIT|EVLIST_INSERTED|EVLIST_TIMEOUT)) != EVLIST_INIT) { + /* This isn't an event (evlist_init clear), or + * we already processed it. (inserted or + * timeout set */ continue; } + ev = event_callback_to_event(evcb); if ((r = fn(base, ev, arg))) return r; } @@ -3231,11 +3280,11 @@ event_base_assert_ok_(struct event_base *base) /* Check the active queues. 
*/ for (i = 0; i < base->nactivequeues; ++i) { - struct event *ev; - EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event, ev_active_next); - TAILQ_FOREACH(ev, &base->activequeues[i], ev_active_next) { - EVUTIL_ASSERT(ev->ev_pri == i); - EVUTIL_ASSERT(ev->ev_flags & EVLIST_ACTIVE); + struct event_callback *evcb; + EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event_callback, evcb_active_next); + TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) { + EVUTIL_ASSERT(evcb->evcb_flags & EVLIST_ACTIVE); + EVUTIL_ASSERT(evcb->evcb_pri == i); } } diff --git a/include/event2/event_struct.h b/include/event2/event_struct.h index 44bb6f7e..6f30bde9 100644 --- a/include/event2/event_struct.h +++ b/include/event2/event_struct.h @@ -93,9 +93,19 @@ struct { \ } #endif /* !TAILQ_ENTRY */ +struct event_callback { + TAILQ_ENTRY(event_callback) evcb_active_next; + short evcb_flags; + ev_uint8_t evcb_pri; /* smaller numbers are higher priority */ + ev_uint8_t evcb_closure; + /* allows us to adopt for different types of events */ + void (*evcb_callback)(evutil_socket_t, short, void *arg); + void *evcb_arg; +}; + struct event_base; struct event { - TAILQ_ENTRY(event) ev_active_next; + struct event_callback ev_evcallback; /* for managing timeouts */ union { @@ -124,14 +134,7 @@ struct event { short ev_events; short ev_res; /* result passed to event callback */ - short ev_flags; - ev_uint8_t ev_pri; /* smaller numbers are higher priority */ - ev_uint8_t ev_closure; struct timeval ev_timeout; - - /* allows us to adopt for different types of events */ - void (*ev_callback)(evutil_socket_t, short, void *arg); - void *ev_arg; }; TAILQ_HEAD (event_list, event); From 9889a3d883d99c644f00398c3a6a11a9037e8117 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Thu, 5 Apr 2012 12:56:07 -0400 Subject: [PATCH 02/12] Remove the unused bits from EVLIST_ALL --- include/event2/event_struct.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/include/event2/event_struct.h b/include/event2/event_struct.h index 6f30bde9..d2470f05 100644 --- a/include/event2/event_struct.h +++ b/include/event2/event_struct.h @@ -61,8 +61,7 @@ extern "C" { #define EVLIST_INTERNAL 0x10 #define EVLIST_INIT 0x80 -/* EVLIST_X_ Private space: 0x1000-0xf000 */ -#define EVLIST_ALL (0xf000 | 0x9f) +#define EVLIST_ALL 0x9f /* Fix so that people don't have to run with */ #ifndef TAILQ_ENTRY From 745a63dba33c53440064908f086bb739aa9c4bb3 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 6 Apr 2012 03:00:40 -0400 Subject: [PATCH 03/12] Add "active later" event_callbacks to supersede deferred An event or event callback can now be in an additional state: "active later". When an event is in this state, it will become active the next time we run through the event loop. This lets us do what we wanted to do with deferred callbacks: make a type of active thing that avoids infinite circular regress without starving other events or exhausting the stack. It improves on deferred callbacks by respecting priorities, and by having a non-kludgy way to avoid event starvation. 
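
For illustration only (not part of the patch): the sketch below is modeled on the test_active_later regression test added further down, using the internal event_active_later_() declared in event-internal.h. The names bounce_cb, ping and pong are made up for the example. Two events whose callbacks re-activate each other with event_active() would keep re-running within a single pass of the loop; with event_active_later_() each activation only takes effect on the next trip through event_base_loop(), so other callbacks still get to run.

    #include <event2/event.h>
    #include <event2/event_struct.h>
    #include "event-internal.h"   /* event_active_later_() is internal API */

    static struct event ping, pong;

    /* Each callback re-arms the other, but only for the *next* pass of
     * the event loop, so the pair cannot starve other callbacks. */
    static void
    bounce_cb(evutil_socket_t fd, short what, void *arg)
    {
        struct event *other = arg;
        event_active_later_(other, EV_READ);
    }

    /* Setup (error checking omitted):
     *   event_assign(&ping, base, -1, 0, bounce_cb, &pong);
     *   event_assign(&pong, base, -1, 0, bounce_cb, &ping);
     *   event_active_later_(&ping, EV_READ);
     *   event_base_loop(base, 0);
     */
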
--- event-internal.h | 13 +++ event.c | 199 +++++++++++++++++++++++++++++++--- include/event2/event_struct.h | 15 +-- test/regress.c | 62 +++++++++++ 4 files changed, 266 insertions(+), 23 deletions(-) diff --git a/event-internal.h b/event-internal.h index e4025cf6..93b7c626 100644 --- a/event-internal.h +++ b/event-internal.h @@ -225,6 +225,9 @@ struct event_base { struct evcallback_list *activequeues; /** The length of the activequeues array */ int nactivequeues; + /** A list of event_callbacks that should become active the next time + * we process events, but not this time. */ + struct evcallback_list active_later_queue; /* common timeout logic */ @@ -364,7 +367,17 @@ int evsig_restore_handler_(struct event_base *base, int evsignal); void event_active_nolock_(struct event *ev, int res, short count); void event_callback_activate_nolock_(struct event_base *, struct event_callback *); +int event_callback_cancel_(struct event_base *base, + struct event_callback *evcb); +void event_active_later_(struct event *ev, int res); +void event_active_later_nolock_(struct event *ev, int res); +void event_callback_activate_later_nolock_(struct event_base *base, + struct event_callback *evcb); +int event_callback_cancel_nolock_(struct event_base *base, + struct event_callback *evcb); +void event_callback_init_(struct event_base *base, + struct event_callback *cb); /* FIXME document. */ void event_base_add_virtual_(struct event_base *base); diff --git a/event.c b/event.c index 3307db1b..d6bf4207 100644 --- a/event.c +++ b/event.c @@ -136,11 +136,15 @@ static inline int event_add_internal(struct event *ev, static inline int event_del_internal(struct event *ev); static void event_queue_insert_active(struct event_base *, struct event_callback *); +static void event_queue_insert_active_later(struct event_base *, struct event_callback *); static void event_queue_insert_timeout(struct event_base *, struct event *); static void event_queue_insert_inserted(struct event_base *, struct event *); static void event_queue_remove_active(struct event_base *, struct event_callback *); +static void event_queue_remove_active_later(struct event_base *, struct event_callback *); static void event_queue_remove_timeout(struct event_base *, struct event *); static void event_queue_remove_inserted(struct event_base *, struct event *); +static void event_queue_make_later_events_active(struct event_base *base); + #ifdef USE_REINSERT_TIMEOUT /* This code seems buggy; only turn it on if we find out what the trouble is. 
*/ static void event_queue_reinsert_timeout(struct event_base *,struct event *, int was_common, int is_common, int old_timeout_idx); @@ -606,6 +610,8 @@ event_base_new_with_config(const struct event_config *cfg) base->defer_queue.notify_fn = notify_base_cbq_callback; base->defer_queue.notify_arg = base; + TAILQ_INIT(&base->active_later_queue); + evmap_io_initmap_(&base->io); evmap_signal_initmap_(&base->sigmap); event_changelist_init_(&base->changelist); @@ -800,11 +806,26 @@ event_base_free(struct event_base *base) ++n_deleted; } } else { - event_queue_remove_active(base, evcb); + event_callback_cancel_(base, evcb); + ++n_deleted; } evcb = next; } } + { + struct event_callback *evcb; + while ((evcb = TAILQ_FIRST(&base->active_later_queue))) { + if (evcb->evcb_flags & EVLIST_INIT) { + ev = event_callback_to_event(evcb); + event_del(ev); + ++n_deleted; + } else { + event_callback_cancel_(base, evcb); + ++n_deleted; + } + } + } + if (n_deleted) event_debug(("%s: %d events were still set in base", @@ -1754,6 +1775,8 @@ event_base_loop(struct event_base *base, int flags) goto done; } + event_queue_make_later_events_active(base); + clear_time_cache(base); res = evsel->dispatch(base, tv_p); @@ -2031,7 +2054,7 @@ event_pending(const struct event *ev, short event, struct timeval *tv) if (ev->ev_flags & EVLIST_INSERTED) flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)); - if (ev->ev_flags & EVLIST_ACTIVE) + if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) flags |= ev->ev_res; if (ev->ev_flags & EVLIST_TIMEOUT) flags |= EV_TIMEOUT; @@ -2235,7 +2258,7 @@ event_add_internal(struct event *ev, const struct timeval *tv, #endif if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) && - !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { + !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) { if (ev->ev_events & (EV_READ|EV_WRITE)) res = evmap_io_add_(base, ev->ev_fd, ev); else if (ev->ev_events & EV_SIGNAL) @@ -2421,6 +2444,8 @@ event_del_internal(struct event *ev) if (ev->ev_flags & EVLIST_ACTIVE) event_queue_remove_active(base, event_to_event_callback(ev)); + else if (ev->ev_flags & EVLIST_ACTIVE_LATER) + event_queue_remove_active_later(base, event_to_event_callback(ev)); if (ev->ev_flags & EVLIST_INSERTED) { event_queue_remove_inserted(base, ev); @@ -2470,18 +2495,25 @@ event_active_nolock_(struct event *ev, int res, short ncalls) event_debug(("event_active: %p (fd %d), res %d, callback %p", ev, (int)ev->ev_fd, (int)res, ev->ev_callback)); - - /* We get different kinds of events, add them together */ - if (ev->ev_flags & EVLIST_ACTIVE) { - ev->ev_res |= res; - return; - } - base = ev->ev_base; - EVENT_BASE_ASSERT_LOCKED(base); - ev->ev_res = res; + switch ((ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) { + default: + case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER: + EVUTIL_ASSERT(0); + break; + case EVLIST_ACTIVE: + /* We get different kinds of events, add them together */ + ev->ev_res |= res; + return; + case EVLIST_ACTIVE_LATER: + ev->ev_res |= res; + break; + case 0: + ev->ev_res = res; + break; + } if (ev->ev_pri < base->event_running_priority) base->event_continue = 1; @@ -2501,16 +2533,100 @@ event_active_nolock_(struct event *ev, int res, short ncalls) event_callback_activate_nolock_(base, event_to_event_callback(ev)); } +void +event_active_later_(struct event *ev, int res) +{ + EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock); + event_active_later_nolock_(ev, res); + EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock); +} + +void +event_active_later_nolock_(struct event 
*ev, int res) +{ + struct event_base *base = ev->ev_base; + EVENT_BASE_ASSERT_LOCKED(base); + + if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) { + /* We get different kinds of events, add them together */ + ev->ev_res |= res; + return; + } + + ev->ev_res = res; + + event_callback_activate_later_nolock_(base, event_to_event_callback(ev)); +} + void event_callback_activate_nolock_(struct event_base *base, struct event_callback *evcb) { + if (evcb->evcb_flags & EVLIST_ACTIVE_LATER) + event_queue_remove_active_later(base, evcb); + event_queue_insert_active(base, evcb); if (EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); } +void +event_callback_activate_later_nolock_(struct event_base *base, + struct event_callback *evcb) +{ + if (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) + return; + + event_queue_insert_active_later(base, evcb); + if (EVBASE_NEED_NOTIFY(base)) + evthread_notify_base(base); +} + +void +event_callback_init_(struct event_base *base, + struct event_callback *cb) +{ + memset(cb, 0, sizeof(*cb)); + cb->evcb_pri = base->nactivequeues - 1; +} + +int +event_callback_cancel_(struct event_base *base, + struct event_callback *evcb) +{ + int r; + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + r = event_callback_cancel_nolock_(base, evcb); + EVBASE_RELEASE_LOCK(base, th_base_lock); + return r; +} + +int +event_callback_cancel_nolock_(struct event_base *base, + struct event_callback *evcb) +{ + if (evcb->evcb_flags & EVLIST_INIT) + return event_del_internal(event_callback_to_event(evcb)); + + switch ((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) { + default: + case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER: + EVUTIL_ASSERT(0); + break; + case EVLIST_ACTIVE: + /* We get different kinds of events, add them together */ + event_queue_remove_active(base, evcb); + return 0; + case EVLIST_ACTIVE_LATER: + event_queue_remove_active_later(base, evcb); + break; + case 0: + break; + } + return 0; +} + void event_deferred_cb_init_(struct deferred_cb *cb, deferred_cb_fn fn, void *arg) { @@ -2666,6 +2782,21 @@ event_queue_remove_active(struct event_base *base, struct event_callback *evcb) evcb, evcb_active_next); } static void +event_queue_remove_active_later(struct event_base *base, struct event_callback *evcb) +{ + EVENT_BASE_ASSERT_LOCKED(base); + if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE_LATER))) { + event_errx(1, "%s: %p not on queue %x", __func__, + evcb, EVLIST_ACTIVE_LATER); + return; + } + DECR_EVENT_COUNT(base, evcb->evcb_flags); + evcb->evcb_flags &= ~EVLIST_ACTIVE_LATER; + base->event_count_active--; + + TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next); +} +static void event_queue_remove_timeout(struct event_base *base, struct event *ev) { EVENT_BASE_ASSERT_LOCKED(base); @@ -2794,6 +2925,21 @@ event_queue_insert_active(struct event_base *base, struct event_callback *evcb) evcb, evcb_active_next); } +static void +event_queue_insert_active_later(struct event_base *base, struct event_callback *evcb) +{ + EVENT_BASE_ASSERT_LOCKED(base); + if (evcb->evcb_flags & (EVLIST_ACTIVE_LATER|EVLIST_ACTIVE)) { + /* Double insertion is possible */ + return; + } + + INCR_EVENT_COUNT(base, evcb->evcb_flags); + evcb->evcb_flags |= EVLIST_ACTIVE_LATER; + base->event_count_active++; + TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next); +} + static void event_queue_insert_timeout(struct event_base *base, struct event *ev) { @@ -2818,6 +2964,19 @@ event_queue_insert_timeout(struct event_base *base, struct event *ev) } } +static void 
+event_queue_make_later_events_active(struct event_base *base) +{ + struct event_callback *evcb; + EVENT_BASE_ASSERT_LOCKED(base); + + while ((evcb = TAILQ_FIRST(&base->active_later_queue))) { + TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next); + evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE; + TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next); + } +} + /* Functions for debugging */ const char * @@ -3137,16 +3296,17 @@ dump_active_event_fn(struct event_base *base, struct event *e, void *arg) const char *gloss = (e->ev_events & EV_SIGNAL) ? "sig" : "fd "; - if (! (e->ev_flags & EVLIST_ACTIVE)) + if (! (e->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) return 0; - fprintf(output, " %p [%s %ld, priority=%d]%s%s%s%s active%s\n", + fprintf(output, " %p [%s %ld, priority=%d]%s%s%s%s active%s%s\n", (void*)e, gloss, (long)e->ev_fd, e->ev_pri, (e->ev_res&EV_READ)?" Read":"", (e->ev_res&EV_WRITE)?" Write":"", (e->ev_res&EV_SIGNAL)?" Signal":"", (e->ev_res&EV_TIMEOUT)?" Timeout":"", - (e->ev_flags&EVLIST_INTERNAL)?" [Internal]":""); + (e->ev_flags&EVLIST_INTERNAL)?" [Internal]":"", + (e->ev_flags&EVLIST_ACTIVE_LATER)?" [NextTime]":""); return 0; } @@ -3283,10 +3443,17 @@ event_base_assert_ok_(struct event_base *base) struct event_callback *evcb; EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event_callback, evcb_active_next); TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) { - EVUTIL_ASSERT(evcb->evcb_flags & EVLIST_ACTIVE); + EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE); EVUTIL_ASSERT(evcb->evcb_pri == i); } } + { + struct event_callback *evcb; + TAILQ_FOREACH(evcb, &base->active_later_queue, evcb_active_next) { + EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE_LATER); + } + } + EVBASE_RELEASE_LOCK(base, th_base_lock); } diff --git a/include/event2/event_struct.h b/include/event2/event_struct.h index d2470f05..701e8eac 100644 --- a/include/event2/event_struct.h +++ b/include/event2/event_struct.h @@ -54,14 +54,15 @@ extern "C" { /* For evkeyvalq */ #include -#define EVLIST_TIMEOUT 0x01 -#define EVLIST_INSERTED 0x02 -#define EVLIST_SIGNAL 0x04 -#define EVLIST_ACTIVE 0x08 -#define EVLIST_INTERNAL 0x10 -#define EVLIST_INIT 0x80 +#define EVLIST_TIMEOUT 0x01 +#define EVLIST_INSERTED 0x02 +#define EVLIST_SIGNAL 0x04 +#define EVLIST_ACTIVE 0x08 +#define EVLIST_INTERNAL 0x10 +#define EVLIST_ACTIVE_LATER 0x20 +#define EVLIST_INIT 0x80 -#define EVLIST_ALL 0x9f +#define EVLIST_ALL 0xbf /* Fix so that people don't have to run with */ #ifndef TAILQ_ENTRY diff --git a/test/regress.c b/test/regress.c index 0c5faed7..b736d891 100644 --- a/test/regress.c +++ b/test/regress.c @@ -1350,6 +1350,66 @@ end: ; } +static int n_write_a_byte_cb=0; +static int n_read_and_drain_cb=0; +static int n_activate_other_event_cb=0; +static void +write_a_byte_cb(evutil_socket_t fd, short what, void *arg) +{ + char buf[] = "x"; + write(fd, buf, 1); + ++n_write_a_byte_cb; +} +static void +read_and_drain_cb(evutil_socket_t fd, short what, void *arg) +{ + char buf[128]; + int n; + ++n_read_and_drain_cb; + while ((n = read(fd, buf, sizeof(buf))) > 0) + ; +} + +static void +activate_other_event_cb(evutil_socket_t fd, short what, void *other_) +{ + struct event *ev_activate = other_; + ++n_activate_other_event_cb; + event_active_later_(ev_activate, EV_READ); +} + +static void +test_active_later(void *ptr) +{ + struct basic_test_data *data = ptr; + struct event *ev1, 
*ev2; + struct event ev3, ev4; + struct timeval qsec = {0, 100000}; + ev1 = event_new(data->base, data->pair[0], EV_READ|EV_PERSIST, read_and_drain_cb, NULL); + ev2 = event_new(data->base, data->pair[1], EV_WRITE|EV_PERSIST, write_a_byte_cb, NULL); + event_assign(&ev3, data->base, -1, 0, activate_other_event_cb, &ev4); + event_assign(&ev4, data->base, -1, 0, activate_other_event_cb, &ev3); + event_add(ev1, NULL); + event_add(ev2, NULL); + event_active_later_(&ev3, EV_READ); + + event_base_loopexit(data->base, &qsec); + + event_base_loop(data->base, 0); + + TT_BLATHER(("%d write calls, %d read calls, %d activate-other calls.", + n_write_a_byte_cb, n_read_and_drain_cb, n_activate_other_event_cb)); + event_del(&ev3); + event_del(&ev4); + + tt_int_op(n_write_a_byte_cb, ==, n_activate_other_event_cb); + tt_int_op(n_write_a_byte_cb, >, 100); + tt_int_op(n_read_and_drain_cb, >, 100); + tt_int_op(n_activate_other_event_cb, >, 100); +end: + ; +} + static void test_event_base_new(void *ptr) { @@ -2468,7 +2528,9 @@ struct testcase_t main_testcases[] = { BASIC(bad_assign, TT_FORK|TT_NEED_BASE|TT_NO_LOGS), BASIC(bad_reentrant, TT_FORK|TT_NEED_BASE|TT_NO_LOGS), + BASIC(active_later, TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR), + /* These are still using the old API */ LEGACY(persistent_timeout, TT_FORK|TT_NEED_BASE), { "persistent_timeout_jump", test_persistent_timeout_jump, TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, { "persistent_active_timeout", test_persistent_active_timeout, From fec8bae2673966325eeaabc3fc1af60b584abb1a Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 6 Apr 2012 03:15:50 -0400 Subject: [PATCH 04/12] event_base_assert_ok: check value of event_active_count for correctness --- event.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/event.c b/event.c index d6bf4207..c8315911 100644 --- a/event.c +++ b/event.c @@ -3405,6 +3405,7 @@ void event_base_assert_ok_(struct event_base *base) { int i; + int count; EVBASE_ACQUIRE_LOCK(base, th_base_lock); /* First do checks on the per-fd and per-signal lists */ @@ -3439,12 +3440,14 @@ event_base_assert_ok_(struct event_base *base) } /* Check the active queues. */ + count = 0; for (i = 0; i < base->nactivequeues; ++i) { struct event_callback *evcb; EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event_callback, evcb_active_next); TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) { EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE); EVUTIL_ASSERT(evcb->evcb_pri == i); + ++count; } } @@ -3452,8 +3455,10 @@ event_base_assert_ok_(struct event_base *base) struct event_callback *evcb; TAILQ_FOREACH(evcb, &base->active_later_queue, evcb_active_next) { EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE_LATER); + ++count; } } + EVUTIL_ASSERT(count == base->event_count_active); EVBASE_RELEASE_LOCK(base, th_base_lock); } From ae2b84b2575be93d0aebba5c0b78453836f89f3c Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 6 Apr 2012 04:33:19 -0400 Subject: [PATCH 05/12] Replace deferred_cbs with event_callback-based implementation. 
--- buffer.c | 13 ++- bufferevent-internal.h | 1 + bufferevent.c | 23 ++--- defer-internal.h | 56 +++--------- evbuffer-internal.h | 1 + evdns.c | 3 +- event-internal.h | 12 ++- event.c | 162 ++++++++++------------------------ http.c | 3 +- include/event2/event_struct.h | 5 +- test/regress_buffer.c | 1 + test/regress_thread.c | 2 +- 12 files changed, 94 insertions(+), 188 deletions(-) diff --git a/buffer.c b/buffer.c index b28c9361..6853edf3 100644 --- a/buffer.c +++ b/buffer.c @@ -404,7 +404,7 @@ evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base) EVBUFFER_LOCK(buffer); buffer->cb_queue = event_base_get_deferred_cb_queue_(base); buffer->deferred_cbs = 1; - event_deferred_cb_init_(&buffer->deferred, + event_deferred_cb_init_(base, &buffer->deferred, evbuffer_deferred_callback, buffer); EVBUFFER_UNLOCK(buffer); return 0; @@ -509,13 +509,12 @@ evbuffer_invoke_callbacks_(struct evbuffer *buffer) } if (buffer->deferred_cbs) { - if (buffer->deferred.queued) - return; - evbuffer_incref_and_lock_(buffer); - if (buffer->parent) - bufferevent_incref_(buffer->parent); + if (event_deferred_cb_schedule_(buffer->cb_queue, &buffer->deferred)) { + evbuffer_incref_and_lock_(buffer); + if (buffer->parent) + bufferevent_incref_(buffer->parent); + } EVBUFFER_UNLOCK(buffer); - event_deferred_cb_schedule_(buffer->cb_queue, &buffer->deferred); } evbuffer_run_callbacks(buffer, 0); diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 461c46e7..5d7e98c0 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -31,6 +31,7 @@ extern "C" { #endif #include "event2/event-config.h" +#include "event2/event_struct.h" #include "evconfig-private.h" #include "event2/util.h" #include "defer-internal.h" diff --git a/bufferevent.c b/bufferevent.c index 0d4b01d6..9c023ad4 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -210,10 +210,10 @@ bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *cb, void *arg) #define SCHEDULE_DEFERRED(bevp) \ do { \ - bufferevent_incref_(&(bevp)->bev); \ - event_deferred_cb_schedule_( \ + if (event_deferred_cb_schedule_( \ event_base_get_deferred_cb_queue_((bevp)->bev.ev_base), \ - &(bevp)->deferred); \ + &(bevp)->deferred)) \ + bufferevent_incref_(&(bevp)->bev); \ } while (0) @@ -227,8 +227,7 @@ bufferevent_run_readcb_(struct bufferevent *bufev) return; if (p->options & BEV_OPT_DEFER_CALLBACKS) { p->readcb_pending = 1; - if (!p->deferred.queued) - SCHEDULE_DEFERRED(p); + SCHEDULE_DEFERRED(p); } else { bufev->readcb(bufev, bufev->cbarg); } @@ -244,8 +243,7 @@ bufferevent_run_writecb_(struct bufferevent *bufev) return; if (p->options & BEV_OPT_DEFER_CALLBACKS) { p->writecb_pending = 1; - if (!p->deferred.queued) - SCHEDULE_DEFERRED(p); + SCHEDULE_DEFERRED(p); } else { bufev->writecb(bufev, bufev->cbarg); } @@ -262,8 +260,7 @@ bufferevent_run_eventcb_(struct bufferevent *bufev, short what) if (p->options & BEV_OPT_DEFER_CALLBACKS) { p->eventcb_pending |= what; p->errno_pending = EVUTIL_SOCKET_ERROR(); - if (!p->deferred.queued) - SCHEDULE_DEFERRED(p); + SCHEDULE_DEFERRED(p); } else { bufev->errorcb(bufev, what, bufev->cbarg); } @@ -326,11 +323,15 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private, } if (options & BEV_OPT_DEFER_CALLBACKS) { if (options & BEV_OPT_UNLOCK_CALLBACKS) - event_deferred_cb_init_(&bufev_private->deferred, + event_deferred_cb_init_( + bufev->ev_base, + &bufev_private->deferred, bufferevent_run_deferred_callbacks_unlocked, bufev_private); else - event_deferred_cb_init_(&bufev_private->deferred, + 
event_deferred_cb_init_( + bufev->ev_base, + &bufev_private->deferred, bufferevent_run_deferred_callbacks_locked, bufev_private); } diff --git a/defer-internal.h b/defer-internal.h index 114a9dc2..a4c88135 100644 --- a/defer-internal.h +++ b/defer-internal.h @@ -35,43 +35,11 @@ extern "C" { #include -struct deferred_cb; +#define deferred_cb event_callback +#define deferred_cb_queue event_base +struct event_callback; -typedef void (*deferred_cb_fn)(struct deferred_cb *, void *); - -/** A deferred_cb is a callback that can be scheduled to run as part of - * an event_base's event_loop, rather than running immediately. */ -struct deferred_cb { - /** Links to the adjacent active (pending) deferred_cb objects. */ - TAILQ_ENTRY (deferred_cb) cb_next; - /** True iff this deferred_cb is pending in an event_base. */ - unsigned queued : 1; - /** The function to execute when the callback runs. */ - deferred_cb_fn cb; - /** The function's second argument. */ - void *arg; -}; - -/** A deferred_cb_queue is a list of deferred_cb that we can add to and run. */ -struct deferred_cb_queue { - /** Lock used to protect the queue. */ - void *lock; - - /** Which event_base does this queue associate itself with? - * (Used for timing) */ - struct event_base *base; - - /** How many entries are in the queue? */ - int active_count; - - /** Function called when adding to the queue from another thread. */ - void (*notify_fn)(struct deferred_cb_queue *, void *); - void *notify_arg; - - /** Deferred callback management: a list of deferred callbacks to - * run active the active events. */ - TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list; -}; +typedef void (*deferred_cb_fn)(struct event_callback *, void *); /** Initialize an empty, non-pending deferred_cb. @@ -80,27 +48,23 @@ struct deferred_cb_queue { @param cb The function to run when the deferred_cb executes. @param arg The function's second argument. */ -void event_deferred_cb_init_(struct deferred_cb *, deferred_cb_fn, void *); +void event_deferred_cb_init_(struct event_base *base, struct event_callback *, deferred_cb_fn, void *); /** Cancel a deferred_cb if it is currently scheduled in an event_base. */ -void event_deferred_cb_cancel_(struct deferred_cb_queue *, struct deferred_cb *); +void event_deferred_cb_cancel_(struct event_base *, struct event_callback *); /** Activate a deferred_cb if it is not currently scheduled in an event_base. - */ -void event_deferred_cb_schedule_(struct deferred_cb_queue *, struct deferred_cb *); -#define LOCK_DEFERRED_QUEUE(q) \ - EVLOCK_LOCK((q)->lock, 0) -#define UNLOCK_DEFERRED_QUEUE(q) \ - EVLOCK_UNLOCK((q)->lock, 0) + Return true iff it was not previously scheduled. 
+ */ +int event_deferred_cb_schedule_(struct event_base *, struct event_callback *); #ifdef __cplusplus } #endif -void event_deferred_cb_queue_init_(struct deferred_cb_queue *); -struct deferred_cb_queue *event_base_get_deferred_cb_queue_(struct event_base *); +#define event_base_get_deferred_cb_queue_(x) (x) #endif /* EVENT_INTERNAL_H_INCLUDED_ */ diff --git a/evbuffer-internal.h b/evbuffer-internal.h index 480c35a3..824739b1 100644 --- a/evbuffer-internal.h +++ b/evbuffer-internal.h @@ -34,6 +34,7 @@ extern "C" { #include "event2/event-config.h" #include "evconfig-private.h" #include "event2/util.h" +#include "event2/event_struct.h" #include "util-internal.h" #include "defer-internal.h" diff --git a/evdns.c b/evdns.c index 41d8b6b2..ff36fc3c 100644 --- a/evdns.c +++ b/evdns.c @@ -836,7 +836,8 @@ reply_schedule_callback(struct request *const req, u32 ttl, u32 err, struct repl d->handle = req->handle; } - event_deferred_cb_init_(&d->deferred, reply_run_callback, + event_deferred_cb_init_(req->base->event_base, + &d->deferred, reply_run_callback, req->user_pointer); event_deferred_cb_schedule_( event_base_get_deferred_cb_queue_(req->base->event_base), diff --git a/event-internal.h b/event-internal.h index 93b7c626..4757836c 100644 --- a/event-internal.h +++ b/event-internal.h @@ -56,13 +56,14 @@ extern "C" { #define ev_pri ev_evcallback.evcb_pri #define ev_flags ev_evcallback.evcb_flags #define ev_closure ev_evcallback.evcb_closure -#define ev_callback ev_evcallback.evcb_callback +#define ev_callback ev_evcallback.evcb_cb_union.evcb_callback #define ev_arg ev_evcallback.evcb_arg /* Possible values for evcb_closure in struct event_callback */ #define EV_CLOSURE_EVENT 0 #define EV_CLOSURE_EVENT_SIGNAL 1 #define EV_CLOSURE_EVENT_PERSIST 2 +#define EV_CLOSURE_CB_SELF 3 /** Structure to define the backend of a given event_base. */ struct eventop { @@ -239,10 +240,6 @@ struct event_base { /** The total size of common_timeout_queues. */ int n_common_timeouts_allocated; - /** List of defered_cb that are active. We run these after the active - * events. */ - struct deferred_cb_queue defer_queue; - /** Mapping from file descriptors to enabled (added) events */ struct event_io_map io; @@ -358,7 +355,7 @@ struct event_config { #endif /* TAILQ_FOREACH */ #define N_ACTIVE_CALLBACKS(base) \ - ((base)->event_count_active + (base)->defer_queue.active_count) + ((base)->event_count_active) int evsig_set_handler_(struct event_base *base, int evsignal, void (*fn)(int)); @@ -366,7 +363,8 @@ int evsig_restore_handler_(struct event_base *base, int evsignal); void event_active_nolock_(struct event *ev, int res, short count); -void event_callback_activate_nolock_(struct event_base *, struct event_callback *); +int event_callback_activate_(struct event_base *, struct event_callback *); +int event_callback_activate_nolock_(struct event_base *, struct event_callback *); int event_callback_cancel_(struct event_base *base, struct event_callback *evcb); diff --git a/event.c b/event.c index c8315911..9a506c82 100644 --- a/event.c +++ b/event.c @@ -506,28 +506,6 @@ event_base_get_features(const struct event_base *base) return base->evsel->features; } -void -event_deferred_cb_queue_init_(struct deferred_cb_queue *cb) -{ - memset(cb, 0, sizeof(struct deferred_cb_queue)); - TAILQ_INIT(&cb->deferred_cb_list); -} - -/** Helper for the deferred_cb queue: wake up the event base. 
*/ -static void -notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr) -{ - struct event_base *base = baseptr; - if (EVBASE_NEED_NOTIFY(base)) - evthread_notify_base(base); -} - -struct deferred_cb_queue * -event_base_get_deferred_cb_queue_(struct event_base *base) -{ - return base ? &base->defer_queue : NULL; -} - void event_enable_debug_mode(void) { @@ -605,11 +583,6 @@ event_base_new_with_config(const struct event_config *cfg) base->th_notify_fd[0] = -1; base->th_notify_fd[1] = -1; - event_deferred_cb_queue_init_(&base->defer_queue); - base->defer_queue.base = base; - base->defer_queue.notify_fn = notify_base_cbq_callback; - base->defer_queue.notify_arg = base; - TAILQ_INIT(&base->active_later_queue); evmap_io_initmap_(&base->io); @@ -682,7 +655,6 @@ event_base_new_with_config(const struct event_config *cfg) int r; EVTHREAD_ALLOC_LOCK(base->th_base_lock, EVTHREAD_LOCKTYPE_RECURSIVE); - base->defer_queue.lock = base->th_base_lock; EVTHREAD_ALLOC_COND(base->current_event_cond); r = evthread_make_base_notifiable(base); if (r<0) { @@ -1464,7 +1436,7 @@ event_process_active_single_queue(struct event_base *base, event_queue_remove_active(base, evcb); event_debug(("event_process_active: event_callback %p, " "closure %d, call %p", - evcb, evcb->evcb_closure, evcb->evcb_callback)); + evcb, evcb->evcb_closure, evcb->evcb_cb_union.evcb_callback)); } if (!(evcb->evcb_flags & EVLIST_INTERNAL)) @@ -1488,6 +1460,10 @@ event_process_active_single_queue(struct event_base *base, (*ev->ev_callback)( ev->ev_fd, ev->ev_res, ev->ev_arg); break; + case EV_CLOSURE_CB_SELF: + EVBASE_RELEASE_LOCK(base, th_base_lock); + evcb->evcb_cb_union.evcb_selfcb(evcb, evcb->evcb_arg); + break; default: EVUTIL_ASSERT(0); } @@ -1518,47 +1494,6 @@ event_process_active_single_queue(struct event_base *base, return count; } -/* - Process up to MAX_DEFERRED of the defered_cb entries in 'queue'. If - *breakptr becomes set to 1, stop. Requires that we start out holding - the lock on 'queue'; releases the lock around 'queue' for each deferred_cb - we process. - */ -static int -event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr, - int max_to_process, const struct timeval *endtime) -{ - int count = 0; - struct deferred_cb *cb; -#define MAX_DEFERRED 16 - if (max_to_process > MAX_DEFERRED) - max_to_process = MAX_DEFERRED; -#undef MAX_DEFERRED - - while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) { - cb->queued = 0; - TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next); - --queue->active_count; - UNLOCK_DEFERRED_QUEUE(queue); - - cb->cb(cb, cb->arg); - - LOCK_DEFERRED_QUEUE(queue); - if (*breakptr) - return -1; - if (++count >= max_to_process) - break; - if (endtime) { - struct timeval now; - update_time_cache(queue->base); - gettime(queue->base, &now); - if (evutil_timercmp(&now, endtime, >=)) - return count; - } - } - return count; -} - /* * Active events are stored in priority queues. Lower priorities are always * process before higher priorities. 
Low priority events can starve high @@ -1605,8 +1540,6 @@ event_process_active(struct event_base *base) } } - event_process_deferred_callbacks(&base->defer_queue,&base->event_break, - maxcb-c, endtime); base->event_running_priority = -1; return c; } @@ -2558,17 +2491,42 @@ event_active_later_nolock_(struct event *ev, int res) event_callback_activate_later_nolock_(base, event_to_event_callback(ev)); } -void +int +event_callback_activate_(struct event_base *base, + struct event_callback *evcb) +{ + int r; + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + r = event_callback_activate_nolock_(base, evcb); + EVBASE_RELEASE_LOCK(base, th_base_lock); + return r; +} + +int event_callback_activate_nolock_(struct event_base *base, struct event_callback *evcb) { - if (evcb->evcb_flags & EVLIST_ACTIVE_LATER) + int r = 1; + + switch (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) { + default: + EVUTIL_ASSERT(0); + case EVLIST_ACTIVE_LATER: event_queue_remove_active_later(base, evcb); + r = 0; + break; + case EVLIST_ACTIVE: + return 0; + case 0: + break; + } event_queue_insert_active(base, evcb); if (EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); + + return r; } void @@ -2628,53 +2586,31 @@ event_callback_cancel_nolock_(struct event_base *base, } void -event_deferred_cb_init_(struct deferred_cb *cb, deferred_cb_fn fn, void *arg) +event_deferred_cb_init_(struct event_base *base, struct event_callback *cb, deferred_cb_fn fn, void *arg) { - memset(cb, 0, sizeof(struct deferred_cb)); - cb->cb = fn; - cb->arg = arg; + if (!base) + base = current_base; + memset(cb, 0, sizeof(*cb)); + cb->evcb_cb_union.evcb_selfcb = fn; + cb->evcb_arg = arg; + cb->evcb_pri = base->nactivequeues - 1; + cb->evcb_closure = EV_CLOSURE_CB_SELF; } void -event_deferred_cb_cancel_(struct deferred_cb_queue *queue, - struct deferred_cb *cb) +event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb) { - if (!queue) { - if (current_base) - queue = ¤t_base->defer_queue; - else - return; - } - - LOCK_DEFERRED_QUEUE(queue); - if (cb->queued) { - TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next); - --queue->active_count; - cb->queued = 0; - } - UNLOCK_DEFERRED_QUEUE(queue); + if (!base) + base = current_base; + event_callback_cancel_(base, cb); } -void -event_deferred_cb_schedule_(struct deferred_cb_queue *queue, - struct deferred_cb *cb) +int +event_deferred_cb_schedule_(struct event_base *base, struct event_callback *cb) { - if (!queue) { - if (current_base) - queue = ¤t_base->defer_queue; - else - return; - } - - LOCK_DEFERRED_QUEUE(queue); - if (!cb->queued) { - cb->queued = 1; - TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next); - ++queue->active_count; - if (queue->notify_fn) - queue->notify_fn(queue, queue->notify_arg); - } - UNLOCK_DEFERRED_QUEUE(queue); + if (!base) + base = current_base; + return event_callback_activate_(base, cb); } static int diff --git a/http.c b/http.c index 786105d3..e5262755 100644 --- a/http.c +++ b/http.c @@ -2197,7 +2197,8 @@ evhttp_connection_base_bufferevent_new(struct event_base *base, struct evdns_bas bufferevent_base_set(base, evcon->bufev); } - event_deferred_cb_init_(&evcon->read_more_deferred_cb, + event_deferred_cb_init_(evcon->base, + &evcon->read_more_deferred_cb, evhttp_deferred_read_cb, evcon); evcon->dns_base = dnsbase; diff --git a/include/event2/event_struct.h b/include/event2/event_struct.h index 701e8eac..cf7b3df3 100644 --- a/include/event2/event_struct.h +++ b/include/event2/event_struct.h @@ -99,7 +99,10 @@ struct event_callback { ev_uint8_t 
evcb_pri; /* smaller numbers are higher priority */ ev_uint8_t evcb_closure; /* allows us to adopt for different types of events */ - void (*evcb_callback)(evutil_socket_t, short, void *arg); + union { + void (*evcb_callback)(evutil_socket_t, short, void *arg); + void (*evcb_selfcb)(struct event_callback *, void *arg); + } evcb_cb_union; void *evcb_arg; }; diff --git a/test/regress_buffer.c b/test/regress_buffer.c index d183b4f9..d94a8b37 100644 --- a/test/regress_buffer.c +++ b/test/regress_buffer.c @@ -57,6 +57,7 @@ #include "event2/buffer_compat.h" #include "event2/util.h" +#include "defer-internal.h" #include "evbuffer-internal.h" #include "log-internal.h" diff --git a/test/regress_thread.c b/test/regress_thread.c index 091bcb73..9fd49fa6 100644 --- a/test/regress_thread.c +++ b/test/regress_thread.c @@ -439,7 +439,7 @@ load_deferred_queue(void *arg) size_t i; for (i = 0; i < CB_COUNT; ++i) { - event_deferred_cb_init_(&data->cbs[i], deferred_callback, NULL); + event_deferred_cb_init_(data->queue, &data->cbs[i], deferred_callback, NULL); event_deferred_cb_schedule_(data->queue, &data->cbs[i]); SLEEP_MS(1); } From a4079aa88a5d4bd70ad3c74f83460e1a0a40772d Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 6 Apr 2012 11:05:35 -0400 Subject: [PATCH 06/12] Replace more deferred_cb names with event_callback --- buffer.c | 6 +++--- bufferevent-internal.h | 2 +- bufferevent.c | 6 +++--- defer-internal.h | 15 +++++---------- evbuffer-internal.h | 6 +++--- evdns.c | 6 +++--- event.c | 2 +- http-internal.h | 2 +- http.c | 4 ++-- listener.c | 12 ++++++------ test/regress_thread.c | 11 ++++------- 11 files changed, 32 insertions(+), 40 deletions(-) diff --git a/buffer.c b/buffer.c index 6853edf3..99aa1d58 100644 --- a/buffer.c +++ b/buffer.c @@ -142,7 +142,7 @@ static void evbuffer_chain_align(struct evbuffer_chain *chain); static int evbuffer_chain_should_realign(struct evbuffer_chain *chain, size_t datalen); -static void evbuffer_deferred_callback(struct deferred_cb *cb, void *arg); +static void evbuffer_deferred_callback(struct event_callback *cb, void *arg); static int evbuffer_ptr_memcmp(const struct evbuffer *buf, const struct evbuffer_ptr *pos, const char *mem, size_t len); static struct evbuffer_chain *evbuffer_expand_singlechain(struct evbuffer *buf, @@ -402,7 +402,7 @@ int evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base) { EVBUFFER_LOCK(buffer); - buffer->cb_queue = event_base_get_deferred_cb_queue_(base); + buffer->cb_queue = base; buffer->deferred_cbs = 1; event_deferred_cb_init_(base, &buffer->deferred, evbuffer_deferred_callback, buffer); @@ -521,7 +521,7 @@ evbuffer_invoke_callbacks_(struct evbuffer *buffer) } static void -evbuffer_deferred_callback(struct deferred_cb *cb, void *arg) +evbuffer_deferred_callback(struct event_callback *cb, void *arg) { struct bufferevent *parent = NULL; struct evbuffer *buffer = arg; diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 5d7e98c0..63bf4708 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -183,7 +183,7 @@ struct bufferevent_private { int dns_error; /** Used to implement deferred callbacks */ - struct deferred_cb deferred; + struct event_callback deferred; /** The options this bufferevent was constructed with */ enum bufferevent_options options; diff --git a/bufferevent.c b/bufferevent.c index 9c023ad4..5906335c 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -131,7 +131,7 @@ bufferevent_inbuf_wm_cb(struct evbuffer *buf, } static void 
-bufferevent_run_deferred_callbacks_locked(struct deferred_cb *cb, void *arg) +bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg) { struct bufferevent_private *bufev_private = arg; struct bufferevent *bufev = &bufev_private->bev; @@ -164,7 +164,7 @@ bufferevent_run_deferred_callbacks_locked(struct deferred_cb *cb, void *arg) } static void -bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *cb, void *arg) +bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg) { struct bufferevent_private *bufev_private = arg; struct bufferevent *bufev = &bufev_private->bev; @@ -211,7 +211,7 @@ bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *cb, void *arg) #define SCHEDULE_DEFERRED(bevp) \ do { \ if (event_deferred_cb_schedule_( \ - event_base_get_deferred_cb_queue_((bevp)->bev.ev_base), \ + (bevp)->bev.ev_base, \ &(bevp)->deferred)) \ bufferevent_incref_(&(bevp)->bev); \ } while (0) diff --git a/defer-internal.h b/defer-internal.h index a4c88135..f2ed2823 100644 --- a/defer-internal.h +++ b/defer-internal.h @@ -35,26 +35,23 @@ extern "C" { #include -#define deferred_cb event_callback -#define deferred_cb_queue event_base struct event_callback; - typedef void (*deferred_cb_fn)(struct event_callback *, void *); /** - Initialize an empty, non-pending deferred_cb. + Initialize an empty, non-pending event_callback. - @param deferred The deferred_cb structure to initialize. - @param cb The function to run when the deferred_cb executes. + @param deferred The struct event_callback structure to initialize. + @param cb The function to run when the struct event_callback executes. @param arg The function's second argument. */ void event_deferred_cb_init_(struct event_base *base, struct event_callback *, deferred_cb_fn, void *); /** - Cancel a deferred_cb if it is currently scheduled in an event_base. + Cancel a struct event_callback if it is currently scheduled in an event_base. */ void event_deferred_cb_cancel_(struct event_base *, struct event_callback *); /** - Activate a deferred_cb if it is not currently scheduled in an event_base. + Activate a struct event_callback if it is not currently scheduled in an event_base. Return true iff it was not previously scheduled. */ @@ -64,7 +61,5 @@ int event_deferred_cb_schedule_(struct event_base *, struct event_callback *); } #endif -#define event_base_get_deferred_cb_queue_(x) (x) - #endif /* EVENT_INTERNAL_H_INCLUDED_ */ diff --git a/evbuffer-internal.h b/evbuffer-internal.h index 824739b1..5967b8e5 100644 --- a/evbuffer-internal.h +++ b/evbuffer-internal.h @@ -135,7 +135,7 @@ struct evbuffer { ev_uint32_t flags; /** Used to implement deferred callbacks. */ - struct deferred_cb_queue *cb_queue; + struct event_base *cb_queue; /** A reference count on this evbuffer. When the reference count * reaches 0, the buffer is destroyed. Manipulated with @@ -143,9 +143,9 @@ struct evbuffer { * evbuffer_free. */ int refcnt; - /** A deferred_cb handle to make all of this buffer's callbacks + /** A struct event_callback handle to make all of this buffer's callbacks * invoked from the event loop. 
*/ - struct deferred_cb deferred; + struct event_callback deferred; /** A doubly-linked-list of callback functions */ LIST_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks; diff --git a/evdns.c b/evdns.c index ff36fc3c..b815b9d0 100644 --- a/evdns.c +++ b/evdns.c @@ -754,7 +754,7 @@ evdns_requests_pump_waiting_queue(struct evdns_base *base) { /* TODO(nickm) document */ struct deferred_reply_callback { - struct deferred_cb deferred; + struct event_callback deferred; struct evdns_request *handle; u8 request_type; u8 have_reply; @@ -765,7 +765,7 @@ struct deferred_reply_callback { }; static void -reply_run_callback(struct deferred_cb *d, void *user_pointer) +reply_run_callback(struct event_callback *d, void *user_pointer) { struct deferred_reply_callback *cb = EVUTIL_UPCAST(d, struct deferred_reply_callback, deferred); @@ -840,7 +840,7 @@ reply_schedule_callback(struct request *const req, u32 ttl, u32 err, struct repl &d->deferred, reply_run_callback, req->user_pointer); event_deferred_cb_schedule_( - event_base_get_deferred_cb_queue_(req->base->event_base), + req->base->event_base, &d->deferred); } diff --git a/event.c b/event.c index 9a506c82..28fa8d0e 100644 --- a/event.c +++ b/event.c @@ -2128,7 +2128,7 @@ evthread_notify_base_eventfd(struct event_base *base) /** Tell the thread currently running the event_loop for base (if any) that it * needs to stop waiting in its dispatch function (if it is) and process all - * active events and deferred callbacks (if there are any). */ + * active callbacks. */ static int evthread_notify_base(struct event_base *base) { diff --git a/http-internal.h b/http-internal.h index e366c7c6..83aa6ef1 100644 --- a/http-internal.h +++ b/http-internal.h @@ -103,7 +103,7 @@ struct evhttp_connection { void (*closecb)(struct evhttp_connection *, void *); void *closecb_arg; - struct deferred_cb read_more_deferred_cb; + struct event_callback read_more_deferred_cb; struct event_base *base; struct evdns_base *dns_base; diff --git a/http.c b/http.c index e5262755..63224f8d 100644 --- a/http.c +++ b/http.c @@ -1017,7 +1017,7 @@ evhttp_read_body(struct evhttp_connection *evcon, struct evhttp_request *req) } #define get_deferred_queue(evcon) \ - (event_base_get_deferred_cb_queue_((evcon)->base)) + ((evcon)->base) /* * Gets called when more data becomes available @@ -1079,7 +1079,7 @@ evhttp_read_cb(struct bufferevent *bufev, void *arg) } static void -evhttp_deferred_read_cb(struct deferred_cb *cb, void *data) +evhttp_deferred_read_cb(struct event_callback *cb, void *data) { struct evhttp_connection *evcon = data; evhttp_read_cb(evcon->bufev, evcon); diff --git a/listener.c b/listener.c index 2cceff24..30fd4c11 100644 --- a/listener.c +++ b/listener.c @@ -440,7 +440,7 @@ struct accepting_socket { struct event_overlapped overlapped; SOCKET s; int error; - struct deferred_cb deferred; + struct event_callback deferred; struct evconnlistener_iocp *lev; ev_uint8_t buflen; ev_uint8_t family; @@ -450,7 +450,7 @@ struct accepting_socket { static void accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, int ok); -static void accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg); +static void accepted_socket_invoke_user_cb(struct event_callback *cb, void *arg); static void iocp_listener_event_add(struct evconnlistener_iocp *lev) @@ -566,7 +566,7 @@ start_accepting(struct accepting_socket *as) report_err: as->error = error; event_deferred_cb_schedule_( - event_base_get_deferred_cb_queue_(as->lev->event_base), + as->lev->event_base, 
&as->deferred); return 0; } @@ -581,7 +581,7 @@ stop_accepting(struct accepting_socket *as) } static void -accepted_socket_invoke_user_cb(struct deferred_cb *dcb, void *arg) +accepted_socket_invoke_user_cb(struct event_callback *dcb, void *arg) { struct accepting_socket *as = arg; @@ -658,7 +658,7 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i if (ok) { /* XXXX Don't do this if some EV_MT flag is set. */ event_deferred_cb_schedule_( - event_base_get_deferred_cb_queue_(as->lev->event_base), + as->lev->event_base, &as->deferred); LeaveCriticalSection(&as->lock); } else if (as->free_on_cb) { @@ -683,7 +683,7 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i as->error = WSAGetLastError(); } event_deferred_cb_schedule_( - event_base_get_deferred_cb_queue_(as->lev->event_base), + as->lev->event_base, &as->deferred); LeaveCriticalSection(&as->lock); } diff --git a/test/regress_thread.c b/test/regress_thread.c index 9fd49fa6..2c668329 100644 --- a/test/regress_thread.c +++ b/test/regress_thread.c @@ -415,8 +415,8 @@ SLEEP_MS(int ms) } struct deferred_test_data { - struct deferred_cb cbs[CB_COUNT]; - struct deferred_cb_queue *queue; + struct event_callback cbs[CB_COUNT]; + struct event_base *queue; }; static struct timeval timer_start = {0,0}; @@ -426,7 +426,7 @@ static THREAD_T load_threads[QUEUE_THREAD_COUNT]; static struct deferred_test_data deferred_data[QUEUE_THREAD_COUNT]; static void -deferred_callback(struct deferred_cb *cb, void *arg) +deferred_callback(struct event_callback *cb, void *arg) { SLEEP_MS(1); callback_count += 1; @@ -469,14 +469,11 @@ thread_deferred_cb_skew(void *arg) { struct basic_test_data *data = arg; struct timeval tv_timer = {1, 0}; - struct deferred_cb_queue *queue; + struct event_base *queue = data->base; struct timeval elapsed; int elapsed_usec; int i; - queue = event_base_get_deferred_cb_queue_(data->base); - tt_assert(queue); - for (i = 0; i < QUEUE_THREAD_COUNT; ++i) deferred_data[i].queue = queue; From f90e25593c29258e440e2d84e85bc5ddba4e16bd Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 8 May 2012 17:46:46 -0400 Subject: [PATCH 07/12] New event_get_priority() function to return an event's priority --- event.c | 7 +++++++ include/event2/event.h | 8 +++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/event.c b/event.c index 28fa8d0e..3bc16bab 100644 --- a/event.c +++ b/event.c @@ -2072,6 +2072,13 @@ event_get_callback_arg(const struct event *ev) return ev->ev_arg; } +int +event_get_priority(const struct event *ev) +{ + event_debug_assert_is_setup_(ev); + return ev->ev_pri; +} + int event_add(struct event *ev, const struct timeval *tv) { diff --git a/include/event2/event.h b/include/event2/event.h index f1331f62..1cfc3ce9 100644 --- a/include/event2/event.h +++ b/include/event2/event.h @@ -1124,6 +1124,12 @@ event_callback_fn event_get_callback(const struct event *ev); */ void *event_get_callback_arg(const struct event *ev); +/** + Return the priority of an event. + @see event_priority_init(), event_get_priority() +*/ +int event_get_priority(const struct event *ev); + /** Extract _all_ of arguments given to construct a given event. 
The event_base is copied into *base_out, the fd is copied into *fd_out, and so @@ -1226,7 +1232,7 @@ int event_base_get_npriorities(struct event_base *eb); @param ev an event struct @param priority the new priority to be assigned @return 0 if successful, or -1 if an error occurred - @see event_priority_init() + @see event_priority_init(), event_get_priority() */ int event_priority_set(struct event *, int); From bd395549a3a148aa143afda27aa03907c5e5c73c Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 8 May 2012 18:04:19 -0400 Subject: [PATCH 08/12] Add a bufferevent_get_priority() function --- bufferevent.c | 10 ++++++++++ include/event2/bufferevent.h | 6 ++++++ 2 files changed, 16 insertions(+) diff --git a/bufferevent.c b/bufferevent.c index 5906335c..f47153c9 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -397,6 +397,16 @@ bufferevent_get_base(struct bufferevent *bufev) return bufev->ev_base; } +int +bufferevent_get_priority(struct bufferevent *bufev) +{ + if (event_initialized(&bufev->ev_read)) { + return event_get_priority(&bufev->ev_read); + } else { + return event_base_get_npriorities(bufev->ev_base) / 2; + } +} + int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) { diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index 467b4e32..b3f9b1e2 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -273,6 +273,12 @@ struct event_base *bufferevent_get_base(struct bufferevent *bev); */ int bufferevent_priority_set(struct bufferevent *bufev, int pri); +/** + Return the priority of a bufferevent. + + Only supported for socket bufferevents + */ +int bufferevent_get_priority(struct bufferevent *bufev); /** Deallocate the storage associated with a bufferevent structure. From c46cb9c3a2ab8256fc1ae8035c4d967e16b69c04 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 9 May 2012 10:49:28 -0400 Subject: [PATCH 09/12] Make event_base_getnpriorities work with old "implicit base" code --- event.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/event.c b/event.c index 3bc16bab..616ad9da 100644 --- a/event.c +++ b/event.c @@ -1129,6 +1129,9 @@ event_base_get_npriorities(struct event_base *base) { int n; + if (base == NULL) + base = current_base; + EVBASE_ACQUIRE_LOCK(base, th_base_lock); n = base->nactivequeues; EVBASE_RELEASE_LOCK(base, th_base_lock); From 581b5beb98f45ec73ade6c8026f4fadef4325d4b Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 9 May 2012 10:50:07 -0400 Subject: [PATCH 10/12] Give event_base_process_active a single exit path --- event.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/event.c b/event.c index 616ad9da..2080d256 100644 --- a/event.c +++ b/event.c @@ -1533,8 +1533,7 @@ event_process_active(struct event_base *base) c = event_process_active_single_queue(base, activeq, maxcb, endtime); if (c < 0) { - base->event_running_priority = -1; - return -1; + goto done; } else if (c > 0) break; /* Processed a real event; do not * consider lower-priority events */ @@ -1543,7 +1542,9 @@ event_process_active(struct event_base *base) } } +done: base->event_running_priority = -1; + return c; } From c0e425abdcfc883fa70b6deafdf7327bfb75f02d Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 9 May 2012 11:06:06 -0400 Subject: [PATCH 11/12] Restore our priority-inversion-prevention code with deferreds Back when deferred_cb stuff had its own queue, the queue was always executed, but we never ran more than 16 callbacks per iteration. 
That made for two problems: 1: Because deferred_cb stuff would always run, and had no priority, it could cause priority inversion. 2: It didn't respect the max_dispatch_interval code. Then, when I refactored deferred_cb to be a special case of event_callback, that solved the above issues, but made for two more issues: 3: Because deferred_cb stuff would always get the default priority, it could cause low-priority bufferevents to get too much priority. 4: With code like bufferevent_pair, it's easy to get into a situation where two deferreds keep adding one another, preventing the event loop from ever actually scanning for more events. This commit fixes the above by giving deferreds a better notion of priorities, and by limiting the number of deferreds that can be added to the _current_ loop iteration's active queues. (Extra deferreds are put into the active_later state.) That isn't an all-purpose priority inversion solution, of course: for that, you may need to mess around with max_dispatch_interval. --- buffer.c | 3 ++- bufferevent.c | 4 ++-- bufferevent_pair.c | 3 ++- bufferevent_sock.c | 4 ++++ defer-internal.h | 7 ++++++- evdns.c | 6 ++++-- event-internal.h | 6 ++++++ event.c | 36 +++++++++++++++++++++++++++++------- http.c | 3 ++- listener.c | 3 ++- test/regress_thread.c | 28 +++++++++++++++++++++------- 11 files changed, 80 insertions(+), 23 deletions(-) diff --git a/buffer.c b/buffer.c index 99aa1d58..19897296 100644 --- a/buffer.c +++ b/buffer.c @@ -404,7 +404,8 @@ evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base) EVBUFFER_LOCK(buffer); buffer->cb_queue = base; buffer->deferred_cbs = 1; - event_deferred_cb_init_(base, &buffer->deferred, + event_deferred_cb_init_(&buffer->deferred, + event_base_get_npriorities(base) / 2, evbuffer_deferred_callback, buffer); EVBUFFER_UNLOCK(buffer); return 0; diff --git a/bufferevent.c b/bufferevent.c index f47153c9..98003ac7 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -324,14 +324,14 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private, if (options & BEV_OPT_DEFER_CALLBACKS) { if (options & BEV_OPT_UNLOCK_CALLBACKS) event_deferred_cb_init_( - bufev->ev_base, &bufev_private->deferred, + event_base_get_npriorities(base) / 2, bufferevent_run_deferred_callbacks_unlocked, bufev_private); else event_deferred_cb_init_( - bufev->ev_base, &bufev_private->deferred, + event_base_get_npriorities(base) / 2, bufferevent_run_deferred_callbacks_locked, bufev_private); } diff --git a/bufferevent_pair.c b/bufferevent_pair.c index da4b1117..16edad3d 100644 --- a/bufferevent_pair.c +++ b/bufferevent_pair.c @@ -260,8 +260,9 @@ be_pair_disable(struct bufferevent *bev, short events) if (events & EV_READ) { BEV_DEL_GENERIC_READ_TIMEOUT(bev); } - if (events & EV_WRITE) + if (events & EV_WRITE) { BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); + } return 0; } diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 3f782155..50270be9 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -636,6 +636,8 @@ int bufferevent_priority_set(struct bufferevent *bufev, int priority) { int r = -1; + struct bufferevent_private *bufev_p = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); BEV_LOCK(bufev); if (bufev->be_ops != &bufferevent_ops_socket) @@ -646,6 +648,8 @@ bufferevent_priority_set(struct bufferevent *bufev, int priority) if (event_priority_set(&bufev->ev_write, priority) == -1) goto done; + event_deferred_cb_set_priority_(&bufev_p->deferred, priority); + r = 0; done: BEV_UNLOCK(bufev); diff --git a/defer-internal.h
b/defer-internal.h index f2ed2823..d8cf32f4 100644 --- a/defer-internal.h +++ b/defer-internal.h @@ -42,10 +42,15 @@ typedef void (*deferred_cb_fn)(struct event_callback *, void *); Initialize an empty, non-pending event_callback. @param deferred The struct event_callback structure to initialize. + @param priority The priority that the callback should run at. @param cb The function to run when the struct event_callback executes. @param arg The function's second argument. */ -void event_deferred_cb_init_(struct event_base *base, struct event_callback *, deferred_cb_fn, void *); +void event_deferred_cb_init_(struct event_callback *, ev_uint8_t, deferred_cb_fn, void *); +/** + Change the priority of a non-pending event_callback. + */ +void event_deferred_cb_set_priority_(struct event_callback *, ev_uint8_t); /** Cancel a struct event_callback if it is currently scheduled in an event_base. */ diff --git a/evdns.c b/evdns.c index b815b9d0..ae86fe92 100644 --- a/evdns.c +++ b/evdns.c @@ -836,8 +836,10 @@ reply_schedule_callback(struct request *const req, u32 ttl, u32 err, struct repl d->handle = req->handle; } - event_deferred_cb_init_(req->base->event_base, - &d->deferred, reply_run_callback, + event_deferred_cb_init_( + &d->deferred, + event_get_priority(&req->timeout_event), + reply_run_callback, req->user_pointer); event_deferred_cb_schedule_( req->base->event_base, diff --git a/event-internal.h b/event-internal.h index 4757836c..2c908208 100644 --- a/event-internal.h +++ b/event-internal.h @@ -218,6 +218,12 @@ struct event_base { * reentrant invocation. */ int running_loop; + /** Set to the number of deferred_cbs we've made 'active' in the + * loop. This is a hack to prevent starvation; it would be smarter + * to just use event_config_set_max_dispatch_interval's max_callbacks + * feature */ + int n_deferreds_queued; + /* Active event management. */ /** An array of nactivequeues queues for active event_callbacks (ones * that have triggered, and whose callbacks need to be called). Low diff --git a/event.c b/event.c index 2080d256..56d12ab6 100644 --- a/event.c +++ b/event.c @@ -1072,8 +1072,8 @@ event_config_set_max_dispatch_interval(struct event_config *cfg, cfg->max_dispatch_interval.tv_sec = -1; cfg->max_dispatch_callbacks = max_callbacks >= 0 ? 
max_callbacks : INT_MAX; - if (min_priority <= 0) - min_priority = 1; + if (min_priority < 0) + min_priority = 0; cfg->limit_callbacks_after_prio = min_priority; return (0); } @@ -1683,6 +1683,7 @@ event_base_loop(struct event_base *base, int flags) while (!done) { base->event_continue = 0; + base->n_deferreds_queued = 0; /* Terminate the loop if we have been asked to */ if (base->event_gotterm) { @@ -2593,21 +2594,28 @@ event_callback_cancel_nolock_(struct event_base *base, case 0: break; } + + event_base_assert_ok_(base); + return 0; } void -event_deferred_cb_init_(struct event_base *base, struct event_callback *cb, deferred_cb_fn fn, void *arg) +event_deferred_cb_init_(struct event_callback *cb, ev_uint8_t priority, deferred_cb_fn fn, void *arg) { - if (!base) - base = current_base; memset(cb, 0, sizeof(*cb)); cb->evcb_cb_union.evcb_selfcb = fn; cb->evcb_arg = arg; - cb->evcb_pri = base->nactivequeues - 1; + cb->evcb_pri = priority; cb->evcb_closure = EV_CLOSURE_CB_SELF; } +void +event_deferred_cb_set_priority_(struct event_callback *cb, ev_uint8_t priority) +{ + cb->evcb_pri = priority; +} + void event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb) { @@ -2616,12 +2624,22 @@ event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb) event_callback_cancel_(base, cb); } +#define MAX_DEFERREDS_QUEUED 32 int event_deferred_cb_schedule_(struct event_base *base, struct event_callback *cb) { + int r = 1; if (!base) base = current_base; - return event_callback_activate_(base, cb); + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + if (base->n_deferreds_queued > MAX_DEFERREDS_QUEUED) { + event_callback_activate_later_nolock_(base, cb); + } else { + ++base->n_deferreds_queued; + r = event_callback_activate_nolock_(base, cb); + } + EVBASE_RELEASE_LOCK(base, th_base_lock); + return r; } static int @@ -2868,6 +2886,7 @@ event_queue_insert_active(struct event_base *base, struct event_callback *evcb) evcb->evcb_flags |= EVLIST_ACTIVE; base->event_count_active++; + EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues); TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next); } @@ -2884,6 +2903,7 @@ event_queue_insert_active_later(struct event_base *base, struct event_callback * INCR_EVENT_COUNT(base, evcb->evcb_flags); evcb->evcb_flags |= EVLIST_ACTIVE_LATER; base->event_count_active++; + EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues); TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next); } @@ -2920,7 +2940,9 @@ event_queue_make_later_events_active(struct event_base *base) while ((evcb = TAILQ_FIRST(&base->active_later_queue))) { TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next); evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE; + EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues); TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next); + base->n_deferreds_queued += (evcb->evcb_closure == EV_CLOSURE_CB_SELF); } } diff --git a/http.c b/http.c index 63224f8d..936d4321 100644 --- a/http.c +++ b/http.c @@ -2197,8 +2197,9 @@ evhttp_connection_base_bufferevent_new(struct event_base *base, struct evdns_bas bufferevent_base_set(base, evcon->bufev); } - event_deferred_cb_init_(evcon->base, + event_deferred_cb_init_( &evcon->read_more_deferred_cb, + bufferevent_get_priority(bev), evhttp_deferred_read_cb, evcon); evcon->dns_base = dnsbase; diff --git a/listener.c b/listener.c index 30fd4c11..7cec0571 100644 --- a/listener.c +++ b/listener.c @@ -498,7 +498,8 @@ 
new_accepting_socket(struct evconnlistener_iocp *lev, int family) res->family = family; event_deferred_cb_init_(&res->deferred, - accepted_socket_invoke_user_cb, res); + event_base_get_npriorities(base) / 2, + accepted_socket_invoke_user_cb, res); InitializeCriticalSectionAndSpinCount(&res->lock, 1000); diff --git a/test/regress_thread.c b/test/regress_thread.c index 2c668329..24ac0ef6 100644 --- a/test/regress_thread.c +++ b/test/regress_thread.c @@ -439,7 +439,8 @@ load_deferred_queue(void *arg) size_t i; for (i = 0; i < CB_COUNT; ++i) { - event_deferred_cb_init_(data->queue, &data->cbs[i], deferred_callback, NULL); + event_deferred_cb_init_(&data->cbs[i], 0, deferred_callback, + NULL); event_deferred_cb_schedule_(data->queue, &data->cbs[i]); SLEEP_MS(1); } @@ -469,20 +470,28 @@ thread_deferred_cb_skew(void *arg) { struct basic_test_data *data = arg; struct timeval tv_timer = {1, 0}; - struct event_base *queue = data->base; + struct event_base *base = NULL; + struct event_config *cfg = NULL; struct timeval elapsed; int elapsed_usec; int i; + cfg = event_config_new(); + tt_assert(cfg); + event_config_set_max_dispatch_interval(cfg, NULL, 16, 0); + + base = event_base_new_with_config(cfg); + tt_assert(base); + for (i = 0; i < QUEUE_THREAD_COUNT; ++i) - deferred_data[i].queue = queue; + deferred_data[i].queue = base; evutil_gettimeofday(&timer_start, NULL); - event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL, + event_base_once(base, -1, EV_TIMEOUT, timer_callback, NULL, &tv_timer); - event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback, + event_base_once(base, -1, EV_TIMEOUT, start_threads_callback, NULL, NULL); - event_base_dispatch(data->base); + event_base_dispatch(base); evutil_timersub(&timer_end, &timer_start, &elapsed); TT_BLATHER(("callback count, %u", callback_count)); @@ -497,6 +506,10 @@ thread_deferred_cb_skew(void *arg) end: for (i = 0; i < QUEUE_THREAD_COUNT; ++i) THREAD_JOIN(load_threads[i]); + if (base) + event_base_free(base); + if (cfg) + event_config_free(cfg); } static struct event time_events[5]; @@ -580,7 +593,8 @@ struct testcase_t thread_testcases[] = { &basic_setup, (char*)"forking" }, #endif TEST(conditions_simple), - TEST(deferred_cb_skew), + { "deferred_cb_skew", thread_deferred_cb_skew, TT_FORK|TT_NEED_THREADS, + &basic_setup, NULL }, TEST(no_events), END_OF_TESTCASES }; From 7d6aa5ee68962f91de3690ce4bb39ec7c9776e9e Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 9 May 2012 12:05:07 -0400 Subject: [PATCH 12/12] Add an event_base_loopcontinue() to tell libevent to rescan right away --- event.c | 19 +++++++++++++++++++ include/event2/event.h | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/event.c b/event.c index 56d12ab6..5cb5d28c 100644 --- a/event.c +++ b/event.c @@ -1619,6 +1619,25 @@ event_base_loopbreak(struct event_base *event_base) return r; } +int +event_base_loopcontinue(struct event_base *event_base) +{ + int r = 0; + if (event_base == NULL) + return (-1); + + EVBASE_ACQUIRE_LOCK(event_base, th_base_lock); + event_base->event_continue = 1; + + if (EVBASE_NEED_NOTIFY(event_base)) { + r = evthread_notify_base(event_base); + } else { + r = (0); + } + EVBASE_RELEASE_LOCK(event_base, th_base_lock); + return r; +} + int event_base_got_break(struct event_base *event_base) { diff --git a/include/event2/event.h b/include/event2/event.h index 1cfc3ce9..4f5a7c83 100644 --- a/include/event2/event.h +++ b/include/event2/event.h @@ -754,6 +754,25 @@ int event_base_loopexit(struct event_base *, const struct 
timeval *); */ int event_base_loopbreak(struct event_base *); +/** + Tell the active event_base_loop() to scan for new events immediately. + + Calling this function makes the currently active event_base_loop() + start the loop over again (scanning for new events) after the current + event callback finishes. If the event loop is not running, this + function has no effect. + + event_base_loopcontinue() is typically invoked from an event's callback. + This behavior is analogous to the "continue;" statement. + + Subsequent invocations of the event loop will proceed normally. + + @param eb the event_base structure returned by event_init() + @return 0 if successful, or -1 if an error occurred + @see event_base_loopbreak() + */ +int event_base_loopcontinue(struct event_base *); + /** Checks if the event loop was told to exit by event_loopexit().