diff --git a/bufferevent-internal.h b/bufferevent-internal.h index c6365262..1b817f6c 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -56,6 +56,11 @@ extern "C" { /* On a socket bufferevent: can't do any operations while we're waiting for * name lookup to finish. */ #define BEV_SUSPEND_LOOKUP 0x08 +/* On a base bufferevent, for reading: used when a filter has choked this + * (underlying) bufferevent because it has stopped reading from it. */ +#define BEV_SUSPEND_FILT_READ 0x10 + +typedef ev_uint16_t bufferevent_suspend_flags; struct bufferevent_rate_limit_group { /** List of all members in the group */ @@ -154,12 +159,12 @@ struct bufferevent_private { /** If set, read is suspended until one or more conditions are over. * The actual value here is a bitfield of those conditions; see the * BEV_SUSPEND_* flags above. */ - short read_suspended; + bufferevent_suspend_flags read_suspended; /** If set, writing is suspended until one or more conditions are over. * The actual value here is a bitfield of those conditions; see the * BEV_SUSPEND_* flags above. */ - short write_suspended; + bufferevent_suspend_flags write_suspended; /** Set to the current socket errno if we have deferred callbacks and * an events callback is pending. */ @@ -265,17 +270,17 @@ int bufferevent_init_common(struct bufferevent_private *, struct event_base *, c /** For internal use: temporarily stop all reads on bufev, until the conditions * in 'what' are over. */ -void bufferevent_suspend_read(struct bufferevent *bufev, short what); +void bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what); /** For internal use: clear the conditions 'what' on bufev, and re-enable * reading if there are no conditions left. */ -void bufferevent_unsuspend_read(struct bufferevent *bufev, short what); +void bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what); /** For internal use: temporarily stop all writes on bufev, until the conditions * in 'what' are over. */ -void bufferevent_suspend_write(struct bufferevent *bufev, short what); +void bufferevent_suspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what); /** For internal use: clear the conditions 'what' on bufev, and re-enable * writing if there are no conditions left. */ -void bufferevent_unsuspend_write(struct bufferevent *bufev, short what); +void bufferevent_unsuspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what); #define bufferevent_wm_suspend_read(b) \ bufferevent_suspend_read((b), BEV_SUSPEND_WM) diff --git a/bufferevent.c b/bufferevent.c index 2ae42fba..e5369ece 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -60,7 +60,7 @@ #include "util-internal.h" void -bufferevent_suspend_read(struct bufferevent *bufev, short what) +bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what) { struct bufferevent_private *bufev_private = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); @@ -72,7 +72,7 @@ bufferevent_suspend_read(struct bufferevent *bufev, short what) } void -bufferevent_unsuspend_read(struct bufferevent *bufev, short what) +bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what) { struct bufferevent_private *bufev_private = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); @@ -84,7 +84,7 @@ bufferevent_unsuspend_read(struct bufferevent *bufev, short what) } void -bufferevent_suspend_write(struct bufferevent *bufev, short what) +bufferevent_suspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what) { struct bufferevent_private *bufev_private = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); @@ -96,7 +96,7 @@ bufferevent_suspend_write(struct bufferevent *bufev, short what) } void -bufferevent_unsuspend_write(struct bufferevent *bufev, short what) +bufferevent_unsuspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what) { struct bufferevent_private *bufev_private = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); diff --git a/bufferevent_filter.c b/bufferevent_filter.c index c1fa3ddd..0dbc0973 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -168,6 +168,9 @@ bufferevent_filter_new(struct bufferevent *underlying, struct bufferevent_filtered *bufev_f; int tmp_options = options & ~BEV_OPT_THREADSAFE; + if (!underlying) + return NULL; + if (!input_filter) input_filter = be_null_filter; if (!output_filter) @@ -202,6 +205,9 @@ bufferevent_filter_new(struct bufferevent *underlying, _bufferevent_init_generic_timeout_cbs(downcast(bufev_f)); bufferevent_incref(underlying); + bufferevent_enable(underlying, EV_READ|EV_WRITE); + bufferevent_suspend_read(underlying, BEV_SUSPEND_FILT_READ); + return downcast(bufev_f); } @@ -225,6 +231,11 @@ be_filter_destruct(struct bufferevent *bev) } else { bufferevent_free(bevf->underlying); } + } else { + if (bevf->underlying) { + bufferevent_unsuspend_read(bevf->underlying, + BEV_SUSPEND_FILT_READ); + } } _bufferevent_del_generic_timeout_cbs(bev); @@ -234,22 +245,29 @@ static int be_filter_enable(struct bufferevent *bev, short event) { struct bufferevent_filtered *bevf = upcast(bev); - if (event & EV_READ) - BEV_RESET_GENERIC_READ_TIMEOUT(bev); if (event & EV_WRITE) BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); - return bufferevent_enable(bevf->underlying, event); + + if (event & EV_READ) { + BEV_RESET_GENERIC_READ_TIMEOUT(bev); + bufferevent_unsuspend_read(bevf->underlying, + BEV_SUSPEND_FILT_READ); + } + return 0; } static int be_filter_disable(struct bufferevent *bev, short event) { struct bufferevent_filtered *bevf = upcast(bev); - if (event & EV_READ) - BEV_DEL_GENERIC_READ_TIMEOUT(bev); if (event & EV_WRITE) BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); - return bufferevent_disable(bevf->underlying, event); + if (event & EV_READ) { + BEV_DEL_GENERIC_READ_TIMEOUT(bev); + bufferevent_suspend_read(bevf->underlying, + BEV_SUSPEND_FILT_READ); + } + return 0; } static enum bufferevent_filter_result diff --git a/bufferevent_openssl.c b/bufferevent_openssl.c index 2477f579..11fd5b8d 100644 --- a/bufferevent_openssl.c +++ b/bufferevent_openssl.c @@ -376,10 +376,9 @@ static int start_reading(struct bufferevent_openssl *bev_ssl) { if (bev_ssl->underlying) { - short e = EV_READ; - if (bev_ssl->read_blocked_on_write) - e |= EV_WRITE; - return bufferevent_enable(bev_ssl->underlying, e); + bufferevent_unsuspend_read(bev_ssl->underlying, + BEV_SUSPEND_FILT_READ); + return 0; } else { struct bufferevent *bev = &bev_ssl->bev.bev; int r; @@ -397,12 +396,9 @@ start_reading(struct bufferevent_openssl *bev_ssl) static int start_writing(struct bufferevent_openssl *bev_ssl) { - int r; + int r = 0; if (bev_ssl->underlying) { - short e = EV_WRITE; - if (bev_ssl->write_blocked_on_read) - e |= EV_READ; - r = bufferevent_enable(bev_ssl->underlying, e); + ; } else { struct bufferevent *bev = &bev_ssl->bev.bev; r = _bufferevent_add_event(&bev->ev_write, &bev->timeout_write); @@ -418,9 +414,10 @@ stop_reading(struct bufferevent_openssl *bev_ssl) { if (bev_ssl->write_blocked_on_read) return; - if (bev_ssl->underlying) - bufferevent_disable(bev_ssl->underlying, EV_READ); - else { + if (bev_ssl->underlying) { + bufferevent_suspend_read(bev_ssl->underlying, + BEV_SUSPEND_FILT_READ); + } else { struct bufferevent *bev = &bev_ssl->bev.bev; event_del(&bev->ev_read); } @@ -431,9 +428,9 @@ stop_writing(struct bufferevent_openssl *bev_ssl) { if (bev_ssl->read_blocked_on_write) return; - if (bev_ssl->underlying) - bufferevent_disable(bev_ssl->underlying, EV_WRITE); - else { + if (bev_ssl->underlying) { + ; + } else { struct bufferevent *bev = &bev_ssl->bev.bev; event_del(&bev->ev_write); } @@ -901,13 +898,13 @@ do_handshake(struct bufferevent_openssl *bev_ssl) print_err(err); switch (err) { case SSL_ERROR_WANT_WRITE: - if (!bev_ssl->underlying) { + if (!bev_ssl->underlying) { /* XXXX ???? */ stop_reading(bev_ssl); return start_writing(bev_ssl); } return 0; case SSL_ERROR_WANT_READ: - if (!bev_ssl->underlying) { + if (!bev_ssl->underlying) { /* XXXX ???? */ stop_writing(bev_ssl); return start_reading(bev_ssl); } @@ -1219,9 +1216,10 @@ bufferevent_openssl_new_impl(struct event_base *base, goto err; } - if (underlying) + if (underlying) { bufferevent_enable(underlying, EV_READ|EV_WRITE); - else { + /* XXXX ???? */ + } else { bev_ssl->bev.bev.enabled = EV_READ|EV_WRITE; if (bev_ssl->fd_is_set) { /* XXX Is this quite right? */