diff --git a/.gitignore b/.gitignore index 09cc4311..9ecd8cd0 100644 --- a/.gitignore +++ b/.gitignore @@ -68,6 +68,7 @@ libevent.pc /test/regress.gen.h /test/test-eof /test/test-init +/test/test-ratelim /test/test-time /test/test-weof diff --git a/Makefile.am b/Makefile.am index 08fb0fdd..52b15023 100644 --- a/Makefile.am +++ b/Makefile.am @@ -105,7 +105,7 @@ event-config.h: config.h CORE_SRC = event.c evthread.c buffer.c \ bufferevent.c bufferevent_sock.c bufferevent_filter.c \ - bufferevent_pair.c listener.c \ + bufferevent_pair.c listener.c bufferevent_ratelim.c \ evmap.c log.c evutil.c strlcpy.c $(SYS_SRC) EXTRA_SRC = event_tagging.c http.c evdns.c evrpc.c @@ -136,7 +136,8 @@ noinst_HEADERS = util-internal.h mm-internal.h ipv6-internal.h \ evrpc-internal.h strlcpy-internal.h evbuffer-internal.h \ bufferevent-internal.h http-internal.h event-internal.h \ evthread-internal.h ht-internal.h defer-internal.h \ - minheap-internal.h log-internal.h evsignal-internal.h evmap-internal.h + minheap-internal.h log-internal.h evsignal-internal.h evmap-internal.h \ + ratelim-internal.h include_HEADERS = event.h evhttp.h evdns.h evrpc.h evutil.h diff --git a/bufferevent-internal.h b/bufferevent-internal.h index c3c33eca..9594784a 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -35,6 +35,7 @@ extern "C" { #include "defer-internal.h" #include "evthread-internal.h" #include "event2/thread.h" +#include "ratelim-internal.h" /* These flags are reasons that we might be declining to actually enable reading or writing on a bufferevent. @@ -43,22 +44,74 @@ extern "C" { /* On a all bufferevents, for reading: used when we have read up to the watermark value. - On a filtering bufferxevent, for writing: used when the underlying + On a filtering bufferevent, for writing: used when the underlying bufferevent's write buffer has been filled up to its watermark value. */ #define BEV_SUSPEND_WM 0x01 -/* On a base bufferevent: when we have used up our bandwidth buckets. 
*/ +/* On a base bufferevent: when we have emptied a bandwidth bucket */ #define BEV_SUSPEND_BW 0x02 -/* On a socket bufferevent: we aren't going to try reading until the - * connect operation is done. */ -#define BEV_SUSPEND_CONNECTING 0x04 +/* On a base bufferevent: when we have emptied the group's bandwidth bucket. */ +#define BEV_SUSPEND_BW_GROUP 0x04 -struct token_bucket { - ev_uint32_t limit; - ev_uint32_t rate; - ev_uint32_t burst; - unsigned last_updated; +struct bufferevent_rate_limit_group { + /** List of all members in the group */ + TAILQ_HEAD(rlim_group_member_list, bufferevent_private) members; + /** Current limits for the group. */ + struct ev_token_bucket rate_limit; + struct ev_token_bucket_cfg rate_limit_cfg; + + /** True iff we don't want to read from any member of the group until + * the token bucket refills. */ + unsigned read_suspended : 1; + /** True iff we don't want to write from any member of the group until + * the token bucket refills. */ + unsigned write_suspended : 1; + /** True iff we were unable to suspend one of the bufferevents in the + * group for reading the last time we tried, and we should try + * again. */ + unsigned pending_unsuspend_read : 1; + /** True iff we were unable to suspend one of the bufferevents in the + * group for writing the last time we tried, and we should try + * again. */ + unsigned pending_unsuspend_write : 1; + + /** The number of bufferevents in the group. */ + int n_members; + + /** The smallest number of bytes that any member of the group should + * be limited to read or write at a time. */ + ev_uint32_t min_share; + /** Timeout event that goes off once a tick, when the bucket is ready + * to refill. */ + struct event master_refill_event; + /** Lock to protect the members of this group. This lock should nest + * within every bufferevent lock: if you are holding this lock, do + * not assume you can lock another bufferevent. */ + void *lock; +}; + +/** Fields for rate-limiting a single bufferevent. 
*/ +struct bufferevent_rate_limit { + /* Linked-list elements for storing this bufferevent_private in a + * group. + * + * Note that this field is supposed to be protected by the group + * lock */ + TAILQ_ENTRY(bufferevent_private) next_in_group; + /** The rate-limiting group for this bufferevent, or NULL if it is + * only rate-limited on its own. */ + struct bufferevent_rate_limit_group *group; + + /* This bufferevent's current limits. */ + struct ev_token_bucket limit; + /* Pointer to the rate-limit configuration for this bufferevent. + * Can be shared. XXX reference-count this? */ + struct ev_token_bucket_cfg *cfg; + + /* Timeout event used when one this bufferevent's buckets are + * empty. */ + struct event refill_bucket_event; }; /** Parts of the bufferevent structure that are shared among all bufferevent @@ -111,6 +164,9 @@ struct bufferevent_private { /** Lock for this bufferevent. Shared by the inbuf and the outbuf. * If NULL, locking is disabled. */ void *lock; + + /** Rate-limiting information for this bufferevent */ + struct bufferevent_rate_limit *rate_limiting; }; /** Possible operations for a control callback. */ @@ -170,6 +226,7 @@ struct bufferevent_ops { /** Called to access miscellaneous fields. */ int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); + }; extern const struct bufferevent_ops bufferevent_ops_socket; @@ -287,6 +344,15 @@ void _bufferevent_generic_adj_timeouts(struct bufferevent *bev); EVLOCK_UNLOCK(locking->lock, 0); \ } while(0) +/* ==== For rate-limiting. 
*/ + +int _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, + int bytes); +int _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, + int bytes); +int _bufferevent_get_read_max(struct bufferevent_private *bev); +int _bufferevent_get_write_max(struct bufferevent_private *bev); + #ifdef __cplusplus } #endif diff --git a/bufferevent.c b/bufferevent.c index c753a867..6e6c4187 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -531,6 +531,15 @@ _bufferevent_decref_and_unlock(struct bufferevent *bufev) evbuffer_free(bufev->input); evbuffer_free(bufev->output); + if (bufev_private->rate_limiting) { + if (bufev_private->rate_limiting->group) + bufferevent_remove_from_rate_limit_group(bufev); + if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event)) + event_del(&bufev_private->rate_limiting->refill_bucket_event); + mm_free(bufev_private->rate_limiting); + bufev_private->rate_limiting = NULL; + } + BEV_UNLOCK(bufev); if (bufev_private->own_lock) EVTHREAD_FREE_LOCK(bufev_private->lock, diff --git a/bufferevent_async.c b/bufferevent_async.c index 253a972b..3d21d4c1 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -127,6 +127,8 @@ upcast_write(struct event_overlapped *eo) static void bev_async_consider_writing(struct bufferevent_async *b) { + size_t at_most; + int limit; /* Don't write if there's a write in progress, or we do not * want to write. */ if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE)) @@ -135,8 +137,18 @@ bev_async_consider_writing(struct bufferevent_async *b) if (!evbuffer_get_length(b->bev.bev.output)) return; + at_most = evbuffer_get_length(b->bev.bev.output); + + /* XXXX This over-commits. */ + limit = _bufferevent_get_write_max(&b->bev); + if (at_most >= limit) + at_most = limit; + + if (b->bev.write_suspended) + return; + /* XXXX doesn't respect low-water mark very well. 
*/ - if (evbuffer_launch_write(b->bev.bev.output, -1, + if (evbuffer_launch_write(b->bev.bev.output, at_most, &b->write_overlapped)) { EVUTIL_ASSERT(0);/* XXX act sensibly. */ } else { @@ -150,6 +162,7 @@ bev_async_consider_reading(struct bufferevent_async *b) size_t cur_size; size_t read_high; size_t at_most; + int limit; /* Don't read if there is a read in progress, or we do not * want to read. */ if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ)) @@ -166,6 +179,14 @@ bev_async_consider_reading(struct bufferevent_async *b) at_most = 16384; /* FIXME totally magic. */ } + /* XXXX This over-commits. */ + limit = _bufferevent_get_read_max(&b->bev); + if (at_most >= limit) + at_most = limit; + + if (b->bev.read_suspended) + return; + if (evbuffer_launch_read(b->bev.bev.input, at_most, &b->read_overlapped)) { EVUTIL_ASSERT(0); @@ -304,6 +325,7 @@ read_complete(struct event_overlapped *eo, uintptr_t key, if (ok && nbytes) { BEV_RESET_GENERIC_READ_TIMEOUT(bev); + _bufferevent_decrement_read_buckets(&bev_a->bev, nbytes); if (evbuffer_get_length(bev->input) >= bev->wm_read.low) _bufferevent_run_readcb(bev); bev_async_consider_reading(bev_a); @@ -336,6 +358,7 @@ write_complete(struct event_overlapped *eo, uintptr_t key, if (ok && nbytes) { BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); + _bufferevent_decrement_write_buckets(&bev_a->bev, nbytes); if (evbuffer_get_length(bev->output) <= bev->wm_write.low) _bufferevent_run_writecb(bev); bev_async_consider_writing(bev_a); diff --git a/bufferevent_openssl.c b/bufferevent_openssl.c index 26a701c0..26be4ff1 100644 --- a/bufferevent_openssl.c +++ b/bufferevent_openssl.c @@ -524,20 +524,29 @@ do_read(struct bufferevent_openssl *bev_ssl, int n_to_read) /* Requires lock */ struct bufferevent *bev = &bev_ssl->bev.bev; struct evbuffer *input = bev->input; - int r, n, i, n_used = 0, blocked = 0; + int r, n, i, n_used = 0, blocked = 0, atmost; struct evbuffer_iovec space[2]; + atmost = _bufferevent_get_read_max(&bev_ssl->bev); + 
if (n_to_read > atmost) + n_to_read = atmost; + n = evbuffer_reserve_space(input, n_to_read, space, 2); if (n < 0) return -1; for (i=0; ibev.read_suspended) + break; r = SSL_read(bev_ssl->ssl, space[i].iov_base, space[i].iov_len); if (r>0) { if (bev_ssl->read_blocked_on_write) clear_rbow(bev_ssl); ++n_used; space[i].iov_len = r; + /* Not exactly right; we probably want to do + * our rate-limiting on the underlying bytes. */ + _bufferevent_decrement_read_buckets(&bev_ssl->bev, r); } else { int err = SSL_get_error(bev_ssl->ssl, r); print_err(err); @@ -584,6 +593,8 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost) if (bev_ssl->last_write > 0) atmost = bev_ssl->last_write; + else + atmost = _bufferevent_get_write_max(&bev_ssl->bev); n = evbuffer_peek(output, atmost, NULL, space, 8); if (n < 0) @@ -592,6 +603,9 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost) if (n > 8) n = 8; for (i=0; i < n; ++i) { + if (bev_ssl->bev.write_suspended) + break; + r = SSL_write(bev_ssl->ssl, space[i].iov_base, space[i].iov_len); if (r > 0) { @@ -599,6 +613,9 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost) clear_wbor(bev_ssl); n_written += r; bev_ssl->last_write = -1; + /* Not exactly right; we probably want to do + * our rate-limiting on the underlying bytes. */ + _bufferevent_decrement_write_buckets(&bev_ssl->bev, r); } else { int err = SSL_get_error(bev_ssl->ssl, r); print_err(err); @@ -659,6 +676,7 @@ consider_reading(struct bufferevent_openssl *bev_ssl) if (bev_ssl->write_blocked_on_read) return; while ((bev_ssl->bev.bev.enabled & EV_READ) && + (! bev_ssl->bev.read_suspended) && (! wm->high || evbuffer_get_length(input) < wm->high)) { int n_to_read = wm->high ? wm->high - evbuffer_get_length(input) @@ -689,6 +707,7 @@ consider_writing(struct bufferevent_openssl *bev_ssl) wm = &bev_ssl->underlying->wm_write; } while ((bev_ssl->bev.bev.enabled & EV_WRITE) && + (! bev_ssl->bev.write_suspended) && evbuffer_get_length(output) && (!target || (! 
wm->high || evbuffer_get_length(target) < wm->high))) { int n_to_write; diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c new file mode 100644 index 00000000..968168ad --- /dev/null +++ b/bufferevent_ratelim.c @@ -0,0 +1,654 @@ +/* + * Copyright (c) 2007-2009 Niels Provos and Nick Mathewson + * Copyright (c) 2002-2006 Niels Provos + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
*/ + +#include +#include +#include +#include + +#include "event2/event.h" +#include "event2/event_struct.h" +#include "event2/util.h" +#include "event2/bufferevent.h" +#include "event2/bufferevent_struct.h" +#include "event2/buffer.h" + +#include "ratelim-internal.h" + +#include "bufferevent-internal.h" +#include "mm-internal.h" +#include "util-internal.h" + +int +ev_token_bucket_init(struct ev_token_bucket *bucket, + const struct ev_token_bucket_cfg *cfg, + ev_uint32_t current_tick, + int reinitialize) +{ + if (reinitialize) { + /* on reinitialization, we only clip downwards, since we've + already used who-knows-how-much bandwidth this tick. We + leave "last_updated" as it is; the next update will add the + appropriate amount of bandwidth to the bucket. + */ + if (bucket->read_limit > cfg->read_maximum) + bucket->read_limit = cfg->read_maximum; + if (bucket->write_limit > cfg->write_maximum) + bucket->write_limit = cfg->write_maximum; + } else { + bucket->read_limit = cfg->read_rate; + bucket->write_limit = cfg->write_rate; + bucket->last_updated = current_tick; + } + return 0; +} + +int +ev_token_bucket_update(struct ev_token_bucket *bucket, + const struct ev_token_bucket_cfg *cfg, + ev_uint32_t current_tick) +{ + /* It's okay if the tick number overflows, since we'll just + * wrap around when we do the unsigned subtraction. */ + unsigned n_ticks = current_tick - bucket->last_updated; + + /* Make sure some ticks actually happened, and that time didn't + * roll back. 
*/ + if (n_ticks == 0 || n_ticks > INT_MAX) + return 0; + + /* Naively, we would say + bucket->limit += n_ticks * cfg->rate; + + if (bucket->limit > cfg->maximum) + bucket->limit = cfg->maximum; + + But we're worried about overflow, so we do it like this: + */ + + if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate) + bucket->read_limit = cfg->read_maximum; + else + bucket->read_limit += n_ticks * cfg->read_rate; + + + if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate) + bucket->write_limit = cfg->write_maximum; + else + bucket->write_limit += n_ticks * cfg->write_rate; + + + bucket->last_updated = current_tick; + + return 1; +} + +ev_uint32_t +ev_token_bucket_get_tick(const struct timeval *tv, + const struct ev_token_bucket_cfg *cfg) +{ + /* This computation uses two multiplies and a divide. We could do + * fewer if we knew that the tick length was an integer number of + * seconds, or if we knew it divided evenly into a second. We should + * investigate that more. + */ + + /* We cast to an ev_uint64_t first, since we don't want to overflow + * before we do the final divide. */ + ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000; + return (unsigned)(msec / cfg->msec_per_tick); +} + +struct ev_token_bucket_cfg * +ev_token_bucket_cfg_new(ev_uint32_t read_rate, ev_uint32_t read_burst, + ev_uint32_t write_rate, ev_uint32_t write_burst, + const struct timeval *tick_len) +{ + struct ev_token_bucket_cfg *r; + struct timeval g; + if (! 
tick_len) { + g.tv_sec = 1; + g.tv_usec = 0; + tick_len = &g; + } + if (read_rate > read_burst || write_rate > write_burst || + read_rate < 1 || write_rate < 1) + return NULL; + r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg)); + if (!r) + return NULL; + r->read_rate = read_rate; + r->write_rate = write_rate; + r->read_maximum = read_burst; + r->write_maximum = write_burst; + memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval)); + r->msec_per_tick = (tick_len->tv_sec * 1000) + tick_len->tv_usec/1000; + return r; +} + +void +ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg) +{ + mm_free(cfg); +} + +/* No matter how big our bucket gets, don't try to read more than this + * much in a single read operation. */ +#define MAX_TO_READ_EVER 16384 +/* No matter how big our bucket gets, don't try to write more than this + * much in a single write operation. */ +#define MAX_TO_WRITE_EVER 16384 + +#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0) +#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0) + +static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g); +static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g); + +/** Helper: figure out the maximum amount we should write if is_write, or + the maximum amount we should read if is_read. Return that maximum, or + 0 if our bucket is wholly exhausted. + */ +static inline int +_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write) +{ + /* needs lock on bev. */ + int max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER; + struct timeval now; + +#define LIM(x) \ + (is_write ? (x).write_limit : (x).read_limit) + +#define GROUP_SUSPENDED(g) \ + (is_write ? 
(g)->write_suspended : (g)->read_suspended) + + /* Sets max_so_far to MIN(x, max_so_far) */ +#define CLAMPTO(x) \ + do { \ + if (max_so_far > (x)) \ + max_so_far = (x); \ + } while (0); + + if (!bev->rate_limiting) + return max_so_far; + + /* If rate-limiting is enabled at all, update the appropriate + bucket, and take the smaller of our rate limit and the group + rate limit. + */ + + if (bev->rate_limiting->cfg) { + unsigned tick; + + event_base_gettimeofday_cached(bev->bev.ev_base, &now); + tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg); + ev_token_bucket_update(&bev->rate_limiting->limit, + bev->rate_limiting->cfg, tick); + max_so_far = LIM(bev->rate_limiting->limit); + } + if (bev->rate_limiting->group) { + struct bufferevent_rate_limit_group *g = + bev->rate_limiting->group; + ev_uint32_t share; + LOCK_GROUP(g); + if (GROUP_SUSPENDED(g)) { + /* We can get here if we failed to lock this + * particular bufferevent while suspending the whole + * group. */ + if (is_write) + bufferevent_suspend_write(&bev->bev, + BEV_SUSPEND_BW_GROUP); + else + bufferevent_suspend_read(&bev->bev, + BEV_SUSPEND_BW_GROUP); + share = 0; + } else { + /* XXXX probably we should divide among the active + * members, not the total members. 
*/ + share = LIM(g->rate_limit) / g->n_members; + if (share < g->min_share) + share = g->min_share; + } + UNLOCK_GROUP(g); + CLAMPTO(share); + } + + return max_so_far; +} + +int +_bufferevent_get_read_max(struct bufferevent_private *bev) +{ + return _bufferevent_get_rlim_max(bev, 0); +} + +int +_bufferevent_get_write_max(struct bufferevent_private *bev) +{ + return _bufferevent_get_rlim_max(bev, 1); +} + +int +_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, int bytes) +{ + /* need to hold lock on bev */ + if (!bev->rate_limiting) + return 0; + + if (bev->rate_limiting->cfg) { + bev->rate_limiting->limit.read_limit -= bytes; + if (bev->rate_limiting->limit.read_limit <= 0) { + bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW); + event_add(&bev->rate_limiting->refill_bucket_event, + &bev->rate_limiting->cfg->tick_timeout); + } + } + + if (bev->rate_limiting->group) { + LOCK_GROUP(bev->rate_limiting->group); + bev->rate_limiting->group->rate_limit.read_limit -= bytes; + if (bev->rate_limiting->group->rate_limit.read_limit <= 0) { + _bev_group_suspend_reading(bev->rate_limiting->group); + } + UNLOCK_GROUP(bev->rate_limiting->group); + } + + return 0; +} + +int +_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, int bytes) +{ + /* need to hold lock */ + if (!bev->rate_limiting) + return 0; + + if (bev->rate_limiting->cfg) { + bev->rate_limiting->limit.write_limit -= bytes; + if (bev->rate_limiting->limit.write_limit <= 0) { + bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW); + event_add(&bev->rate_limiting->refill_bucket_event, + &bev->rate_limiting->cfg->tick_timeout); + } + } + + if (bev->rate_limiting->group) { + LOCK_GROUP(bev->rate_limiting->group); + bev->rate_limiting->group->rate_limit.write_limit -= bytes; + if (bev->rate_limiting->group->rate_limit.write_limit <= 0) { + _bev_group_suspend_writing(bev->rate_limiting->group); + } + UNLOCK_GROUP(bev->rate_limiting->group); + } + + return 0; +} + +/** Stop reading on 
every bufferevent in g */ +static int +_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g) +{ + /* Needs group lock */ + struct bufferevent_private *bev; + g->read_suspended = 1; + g->pending_unsuspend_read = 0; + + /* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK, + to prevent a deadlock. (Ordinarily, the group lock nests inside + the bufferevent locks. If we are unable to lock any individual + bufferevent, it will find out later when it looks at its limit + and sees that its group is suspended. + */ + TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) { + if (EVLOCK_TRY_LOCK(bev->lock)) { + bufferevent_suspend_read(&bev->bev, + BEV_SUSPEND_BW_GROUP); + EVLOCK_UNLOCK(bev->lock, 0); + } + } + return 0; +} + +/** Stop writing on every bufferevent in g */ +static int +_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g) +{ + /* Needs group lock */ + struct bufferevent_private *bev; + g->write_suspended = 1; + g->pending_unsuspend_write = 0; + TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) { + if (EVLOCK_TRY_LOCK(bev->lock)) { + bufferevent_suspend_write(&bev->bev, + BEV_SUSPEND_BW_GROUP); + EVLOCK_UNLOCK(bev->lock, 0); + } + } + return 0; +} + +/** Timer callback invoked on a single bufferevent with one or more exhausted + buckets when they are ready to refill. */ +static void +_bev_refill_callback(evutil_socket_t fd, short what, void *arg) +{ + unsigned tick; + struct timeval now; + struct bufferevent_private *bev = arg; + int again = 0; + BEV_LOCK(&bev->bev); + if (!bev->rate_limiting || !bev->rate_limiting->cfg) { + BEV_UNLOCK(&bev->bev); + return; + } + + /* First, update the bucket */ + event_base_gettimeofday_cached(bev->bev.ev_base, &now); + tick = ev_token_bucket_get_tick(&now, + bev->rate_limiting->cfg); + ev_token_bucket_update(&bev->rate_limiting->limit, + bev->rate_limiting->cfg, + tick); + + /* Now unsuspend any read/write operations as appropriate. 
*/ + if ((bev->read_suspended & BEV_SUSPEND_BW)) { + if (bev->rate_limiting->limit.read_limit > 0) + bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW); + else + again = 1; + } + if ((bev->write_suspended & BEV_SUSPEND_BW)) { + if (bev->rate_limiting->limit.write_limit > 0) + bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW); + else + again = 1; + } + if (again) { + /* One or more of the buckets may need another refill if they + started negative. + + XXXX if we need to be quiet for more ticks, we should + maybe figure out what timeout we really want. + */ + event_add(&bev->rate_limiting->refill_bucket_event, + &bev->rate_limiting->cfg->tick_timeout); + } + BEV_UNLOCK(&bev->bev); +} + +/** Helper: grab a random element from a bufferevent group. */ +static struct bufferevent_private * +_bev_group_random_element(struct bufferevent_rate_limit_group *group) +{ + int which; + struct bufferevent_private *bev; + + /* requires group lock */ + + if (!group->n_members) + return NULL; + + EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members)); + + which = _evutil_weakrand() % group->n_members; + + bev = TAILQ_FIRST(&group->members); + while (which--) + bev = TAILQ_NEXT(bev, rate_limiting->next_in_group); + + return bev; +} + +/** Iterate over the elements of a rate-limiting group 'g' with a random + starting point, assigning each to the variable 'bev', and executing the + block 'block'. + + We do this in a half-baked effort to get fairness among group members. + XXX Round-robin or some kind of priority queue would be even more fair. 
+ */ +#define FOREACH_RANDOM_ORDER(block) \ + do { \ + first = _bev_group_random_element(g); \ + for (bev = first; bev != TAILQ_END(&g->members); \ + bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \ + block ; \ + } \ + for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \ + bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \ + block ; \ + } \ + } while (0) + +/** Callback invoked every tick to add more elements to the group bucket + and unsuspend group members as needed. + */ +static void +_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg) +{ + struct bufferevent_rate_limit_group *g = arg; + unsigned tick; + struct timeval now; + int again = 0; + struct bufferevent_private *bev, *first; + + event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now); + + LOCK_GROUP(g); + tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg); + ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick); + + if (g->pending_unsuspend_read || + (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) { + g->read_suspended = 0; + again = 0; + FOREACH_RANDOM_ORDER({ + if (EVLOCK_TRY_LOCK(bev->lock)) { + bufferevent_unsuspend_read(&bev->bev, + BEV_SUSPEND_BW_GROUP); + EVLOCK_UNLOCK(bev->lock, 0); + } else { + again = 1; + } + }); + g->pending_unsuspend_read = again; + } + if (g->pending_unsuspend_write || + (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){ + g->write_suspended = 0; + again = 0; + FOREACH_RANDOM_ORDER({ + if (EVLOCK_TRY_LOCK(bev->lock)) { + bufferevent_unsuspend_write(&bev->bev, + BEV_SUSPEND_BW_GROUP); + EVLOCK_UNLOCK(bev->lock, 0); + } else { + again = 1; + } + }); + g->pending_unsuspend_write = again; + } + + /* XXXX Rather than waiting to the next tick to unsuspend stuff + * with pending_unsuspend_write/read, we should do it on the + * next iteration of the mainloop. 
+ */ + + UNLOCK_GROUP(g); +} + +int +bufferevent_set_rate_limit(struct bufferevent *bev, + struct ev_token_bucket_cfg *cfg) +{ + struct bufferevent_private *bevp = + EVUTIL_UPCAST(bev, struct bufferevent_private, bev); + int r = -1; + struct bufferevent_rate_limit *rlim; + struct timeval now; + ev_uint32_t tick; + /* XXX reference-count cfg */ + + BEV_LOCK(bev); + + if (cfg == NULL) { + if (bevp->rate_limiting) { + bevp->rate_limiting->cfg = NULL; + bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW); + bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW); + } + r = 0; + goto done; + } + + event_base_gettimeofday_cached(bev->ev_base, &now); + tick = ev_token_bucket_get_tick(&now, cfg); + + if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) { + ; + } else if (bevp->rate_limiting) { + bevp->rate_limiting->cfg = cfg; + ev_token_bucket_init(&bevp->rate_limiting->limit, cfg, tick, 1); + if (bevp->rate_limiting->limit.read_limit > 0) + bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW); + else + bufferevent_suspend_read(bev, BEV_SUSPEND_BW); + if (bevp->rate_limiting->limit.write_limit > 0) + bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW); + else + bufferevent_suspend_write(bev, BEV_SUSPEND_BW); + } else { + rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit)); + if (!rlim) + goto done; + rlim->cfg = cfg; + ev_token_bucket_init(&rlim->limit, cfg, tick, 0); + evtimer_assign(&rlim->refill_bucket_event, bev->ev_base, + _bev_refill_callback, bevp); + bevp->rate_limiting = rlim; + } + r = 0; +done: + BEV_UNLOCK(bev); + return r; +} + +struct bufferevent_rate_limit_group * +bufferevent_rate_limit_group_new(struct event_base *base, + const struct ev_token_bucket_cfg *cfg) +{ + struct bufferevent_rate_limit_group *g; + struct timeval now; + ev_uint32_t tick; + + event_base_gettimeofday_cached(base, &now); + tick = ev_token_bucket_get_tick(&now, cfg); + + g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group)); + if (!g) + return NULL; + memcpy(&g->rate_limit_cfg, 
cfg, sizeof(g->rate_limit_cfg)); + TAILQ_INIT(&g->members); + + ev_token_bucket_init(&g->rate_limit, cfg, tick, 0); + + g->min_share = 64; + event_assign(&g->master_refill_event, base, -1, EV_PERSIST, + _bev_group_refill_callback, g); + event_add(&g->master_refill_event, &cfg->tick_timeout); + + EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE); + return g; +} + +int +bufferevent_add_to_rate_limit_group(struct bufferevent *bev, + struct bufferevent_rate_limit_group *g) +{ + int wsuspend, rsuspend; + struct bufferevent_private *bevp = + EVUTIL_UPCAST(bev, struct bufferevent_private, bev); + BEV_LOCK(bev); + + if (!bevp->rate_limiting) { + struct bufferevent_rate_limit *rlim; + rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit)); + if (!rlim) { + BEV_UNLOCK(bev); + return -1; + } + evtimer_assign(&rlim->refill_bucket_event, bev->ev_base, + _bev_refill_callback, bevp); + bevp->rate_limiting = rlim; + } + + if (bevp->rate_limiting->group == g) { + BEV_UNLOCK(bev); + return 0; + } + if (bevp->rate_limiting->group) + bufferevent_remove_from_rate_limit_group(bev); + + LOCK_GROUP(g); + bevp->rate_limiting->group = g; + ++g->n_members; + TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group); + + rsuspend = g->read_suspended; + wsuspend = g->write_suspended; + + UNLOCK_GROUP(g); + + if (rsuspend) + bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP); + if (wsuspend) + bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP); + + BEV_UNLOCK(bev); + return 0; +} + +int +bufferevent_remove_from_rate_limit_group(struct bufferevent *bev) +{ + struct bufferevent_private *bevp = + EVUTIL_UPCAST(bev, struct bufferevent_private, bev); + BEV_LOCK(bev); + if (bevp->rate_limiting && bevp->rate_limiting->group) { + struct bufferevent_rate_limit_group *g = + bevp->rate_limiting->group; + LOCK_GROUP(g); + bevp->rate_limiting->group = NULL; + --g->n_members; + TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group); + UNLOCK_GROUP(g); + } + 
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP); + bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP); + BEV_UNLOCK(bev); + return 0; +} diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 617c9110..30bc44bc 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -118,10 +118,12 @@ static void bufferevent_readcb(evutil_socket_t fd, short event, void *arg) { struct bufferevent *bufev = arg; + struct bufferevent_private *bufev_p = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); struct evbuffer *input; int res = 0; short what = BEV_EVENT_READING; - int howmuch = -1; + int howmuch = -1, readmax=-1; _bufferevent_incref_and_lock(bufev); @@ -144,6 +146,12 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg) goto done; } } + readmax = _bufferevent_get_read_max(bufev_p); + if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited" + * uglifies this code. */ + howmuch = readmax; + if (bufev_p->read_suspended) + goto done; evbuffer_unfreeze(input, 0); res = evbuffer_read(input, fd, howmuch); @@ -163,6 +171,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg) if (res <= 0) goto error; + _bufferevent_decrement_read_buckets(bufev_p, res); /* Invoke the user callback - must always be called last */ if (evbuffer_get_length(input) >= bufev->wm_read.low) @@ -190,6 +199,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) int res = 0; short what = BEV_EVENT_WRITING; int connected = 0; + int atmost = -1; _bufferevent_incref_and_lock(bufev); @@ -211,7 +221,6 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) goto done; } else { connected = 1; - bufferevent_unsuspend_read(bufev, BEV_SUSPEND_CONNECTING); #ifdef WIN32 if (BEV_IS_ASYNC(bufev)) { event_del(&bufev->ev_write); @@ -231,9 +240,14 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) } } + atmost = _bufferevent_get_write_max(bufev_p); + + if (bufev_p->write_suspended) + goto done; + if 
(evbuffer_get_length(bufev->output)) { evbuffer_unfreeze(bufev->output, 1); - res = evbuffer_write(bufev->output, fd); + res = evbuffer_write_atmost(bufev->output, fd, atmost); evbuffer_freeze(bufev->output, 1); if (res == -1) { int err = evutil_socket_geterror(fd); @@ -249,6 +263,8 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) } if (res <= 0) goto error; + + _bufferevent_decrement_write_buckets(bufev_p, res); } if (evbuffer_get_length(bufev->output) == 0) @@ -390,11 +406,7 @@ freesock: if (ownfd) EVUTIL_CLOSESOCKET(fd); /* do something about the error? */ - done: - if (result == 0) - bufferevent_suspend_read(bev, BEV_SUSPEND_CONNECTING); - _bufferevent_decref_and_unlock(bev); return result; } @@ -557,6 +569,10 @@ be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd) EV_READ|EV_PERSIST, bufferevent_readcb, bufev); event_assign(&bufev->ev_write, bufev->ev_base, fd, EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); + + if (fd >= 0) + bufferevent_enable(bufev, bufev->enabled); + BEV_UNLOCK(bufev); } diff --git a/event.c b/event.c index aa8d5ce5..2c830cee 100644 --- a/event.c +++ b/event.c @@ -180,6 +180,22 @@ gettime(struct event_base *base, struct timeval *tp) return (evutil_gettimeofday(tp, NULL)); } +int +event_base_gettimeofday_cached(struct event_base *base, struct timeval *tv) +{ + int r; + if (!base) { + base = current_base; + if (!current_base) + return evutil_gettimeofday(tv, NULL); + } + + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + r = gettime(base, tv); + EVBASE_RELEASE_LOCK(base, th_base_lock); + return r; +} + static inline void clear_time_cache(struct event_base *base) { diff --git a/evutil.c b/evutil.c index 78e18d42..8caf2825 100644 --- a/evutil.c +++ b/evutil.c @@ -1776,3 +1776,14 @@ evutil_getenv(const char *varname) return getenv(varname); } + +long +_evutil_weakrand(void) +{ +#ifdef WIN32 + return rand(); +#else + return random(); +#endif +} + diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h 
index 523a7fce..48d56bd6 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -499,6 +499,97 @@ int bufferevent_pair_new(struct event_base *base, int options, struct bufferevent *pair[2]); + +/** + Abstract type used to configure rate-limiting on a bufferevent or a group + of bufferevents. + */ +struct ev_token_bucket_cfg; +/** + A group of bufferevents which are configured to respect the same rate + limit. +*/ +struct bufferevent_rate_limit_group; + +/** + Initialize and return a new object to configure the rate-limiting behavior + of bufferevents. + + @param read_rate The maximum number of bytes to read per tick on + average. + @param read_burst The maximum number of bytes to read in any single tick. + @param write_rate The maximum number of bytes to write per tick on + average. + @param write_burst The maximum number of bytes to write in any single tick. + @param tick_len The length of a single tick. Defaults to one second. + Any fractions of a millisecond are ignored. + + Note that all rate-limits here are currently best-effort: future versions + of Libevent may implement them more tightly. + */ +struct ev_token_bucket_cfg *ev_token_bucket_cfg_new( + ev_uint32_t read_rate, ev_uint32_t read_burst, + ev_uint32_t write_rate, ev_uint32_t write_burst, + const struct timeval *tick_len); + +/** Free all storage held in 'cfg'. + + Note: 'cfg' is not currently reference-counted; it is not safe to free it + until no bufferevent is using it. + */ +void ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg); + +/** + Set the rate-limit of the bufferevent 'bev' to the one specified in + 'cfg'. If 'cfg' is NULL, disable any per-bufferevent rate-limiting on + 'bev'. + + Note that only some bufferevent types currently respect rate-limiting. + They are: socket-based bufferevents (normal and IOCP-based), and SSL-based + bufferevents. + + Return 0 on success, -1 on failure. 
+ */ +int bufferevent_set_rate_limit(struct bufferevent *bev, + struct ev_token_bucket_cfg *cfg); +/** + Create a new rate-limit group for bufferevents. A rate-limit group + constrains the maximum number of bytes sent and received, in toto, + by all of its bufferevents. + + @param base An event_base to run any necessary timeouts for the group. + Note that all bufferevents in the group do not necessarily need to share + this event_base. + @param cfg The rate-limit for this group. + + Note that all rate-limits here are currently best-effort: future versions + of Libevent may implement them more tightly. + + Note also that only some bufferevent types currently respect rate-limiting. + They are: socket-based bufferevents (normal and IOCP-based), and SSL-based + bufferevents. + */ +struct bufferevent_rate_limit_group *bufferevent_rate_limit_group_new( + struct event_base *base, + const struct ev_token_bucket_cfg *cfg); +/*XXX we need a bufferevent_rate_limit_group_set_cfg */ + +/** + Add 'bev' to the list of bufferevents whose aggregate reading and writing + is restricted by 'g'. If 'g' is NULL, remove 'bev' from its current group. + + A bufferevent may belong to no more than one rate-limit group at a time. + If 'bev' is already a member of a group, it will be removed from its old + group before being added to 'g'. + + Return 0 on success and -1 on failure. + */ +int bufferevent_add_to_rate_limit_group(struct bufferevent *bev, + struct bufferevent_rate_limit_group *g); + +/** Remove 'bev' from its current rate-limit group (if any). 
*/ +int bufferevent_remove_from_rate_limit_group(struct bufferevent *bev); + #ifdef __cplusplus } #endif diff --git a/include/event2/event.h b/include/event2/event.h index 6cc83d49..489026a6 100644 --- a/include/event2/event.h +++ b/include/event2/event.h @@ -671,6 +671,20 @@ void event_set_mem_functions(void *(*malloc_fn)(size_t sz), void event_base_dump_events(struct event_base *, FILE *); +/** Sets 'tv' to the current time (as returned by gettimeofday()), + looking at the cached value in 'base' if possible, and calling + gettimeofday() or clock_gettime() as appropriate if there is no + cached time. + + Generally, this value will only be cached while actually + processing event callbacks, and may be very inaccurate if your + callbacks take a long time to execute. + + Returns 0 on success, negative on failure. + */ +int event_base_gettimeofday_cached(struct event_base *base, + struct timeval *tv); + #ifdef __cplusplus } #endif diff --git a/include/event2/util.h b/include/event2/util.h index 0f72b6ed..0e7320e4 100644 --- a/include/event2/util.h +++ b/include/event2/util.h @@ -93,12 +93,16 @@ extern "C" { #ifdef _EVENT_HAVE_UINT32_T #define ev_uint32_t uint32_t +#define ev_int32_t int32_t #elif defined(WIN32) #define ev_uint32_t unsigned int +#define ev_int32_t signed int #elif _EVENT_SIZEOF_LONG == 4 #define ev_uint32_t unsigned long +#define ev_int32_t signed long #elif _EVENT_SIZEOF_INT == 4 #define ev_uint32_t unsigned int +#define ev_int32_t signed int #else #error "No way to define ev_uint32_t" #endif diff --git a/ratelim-internal.h b/ratelim-internal.h new file mode 100644 index 00000000..105798ae --- /dev/null +++ b/ratelim-internal.h @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2009 Niels Provos and Nick Mathewson + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. 
Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef _RATELIM_INTERNAL_H_ +#define _RATELIM_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <event2/util.h> + +/** A token bucket is an internal structure that tracks how many bytes we are + * currently willing to read or write on a given bufferevent or group of + * bufferevents */ +struct ev_token_bucket { + /** How many bytes are we willing to read or write right now? These + * values are signed so that we can do "deficit spending" */ + ev_int32_t read_limit, write_limit; + /** When was this bucket last updated? Measured in abstract 'ticks' + * relative to the token bucket configuration. */ + ev_uint32_t last_updated; +}; + +/** Configuration info for a token bucket or set of token buckets. 
*/ +struct ev_token_bucket_cfg { + /** How many bytes are we willing to read on average per tick? */ + ev_uint32_t read_rate; + /** How many bytes are we willing to read at most in any one tick? */ + ev_uint32_t read_maximum; + /** How many bytes are we willing to write on average per tick? */ + ev_uint32_t write_rate; + /** How many bytes are we willing to write at most in any one tick? */ + ev_uint32_t write_maximum; + + /* How long is a tick? Note that fractions of a millisecond are + * ignored. */ + struct timeval tick_timeout; + + /* How long is a tick, in milliseconds? Derived from tick_timeout. */ + unsigned msec_per_tick; +}; + +/** The current tick is 'current_tick': add bytes to 'bucket' as specified in + * 'cfg'. */ +int ev_token_bucket_update(struct ev_token_bucket *bucket, + const struct ev_token_bucket_cfg *cfg, + ev_uint32_t current_tick); + +/** In which tick does 'tv' fall according to 'cfg'? Note that ticks can + * overflow easily; your code needs to handle this. */ +ev_uint32_t ev_token_bucket_get_tick(const struct timeval *tv, + const struct ev_token_bucket_cfg *cfg); + +/** Adjust 'bucket' to respect 'cfg', and note that it was last updated in + * 'current_tick'. If 'reinitialize' is true, we are changing the + * configuration of 'bucket'; otherwise, we are setting it up for the first + * time. 
+ */ +int ev_token_bucket_init(struct ev_token_bucket *bucket, + const struct ev_token_bucket_cfg *cfg, + ev_uint32_t current_tick, + int reinitialize); + +/** Decrease the read limit of 'b' by 'n' bytes */ +#define ev_token_bucket_decrement_read(b,n) \ + do { \ + (b)->read_limit -= (n); \ + } while (0) +/** Decrease the write limit of 'b' by 'n' bytes */ +#define ev_token_bucket_decrement_write(b,n) \ + do { \ + (b)->write_limit -= (n); \ + } while (0) + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/test/Makefile.am b/test/Makefile.am index a1d3ac70..5d2eab77 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -5,7 +5,7 @@ AM_CFLAGS = -I$(top_srcdir) -I$(top_srcdir)/compat -I$(top_srcdir)/include EXTRA_DIST = regress.rpc regress.gen.h regress.gen.c noinst_PROGRAMS = test-init test-eof test-weof test-time regress \ - bench bench_cascade bench_http bench_httpclient + bench bench_cascade bench_http bench_httpclient test-ratelim noinst_HEADERS = tinytest.h tinytest_macros.h regress.h BUILT_SOURCES = regress.gen.c regress.gen.h @@ -17,6 +17,8 @@ test_weof_SOURCES = test-weof.c test_weof_LDADD = ../libevent_core.la test_time_SOURCES = test-time.c test_time_LDADD = ../libevent_core.la +test_ratelim_SOURCES = test-ratelim.c +test_ratelim_LDADD = ../libevent_core.la -lm regress_SOURCES = regress.c regress_buffer.c regress_http.c regress_dns.c \ regress_rpc.c regress.gen.c regress.gen.h regress_et.c \ diff --git a/test/test-ratelim.c b/test/test-ratelim.c new file mode 100644 index 00000000..59e3ecb1 --- /dev/null +++ b/test/test-ratelim.c @@ -0,0 +1,348 @@ +/* + * Copyright (c) 2009 Niels Provos and Nick Mathewson + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. 
Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#include <assert.h> +#include <math.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#ifdef WIN32 +#include <winsock2.h> +#include <ws2tcpip.h> +#endif + +#include <event2/bufferevent.h> +#include <event2/buffer.h> +#include <event2/event.h> +#include <event2/util.h> +#include <event2/listener.h> +#include <event2/thread.h> + +static int cfg_verbose = 0; +static int cfg_help = 0; + +static int cfg_n_connections = 30; +static int cfg_duration = 5; +static int cfg_connlimit = 0; +static int cfg_grouplimit = 0; +static int cfg_tick_msec = 1000; + +static struct timeval cfg_tick = { 0, 500*1000 }; + +static struct ev_token_bucket_cfg *conn_bucket_cfg = NULL; +static struct ev_token_bucket_cfg *group_bucket_cfg = NULL; +struct bufferevent_rate_limit_group *ratelim_group = NULL; + +struct client_state { + size_t queued; + ev_uint64_t received; +}; + +static void +loud_writecb(struct bufferevent *bev, void *ctx) +{ + struct client_state *cs = ctx; + struct evbuffer *output = bufferevent_get_output(bev); + char buf[1024]; +#ifdef WIN32 + int r = rand() % 256; +#else + int r = random() % 256; +#endif + memset(buf, r, sizeof(buf)); + while (evbuffer_get_length(output) < 8192) { + evbuffer_add(output, buf, sizeof(buf)); + cs->queued += sizeof(buf); + // printf("queued %d\n", (int)sizeof(buf)); + } +} + +static void +discard_readcb(struct bufferevent *bev, void *ctx) +{ + struct client_state *cs = ctx; + struct evbuffer *input = bufferevent_get_input(bev); + size_t len = evbuffer_get_length(input); + evbuffer_drain(input, len); + cs->received += len; + // printf("read %d bytes\n", (int)len); +} + +static void +write_on_connectedcb(struct bufferevent *bev, short what, void *ctx) +{ + if (what & BEV_EVENT_CONNECTED) { + loud_writecb(bev, ctx); + /* XXXX this shouldn't be needed. 
*/ + bufferevent_enable(bev, EV_READ|EV_WRITE); + } +} + +static void +echo_readcb(struct bufferevent *bev, void *ctx) +{ + struct evbuffer *input = bufferevent_get_input(bev); + struct evbuffer *output = bufferevent_get_output(bev); + + // puts("read."); + evbuffer_add_buffer(output, input); + // printf(" outbuf len is now %d\n", (int)evbuffer_get_length(output)); + if (evbuffer_get_length(output) > 1024000) + bufferevent_disable(bev, EV_READ); +} + +static void +echo_listenercb(struct evconnlistener *listener, evutil_socket_t newsock, + struct sockaddr *sourceaddr, int socklen, void *ctx) +{ + struct event_base *base = ctx; + int flags = BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE; + struct bufferevent *bev; + + bev = bufferevent_socket_new(base, newsock, flags); + bufferevent_setcb(bev, echo_readcb, NULL, NULL, NULL); + if (conn_bucket_cfg) + bufferevent_set_rate_limit(bev, conn_bucket_cfg); + if (ratelim_group) + bufferevent_add_to_rate_limit_group(bev, ratelim_group); + bufferevent_enable(bev, EV_READ|EV_WRITE); +} + +static void +test_ratelimiting(void) +{ + struct event_base *base; + struct sockaddr_in sin; + struct evconnlistener *listener; + + struct sockaddr_storage ss; + ev_socklen_t slen; + + struct bufferevent **bevs; + struct client_state *states; + + int i; + + struct timeval tv; + + ev_uint64_t total_received; + double total_sq_persec, total_persec; + double variance; + + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */ + sin.sin_port = 0; /* unspecified port */ + + base = event_base_new(); + listener = evconnlistener_new_bind(base, echo_listenercb, base, + LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1, + (struct sockaddr *)&sin, sizeof(sin)); + + slen = sizeof(ss); + if (getsockname(evconnlistener_get_fd(listener), (struct sockaddr *)&ss, + &slen) < 0) { + perror("getsockname"); + return; + } + + if (cfg_connlimit > 0) { + conn_bucket_cfg = ev_token_bucket_cfg_new( + cfg_connlimit, 
cfg_connlimit * 4, + cfg_connlimit, cfg_connlimit * 4, + &cfg_tick); + assert(conn_bucket_cfg); + } + + if (cfg_grouplimit > 0) { + group_bucket_cfg = ev_token_bucket_cfg_new( + cfg_grouplimit, cfg_grouplimit * 4, + cfg_grouplimit, cfg_grouplimit * 4, + &cfg_tick); + ratelim_group = bufferevent_rate_limit_group_new( + base, group_bucket_cfg); + }; + + bevs = calloc(cfg_n_connections, sizeof(struct bufferevent *)); + states = calloc(cfg_n_connections, sizeof(struct client_state)); + + for (i = 0; i < cfg_n_connections; ++i) { + // printf("creating %d:\n",i); + bevs[i] = bufferevent_socket_new(base, -1, + BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE); + assert(bevs[i]); + bufferevent_setcb(bevs[i], discard_readcb, loud_writecb, + write_on_connectedcb, &states[i]); + bufferevent_enable(bevs[i], EV_READ|EV_WRITE); + bufferevent_socket_connect(bevs[i], (struct sockaddr *)&ss, + slen); + } + + tv.tv_sec = cfg_duration; + tv.tv_usec = 0; + + event_base_loopexit(base, &tv); + + event_base_dispatch(base); + + total_received = 0; + total_persec = 0.0; + total_sq_persec = 0.0; + for (i=0; i < cfg_n_connections; ++i) { + double persec = states[i].received; + persec /= cfg_duration; + total_received += states[i].received; + total_persec += persec; + total_sq_persec += persec*persec; + printf("%d: %lf per second\n", i, persec); + } + printf(" total: %lf per second\n", + ((double)total_received)/cfg_duration); + printf(" average: %lf per second\n", + (((double)total_received)/cfg_duration)/cfg_n_connections); + + variance = total_sq_persec/cfg_n_connections - total_persec*total_persec/(cfg_n_connections*cfg_n_connections); + + printf(" stddev: %lf per second\n", sqrt(variance)); +} + +static struct option { + const char *name; int *ptr; int min; int isbool; +} options[] = { + { "-v", &cfg_verbose, 0, 1 }, + { "-h", &cfg_help, 0, 1 }, + { "-n", &cfg_n_connections, 1, 0 }, + { "-d", &cfg_duration, 1, 0 }, + { "-c", &cfg_connlimit, 0, 0 }, + { "-g", &cfg_grouplimit, 0, 0 }, + { "-t", 
&cfg_tick_msec, 10, 0 }, + { NULL, NULL, -1, 0 }, +}; + +static int +handle_option(int argc, char **argv, int *i, const struct option *opt) +{ + long val; + char *endptr = NULL; + if (opt->isbool) { + *opt->ptr = 1; + return 0; + } + if (*i + 1 == argc) { + fprintf(stderr, "Too few arguments to '%s'\n",argv[*i]); + return -1; + } + val = strtol(argv[*i+1], &endptr, 10); + if (*argv[*i+1] == '\0' || !endptr || *endptr != '\0') { + fprintf(stderr, "Couldn't parse numeric value '%s'\n", + argv[*i+1]); + return -1; + } + if (val < opt->min || val > 0x7fffffff) { + fprintf(stderr, "Value '%s' is out-of-range'\n", + argv[*i+1]); + return -1; + } + *opt->ptr = (int)val; + ++*i; + return 0; +} + +static void +usage(void) +{ + fprintf(stderr, +"test-ratelim [-v] [-n INT] [-d INT] [-c INT] [-g INT] [-t INT]\n\n" +"Pushes bytes through a number of possibly rate-limited connections, and\n" +"displays average throughput.\n\n" +" -n INT: Number of connections to open (default: 30)\n" +" -d INT: Duration of the test in seconds (default: 5 sec)\n" +" -c INT: Connection-rate limit applied to each connection in bytes per second\n" +" (default: None.)\n" +" -g INT: Group-rate limit applied to sum of all usage in bytes per second\n" +" (default: None.)\n" +" -t INT: Granularity of timing, in milliseconds (default: 1000 msec)\n"); +} + +int +main(int argc, char **argv) +{ + int i,j; + double ratio; + +#ifdef WIN32 + WORD wVersionRequested = MAKEWORD(2,2); + WSADATA wsaData; + int err; + + err = WSAStartup(wVersionRequested, &wsaData); +#endif + + + for (i = 1; i < argc; ++i) { + for (j = 0; options[j].name; ++j) { + if (!strcmp(argv[i],options[j].name)) { + if (handle_option(argc,argv,&i,&options[j])<0) + return 1; + goto again; + } + } + fprintf(stderr, "Unknown option '%s'\n", argv[i]); + usage(); + return 1; + again: + ; + } + if (cfg_help) { + usage(); + return 0; + } + + cfg_tick.tv_sec = cfg_tick_msec / 1000; + cfg_tick.tv_usec = (cfg_tick_msec % 1000)*1000; + + ratio = 
cfg_tick_msec / 1000.0; + + cfg_connlimit *= ratio; + cfg_grouplimit *= ratio; + + { + struct timeval tv; + evutil_gettimeofday(&tv, NULL); +#ifdef WIN32 + srand(tv.tv_usec); +#else + srandom(tv.tv_usec); +#endif + } + + evthread_enable_lock_debuging(); + + test_ratelimiting(); + + return 0; +} diff --git a/util-internal.h b/util-internal.h index 36ec3239..509f7299 100644 --- a/util-internal.h +++ b/util-internal.h @@ -146,6 +146,8 @@ int evutil_resolve(int family, const char *hostname, struct sockaddr *sa, const char *evutil_getenv(const char *name); +long _evutil_weakrand(void); + /* Evaluates to the same boolean value as 'p', and hints to the compiler that * we expect this value to be false. */ #ifdef __GNUC__X