Add "active later" event_callbacks to supersede deferred

An event or event callback can now be in an additional state: "active
later".  When an event is in this state, it will become active the
next time we run through the event loop.  This lets us do what we
wanted to with deferred callbacks: make a type of active thing that
avoids infinite circular regress in a way that starves other events or
exhausts the stack.  It improves on deferred callbacks by respecting
priorities, and by having a non-kludgy way to avoid event starvation.
This commit is contained in:
Nick Mathewson 2012-04-06 03:00:40 -04:00
parent 9889a3d883
commit 745a63dba3
4 changed files with 266 additions and 23 deletions

View File

@ -225,6 +225,9 @@ struct event_base {
struct evcallback_list *activequeues; struct evcallback_list *activequeues;
/** The length of the activequeues array */ /** The length of the activequeues array */
int nactivequeues; int nactivequeues;
/** A list of event_callbacks that should become active the next time
* we process events, but not this time. */
struct evcallback_list active_later_queue;
/* common timeout logic */ /* common timeout logic */
@ -364,7 +367,17 @@ int evsig_restore_handler_(struct event_base *base, int evsignal);
void event_active_nolock_(struct event *ev, int res, short count); void event_active_nolock_(struct event *ev, int res, short count);
void event_callback_activate_nolock_(struct event_base *, struct event_callback *); void event_callback_activate_nolock_(struct event_base *, struct event_callback *);
int event_callback_cancel_(struct event_base *base,
struct event_callback *evcb);
void event_active_later_(struct event *ev, int res);
void event_active_later_nolock_(struct event *ev, int res);
void event_callback_activate_later_nolock_(struct event_base *base,
struct event_callback *evcb);
int event_callback_cancel_nolock_(struct event_base *base,
struct event_callback *evcb);
void event_callback_init_(struct event_base *base,
struct event_callback *cb);
/* FIXME document. */ /* FIXME document. */
void event_base_add_virtual_(struct event_base *base); void event_base_add_virtual_(struct event_base *base);

199
event.c
View File

@ -136,11 +136,15 @@ static inline int event_add_internal(struct event *ev,
static inline int event_del_internal(struct event *ev); static inline int event_del_internal(struct event *ev);
static void event_queue_insert_active(struct event_base *, struct event_callback *); static void event_queue_insert_active(struct event_base *, struct event_callback *);
static void event_queue_insert_active_later(struct event_base *, struct event_callback *);
static void event_queue_insert_timeout(struct event_base *, struct event *); static void event_queue_insert_timeout(struct event_base *, struct event *);
static void event_queue_insert_inserted(struct event_base *, struct event *); static void event_queue_insert_inserted(struct event_base *, struct event *);
static void event_queue_remove_active(struct event_base *, struct event_callback *); static void event_queue_remove_active(struct event_base *, struct event_callback *);
static void event_queue_remove_active_later(struct event_base *, struct event_callback *);
static void event_queue_remove_timeout(struct event_base *, struct event *); static void event_queue_remove_timeout(struct event_base *, struct event *);
static void event_queue_remove_inserted(struct event_base *, struct event *); static void event_queue_remove_inserted(struct event_base *, struct event *);
static void event_queue_make_later_events_active(struct event_base *base);
#ifdef USE_REINSERT_TIMEOUT #ifdef USE_REINSERT_TIMEOUT
/* This code seems buggy; only turn it on if we find out what the trouble is. */ /* This code seems buggy; only turn it on if we find out what the trouble is. */
static void event_queue_reinsert_timeout(struct event_base *,struct event *, int was_common, int is_common, int old_timeout_idx); static void event_queue_reinsert_timeout(struct event_base *,struct event *, int was_common, int is_common, int old_timeout_idx);
@ -606,6 +610,8 @@ event_base_new_with_config(const struct event_config *cfg)
base->defer_queue.notify_fn = notify_base_cbq_callback; base->defer_queue.notify_fn = notify_base_cbq_callback;
base->defer_queue.notify_arg = base; base->defer_queue.notify_arg = base;
TAILQ_INIT(&base->active_later_queue);
evmap_io_initmap_(&base->io); evmap_io_initmap_(&base->io);
evmap_signal_initmap_(&base->sigmap); evmap_signal_initmap_(&base->sigmap);
event_changelist_init_(&base->changelist); event_changelist_init_(&base->changelist);
@ -800,11 +806,26 @@ event_base_free(struct event_base *base)
++n_deleted; ++n_deleted;
} }
} else { } else {
event_queue_remove_active(base, evcb); event_callback_cancel_(base, evcb);
++n_deleted;
} }
evcb = next; evcb = next;
} }
} }
{
struct event_callback *evcb;
while ((evcb = TAILQ_FIRST(&base->active_later_queue))) {
if (evcb->evcb_flags & EVLIST_INIT) {
ev = event_callback_to_event(evcb);
event_del(ev);
++n_deleted;
} else {
event_callback_cancel_(base, evcb);
++n_deleted;
}
}
}
if (n_deleted) if (n_deleted)
event_debug(("%s: %d events were still set in base", event_debug(("%s: %d events were still set in base",
@ -1754,6 +1775,8 @@ event_base_loop(struct event_base *base, int flags)
goto done; goto done;
} }
event_queue_make_later_events_active(base);
clear_time_cache(base); clear_time_cache(base);
res = evsel->dispatch(base, tv_p); res = evsel->dispatch(base, tv_p);
@ -2031,7 +2054,7 @@ event_pending(const struct event *ev, short event, struct timeval *tv)
if (ev->ev_flags & EVLIST_INSERTED) if (ev->ev_flags & EVLIST_INSERTED)
flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)); flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL));
if (ev->ev_flags & EVLIST_ACTIVE) if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))
flags |= ev->ev_res; flags |= ev->ev_res;
if (ev->ev_flags & EVLIST_TIMEOUT) if (ev->ev_flags & EVLIST_TIMEOUT)
flags |= EV_TIMEOUT; flags |= EV_TIMEOUT;
@ -2235,7 +2258,7 @@ event_add_internal(struct event *ev, const struct timeval *tv,
#endif #endif
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) && if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
if (ev->ev_events & (EV_READ|EV_WRITE)) if (ev->ev_events & (EV_READ|EV_WRITE))
res = evmap_io_add_(base, ev->ev_fd, ev); res = evmap_io_add_(base, ev->ev_fd, ev);
else if (ev->ev_events & EV_SIGNAL) else if (ev->ev_events & EV_SIGNAL)
@ -2421,6 +2444,8 @@ event_del_internal(struct event *ev)
if (ev->ev_flags & EVLIST_ACTIVE) if (ev->ev_flags & EVLIST_ACTIVE)
event_queue_remove_active(base, event_to_event_callback(ev)); event_queue_remove_active(base, event_to_event_callback(ev));
else if (ev->ev_flags & EVLIST_ACTIVE_LATER)
event_queue_remove_active_later(base, event_to_event_callback(ev));
if (ev->ev_flags & EVLIST_INSERTED) { if (ev->ev_flags & EVLIST_INSERTED) {
event_queue_remove_inserted(base, ev); event_queue_remove_inserted(base, ev);
@ -2470,18 +2495,25 @@ event_active_nolock_(struct event *ev, int res, short ncalls)
event_debug(("event_active: %p (fd %d), res %d, callback %p", event_debug(("event_active: %p (fd %d), res %d, callback %p",
ev, (int)ev->ev_fd, (int)res, ev->ev_callback)); ev, (int)ev->ev_fd, (int)res, ev->ev_callback));
/* We get different kinds of events, add them together */
if (ev->ev_flags & EVLIST_ACTIVE) {
ev->ev_res |= res;
return;
}
base = ev->ev_base; base = ev->ev_base;
EVENT_BASE_ASSERT_LOCKED(base); EVENT_BASE_ASSERT_LOCKED(base);
ev->ev_res = res; switch ((ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
default:
case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER:
EVUTIL_ASSERT(0);
break;
case EVLIST_ACTIVE:
/* We get different kinds of events, add them together */
ev->ev_res |= res;
return;
case EVLIST_ACTIVE_LATER:
ev->ev_res |= res;
break;
case 0:
ev->ev_res = res;
break;
}
if (ev->ev_pri < base->event_running_priority) if (ev->ev_pri < base->event_running_priority)
base->event_continue = 1; base->event_continue = 1;
@ -2501,16 +2533,100 @@ event_active_nolock_(struct event *ev, int res, short ncalls)
event_callback_activate_nolock_(base, event_to_event_callback(ev)); event_callback_activate_nolock_(base, event_to_event_callback(ev));
} }
void
event_active_later_(struct event *ev, int res)
{
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
event_active_later_nolock_(ev, res);
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
}
void
event_active_later_nolock_(struct event *ev, int res)
{
struct event_base *base = ev->ev_base;
EVENT_BASE_ASSERT_LOCKED(base);
if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) {
/* We get different kinds of events, add them together */
ev->ev_res |= res;
return;
}
ev->ev_res = res;
event_callback_activate_later_nolock_(base, event_to_event_callback(ev));
}
void void
event_callback_activate_nolock_(struct event_base *base, event_callback_activate_nolock_(struct event_base *base,
struct event_callback *evcb) struct event_callback *evcb)
{ {
if (evcb->evcb_flags & EVLIST_ACTIVE_LATER)
event_queue_remove_active_later(base, evcb);
event_queue_insert_active(base, evcb); event_queue_insert_active(base, evcb);
if (EVBASE_NEED_NOTIFY(base)) if (EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base); evthread_notify_base(base);
} }
void
event_callback_activate_later_nolock_(struct event_base *base,
struct event_callback *evcb)
{
if (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))
return;
event_queue_insert_active_later(base, evcb);
if (EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
}
void
event_callback_init_(struct event_base *base,
struct event_callback *cb)
{
memset(cb, 0, sizeof(*cb));
cb->evcb_pri = base->nactivequeues - 1;
}
int
event_callback_cancel_(struct event_base *base,
struct event_callback *evcb)
{
int r;
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
r = event_callback_cancel_nolock_(base, evcb);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return r;
}
int
event_callback_cancel_nolock_(struct event_base *base,
struct event_callback *evcb)
{
if (evcb->evcb_flags & EVLIST_INIT)
return event_del_internal(event_callback_to_event(evcb));
switch ((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
default:
case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER:
EVUTIL_ASSERT(0);
break;
case EVLIST_ACTIVE:
/* We get different kinds of events, add them together */
event_queue_remove_active(base, evcb);
return 0;
case EVLIST_ACTIVE_LATER:
event_queue_remove_active_later(base, evcb);
break;
case 0:
break;
}
return 0;
}
void void
event_deferred_cb_init_(struct deferred_cb *cb, deferred_cb_fn fn, void *arg) event_deferred_cb_init_(struct deferred_cb *cb, deferred_cb_fn fn, void *arg)
{ {
@ -2666,6 +2782,21 @@ event_queue_remove_active(struct event_base *base, struct event_callback *evcb)
evcb, evcb_active_next); evcb, evcb_active_next);
} }
static void static void
event_queue_remove_active_later(struct event_base *base, struct event_callback *evcb)
{
EVENT_BASE_ASSERT_LOCKED(base);
if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE_LATER))) {
event_errx(1, "%s: %p not on queue %x", __func__,
evcb, EVLIST_ACTIVE_LATER);
return;
}
DECR_EVENT_COUNT(base, evcb->evcb_flags);
evcb->evcb_flags &= ~EVLIST_ACTIVE_LATER;
base->event_count_active--;
TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next);
}
static void
event_queue_remove_timeout(struct event_base *base, struct event *ev) event_queue_remove_timeout(struct event_base *base, struct event *ev)
{ {
EVENT_BASE_ASSERT_LOCKED(base); EVENT_BASE_ASSERT_LOCKED(base);
@ -2794,6 +2925,21 @@ event_queue_insert_active(struct event_base *base, struct event_callback *evcb)
evcb, evcb_active_next); evcb, evcb_active_next);
} }
static void
event_queue_insert_active_later(struct event_base *base, struct event_callback *evcb)
{
EVENT_BASE_ASSERT_LOCKED(base);
if (evcb->evcb_flags & (EVLIST_ACTIVE_LATER|EVLIST_ACTIVE)) {
/* Double insertion is possible */
return;
}
INCR_EVENT_COUNT(base, evcb->evcb_flags);
evcb->evcb_flags |= EVLIST_ACTIVE_LATER;
base->event_count_active++;
TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next);
}
static void static void
event_queue_insert_timeout(struct event_base *base, struct event *ev) event_queue_insert_timeout(struct event_base *base, struct event *ev)
{ {
@ -2818,6 +2964,19 @@ event_queue_insert_timeout(struct event_base *base, struct event *ev)
} }
} }
static void
event_queue_make_later_events_active(struct event_base *base)
{
struct event_callback *evcb;
EVENT_BASE_ASSERT_LOCKED(base);
while ((evcb = TAILQ_FIRST(&base->active_later_queue))) {
TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next);
evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE;
TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next);
}
}
/* Functions for debugging */ /* Functions for debugging */
const char * const char *
@ -3137,16 +3296,17 @@ dump_active_event_fn(struct event_base *base, struct event *e, void *arg)
const char *gloss = (e->ev_events & EV_SIGNAL) ? const char *gloss = (e->ev_events & EV_SIGNAL) ?
"sig" : "fd "; "sig" : "fd ";
if (! (e->ev_flags & EVLIST_ACTIVE)) if (! (e->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)))
return 0; return 0;
fprintf(output, " %p [%s %ld, priority=%d]%s%s%s%s active%s\n", fprintf(output, " %p [%s %ld, priority=%d]%s%s%s%s active%s%s\n",
(void*)e, gloss, (long)e->ev_fd, e->ev_pri, (void*)e, gloss, (long)e->ev_fd, e->ev_pri,
(e->ev_res&EV_READ)?" Read":"", (e->ev_res&EV_READ)?" Read":"",
(e->ev_res&EV_WRITE)?" Write":"", (e->ev_res&EV_WRITE)?" Write":"",
(e->ev_res&EV_SIGNAL)?" Signal":"", (e->ev_res&EV_SIGNAL)?" Signal":"",
(e->ev_res&EV_TIMEOUT)?" Timeout":"", (e->ev_res&EV_TIMEOUT)?" Timeout":"",
(e->ev_flags&EVLIST_INTERNAL)?" [Internal]":""); (e->ev_flags&EVLIST_INTERNAL)?" [Internal]":"",
(e->ev_flags&EVLIST_ACTIVE_LATER)?" [NextTime]":"");
return 0; return 0;
} }
@ -3283,10 +3443,17 @@ event_base_assert_ok_(struct event_base *base)
struct event_callback *evcb; struct event_callback *evcb;
EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event_callback, evcb_active_next); EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event_callback, evcb_active_next);
TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) { TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) {
EVUTIL_ASSERT(evcb->evcb_flags & EVLIST_ACTIVE); EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE);
EVUTIL_ASSERT(evcb->evcb_pri == i); EVUTIL_ASSERT(evcb->evcb_pri == i);
} }
} }
{
struct event_callback *evcb;
TAILQ_FOREACH(evcb, &base->active_later_queue, evcb_active_next) {
EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE_LATER);
}
}
EVBASE_RELEASE_LOCK(base, th_base_lock); EVBASE_RELEASE_LOCK(base, th_base_lock);
} }

