Merge remote-tracking branch 'origin/patches-2.0'

This commit is contained in:
Nick Mathewson 2011-08-28 14:51:36 -04:00
commit 904254f975
8 changed files with 86 additions and 14 deletions

View File

@ -198,7 +198,8 @@ struct bufferevent_private {
enum bufferevent_ctrl_op {
BEV_CTRL_SET_FD,
BEV_CTRL_GET_FD,
BEV_CTRL_GET_UNDERLYING
BEV_CTRL_GET_UNDERLYING,
BEV_CTRL_CANCEL_ALL
};
/** Possible data types for a control callback */

View File

@ -60,6 +60,9 @@
#include "evbuffer-internal.h"
#include "util-internal.h"
static void _bufferevent_cancel_all(struct bufferevent *bev);
void
bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
@ -675,6 +678,7 @@ bufferevent_free(struct bufferevent *bufev)
{
BEV_LOCK(bufev);
bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);
_bufferevent_cancel_all(bufev);
_bufferevent_decref_and_unlock(bufev);
}
@ -751,6 +755,17 @@ bufferevent_getfd(struct bufferevent *bev)
return (res<0) ? -1 : d.fd;
}
static void
_bufferevent_cancel_all(struct bufferevent *bev)
{
union bufferevent_ctrl_data d;
memset(&d, 0, sizeof(d));
BEV_LOCK(bev);
if (bev->be_ops->ctrl)
bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d);
BEV_UNLOCK(bev);
}
short
bufferevent_get_enabled(struct bufferevent *bufev)
{

View File

@ -81,8 +81,8 @@ struct bufferevent_async {
struct event_overlapped connect_overlapped;
struct event_overlapped read_overlapped;
struct event_overlapped write_overlapped;
unsigned read_in_progress : 1;
unsigned write_in_progress : 1;
size_t read_in_progress;
size_t write_in_progress;
unsigned ok : 1;
unsigned read_added : 1;
unsigned write_added : 1;
@ -199,7 +199,6 @@ bev_async_consider_writing(struct bufferevent_async *beva)
at_most = evbuffer_get_length(bev->output);
/* XXXX This over-commits. */
/* This is safe so long as bufferevent_get_write_max never returns
* more than INT_MAX. That's true for now. XXXX */
limit = (int)_bufferevent_get_write_max(&beva->bev);
@ -219,7 +218,8 @@ bev_async_consider_writing(struct bufferevent_async *beva)
beva->ok = 0;
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
} else {
beva->write_in_progress = 1;
beva->write_in_progress = at_most;
_bufferevent_decrement_write_buckets(&beva->bev, at_most);
bev_async_add_write(beva);
}
}
@ -269,10 +269,11 @@ bev_async_consider_reading(struct bufferevent_async *beva)
bufferevent_incref(bev);
if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
beva->ok = 0;
bufferevent_decref(bev);
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
bufferevent_decref(bev);
} else {
beva->read_in_progress = 1;
beva->read_in_progress = at_most;
_bufferevent_decrement_read_buckets(&beva->bev, at_most);
bev_async_add_read(beva);
}
@ -379,8 +380,10 @@ be_async_destruct(struct bufferevent *bev)
bev_async_del_write(bev_async);
fd = _evbuffer_overlapped_get_fd(bev->input);
if (bev_p->options & BEV_OPT_CLOSE_ON_FREE)
if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) {
/* XXXX possible double-close */
evutil_closesocket(fd);
}
/* delete this in case non-blocking connect was used */
if (event_initialized(&bev->ev_write)) {
event_del(&bev->ev_write);
@ -443,12 +446,15 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
struct bufferevent_async *bev_a = upcast_read(eo);
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_READING;
ev_ssize_t amount_unread;
BEV_LOCK(bev);
EVUTIL_ASSERT(bev_a->read_in_progress);
amount_unread = bev_a->read_in_progress - nbytes;
evbuffer_commit_read(bev->input, nbytes);
bev_a->read_in_progress = 0;
if (amount_unread)
_bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread);
if (!ok)
bev_async_set_wsa_error(bev, eo);
@ -456,8 +462,6 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
_bufferevent_decrement_read_buckets(&bev_a->bev,
nbytes);
if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
_bufferevent_run_readcb(bev);
bev_async_consider_reading(bev_a);
@ -482,20 +486,26 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
struct bufferevent_async *bev_a = upcast_write(eo);
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_WRITING;
ev_ssize_t amount_unwritten;
BEV_LOCK(bev);
EVUTIL_ASSERT(bev_a->write_in_progress);
amount_unwritten = bev_a->write_in_progress - nbytes;
evbuffer_commit_write(bev->output, nbytes);
bev_a->write_in_progress = 0;
if (amount_unwritten)
_bufferevent_decrement_write_buckets(&bev_a->bev,
-amount_unwritten);
if (!ok)
bev_async_set_wsa_error(bev, eo);
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
_bufferevent_decrement_write_buckets(&bev_a->bev,
nbytes);
if (evbuffer_get_length(bev->output) <=
bev->wm_write.low)
_bufferevent_run_writecb(bev);
@ -662,8 +672,20 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
_evbuffer_overlapped_set_fd(bev->output, data->fd);
return 0;
}
case BEV_CTRL_CANCEL_ALL: {
struct bufferevent_async *bev_a = upcast(bev);
evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input);
if (fd != (evutil_socket_t)INVALID_SOCKET &&
(bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
closesocket(fd);
}
bev_a->ok = 0;
return 0;
}
case BEV_CTRL_GET_UNDERLYING:
default:
return -1;
}
}

