mirror of
https://github.com/cuberite/libevent.git
synced 2025-09-10 04:50:37 -04:00
Merge branch 'ratelimit'
Conflicts: bufferevent_async.c
This commit is contained in:
commit
ba2945f931
1
.gitignore
vendored
1
.gitignore
vendored
@ -68,6 +68,7 @@ libevent.pc
|
||||
/test/regress.gen.h
|
||||
/test/test-eof
|
||||
/test/test-init
|
||||
/test/test-ratelim
|
||||
/test/test-time
|
||||
/test/test-weof
|
||||
|
||||
|
@ -105,7 +105,7 @@ event-config.h: config.h
|
||||
|
||||
CORE_SRC = event.c evthread.c buffer.c \
|
||||
bufferevent.c bufferevent_sock.c bufferevent_filter.c \
|
||||
bufferevent_pair.c listener.c \
|
||||
bufferevent_pair.c listener.c bufferevent_ratelim.c \
|
||||
evmap.c log.c evutil.c strlcpy.c $(SYS_SRC)
|
||||
EXTRA_SRC = event_tagging.c http.c evdns.c evrpc.c
|
||||
|
||||
@ -136,7 +136,8 @@ noinst_HEADERS = util-internal.h mm-internal.h ipv6-internal.h \
|
||||
evrpc-internal.h strlcpy-internal.h evbuffer-internal.h \
|
||||
bufferevent-internal.h http-internal.h event-internal.h \
|
||||
evthread-internal.h ht-internal.h defer-internal.h \
|
||||
minheap-internal.h log-internal.h evsignal-internal.h evmap-internal.h
|
||||
minheap-internal.h log-internal.h evsignal-internal.h evmap-internal.h \
|
||||
ratelim-internal.h
|
||||
|
||||
include_HEADERS = event.h evhttp.h evdns.h evrpc.h evutil.h
|
||||
|
||||
|
@ -35,6 +35,7 @@ extern "C" {
|
||||
#include "defer-internal.h"
|
||||
#include "evthread-internal.h"
|
||||
#include "event2/thread.h"
|
||||
#include "ratelim-internal.h"
|
||||
|
||||
/* These flags are reasons that we might be declining to actually enable
|
||||
reading or writing on a bufferevent.
|
||||
@ -43,22 +44,74 @@ extern "C" {
|
||||
/* On a all bufferevents, for reading: used when we have read up to the
|
||||
watermark value.
|
||||
|
||||
On a filtering bufferxevent, for writing: used when the underlying
|
||||
On a filtering bufferevent, for writing: used when the underlying
|
||||
bufferevent's write buffer has been filled up to its watermark
|
||||
value.
|
||||
*/
|
||||
#define BEV_SUSPEND_WM 0x01
|
||||
/* On a base bufferevent: when we have used up our bandwidth buckets. */
|
||||
/* On a base bufferevent: when we have emptied a bandwidth buckets */
|
||||
#define BEV_SUSPEND_BW 0x02
|
||||
/* On a socket bufferevent: we aren't going to try reading until the
|
||||
* connect operation is done. */
|
||||
#define BEV_SUSPEND_CONNECTING 0x04
|
||||
/* On a base bufferevent: when we have emptied the group's bandwidth bucket. */
|
||||
#define BEV_SUSPEND_BW_GROUP 0x04
|
||||
|
||||
struct token_bucket {
|
||||
ev_uint32_t limit;
|
||||
ev_uint32_t rate;
|
||||
ev_uint32_t burst;
|
||||
unsigned last_updated;
|
||||
struct bufferevent_rate_limit_group {
|
||||
/** List of all members in the group */
|
||||
TAILQ_HEAD(rlim_group_member_list, bufferevent_private) members;
|
||||
/** Current limits for the group. */
|
||||
struct ev_token_bucket rate_limit;
|
||||
struct ev_token_bucket_cfg rate_limit_cfg;
|
||||
|
||||
/** True iff we don't want to read from any member of the group.until
|
||||
* the token bucket refills. */
|
||||
unsigned read_suspended : 1;
|
||||
/** True iff we don't want to write from any member of the group.until
|
||||
* the token bucket refills. */
|
||||
unsigned write_suspended : 1;
|
||||
/** True iff we were unable to suspend one of the bufferevents in the
|
||||
* group for reading the last time we tried, and we should try
|
||||
* again. */
|
||||
unsigned pending_unsuspend_read : 1;
|
||||
/** True iff we were unable to suspend one of the bufferevents in the
|
||||
* group for writing the last time we tried, and we should try
|
||||
* again. */
|
||||
unsigned pending_unsuspend_write : 1;
|
||||
|
||||
/** The number of bufferevents in the group. */
|
||||
int n_members;
|
||||
|
||||
/** The smallest number of bytes that any member of the group should
|
||||
* be limited to read or write at a time. */
|
||||
ev_uint32_t min_share;
|
||||
/** Timeout event that goes off once a tick, when the bucket is ready
|
||||
* to refill. */
|
||||
struct event master_refill_event;
|
||||
/** Lock to protect the members of this group. This lock should nest
|
||||
* within every bufferevent lock: if you are holding this lock, do
|
||||
* not assume you can lock another bufferevent. */
|
||||
void *lock;
|
||||
};
|
||||
|
||||
/** Fields for rate-limiting a single bufferevent. */
|
||||
struct bufferevent_rate_limit {
|
||||
/* Linked-list elements for storing this bufferevent_private in a
|
||||
* group.
|
||||
*
|
||||
* Note that this field is supposed to be protected by the group
|
||||
* lock */
|
||||
TAILQ_ENTRY(bufferevent_private) next_in_group;
|
||||
/** The rate-limiting group for this bufferevent, or NULL if it is
|
||||
* only rate-limited on its own. */
|
||||
struct bufferevent_rate_limit_group *group;
|
||||
|
||||
/* This bufferevent's current limits. */
|
||||
struct ev_token_bucket limit;
|
||||
/* Pointer to the rate-limit configuration for this bufferevent.
|
||||
* Can be shared. XXX reference-count this? */
|
||||
struct ev_token_bucket_cfg *cfg;
|
||||
|
||||
/* Timeout event used when one this bufferevent's buckets are
|
||||
* empty. */
|
||||
struct event refill_bucket_event;
|
||||
};
|
||||
|
||||
/** Parts of the bufferevent structure that are shared among all bufferevent
|
||||
@ -111,6 +164,9 @@ struct bufferevent_private {
|
||||
/** Lock for this bufferevent. Shared by the inbuf and the outbuf.
|
||||
* If NULL, locking is disabled. */
|
||||
void *lock;
|
||||
|
||||
/** Rate-limiting information for this bufferevent */
|
||||
struct bufferevent_rate_limit *rate_limiting;
|
||||
};
|
||||
|
||||
/** Possible operations for a control callback. */
|
||||
@ -170,6 +226,7 @@ struct bufferevent_ops {
|
||||
|
||||
/** Called to access miscellaneous fields. */
|
||||
int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
|
||||
|
||||
};
|
||||
|
||||
extern const struct bufferevent_ops bufferevent_ops_socket;
|
||||
@ -287,6 +344,15 @@ void _bufferevent_generic_adj_timeouts(struct bufferevent *bev);
|
||||
EVLOCK_UNLOCK(locking->lock, 0); \
|
||||
} while(0)
|
||||
|
||||
/* ==== For rate-limiting. */
|
||||
|
||||
int _bufferevent_decrement_write_buckets(struct bufferevent_private *bev,
|
||||
int bytes);
|
||||
int _bufferevent_decrement_read_buckets(struct bufferevent_private *bev,
|
||||
int bytes);
|
||||
int _bufferevent_get_read_max(struct bufferevent_private *bev);
|
||||
int _bufferevent_get_write_max(struct bufferevent_private *bev);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
@ -531,6 +531,15 @@ _bufferevent_decref_and_unlock(struct bufferevent *bufev)
|
||||
evbuffer_free(bufev->input);
|
||||
evbuffer_free(bufev->output);
|
||||
|
||||
if (bufev_private->rate_limiting) {
|
||||
if (bufev_private->rate_limiting->group)
|
||||
bufferevent_remove_from_rate_limit_group(bufev);
|
||||
if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event))
|
||||
event_del(&bufev_private->rate_limiting->refill_bucket_event);
|
||||
mm_free(bufev_private->rate_limiting);
|
||||
bufev_private->rate_limiting = NULL;
|
||||
}
|
||||
|
||||
BEV_UNLOCK(bufev);
|
||||
if (bufev_private->own_lock)
|
||||
EVTHREAD_FREE_LOCK(bufev_private->lock,
|
||||
|
@ -127,6 +127,8 @@ upcast_write(struct event_overlapped *eo)
|
||||
static void
|
||||
bev_async_consider_writing(struct bufferevent_async *b)
|
||||
{
|
||||
size_t at_most;
|
||||
int limit;
|
||||
/* Don't write if there's a write in progress, or we do not
|
||||
* want to write. */
|
||||
if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
|
||||
@ -135,8 +137,18 @@ bev_async_consider_writing(struct bufferevent_async *b)
|
||||
if (!evbuffer_get_length(b->bev.bev.output))
|
||||
return;
|
||||
|
||||
at_most = evbuffer_get_length(b->bev.bev.output);
|
||||
|
||||
/* XXXX This over-commits. */
|
||||
limit = _bufferevent_get_write_max(&b->bev);
|
||||
if (at_most >= limit)
|
||||
at_most = limit;
|
||||
|
||||
if (b->bev.write_suspended)
|
||||
return;
|
||||
|
||||
/* XXXX doesn't respect low-water mark very well. */
|
||||
if (evbuffer_launch_write(b->bev.bev.output, -1,
|
||||
if (evbuffer_launch_write(b->bev.bev.output, at_most,
|
||||
&b->write_overlapped)) {
|
||||
EVUTIL_ASSERT(0);/* XXX act sensibly. */
|
||||
} else {
|
||||
@ -150,6 +162,7 @@ bev_async_consider_reading(struct bufferevent_async *b)
|
||||
size_t cur_size;
|
||||
size_t read_high;
|
||||
size_t at_most;
|
||||
int limit;
|
||||
/* Don't read if there is a read in progress, or we do not
|
||||
* want to read. */
|
||||
if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
|
||||
@ -166,6 +179,14 @@ bev_async_consider_reading(struct bufferevent_async *b)
|
||||
at_most = 16384; /* FIXME totally magic. */
|
||||
}
|
||||
|
||||
/* XXXX This over-commits. */
|
||||
limit = _bufferevent_get_read_max(&b->bev);
|
||||
if (at_most >= limit)
|
||||
at_most = limit;
|
||||
|
||||
if (b->bev.read_suspended)
|
||||
return;
|
||||
|
||||
if (evbuffer_launch_read(b->bev.bev.input, at_most,
|
||||
&b->read_overlapped)) {
|
||||
EVUTIL_ASSERT(0);
|
||||
@ -304,6 +325,7 @@ read_complete(struct event_overlapped *eo, uintptr_t key,
|
||||
|
||||
if (ok && nbytes) {
|
||||
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
|
||||
_bufferevent_decrement_read_buckets(&bev_a->bev, nbytes);
|
||||
if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
|
||||
_bufferevent_run_readcb(bev);
|
||||
bev_async_consider_reading(bev_a);
|
||||
@ -336,6 +358,7 @@ write_complete(struct event_overlapped *eo, uintptr_t key,
|
||||
|
||||
if (ok && nbytes) {
|
||||
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
|
||||
_bufferevent_decrement_write_buckets(&bev_a->bev, nbytes);
|
||||
if (evbuffer_get_length(bev->output) <= bev->wm_write.low)
|
||||
_bufferevent_run_writecb(bev);
|
||||
bev_async_consider_writing(bev_a);
|
||||
|
@ -524,20 +524,29 @@ do_read(struct bufferevent_openssl *bev_ssl, int n_to_read)
|
||||
/* Requires lock */
|
||||
struct bufferevent *bev = &bev_ssl->bev.bev;
|
||||
struct evbuffer *input = bev->input;
|
||||
int r, n, i, n_used = 0, blocked = 0;
|
||||
int r, n, i, n_used = 0, blocked = 0, atmost;
|
||||
struct evbuffer_iovec space[2];
|
||||
|
||||
atmost = _bufferevent_get_read_max(&bev_ssl->bev);
|
||||
if (n_to_read > atmost)
|
||||
n_to_read = atmost;
|
||||
|
||||
n = evbuffer_reserve_space(input, n_to_read, space, 2);
|
||||
if (n < 0)
|
||||
return -1;
|
||||
|
||||
for (i=0; i<n; ++i) {
|
||||
if (bev_ssl->bev.read_suspended)
|
||||
break;
|
||||
r = SSL_read(bev_ssl->ssl, space[i].iov_base, space[i].iov_len);
|
||||
if (r>0) {
|
||||
if (bev_ssl->read_blocked_on_write)
|
||||
clear_rbow(bev_ssl);
|
||||
++n_used;
|
||||
space[i].iov_len = r;
|
||||
/* Not exactly right; we probably want to do
|
||||
* our rate-limiting on the underlying bytes. */
|
||||
_bufferevent_decrement_read_buckets(&bev_ssl->bev, r);
|
||||
} else {
|
||||
int err = SSL_get_error(bev_ssl->ssl, r);
|
||||
print_err(err);
|
||||
@ -584,6 +593,8 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost)
|
||||
|
||||
if (bev_ssl->last_write > 0)
|
||||
atmost = bev_ssl->last_write;
|
||||
else
|
||||
atmost = _bufferevent_get_write_max(&bev_ssl->bev);
|
||||
|
||||
n = evbuffer_peek(output, atmost, NULL, space, 8);
|
||||
if (n < 0)
|
||||
@ -592,6 +603,9 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost)
|
||||
if (n > 8)
|
||||
n = 8;
|
||||
for (i=0; i < n; ++i) {
|
||||
if (bev_ssl->bev.write_suspended)
|
||||
break;
|
||||
|
||||
r = SSL_write(bev_ssl->ssl, space[i].iov_base,
|
||||
space[i].iov_len);
|
||||
if (r > 0) {
|
||||
@ -599,6 +613,9 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost)
|
||||
clear_wbor(bev_ssl);
|
||||
n_written += r;
|
||||
bev_ssl->last_write = -1;
|
||||
/* Not exactly right; we probably want to do
|
||||
* our rate-limiting on the underlying bytes. */
|
||||
_bufferevent_decrement_write_buckets(&bev_ssl->bev, r);
|
||||
} else {
|
||||
int err = SSL_get_error(bev_ssl->ssl, r);
|
||||
print_err(err);
|
||||
@ -659,6 +676,7 @@ consider_reading(struct bufferevent_openssl *bev_ssl)
|
||||
if (bev_ssl->write_blocked_on_read)
|
||||
return;
|
||||
while ((bev_ssl->bev.bev.enabled & EV_READ) &&
|
||||
(! bev_ssl->bev.read_suspended) &&
|
||||
(! wm->high || evbuffer_get_length(input) < wm->high)) {
|
||||
int n_to_read =
|
||||
wm->high ? wm->high - evbuffer_get_length(input)
|
||||
@ -689,6 +707,7 @@ consider_writing(struct bufferevent_openssl *bev_ssl)
|
||||
wm = &bev_ssl->underlying->wm_write;
|
||||
}
|
||||
while ((bev_ssl->bev.bev.enabled & EV_WRITE) &&
|
||||
(! bev_ssl->bev.write_suspended) &&
|
||||
evbuffer_get_length(output) &&
|
||||
(!target || (! wm->high || evbuffer_get_length(target) < wm->high))) {
|
||||
int n_to_write;
|
||||
|
654
bufferevent_ratelim.c
Normal file
654
bufferevent_ratelim.c
Normal file
@ -0,0 +1,654 @@
|
||||
/*
|
||||
* Copyright (c) 2007-2009 Niels Provos and Nick Mathewson
|
||||
* Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions
|
||||
* are met:
|
||||
* 1. Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* 2. Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in the
|
||||
* documentation and/or other materials provided with the distribution.
|
||||
* 3. The name of the author may not be used to endorse or promote products
|
||||
* derived from this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
|
||||
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
|
||||
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
|
||||
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
|
||||
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
|
||||
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
|
||||
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <limits.h>
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
#include "event2/event.h"
|
||||
#include "event2/event_struct.h"
|
||||
#include "event2/util.h"
|
||||
#include "event2/bufferevent.h"
|
||||
#include "event2/bufferevent_struct.h"
|
||||
#include "event2/buffer.h"
|
||||
|
||||
#include "ratelim-internal.h"
|
||||
|
||||
#include "bufferevent-internal.h"
|
||||
#include "mm-internal.h"
|
||||
#include "util-internal.h"
|
||||
|
||||
int
|
||||
ev_token_bucket_init(struct ev_token_bucket *bucket,
|
||||
const struct ev_token_bucket_cfg *cfg,
|
||||
ev_uint32_t current_tick,
|
||||
int reinitialize)
|
||||
{
|
||||
if (reinitialize) {
|
||||
/* on reinitialization, we only clip downwards, since we've
|
||||
already used who-knows-how-much bandwidth this tick. We
|
||||
leave "last_updated" as it is; the next update will add the
|
||||
appropriate amount of bandwidth to the bucket.
|
||||
*/
|
||||
if (bucket->read_limit > cfg->read_maximum)
|
||||
bucket->read_limit = cfg->read_maximum;
|
||||
if (bucket->write_limit > cfg->write_maximum)
|
||||
bucket->write_limit = cfg->write_maximum;
|
||||
} else {
|
||||
bucket->read_limit = cfg->read_rate;
|
||||
bucket->write_limit = cfg->write_rate;
|
||||
bucket->last_updated = current_tick;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
ev_token_bucket_update(struct ev_token_bucket *bucket,
|
||||
const struct ev_token_bucket_cfg *cfg,
|
||||
ev_uint32_t current_tick)
|
||||
{
|
||||
/* It's okay if the tick number overflows, since we'll just
|
||||
* wrap around when we do the unsigned substraction. */
|
||||
unsigned n_ticks = current_tick - bucket->last_updated;
|
||||
|
||||
/* Make sure some ticks actually happened, and that time didn't
|
||||
* roll back. */
|
||||
if (n_ticks == 0 || n_ticks > INT_MAX)
|
||||
return 0;
|
||||
|
||||
/* Naively, we would say
|
||||
bucket->limit += n_ticks * cfg->rate;
|
||||
|
||||
if (bucket->limit > cfg->maximum)
|
||||
bucket->limit = cfg->maximum;
|
||||
|
||||
But we're worried about overflow, so we do it like this:
|
||||
*/
|
||||
|
||||
if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
|
||||
bucket->read_limit = cfg->read_maximum;
|
||||
else
|
||||
bucket->read_limit += n_ticks * cfg->read_rate;
|
||||
|
||||
|
||||
if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
|
||||
bucket->write_limit = cfg->write_maximum;
|
||||
else
|
||||
bucket->write_limit += n_ticks * cfg->write_rate;
|
||||
|
||||
|
||||
bucket->last_updated = current_tick;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
ev_uint32_t
|
||||
ev_token_bucket_get_tick(const struct timeval *tv,
|
||||
const struct ev_token_bucket_cfg *cfg)
|
||||
{
|
||||
/* This computation uses two multiplies and a divide. We could do
|
||||
* fewer if we knew that the tick length was an integer number of
|
||||
* seconds, or if we knew it divided evenly into a second. We should
|
||||
* investigate that more.
|
||||
*/
|
||||
|
||||
/* We cast to an ev_uint64_t first, since we don't want to overflow
|
||||
* before we do the final divide. */
|
||||
ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
|
||||
return (unsigned)(msec / cfg->msec_per_tick);
|
||||
}
|
||||
|
||||
struct ev_token_bucket_cfg *
|
||||
ev_token_bucket_cfg_new(ev_uint32_t read_rate, ev_uint32_t read_burst,
|
||||
ev_uint32_t write_rate, ev_uint32_t write_burst,
|
||||
const struct timeval *tick_len)
|
||||
{
|
||||
struct ev_token_bucket_cfg *r;
|
||||
struct timeval g;
|
||||
if (! tick_len) {
|
||||
g.tv_sec = 1;
|
||||
g.tv_usec = 0;
|
||||
tick_len = &g;
|
||||
}
|
||||
if (read_rate > read_burst || write_rate > write_burst ||
|
||||
read_rate < 1 || write_rate < 1)
|
||||
return NULL;
|
||||
r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
|
||||
if (!r)
|
||||
return NULL;
|
||||
r->read_rate = read_rate;
|
||||
r->write_rate = write_rate;
|
||||
r->read_maximum = read_burst;
|
||||
r->write_maximum = write_burst;
|
||||
memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
|
||||
r->msec_per_tick = (tick_len->tv_sec * 1000) + tick_len->tv_usec/1000;
|
||||
return r;
|
||||
}
|
||||
|
||||
void
|
||||
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
|
||||
{
|
||||
mm_free(cfg);
|
||||
}
|
||||
|
||||
/* No matter how big our bucket gets, don't try to read more than this
|
||||
* much in a single read operation. */
|
||||
#define MAX_TO_READ_EVER 16384
|
||||
/* No matter how big our bucket gets, don't try to write more than this
|
||||
* much in a single write operation. */
|
||||
#define MAX_TO_WRITE_EVER 16384
|
||||
|
||||
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
|
||||
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
|
||||
|
||||
static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
|
||||
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
|
||||
|
||||
/** Helper: figure out the maximum amount we should write if is_write, or
|
||||
the maximum amount we should read if is_read. Return that maximum, or
|
||||
0 if our bucket is wholly exhausted.
|
||||
*/
|
||||
static inline int
|
||||
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
|
||||
{
|
||||
/* needs lock on bev. */
|
||||
int max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;
|
||||
struct timeval now;
|
||||
|
||||
#define LIM(x) \
|
||||
(is_write ? (x).write_limit : (x).read_limit)
|
||||
|
||||
#define GROUP_SUSPENDED(g) \
|
||||
(is_write ? (g)->write_suspended : (g)->read_suspended)
|
||||
|
||||
/* Sets max_so_far to MIN(x, max_so_far) */
|
||||
#define CLAMPTO(x) \
|
||||
do { \
|
||||
if (max_so_far > (x)) \
|
||||
max_so_far = (x); \
|
||||
} while (0);
|
||||
|
||||
if (!bev->rate_limiting)
|
||||
return max_so_far;
|
||||
|
||||
/* If rate-limiting is enabled at all, update the appropriate
|
||||
bucket, and take the smaller of our rate limit and the group
|
||||
rate limit.
|
||||
*/
|
||||
|
||||
if (bev->rate_limiting->cfg) {
|
||||
unsigned tick;
|
||||
|
||||
event_base_gettimeofday_cached(bev->bev.ev_base, &now);
|
||||
tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
|
||||
ev_token_bucket_update(&bev->rate_limiting->limit,
|
||||
bev->rate_limiting->cfg, tick);
|
||||
max_so_far = LIM(bev->rate_limiting->limit);
|
||||
}
|
||||
if (bev->rate_limiting->group) {
|
||||
struct bufferevent_rate_limit_group *g =
|
||||
bev->rate_limiting->group;
|
||||
ev_uint32_t share;
|
||||
LOCK_GROUP(g);
|
||||
if (GROUP_SUSPENDED(g)) {
|
||||
/* We can get here if we failed to lock this
|
||||
* particular bufferevent while suspending the whole
|
||||
* group. */
|
||||
if (is_write)
|
||||
bufferevent_suspend_write(&bev->bev,
|
||||
BEV_SUSPEND_BW_GROUP);
|
||||
else
|
||||
bufferevent_suspend_read(&bev->bev,
|
||||
BEV_SUSPEND_BW_GROUP);
|
||||
share = 0;
|
||||
} else {
|
||||
/* XXXX probably we should divide among the active
|
||||
* members, not the total members. */
|
||||
share = LIM(g->rate_limit) / g->n_members;
|
||||
if (share < g->min_share)
|
||||
share = g->min_share;
|
||||
}
|
||||
UNLOCK_GROUP(g);
|
||||
CLAMPTO(share);
|
||||
}
|
||||
|
||||
return max_so_far;
|
||||
}
|
||||
|
||||
int
|
||||
_bufferevent_get_read_max(struct bufferevent_private *bev)
|
||||
{
|
||||
return _bufferevent_get_rlim_max(bev, 0);
|
||||
}
|
||||
|
||||
int
|
||||
_bufferevent_get_write_max(struct bufferevent_private *bev)
|
||||
{
|
||||
return _bufferevent_get_rlim_max(bev, 1);
|
||||
}
|
||||
|
||||
int
|
||||
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, int bytes)
|
||||
{
|
||||
/* need to hold lock on bev */
|
||||
if (!bev->rate_limiting)
|
||||
return 0;
|
||||
|
||||
if (bev->rate_limiting->cfg) {
|
||||
bev->rate_limiting->limit.read_limit -= bytes;
|
||||
if (bev->rate_limiting->limit.read_limit <= 0) {
|
||||
bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
|
||||
event_add(&bev->rate_limiting->refill_bucket_event,
|
||||
&bev->rate_limiting->cfg->tick_timeout);
|
||||
}
|
||||
}
|
||||
|
||||
if (bev->rate_limiting->group) {
|
||||
LOCK_GROUP(bev->rate_limiting->group);
|
||||
bev->rate_limiting->group->rate_limit.read_limit -= bytes;
|
||||
if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
|
||||
_bev_group_suspend_reading(bev->rate_limiting->group);
|
||||
}
|
||||
UNLOCK_GROUP(bev->rate_limiting->group);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, int bytes)
|
||||
{
|
||||
/* need to hold lock */
|
||||
if (!bev->rate_limiting)
|
||||
return 0;
|
||||
|
||||
if (bev->rate_limiting->cfg) {
|
||||
bev->rate_limiting->limit.write_limit -= bytes;
|
||||
if (bev->rate_limiting->limit.write_limit <= 0) {
|
||||
bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
|
||||
event_add(&bev->rate_limiting->refill_bucket_event,
|
||||
&bev->rate_limiting->cfg->tick_timeout);
|
||||
}
|
||||
}
|
||||
|
||||
if (bev->rate_limiting->group) {
|
||||
LOCK_GROUP(bev->rate_limiting->group);
|
||||
bev->rate_limiting->group->rate_limit.write_limit -= bytes;
|
||||
if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
|
||||
_bev_group_suspend_writing(bev->rate_limiting->group);
|
||||
}
|
||||
UNLOCK_GROUP(bev->rate_limiting->group);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** Stop reading on every bufferevent in <b>g</b> */
|
||||
static int
|
||||
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
|
||||
{
|
||||
/* Needs group lock */
|
||||
struct bufferevent_private *bev;
|
||||
g->read_suspended = 1;
|
||||
g->pending_unsuspend_read = 0;
|
||||
|
||||
/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
|
||||
to prevent a deadlock. (Ordinarily, the group lock nests inside
|
||||
the bufferevent locks. If we are unable to lock any individual
|
||||
bufferevent, it will find out later when it looks at its limit
|
||||
and sees that its group is suspended.
|
||||
*/
|
||||
TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
|
||||
if (EVLOCK_TRY_LOCK(bev->lock)) {
|
||||
bufferevent_suspend_read(&bev->bev,
|
||||
BEV_SUSPEND_BW_GROUP);
|
||||
EVLOCK_UNLOCK(bev->lock, 0);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** Stop writing on every bufferevent in <b>g</b> */
|
||||
static int
|
||||
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
|
||||
{
|
||||
/* Needs group lock */
|
||||
struct bufferevent_private *bev;
|
||||
g->write_suspended = 1;
|
||||
g->pending_unsuspend_write = 0;
|
||||
TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
|
||||
if (EVLOCK_TRY_LOCK(bev->lock)) {
|
||||
bufferevent_suspend_write(&bev->bev,
|
||||
BEV_SUSPEND_BW_GROUP);
|
||||
EVLOCK_UNLOCK(bev->lock, 0);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** Timer callback invoked on a single bufferevent with one or more exhausted
|
||||
buckets when they are ready to refill. */
|
||||
static void
|
||||
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
|
||||
{
|
||||
unsigned tick;
|
||||
struct timeval now;
|
||||
struct bufferevent_private *bev = arg;
|
||||
int again = 0;
|
||||
BEV_LOCK(&bev->bev);
|
||||
if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
|
||||
BEV_UNLOCK(&bev->bev);
|
||||
return;
|
||||
}
|
||||
|
||||
/* First, update the bucket */
|
||||
event_base_gettimeofday_cached(bev->bev.ev_base, &now);
|
||||
tick = ev_token_bucket_get_tick(&now,
|
||||
bev->rate_limiting->cfg);
|
||||
ev_token_bucket_update(&bev->rate_limiting->limit,
|
||||
bev->rate_limiting->cfg,
|
||||
tick);
|
||||
|
||||
/* Now unsuspend any read/write operations as appropriate. */
|
||||
if ((bev->read_suspended & BEV_SUSPEND_BW)) {
|
||||
if (bev->rate_limiting->limit.read_limit > 0)
|
||||
bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
|
||||
else
|
||||
again = 1;
|
||||
}
|
||||
if ((bev->write_suspended & BEV_SUSPEND_BW)) {
|
||||
if (bev->rate_limiting->limit.write_limit > 0)
|
||||
bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
|
||||
else
|
||||
again = 1;
|
||||
}
|
||||
if (again) {
|
||||
/* One or more of the buckets may need another refill if they
|
||||
started negative.
|
||||
|
||||
XXXX if we need to be quiet for more ticks, we should
|
||||
maybe figure out what timeout we really want.
|
||||
*/
|
||||
event_add(&bev->rate_limiting->refill_bucket_event,
|
||||
&bev->rate_limiting->cfg->tick_timeout);
|
||||
}
|
||||
BEV_UNLOCK(&bev->bev);
|
||||
}
|
||||
|
||||
/** Helper: grab a random element from a bufferevent group. */
|
||||
static struct bufferevent_private *
|
||||
_bev_group_random_element(struct bufferevent_rate_limit_group *group)
|
||||
{
|
||||
int which;
|
||||
struct bufferevent_private *bev;
|
||||
|
||||
/* requires group lock */
|
||||
|
||||
if (!group->n_members)
|
||||
return NULL;
|
||||
|
||||
EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));
|
||||
|
||||
which = _evutil_weakrand() % group->n_members;
|
||||
|
||||
bev = TAILQ_FIRST(&group->members);
|
||||
while (which--)
|
||||
bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);
|
||||
|
||||
return bev;
|
||||
}
|
||||
|
||||
/** Iterate over the elements of a rate-limiting group 'g' with a random
|
||||
starting point, assigning each to the variable 'bev', and executing the
|
||||
block 'block'.
|
||||
|
||||
We do this in a half-baked effort to get fairness among group members.
|
||||
XXX Round-robin or some kind of priority queue would be even more fair.
|
||||
*/
|
||||
#define FOREACH_RANDOM_ORDER(block) \
|
||||
do { \
|
||||
first = _bev_group_random_element(g); \
|
||||
for (bev = first; bev != TAILQ_END(&g->members); \
|
||||
bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
|
||||
block ; \
|
||||
} \
|
||||
for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
|
||||
bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
|
||||
block ; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
/** Callback invoked every tick to add more elements to the group bucket
|
||||
and unsuspend group members as needed.
|
||||
*/
|
||||
static void
|
||||
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
|
||||
{
|
||||
struct bufferevent_rate_limit_group *g = arg;
|
||||
unsigned tick;
|
||||
struct timeval now;
|
||||
int again = 0;
|
||||
struct bufferevent_private *bev, *first;
|
||||
|
||||
event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
|
||||
|
||||
LOCK_GROUP(g);
|
||||
tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
|
||||
ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);
|
||||
|
||||
if (g->pending_unsuspend_read ||
|
||||
(g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
|
||||
g->read_suspended = 0;
|
||||
again = 0;
|
||||
FOREACH_RANDOM_ORDER({
|
||||
if (EVLOCK_TRY_LOCK(bev->lock)) {
|
||||
bufferevent_unsuspend_read(&bev->bev,
|
||||
BEV_SUSPEND_BW_GROUP);
|
||||
EVLOCK_UNLOCK(bev->lock, 0);
|
||||
} else {
|
||||
again = 1;
|
||||
}
|
||||
});
|
||||
g->pending_unsuspend_read = again;
|
||||
}
|
||||
if (g->pending_unsuspend_write ||
|
||||
(g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
|
||||
g->write_suspended = 0;
|
||||
again = 0;
|
||||
FOREACH_RANDOM_ORDER({
|
||||
if (EVLOCK_TRY_LOCK(bev->lock)) {
|
||||
bufferevent_unsuspend_write(&bev->bev,
|
||||
BEV_SUSPEND_BW_GROUP);
|
||||
EVLOCK_UNLOCK(bev->lock, 0);
|
||||
} else {
|
||||
again = 1;
|
||||
}
|
||||
});
|
||||
g->pending_unsuspend_write = again;
|
||||
}
|
||||
|
||||
/* XXXX Rather than waiting to the next tick to unsuspend stuff
|
||||
* with pending_unsuspend_write/read, we should do it on the
|
||||
* next iteration of the mainloop.
|
||||
*/
|
||||
|
||||
UNLOCK_GROUP(g);
|
||||
}
|
||||
|
||||
int
|
||||
bufferevent_set_rate_limit(struct bufferevent *bev,
|
||||
struct ev_token_bucket_cfg *cfg)
|
||||
{
|
||||
struct bufferevent_private *bevp =
|
||||
EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
|
||||
int r = -1;
|
||||
struct bufferevent_rate_limit *rlim;
|
||||
struct timeval now;
|
||||
ev_uint32_t tick;
|
||||
/* XXX reference-count cfg */
|
||||
|
||||
BEV_LOCK(bev);
|
||||
|
||||
if (cfg == NULL) {
|
||||
if (bevp->rate_limiting) {
|
||||
bevp->rate_limiting->cfg = NULL;
|
||||
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
|
||||
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
|
||||
}
|
||||
r = 0;
|
||||
goto done;
|
||||
}
|
||||
|
||||
event_base_gettimeofday_cached(bev->ev_base, &now);
|
||||
tick = ev_token_bucket_get_tick(&now, cfg);
|
||||
|
||||
if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
|
||||
;
|
||||
} else if (bevp->rate_limiting) {
|
||||
bevp->rate_limiting->cfg = cfg;
|
||||
ev_token_bucket_init(&bevp->rate_limiting->limit, cfg, tick, 1);
|
||||
if (bevp->rate_limiting->limit.read_limit > 0)
|
||||
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
|
||||
else
|
||||
bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
|
||||
if (bevp->rate_limiting->limit.write_limit > 0)
|
||||
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
|
||||
else
|
||||
bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
|
||||
} else {
|
||||
rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
|
||||
if (!rlim)
|
||||
goto done;
|
||||
rlim->cfg = cfg;
|
||||
ev_token_bucket_init(&rlim->limit, cfg, tick, 0);
|
||||
evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
|
||||
_bev_refill_callback, bevp);
|
||||
bevp->rate_limiting = rlim;
|
||||
}
|
||||
r = 0;
|
||||
done:
|
||||
BEV_UNLOCK(bev);
|
||||
return r;
|
||||
}
|
||||
|
||||
struct bufferevent_rate_limit_group *
|
||||
bufferevent_rate_limit_group_new(struct event_base *base,
|
||||
const struct ev_token_bucket_cfg *cfg)
|
||||
{
|
||||
struct bufferevent_rate_limit_group *g;
|
||||
struct timeval now;
|
||||
ev_uint32_t tick;
|
||||
|
||||
event_base_gettimeofday_cached(base, &now);
|
||||
tick = ev_token_bucket_get_tick(&now, cfg);
|
||||
|
||||
g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
|
||||
if (!g)
|
||||
return NULL;
|
||||
memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
|
||||
TAILQ_INIT(&g->members);
|
||||
|
||||
ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);
|
||||
|
||||
g->min_share = 64;
|
||||
event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
|
||||
_bev_group_refill_callback, g);
|
||||
event_add(&g->master_refill_event, &cfg->tick_timeout);
|
||||
|
||||
EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
|
||||
return g;
|
||||
}
|
||||
|
||||
int
|
||||
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
|
||||
struct bufferevent_rate_limit_group *g)
|
||||
{
|
||||
int wsuspend, rsuspend;
|
||||
struct bufferevent_private *bevp =
|
||||
EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
|
||||
BEV_LOCK(bev);
|
||||
|
||||
if (!bevp->rate_limiting) {
|
||||
struct bufferevent_rate_limit *rlim;
|
||||
rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
|
||||
if (!rlim) {
|
||||
BEV_UNLOCK(bev);
|
||||
return -1;
|
||||
}
|
||||
evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
|
||||
_bev_refill_callback, bevp);
|
||||
bevp->rate_limiting = rlim;
|
||||
}
|
||||
|
||||
if (bevp->rate_limiting->group == g) {
|
||||
BEV_UNLOCK(bev);
|
||||
return 0;
|
||||
}
|
||||
if (bevp->rate_limiting->group)
|
||||
bufferevent_remove_from_rate_limit_group(bev);
|
||||
|
||||
LOCK_GROUP(g);
|
||||
bevp->rate_limiting->group = g;
|
||||
++g->n_members;
|
||||
TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);
|
||||
|
||||
rsuspend = g->read_suspended;
|
||||
wsuspend = g->write_suspended;
|
||||
|
||||
UNLOCK_GROUP(g);
|
||||
|
||||
if (rsuspend)
|
||||
bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
|
||||
if (wsuspend)
|
||||
bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);
|
||||
|
||||
BEV_UNLOCK(bev);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
|
||||
{
|
||||
struct bufferevent_private *bevp =
|
||||
EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
|
||||
BEV_LOCK(bev);
|
||||
if (bevp->rate_limiting && bevp->rate_limiting->group) {
|
||||
struct bufferevent_rate_limit_group *g =
|
||||
bevp->rate_limiting->group;
|
||||
LOCK_GROUP(g);
|
||||
bevp->rate_limiting->group = NULL;
|
||||
--g->n_members;
|
||||
TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
|
||||
UNLOCK_GROUP(g);
|
||||
}
|
||||
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
|
||||
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
|
||||
BEV_UNLOCK(bev);
|
||||
return 0;
|
||||
}
|
@ -118,10 +118,12 @@ static void
|
||||
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
|
||||
{
|
||||
struct bufferevent *bufev = arg;
|
||||
struct bufferevent_private *bufev_p =
|
||||
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
|
||||
struct evbuffer *input;
|
||||
int res = 0;
|
||||
short what = BEV_EVENT_READING;
|
||||
int howmuch = -1;
|
||||
int howmuch = -1, readmax=-1;
|
||||
|
||||
_bufferevent_incref_and_lock(bufev);
|
||||
|
||||
@ -144,6 +146,12 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
|
||||
goto done;
|
||||
}
|
||||
}
|
||||
readmax = _bufferevent_get_read_max(bufev_p);
|
||||
if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
|
||||
* uglifies this code. */
|
||||
howmuch = readmax;
|
||||
if (bufev_p->read_suspended)
|
||||
goto done;
|
||||
|
||||
evbuffer_unfreeze(input, 0);
|
||||
res = evbuffer_read(input, fd, howmuch);
|
||||
@ -163,6 +171,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
|
||||
if (res <= 0)
|
||||
goto error;
|
||||
|
||||
_bufferevent_decrement_read_buckets(bufev_p, res);
|
||||
|
||||
/* Invoke the user callback - must always be called last */
|
||||
if (evbuffer_get_length(input) >= bufev->wm_read.low)
|
||||
@ -190,6 +199,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
|
||||
int res = 0;
|
||||
short what = BEV_EVENT_WRITING;
|
||||
int connected = 0;
|
||||
int atmost = -1;
|
||||
|
||||
_bufferevent_incref_and_lock(bufev);
|
||||
|
||||
@ -211,7 +221,6 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
|
||||
goto done;
|
||||
} else {
|
||||
connected = 1;
|
||||
bufferevent_unsuspend_read(bufev, BEV_SUSPEND_CONNECTING);
|
||||
#ifdef WIN32
|
||||
if (BEV_IS_ASYNC(bufev)) {
|
||||
event_del(&bufev->ev_write);
|
||||
@ -231,9 +240,14 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
|
||||
}
|
||||
}
|
||||
|
||||
atmost = _bufferevent_get_write_max(bufev_p);
|
||||
|
||||
if (bufev_p->write_suspended)
|
||||
goto done;
|
||||
|
||||
if (evbuffer_get_length(bufev->output)) {
|
||||
evbuffer_unfreeze(bufev->output, 1);
|
||||
res = evbuffer_write(bufev->output, fd);
|
||||
res = evbuffer_write_atmost(bufev->output, fd, atmost);
|
||||
evbuffer_freeze(bufev->output, 1);
|
||||
if (res == -1) {
|
||||
int err = evutil_socket_geterror(fd);
|
||||
@ -249,6 +263,8 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
|
||||
}
|
||||
if (res <= 0)
|
||||
goto error;
|
||||
|
||||
_bufferevent_decrement_write_buckets(bufev_p, res);
|
||||
}
|
||||
|
||||
if (evbuffer_get_length(bufev->output) == 0)
|
||||
@ -390,11 +406,7 @@ freesock:
|
||||
if (ownfd)
|
||||
EVUTIL_CLOSESOCKET(fd);
|
||||
/* do something about the error? */
|
||||
|
||||
done:
|
||||
if (result == 0)
|
||||
bufferevent_suspend_read(bev, BEV_SUSPEND_CONNECTING);
|
||||
|
||||
_bufferevent_decref_and_unlock(bev);
|
||||
return result;
|
||||
}
|
||||
@ -557,6 +569,10 @@ be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd)
|
||||
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
|
||||
event_assign(&bufev->ev_write, bufev->ev_base, fd,
|
||||
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
|
||||
|
||||
if (fd >= 0)
|
||||
bufferevent_enable(bufev, bufev->enabled);
|
||||
|
||||
BEV_UNLOCK(bufev);
|
||||
}
|
||||
|
||||
|
16
event.c
16
event.c
@ -180,6 +180,22 @@ gettime(struct event_base *base, struct timeval *tp)
|
||||
return (evutil_gettimeofday(tp, NULL));
|
||||
}
|
||||
|
||||
int
|
||||
event_base_gettimeofday_cached(struct event_base *base, struct timeval *tv)
|
||||
{
|
||||
int r;
|
||||
if (!base) {
|
||||
base = current_base;
|
||||
if (!current_base)
|
||||
return evutil_gettimeofday(tv, NULL);
|
||||
}
|
||||
|
||||
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
|
||||
r = gettime(base, tv);
|
||||
EVBASE_RELEASE_LOCK(base, th_base_lock);
|
||||
return r;
|
||||
}
|
||||
|
||||
static inline void
|
||||
clear_time_cache(struct event_base *base)
|
||||
{
|
||||
|
11
evutil.c
11
evutil.c
@ -1776,3 +1776,14 @@ evutil_getenv(const char *varname)
|
||||
|
||||
return getenv(varname);
|
||||
}
|
||||
|
||||
long
|
||||
_evutil_weakrand(void)
|
||||
{
|
||||
#ifdef WIN32
|
||||
return rand();
|
||||
#else
|
||||
return random();
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -499,6 +499,97 @@ int
|
||||
bufferevent_pair_new(struct event_base *base, int options,
|
||||
struct bufferevent *pair[2]);
|
||||
|
||||
|
||||
/**
|
||||
Abstract type used to configure rate-limiting on a bufferevent or a group
|
||||
of bufferevents.
|
||||
*/
|
||||
struct ev_token_bucket_cfg;
|
||||
/**
|
||||
A group of bufferevents which are configured to respect the same rate
|
||||
limit.
|
||||
*/
|
||||
struct bufferevent_rate_limit_group;
|
||||
|
||||
/**
|
||||
Initialize and return a new object to configure the rate-limiting behavior
|
||||
of bufferevents.
|
||||
|
||||
@param read_rate The maximum number of bytes to read per tick on
|
||||
average.
|
||||
@param read_burst The maximum number of bytes to read in any single tick.
|
||||
@param write_rate The maximum number of bytes to write per tick on
|
||||
average.
|
||||
@param write_burst The maximum number of bytes to write in any single tick.
|
||||
@param tick_len The length of a single tick. Defaults to one second.
|
||||
Any fractions of a millisecond are ignored.
|
||||
|
||||
Note that all rate-limits hare are currently best-effort: future versions
|
||||
of Libevent may implement them more tightly.
|
||||
*/
|
||||
struct ev_token_bucket_cfg *ev_token_bucket_cfg_new(
|
||||
ev_uint32_t read_rate, ev_uint32_t read_burst,
|
||||
ev_uint32_t write_rate, ev_uint32_t write_burst,
|
||||
const struct timeval *tick_len);
|
||||
|
||||
/** Free all storage held in 'cfg'.
|
||||
|
||||
Note: 'cfg' is not currently reference-counted; it is not safe to free it
|
||||
until no bufferevent is using it.
|
||||
*/
|
||||
void ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg);
|
||||
|
||||
/**
|
||||
Set the rate-limit of a the bufferevent 'bev' to the one specified in
|
||||
'cfg'. If 'cfg' is NULL, disable any per-bufferevent rate-limiting on
|
||||
'bev'.
|
||||
|
||||
Note that only some bufferevent types currently respect rate-limiting.
|
||||
They are: socket-based bufferevents (normal and IOCP-based), and SSL-based
|
||||
bufferevents.
|
||||
|
||||
Return 0 on sucess, -1 on failure.
|
||||
*/
|
||||
int bufferevent_set_rate_limit(struct bufferevent *bev,
|
||||
struct ev_token_bucket_cfg *cfg);
|
||||
/**
|
||||
Create a new rate-limit group for bufferevents. A rate-limit group
|
||||
constrains the maximum number of bytes sent and received, in toto,
|
||||
by all of its bufferevents.
|
||||
|
||||
@param base An event_base to run any necessary timeouts for the group.
|
||||
Note that all bufferevents in the group do not necessarily need to share
|
||||
this event_base.
|
||||
@param cfg The rate-limit for this group.
|
||||
|
||||
Note that all rate-limits hare are currently best-effort: future versions
|
||||
of Libevent may implement them more tightly.
|
||||
|
||||
Note also that only some bufferevent types currently respect rate-limiting.
|
||||
They are: socket-based bufferevents (normal and IOCP-based), and SSL-based
|
||||
bufferevents.
|
||||
*/
|
||||
struct bufferevent_rate_limit_group *bufferevent_rate_limit_group_new(
|
||||
struct event_base *base,
|
||||
const struct ev_token_bucket_cfg *cfg);
|
||||
/*XXX we need a bufferevent_rate_limit_group_set_cfg */
|
||||
|
||||
/**
|
||||
Add 'bev' to the list of bufferevents whose aggregate reading and writing
|
||||
is restricted by 'g'. If 'g' is NULL, remove 'bev' from its current group.
|
||||
|
||||
A bufferevent may belong to no more than one rate-limit group at a time.
|
||||
If 'bev' is already a member of a group, it will be removed from its old
|
||||
group before being added to 'g'.
|
||||
|
||||
Return 0 on success and -1 on failure.
|
||||
*/
|
||||
int bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
|
||||
struct bufferevent_rate_limit_group *g);
|
||||
|
||||
/** Remove 'bev' from its current rate-limit group (if any). */
|
||||
int bufferevent_remove_from_rate_limit_group(struct bufferevent *bev);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
@ -671,6 +671,20 @@ void event_set_mem_functions(void *(*malloc_fn)(size_t sz),
|
||||
|
||||
void event_base_dump_events(struct event_base *, FILE *);
|
||||
|
||||
/** Sets 'tv' to the current time (as returned by gettimeofday()),
|
||||
looking at the cached value in 'base' if possible, and calling
|
||||
gettimeofday() or clock_gettime() as appropriate if there is no
|
||||
cached time.
|
||||
|
||||
Generally, this value will only be cached while actually
|
||||
processing event callbacks, and may be very inaccuate if your
|
||||
callbacks take a long time to execute.
|
||||
|
||||
Returns 0 on success, negative on failure.
|
||||
*/
|
||||
int event_base_gettimeofday_cached(struct event_base *base,
|
||||
struct timeval *tv);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
@ -93,12 +93,16 @@ extern "C" {
|
||||
|
||||
#ifdef _EVENT_HAVE_UINT32_T
|
||||
#define ev_uint32_t uint32_t
|
||||
#define ev_int32_t int32_t
|
||||
#elif defined(WIN32)
|
||||
#define ev_uint32_t unsigned int
|
||||
#define ev_int32_t signed int
|
||||
#elif _EVENT_SIZEOF_LONG == 4
|
||||
#define ev_uint32_t unsigned long
|
||||
#define ev_int32_t signed long
|
||||
#elif _EVENT_SIZEOF_INT == 4
|
||||
#define ev_uint32_t unsigned int
|
||||
#define ev_int32_t signed int
|
||||
#else
|
||||
#error "No way to define ev_uint32_t"
|
||||
#endif
|
||||
|
102
ratelim-internal.h
Normal file
102
ratelim-internal.h
Normal file
@ -0,0 +1,102 @@
|
||||
/*
|
||||
* Copyright (c) 2009 Niels Provos and Nick Mathewson
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions
|
||||
* are met:
|
||||
* 1. Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* 2. Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in the
|
||||
* documentation and/or other materials provided with the distribution.
|
||||
* 3. The name of the author may not be used to endorse or promote products
|
||||
* derived from this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
|
||||
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
|
||||
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
|
||||
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
|
||||
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
|
||||
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
|
||||
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
#ifndef _RATELIM_INTERNAL_H_
|
||||
#define _RATELIM_INTERNAL_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <event2/util.h>
|
||||
|
||||
/** A token bucket is an internal structure that tracks how many bytes we are
|
||||
* currently willing to read or write on a given bufferevent or group of
|
||||
* bufferevents */
|
||||
struct ev_token_bucket {
|
||||
/** How many bytes are we willing to read or write right now? These
|
||||
* values are signed so that we can do "defecit spending" */
|
||||
ev_int32_t read_limit, write_limit;
|
||||
/** When was this bucket last updated? Measured in abstract 'ticks'
|
||||
* relative to the token bucket configuration. */
|
||||
ev_uint32_t last_updated;
|
||||
};
|
||||
|
||||
/** Configuration info for a token bucket or set of token buckets. */
|
||||
struct ev_token_bucket_cfg {
|
||||
/** How many bytes are we willing to read on average per tick? */
|
||||
ev_uint32_t read_rate;
|
||||
/** How many bytes are we willing to read at most in any one tick? */
|
||||
ev_uint32_t read_maximum;
|
||||
/** How many bytes are we willing to write on average per tick? */
|
||||
ev_uint32_t write_rate;
|
||||
/** How many bytes are we willing to write at most in any one tick? */
|
||||
ev_uint32_t write_maximum;
|
||||
|
||||
/* How long is a tick? Note that fractions of a millisecond are
|
||||
* ignored. */
|
||||
struct timeval tick_timeout;
|
||||
|
||||
/* How long is a tick, in milliseconds? Derived from tick_timeout. */
|
||||
unsigned msec_per_tick;
|
||||
};
|
||||
|
||||
/** The current tick is 'current_tick': add bytes to 'bucket' as specified in
|
||||
* 'cfg'. */
|
||||
int ev_token_bucket_update(struct ev_token_bucket *bucket,
|
||||
const struct ev_token_bucket_cfg *cfg,
|
||||
ev_uint32_t current_tick);
|
||||
|
||||
/** In which tick does 'tv' fall according to 'cfg'? Note that ticks can
|
||||
* overflow easily; your code needs to handle this. */
|
||||
ev_uint32_t ev_token_bucket_get_tick(const struct timeval *tv,
|
||||
const struct ev_token_bucket_cfg *cfg);
|
||||
|
||||
/** Adjust 'bucket' to respect 'cfg', and note that it was last updated in
|
||||
* 'current_tick'. If 'reinitialize' is true, we are changing the
|
||||
* configuration of 'bucket'; otherwise, we are setting it up for the first
|
||||
* time.
|
||||
*/
|
||||
int ev_token_bucket_init(struct ev_token_bucket *bucket,
|
||||
const struct ev_token_bucket_cfg *cfg,
|
||||
ev_uint32_t current_tick,
|
||||
int reinitialize);
|
||||
|
||||
/** Decrease the read limit of 'b' by 'n' bytes */
|
||||
#define ev_token_bucket_decrement_read(b,n) \
|
||||
do { \
|
||||
(b)->read_limit -= (n); \
|
||||
} while (0)
|
||||
/** Decrease the write limit of 'b' by 'n' bytes */
|
||||
#define ev_token_bucket_decrement_write(b,n) \
|
||||
do { \
|
||||
(b)->write_limit -= (n); \
|
||||
} while (0)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
@ -5,7 +5,7 @@ AM_CFLAGS = -I$(top_srcdir) -I$(top_srcdir)/compat -I$(top_srcdir)/include
|
||||
EXTRA_DIST = regress.rpc regress.gen.h regress.gen.c
|
||||
|
||||
noinst_PROGRAMS = test-init test-eof test-weof test-time regress \
|
||||
bench bench_cascade bench_http bench_httpclient
|
||||
bench bench_cascade bench_http bench_httpclient test-ratelim
|
||||
noinst_HEADERS = tinytest.h tinytest_macros.h regress.h
|
||||
|
||||
BUILT_SOURCES = regress.gen.c regress.gen.h
|
||||
@ -17,6 +17,8 @@ test_weof_SOURCES = test-weof.c
|
||||
test_weof_LDADD = ../libevent_core.la
|
||||
test_time_SOURCES = test-time.c
|
||||
test_time_LDADD = ../libevent_core.la
|
||||
test_ratelim_SOURCES = test-ratelim.c
|
||||
test_ratelim_LDADD = ../libevent_core.la -lm
|
||||
|
||||
regress_SOURCES = regress.c regress_buffer.c regress_http.c regress_dns.c \
|
||||
regress_rpc.c regress.gen.c regress.gen.h regress_et.c \
|
||||
|
348
test/test-ratelim.c
Normal file
348
test/test-ratelim.c
Normal file
@ -0,0 +1,348 @@
|
||||
/*
|
||||
* Copyright (c) 2009 Niels Provos and Nick Mathewson
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions
|
||||
* are met:
|
||||
* 1. Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* 2. Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in the
|
||||
* documentation and/or other materials provided with the distribution.
|
||||
* 3. The name of the author may not be used to endorse or promote products
|
||||
* derived from this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
|
||||
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
|
||||
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
|
||||
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
|
||||
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
|
||||
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
|
||||
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <assert.h>
|
||||
#include <math.h>
|
||||
|
||||
#ifdef WIN32
|
||||
#include <winsock2.h>
|
||||
#include <ws2tcpip.h>
|
||||
#endif
|
||||
|
||||
#include <event2/bufferevent.h>
|
||||
#include <event2/buffer.h>
|
||||
#include <event2/event.h>
|
||||
#include <event2/util.h>
|
||||
#include <event2/listener.h>
|
||||
#include <event2/thread.h>
|
||||
|
||||
static int cfg_verbose = 0;
|
||||
static int cfg_help = 0;
|
||||
|
||||
static int cfg_n_connections = 30;
|
||||
static int cfg_duration = 5;
|
||||
static int cfg_connlimit = 0;
|
||||
static int cfg_grouplimit = 0;
|
||||
static int cfg_tick_msec = 1000;
|
||||
|
||||
static struct timeval cfg_tick = { 0, 500*1000 };
|
||||
|
||||
static struct ev_token_bucket_cfg *conn_bucket_cfg = NULL;
|
||||
static struct ev_token_bucket_cfg *group_bucket_cfg = NULL;
|
||||
struct bufferevent_rate_limit_group *ratelim_group = NULL;
|
||||
|
||||
struct client_state {
|
||||
size_t queued;
|
||||
ev_uint64_t received;
|
||||
};
|
||||
|
||||
static void
|
||||
loud_writecb(struct bufferevent *bev, void *ctx)
|
||||
{
|
||||
struct client_state *cs = ctx;
|
||||
struct evbuffer *output = bufferevent_get_output(bev);
|
||||
char buf[1024];
|
||||
#ifdef WIN32
|
||||
int r = rand() % 256;
|
||||
#else
|
||||
int r = random() % 256;
|
||||
#endif
|
||||
memset(buf, r, sizeof(buf));
|
||||
while (evbuffer_get_length(output) < 8192) {
|
||||
evbuffer_add(output, buf, sizeof(buf));
|
||||
cs->queued += sizeof(buf);
|
||||
// printf("queued %d\n", (int)sizeof(buf));
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
discard_readcb(struct bufferevent *bev, void *ctx)
|
||||
{
|
||||
struct client_state *cs = ctx;
|
||||
struct evbuffer *input = bufferevent_get_input(bev);
|
||||
size_t len = evbuffer_get_length(input);
|
||||
evbuffer_drain(input, len);
|
||||
cs->received += len;
|
||||
// printf("read %d bytes\n", (int)len);
|
||||
}
|
||||
|
||||
static void
|
||||
write_on_connectedcb(struct bufferevent *bev, short what, void *ctx)
|
||||
{
|
||||
if (what & BEV_EVENT_CONNECTED) {
|
||||
loud_writecb(bev, ctx);
|
||||
/* XXXX this shouldn't be needed. */
|
||||
bufferevent_enable(bev, EV_READ|EV_WRITE);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
echo_readcb(struct bufferevent *bev, void *ctx)
|
||||
{
|
||||
struct evbuffer *input = bufferevent_get_input(bev);
|
||||
struct evbuffer *output = bufferevent_get_output(bev);
|
||||
|
||||
// puts("read.");
|
||||
evbuffer_add_buffer(output, input);
|
||||
// printf(" outbuf len is now %d\n", (int)evbuffer_get_length(output));
|
||||
if (evbuffer_get_length(output) > 1024000)
|
||||
bufferevent_disable(bev, EV_READ);
|
||||
}
|
||||
|
||||
static void
|
||||
echo_listenercb(struct evconnlistener *listener, evutil_socket_t newsock,
|
||||
struct sockaddr *sourceaddr, int socklen, void *ctx)
|
||||
{
|
||||
struct event_base *base = ctx;
|
||||
int flags = BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE;
|
||||
struct bufferevent *bev;
|
||||
|
||||
bev = bufferevent_socket_new(base, newsock, flags);
|
||||
bufferevent_setcb(bev, echo_readcb, NULL, NULL, NULL);
|
||||
if (conn_bucket_cfg)
|
||||
bufferevent_set_rate_limit(bev, conn_bucket_cfg);
|
||||
if (ratelim_group)
|
||||
bufferevent_add_to_rate_limit_group(bev, ratelim_group);
|
||||
bufferevent_enable(bev, EV_READ|EV_WRITE);
|
||||
}
|
||||
|
||||
static void
|
||||
test_ratelimiting(void)
|
||||
{
|
||||
struct event_base *base;
|
||||
struct sockaddr_in sin;
|
||||
struct evconnlistener *listener;
|
||||
|
||||
struct sockaddr_storage ss;
|
||||
ev_socklen_t slen;
|
||||
|
||||
struct bufferevent **bevs;
|
||||
struct client_state *states;
|
||||
|
||||
int i;
|
||||
|
||||
struct timeval tv;
|
||||
|
||||
ev_uint64_t total_received;
|
||||
double total_sq_persec, total_persec;
|
||||
double variance;
|
||||
|
||||
memset(&sin, 0, sizeof(sin));
|
||||
sin.sin_family = AF_INET;
|
||||
sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
|
||||
sin.sin_port = 0; /* unspecified port */
|
||||
|
||||
base = event_base_new();
|
||||
listener = evconnlistener_new_bind(base, echo_listenercb, base,
|
||||
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1,
|
||||
(struct sockaddr *)&sin, sizeof(sin));
|
||||
|
||||
slen = sizeof(ss);
|
||||
if (getsockname(evconnlistener_get_fd(listener), (struct sockaddr *)&ss,
|
||||
&slen) < 0) {
|
||||
perror("getsockname");
|
||||
return;
|
||||
}
|
||||
|
||||
if (cfg_connlimit > 0) {
|
||||
conn_bucket_cfg = ev_token_bucket_cfg_new(
|
||||
cfg_connlimit, cfg_connlimit * 4,
|
||||
cfg_connlimit, cfg_connlimit * 4,
|
||||
&cfg_tick);
|
||||
assert(conn_bucket_cfg);
|
||||
}
|
||||
|
||||
if (cfg_grouplimit > 0) {
|
||||
group_bucket_cfg = ev_token_bucket_cfg_new(
|
||||
cfg_grouplimit, cfg_grouplimit * 4,
|
||||
cfg_grouplimit, cfg_grouplimit * 4,
|
||||
&cfg_tick);
|
||||
ratelim_group = bufferevent_rate_limit_group_new(
|
||||
base, group_bucket_cfg);
|
||||
};
|
||||
|
||||
bevs = calloc(cfg_n_connections, sizeof(struct bufferevent *));
|
||||
states = calloc(cfg_n_connections, sizeof(struct client_state));
|
||||
|
||||
for (i = 0; i < cfg_n_connections; ++i) {
|
||||
// printf("creating %d:\n",i);
|
||||
bevs[i] = bufferevent_socket_new(base, -1,
|
||||
BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE);
|
||||
assert(bevs[i]);
|
||||
bufferevent_setcb(bevs[i], discard_readcb, loud_writecb,
|
||||
write_on_connectedcb, &states[i]);
|
||||
bufferevent_enable(bevs[i], EV_READ|EV_WRITE);
|
||||
bufferevent_socket_connect(bevs[i], (struct sockaddr *)&ss,
|
||||
slen);
|
||||
}
|
||||
|
||||
tv.tv_sec = cfg_duration;
|
||||
tv.tv_usec = 0;
|
||||
|
||||
event_base_loopexit(base, &tv);
|
||||
|
||||
event_base_dispatch(base);
|
||||
|
||||
total_received = 0;
|
||||
total_persec = 0.0;
|
||||
total_sq_persec = 0.0;
|
||||
for (i=0; i < cfg_n_connections; ++i) {
|
||||
double persec = states[i].received;
|
||||
persec /= cfg_duration;
|
||||
total_received += states[i].received;
|
||||
total_persec += persec;
|
||||
total_sq_persec += persec*persec;
|
||||
printf("%d: %lf per second\n", i, persec);
|
||||
}
|
||||
printf(" total: %lf per second\n",
|
||||
((double)total_received)/cfg_duration);
|
||||
printf(" average: %lf per second\n",
|
||||
(((double)total_received)/cfg_duration)/cfg_n_connections);
|
||||
|
||||
variance = total_sq_persec/cfg_n_connections - total_persec*total_persec/(cfg_n_connections*cfg_n_connections);
|
||||
|
||||
printf(" stddev: %lf per second\n", sqrt(variance));
|
||||
}
|
||||
|
||||
static struct option {
|
||||
const char *name; int *ptr; int min; int isbool;
|
||||
} options[] = {
|
||||
{ "-v", &cfg_verbose, 0, 1 },
|
||||
{ "-h", &cfg_help, 0, 1 },
|
||||
{ "-n", &cfg_n_connections, 1, 0 },
|
||||
{ "-d", &cfg_duration, 1, 0 },
|
||||
{ "-c", &cfg_connlimit, 0, 0 },
|
||||
{ "-g", &cfg_grouplimit, 0, 0 },
|
||||
{ "-t", &cfg_tick_msec, 10, 0 },
|
||||
{ NULL, NULL, -1, 0 },
|
||||
};
|
||||
|
||||
static int
|
||||
handle_option(int argc, char **argv, int *i, const struct option *opt)
|
||||
{
|
||||
long val;
|
||||
char *endptr = NULL;
|
||||
if (opt->isbool) {
|
||||
*opt->ptr = 1;
|
||||
return 0;
|
||||
}
|
||||
if (*i + 1 == argc) {
|
||||
fprintf(stderr, "Too few arguments to '%s'\n",argv[*i]);
|
||||
return -1;
|
||||
}
|
||||
val = strtol(argv[*i+1], &endptr, 10);
|
||||
if (*argv[*i+1] == '\0' || !endptr || *endptr != '\0') {
|
||||
fprintf(stderr, "Couldn't parse numeric value '%s'\n",
|
||||
argv[*i+1]);
|
||||
return -1;
|
||||
}
|
||||
if (val < opt->min || val > 0x7fffffff) {
|
||||
fprintf(stderr, "Value '%s' is out-of-range'\n",
|
||||
argv[*i+1]);
|
||||
return -1;
|
||||
}
|
||||
*opt->ptr = (int)val;
|
||||
++*i;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
usage(void)
|
||||
{
|
||||
fprintf(stderr,
|
||||
"test-ratelim [-v] [-n INT] [-d INT] [-c INT] [-g INT] [-t INT]\n\n"
|
||||
"Pushes bytes through a number of possibly rate-limited connections, and\n"
|
||||
"displays average throughput.\n\n"
|
||||
" -n INT: Number of connections to open (default: 30)\n"
|
||||
" -d INT: Duration of the test in seconds (default: 5 sec)\n"
|
||||
" -c INT: Connection-rate limit applied to each connection in bytes per second\n"
|
||||
" (default: None.)\n"
|
||||
" -g INT: Group-rate limit applied to sum of all usage in bytes per second\n"
|
||||
" (default: None.)\n"
|
||||
" -t INT: Granularity of timing, in milliseconds (default: 1000 msec)\n");
|
||||
}
|
||||
|
||||
int
|
||||
main(int argc, char **argv)
|
||||
{
|
||||
int i,j;
|
||||
double ratio;
|
||||
|
||||
#ifdef WIN32
|
||||
WORD wVersionRequested = MAKEWORD(2,2);
|
||||
WSADATA wsaData;
|
||||
int err;
|
||||
|
||||
err = WSAStartup(wVersionRequested, &wsaData);
|
||||
#endif
|
||||
|
||||
|
||||
for (i = 1; i < argc; ++i) {
|
||||
for (j = 0; options[j].name; ++j) {
|
||||
if (!strcmp(argv[i],options[j].name)) {
|
||||
if (handle_option(argc,argv,&i,&options[j])<0)
|
||||
return 1;
|
||||
goto again;
|
||||
}
|
||||
}
|
||||
fprintf(stderr, "Unknown option '%s'\n", argv[i]);
|
||||
usage();
|
||||
return 1;
|
||||
again:
|
||||
;
|
||||
}
|
||||
if (cfg_help) {
|
||||
usage();
|
||||
return 0;
|
||||
}
|
||||
|
||||
cfg_tick.tv_sec = cfg_tick_msec / 1000;
|
||||
cfg_tick.tv_usec = (cfg_tick_msec % 1000)*1000;
|
||||
|
||||
ratio = cfg_tick_msec / 1000.0;
|
||||
|
||||
cfg_connlimit *= ratio;
|
||||
cfg_grouplimit *= ratio;
|
||||
|
||||
{
|
||||
struct timeval tv;
|
||||
evutil_gettimeofday(&tv, NULL);
|
||||
#ifdef WIN32
|
||||
srand(tv.tv_usec);
|
||||
#else
|
||||
srandom(tv.tv_usec);
|
||||
#endif
|
||||
}
|
||||
|
||||
evthread_enable_lock_debuging();
|
||||
|
||||
test_ratelimiting();
|
||||
|
||||
return 0;
|
||||
}
|
@ -146,6 +146,8 @@ int evutil_resolve(int family, const char *hostname, struct sockaddr *sa,
|
||||
|
||||
const char *evutil_getenv(const char *name);
|
||||
|
||||
long _evutil_weakrand(void);
|
||||
|
||||
/* Evaluates to the same boolean value as 'p', and hints to the compiler that
|
||||
* we expect this value to be false. */
|
||||
#ifdef __GNUC__X
|
||||
|
Loading…
x
Reference in New Issue
Block a user