diff --git a/buffer.c b/buffer.c index 99aa1d58..19897296 100644 --- a/buffer.c +++ b/buffer.c @@ -404,7 +404,8 @@ evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base) EVBUFFER_LOCK(buffer); buffer->cb_queue = base; buffer->deferred_cbs = 1; - event_deferred_cb_init_(base, &buffer->deferred, + event_deferred_cb_init_(&buffer->deferred, + event_base_get_npriorities(base) / 2, evbuffer_deferred_callback, buffer); EVBUFFER_UNLOCK(buffer); return 0; diff --git a/bufferevent.c b/bufferevent.c index f47153c9..98003ac7 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -324,14 +324,16 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private, if (options & BEV_OPT_DEFER_CALLBACKS) { if (options & BEV_OPT_UNLOCK_CALLBACKS) event_deferred_cb_init_( - bufev->ev_base, &bufev_private->deferred, + &bufev_private->deferred, + event_base_get_npriorities(base) / 2, bufferevent_run_deferred_callbacks_unlocked, bufev_private); else event_deferred_cb_init_( - bufev->ev_base, &bufev_private->deferred, + &bufev_private->deferred, + event_base_get_npriorities(base) / 2, bufferevent_run_deferred_callbacks_locked, bufev_private); } diff --git a/bufferevent_pair.c b/bufferevent_pair.c index da4b1117..16edad3d 100644 --- a/bufferevent_pair.c +++ b/bufferevent_pair.c @@ -260,8 +260,9 @@ be_pair_disable(struct bufferevent *bev, short events) if (events & EV_READ) { BEV_DEL_GENERIC_READ_TIMEOUT(bev); } - if (events & EV_WRITE) + if (events & EV_WRITE) { BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); + } return 0; } diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 3f782155..50270be9 100644 @@ -636,6 +636,8 @@ int bufferevent_priority_set(struct bufferevent *bufev, int priority) { int r = -1; + struct bufferevent_private *bufev_p = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); BEV_LOCK(bufev); if (bufev->be_ops != &bufferevent_ops_socket) goto done; if (event_priority_set(&bufev->ev_read, priority) == -1) goto done; @@ -646,6 +648,8 @@ bufferevent_priority_set(struct bufferevent *bufev, int priority) if
(event_priority_set(&bufev->ev_write, priority) == -1) goto done; + event_deferred_cb_set_priority_(&bufev_p->deferred, priority); + r = 0; done: BEV_UNLOCK(bufev); diff --git a/defer-internal.h b/defer-internal.h index f2ed2823..d8cf32f4 100644 --- a/defer-internal.h +++ b/defer-internal.h @@ -42,10 +42,15 @@ typedef void (*deferred_cb_fn)(struct event_callback *, void *); Initialize an empty, non-pending event_callback. @param deferred The struct event_callback structure to initialize. + @param priority The priority that the callback should run at. @param cb The function to run when the struct event_callback executes. @param arg The function's second argument. */ -void event_deferred_cb_init_(struct event_base *base, struct event_callback *, deferred_cb_fn, void *); +void event_deferred_cb_init_(struct event_callback *, ev_uint8_t, deferred_cb_fn, void *); +/** + Change the priority of a non-pending event_callback. + */ +void event_deferred_cb_set_priority_(struct event_callback *, ev_uint8_t); /** Cancel a struct event_callback if it is currently scheduled in an event_base. */ diff --git a/evdns.c b/evdns.c index b815b9d0..ae86fe92 100644 --- a/evdns.c +++ b/evdns.c @@ -836,8 +836,10 @@ reply_schedule_callback(struct request *const req, u32 ttl, u32 err, struct repl d->handle = req->handle; } - event_deferred_cb_init_(req->base->event_base, - &d->deferred, reply_run_callback, + event_deferred_cb_init_( + &d->deferred, + event_get_priority(&req->timeout_event), + reply_run_callback, req->user_pointer); event_deferred_cb_schedule_( req->base->event_base, diff --git a/event-internal.h b/event-internal.h index 4757836c..2c908208 100644 --- a/event-internal.h +++ b/event-internal.h @@ -218,6 +218,12 @@ struct event_base { * reentrant invocation. */ int running_loop; + /** Set to the number of deferred_cbs we've made 'active' in the + * loop. 
This is a hack to prevent starvation; it would be smarter + * to just use event_config_set_max_dispatch_interval's max_callbacks + * feature */ + int n_deferreds_queued; + /* Active event management. */ /** An array of nactivequeues queues for active event_callbacks (ones * that have triggered, and whose callbacks need to be called). Low diff --git a/event.c b/event.c index 2080d256..56d12ab6 100644 --- a/event.c +++ b/event.c @@ -1072,8 +1072,8 @@ event_config_set_max_dispatch_interval(struct event_config *cfg, cfg->max_dispatch_interval.tv_sec = -1; cfg->max_dispatch_callbacks = max_callbacks >= 0 ? max_callbacks : INT_MAX; - if (min_priority <= 0) - min_priority = 1; + if (min_priority < 0) + min_priority = 0; cfg->limit_callbacks_after_prio = min_priority; return (0); } @@ -1683,6 +1683,7 @@ event_base_loop(struct event_base *base, int flags) while (!done) { base->event_continue = 0; + base->n_deferreds_queued = 0; /* Terminate the loop if we have been asked to */ if (base->event_gotterm) { @@ -2593,21 +2594,28 @@ event_callback_cancel_nolock_(struct event_base *base, case 0: break; } + + event_base_assert_ok_(base); + return 0; } void -event_deferred_cb_init_(struct event_base *base, struct event_callback *cb, deferred_cb_fn fn, void *arg) +event_deferred_cb_init_(struct event_callback *cb, ev_uint8_t priority, deferred_cb_fn fn, void *arg) { - if (!base) - base = current_base; memset(cb, 0, sizeof(*cb)); cb->evcb_cb_union.evcb_selfcb = fn; cb->evcb_arg = arg; - cb->evcb_pri = base->nactivequeues - 1; + cb->evcb_pri = priority; cb->evcb_closure = EV_CLOSURE_CB_SELF; } +void +event_deferred_cb_set_priority_(struct event_callback *cb, ev_uint8_t priority) +{ + cb->evcb_pri = priority; +} + void event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb) { @@ -2616,12 +2624,22 @@ event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb) event_callback_cancel_(base, cb); } +#define MAX_DEFERREDS_QUEUED 32 int 
event_deferred_cb_schedule_(struct event_base *base, struct event_callback *cb) { + int r = 1; if (!base) base = current_base; - return event_callback_activate_(base, cb); + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + if (base->n_deferreds_queued > MAX_DEFERREDS_QUEUED) { + event_callback_activate_later_nolock_(base, cb); + } else { + ++base->n_deferreds_queued; + r = event_callback_activate_nolock_(base, cb); + } + EVBASE_RELEASE_LOCK(base, th_base_lock); + return r; } static int @@ -2868,6 +2886,7 @@ event_queue_insert_active(struct event_base *base, struct event_callback *evcb) evcb->evcb_flags |= EVLIST_ACTIVE; base->event_count_active++; + EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues); TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next); } @@ -2884,6 +2903,7 @@ event_queue_insert_active_later(struct event_base *base, struct event_callback * INCR_EVENT_COUNT(base, evcb->evcb_flags); evcb->evcb_flags |= EVLIST_ACTIVE_LATER; base->event_count_active++; + EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues); TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next); } @@ -2920,7 +2940,9 @@ event_queue_make_later_events_active(struct event_base *base) while ((evcb = TAILQ_FIRST(&base->active_later_queue))) { TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next); evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE; + EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues); TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next); + base->n_deferreds_queued += (evcb->evcb_closure == EV_CLOSURE_CB_SELF); } } diff --git a/http.c b/http.c index 63224f8d..936d4321 100644 --- a/http.c +++ b/http.c @@ -2197,8 +2197,9 @@ evhttp_connection_base_bufferevent_new(struct event_base *base, struct evdns_bas bufferevent_base_set(base, evcon->bufev); } - event_deferred_cb_init_(evcon->base, + event_deferred_cb_init_( &evcon->read_more_deferred_cb, + bufferevent_get_priority(bev), 
evhttp_deferred_read_cb, evcon); evcon->dns_base = dnsbase; diff --git a/listener.c b/listener.c index 30fd4c11..7cec0571 100644 --- a/listener.c +++ b/listener.c @@ -498,7 +498,8 @@ new_accepting_socket(struct evconnlistener_iocp *lev, int family) res->family = family; event_deferred_cb_init_(&res->deferred, - accepted_socket_invoke_user_cb, res); + event_base_get_npriorities(base) / 2, + accepted_socket_invoke_user_cb, res); InitializeCriticalSectionAndSpinCount(&res->lock, 1000); diff --git a/test/regress_thread.c b/test/regress_thread.c index 2c668329..24ac0ef6 100644 --- a/test/regress_thread.c +++ b/test/regress_thread.c @@ -439,7 +439,8 @@ load_deferred_queue(void *arg) size_t i; for (i = 0; i < CB_COUNT; ++i) { - event_deferred_cb_init_(data->queue, &data->cbs[i], deferred_callback, NULL); + event_deferred_cb_init_(&data->cbs[i], 0, deferred_callback, + NULL); event_deferred_cb_schedule_(data->queue, &data->cbs[i]); SLEEP_MS(1); } @@ -469,20 +470,28 @@ thread_deferred_cb_skew(void *arg) { struct basic_test_data *data = arg; struct timeval tv_timer = {1, 0}; - struct event_base *queue = data->base; + struct event_base *base = NULL; + struct event_config *cfg = NULL; struct timeval elapsed; int elapsed_usec; int i; + cfg = event_config_new(); + tt_assert(cfg); + event_config_set_max_dispatch_interval(cfg, NULL, 16, 0); + + base = event_base_new_with_config(cfg); + tt_assert(base); + for (i = 0; i < QUEUE_THREAD_COUNT; ++i) - deferred_data[i].queue = queue; + deferred_data[i].queue = base; evutil_gettimeofday(&timer_start, NULL); - event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL, + event_base_once(base, -1, EV_TIMEOUT, timer_callback, NULL, &tv_timer); - event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback, + event_base_once(base, -1, EV_TIMEOUT, start_threads_callback, NULL, NULL); - event_base_dispatch(data->base); + event_base_dispatch(base); evutil_timersub(&timer_end, &timer_start, &elapsed); TT_BLATHER(("callback count, 
%u", callback_count)); @@ -497,6 +506,10 @@ thread_deferred_cb_skew(void *arg) end: for (i = 0; i < QUEUE_THREAD_COUNT; ++i) THREAD_JOIN(load_threads[i]); + if (base) + event_base_free(base); + if (cfg) + event_config_free(cfg); } static struct event time_events[5]; @@ -580,7 +593,8 @@ struct testcase_t thread_testcases[] = { &basic_setup, (char*)"forking" }, #endif TEST(conditions_simple), - TEST(deferred_cb_skew), + { "deferred_cb_skew", thread_deferred_cb_skew, TT_FORK|TT_NEED_THREADS, + &basic_setup, NULL }, TEST(no_events), END_OF_TESTCASES };