diff --git a/event-internal.h b/event-internal.h index e4025cf6..93b7c626 100644 --- a/event-internal.h +++ b/event-internal.h @@ -225,6 +225,9 @@ struct event_base { struct evcallback_list *activequeues; /** The length of the activequeues array */ int nactivequeues; + /** A list of event_callbacks that should become active the next time + * we process events, but not this time. */ + struct evcallback_list active_later_queue; /* common timeout logic */ @@ -364,7 +367,17 @@ int evsig_restore_handler_(struct event_base *base, int evsignal); void event_active_nolock_(struct event *ev, int res, short count); void event_callback_activate_nolock_(struct event_base *, struct event_callback *); +int event_callback_cancel_(struct event_base *base, + struct event_callback *evcb); +void event_active_later_(struct event *ev, int res); +void event_active_later_nolock_(struct event *ev, int res); +void event_callback_activate_later_nolock_(struct event_base *base, + struct event_callback *evcb); +int event_callback_cancel_nolock_(struct event_base *base, + struct event_callback *evcb); +void event_callback_init_(struct event_base *base, + struct event_callback *cb); /* FIXME document. */ void event_base_add_virtual_(struct event_base *base); diff --git a/event.c b/event.c index 3307db1b..d6bf4207 100644 --- a/event.c +++ b/event.c @@ -136,11 +136,15 @@ static inline int event_add_internal(struct event *ev, static inline int event_del_internal(struct event *ev); static void event_queue_insert_active(struct event_base *, struct event_callback *); +static void event_queue_insert_active_later(struct event_base *, struct event_callback *); static void event_queue_insert_timeout(struct event_base *, struct event *); static void event_queue_insert_inserted(struct event_base *, struct event *); static void event_queue_remove_active(struct event_base *, struct event_callback *); +static void event_queue_remove_active_later(struct event_base *, struct event_callback *); static void event_queue_remove_timeout(struct event_base *, struct event *); static void event_queue_remove_inserted(struct event_base *, struct event *); +static void event_queue_make_later_events_active(struct event_base *base); + #ifdef USE_REINSERT_TIMEOUT /* This code seems buggy; only turn it on if we find out what the trouble is. */ static void event_queue_reinsert_timeout(struct event_base *,struct event *, int was_common, int is_common, int old_timeout_idx); @@ -606,6 +610,8 @@ event_base_new_with_config(const struct event_config *cfg) base->defer_queue.notify_fn = notify_base_cbq_callback; base->defer_queue.notify_arg = base; + TAILQ_INIT(&base->active_later_queue); + evmap_io_initmap_(&base->io); evmap_signal_initmap_(&base->sigmap); event_changelist_init_(&base->changelist); @@ -800,11 +806,26 @@ event_base_free(struct event_base *base) ++n_deleted; } } else { - event_queue_remove_active(base, evcb); + event_callback_cancel_(base, evcb); + ++n_deleted; } evcb = next; } } + { + struct event_callback *evcb; + while ((evcb = TAILQ_FIRST(&base->active_later_queue))) { + if (evcb->evcb_flags & EVLIST_INIT) { + ev = event_callback_to_event(evcb); + event_del(ev); + ++n_deleted; + } else { + event_callback_cancel_(base, evcb); + ++n_deleted; + } + } + } + if (n_deleted) event_debug(("%s: %d events were still set in base", @@ -1754,6 +1775,8 @@ event_base_loop(struct event_base *base, int flags) goto done; } + event_queue_make_later_events_active(base); + clear_time_cache(base); res = evsel->dispatch(base, tv_p); @@ -2031,7 +2054,7 @@ event_pending(const struct event *ev, short event, struct timeval *tv) if (ev->ev_flags & EVLIST_INSERTED) flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)); - if (ev->ev_flags & EVLIST_ACTIVE) + if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) flags |= ev->ev_res; if (ev->ev_flags & EVLIST_TIMEOUT) flags |= EV_TIMEOUT; @@ -2235,7 +2258,7 @@ event_add_internal(struct event *ev, const struct timeval *tv, #endif if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) && - !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { + !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) { if (ev->ev_events & (EV_READ|EV_WRITE)) res = evmap_io_add_(base, ev->ev_fd, ev); else if (ev->ev_events & EV_SIGNAL) @@ -2421,6 +2444,8 @@ event_del_internal(struct event *ev) if (ev->ev_flags & EVLIST_ACTIVE) event_queue_remove_active(base, event_to_event_callback(ev)); + else if (ev->ev_flags & EVLIST_ACTIVE_LATER) + event_queue_remove_active_later(base, event_to_event_callback(ev)); if (ev->ev_flags & EVLIST_INSERTED) { event_queue_remove_inserted(base, ev); @@ -2470,18 +2495,25 @@ event_active_nolock_(struct event *ev, int res, short ncalls) event_debug(("event_active: %p (fd %d), res %d, callback %p", ev, (int)ev->ev_fd, (int)res, ev->ev_callback)); - - /* We get different kinds of events, add them together */ - if (ev->ev_flags & EVLIST_ACTIVE) { - ev->ev_res |= res; - return; - } - base = ev->ev_base; - EVENT_BASE_ASSERT_LOCKED(base); - ev->ev_res = res; + switch ((ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) { + default: + case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER: + EVUTIL_ASSERT(0); + break; + case EVLIST_ACTIVE: + /* We get different kinds of events, add them together */ + ev->ev_res |= res; + return; + case EVLIST_ACTIVE_LATER: + ev->ev_res |= res; + break; + case 0: + ev->ev_res = res; + break; + } if (ev->ev_pri < base->event_running_priority) base->event_continue = 1; @@ -2501,16 +2533,100 @@ event_active_nolock_(struct event *ev, int res, short ncalls) event_callback_activate_nolock_(base, event_to_event_callback(ev)); } +void +event_active_later_(struct event *ev, int res) +{ + EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock); + event_active_later_nolock_(ev, res); + EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock); +} + +void +event_active_later_nolock_(struct event *ev, int res) +{ + struct event_base *base = ev->ev_base; + EVENT_BASE_ASSERT_LOCKED(base); + + if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) { + /* We get different kinds of events, add them together */ + ev->ev_res |= res; + return; + } + + ev->ev_res = res; + + event_callback_activate_later_nolock_(base, event_to_event_callback(ev)); +} + void event_callback_activate_nolock_(struct event_base *base, struct event_callback *evcb) { + if (evcb->evcb_flags & EVLIST_ACTIVE_LATER) + event_queue_remove_active_later(base, evcb); + event_queue_insert_active(base, evcb); if (EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); } +void +event_callback_activate_later_nolock_(struct event_base *base, + struct event_callback *evcb) +{ + if (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) + return; + + event_queue_insert_active_later(base, evcb); + if (EVBASE_NEED_NOTIFY(base)) + evthread_notify_base(base); +} + +void +event_callback_init_(struct event_base *base, + struct event_callback *cb) +{ + memset(cb, 0, sizeof(*cb)); + cb->evcb_pri = base->nactivequeues - 1; +} + +int +event_callback_cancel_(struct event_base *base, + struct event_callback *evcb) +{ + int r; + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + r = event_callback_cancel_nolock_(base, evcb); + EVBASE_RELEASE_LOCK(base, th_base_lock); + return r; +} + +int +event_callback_cancel_nolock_(struct event_base *base, + struct event_callback *evcb) +{ + if (evcb->evcb_flags & EVLIST_INIT) + return event_del_internal(event_callback_to_event(evcb)); + + switch ((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) { + default: + case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER: + EVUTIL_ASSERT(0); + break; + case EVLIST_ACTIVE: + /* We get different kinds of events, add them together */ + event_queue_remove_active(base, evcb); + return 0; + case EVLIST_ACTIVE_LATER: + event_queue_remove_active_later(base, evcb); + break; + case 0: + break; + } + return 0; +} + void event_deferred_cb_init_(struct deferred_cb *cb, deferred_cb_fn fn, void *arg) { @@ -2666,6 +2782,21 @@ event_queue_remove_active(struct event_base *base, struct event_callback *evcb) evcb, evcb_active_next); } static void +event_queue_remove_active_later(struct event_base *base, struct event_callback *evcb) +{ + EVENT_BASE_ASSERT_LOCKED(base); + if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE_LATER))) { + event_errx(1, "%s: %p not on queue %x", __func__, + evcb, EVLIST_ACTIVE_LATER); + return; + } + DECR_EVENT_COUNT(base, evcb->evcb_flags); + evcb->evcb_flags &= ~EVLIST_ACTIVE_LATER; + base->event_count_active--; + + TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next); +} +static void event_queue_remove_timeout(struct event_base *base, struct event *ev) { EVENT_BASE_ASSERT_LOCKED(base); @@ -2794,6 +2925,21 @@ event_queue_insert_active(struct event_base *base, struct event_callback *evcb) evcb, evcb_active_next); } +static void +event_queue_insert_active_later(struct event_base *base, struct event_callback *evcb) +{ + EVENT_BASE_ASSERT_LOCKED(base); + if (evcb->evcb_flags & (EVLIST_ACTIVE_LATER|EVLIST_ACTIVE)) { + /* Double insertion is possible */ + return; + } + + INCR_EVENT_COUNT(base, evcb->evcb_flags); + evcb->evcb_flags |= EVLIST_ACTIVE_LATER; + base->event_count_active++; + TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next); +} + static void event_queue_insert_timeout(struct event_base *base, struct event *ev) { @@ -2818,6 +2964,19 @@ event_queue_insert_timeout(struct event_base *base, struct event *ev) } } +static void +event_queue_make_later_events_active(struct event_base *base) +{ + struct event_callback *evcb; + EVENT_BASE_ASSERT_LOCKED(base); + + while ((evcb = TAILQ_FIRST(&base->active_later_queue))) { + TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next); + evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE; + TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next); + } +} + /* Functions for debugging */ const char * @@ -3137,16 +3296,17 @@ dump_active_event_fn(struct event_base *base, struct event *e, void *arg) const char *gloss = (e->ev_events & EV_SIGNAL) ? "sig" : "fd "; - if (! (e->ev_flags & EVLIST_ACTIVE)) + if (! (e->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) return 0; - fprintf(output, " %p [%s %ld, priority=%d]%s%s%s%s active%s\n", + fprintf(output, " %p [%s %ld, priority=%d]%s%s%s%s active%s%s\n", (void*)e, gloss, (long)e->ev_fd, e->ev_pri, (e->ev_res&EV_READ)?" Read":"", (e->ev_res&EV_WRITE)?" Write":"", (e->ev_res&EV_SIGNAL)?" Signal":"", (e->ev_res&EV_TIMEOUT)?" Timeout":"", - (e->ev_flags&EVLIST_INTERNAL)?" [Internal]":""); + (e->ev_flags&EVLIST_INTERNAL)?" [Internal]":"", + (e->ev_flags&EVLIST_ACTIVE_LATER)?" [NextTime]":""); return 0; } @@ -3283,10 +3443,17 @@ event_base_assert_ok_(struct event_base *base) struct event_callback *evcb; EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event_callback, evcb_active_next); TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) { - EVUTIL_ASSERT(evcb->evcb_flags & EVLIST_ACTIVE); + EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE); EVUTIL_ASSERT(evcb->evcb_pri == i); } } + { + struct event_callback *evcb; + TAILQ_FOREACH(evcb, &base->active_later_queue, evcb_active_next) { + EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE_LATER); + } + } + EVBASE_RELEASE_LOCK(base, th_base_lock); } diff --git a/include/event2/event_struct.h b/include/event2/event_struct.h index d2470f05..701e8eac 100644 --- a/include/event2/event_struct.h +++ b/include/event2/event_struct.h @@ -54,14 +54,15 @@ extern "C" { /* For evkeyvalq */ #include -#define EVLIST_TIMEOUT 0x01 -#define EVLIST_INSERTED 0x02 -#define EVLIST_SIGNAL 0x04 -#define EVLIST_ACTIVE 0x08 -#define EVLIST_INTERNAL 0x10 -#define EVLIST_INIT 0x80 +#define EVLIST_TIMEOUT 0x01 +#define EVLIST_INSERTED 0x02 +#define EVLIST_SIGNAL 0x04 +#define EVLIST_ACTIVE 0x08 +#define EVLIST_INTERNAL 0x10 +#define EVLIST_ACTIVE_LATER 0x20 +#define EVLIST_INIT 0x80 -#define EVLIST_ALL 0x9f +#define EVLIST_ALL 0xbf /* Fix so that people don't have to run with */ #ifndef TAILQ_ENTRY diff --git a/test/regress.c b/test/regress.c index 0c5faed7..b736d891 100644 --- a/test/regress.c +++ b/test/regress.c @@ -1350,6 +1350,66 @@ end: ; } +static int n_write_a_byte_cb=0; +static int n_read_and_drain_cb=0; +static int n_activate_other_event_cb=0; +static void +write_a_byte_cb(evutil_socket_t fd, short what, void *arg) +{ + char buf[] = "x"; + write(fd, buf, 1); + ++n_write_a_byte_cb; +} +static void +read_and_drain_cb(evutil_socket_t fd, short what, void *arg) +{ + char buf[128]; + int n; + ++n_read_and_drain_cb; + while ((n = read(fd, buf, sizeof(buf))) > 0) + ; +} + +static void +activate_other_event_cb(evutil_socket_t fd, short what, void *other_) +{ + struct event *ev_activate = other_; + ++n_activate_other_event_cb; + event_active_later_(ev_activate, EV_READ); +} + +static void +test_active_later(void *ptr) +{ + struct basic_test_data *data = ptr; + struct event *ev1, *ev2; + struct event ev3, ev4; + struct timeval qsec = {0, 100000}; + ev1 = event_new(data->base, data->pair[0], EV_READ|EV_PERSIST, read_and_drain_cb, NULL); + ev2 = event_new(data->base, data->pair[1], EV_WRITE|EV_PERSIST, write_a_byte_cb, NULL); + event_assign(&ev3, data->base, -1, 0, activate_other_event_cb, &ev4); + event_assign(&ev4, data->base, -1, 0, activate_other_event_cb, &ev3); + event_add(ev1, NULL); + event_add(ev2, NULL); + event_active_later_(&ev3, EV_READ); + + event_base_loopexit(data->base, &qsec); + + event_base_loop(data->base, 0); + + TT_BLATHER(("%d write calls, %d read calls, %d activate-other calls.", + n_write_a_byte_cb, n_read_and_drain_cb, n_activate_other_event_cb)); + event_del(&ev3); + event_del(&ev4); + + tt_int_op(n_write_a_byte_cb, ==, n_activate_other_event_cb); + tt_int_op(n_write_a_byte_cb, >, 100); + tt_int_op(n_read_and_drain_cb, >, 100); + tt_int_op(n_activate_other_event_cb, >, 100); +end: + ; +} + static void test_event_base_new(void *ptr) { @@ -2468,7 +2528,9 @@ struct testcase_t main_testcases[] = { BASIC(bad_assign, TT_FORK|TT_NEED_BASE|TT_NO_LOGS), BASIC(bad_reentrant, TT_FORK|TT_NEED_BASE|TT_NO_LOGS), + BASIC(active_later, TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR), + /* These are still using the old API */ LEGACY(persistent_timeout, TT_FORK|TT_NEED_BASE), { "persistent_timeout_jump", test_persistent_timeout_jump, TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, { "persistent_active_timeout", test_persistent_active_timeout,