From c75341b077d14f6f9f76b304295a10dafec2a069 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 24 Aug 2011 18:42:00 -0400 Subject: [PATCH 1/4] Support negative arguments to _bufferevent_decrement_(read/write)_buckets() --- bufferevent_ratelim.c | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c index 31d385b5..a2205f78 100644 --- a/bufferevent_ratelim.c +++ b/bufferevent_ratelim.c @@ -189,6 +189,8 @@ ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg) static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g); static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g); +static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g); +static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g); /** Helper: figure out the maximum amount we should write if is_write, or the maximum amount we should read if is_read. Return that maximum, or @@ -285,6 +287,10 @@ _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t if (event_add(&bev->rate_limiting->refill_bucket_event, &bev->rate_limiting->cfg->tick_timeout) < 0) r = -1; + } else if (bev->read_suspended & BEV_SUSPEND_BW) { + if (!(bev->write_suspended & BEV_SUSPEND_BW)) + event_del(&bev->rate_limiting->refill_bucket_event); + bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW); } } @@ -294,6 +300,8 @@ _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bev->rate_limiting->group->total_read += bytes; if (bev->rate_limiting->group->rate_limit.read_limit <= 0) { _bev_group_suspend_reading(bev->rate_limiting->group); + } else if (bev->rate_limiting->group->read_suspended) { + _bev_group_unsuspend_reading(bev->rate_limiting->group); } UNLOCK_GROUP(bev->rate_limiting->group); } @@ -317,6 +325,10 @@ _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t if 
(event_add(&bev->rate_limiting->refill_bucket_event, &bev->rate_limiting->cfg->tick_timeout) < 0) r = -1; + } else if (bev->write_suspended & BEV_SUSPEND_BW) { + if (!(bev->read_suspended & BEV_SUSPEND_BW)) + event_del(&bev->rate_limiting->refill_bucket_event); + bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW); } } @@ -326,6 +338,8 @@ _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bev->rate_limiting->group->total_written += bytes; if (bev->rate_limiting->group->rate_limit.write_limit <= 0) { _bev_group_suspend_writing(bev->rate_limiting->group); + } else if (bev->rate_limiting->group->write_suspended) { + _bev_group_unsuspend_writing(bev->rate_limiting->group); } UNLOCK_GROUP(bev->rate_limiting->group); } From a98da7bfc9148fd4c51d46320f51a6d28eed3463 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 24 Aug 2011 18:41:35 -0400 Subject: [PATCH 2/4] Make IOCP rate-limiting group support stricter and less surprising. Previously, we wouldn't decrement read/write buckets because of IOCP reads and writes until those reads and writes were complete. That's not so bad on the per-connection front. But for group limits, the old approach makes us launch a huge amount of reads and writes whenever the group limit becomes positive, and then decrement the limit to a hugely negative number as they complete. With this patch, we decrement our read/write buckets whenever we launch an IOCP read or write, based on the maximum that we tried to read or write. Later, when the operations finish, we re-increment the bucket based on the portion of the request that couldn't finish. 
--- bufferevent_async.c | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/bufferevent_async.c b/bufferevent_async.c index 87ba404c..9416e31f 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -80,8 +80,8 @@ struct bufferevent_async { struct event_overlapped connect_overlapped; struct event_overlapped read_overlapped; struct event_overlapped write_overlapped; - unsigned read_in_progress : 1; - unsigned write_in_progress : 1; + size_t read_in_progress; + size_t write_in_progress; unsigned ok : 1; unsigned read_added : 1; unsigned write_added : 1; @@ -198,7 +198,6 @@ bev_async_consider_writing(struct bufferevent_async *beva) at_most = evbuffer_get_length(bev->output); - /* XXXX This over-commits. */ /* This is safe so long as bufferevent_get_write_max never returns * more than INT_MAX. That's true for now. XXXX */ limit = (int)_bufferevent_get_write_max(&beva->bev); @@ -218,7 +217,8 @@ bev_async_consider_writing(struct bufferevent_async *beva) beva->ok = 0; _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); } else { - beva->write_in_progress = 1; + beva->write_in_progress = at_most; + _bufferevent_decrement_write_buckets(&beva->bev, at_most); bev_async_add_write(beva); } } @@ -271,7 +271,8 @@ bev_async_consider_reading(struct bufferevent_async *beva) bufferevent_decref(bev); _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); } else { - beva->read_in_progress = 1; + beva->read_in_progress = at_most; + _bufferevent_decrement_read_buckets(&beva->bev, at_most); bev_async_add_read(beva); } @@ -442,12 +443,15 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key, struct bufferevent_async *bev_a = upcast_read(eo); struct bufferevent *bev = &bev_a->bev.bev; short what = BEV_EVENT_READING; - + ev_ssize_t amount_unread; BEV_LOCK(bev); EVUTIL_ASSERT(bev_a->read_in_progress); + amount_unread = bev_a->read_in_progress - nbytes; evbuffer_commit_read(bev->input, nbytes); bev_a->read_in_progress = 0; + if (amount_unread) + 
_bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread); if (!ok) bev_async_set_wsa_error(bev, eo); @@ -455,8 +459,6 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key, if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_READ_TIMEOUT(bev); - _bufferevent_decrement_read_buckets(&bev_a->bev, - nbytes); if (evbuffer_get_length(bev->input) >= bev->wm_read.low) _bufferevent_run_readcb(bev); bev_async_consider_reading(bev_a); @@ -481,20 +483,26 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key, struct bufferevent_async *bev_a = upcast_write(eo); struct bufferevent *bev = &bev_a->bev.bev; short what = BEV_EVENT_WRITING; + ev_ssize_t amount_unwritten; BEV_LOCK(bev); EVUTIL_ASSERT(bev_a->write_in_progress); + + amount_unwritten = bev_a->write_in_progress - nbytes; evbuffer_commit_write(bev->output, nbytes); bev_a->write_in_progress = 0; + if (amount_unwritten) + _bufferevent_decrement_write_buckets(&bev_a->bev, + -amount_unwritten); + + if (!ok) bev_async_set_wsa_error(bev, eo); if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); - _bufferevent_decrement_write_buckets(&bev_a->bev, - nbytes); if (evbuffer_get_length(bev->output) <= bev->wm_write.low) _bufferevent_run_writecb(bev); From 0ff2c5a92246083495ff4d672372b8afe510105b Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 24 Aug 2011 18:42:12 -0400 Subject: [PATCH 3/4] Have test-ratelim.c support IOCP --- test/test-ratelim.c | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/test/test-ratelim.c b/test/test-ratelim.c index bbbd7608..473cfc5f 100644 --- a/test/test-ratelim.c +++ b/test/test-ratelim.c @@ -65,6 +65,10 @@ static int cfg_connlimit_tolerance = -1; static int cfg_grouplimit_tolerance = -1; static int cfg_stddev_tolerance = -1; +#ifdef _WIN32 +static int cfg_enable_iocp = 0; +#endif + static struct timeval cfg_tick = { 0, 500*1000 }; static struct ev_token_bucket_cfg *conn_bucket_cfg = NULL; @@ -186,6 +190,7 @@ 
test_ratelimiting(void) double variance; double expected_total_persec = -1.0, expected_avg_persec = -1.0; int ok = 1; + struct event_config *base_cfg; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; @@ -195,7 +200,16 @@ test_ratelimiting(void) if (0) event_enable_debug_mode(); - base = event_base_new(); + base_cfg = event_config_new(); + +#ifdef _WIN32 + if (cfg_enable_iocp) { + evthread_use_windows_threads(); + event_config_set_flag(base_cfg, EVENT_BASE_FLAG_STARTUP_IOCP); + } +#endif + + base = event_base_new_with_config(base_cfg); listener = evconnlistener_new_bind(base, echo_listenercb, base, LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1, @@ -349,6 +363,9 @@ static struct option { { "--check-connlimit", &cfg_connlimit_tolerance, 0, 0 }, { "--check-grouplimit", &cfg_grouplimit_tolerance, 0, 0 }, { "--check-stddev", &cfg_stddev_tolerance, 0, 0 }, +#ifdef _WIN32 + { "--iocp", &cfg_enable_iocp, 0, 1 }, +#endif { NULL, NULL, -1, 0 }, }; From e6af35d762822e56e97cca3708ffee291c867d20 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 24 Aug 2011 21:39:28 -0400 Subject: [PATCH 4/4] Correctly terminate IO on an async bufferevent on bufferevent_free --- bufferevent-internal.h | 3 ++- bufferevent.c | 15 +++++++++++++++ bufferevent_async.c | 18 ++++++++++++++++-- bufferevent_filter.c | 1 + bufferevent_openssl.c | 1 + bufferevent_sock.c | 1 + 6 files changed, 36 insertions(+), 3 deletions(-) diff --git a/bufferevent-internal.h b/bufferevent-internal.h index dc471720..de1ff973 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -197,7 +197,8 @@ struct bufferevent_private { enum bufferevent_ctrl_op { BEV_CTRL_SET_FD, BEV_CTRL_GET_FD, - BEV_CTRL_GET_UNDERLYING + BEV_CTRL_GET_UNDERLYING, + BEV_CTRL_CANCEL_ALL }; /** Possible data types for a control callback */ diff --git a/bufferevent.c b/bufferevent.c index 9855d183..93a08015 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -59,6 +59,9 @@ #include "evbuffer-internal.h" #include "util-internal.h" 
+static void _bufferevent_cancel_all(struct bufferevent *bev); + + void bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what) { @@ -674,6 +677,7 @@ bufferevent_free(struct bufferevent *bufev) { BEV_LOCK(bufev); bufferevent_setcb(bufev, NULL, NULL, NULL, NULL); + _bufferevent_cancel_all(bufev); _bufferevent_decref_and_unlock(bufev); } @@ -750,6 +754,17 @@ bufferevent_getfd(struct bufferevent *bev) return (res<0) ? -1 : d.fd; } +static void +_bufferevent_cancel_all(struct bufferevent *bev) +{ + union bufferevent_ctrl_data d; + memset(&d, 0, sizeof(d)); + BEV_LOCK(bev); + if (bev->be_ops->ctrl) + bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d); + BEV_UNLOCK(bev); +} + short bufferevent_get_enabled(struct bufferevent *bufev) { diff --git a/bufferevent_async.c b/bufferevent_async.c index 9416e31f..a3b3ab1e 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -268,8 +268,8 @@ bev_async_consider_reading(struct bufferevent_async *beva) bufferevent_incref(bev); if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) { beva->ok = 0; - bufferevent_decref(bev); _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); + bufferevent_decref(bev); } else { beva->read_in_progress = at_most; _bufferevent_decrement_read_buckets(&beva->bev, at_most); @@ -379,8 +379,10 @@ be_async_destruct(struct bufferevent *bev) bev_async_del_write(bev_async); fd = _evbuffer_overlapped_get_fd(bev->input); - if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) + if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) { + /* XXXX possible double-close */ evutil_closesocket(fd); + } /* delete this in case non-blocking connect was used */ if (event_initialized(&bev->ev_write)) { event_del(&bev->ev_write); @@ -669,8 +671,20 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, _evbuffer_overlapped_set_fd(bev->output, data->fd); return 0; } + case BEV_CTRL_CANCEL_ALL: { + struct bufferevent_async *bev_a = upcast(bev); + evutil_socket_t fd = 
_evbuffer_overlapped_get_fd(bev->input); + if (fd != (evutil_socket_t)INVALID_SOCKET && + (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { + closesocket(fd); + } + bev_a->ok = 0; + return 0; + } case BEV_CTRL_GET_UNDERLYING: default: return -1; } } + + diff --git a/bufferevent_filter.c b/bufferevent_filter.c index 7f19eb9a..234204fc 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -504,6 +504,7 @@ be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, return 0; case BEV_CTRL_GET_FD: case BEV_CTRL_SET_FD: + case BEV_CTRL_CANCEL_ALL: default: return -1; } diff --git a/bufferevent_openssl.c b/bufferevent_openssl.c index c5d242ea..da50c600 100644 --- a/bufferevent_openssl.c +++ b/bufferevent_openssl.c @@ -1167,6 +1167,7 @@ be_openssl_ctrl(struct bufferevent *bev, return -1; data->ptr = bev_ssl->underlying; return 0; + case BEV_CTRL_CANCEL_ALL: default: return -1; } diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 975b5b6c..089aedb4 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -685,6 +685,7 @@ be_socket_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, data->fd = event_get_fd(&bev->ev_read); return 0; case BEV_CTRL_GET_UNDERLYING: + case BEV_CTRL_CANCEL_ALL: default: return -1; }