Correct logic on disabling underlying bufferevents when disabling a filter

Previously, whenever writing was disabled on a bufferevent_filter (or
a filtering SSL bufferevent), we would stop writing on the underlying
bufferevent.  This would make for trouble, though, since if you
implemented common patterns like "stop writing once data X has been
flushed", your bufferevent filter would disable the underlying
bufferevent after the data was flushed to the underlying bufferevent,
but before actually having it written to the network.

Now, we have filters leave their underlying bufferevents enabled for
reading and writing for reading and writing immediately.  They are not
disabled, unless the user wants to disable them, which is now allowed.
To handle the case where we want to choke reading on the underlying
bufferevent because the filter no longer wants to read, we use
bufferevent_suspend_read().  This is analogous to the way that we use
bufferevent_suspend_write() to suspend writing on a filtering
bufferevent when the underlying bufferevent's output buffer has hit
its high watermark.
This commit is contained in:
Nick Mathewson 2010-10-08 00:59:02 -04:00
parent 34d64f8a34
commit ac27eb8276
4 changed files with 56 additions and 35 deletions

View File

@ -56,6 +56,11 @@ extern "C" {
/* On a socket bufferevent: can't do any operations while we're waiting for
* name lookup to finish. */
#define BEV_SUSPEND_LOOKUP 0x08
/* On a base bufferevent, for reading: used when a filter has choked this
* (underlying) bufferevent because it has stopped reading from it. */
#define BEV_SUSPEND_FILT_READ 0x10
typedef ev_uint16_t bufferevent_suspend_flags;
struct bufferevent_rate_limit_group {
/** List of all members in the group */
@ -154,12 +159,12 @@ struct bufferevent_private {
/** If set, read is suspended until one or more conditions are over.
* The actual value here is a bitfield of those conditions; see the
* BEV_SUSPEND_* flags above. */
short read_suspended;
bufferevent_suspend_flags read_suspended;
/** If set, writing is suspended until one or more conditions are over.
* The actual value here is a bitfield of those conditions; see the
* BEV_SUSPEND_* flags above. */
short write_suspended;
bufferevent_suspend_flags write_suspended;
/** Set to the current socket errno if we have deferred callbacks and
* an events callback is pending. */
@ -265,17 +270,17 @@ int bufferevent_init_common(struct bufferevent_private *, struct event_base *, c
/** For internal use: temporarily stop all reads on bufev, until the conditions
* in 'what' are over. */
void bufferevent_suspend_read(struct bufferevent *bufev, short what);
void bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what);
/** For internal use: clear the conditions 'what' on bufev, and re-enable
* reading if there are no conditions left. */
void bufferevent_unsuspend_read(struct bufferevent *bufev, short what);
void bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what);
/** For internal use: temporarily stop all writes on bufev, until the conditions
* in 'what' are over. */
void bufferevent_suspend_write(struct bufferevent *bufev, short what);
void bufferevent_suspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what);
/** For internal use: clear the conditions 'what' on bufev, and re-enable
* writing if there are no conditions left. */
void bufferevent_unsuspend_write(struct bufferevent *bufev, short what);
void bufferevent_unsuspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what);
#define bufferevent_wm_suspend_read(b) \
bufferevent_suspend_read((b), BEV_SUSPEND_WM)

View File

@ -60,7 +60,7 @@
#include "util-internal.h"
void
bufferevent_suspend_read(struct bufferevent *bufev, short what)
bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
@ -72,7 +72,7 @@ bufferevent_suspend_read(struct bufferevent *bufev, short what)
}
void
bufferevent_unsuspend_read(struct bufferevent *bufev, short what)
bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
@ -84,7 +84,7 @@ bufferevent_unsuspend_read(struct bufferevent *bufev, short what)
}
void
bufferevent_suspend_write(struct bufferevent *bufev, short what)
bufferevent_suspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
@ -96,7 +96,7 @@ bufferevent_suspend_write(struct bufferevent *bufev, short what)
}
void
bufferevent_unsuspend_write(struct bufferevent *bufev, short what)
bufferevent_unsuspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

View File