View File

@ -54,14 +54,15 @@ extern "C" {
/* For evkeyvalq */ /* For evkeyvalq */
#include <event2/keyvalq_struct.h> #include <event2/keyvalq_struct.h>
#define EVLIST_TIMEOUT 0x01 #define EVLIST_TIMEOUT 0x01
#define EVLIST_INSERTED 0x02 #define EVLIST_INSERTED 0x02
#define EVLIST_SIGNAL 0x04 #define EVLIST_SIGNAL 0x04
#define EVLIST_ACTIVE 0x08 #define EVLIST_ACTIVE 0x08
#define EVLIST_INTERNAL 0x10 #define EVLIST_INTERNAL 0x10
#define EVLIST_INIT 0x80 #define EVLIST_ACTIVE_LATER 0x20
#define EVLIST_INIT 0x80
#define EVLIST_ALL 0x9f #define EVLIST_ALL 0xbf
/* Fix so that people don't have to run with <sys/queue.h> */ /* Fix so that people don't have to run with <sys/queue.h> */
#ifndef TAILQ_ENTRY #ifndef TAILQ_ENTRY

View File

@ -1350,6 +1350,66 @@ end:
; ;
} }
static int n_write_a_byte_cb=0;
static int n_read_and_drain_cb=0;
static int n_activate_other_event_cb=0;
static void
write_a_byte_cb(evutil_socket_t fd, short what, void *arg)
{
char buf[] = "x";
write(fd, buf, 1);
++n_write_a_byte_cb;
}
static void
read_and_drain_cb(evutil_socket_t fd, short what, void *arg)
{
char buf[128];
int n;
++n_read_and_drain_cb;
while ((n = read(fd, buf, sizeof(buf))) > 0)
;
}
static void
activate_other_event_cb(evutil_socket_t fd, short what, void *other_)
{
struct event *ev_activate = other_;
++n_activate_other_event_cb;
event_active_later_(ev_activate, EV_READ);
}
static void
test_active_later(void *ptr)
{
struct basic_test_data *data = ptr;
struct event *ev1, *ev2;
struct event ev3, ev4;
struct timeval qsec = {0, 100000};
ev1 = event_new(data->base, data->pair[0], EV_READ|EV_PERSIST, read_and_drain_cb, NULL);
ev2 = event_new(data->base, data->pair[1], EV_WRITE|EV_PERSIST, write_a_byte_cb, NULL);
event_assign(&ev3, data->base, -1, 0, activate_other_event_cb, &ev4);
event_assign(&ev4, data->base, -1, 0, activate_other_event_cb, &ev3);
event_add(ev1, NULL);
event_add(ev2, NULL);
event_active_later_(&ev3, EV_READ);
event_base_loopexit(data->base, &qsec);
event_base_loop(data->base, 0);
TT_BLATHER(("%d write calls, %d read calls, %d activate-other calls.",
n_write_a_byte_cb, n_read_and_drain_cb, n_activate_other_event_cb));
event_del(&ev3);
event_del(&ev4);
tt_int_op(n_write_a_byte_cb, ==, n_activate_other_event_cb);
tt_int_op(n_write_a_byte_cb, >, 100);
tt_int_op(n_read_and_drain_cb, >, 100);
tt_int_op(n_activate_other_event_cb, >, 100);
end:
;
}
static void static void
test_event_base_new(void *ptr) test_event_base_new(void *ptr)
{ {
@ -2468,7 +2528,9 @@ struct testcase_t main_testcases[] = {
BASIC(bad_assign, TT_FORK|TT_NEED_BASE|TT_NO_LOGS), BASIC(bad_assign, TT_FORK|TT_NEED_BASE|TT_NO_LOGS),
BASIC(bad_reentrant, TT_FORK|TT_NEED_BASE|TT_NO_LOGS), BASIC(bad_reentrant, TT_FORK|TT_NEED_BASE|TT_NO_LOGS),
BASIC(active_later, TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR),
/* These are still using the old API */
LEGACY(persistent_timeout, TT_FORK|TT_NEED_BASE), LEGACY(persistent_timeout, TT_FORK|TT_NEED_BASE),
{ "persistent_timeout_jump", test_persistent_timeout_jump, TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, { "persistent_timeout_jump", test_persistent_timeout_jump, TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
{ "persistent_active_timeout", test_persistent_active_timeout, { "persistent_active_timeout", test_persistent_active_timeout,