View File

@ -506,6 +506,7 @@ be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
return 0;
case BEV_CTRL_GET_FD:
case BEV_CTRL_SET_FD:
case BEV_CTRL_CANCEL_ALL:
default:
return -1;
}

View File

@ -1168,6 +1168,7 @@ be_openssl_ctrl(struct bufferevent *bev,
return -1;
data->ptr = bev_ssl->underlying;
return 0;
case BEV_CTRL_CANCEL_ALL:
default:
return -1;
}

View File

@ -190,6 +190,8 @@ ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);
/** Helper: figure out the maximum amount we should write if is_write, or
the maximum amount we should read if is_read. Return that maximum, or
@ -286,6 +288,10 @@ _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t
if (event_add(&bev->rate_limiting->refill_bucket_event,
&bev->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
} else if (bev->read_suspended & BEV_SUSPEND_BW) {
if (!(bev->write_suspended & BEV_SUSPEND_BW))
event_del(&bev->rate_limiting->refill_bucket_event);
bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
}
}
@ -295,6 +301,8 @@ _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t
bev->rate_limiting->group->total_read += bytes;
if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
_bev_group_suspend_reading(bev->rate_limiting->group);
} else if (bev->rate_limiting->group->read_suspended) {
_bev_group_unsuspend_reading(bev->rate_limiting->group);
}
UNLOCK_GROUP(bev->rate_limiting->group);
}
@ -318,6 +326,10 @@ _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t
if (event_add(&bev->rate_limiting->refill_bucket_event,
&bev->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
} else if (bev->write_suspended & BEV_SUSPEND_BW) {
if (!(bev->read_suspended & BEV_SUSPEND_BW))
event_del(&bev->rate_limiting->refill_bucket_event);
bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
}
}
@ -327,6 +339,8 @@ _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t
bev->rate_limiting->group->total_written += bytes;
if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
_bev_group_suspend_writing(bev->rate_limiting->group);
} else if (bev->rate_limiting->group->write_suspended) {
_bev_group_unsuspend_writing(bev->rate_limiting->group);
}
UNLOCK_GROUP(bev->rate_limiting->group);
}

View File

@ -686,6 +686,7 @@ be_socket_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
data->fd = event_get_fd(&bev->ev_read);
return 0;
case BEV_CTRL_GET_UNDERLYING:
case BEV_CTRL_CANCEL_ALL:
default:
return -1;
}

View File

@ -65,6 +65,10 @@ static int cfg_connlimit_tolerance = -1;
static int cfg_grouplimit_tolerance = -1;
static int cfg_stddev_tolerance = -1;
#ifdef _WIN32
static int cfg_enable_iocp = 0;
#endif
static struct timeval cfg_tick = { 0, 500*1000 };
static struct ev_token_bucket_cfg *conn_bucket_cfg = NULL;
@ -186,6 +190,7 @@ test_ratelimiting(void)
double variance;
double expected_total_persec = -1.0, expected_avg_persec = -1.0;
int ok = 1;
struct event_config *base_cfg;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
@ -195,7 +200,16 @@ test_ratelimiting(void)
if (0)
event_enable_debug_mode();
base = event_base_new();
base_cfg = event_config_new();
#ifdef _WIN32
if (cfg_enable_iocp) {
evthread_use_windows_threads();
event_config_set_flag(base_cfg, EVENT_BASE_FLAG_STARTUP_IOCP);
}
#endif
base = event_base_new_with_config(base_cfg);
listener = evconnlistener_new_bind(base, echo_listenercb, base,
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1,
@ -349,6 +363,9 @@ static struct option {
{ "--check-connlimit", &cfg_connlimit_tolerance, 0, 0 },
{ "--check-grouplimit", &cfg_grouplimit_tolerance, 0, 0 },
{ "--check-stddev", &cfg_stddev_tolerance, 0, 0 },
#ifdef _WIN32
{ "--iocp", &cfg_enable_iocp, 0, 1 },
#endif
{ NULL, NULL, -1, 0 },
};