@ -168,6 +168,9 @@ bufferevent_filter_new(struct bufferevent *underlying,
struct bufferevent_filtered *bufev_f;
int tmp_options = options & ~BEV_OPT_THREADSAFE;
if (!underlying)
return NULL;
if (!input_filter)
input_filter = be_null_filter;
if (!output_filter)
@ -202,6 +205,9 @@ bufferevent_filter_new(struct bufferevent *underlying,
_bufferevent_init_generic_timeout_cbs(downcast(bufev_f));
bufferevent_incref(underlying);
bufferevent_enable(underlying, EV_READ|EV_WRITE);
bufferevent_suspend_read(underlying, BEV_SUSPEND_FILT_READ);
return downcast(bufev_f);
}
@ -225,6 +231,11 @@ be_filter_destruct(struct bufferevent *bev)
} else {
bufferevent_free(bevf->underlying);
}
} else {
if (bevf->underlying) {
bufferevent_unsuspend_read(bevf->underlying,
BEV_SUSPEND_FILT_READ);
}
}
_bufferevent_del_generic_timeout_cbs(bev);
@ -234,22 +245,29 @@ static int
be_filter_enable(struct bufferevent *bev, short event)
{
struct bufferevent_filtered *bevf = upcast(bev);
if (event & EV_READ)
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
if (event & EV_WRITE)
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
return bufferevent_enable(bevf->underlying, event);
if (event & EV_READ) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
bufferevent_unsuspend_read(bevf->underlying,
BEV_SUSPEND_FILT_READ);
}
return 0;
}
static int
be_filter_disable(struct bufferevent *bev, short event)
{
struct bufferevent_filtered *bevf = upcast(bev);
if (event & EV_READ)
BEV_DEL_GENERIC_READ_TIMEOUT(bev);
if (event & EV_WRITE)
BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
return bufferevent_disable(bevf->underlying, event);
if (event & EV_READ) {
BEV_DEL_GENERIC_READ_TIMEOUT(bev);
bufferevent_suspend_read(bevf->underlying,
BEV_SUSPEND_FILT_READ);
}
return 0;
}
static enum bufferevent_filter_result

View File

@ -376,10 +376,9 @@ static int
start_reading(struct bufferevent_openssl *bev_ssl)
{
if (bev_ssl->underlying) {
short e = EV_READ;
if (bev_ssl->read_blocked_on_write)
e |= EV_WRITE;
return bufferevent_enable(bev_ssl->underlying, e);
bufferevent_unsuspend_read(bev_ssl->underlying,
BEV_SUSPEND_FILT_READ);
return 0;
} else {
struct bufferevent *bev = &bev_ssl->bev.bev;
int r;
@ -397,12 +396,9 @@ start_reading(struct bufferevent_openssl *bev_ssl)
static int
start_writing(struct bufferevent_openssl *bev_ssl)
{
int r;
int r = 0;
if (bev_ssl->underlying) {
short e = EV_WRITE;
if (bev_ssl->write_blocked_on_read)
e |= EV_READ;
r = bufferevent_enable(bev_ssl->underlying, e);
;
} else {
struct bufferevent *bev = &bev_ssl->bev.bev;
r = _bufferevent_add_event(&bev->ev_write, &bev->timeout_write);
@ -418,9 +414,10 @@ stop_reading(struct bufferevent_openssl *bev_ssl)
{
if (bev_ssl->write_blocked_on_read)
return;
if (bev_ssl->underlying)
bufferevent_disable(bev_ssl->underlying, EV_READ);
else {
if (bev_ssl->underlying) {
bufferevent_suspend_read(bev_ssl->underlying,
BEV_SUSPEND_FILT_READ);
} else {
struct bufferevent *bev = &bev_ssl->bev.bev;
event_del(&bev->ev_read);
}
@ -431,9 +428,9 @@ stop_writing(struct bufferevent_openssl *bev_ssl)
{
if (bev_ssl->read_blocked_on_write)
return;
if (bev_ssl->underlying)
bufferevent_disable(bev_ssl->underlying, EV_WRITE);
else {
if (bev_ssl->underlying) {
;
} else {
struct bufferevent *bev = &bev_ssl->bev.bev;
event_del(&bev->ev_write);
}
@ -901,13 +898,13 @@ do_handshake(struct bufferevent_openssl *bev_ssl)
print_err(err);
switch (err) {
case SSL_ERROR_WANT_WRITE:
if (!bev_ssl->underlying) {
if (!bev_ssl->underlying) { /* XXXX ???? */
stop_reading(bev_ssl);
return start_writing(bev_ssl);
}
return 0;
case SSL_ERROR_WANT_READ:
if (!bev_ssl->underlying) {
if (!bev_ssl->underlying) { /* XXXX ???? */
stop_writing(bev_ssl);
return start_reading(bev_ssl);
}
@ -1219,9 +1216,10 @@ bufferevent_openssl_new_impl(struct event_base *base,
goto err;
}
if (underlying)
if (underlying) {
bufferevent_enable(underlying, EV_READ|EV_WRITE);
else {
/* XXXX ???? */
} else {
bev_ssl->bev.bev.enabled = EV_READ|EV_WRITE;
if (bev_ssl->fd_is_set) {
/* XXX Is this quite right? */