Implement queued timeouts for the case where many timeouts are the same.

Libevent's current timeout code is relatively optimized for the
randomly scattered timeout case, where events are added with their
timeouts in no particular order.  We add and remove timeouts with
O(lg n) behavior.

Frequently, however, an application will want to have many timeouts
of the same value.  For example, we might have 1000 bufferevents,
each with a 2-second timeout on reading or writing.  If we knew this
were always the case, we could just put timeouts in a queue and get
O(1) add and remove behavior.  Of course, a queue would give O(n)
insertion for a scattered timeout pattern, so we don't want simply
to switch the implementation.

This patch gives the user the ability to explicitly tag certain
timeout values as being "very common".  These timeout values have a
cookie encoded in the high bits of their tv_usec field to indicate
which queue they belong on.  The queues themselves are each
triggered by an entry in the minheap.
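
(Editorial sketch, not part of the commit: the cookie layout can be
seen with the masks this patch adds to event.c.  The standalone
program below is illustrative only; it reproduces the encoding for a
200000-microsecond duration on queue index 1, the same value the new
regression test checks for.)

    #include <stdio.h>

    /* Masks copied from the patch's event.c. */
    #define MICROSECONDS_MASK        0x000fffff
    #define COMMON_TIMEOUT_IDX_MASK  0x0ff00000
    #define COMMON_TIMEOUT_IDX_SHIFT 20
    #define COMMON_TIMEOUT_MASK      0xf0000000
    #define COMMON_TIMEOUT_MAGIC     0x50000000

    int main(void)
    {
        /* Encode: 200000 usec, queue index 1. */
        unsigned long usec = 200000 | COMMON_TIMEOUT_MAGIC
            | (1 << COMMON_TIMEOUT_IDX_SHIFT);
        printf("0x%lx\n", usec);                    /* 0x50130d40 */
        /* Decode: magic tag, queue index, true microseconds. */
        printf("%d\n", (usec & COMMON_TIMEOUT_MASK)
            == COMMON_TIMEOUT_MAGIC);               /* 1 */
        printf("%lu\n", (usec & COMMON_TIMEOUT_IDX_MASK)
            >> COMMON_TIMEOUT_IDX_SHIFT);           /* 1 */
        printf("%lu\n", usec & MICROSECONDS_MASK);  /* 200000 */
        return 0;
    }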

See the regress.c code for an example use.
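
(Minimal usage sketch, editorial: `base' is an existing event_base
and `ev' an initialized event.  event_base_init_common_timeout() may
return NULL, in which case the plain timeval still works.)

    struct timeval tv = { 2, 0 };  /* duration we expect to be common */
    const struct timeval *common =
        event_base_init_common_timeout(base, &tv);
    event_add(ev, common ? common : &tv);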

svn:r1517
Nick Mathewson 2009-11-09 17:16:30 +00:00
parent 784b8773a4
commit 693c24ef9d
9 changed files with 371 additions and 25 deletions

@@ -39,6 +39,8 @@ Changes in 2.0.3-alpha:
o Remove compat/sys/_time.h. It interfered with system headers on HPUX, and its functionality has been subsumed by event2/util.h and util-internal.h.
o Add a new bufferevent_socket_connect_hostname() to encapsulate the resolve-then-connect operation.
o Build kqueue.c correctly on GNU/kFreeBSD platforms. Patch pulled upstream from Debian.
o Alternative queue-based timeout algorithm for programs that use a large number of timeouts with the same value.
Changes in 2.0.2-alpha:
o Add a new flag to bufferevents to make all callbacks automatically deferred.

@@ -107,6 +107,13 @@ struct event_signal_map {
int nentries;
};
struct common_timeout_list {
struct event_list events;
struct timeval duration;
struct event timeout_event;
struct event_base *base;
};
struct event_base {
/** Function pointers and other data to describe this event_base's
* backend. */
@@ -134,6 +141,10 @@ struct event_base {
struct event_list **activequeues;
int nactivequeues;
struct common_timeout_list **common_timeout_queues;
int n_common_timeouts;
int n_common_timeouts_allocated;
/** The event whose callback is executing right now */
struct event *current_event;
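
(Editorial note: every event on a given common_timeout_list was
scheduled with the identical duration, so appending at the tail keeps
the list sorted by deadline for free; that is what makes the O(1)
insert and removal safe.  A sketch of the invariant, assuming the
patch's MICROSECONDS_MASK, EVUTIL_ASSERT from util-internal.h, and
the evutil_timercmp() macro from event2/util.h:)

    /* Sketch only: check that a queue's events are in deadline order.
     * The mask strips the cookie bits from tv_usec first. */
    static void
    assert_ctl_sorted(struct common_timeout_list *ctl)
    {
        struct event *ev, *prev = NULL;
        TAILQ_FOREACH(ev, &ctl->events,
            ev_timeout_pos.ev_next_with_common_timeout) {
            if (prev) {
                struct timeval a = prev->ev_timeout, b = ev->ev_timeout;
                a.tv_usec &= MICROSECONDS_MASK;
                b.tv_usec &= MICROSECONDS_MASK;
                EVUTIL_ASSERT(evutil_timercmp(&a, &b, <=));
            }
            prev = ev;
        }
    }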

event.c

@@ -403,6 +403,23 @@ event_base_free(struct event_base *base)
event_del(ev);
++n_deleted;
}
for (i = 0; i < base->n_common_timeouts; ++i) {
struct common_timeout_list *ctl =
base->common_timeout_queues[i];
event_del(&ctl->timeout_event); /* Internal; doesn't count */
for (ev = TAILQ_FIRST(&ctl->events); ev; ) {
struct event *next = TAILQ_NEXT(ev,
ev_timeout_pos.ev_next_with_common_timeout);
if (!(ev->ev_flags & EVLIST_INTERNAL)) {
event_del(ev);
++n_deleted;
}
ev = next;
}
mm_free(ctl);
}
if (base->common_timeout_queues)
mm_free(base->common_timeout_queues);
for (i = 0; i < base->nactivequeues; ++i) {
for (ev = TAILQ_FIRST(base->activequeues[i]); ev; ) {
@@ -669,6 +686,152 @@ event_signal_closure(struct event_base *base, struct event *ev)
}
}
#define MICROSECONDS_MASK 0x000fffff
#define COMMON_TIMEOUT_IDX_MASK 0x0ff00000
#define COMMON_TIMEOUT_IDX_SHIFT 20
#define COMMON_TIMEOUT_MASK 0xf0000000
#define COMMON_TIMEOUT_MAGIC 0x50000000
#define COMMON_TIMEOUT_IDX(tv) \
(((tv)->tv_usec & COMMON_TIMEOUT_IDX_MASK)>>COMMON_TIMEOUT_IDX_SHIFT)
static inline int
is_common_timeout(const struct timeval *tv,
const struct event_base *base)
{
int idx;
if ((tv->tv_usec & COMMON_TIMEOUT_MASK) != COMMON_TIMEOUT_MAGIC)
return 0;
idx = COMMON_TIMEOUT_IDX(tv);
return idx < base->n_common_timeouts;
}
static inline struct common_timeout_list *
get_common_timeout_list(struct event_base *base, const struct timeval *tv)
{
return base->common_timeout_queues[COMMON_TIMEOUT_IDX(tv)];
}
static inline int
common_timeout_ok(const struct timeval *tv,
struct event_base *base)
{
const struct timeval *expect =
&get_common_timeout_list(base, tv)->duration;
return tv->tv_sec == expect->tv_sec &&
tv->tv_usec == expect->tv_usec;
}
static void
common_timeout_schedule(struct common_timeout_list *ctl,
const struct timeval *now, struct event *head)
{
struct timeval delay;
struct timeval timeout = head->ev_timeout;
timeout.tv_usec &= MICROSECONDS_MASK;
evutil_timersub(&timeout, now, &delay);
event_add_internal(&ctl->timeout_event, &delay);
}
static void
common_timeout_callback(evutil_socket_t fd, short what, void *arg)
{
struct timeval now;
struct common_timeout_list *ctl = arg;
struct event_base *base = ctl->base;
struct event *ev = NULL;
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
gettime(base, &now);
while (1) {
ev = TAILQ_FIRST(&ctl->events);
if (!ev || ev->ev_timeout.tv_sec > now.tv_sec ||
(ev->ev_timeout.tv_sec == now.tv_sec &&
(ev->ev_timeout.tv_usec&MICROSECONDS_MASK) > now.tv_usec))
break;
event_del_internal(ev);
event_active_nolock(ev, EV_TIMEOUT, 1);
}
if (ev)
common_timeout_schedule(ctl, &now, ev);
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
#define MAX_COMMON_TIMEOUTS 256
const struct timeval *
event_base_init_common_timeout(struct event_base *base,
const struct timeval *duration)
{
int i;
struct timeval tv;
const struct timeval *result=NULL;
struct common_timeout_list *new_ctl;
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (duration->tv_usec > 1000000) {
memcpy(&tv, duration, sizeof(struct timeval));
if (is_common_timeout(duration, base))
tv.tv_usec &= MICROSECONDS_MASK;
tv.tv_sec += tv.tv_usec / 1000000;
tv.tv_usec %= 1000000;
duration = &tv;
}
for (i = 0; i < base->n_common_timeouts; ++i) {
const struct common_timeout_list *ctl =
base->common_timeout_queues[i];
if (duration->tv_sec == ctl->duration.tv_sec &&
duration->tv_usec ==
(ctl->duration.tv_usec & MICROSECONDS_MASK)) {
EVUTIL_ASSERT(is_common_timeout(&ctl->duration, base));
result = &ctl->duration;
goto done;
}
}
if (base->n_common_timeouts == MAX_COMMON_TIMEOUTS) {
event_warn("%s: Too many common timeouts already in use; "
"we only support %d per event_base", __func__,
MAX_COMMON_TIMEOUTS);
goto done;
}
if (base->n_common_timeouts_allocated == base->n_common_timeouts) {
int n = base->n_common_timeouts < 16 ? 16 :
base->n_common_timeouts*2;
struct common_timeout_list **newqueues =
mm_realloc(base->common_timeout_queues,
n*sizeof(struct common_timeout_list *));
if (!newqueues) {
event_warn("%s: realloc",__func__);
goto done;
}
base->n_common_timeouts_allocated = n;
base->common_timeout_queues = newqueues;
}
new_ctl = mm_calloc(1, sizeof(struct common_timeout_list));
if (!new_ctl) {
event_warn("%s: calloc",__func__);
goto done;
}
TAILQ_INIT(&new_ctl->events);
new_ctl->duration.tv_sec = duration->tv_sec;
new_ctl->duration.tv_usec =
duration->tv_usec | COMMON_TIMEOUT_MAGIC |
(base->n_common_timeouts << COMMON_TIMEOUT_IDX_SHIFT);
evtimer_assign(&new_ctl->timeout_event, base,
common_timeout_callback, new_ctl);
new_ctl->timeout_event.ev_flags |= EVLIST_INTERNAL;
event_priority_set(&new_ctl->timeout_event, 0);
new_ctl->base = base;
base->common_timeout_queues[base->n_common_timeouts++] = new_ctl;
result = &new_ctl->duration;
done:
if (result)
EVUTIL_ASSERT(is_common_timeout(result, base));
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return result;
}
/*
Helper for event_process_active to process all the events in a single queue,
releasing the lock as we go. This function requires that the lock be held
@@ -1166,8 +1329,10 @@ event_pending(struct event *ev, short event, struct timeval *tv)
/* See if there is a timeout that we should report */
if (tv != NULL && (flags & event & EV_TIMEOUT)) {
struct timeval tmp = ev->ev_timeout;
gettime(ev->ev_base, &now);
evutil_timersub(&ev->ev_timeout, &now, &res);
tmp.tv_usec &= MICROSECONDS_MASK;
evutil_timersub(&tmp, &now, &res);
/* correctly remap to real time */
evutil_gettimeofday(&now, NULL);
evutil_timeradd(&now, &res, tv);
@@ -1299,6 +1464,7 @@ event_add_internal(struct event *ev, const struct timeval *tv)
*/
if (res != -1 && tv != NULL) {
struct timeval now;
int common_timeout;
/*
* for persistent timeout events, we remember the
@@ -1312,6 +1478,7 @@ event_add_internal(struct event *ev, const struct timeval *tv)
* are not replacing an existing timeout.
*/
if (ev->ev_flags & EVLIST_TIMEOUT) {
/* XXX I believe this is needless. */
if (min_heap_elt_is_top(ev))
notify = 1;
event_queue_remove(base, ev, EVLIST_TIMEOUT);
@@ -1336,18 +1503,35 @@ event_add_internal(struct event *ev, const struct timeval *tv)
}
gettime(base, &now);
evutil_timeradd(&now, tv, &ev->ev_timeout);
common_timeout = is_common_timeout(tv, base);
if (common_timeout) {
struct timeval tmp = *tv;
tmp.tv_usec &= MICROSECONDS_MASK;
evutil_timeradd(&now, &tmp, &ev->ev_timeout);
ev->ev_timeout.tv_usec |=
(tv->tv_usec & ~MICROSECONDS_MASK);
} else {
evutil_timeradd(&now, tv, &ev->ev_timeout);
}
event_debug((
"event_add: timeout in %d seconds, call %p",
(int)tv->tv_sec, ev->ev_callback));
event_queue_insert(base, ev, EVLIST_TIMEOUT);
if (min_heap_elt_is_top(ev)) {
/* The earliest timeout is now earlier than it was
* before: we will need to tell the main thread to
* wake up earlier than it would otherwise. */
notify = 1;
if (common_timeout) {
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
if (ev == TAILQ_FIRST(&ctl->events)) {
common_timeout_schedule(ctl, &now, ev);
}
} else {
/* See if the earliest timeout is now earlier than it
* was before: if so, we will need to tell the main
* thread to wake up earlier than it would
* otherwise. */
if (min_heap_elt_is_top(ev))
notify = 1;
}
}
@@ -1578,6 +1762,7 @@ timeout_correct(struct event_base *base, struct timeval *tv)
struct event **pev;
unsigned int size;
struct timeval off;
int i;
if (use_monotonic)
return;
@@ -1604,6 +1789,20 @@ timeout_correct(struct event_base *base, struct timeval *tv)
struct timeval *ev_tv = &(**pev).ev_timeout;
evutil_timersub(ev_tv, &off, ev_tv);
}
for (i=0; i<base->n_common_timeouts; ++i) {
struct event *ev;
struct common_timeout_list *ctl =
base->common_timeout_queues[i];
TAILQ_FOREACH(ev, &ctl->events,
ev_timeout_pos.ev_next_with_common_timeout) {
struct timeval *ev_tv = &ev->ev_timeout;
ev_tv->tv_usec &= MICROSECONDS_MASK;
evutil_timersub(ev_tv, &off, ev_tv);
ev_tv->tv_usec |= COMMON_TIMEOUT_MAGIC |
(i<<COMMON_TIMEOUT_IDX_SHIFT);
}
}
/* Now remember what the new time turned out to be. */
base->event_tv = *tv;
}
@@ -1655,7 +1854,14 @@ event_queue_remove(struct event_base *base, struct event *ev, int queue)
ev, ev_active_next);
break;
case EVLIST_TIMEOUT:
min_heap_erase(&base->timeheap, ev);
if (is_common_timeout(&ev->ev_timeout, base)) {
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
TAILQ_REMOVE(&ctl->events, ev,
ev_timeout_pos.ev_next_with_common_timeout);
} else {
min_heap_erase(&base->timeheap, ev);
}
break;
default:
event_errx(1, "%s: unknown queue %x", __func__, queue);
@@ -1688,7 +1894,13 @@ event_queue_insert(struct event_base *base, struct event *ev, int queue)
ev,ev_active_next);
break;
case EVLIST_TIMEOUT: {
min_heap_push(&base->timeheap, ev);
if (is_common_timeout(&ev->ev_timeout, base)) {
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
TAILQ_INSERT_TAIL(&ctl->events, ev,
ev_timeout_pos.ev_next_with_common_timeout);
} else
min_heap_push(&base->timeheap, ev);
break;
}
default:
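
(One subtlety in event_base_init_common_timeout() above, noted
editorially: durations are normalized before the lookup, so a
denormalized timeval maps onto the same queue.  Sketch:)

    struct timeval a = { 0, 2500000 };  /* 2.5s, denormalized usec */
    struct timeval b = { 2, 500000 };   /* 2.5s, normalized */
    const struct timeval *ca = event_base_init_common_timeout(base, &a);
    const struct timeval *cb = event_base_init_common_timeout(base, &b);
    /* ca == cb: the first call folds a.tv_usec into tv_sec before
     * comparing, so both requests share one common_timeout_list. */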

@@ -615,6 +615,27 @@ int event_base_priority_init(struct event_base *, int);
*/
int event_priority_set(struct event *, int);
/**
Prepare Libevent to use a large number of timeouts with the same duration.
Libevent's default scheduling algorithm is optimized for having a large
number of timeouts with their durations more or less randomly distributed.
If you have a large number of timeouts that all have the same duration (for
example, if you have a large number of connections that all have a
10-second timeout), then you can improve Libevent's performance by telling
Libevent about it.
To do this, call this function with the common duration. It will return a
pointer to a different, opaque timeout value. (Don't depend on its actual
contents!) When you use this timeout value in event_add(), Libevent will
schedule the event more efficiently.
(This optimization probably will not be worthwhile until you have thousands
or tens of thousands of events with the same timeout.)
*/
const struct timeval *event_base_init_common_timeout(struct event_base *base,
const struct timeval *duration);
#ifndef _EVENT_DISABLE_MM_REPLACEMENT
/**
Override the functions that Libevent uses for memory management.
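
(A hedged sketch of the pattern event_base_init_common_timeout()'s
comment above describes; the conn array, its fd and read_ev fields,
read_cb, and n_conns are hypothetical:)

    struct timeval ten_sec = { 10, 0 };
    const struct timeval *common =
        event_base_init_common_timeout(base, &ten_sec);
    for (i = 0; i < n_conns; ++i) {
        event_assign(&conn[i].read_ev, base, conn[i].fd,
            EV_READ|EV_PERSIST, read_cb, &conn[i]);
        event_add(&conn[i].read_ev, common ? common : &ten_sec);
    }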

@@ -80,7 +80,11 @@ struct event_base;
struct event {
TAILQ_ENTRY (event) (ev_active_next);
TAILQ_ENTRY (event) (ev_next);
int min_heap_idx; /* for managing timeouts */
/* for managing timeouts */
union {
TAILQ_ENTRY (event) (ev_next_with_common_timeout);
int min_heap_idx;
} ev_timeout_pos;
evutil_socket_t ev_fd;
struct event_base *ev_base;

@@ -62,7 +62,7 @@ int min_heap_elem_greater(struct event *a, struct event *b)
void min_heap_ctor(min_heap_t* s) { s->p = 0; s->n = 0; s->a = 0; }
void min_heap_dtor(min_heap_t* s) { free(s->p); }
void min_heap_elem_init(struct event* e) { e->min_heap_idx = -1; }
void min_heap_elem_init(struct event* e) { e->ev_timeout_pos.min_heap_idx = -1; }
int min_heap_empty(min_heap_t* s) { return 0u == s->n; }
unsigned min_heap_size(min_heap_t* s) { return s->n; }
struct event* min_heap_top(min_heap_t* s) { return s->n ? *s->p : 0; }
@@ -81,7 +81,7 @@ struct event* min_heap_pop(min_heap_t* s)
{
struct event* e = *s->p;
min_heap_shift_down_(s, 0u, s->p[--s->n]);
e->min_heap_idx = -1;
e->ev_timeout_pos.min_heap_idx = -1;
return e;
}
return 0;
@@ -89,25 +89,25 @@ struct event* min_heap_pop(min_heap_t* s)
int min_heap_elt_is_top(const struct event *e)
{
return e->min_heap_idx == 0;
return e->ev_timeout_pos.min_heap_idx == 0;
}
int min_heap_erase(min_heap_t* s, struct event* e)
{
if(((unsigned int)-1) != e->min_heap_idx)
if(((unsigned int)-1) != e->ev_timeout_pos.min_heap_idx)
{
struct event *last = s->p[--s->n];
unsigned parent = (e->min_heap_idx - 1) / 2;
unsigned parent = (e->ev_timeout_pos.min_heap_idx - 1) / 2;
/* we replace e with the last element in the heap. We might need to
shift it upward if it is less than its parent, or downward if it is
greater than one or both its children. Since the children are known
to be less than the parent, it can't need to shift both up and
down. */
if (e->min_heap_idx > 0 && min_heap_elem_greater(s->p[parent], last))
min_heap_shift_up_(s, e->min_heap_idx, last);
if (e->ev_timeout_pos.min_heap_idx > 0 && min_heap_elem_greater(s->p[parent], last))
min_heap_shift_up_(s, e->ev_timeout_pos.min_heap_idx, last);
else
min_heap_shift_down_(s, e->min_heap_idx, last);
e->min_heap_idx = -1;
min_heap_shift_down_(s, e->ev_timeout_pos.min_heap_idx, last);
e->ev_timeout_pos.min_heap_idx = -1;
return 0;
}
return -1;
@@ -134,11 +134,11 @@ void min_heap_shift_up_(min_heap_t* s, unsigned hole_index, struct event* e)
unsigned parent = (hole_index - 1) / 2;
while(hole_index && min_heap_elem_greater(s->p[parent], e))
{
(s->p[hole_index] = s->p[parent])->min_heap_idx = hole_index;
(s->p[hole_index] = s->p[parent])->ev_timeout_pos.min_heap_idx = hole_index;
hole_index = parent;
parent = (hole_index - 1) / 2;
}
(s->p[hole_index] = e)->min_heap_idx = hole_index;
(s->p[hole_index] = e)->ev_timeout_pos.min_heap_idx = hole_index;
}
void min_heap_shift_down_(min_heap_t* s, unsigned hole_index, struct event* e)
@@ -149,7 +149,7 @@ void min_heap_shift_down_(min_heap_t* s, unsigned hole_index, struct event* e)
min_child -= min_child == s->n || min_heap_elem_greater(s->p[min_child], s->p[min_child - 1]);
if(!(min_heap_elem_greater(e, s->p[min_child])))
break;
(s->p[hole_index] = s->p[min_child])->min_heap_idx = hole_index;
(s->p[hole_index] = s->p[min_child])->ev_timeout_pos.min_heap_idx = hole_index;
hole_index = min_child;
min_child = 2 * (hole_index + 1);
}

@@ -548,7 +548,100 @@ test_persistent_timeout(void)
event_dispatch();
event_del(&ev);
}
static int total_common_counts;
struct common_timeout_info {
struct event ev;
struct timeval called_at;
int which;
int count;
};
static void
common_timeout_cb(int fd, short event, void *arg)
{
struct common_timeout_info *ti = arg;
++ti->count;
evutil_gettimeofday(&ti->called_at, NULL);
if (ti->count >= 6)
event_del(&ti->ev);
}
static void
test_common_timeout(void *ptr)
{
struct basic_test_data *data = ptr;
struct event_base *base = data->base;
int i;
struct common_timeout_info info[100];
struct timeval now;
struct timeval tmp_100_ms = { 0, 100*1000 };
struct timeval tmp_200_ms = { 0, 200*1000 };
const struct timeval *ms_100, *ms_200;
ms_100 = event_base_init_common_timeout(base, &tmp_100_ms);
ms_200 = event_base_init_common_timeout(base, &tmp_200_ms);
tt_assert(ms_100);
tt_assert(ms_200);
tt_ptr_op(event_base_init_common_timeout(base, &tmp_200_ms),
==, ms_200);
tt_int_op(ms_100->tv_sec, ==, 0);
tt_int_op(ms_200->tv_sec, ==, 0);
tt_int_op(ms_100->tv_usec, ==, 100000|0x50000000);
tt_int_op(ms_200->tv_usec, ==, 200000|0x50100000);
total_common_counts = 0;
memset(info, 0, sizeof(info));
for (i=0; i<100; ++i) {
info[i].which = i;
event_assign(&info[i].ev, base, -1, EV_TIMEOUT|EV_PERSIST,
common_timeout_cb, &info[i]);
if (i % 2) {
event_add(&info[i].ev, ms_100);
} else {
event_add(&info[i].ev, ms_200);
}
}
event_base_dispatch(base);
evutil_gettimeofday(&now, NULL);
for (i=0; i<10; ++i) {
struct timeval tmp;
int ms_diff;
tt_int_op(info[i].count, ==, 6);
evutil_timersub(&now, &info[i].called_at, &tmp);
ms_diff = tmp.tv_usec/1000 + tmp.tv_sec*1000;
if (i % 2) {
tt_int_op(ms_diff, >, 500);
tt_int_op(ms_diff, <, 700);
} else {
tt_int_op(ms_diff, >, -100);
tt_int_op(ms_diff, <, 100);
}
}
/* Make sure we can free the base with some events in. */
for (i=0; i<100; ++i) {
if (i % 2) {
event_add(&info[i].ev, ms_100);
} else {
event_add(&info[i].ev, ms_200);
}
}
end:
event_base_free(data->base); /* need to do this here before info is
* out-of-scope */
data->base = NULL;
}
#ifndef WIN32
@@ -1880,6 +1973,8 @@ struct testcase_t main_testcases[] = {
/* These are still using the old API */
LEGACY(persistent_timeout, TT_FORK|TT_NEED_BASE),
LEGACY(priorities, TT_FORK|TT_NEED_BASE),
{ "common_timeout", test_common_timeout, TT_FORK|TT_NEED_BASE,
&basic_setup, NULL },
/* These legacy tests may not all need all of these flags. */
LEGACY(simpleread, TT_ISOLATED),

@@ -217,7 +217,8 @@ basic_test_cleanup(const struct testcase_t *testcase, void *ptr)
}
if (testcase->flags & TT_NEED_BASE) {
event_base_free(data->base);
if (data->base)
event_base_free(data->base);
}
if (testcase->flags & TT_NEED_DNS) {

@@ -35,8 +35,8 @@ static void
set_random_timeout(struct event *ev)
{
ev->ev_timeout.tv_sec = rand();
ev->ev_timeout.tv_usec = rand();
ev->min_heap_idx = -1;
ev->ev_timeout.tv_usec = rand() & 0xfffff;
ev->ev_timeout_pos.min_heap_idx = -1;
}
static void