diff --git a/buffer.c b/buffer.c index b28c9361..19897296 100644 --- a/buffer.c +++ b/buffer.c @@ -142,7 +142,7 @@ static void evbuffer_chain_align(struct evbuffer_chain *chain); static int evbuffer_chain_should_realign(struct evbuffer_chain *chain, size_t datalen); -static void evbuffer_deferred_callback(struct deferred_cb *cb, void *arg); +static void evbuffer_deferred_callback(struct event_callback *cb, void *arg); static int evbuffer_ptr_memcmp(const struct evbuffer *buf, const struct evbuffer_ptr *pos, const char *mem, size_t len); static struct evbuffer_chain *evbuffer_expand_singlechain(struct evbuffer *buf, @@ -402,9 +402,10 @@ int evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base) { EVBUFFER_LOCK(buffer); - buffer->cb_queue = event_base_get_deferred_cb_queue_(base); + buffer->cb_queue = base; buffer->deferred_cbs = 1; event_deferred_cb_init_(&buffer->deferred, + event_base_get_npriorities(base) / 2, evbuffer_deferred_callback, buffer); EVBUFFER_UNLOCK(buffer); return 0; @@ -509,20 +510,19 @@ evbuffer_invoke_callbacks_(struct evbuffer *buffer) } if (buffer->deferred_cbs) { - if (buffer->deferred.queued) - return; - evbuffer_incref_and_lock_(buffer); - if (buffer->parent) - bufferevent_incref_(buffer->parent); + if (event_deferred_cb_schedule_(buffer->cb_queue, &buffer->deferred)) { + evbuffer_incref_and_lock_(buffer); + if (buffer->parent) + bufferevent_incref_(buffer->parent); + } EVBUFFER_UNLOCK(buffer); - event_deferred_cb_schedule_(buffer->cb_queue, &buffer->deferred); } evbuffer_run_callbacks(buffer, 0); } static void -evbuffer_deferred_callback(struct deferred_cb *cb, void *arg) +evbuffer_deferred_callback(struct event_callback *cb, void *arg) { struct bufferevent *parent = NULL; struct evbuffer *buffer = arg; diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 461c46e7..63bf4708 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -31,6 +31,7 @@ extern "C" { #endif #include "event2/event-config.h" +#include "event2/event_struct.h" #include "evconfig-private.h" #include "event2/util.h" #include "defer-internal.h" @@ -182,7 +183,7 @@ struct bufferevent_private { int dns_error; /** Used to implement deferred callbacks */ - struct deferred_cb deferred; + struct event_callback deferred; /** The options this bufferevent was constructed with */ enum bufferevent_options options; diff --git a/bufferevent.c b/bufferevent.c index 0d4b01d6..98003ac7 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -131,7 +131,7 @@ bufferevent_inbuf_wm_cb(struct evbuffer *buf, } static void -bufferevent_run_deferred_callbacks_locked(struct deferred_cb *cb, void *arg) +bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg) { struct bufferevent_private *bufev_private = arg; struct bufferevent *bufev = &bufev_private->bev; @@ -164,7 +164,7 @@ bufferevent_run_deferred_callbacks_locked(struct deferred_cb *cb, void *arg) } static void -bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *cb, void *arg) +bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg) { struct bufferevent_private *bufev_private = arg; struct bufferevent *bufev = &bufev_private->bev; @@ -210,10 +210,10 @@ bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *cb, void *arg) #define SCHEDULE_DEFERRED(bevp) \ do { \ - bufferevent_incref_(&(bevp)->bev); \ - event_deferred_cb_schedule_( \ - event_base_get_deferred_cb_queue_((bevp)->bev.ev_base), \ - &(bevp)->deferred); \ + if (event_deferred_cb_schedule_( \ + (bevp)->bev.ev_base, \ + &(bevp)->deferred)) \ + bufferevent_incref_(&(bevp)->bev); \ } while (0) @@ -227,8 +227,7 @@ bufferevent_run_readcb_(struct bufferevent *bufev) return; if (p->options & BEV_OPT_DEFER_CALLBACKS) { p->readcb_pending = 1; - if (!p->deferred.queued) - SCHEDULE_DEFERRED(p); + SCHEDULE_DEFERRED(p); } else { bufev->readcb(bufev, bufev->cbarg); } @@ -244,8 +243,7 @@ bufferevent_run_writecb_(struct bufferevent *bufev) return; if (p->options & BEV_OPT_DEFER_CALLBACKS) { p->writecb_pending = 1; - if (!p->deferred.queued) - SCHEDULE_DEFERRED(p); + SCHEDULE_DEFERRED(p); } else { bufev->writecb(bufev, bufev->cbarg); } @@ -262,8 +260,7 @@ bufferevent_run_eventcb_(struct bufferevent *bufev, short what) if (p->options & BEV_OPT_DEFER_CALLBACKS) { p->eventcb_pending |= what; p->errno_pending = EVUTIL_SOCKET_ERROR(); - if (!p->deferred.queued) - SCHEDULE_DEFERRED(p); + SCHEDULE_DEFERRED(p); } else { bufev->errorcb(bufev, what, bufev->cbarg); } @@ -326,11 +323,15 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private, } if (options & BEV_OPT_DEFER_CALLBACKS) { if (options & BEV_OPT_UNLOCK_CALLBACKS) - event_deferred_cb_init_(&bufev_private->deferred, + event_deferred_cb_init_( + &bufev_private->deferred, + event_base_get_npriorities(base) / 2, bufferevent_run_deferred_callbacks_unlocked, bufev_private); else - event_deferred_cb_init_(&bufev_private->deferred, + event_deferred_cb_init_( + &bufev_private->deferred, + event_base_get_npriorities(base) / 2, bufferevent_run_deferred_callbacks_locked, bufev_private); } @@ -396,6 +397,16 @@ bufferevent_get_base(struct bufferevent *bufev) return bufev->ev_base; } +int +bufferevent_get_priority(struct bufferevent *bufev) +{ + if (event_initialized(&bufev->ev_read)) { + return event_get_priority(&bufev->ev_read); + } else { + return event_base_get_npriorities(bufev->ev_base) / 2; + } +} + int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) { diff --git a/bufferevent_pair.c b/bufferevent_pair.c index da4b1117..16edad3d 100644 --- a/bufferevent_pair.c +++ b/bufferevent_pair.c @@ -260,8 +260,9 @@ be_pair_disable(struct bufferevent *bev, short events) if (events & EV_READ) { BEV_DEL_GENERIC_READ_TIMEOUT(bev); } - if (events & EV_WRITE) + if (events & EV_WRITE) { BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); + } return 0; } diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 3f782155..50270be9 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -636,6 +636,8 @@ int bufferevent_priority_set(struct bufferevent *bufev, int priority) { int r = -1; + struct bufferevent_private *bufev_p = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); BEV_LOCK(bufev); if (bufev->be_ops != &bufferevent_ops_socket) @@ -646,6 +648,8 @@ bufferevent_priority_set(struct bufferevent *bufev, int priority) if (event_priority_set(&bufev->ev_write, priority) == -1) goto done; + event_deferred_cb_set_priority_(&bufev_p->deferred, priority); + r = 0; done: BEV_UNLOCK(bufev); diff --git a/defer-internal.h b/defer-internal.h index 114a9dc2..d8cf32f4 100644 --- a/defer-internal.h +++ b/defer-internal.h @@ -35,72 +35,36 @@ extern "C" { #include -struct deferred_cb; - -typedef void (*deferred_cb_fn)(struct deferred_cb *, void *); - -/** A deferred_cb is a callback that can be scheduled to run as part of - * an event_base's event_loop, rather than running immediately. */ -struct deferred_cb { - /** Links to the adjacent active (pending) deferred_cb objects. */ - TAILQ_ENTRY (deferred_cb) cb_next; - /** True iff this deferred_cb is pending in an event_base. */ - unsigned queued : 1; - /** The function to execute when the callback runs. */ - deferred_cb_fn cb; - /** The function's second argument. */ - void *arg; -}; - -/** A deferred_cb_queue is a list of deferred_cb that we can add to and run. */ -struct deferred_cb_queue { - /** Lock used to protect the queue. */ - void *lock; - - /** Which event_base does this queue associate itself with? - * (Used for timing) */ - struct event_base *base; - - /** How many entries are in the queue? */ - int active_count; - - /** Function called when adding to the queue from another thread. */ - void (*notify_fn)(struct deferred_cb_queue *, void *); - void *notify_arg; - - /** Deferred callback management: a list of deferred callbacks to - * run active the active events. */ - TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list; -}; +struct event_callback; +typedef void (*deferred_cb_fn)(struct event_callback *, void *); /** - Initialize an empty, non-pending deferred_cb. + Initialize an empty, non-pending event_callback. - @param deferred The deferred_cb structure to initialize. - @param cb The function to run when the deferred_cb executes. + @param deferred The struct event_callback structure to initialize. + @param priority The priority that the callback should run at. + @param cb The function to run when the struct event_callback executes. @param arg The function's second argument. */ -void event_deferred_cb_init_(struct deferred_cb *, deferred_cb_fn, void *); +void event_deferred_cb_init_(struct event_callback *, ev_uint8_t, deferred_cb_fn, void *); /** - Cancel a deferred_cb if it is currently scheduled in an event_base. + Change the priority of a non-pending event_callback. */ -void event_deferred_cb_cancel_(struct deferred_cb_queue *, struct deferred_cb *); +void event_deferred_cb_set_priority_(struct event_callback *, ev_uint8_t); /** - Activate a deferred_cb if it is not currently scheduled in an event_base. + Cancel a struct event_callback if it is currently scheduled in an event_base. */ -void event_deferred_cb_schedule_(struct deferred_cb_queue *, struct deferred_cb *); +void event_deferred_cb_cancel_(struct event_base *, struct event_callback *); +/** + Activate a struct event_callback if it is not currently scheduled in an event_base. -#define LOCK_DEFERRED_QUEUE(q) \ - EVLOCK_LOCK((q)->lock, 0) -#define UNLOCK_DEFERRED_QUEUE(q) \ - EVLOCK_UNLOCK((q)->lock, 0) + Return true iff it was not previously scheduled. + */ +int event_deferred_cb_schedule_(struct event_base *, struct event_callback *); #ifdef __cplusplus } #endif -void event_deferred_cb_queue_init_(struct deferred_cb_queue *); -struct deferred_cb_queue *event_base_get_deferred_cb_queue_(struct event_base *); - #endif /* EVENT_INTERNAL_H_INCLUDED_ */ diff --git a/evbuffer-internal.h b/evbuffer-internal.h index 480c35a3..5967b8e5 100644 --- a/evbuffer-internal.h +++ b/evbuffer-internal.h @@ -34,6 +34,7 @@ extern "C" { #include "event2/event-config.h" #include "evconfig-private.h" #include "event2/util.h" +#include "event2/event_struct.h" #include "util-internal.h" #include "defer-internal.h" @@ -134,7 +135,7 @@ struct evbuffer { ev_uint32_t flags; /** Used to implement deferred callbacks. */ - struct deferred_cb_queue *cb_queue; + struct event_base *cb_queue; /** A reference count on this evbuffer. When the reference count * reaches 0, the buffer is destroyed. Manipulated with @@ -142,9 +143,9 @@ struct evbuffer { * evbuffer_free. */ int refcnt; - /** A deferred_cb handle to make all of this buffer's callbacks + /** A struct event_callback handle to make all of this buffer's callbacks * invoked from the event loop. */ - struct deferred_cb deferred; + struct event_callback deferred; /** A doubly-linked-list of callback functions */ LIST_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks; diff --git a/evdns.c b/evdns.c index 41d8b6b2..ae86fe92 100644 --- a/evdns.c +++ b/evdns.c @@ -754,7 +754,7 @@ evdns_requests_pump_waiting_queue(struct evdns_base *base) { /* TODO(nickm) document */ struct deferred_reply_callback { - struct deferred_cb deferred; + struct event_callback deferred; struct evdns_request *handle; u8 request_type; u8 have_reply; @@ -765,7 +765,7 @@ struct deferred_reply_callback { }; static void -reply_run_callback(struct deferred_cb *d, void *user_pointer) +reply_run_callback(struct event_callback *d, void *user_pointer) { struct deferred_reply_callback *cb = EVUTIL_UPCAST(d, struct deferred_reply_callback, deferred); @@ -836,10 +836,13 @@ reply_schedule_callback(struct request *const req, u32 ttl, u32 err, struct repl d->handle = req->handle; } - event_deferred_cb_init_(&d->deferred, reply_run_callback, + event_deferred_cb_init_( + &d->deferred, + event_get_priority(&req->timeout_event), + reply_run_callback, req->user_pointer); event_deferred_cb_schedule_( - event_base_get_deferred_cb_queue_(req->base->event_base), + req->base->event_base, &d->deferred); } diff --git a/event-internal.h b/event-internal.h index 60b58c6c..2c908208 100644 --- a/event-internal.h +++ b/event-internal.h @@ -53,10 +53,17 @@ extern "C" { #define ev_ncalls ev_.ev_signal.ev_ncalls #define ev_pncalls ev_.ev_signal.ev_pncalls -/* Possible values for ev_closure in struct event. */ -#define EV_CLOSURE_NONE 0 -#define EV_CLOSURE_SIGNAL 1 -#define EV_CLOSURE_PERSIST 2 +#define ev_pri ev_evcallback.evcb_pri +#define ev_flags ev_evcallback.evcb_flags +#define ev_closure ev_evcallback.evcb_closure +#define ev_callback ev_evcallback.evcb_cb_union.evcb_callback +#define ev_arg ev_evcallback.evcb_arg + +/* Possible values for evcb_closure in struct event_callback */ +#define EV_CLOSURE_EVENT 0 +#define EV_CLOSURE_EVENT_SIGNAL 1 +#define EV_CLOSURE_EVENT_PERSIST 2 +#define EV_CLOSURE_CB_SELF 3 /** Structure to define the backend of a given event_base. */ struct eventop { @@ -170,6 +177,8 @@ extern int event_debug_mode_on_; #define EVENT_DEBUG_MODE_IS_ON() (0) #endif +TAILQ_HEAD(evcallback_list, event_callback); + struct event_base { /** Function pointers and other data to describe this event_base's * backend. */ @@ -209,14 +218,23 @@ struct event_base { * reentrant invocation. */ int running_loop; + /** Set to the number of deferred_cbs we've made 'active' in the + * loop. This is a hack to prevent starvation; it would be smarter + * to just use event_config_set_max_dispatch_interval's max_callbacks + * feature */ + int n_deferreds_queued; + /* Active event management. */ - /** An array of nactivequeues queues for active events (ones that - * have triggered, and whose callbacks need to be called). Low + /** An array of nactivequeues queues for active event_callbacks (ones + * that have triggered, and whose callbacks need to be called). Low * priority numbers are more important, and stall higher ones. */ - struct event_list *activequeues; + struct evcallback_list *activequeues; /** The length of the activequeues array */ int nactivequeues; + /** A list of event_callbacks that should become active the next time + * we process events, but not this time. */ + struct evcallback_list active_later_queue; /* common timeout logic */ @@ -228,10 +246,6 @@ struct event_base { /** The total size of common_timeout_queues. */ int n_common_timeouts_allocated; - /** List of defered_cb that are active. We run these after the active - * events. */ - struct deferred_cb_queue defer_queue; - /** Mapping from file descriptors to enabled (added) events */ struct event_io_map io; @@ -266,7 +280,7 @@ struct event_base { int current_event_waiters; #endif /** The event whose callback is executing right now */ - struct event *current_event; + struct event_callback *current_event; #ifdef _WIN32 /** IOCP support structure, if IOCP is enabled. */ @@ -347,7 +361,7 @@ struct event_config { #endif /* TAILQ_FOREACH */ #define N_ACTIVE_CALLBACKS(base) \ - ((base)->event_count_active + (base)->defer_queue.active_count) + ((base)->event_count_active) int evsig_set_handler_(struct event_base *base, int evsignal, void (*fn)(int)); @@ -355,6 +369,19 @@ int evsig_restore_handler_(struct event_base *base, int evsignal); void event_active_nolock_(struct event *ev, int res, short count); +int event_callback_activate_(struct event_base *, struct event_callback *); +int event_callback_activate_nolock_(struct event_base *, struct event_callback *); +int event_callback_cancel_(struct event_base *base, + struct event_callback *evcb); + +void event_active_later_(struct event *ev, int res); +void event_active_later_nolock_(struct event *ev, int res); +void event_callback_activate_later_nolock_(struct event_base *base, + struct event_callback *evcb); +int event_callback_cancel_nolock_(struct event_base *base, + struct event_callback *evcb); +void event_callback_init_(struct event_base *base, + struct event_callback *cb); /* FIXME document. */ void event_base_add_virtual_(struct event_base *base); diff --git a/event.c b/event.c index c5c118cc..5cb5d28c 100644 --- a/event.c +++ b/event.c @@ -135,12 +135,16 @@ static inline int event_add_internal(struct event *ev, const struct timeval *tv, int tv_is_absolute); static inline int event_del_internal(struct event *ev); -static void event_queue_insert_active(struct event_base *, struct event *); +static void event_queue_insert_active(struct event_base *, struct event_callback *); +static void event_queue_insert_active_later(struct event_base *, struct event_callback *); static void event_queue_insert_timeout(struct event_base *, struct event *); static void event_queue_insert_inserted(struct event_base *, struct event *); -static void event_queue_remove_active(struct event_base *, struct event *); +static void event_queue_remove_active(struct event_base *, struct event_callback *); +static void event_queue_remove_active_later(struct event_base *, struct event_callback *); static void event_queue_remove_timeout(struct event_base *, struct event *); static void event_queue_remove_inserted(struct event_base *, struct event *); +static void event_queue_make_later_events_active(struct event_base *base); + #ifdef USE_REINSERT_TIMEOUT /* This code seems buggy; only turn it on if we find out what the trouble is. */ static void event_queue_reinsert_timeout(struct event_base *,struct event *, int was_common, int is_common, int old_timeout_idx); @@ -424,6 +428,19 @@ event_base_update_cache_time(struct event_base *base) return 0; } +static inline struct event * +event_callback_to_event(struct event_callback *evcb) +{ + EVUTIL_ASSERT((evcb->evcb_flags & EVLIST_INIT)); + return EVUTIL_UPCAST(evcb, struct event, ev_evcallback); +} + +static inline struct event_callback * +event_to_event_callback(struct event *ev) +{ + return &ev->ev_evcallback; +} + struct event_base * event_init(void) { @@ -489,28 +506,6 @@ event_base_get_features(const struct event_base *base) return base->evsel->features; } -void -event_deferred_cb_queue_init_(struct deferred_cb_queue *cb) -{ - memset(cb, 0, sizeof(struct deferred_cb_queue)); - TAILQ_INIT(&cb->deferred_cb_list); -} - -/** Helper for the deferred_cb queue: wake up the event base. */ -static void -notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr) -{ - struct event_base *base = baseptr; - if (EVBASE_NEED_NOTIFY(base)) - evthread_notify_base(base); -} - -struct deferred_cb_queue * -event_base_get_deferred_cb_queue_(struct event_base *base) -{ - return base ? &base->defer_queue : NULL; -} - void event_enable_debug_mode(void) { @@ -588,10 +583,7 @@ event_base_new_with_config(const struct event_config *cfg) base->th_notify_fd[0] = -1; base->th_notify_fd[1] = -1; - event_deferred_cb_queue_init_(&base->defer_queue); - base->defer_queue.base = base; - base->defer_queue.notify_fn = notify_base_cbq_callback; - base->defer_queue.notify_arg = base; + TAILQ_INIT(&base->active_later_queue); evmap_io_initmap_(&base->io); evmap_signal_initmap_(&base->sigmap); @@ -663,7 +655,6 @@ event_base_new_with_config(const struct event_config *cfg) int r; EVTHREAD_ALLOC_LOCK(base->th_base_lock, EVTHREAD_LOCKTYPE_RECURSIVE); - base->defer_queue.lock = base->th_base_lock; EVTHREAD_ALLOC_COND(base->current_event_cond); r = evthread_make_base_notifiable(base); if (r<0) { @@ -777,15 +768,36 @@ event_base_free(struct event_base *base) mm_free(base->common_timeout_queues); for (i = 0; i < base->nactivequeues; ++i) { - for (ev = TAILQ_FIRST(&base->activequeues[i]); ev; ) { - struct event *next = TAILQ_NEXT(ev, ev_active_next); - if (!(ev->ev_flags & EVLIST_INTERNAL)) { - event_del(ev); + struct event_callback *evcb, *next; + for (evcb = TAILQ_FIRST(&base->activequeues[i]); evcb; ) { + next = TAILQ_NEXT(evcb, evcb_active_next); + if (evcb->evcb_flags & EVLIST_INIT) { + ev = event_callback_to_event(evcb); + if (!(ev->ev_flags & EVLIST_INTERNAL)) { + event_del(ev); + ++n_deleted; + } + } else { + event_callback_cancel_(base, evcb); ++n_deleted; } - ev = next; + evcb = next; } } + { + struct event_callback *evcb; + while ((evcb = TAILQ_FIRST(&base->active_later_queue))) { + if (evcb->evcb_flags & EVLIST_INIT) { + ev = event_callback_to_event(evcb); + event_del(ev); + ++n_deleted; + } else { + event_callback_cancel_(base, evcb); + ++n_deleted; + } + } + } + if (n_deleted) event_debug(("%s: %d events were still set in base", @@ -1060,8 +1072,8 @@ event_config_set_max_dispatch_interval(struct event_config *cfg, cfg->max_dispatch_interval.tv_sec = -1; cfg->max_dispatch_callbacks = max_callbacks >= 0 ? max_callbacks : INT_MAX; - if (min_priority <= 0) - min_priority = 1; + if (min_priority < 0) + min_priority = 0; cfg->limit_callbacks_after_prio = min_priority; return (0); } @@ -1093,8 +1105,8 @@ event_base_priority_init(struct event_base *base, int npriorities) } /* Allocate our priority queues */ - base->activequeues = (struct event_list *) - mm_calloc(npriorities, sizeof(struct event_list)); + base->activequeues = (struct evcallback_list *) + mm_calloc(npriorities, sizeof(struct evcallback_list)); if (base->activequeues == NULL) { event_warn("%s: calloc", __func__); goto err; @@ -1117,6 +1129,9 @@ event_base_get_npriorities(struct event_base *base) { int n; + if (base == NULL) + base = current_base; + EVBASE_ACQUIRE_LOCK(base, th_base_lock); n = base->nactivequeues; EVBASE_RELEASE_LOCK(base, th_base_lock); @@ -1393,51 +1408,67 @@ event_persist_closure(struct event_base *base, struct event *ev) releasing the lock as we go. This function requires that the lock be held when it's invoked. Returns -1 if we get a signal or an event_break that means we should stop processing any active events now. Otherwise returns - the number of non-internal events that we processed. + the number of non-internal event_callbacks that we processed. */ static int event_process_active_single_queue(struct event_base *base, - struct event_list *activeq, + struct evcallback_list *activeq, int max_to_process, const struct timeval *endtime) { - struct event *ev; + struct event_callback *evcb; int count = 0; EVUTIL_ASSERT(activeq != NULL); - for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) { - if (ev->ev_events & EV_PERSIST) - event_queue_remove_active(base, ev); - else - event_del_internal(ev); - if (!(ev->ev_flags & EVLIST_INTERNAL)) + for (evcb = TAILQ_FIRST(activeq); evcb; evcb = TAILQ_FIRST(activeq)) { + struct event *ev=NULL; + if (evcb->evcb_flags & EVLIST_INIT) { + ev = event_callback_to_event(evcb); + + if (ev->ev_events & EV_PERSIST) + event_queue_remove_active(base, evcb); + else + event_del_internal(ev); + event_debug(( + "event_process_active: event: %p, %s%scall %p", + ev, + ev->ev_res & EV_READ ? "EV_READ " : " ", + ev->ev_res & EV_WRITE ? "EV_WRITE " : " ", + ev->ev_callback)); + } else { + event_queue_remove_active(base, evcb); + event_debug(("event_process_active: event_callback %p, " + "closure %d, call %p", + evcb, evcb->evcb_closure, evcb->evcb_cb_union.evcb_callback)); + } + + if (!(evcb->evcb_flags & EVLIST_INTERNAL)) ++count; - event_debug(( - "event_process_active: event: %p, %s%scall %p", - ev, - ev->ev_res & EV_READ ? "EV_READ " : " ", - ev->ev_res & EV_WRITE ? "EV_WRITE " : " ", - ev->ev_callback)); - base->current_event = ev; + base->current_event = evcb; #ifndef EVENT__DISABLE_THREAD_SUPPORT base->current_event_waiters = 0; #endif - switch (ev->ev_closure) { - case EV_CLOSURE_SIGNAL: + switch (evcb->evcb_closure) { + case EV_CLOSURE_EVENT_SIGNAL: event_signal_closure(base, ev); break; - case EV_CLOSURE_PERSIST: + case EV_CLOSURE_EVENT_PERSIST: event_persist_closure(base, ev); break; - default: - case EV_CLOSURE_NONE: + case EV_CLOSURE_EVENT: EVBASE_RELEASE_LOCK(base, th_base_lock); (*ev->ev_callback)( ev->ev_fd, ev->ev_res, ev->ev_arg); break; + case EV_CLOSURE_CB_SELF: + EVBASE_RELEASE_LOCK(base, th_base_lock); + evcb->evcb_cb_union.evcb_selfcb(evcb, evcb->evcb_arg); + break; + default: + EVUTIL_ASSERT(0); } EVBASE_ACQUIRE_LOCK(base, th_base_lock); @@ -1466,47 +1497,6 @@ event_process_active_single_queue(struct event_base *base, return count; } -/* - Process up to MAX_DEFERRED of the defered_cb entries in 'queue'. If - *breakptr becomes set to 1, stop. Requires that we start out holding - the lock on 'queue'; releases the lock around 'queue' for each deferred_cb - we process. - */ -static int -event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr, - int max_to_process, const struct timeval *endtime) -{ - int count = 0; - struct deferred_cb *cb; -#define MAX_DEFERRED 16 - if (max_to_process > MAX_DEFERRED) - max_to_process = MAX_DEFERRED; -#undef MAX_DEFERRED - - while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) { - cb->queued = 0; - TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next); - --queue->active_count; - UNLOCK_DEFERRED_QUEUE(queue); - - cb->cb(cb, cb->arg); - - LOCK_DEFERRED_QUEUE(queue); - if (*breakptr) - return -1; - if (++count >= max_to_process) - break; - if (endtime) { - struct timeval now; - update_time_cache(queue->base); - gettime(queue->base, &now); - if (evutil_timercmp(&now, endtime, >=)) - return count; - } - } - return count; -} - /* * Active events are stored in priority queues. Lower priorities are always * process before higher priorities. Low priority events can starve high @@ -1517,7 +1507,7 @@ static int event_process_active(struct event_base *base) { /* Caller must hold th_base_lock */ - struct event_list *activeq = NULL; + struct evcallback_list *activeq = NULL; int i, c = 0; const struct timeval *endtime; struct timeval tv; @@ -1543,8 +1533,7 @@ event_process_active(struct event_base *base) c = event_process_active_single_queue(base, activeq, maxcb, endtime); if (c < 0) { - base->event_running_priority = -1; - return -1; + goto done; } else if (c > 0) break; /* Processed a real event; do not * consider lower-priority events */ @@ -1553,9 +1542,9 @@ event_process_active(struct event_base *base) } } - event_process_deferred_callbacks(&base->defer_queue,&base->event_break, - maxcb-c, endtime); +done: base->event_running_priority = -1; + return c; } @@ -1630,6 +1619,25 @@ event_base_loopbreak(struct event_base *event_base) return r; } +int +event_base_loopcontinue(struct event_base *event_base) +{ + int r = 0; + if (event_base == NULL) + return (-1); + + EVBASE_ACQUIRE_LOCK(event_base, th_base_lock); + event_base->event_continue = 1; + + if (EVBASE_NEED_NOTIFY(event_base)) { + r = evthread_notify_base(event_base); + } else { + r = (0); + } + EVBASE_RELEASE_LOCK(event_base, th_base_lock); + return r; +} + int event_base_got_break(struct event_base *event_base) { @@ -1694,6 +1702,7 @@ event_base_loop(struct event_base *base, int flags) while (!done) { base->event_continue = 0; + base->n_deferreds_queued = 0; /* Terminate the loop if we have been asked to */ if (base->event_gotterm) { @@ -1723,6 +1732,8 @@ event_base_loop(struct event_base *base, int flags) goto done; } + event_queue_make_later_events_active(base); + clear_time_cache(base); res = evsel->dispatch(base, tv_p); @@ -1865,13 +1876,13 @@ event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, shor "EV_READ or EV_WRITE", __func__); return -1; } - ev->ev_closure = EV_CLOSURE_SIGNAL; + ev->ev_closure = EV_CLOSURE_EVENT_SIGNAL; } else { if (events & EV_PERSIST) { evutil_timerclear(&ev->ev_io_timeout); - ev->ev_closure = EV_CLOSURE_PERSIST; + ev->ev_closure = EV_CLOSURE_EVENT_PERSIST; } else { - ev->ev_closure = EV_CLOSURE_NONE; + ev->ev_closure = EV_CLOSURE_EVENT; } } @@ -1922,8 +1933,11 @@ event_base_get_running_event(struct event_base *base) { struct event *ev = NULL; EVBASE_ACQUIRE_LOCK(base, th_base_lock); - if (EVBASE_IN_THREAD(base)) - ev = base->current_event; + if (EVBASE_IN_THREAD(base)) { + struct event_callback *evcb = base->current_event; + if (evcb->evcb_flags & EVLIST_INIT) + ev = event_callback_to_event(evcb); + } EVBASE_RELEASE_LOCK(base, th_base_lock); return ev; } @@ -1997,7 +2011,7 @@ event_pending(const struct event *ev, short event, struct timeval *tv) if (ev->ev_flags & EVLIST_INSERTED) flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)); - if (ev->ev_flags & EVLIST_ACTIVE) + if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) flags |= ev->ev_res; if (ev->ev_flags & EVLIST_TIMEOUT) flags |= EV_TIMEOUT; @@ -2082,6 +2096,13 @@ event_get_callback_arg(const struct event *ev) return ev->ev_arg; } +int +event_get_priority(const struct event *ev) +{ + event_debug_assert_is_setup_(ev); + return ev->ev_pri; +} + int event_add(struct event *ev, const struct timeval *tv) { @@ -2138,7 +2159,7 @@ evthread_notify_base_eventfd(struct event_base *base) /** Tell the thread currently running the event_loop for base (if any) that it * needs to stop waiting in its dispatch function (if it is) and process all - * active events and deferred callbacks (if there are any). */ + * active callbacks. */ static int evthread_notify_base(struct event_base *base) { @@ -2192,7 +2213,8 @@ event_add_internal(struct event *ev, const struct timeval *tv, * until the callback is done before we mess with the event, or else * we can race on ev_ncalls and ev_pncalls below. */ #ifndef EVENT__DISABLE_THREAD_SUPPORT - if (base->current_event == ev && (ev->ev_events & EV_SIGNAL) + if (base->current_event == event_to_event_callback(ev) && + (ev->ev_events & EV_SIGNAL) && !EVBASE_IN_THREAD(base)) { ++base->current_event_waiters; EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); @@ -2200,7 +2222,7 @@ event_add_internal(struct event *ev, const struct timeval *tv, #endif if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) && - !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { + !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) { if (ev->ev_events & (EV_READ|EV_WRITE)) res = evmap_io_add_(base, ev->ev_fd, ev); else if (ev->ev_events & EV_SIGNAL) @@ -2232,7 +2254,7 @@ event_add_internal(struct event *ev, const struct timeval *tv, * * If tv_is_absolute, this was already set. */ - if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute) + if (ev->ev_closure == EV_CLOSURE_EVENT_PERSIST && !tv_is_absolute) ev->ev_io_timeout = *tv; #ifndef USE_REINSERT_TIMEOUT @@ -2256,7 +2278,7 @@ event_add_internal(struct event *ev, const struct timeval *tv, } } - event_queue_remove_active(base, ev); + event_queue_remove_active(base, event_to_event_callback(ev)); } gettime(base, &now); @@ -2356,7 +2378,8 @@ event_del_internal(struct event *ev) * user-supplied argument. */ base = ev->ev_base; #ifndef EVENT__DISABLE_THREAD_SUPPORT - if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { + if (base->current_event == event_to_event_callback(ev) && + !EVBASE_IN_THREAD(base)) { ++base->current_event_waiters; EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); } @@ -2384,7 +2407,9 @@ event_del_internal(struct event *ev) } if (ev->ev_flags & EVLIST_ACTIVE) - event_queue_remove_active(base, ev); + event_queue_remove_active(base, event_to_event_callback(ev)); + else if (ev->ev_flags & EVLIST_ACTIVE_LATER) + event_queue_remove_active_later(base, event_to_event_callback(ev)); if (ev->ev_flags & EVLIST_INSERTED) { event_queue_remove_inserted(base, ev); @@ -2434,25 +2459,33 @@ event_active_nolock_(struct event *ev, int res, short ncalls) event_debug(("event_active: %p (fd %d), res %d, callback %p", ev, (int)ev->ev_fd, (int)res, ev->ev_callback)); - - /* We get different kinds of events, add them together */ - if (ev->ev_flags & EVLIST_ACTIVE) { - ev->ev_res |= res; - return; - } - base = ev->ev_base; - EVENT_BASE_ASSERT_LOCKED(base); - ev->ev_res = res; + switch ((ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) { + default: + case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER: + EVUTIL_ASSERT(0); + break; + case EVLIST_ACTIVE: + /* We get different kinds of events, add them together */ + ev->ev_res |= res; + return; + case EVLIST_ACTIVE_LATER: + ev->ev_res |= res; + break; + case 0: + ev->ev_res = res; + break; + } if (ev->ev_pri < base->event_running_priority) base->event_continue = 1; if (ev->ev_events & EV_SIGNAL) { #ifndef EVENT__DISABLE_THREAD_SUPPORT - if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { + if (base->current_event == event_to_event_callback(ev) && + !EVBASE_IN_THREAD(base)) { ++base->current_event_waiters; EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); } @@ -2461,60 +2494,171 @@ event_active_nolock_(struct event *ev, int res, short ncalls) ev->ev_pncalls = NULL; } - event_queue_insert_active(base, ev); + event_callback_activate_nolock_(base, event_to_event_callback(ev)); +} +void +event_active_later_(struct event *ev, int res) +{ + EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock); + event_active_later_nolock_(ev, res); + EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock); +} + +void +event_active_later_nolock_(struct event *ev, int res) +{ + struct event_base *base = ev->ev_base; + EVENT_BASE_ASSERT_LOCKED(base); + + if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) { + /* We get different kinds of events, add them together */ + ev->ev_res |= res; + return; + } + + ev->ev_res = res; + + event_callback_activate_later_nolock_(base, event_to_event_callback(ev)); +} + +int +event_callback_activate_(struct event_base *base, + struct event_callback *evcb) +{ + int r; + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + r = event_callback_activate_nolock_(base, evcb); + EVBASE_RELEASE_LOCK(base, th_base_lock); + return r; +} + +int +event_callback_activate_nolock_(struct event_base *base, + struct event_callback *evcb) +{ + int r = 1; + + switch (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) { + default: + EVUTIL_ASSERT(0); + case EVLIST_ACTIVE_LATER: + event_queue_remove_active_later(base, evcb); + r = 0; + break; + case EVLIST_ACTIVE: + return 0; + case 0: + break; + } + + event_queue_insert_active(base, evcb); + + if (EVBASE_NEED_NOTIFY(base)) + evthread_notify_base(base); + + return r; +} + +void +event_callback_activate_later_nolock_(struct event_base *base, + struct event_callback *evcb) +{ + if (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) + return; + + event_queue_insert_active_later(base, evcb); if (EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); } void -event_deferred_cb_init_(struct deferred_cb *cb, deferred_cb_fn fn, void *arg) +event_callback_init_(struct event_base *base, + struct event_callback *cb) { - memset(cb, 0, sizeof(struct deferred_cb)); - cb->cb = fn; - cb->arg = arg; + memset(cb, 0, sizeof(*cb)); + cb->evcb_pri = base->nactivequeues - 1; +} + +int +event_callback_cancel_(struct event_base *base, + struct event_callback *evcb) +{ + int r; + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + r = event_callback_cancel_nolock_(base, evcb); + EVBASE_RELEASE_LOCK(base, th_base_lock); + return r; +} + +int +event_callback_cancel_nolock_(struct event_base *base, + struct event_callback *evcb) +{ + if (evcb->evcb_flags & EVLIST_INIT) + return event_del_internal(event_callback_to_event(evcb)); + + switch ((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) { + default: + case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER: + EVUTIL_ASSERT(0); + break; + case EVLIST_ACTIVE: + /* We get different kinds of events, add them together */ + event_queue_remove_active(base, evcb); + return 0; + case EVLIST_ACTIVE_LATER: + event_queue_remove_active_later(base, evcb); + break; + case 0: + break; + } + + event_base_assert_ok_(base); + + return 0; } void -event_deferred_cb_cancel_(struct deferred_cb_queue *queue, - struct deferred_cb *cb) +event_deferred_cb_init_(struct event_callback *cb, ev_uint8_t priority, deferred_cb_fn fn, void *arg) { - if (!queue) { - if (current_base) - queue = ¤t_base->defer_queue; - else - return; - } - - LOCK_DEFERRED_QUEUE(queue); - if (cb->queued) { - TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next); - --queue->active_count; - cb->queued = 0; - } - UNLOCK_DEFERRED_QUEUE(queue); + memset(cb, 0, sizeof(*cb)); + cb->evcb_cb_union.evcb_selfcb = fn; + cb->evcb_arg = arg; + cb->evcb_pri = priority; + cb->evcb_closure = EV_CLOSURE_CB_SELF; } void -event_deferred_cb_schedule_(struct deferred_cb_queue *queue, - struct deferred_cb *cb) +event_deferred_cb_set_priority_(struct event_callback *cb, ev_uint8_t priority) { - if (!queue) { - if (current_base) - queue = ¤t_base->defer_queue; - else - return; - } + cb->evcb_pri = priority; +} - LOCK_DEFERRED_QUEUE(queue); - if (!cb->queued) { - cb->queued = 1; - TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next); - ++queue->active_count; - if (queue->notify_fn) - queue->notify_fn(queue, queue->notify_arg); +void +event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb) +{ + if (!base) + base = current_base; + event_callback_cancel_(base, cb); +} + +#define MAX_DEFERREDS_QUEUED 32 +int +event_deferred_cb_schedule_(struct event_base *base, struct event_callback *cb) +{ + int r = 1; + if (!base) + base = current_base; + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + if (base->n_deferreds_queued > MAX_DEFERREDS_QUEUED) { + event_callback_activate_later_nolock_(base, cb); + } else { + ++base->n_deferreds_queued; + r = event_callback_activate_nolock_(base, cb); } - UNLOCK_DEFERRED_QUEUE(queue); + EVBASE_RELEASE_LOCK(base, th_base_lock); + return r; } static int @@ -2585,13 +2729,13 @@ timeout_process(struct event_base *base) #error "Mismatch for value of EVLIST_INTERNAL" #endif /* These are a fancy way to spell - if (~ev->ev_flags & EVLIST_INTERNAL) + if (flags & EVLIST_INTERNAL) base->event_count--/++; */ -#define DECR_EVENT_COUNT(base,ev) \ - ((base)->event_count -= (~((ev)->ev_flags >> 4) & 1)) -#define INCR_EVENT_COUNT(base, ev) \ - ((base)->event_count += (~((ev)->ev_flags >> 4) & 1)) +#define DECR_EVENT_COUNT(base,flags) \ + ((base)->event_count -= (~((flags) >> 4) & 1)) +#define INCR_EVENT_COUNT(base,flags) \ + ((base)->event_count += (~((flags) >> 4) & 1)) static void event_queue_remove_inserted(struct event_base *base, struct event *ev) @@ -2602,23 +2746,39 @@ event_queue_remove_inserted(struct event_base *base, struct event *ev) ev, ev->ev_fd, EVLIST_INSERTED); return; } - DECR_EVENT_COUNT(base, ev); + DECR_EVENT_COUNT(base, ev->ev_flags); ev->ev_flags &= ~EVLIST_INSERTED; } static void -event_queue_remove_active(struct event_base *base, struct event *ev) +event_queue_remove_active(struct event_base *base, struct event_callback *evcb) { EVENT_BASE_ASSERT_LOCKED(base); - if (EVUTIL_FAILURE_CHECK(!(ev->ev_flags & EVLIST_ACTIVE))) { - event_errx(1, "%s: %p(fd %d) not on queue %x", __func__, - ev, ev->ev_fd, EVLIST_ACTIVE); + if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE))) { + event_errx(1, "%s: %p not on queue %x", __func__, + evcb, EVLIST_ACTIVE); return; } - DECR_EVENT_COUNT(base, ev); - ev->ev_flags &= ~EVLIST_ACTIVE; + DECR_EVENT_COUNT(base, evcb->evcb_flags); + evcb->evcb_flags &= ~EVLIST_ACTIVE; base->event_count_active--; - TAILQ_REMOVE(&base->activequeues[ev->ev_pri], - ev, ev_active_next); + + TAILQ_REMOVE(&base->activequeues[evcb->evcb_pri], + evcb, evcb_active_next); +} +static void +event_queue_remove_active_later(struct event_base *base, struct event_callback *evcb) +{ + EVENT_BASE_ASSERT_LOCKED(base); + if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE_LATER))) { + event_errx(1, "%s: %p not on queue %x", __func__, + evcb, EVLIST_ACTIVE_LATER); + return; + } + DECR_EVENT_COUNT(base, evcb->evcb_flags); + evcb->evcb_flags &= ~EVLIST_ACTIVE_LATER; + base->event_count_active--; + + TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next); } static void event_queue_remove_timeout(struct event_base *base, struct event *ev) @@ -2629,7 +2789,7 @@ event_queue_remove_timeout(struct event_base *base, struct event *ev) ev, ev->ev_fd, EVLIST_TIMEOUT); return; } - DECR_EVENT_COUNT(base, ev); + DECR_EVENT_COUNT(base, ev->ev_flags); ev->ev_flags &= ~EVLIST_TIMEOUT; if (is_common_timeout(&ev->ev_timeout, base)) { @@ -2725,28 +2885,45 @@ event_queue_insert_inserted(struct event_base *base, struct event *ev) return; } - INCR_EVENT_COUNT(base, ev); + INCR_EVENT_COUNT(base, ev->ev_flags); ev->ev_flags |= EVLIST_INSERTED; } static void -event_queue_insert_active(struct event_base *base, struct event *ev) +event_queue_insert_active(struct event_base *base, struct event_callback *evcb) { EVENT_BASE_ASSERT_LOCKED(base); - if (ev->ev_flags & EVLIST_ACTIVE) { + if (evcb->evcb_flags & EVLIST_ACTIVE) { /* Double insertion is possible for active events */ return; } - INCR_EVENT_COUNT(base, ev); + INCR_EVENT_COUNT(base, evcb->evcb_flags); - ev->ev_flags |= EVLIST_ACTIVE; + evcb->evcb_flags |= EVLIST_ACTIVE; base->event_count_active++; - TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri], - ev,ev_active_next); + EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues); + TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], + evcb, evcb_active_next); +} + +static void +event_queue_insert_active_later(struct event_base *base, struct event_callback *evcb) +{ + EVENT_BASE_ASSERT_LOCKED(base); + if (evcb->evcb_flags & (EVLIST_ACTIVE_LATER|EVLIST_ACTIVE)) { + /* Double insertion is possible */ + return; + } + + INCR_EVENT_COUNT(base, evcb->evcb_flags); + evcb->evcb_flags |= EVLIST_ACTIVE_LATER; + base->event_count_active++; + EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues); + TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next); } static void @@ -2760,7 +2937,7 @@ event_queue_insert_timeout(struct event_base *base, struct event *ev) return; } - INCR_EVENT_COUNT(base, ev); + INCR_EVENT_COUNT(base, ev->ev_flags); ev->ev_flags |= EVLIST_TIMEOUT; @@ -2773,6 +2950,21 @@ event_queue_insert_timeout(struct event_base *base, struct event *ev) } } +static void +event_queue_make_later_events_active(struct event_base *base) +{ + struct event_callback *evcb; + EVENT_BASE_ASSERT_LOCKED(base); + + while ((evcb = TAILQ_FIRST(&base->active_later_queue))) { + TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next); + evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE; + EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues); + TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next); + base->n_deferreds_queued += (evcb->evcb_closure == EV_CLOSURE_CB_SELF); + } +} + /* Functions for debugging */ const char * @@ -3034,11 +3226,15 @@ event_base_foreach_event_(struct event_base *base, /* Finally, we deal wit all the active events that we haven't touched * yet. */ for (i = 0; i < base->nactivequeues; ++i) { - TAILQ_FOREACH(ev, &base->activequeues[i], ev_active_next) { - if (ev->ev_flags & (EVLIST_INSERTED|EVLIST_TIMEOUT)) { - /* we already processed this one */ + struct event_callback *evcb; + TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) { + if ((evcb->evcb_flags & (EVLIST_INIT|EVLIST_INSERTED|EVLIST_TIMEOUT)) != EVLIST_INIT) { + /* This isn't an event (evlist_init clear), or + * we already processed it. (inserted or + * timeout set */ continue; } + ev = event_callback_to_event(evcb); if ((r = fn(base, ev, arg))) return r; } @@ -3088,16 +3284,17 @@ dump_active_event_fn(struct event_base *base, struct event *e, void *arg) const char *gloss = (e->ev_events & EV_SIGNAL) ? "sig" : "fd "; - if (! (e->ev_flags & EVLIST_ACTIVE)) + if (! (e->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) return 0; - fprintf(output, " %p [%s %ld, priority=%d]%s%s%s%s active%s\n", + fprintf(output, " %p [%s %ld, priority=%d]%s%s%s%s active%s%s\n", (void*)e, gloss, (long)e->ev_fd, e->ev_pri, (e->ev_res&EV_READ)?" Read":"", (e->ev_res&EV_WRITE)?" Write":"", (e->ev_res&EV_SIGNAL)?" Signal":"", (e->ev_res&EV_TIMEOUT)?" Timeout":"", - (e->ev_flags&EVLIST_INTERNAL)?" [Internal]":""); + (e->ev_flags&EVLIST_INTERNAL)?" [Internal]":"", + (e->ev_flags&EVLIST_ACTIVE_LATER)?" [NextTime]":""); return 0; } @@ -3196,6 +3393,7 @@ void event_base_assert_ok_(struct event_base *base) { int i; + int count; EVBASE_ACQUIRE_LOCK(base, th_base_lock); /* First do checks on the per-fd and per-signal lists */ @@ -3230,14 +3428,25 @@ event_base_assert_ok_(struct event_base *base) } /* Check the active queues. */ + count = 0; for (i = 0; i < base->nactivequeues; ++i) { - struct event *ev; - EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event, ev_active_next); - TAILQ_FOREACH(ev, &base->activequeues[i], ev_active_next) { - EVUTIL_ASSERT(ev->ev_pri == i); - EVUTIL_ASSERT(ev->ev_flags & EVLIST_ACTIVE); + struct event_callback *evcb; + EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event_callback, evcb_active_next); + TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) { + EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE); + EVUTIL_ASSERT(evcb->evcb_pri == i); + ++count; } } + { + struct event_callback *evcb; + TAILQ_FOREACH(evcb, &base->active_later_queue, evcb_active_next) { + EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE_LATER); + ++count; + } + } + EVUTIL_ASSERT(count == base->event_count_active); + EVBASE_RELEASE_LOCK(base, th_base_lock); } diff --git a/http-internal.h b/http-internal.h index e366c7c6..83aa6ef1 100644 --- a/http-internal.h +++ b/http-internal.h @@ -103,7 +103,7 @@ struct evhttp_connection { void (*closecb)(struct evhttp_connection *, void *); void *closecb_arg; - struct deferred_cb read_more_deferred_cb; + struct event_callback read_more_deferred_cb; struct event_base *base; struct evdns_base *dns_base; diff --git a/http.c b/http.c index 786105d3..936d4321 100644 --- a/http.c +++ b/http.c @@ -1017,7 +1017,7 @@ evhttp_read_body(struct evhttp_connection *evcon, struct evhttp_request *req) } #define get_deferred_queue(evcon) \ - (event_base_get_deferred_cb_queue_((evcon)->base)) + ((evcon)->base) /* * Gets called when more data becomes available @@ -1079,7 +1079,7 @@ evhttp_read_cb(struct bufferevent *bufev, void *arg) } static void -evhttp_deferred_read_cb(struct deferred_cb *cb, void *data) +evhttp_deferred_read_cb(struct event_callback *cb, void *data) { struct evhttp_connection *evcon = data; evhttp_read_cb(evcon->bufev, evcon); @@ -2197,7 +2197,9 @@ evhttp_connection_base_bufferevent_new(struct event_base *base, struct evdns_bas bufferevent_base_set(base, evcon->bufev); } - event_deferred_cb_init_(&evcon->read_more_deferred_cb, + event_deferred_cb_init_( + &evcon->read_more_deferred_cb, + bufferevent_get_priority(bev), evhttp_deferred_read_cb, evcon); evcon->dns_base = dnsbase; diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index 9e65873f..0d94e8d1 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -273,6 +273,12 @@ struct event_base *bufferevent_get_base(struct bufferevent *bev); */ int bufferevent_priority_set(struct bufferevent *bufev, int pri); +/** + Return the priority of a bufferevent. + + Only supported for socket bufferevents + */ +int bufferevent_get_priority(struct bufferevent *bufev); /** Deallocate the storage associated with a bufferevent structure. diff --git a/include/event2/event.h b/include/event2/event.h index cf358c03..c2f65c92 100644 --- a/include/event2/event.h +++ b/include/event2/event.h @@ -758,6 +758,25 @@ int event_base_loopexit(struct event_base *, const struct timeval *); */ int event_base_loopbreak(struct event_base *); +/** + Tell the active event_base_loop() to scan for new events immediately. + + Calling this function makes the currently active event_base_loop() + start the loop over again (scanning for new events) after the current + event callback finishes. If the event loop is not running, this + function has no effect. + + event_base_loopbreak() is typically invoked from this event's callback. + This behavior is analogous to the "continue;" statement. + + Subsequent invocations of event loop will proceed normally. + + @param eb the event_base structure returned by event_init() + @return 0 if successful, or -1 if an error occurred + @see event_base_loopbreak() + */ +int event_base_loopcontinue(struct event_base *); + /** Checks if the event loop was told to exit by event_loopexit(). @@ -1128,6 +1147,12 @@ event_callback_fn event_get_callback(const struct event *ev); */ void *event_get_callback_arg(const struct event *ev); +/** + Return the priority of an event. + @see event_priority_init(), event_get_priority() +*/ +int event_get_priority(const struct event *ev); + /** Extract _all_ of arguments given to construct a given event. The event_base is copied into *base_out, the fd is copied into *fd_out, and so @@ -1230,7 +1255,7 @@ int event_base_get_npriorities(struct event_base *eb); @param ev an event struct @param priority the new priority to be assigned @return 0 if successful, or -1 if an error occurred - @see event_priority_init() + @see event_priority_init(), event_get_priority() */ int event_priority_set(struct event *, int); diff --git a/include/event2/event_struct.h b/include/event2/event_struct.h index 44bb6f7e..cf7b3df3 100644 --- a/include/event2/event_struct.h +++ b/include/event2/event_struct.h @@ -54,15 +54,15 @@ extern "C" { /* For evkeyvalq */ #include -#define EVLIST_TIMEOUT 0x01 -#define EVLIST_INSERTED 0x02 -#define EVLIST_SIGNAL 0x04 -#define EVLIST_ACTIVE 0x08 -#define EVLIST_INTERNAL 0x10 -#define EVLIST_INIT 0x80 +#define EVLIST_TIMEOUT 0x01 +#define EVLIST_INSERTED 0x02 +#define EVLIST_SIGNAL 0x04 +#define EVLIST_ACTIVE 0x08 +#define EVLIST_INTERNAL 0x10 +#define EVLIST_ACTIVE_LATER 0x20 +#define EVLIST_INIT 0x80 -/* EVLIST_X_ Private space: 0x1000-0xf000 */ -#define EVLIST_ALL (0xf000 | 0x9f) +#define EVLIST_ALL 0xbf /* Fix so that people don't have to run with */ #ifndef TAILQ_ENTRY @@ -93,9 +93,22 @@ struct { \ } #endif /* !TAILQ_ENTRY */ +struct event_callback { + TAILQ_ENTRY(event_callback) evcb_active_next; + short evcb_flags; + ev_uint8_t evcb_pri; /* smaller numbers are higher priority */ + ev_uint8_t evcb_closure; + /* allows us to adopt for different types of events */ + union { + void (*evcb_callback)(evutil_socket_t, short, void *arg); + void (*evcb_selfcb)(struct event_callback *, void *arg); + } evcb_cb_union; + void *evcb_arg; +}; + struct event_base; struct event { - TAILQ_ENTRY(event) ev_active_next; + struct event_callback ev_evcallback; /* for managing timeouts */ union { @@ -124,14 +137,7 @@ struct event { short ev_events; short ev_res; /* result passed to event callback */ - short ev_flags; - ev_uint8_t ev_pri; /* smaller numbers are higher priority */ - ev_uint8_t ev_closure; struct timeval ev_timeout; - - /* allows us to adopt for different types of events */ - void (*ev_callback)(evutil_socket_t, short, void *arg); - void *ev_arg; }; TAILQ_HEAD (event_list, event); diff --git a/listener.c b/listener.c index 2cceff24..7cec0571 100644 --- a/listener.c +++ b/listener.c @@ -440,7 +440,7 @@ struct accepting_socket { struct event_overlapped overlapped; SOCKET s; int error; - struct deferred_cb deferred; + struct event_callback deferred; struct evconnlistener_iocp *lev; ev_uint8_t buflen; ev_uint8_t family; @@ -450,7 +450,7 @@ struct accepting_socket { static void accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, int ok); -static void accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg); +static void accepted_socket_invoke_user_cb(struct event_callback *cb, void *arg); static void iocp_listener_event_add(struct evconnlistener_iocp *lev) @@ -498,7 +498,8 @@ new_accepting_socket(struct evconnlistener_iocp *lev, int family) res->family = family; event_deferred_cb_init_(&res->deferred, - accepted_socket_invoke_user_cb, res); + event_base_get_npriorities(base) / 2, + accepted_socket_invoke_user_cb, res); InitializeCriticalSectionAndSpinCount(&res->lock, 1000); @@ -566,7 +567,7 @@ start_accepting(struct accepting_socket *as) report_err: as->error = error; event_deferred_cb_schedule_( - event_base_get_deferred_cb_queue_(as->lev->event_base), + as->lev->event_base, &as->deferred); return 0; } @@ -581,7 +582,7 @@ stop_accepting(struct accepting_socket *as) } static void -accepted_socket_invoke_user_cb(struct deferred_cb *dcb, void *arg) +accepted_socket_invoke_user_cb(struct event_callback *dcb, void *arg) { struct accepting_socket *as = arg; @@ -658,7 +659,7 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i if (ok) { /* XXXX Don't do this if some EV_MT flag is set. */ event_deferred_cb_schedule_( - event_base_get_deferred_cb_queue_(as->lev->event_base), + as->lev->event_base, &as->deferred); LeaveCriticalSection(&as->lock); } else if (as->free_on_cb) { @@ -683,7 +684,7 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i as->error = WSAGetLastError(); } event_deferred_cb_schedule_( - event_base_get_deferred_cb_queue_(as->lev->event_base), + as->lev->event_base, &as->deferred); LeaveCriticalSection(&as->lock); } diff --git a/test/regress.c b/test/regress.c index 0c5faed7..b736d891 100644 --- a/test/regress.c +++ b/test/regress.c @@ -1350,6 +1350,66 @@ end: ; } +static int n_write_a_byte_cb=0; +static int n_read_and_drain_cb=0; +static int n_activate_other_event_cb=0; +static void +write_a_byte_cb(evutil_socket_t fd, short what, void *arg) +{ + char buf[] = "x"; + write(fd, buf, 1); + ++n_write_a_byte_cb; +} +static void +read_and_drain_cb(evutil_socket_t fd, short what, void *arg) +{ + char buf[128]; + int n; + ++n_read_and_drain_cb; + while ((n = read(fd, buf, sizeof(buf))) > 0) + ; +} + +static void +activate_other_event_cb(evutil_socket_t fd, short what, void *other_) +{ + struct event *ev_activate = other_; + ++n_activate_other_event_cb; + event_active_later_(ev_activate, EV_READ); +} + +static void +test_active_later(void *ptr) +{ + struct basic_test_data *data = ptr; + struct event *ev1, *ev2; + struct event ev3, ev4; + struct timeval qsec = {0, 100000}; + ev1 = event_new(data->base, data->pair[0], EV_READ|EV_PERSIST, read_and_drain_cb, NULL); + ev2 = event_new(data->base, data->pair[1], EV_WRITE|EV_PERSIST, write_a_byte_cb, NULL); + event_assign(&ev3, data->base, -1, 0, activate_other_event_cb, &ev4); + event_assign(&ev4, data->base, -1, 0, activate_other_event_cb, &ev3); + event_add(ev1, NULL); + event_add(ev2, NULL); + event_active_later_(&ev3, EV_READ); + + event_base_loopexit(data->base, &qsec); + + event_base_loop(data->base, 0); + + TT_BLATHER(("%d write calls, %d read calls, %d activate-other calls.", + n_write_a_byte_cb, n_read_and_drain_cb, n_activate_other_event_cb)); + event_del(&ev3); + event_del(&ev4); + + tt_int_op(n_write_a_byte_cb, ==, n_activate_other_event_cb); + tt_int_op(n_write_a_byte_cb, >, 100); + tt_int_op(n_read_and_drain_cb, >, 100); + tt_int_op(n_activate_other_event_cb, >, 100); +end: + ; +} + static void test_event_base_new(void *ptr) { @@ -2468,7 +2528,9 @@ struct testcase_t main_testcases[] = { BASIC(bad_assign, TT_FORK|TT_NEED_BASE|TT_NO_LOGS), BASIC(bad_reentrant, TT_FORK|TT_NEED_BASE|TT_NO_LOGS), + BASIC(active_later, TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR), + /* These are still using the old API */ LEGACY(persistent_timeout, TT_FORK|TT_NEED_BASE), { "persistent_timeout_jump", test_persistent_timeout_jump, TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, { "persistent_active_timeout", test_persistent_active_timeout, diff --git a/test/regress_buffer.c b/test/regress_buffer.c index d183b4f9..d94a8b37 100644 --- a/test/regress_buffer.c +++ b/test/regress_buffer.c @@ -57,6 +57,7 @@ #include "event2/buffer_compat.h" #include "event2/util.h" +#include "defer-internal.h" #include "evbuffer-internal.h" #include "log-internal.h" diff --git a/test/regress_thread.c b/test/regress_thread.c index 091bcb73..24ac0ef6 100644 --- a/test/regress_thread.c +++ b/test/regress_thread.c @@ -415,8 +415,8 @@ SLEEP_MS(int ms) } struct deferred_test_data { - struct deferred_cb cbs[CB_COUNT]; - struct deferred_cb_queue *queue; + struct event_callback cbs[CB_COUNT]; + struct event_base *queue; }; static struct timeval timer_start = {0,0}; @@ -426,7 +426,7 @@ static THREAD_T load_threads[QUEUE_THREAD_COUNT]; static struct deferred_test_data deferred_data[QUEUE_THREAD_COUNT]; static void -deferred_callback(struct deferred_cb *cb, void *arg) +deferred_callback(struct event_callback *cb, void *arg) { SLEEP_MS(1); callback_count += 1; @@ -439,7 +439,8 @@ load_deferred_queue(void *arg) size_t i; for (i = 0; i < CB_COUNT; ++i) { - event_deferred_cb_init_(&data->cbs[i], deferred_callback, NULL); + event_deferred_cb_init_(&data->cbs[i], 0, deferred_callback, + NULL); event_deferred_cb_schedule_(data->queue, &data->cbs[i]); SLEEP_MS(1); } @@ -469,23 +470,28 @@ thread_deferred_cb_skew(void *arg) { struct basic_test_data *data = arg; struct timeval tv_timer = {1, 0}; - struct deferred_cb_queue *queue; + struct event_base *base = NULL; + struct event_config *cfg = NULL; struct timeval elapsed; int elapsed_usec; int i; - queue = event_base_get_deferred_cb_queue_(data->base); - tt_assert(queue); + cfg = event_config_new(); + tt_assert(cfg); + event_config_set_max_dispatch_interval(cfg, NULL, 16, 0); + + base = event_base_new_with_config(cfg); + tt_assert(base); for (i = 0; i < QUEUE_THREAD_COUNT; ++i) - deferred_data[i].queue = queue; + deferred_data[i].queue = base; evutil_gettimeofday(&timer_start, NULL); - event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL, + event_base_once(base, -1, EV_TIMEOUT, timer_callback, NULL, &tv_timer); - event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback, + event_base_once(base, -1, EV_TIMEOUT, start_threads_callback, NULL, NULL); - event_base_dispatch(data->base); + event_base_dispatch(base); evutil_timersub(&timer_end, &timer_start, &elapsed); TT_BLATHER(("callback count, %u", callback_count)); @@ -500,6 +506,10 @@ thread_deferred_cb_skew(void *arg) end: for (i = 0; i < QUEUE_THREAD_COUNT; ++i) THREAD_JOIN(load_threads[i]); + if (base) + event_base_free(base); + if (cfg) + event_config_free(cfg); } static struct event time_events[5]; @@ -583,7 +593,8 @@ struct testcase_t thread_testcases[] = { &basic_setup, (char*)"forking" }, #endif TEST(conditions_simple), - TEST(deferred_cb_skew), + { "deferred_cb_skew", thread_deferred_cb_skew, TT_FORK|TT_NEED_THREADS, + &basic_setup, NULL }, TEST(no_events), END_OF_TESTCASES };