mirror of
https://github.com/cuberite/libevent.git
synced 2025-09-09 20:41:27 -04:00
Merge branch 'be-wm-overrun-v2'
* be-wm-overrun-v2: Fix hangs due to watermarks overruns in bufferevents implementations test: cover watermarks (with some corner cases) in ssl bufferevent Fixes: #690 (cherry picked from commit 878bb2d3b9484b27594308da1d0d6a7c9bdf6647)
This commit is contained in:
parent
2594a96ff6
commit
3f692fff32
@ -111,6 +111,28 @@ bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flag
|
|||||||
BEV_UNLOCK(bufev);
|
BEV_UNLOCK(bufev);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sometimes bufferevent's implementation can overrun high watermarks
|
||||||
|
* (one of examples is openssl) and in this case if the read callback
|
||||||
|
* will not handle enough data do over condition above the read
|
||||||
|
* callback will never be called again (due to suspend above).
|
||||||
|
*
|
||||||
|
* To avoid this we are scheduling read callback again here, but only
|
||||||
|
* from the user callback to avoid multiple scheduling:
|
||||||
|
* - when the data had been added to it
|
||||||
|
* - when the data had been drained from it (user specified read callback)
|
||||||
|
*/
|
||||||
|
static void bufferevent_inbuf_wm_check(struct bufferevent *bev)
|
||||||
|
{
|
||||||
|
if (!bev->wm_read.high)
|
||||||
|
return;
|
||||||
|
if (!(bev->enabled & EV_READ))
|
||||||
|
return;
|
||||||
|
if (evbuffer_get_length(bev->input) < bev->wm_read.high)
|
||||||
|
return;
|
||||||
|
|
||||||
|
bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
|
||||||
|
}
|
||||||
|
|
||||||
/* Callback to implement watermarks on the input buffer. Only enabled
|
/* Callback to implement watermarks on the input buffer. Only enabled
|
||||||
* if the watermark is set. */
|
* if the watermark is set. */
|
||||||
@ -147,6 +169,7 @@ bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg)
|
|||||||
if (bufev_private->readcb_pending && bufev->readcb) {
|
if (bufev_private->readcb_pending && bufev->readcb) {
|
||||||
bufev_private->readcb_pending = 0;
|
bufev_private->readcb_pending = 0;
|
||||||
bufev->readcb(bufev, bufev->cbarg);
|
bufev->readcb(bufev, bufev->cbarg);
|
||||||
|
bufferevent_inbuf_wm_check(bufev);
|
||||||
}
|
}
|
||||||
if (bufev_private->writecb_pending && bufev->writecb) {
|
if (bufev_private->writecb_pending && bufev->writecb) {
|
||||||
bufev_private->writecb_pending = 0;
|
bufev_private->writecb_pending = 0;
|
||||||
@ -187,6 +210,7 @@ bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg
|
|||||||
void *cbarg = bufev->cbarg;
|
void *cbarg = bufev->cbarg;
|
||||||
bufev_private->readcb_pending = 0;
|
bufev_private->readcb_pending = 0;
|
||||||
UNLOCKED(readcb(bufev, cbarg));
|
UNLOCKED(readcb(bufev, cbarg));
|
||||||
|
bufferevent_inbuf_wm_check(bufev);
|
||||||
}
|
}
|
||||||
if (bufev_private->writecb_pending && bufev->writecb) {
|
if (bufev_private->writecb_pending && bufev->writecb) {
|
||||||
bufferevent_data_cb writecb = bufev->writecb;
|
bufferevent_data_cb writecb = bufev->writecb;
|
||||||
@ -230,6 +254,7 @@ bufferevent_run_readcb_(struct bufferevent *bufev, int options)
|
|||||||
SCHEDULE_DEFERRED(p);
|
SCHEDULE_DEFERRED(p);
|
||||||
} else {
|
} else {
|
||||||
bufev->readcb(bufev, bufev->cbarg);
|
bufev->readcb(bufev, bufev->cbarg);
|
||||||
|
bufferevent_inbuf_wm_check(bufev);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -518,6 +518,9 @@ int bufferevent_set_timeouts(struct bufferevent *bufev,
|
|||||||
On input, a bufferevent does not invoke the user read callback unless
|
On input, a bufferevent does not invoke the user read callback unless
|
||||||
there is at least low watermark data in the buffer. If the read buffer
|
there is at least low watermark data in the buffer. If the read buffer
|
||||||
is beyond the high watermark, the bufferevent stops reading from the network.
|
is beyond the high watermark, the bufferevent stops reading from the network.
|
||||||
|
But be aware that bufferevent input/read buffer can overrun high watermark
|
||||||
|
limit (typical example is openssl bufferevent), so you should not relay in
|
||||||
|
this.
|
||||||
|
|
||||||
On output, the user write callback is invoked whenever the buffered data
|
On output, the user write callback is invoked whenever the buffered data
|
||||||
falls below the low watermark. Filters that write to this bufev will try
|
falls below the low watermark. Filters that write to this bufev will try
|
||||||
|
@ -733,6 +733,169 @@ end:
|
|||||||
;
|
;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct wm_context
|
||||||
|
{
|
||||||
|
int server;
|
||||||
|
struct evbuffer *data;
|
||||||
|
size_t to_read;
|
||||||
|
size_t wm_high;
|
||||||
|
size_t limit;
|
||||||
|
size_t get;
|
||||||
|
struct bufferevent *bev;
|
||||||
|
};
|
||||||
|
static void
|
||||||
|
wm_transfer(struct bufferevent *bev, void *arg)
|
||||||
|
{
|
||||||
|
struct wm_context *ctx = arg;
|
||||||
|
struct evbuffer *in = bufferevent_get_input(bev);
|
||||||
|
struct evbuffer *out = bufferevent_get_output(bev);
|
||||||
|
size_t len = evbuffer_get_length(in);
|
||||||
|
size_t drain = len < ctx->to_read ? len : ctx->to_read;
|
||||||
|
|
||||||
|
evbuffer_drain(in, drain);
|
||||||
|
ctx->get += drain;
|
||||||
|
|
||||||
|
TT_BLATHER(("wm_transfer-%s: in: %zu, out: %zu, got: %zu",
|
||||||
|
ctx->server ? "server" : "client",
|
||||||
|
evbuffer_get_length(in),
|
||||||
|
evbuffer_get_length(out),
|
||||||
|
ctx->get));
|
||||||
|
|
||||||
|
evbuffer_add_buffer_reference(out, ctx->data);
|
||||||
|
if (ctx->get >= ctx->limit) {
|
||||||
|
TT_BLATHER(("wm_transfer-%s: break",
|
||||||
|
ctx->server ? "server" : "client"));
|
||||||
|
bufferevent_setcb(bev, NULL, NULL, NULL, NULL);
|
||||||
|
bufferevent_disable(bev, EV_READ);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
static void
|
||||||
|
wm_eventcb(struct bufferevent *bev, short what, void *arg)
|
||||||
|
{
|
||||||
|
struct wm_context *ctx = arg;
|
||||||
|
TT_BLATHER(("wm_eventcb-%s: %i",
|
||||||
|
ctx->server ? "server" : "client", what));
|
||||||
|
if (what & BEV_EVENT_CONNECTED) {
|
||||||
|
} else {
|
||||||
|
ctx->get = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
static void
|
||||||
|
wm_acceptcb(struct evconnlistener *listener, evutil_socket_t fd,
|
||||||
|
struct sockaddr *addr, int socklen, void *arg)
|
||||||
|
{
|
||||||
|
struct wm_context *ctx = arg;
|
||||||
|
struct bufferevent *bev;
|
||||||
|
struct event_base *base = evconnlistener_get_base(listener);
|
||||||
|
SSL *ssl = SSL_new(get_ssl_ctx());
|
||||||
|
|
||||||
|
SSL_use_certificate(ssl, ssl_getcert());
|
||||||
|
SSL_use_PrivateKey(ssl, ssl_getkey());
|
||||||
|
|
||||||
|
bev = bufferevent_openssl_socket_new(
|
||||||
|
base, fd, ssl, BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE);
|
||||||
|
|
||||||
|
bufferevent_setwatermark(bev, EV_READ, 0, ctx->wm_high);
|
||||||
|
bufferevent_setcb(bev, wm_transfer, NULL, wm_eventcb, ctx);
|
||||||
|
bufferevent_enable(bev, EV_READ|EV_WRITE);
|
||||||
|
ctx->bev = bev;
|
||||||
|
|
||||||
|
/* Only accept once, then disable ourself. */
|
||||||
|
evconnlistener_disable(listener);
|
||||||
|
}
|
||||||
|
static void
|
||||||
|
regress_bufferevent_openssl_wm(void *arg)
|
||||||
|
{
|
||||||
|
struct basic_test_data *data = arg;
|
||||||
|
struct event_base *base = data->base;
|
||||||
|
|
||||||
|
struct evconnlistener *listener;
|
||||||
|
struct bufferevent *bev;
|
||||||
|
struct sockaddr_in sin;
|
||||||
|
struct sockaddr_storage ss;
|
||||||
|
enum regress_openssl_type type =
|
||||||
|
(enum regress_openssl_type)data->setup_data;
|
||||||
|
int bev_flags = BEV_OPT_CLOSE_ON_FREE;
|
||||||
|
ev_socklen_t slen;
|
||||||
|
SSL *ssl;
|
||||||
|
struct wm_context client, server;
|
||||||
|
char *payload;
|
||||||
|
size_t payload_len = 1<<10;
|
||||||
|
size_t wm_high = 5<<10;
|
||||||
|
|
||||||
|
init_ssl();
|
||||||
|
|
||||||
|
memset(&sin, 0, sizeof(sin));
|
||||||
|
sin.sin_family = AF_INET;
|
||||||
|
sin.sin_addr.s_addr = htonl(0x7f000001);
|
||||||
|
|
||||||
|
memset(&ss, 0, sizeof(ss));
|
||||||
|
slen = sizeof(ss);
|
||||||
|
|
||||||
|
memset(&client, 0, sizeof(client));
|
||||||
|
memset(&server, 0, sizeof(server));
|
||||||
|
client.server = 0;
|
||||||
|
server.server = 1;
|
||||||
|
client.data = evbuffer_new();
|
||||||
|
server.data = evbuffer_new();
|
||||||
|
payload = calloc(1, payload_len);
|
||||||
|
memset(payload, 'A', payload_len);
|
||||||
|
evbuffer_add(server.data, payload, payload_len);
|
||||||
|
evbuffer_add(client.data, payload, payload_len);
|
||||||
|
client.wm_high = server.wm_high = wm_high;
|
||||||
|
client.limit = server.limit = wm_high<<3;
|
||||||
|
client.to_read = server.to_read = payload_len>>1;
|
||||||
|
|
||||||
|
TT_BLATHER(("openssl_wm: payload_len = %zu, wm_high = %zu, limit = %zu, to_read: %zu",
|
||||||
|
payload_len, wm_high, server.limit, server.to_read));
|
||||||
|
|
||||||
|
listener = evconnlistener_new_bind(base, wm_acceptcb, &server,
|
||||||
|
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
|
||||||
|
-1, (struct sockaddr *)&sin, sizeof(sin));
|
||||||
|
|
||||||
|
tt_assert(listener);
|
||||||
|
tt_assert(evconnlistener_get_fd(listener) >= 0);
|
||||||
|
|
||||||
|
ssl = SSL_new(get_ssl_ctx());
|
||||||
|
tt_assert(ssl);
|
||||||
|
|
||||||
|
if (type & REGRESS_OPENSSL_FILTER) {
|
||||||
|
bev = bufferevent_socket_new(data->base, -1, bev_flags);
|
||||||
|
tt_assert(bev);
|
||||||
|
bev = bufferevent_openssl_filter_new(
|
||||||
|
base, bev, ssl, BUFFEREVENT_SSL_CONNECTING, bev_flags);
|
||||||
|
} else {
|
||||||
|
bev = bufferevent_openssl_socket_new(
|
||||||
|
data->base, -1, ssl,
|
||||||
|
BUFFEREVENT_SSL_CONNECTING,
|
||||||
|
bev_flags);
|
||||||
|
}
|
||||||
|
tt_assert(bev);
|
||||||
|
client.bev = bev;
|
||||||
|
|
||||||
|
bufferevent_setwatermark(bev, EV_READ, 0, client.wm_high);
|
||||||
|
bufferevent_setcb(bev, wm_transfer, NULL, wm_eventcb, &client);
|
||||||
|
|
||||||
|
tt_assert(getsockname(evconnlistener_get_fd(listener),
|
||||||
|
(struct sockaddr*)&ss, &slen) == 0);
|
||||||
|
|
||||||
|
tt_assert(!bufferevent_socket_connect(bev, (struct sockaddr*)&ss, slen));
|
||||||
|
tt_assert(!evbuffer_add_buffer_reference(bufferevent_get_output(bev), client.data));
|
||||||
|
tt_assert(!bufferevent_enable(bev, EV_READ|EV_WRITE));
|
||||||
|
|
||||||
|
event_base_dispatch(base);
|
||||||
|
|
||||||
|
tt_int_op(client.get, ==, client.limit);
|
||||||
|
tt_int_op(server.get, ==, server.limit);
|
||||||
|
end:
|
||||||
|
free(payload);
|
||||||
|
evbuffer_free(client.data);
|
||||||
|
evbuffer_free(server.data);
|
||||||
|
evconnlistener_free(listener);
|
||||||
|
bufferevent_free(client.bev);
|
||||||
|
bufferevent_free(server.bev);
|
||||||
|
}
|
||||||
|
|
||||||
struct testcase_t ssl_testcases[] = {
|
struct testcase_t ssl_testcases[] = {
|
||||||
#define T(a) ((void *)(a))
|
#define T(a) ((void *)(a))
|
||||||
{ "bufferevent_socketpair", regress_bufferevent_openssl,
|
{ "bufferevent_socketpair", regress_bufferevent_openssl,
|
||||||
@ -808,6 +971,11 @@ struct testcase_t ssl_testcases[] = {
|
|||||||
{ "bufferevent_connect_sleep", regress_bufferevent_openssl_connect,
|
{ "bufferevent_connect_sleep", regress_bufferevent_openssl_connect,
|
||||||
TT_FORK|TT_NEED_BASE, &basic_setup, T(REGRESS_OPENSSL_SLEEP) },
|
TT_FORK|TT_NEED_BASE, &basic_setup, T(REGRESS_OPENSSL_SLEEP) },
|
||||||
|
|
||||||
|
{ "bufferevent_wm", regress_bufferevent_openssl_wm,
|
||||||
|
TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
|
||||||
|
{ "bufferevent_wm_filter", regress_bufferevent_openssl_wm,
|
||||||
|
TT_FORK|TT_NEED_BASE, &basic_setup, T(REGRESS_OPENSSL_FILTER) },
|
||||||
|
|
||||||
#undef T
|
#undef T
|
||||||
|
|
||||||
END_OF_TESTCASES,
|
END_OF_TESTCASES,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user