Merge remote-tracking branch 'mistotebe/bufferevent_trigger'

This commit is contained in:
Nick Mathewson 2013-12-24 10:33:58 -05:00
commit b4ef3def6f
10 changed files with 328 additions and 87 deletions

View File

@ -340,14 +340,22 @@ int bufferevent_decref_(struct bufferevent *bufev);
int bufferevent_decref_and_unlock_(struct bufferevent *bufev); int bufferevent_decref_and_unlock_(struct bufferevent *bufev);
/** Internal: If callbacks are deferred and we have a read callback, schedule /** Internal: If callbacks are deferred and we have a read callback, schedule
* a readcb. Otherwise just run the readcb. */ * a readcb. Otherwise just run the readcb. Ignores watermarks. */
void bufferevent_run_readcb_(struct bufferevent *bufev); void bufferevent_run_readcb_(struct bufferevent *bufev, int options);
/** Internal: If callbacks are deferred and we have a write callback, schedule /** Internal: If callbacks are deferred and we have a write callback, schedule
* a writecb. Otherwise just run the writecb. */ * a writecb. Otherwise just run the writecb. Ignores watermarks. */
void bufferevent_run_writecb_(struct bufferevent *bufev); void bufferevent_run_writecb_(struct bufferevent *bufev, int options);
/** Internal: If callbacks are deferred and we have an eventcb, schedule /** Internal: If callbacks are deferred and we have an eventcb, schedule
* it to run with events "what". Otherwise just run the eventcb. */ * it to run with events "what". Otherwise just run the eventcb.
void bufferevent_run_eventcb_(struct bufferevent *bufev, short what); * See bufferevent_trigger_event for meaning of "options". */
void bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options);
/** Internal: Run or schedule (if deferred or options contain
* BEV_TRIG_DEFER_CALLBACKS) I/O callbacks specified in iotype.
* Must already hold the bufev lock. Honors watermarks unless
* BEV_TRIG_IGNORE_WATERMARKS is in options. */
void bufferevent_trigger_nolock_(struct bufferevent *bufev, short iotype, int options);
/** Internal: Add the event 'ev' with timeout tv, unless tv is set to 0, in /** Internal: Add the event 'ev' with timeout tv, unless tv is set to 0, in
* which case add ev with no timeout. */ * which case add ev with no timeout. */

View File

@ -219,14 +219,15 @@ bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg
void void
bufferevent_run_readcb_(struct bufferevent *bufev) bufferevent_run_readcb_(struct bufferevent *bufev, int options)
{ {
/* Requires that we hold the lock and a reference */ /* Requires that we hold the lock and a reference */
struct bufferevent_private *p = struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (bufev->readcb == NULL) if (bufev->readcb == NULL)
return; return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) { if ((p->options & BEV_OPT_DEFER_CALLBACKS) ||
(options & BEV_TRIG_DEFER_CALLBACKS)) {
p->readcb_pending = 1; p->readcb_pending = 1;
SCHEDULE_DEFERRED(p); SCHEDULE_DEFERRED(p);
} else { } else {
@ -235,14 +236,15 @@ bufferevent_run_readcb_(struct bufferevent *bufev)
} }
void void
bufferevent_run_writecb_(struct bufferevent *bufev) bufferevent_run_writecb_(struct bufferevent *bufev, int options)
{ {
/* Requires that we hold the lock and a reference */ /* Requires that we hold the lock and a reference */
struct bufferevent_private *p = struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (bufev->writecb == NULL) if (bufev->writecb == NULL)
return; return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) { if ((p->options & BEV_OPT_DEFER_CALLBACKS) ||
(options & BEV_TRIG_DEFER_CALLBACKS)) {
p->writecb_pending = 1; p->writecb_pending = 1;
SCHEDULE_DEFERRED(p); SCHEDULE_DEFERRED(p);
} else { } else {
@ -251,14 +253,34 @@ bufferevent_run_writecb_(struct bufferevent *bufev)
} }
void void
bufferevent_run_eventcb_(struct bufferevent *bufev, short what) bufferevent_trigger_nolock_(struct bufferevent *bufev, short iotype, int options)
{
if ((iotype & EV_READ) && ((options & BEV_TRIG_IGNORE_WATERMARKS) ||
evbuffer_get_length(bufev->input) >= bufev->wm_read.low))
bufferevent_run_readcb_(bufev, options);
if ((iotype & EV_WRITE) && ((options & BEV_TRIG_IGNORE_WATERMARKS) ||
evbuffer_get_length(bufev->output) <= bufev->wm_write.low))
bufferevent_run_writecb_(bufev, options);
}
void
bufferevent_trigger(struct bufferevent *bufev, short iotype, int options)
{
bufferevent_incref_and_lock_(bufev);
bufferevent_trigger_nolock_(bufev, iotype, options);
bufferevent_decref_and_unlock_(bufev);
}
void
bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options)
{ {
/* Requires that we hold the lock and a reference */ /* Requires that we hold the lock and a reference */
struct bufferevent_private *p = struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (bufev->errorcb == NULL) if (bufev->errorcb == NULL)
return; return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) { if ((p->options & BEV_OPT_DEFER_CALLBACKS) ||
(options & BEV_TRIG_DEFER_CALLBACKS)) {
p->eventcb_pending |= what; p->eventcb_pending |= what;
p->errno_pending = EVUTIL_SOCKET_ERROR(); p->errno_pending = EVUTIL_SOCKET_ERROR();
SCHEDULE_DEFERRED(p); SCHEDULE_DEFERRED(p);
@ -267,6 +289,14 @@ bufferevent_run_eventcb_(struct bufferevent *bufev, short what)
} }
} }
void
bufferevent_trigger_event(struct bufferevent *bufev, short what, int options)
{
bufferevent_incref_and_lock_(bufev);
bufferevent_run_eventcb_(bufev, what, options);
bufferevent_decref_and_unlock_(bufev);
}
int int
bufferevent_init_common_(struct bufferevent_private *bufev_private, bufferevent_init_common_(struct bufferevent_private *bufev_private,
struct event_base *base, struct event_base *base,
@ -322,7 +352,6 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private,
event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS"); event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
return -1; return -1;
} }
if (options & BEV_OPT_DEFER_CALLBACKS) {
if (options & BEV_OPT_UNLOCK_CALLBACKS) if (options & BEV_OPT_UNLOCK_CALLBACKS)
event_deferred_cb_init_( event_deferred_cb_init_(
&bufev_private->deferred, &bufev_private->deferred,
@ -335,7 +364,6 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private,
event_base_get_npriorities(base) / 2, event_base_get_npriorities(base) / 2,
bufferevent_run_deferred_callbacks_locked, bufferevent_run_deferred_callbacks_locked,
bufev_private); bufev_private);
}
bufev_private->options = options; bufev_private->options = options;
@ -596,6 +624,27 @@ bufferevent_setwatermark(struct bufferevent *bufev, short events,
BEV_UNLOCK(bufev); BEV_UNLOCK(bufev);
} }
void
bufferevent_getwatermark(struct bufferevent *bufev, short events,
size_t *lowmark, size_t *highmark)
{
BEV_LOCK(bufev);
if (events == EV_WRITE) {
if (lowmark)
*lowmark = bufev->wm_write.low;
if (highmark)
*highmark = bufev->wm_write.high;
}
if (events == EV_READ) {
if (lowmark)
*lowmark = bufev->wm_read.low;
if (highmark)
*highmark = bufev->wm_read.high;
}
BEV_UNLOCK(bufev);
}
int int
bufferevent_flush(struct bufferevent *bufev, bufferevent_flush(struct bufferevent *bufev,
short iotype, short iotype,
@ -874,7 +923,7 @@ bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
struct bufferevent *bev = ctx; struct bufferevent *bev = ctx;
bufferevent_incref_and_lock_(bev); bufferevent_incref_and_lock_(bev);
bufferevent_disable(bev, EV_READ); bufferevent_disable(bev, EV_READ);
bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING); bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0);
bufferevent_decref_and_unlock_(bev); bufferevent_decref_and_unlock_(bev);
} }
static void static void
@ -883,7 +932,7 @@ bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
struct bufferevent *bev = ctx; struct bufferevent *bev = ctx;
bufferevent_incref_and_lock_(bev); bufferevent_incref_and_lock_(bev);
bufferevent_disable(bev, EV_WRITE); bufferevent_disable(bev, EV_WRITE);
bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING); bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0);
bufferevent_decref_and_unlock_(bev); bufferevent_decref_and_unlock_(bev);
} }

View File

@ -217,7 +217,7 @@ bev_async_consider_writing(struct bufferevent_async *beva)
&beva->write_overlapped)) { &beva->write_overlapped)) {
bufferevent_decref_(bev); bufferevent_decref_(bev);
beva->ok = 0; beva->ok = 0;
bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR); bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
} else { } else {
beva->write_in_progress = at_most; beva->write_in_progress = at_most;
bufferevent_decrement_write_buckets_(&beva->bev, at_most); bufferevent_decrement_write_buckets_(&beva->bev, at_most);
@ -270,7 +270,7 @@ bev_async_consider_reading(struct bufferevent_async *beva)
bufferevent_incref_(bev); bufferevent_incref_(bev);
if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
beva->ok = 0; beva->ok = 0;
bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR); bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
bufferevent_decref_(bev); bufferevent_decref_(bev);
} else { } else {
beva->read_in_progress = at_most; beva->read_in_progress = at_most;
@ -428,7 +428,7 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
bev_async_set_wsa_error(bev, eo); bev_async_set_wsa_error(bev, eo);
bufferevent_run_eventcb_(bev, bufferevent_run_eventcb_(bev,
ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR); ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
event_base_del_virtual_(bev->ev_base); event_base_del_virtual_(bev->ev_base);
@ -458,17 +458,16 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) { if (bev_a->ok) {
if (ok && nbytes) { if (ok && nbytes) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev); BEV_RESET_GENERIC_READ_TIMEOUT(bev);
if (evbuffer_get_length(bev->input) >= bev->wm_read.low) bufferevent_trigger_nolock_(bev, EV_READ, 0);
bufferevent_run_readcb_(bev);
bev_async_consider_reading(bev_a); bev_async_consider_reading(bev_a);
} else if (!ok) { } else if (!ok) {
what |= BEV_EVENT_ERROR; what |= BEV_EVENT_ERROR;
bev_a->ok = 0; bev_a->ok = 0;
bufferevent_run_eventcb_(bev, what); bufferevent_run_eventcb_(bev, what, 0);
} else if (!nbytes) { } else if (!nbytes) {
what |= BEV_EVENT_EOF; what |= BEV_EVENT_EOF;
bev_a->ok = 0; bev_a->ok = 0;
bufferevent_run_eventcb_(bev, what); bufferevent_run_eventcb_(bev, what, 0);
} }
} }
@ -502,18 +501,16 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) { if (bev_a->ok) {
if (ok && nbytes) { if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
if (evbuffer_get_length(bev->output) <= bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
bev->wm_write.low)
bufferevent_run_writecb_(bev);
bev_async_consider_writing(bev_a); bev_async_consider_writing(bev_a);
} else if (!ok) { } else if (!ok) {
what |= BEV_EVENT_ERROR; what |= BEV_EVENT_ERROR;
bev_a->ok = 0; bev_a->ok = 0;
bufferevent_run_eventcb_(bev, what); bufferevent_run_eventcb_(bev, what, 0);
} else if (!nbytes) { } else if (!nbytes) {
what |= BEV_EVENT_EOF; what |= BEV_EVENT_EOF;
bev_a->ok = 0; bev_a->ok = 0;
bufferevent_run_eventcb_(bev, what); bufferevent_run_eventcb_(bev, what, 0);
} }
} }

View File

@ -376,10 +376,9 @@ be_filter_process_output(struct bufferevent_filtered *bevf,
/* Or if we have filled the underlying output buffer. */ /* Or if we have filled the underlying output buffer. */
!be_underlying_writebuf_full(bevf,state)); !be_underlying_writebuf_full(bevf,state));
if (processed && if (processed) {
evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
/* call the write callback.*/ /* call the write callback.*/
bufferevent_run_writecb_(bufev); bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);
if (res == BEV_OK && if (res == BEV_OK &&
(bufev->enabled & EV_WRITE) && (bufev->enabled & EV_WRITE) &&
@ -442,9 +441,8 @@ be_filter_readcb(struct bufferevent *underlying, void *me_)
/* XXX This should be in process_input, not here. There are /* XXX This should be in process_input, not here. There are
* other places that can call process-input, and they should * other places that can call process-input, and they should
* force readcb calls as needed. */ * force readcb calls as needed. */
if (processed_any && if (processed_any)
evbuffer_get_length(bufev->input) >= bufev->wm_read.low) bufferevent_trigger_nolock_(bufev, EV_READ, 0);
bufferevent_run_readcb_(bufev);
bufferevent_decref_and_unlock_(bufev); bufferevent_decref_and_unlock_(bufev);
} }
@ -472,7 +470,7 @@ be_filter_eventcb(struct bufferevent *underlying, short what, void *me_)
bufferevent_incref_and_lock_(bev); bufferevent_incref_and_lock_(bev);
/* All we can really to is tell our own eventcb. */ /* All we can really to is tell our own eventcb. */
bufferevent_run_eventcb_(bev, what); bufferevent_run_eventcb_(bev, what, 0);
bufferevent_decref_and_unlock_(bev); bufferevent_decref_and_unlock_(bev);
} }

View File

@ -533,7 +533,7 @@ conn_closed(struct bufferevent_openssl *bev_ssl, int when, int errcode, int ret)
/* when is BEV_EVENT_{READING|WRITING} */ /* when is BEV_EVENT_{READING|WRITING} */
event = when | event; event = when | event;
bufferevent_run_eventcb_(&bev_ssl->bev.bev, event); bufferevent_run_eventcb_(&bev_ssl->bev.bev, event, 0);
} }
static void static void
@ -709,8 +709,7 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost)
if (bev_ssl->underlying) if (bev_ssl->underlying)
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
if (evbuffer_get_length(output) <= bev->wm_write.low) bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
bufferevent_run_writecb_(bev);
} }
return result; return result;
} }
@ -824,11 +823,8 @@ consider_reading(struct bufferevent_openssl *bev_ssl)
if (all_result_flags & OP_MADE_PROGRESS) { if (all_result_flags & OP_MADE_PROGRESS) {
struct bufferevent *bev = &bev_ssl->bev.bev; struct bufferevent *bev = &bev_ssl->bev.bev;
struct evbuffer *input = bev->input;
if (evbuffer_get_length(input) >= bev->wm_read.low) { bufferevent_trigger_nolock_(bev, EV_READ, 0);
bufferevent_run_readcb_(bev);
}
} }
if (!bev_ssl->underlying) { if (!bev_ssl->underlying) {
@ -852,11 +848,8 @@ consider_writing(struct bufferevent_openssl *bev_ssl)
r = do_read(bev_ssl, 1024); /* XXXX 1024 is a hack */ r = do_read(bev_ssl, 1024); /* XXXX 1024 is a hack */
if (r & OP_MADE_PROGRESS) { if (r & OP_MADE_PROGRESS) {
struct bufferevent *bev = &bev_ssl->bev.bev; struct bufferevent *bev = &bev_ssl->bev.bev;
struct evbuffer *input = bev->input;
if (evbuffer_get_length(input) >= bev->wm_read.low) { bufferevent_trigger_nolock_(bev, EV_READ, 0);
bufferevent_run_readcb_(bev);
}
} }
if (r & (OP_ERR|OP_BLOCKED)) if (r & (OP_ERR|OP_BLOCKED))
break; break;
@ -928,7 +921,7 @@ be_openssl_eventcb(struct bufferevent *bev_base, short what, void *ctx)
eat it. */ eat it. */
} }
if (event) if (event)
bufferevent_run_eventcb_(&bev_ssl->bev.bev, event); bufferevent_run_eventcb_(&bev_ssl->bev.bev, event, 0);
} }
static void static void
@ -938,7 +931,7 @@ be_openssl_readeventcb(evutil_socket_t fd, short what, void *ptr)
bufferevent_incref_and_lock_(&bev_ssl->bev.bev); bufferevent_incref_and_lock_(&bev_ssl->bev.bev);
if (what == EV_TIMEOUT) { if (what == EV_TIMEOUT) {
bufferevent_run_eventcb_(&bev_ssl->bev.bev, bufferevent_run_eventcb_(&bev_ssl->bev.bev,
BEV_EVENT_TIMEOUT|BEV_EVENT_READING); BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0);
} else { } else {
consider_reading(bev_ssl); consider_reading(bev_ssl);
} }
@ -952,7 +945,7 @@ be_openssl_writeeventcb(evutil_socket_t fd, short what, void *ptr)
bufferevent_incref_and_lock_(&bev_ssl->bev.bev); bufferevent_incref_and_lock_(&bev_ssl->bev.bev);
if (what == EV_TIMEOUT) { if (what == EV_TIMEOUT) {
bufferevent_run_eventcb_(&bev_ssl->bev.bev, bufferevent_run_eventcb_(&bev_ssl->bev.bev,
BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING); BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0);
} else { } else {
consider_writing(bev_ssl); consider_writing(bev_ssl);
} }
@ -1019,7 +1012,7 @@ do_handshake(struct bufferevent_openssl *bev_ssl)
/* Call do_read and do_write as needed */ /* Call do_read and do_write as needed */
bufferevent_enable(&bev_ssl->bev.bev, bev_ssl->bev.bev.enabled); bufferevent_enable(&bev_ssl->bev.bev, bev_ssl->bev.bev.enabled);
bufferevent_run_eventcb_(&bev_ssl->bev.bev, bufferevent_run_eventcb_(&bev_ssl->bev.bev,
BEV_EVENT_CONNECTED); BEV_EVENT_CONNECTED, 0);
return 1; return 1;
} else { } else {
int err = SSL_get_error(bev_ssl->ssl, r); int err = SSL_get_error(bev_ssl->ssl, r);
@ -1058,7 +1051,7 @@ be_openssl_handshakeeventcb(evutil_socket_t fd, short what, void *ptr)
bufferevent_incref_and_lock_(&bev_ssl->bev.bev); bufferevent_incref_and_lock_(&bev_ssl->bev.bev);
if (what & EV_TIMEOUT) { if (what & EV_TIMEOUT) {
bufferevent_run_eventcb_(&bev_ssl->bev.bev, BEV_EVENT_TIMEOUT); bufferevent_run_eventcb_(&bev_ssl->bev.bev, BEV_EVENT_TIMEOUT, 0);
} else } else
do_handshake(bev_ssl);/* XXX handle failure */ do_handshake(bev_ssl);/* XXX handle failure */
bufferevent_decref_and_unlock_(&bev_ssl->bev.bev); bufferevent_decref_and_unlock_(&bev_ssl->bev.bev);

View File

@ -151,7 +151,7 @@ static void
be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
int ignore_wm) int ignore_wm)
{ {
size_t src_size, dst_size; size_t dst_size;
size_t n; size_t n;
evbuffer_unfreeze(src->output, 1); evbuffer_unfreeze(src->output, 1);
@ -182,15 +182,8 @@ be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
BEV_DEL_GENERIC_WRITE_TIMEOUT(dst); BEV_DEL_GENERIC_WRITE_TIMEOUT(dst);
} }
src_size = evbuffer_get_length(src->output); bufferevent_trigger_nolock_(dst, EV_READ, 0);
dst_size = evbuffer_get_length(dst->input); bufferevent_trigger_nolock_(src, EV_WRITE, 0);
if (dst_size >= dst->wm_read.low) {
bufferevent_run_readcb_(dst);
}
if (src_size <= src->wm_write.low) {
bufferevent_run_writecb_(src);
}
done: done:
evbuffer_freeze(src->output, 1); evbuffer_freeze(src->output, 1);
evbuffer_freeze(dst->input, 0); evbuffer_freeze(dst->input, 0);
@ -299,7 +292,7 @@ be_pair_flush(struct bufferevent *bev, short iotype,
be_pair_transfer(bev, partner, 1); be_pair_transfer(bev, partner, 1);
if (mode == BEV_FINISHED) { if (mode == BEV_FINISHED) {
bufferevent_run_eventcb_(partner, iotype|BEV_EVENT_EOF); bufferevent_run_eventcb_(partner, iotype|BEV_EVENT_EOF, 0);
} }
decref_and_unlock(bev); decref_and_unlock(bev);
return 0; return 0;

View File

@ -184,8 +184,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
bufferevent_decrement_read_buckets_(bufev_p, res); bufferevent_decrement_read_buckets_(bufev_p, res);
/* Invoke the user callback - must always be called last */ /* Invoke the user callback - must always be called last */
if (evbuffer_get_length(input) >= bufev->wm_read.low) bufferevent_trigger_nolock_(bufev, EV_READ, 0);
bufferevent_run_readcb_(bufev);
goto done; goto done;
@ -194,7 +193,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
error: error:
bufferevent_disable(bufev, EV_READ); bufferevent_disable(bufev, EV_READ);
bufferevent_run_eventcb_(bufev, what); bufferevent_run_eventcb_(bufev, what, 0);
done: done:
bufferevent_decref_and_unlock_(bufev); bufferevent_decref_and_unlock_(bufev);
@ -236,7 +235,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
if (c < 0) { if (c < 0) {
event_del(&bufev->ev_write); event_del(&bufev->ev_write);
event_del(&bufev->ev_read); event_del(&bufev->ev_read);
bufferevent_run_eventcb_(bufev, BEV_EVENT_ERROR); bufferevent_run_eventcb_(bufev, BEV_EVENT_ERROR, 0);
goto done; goto done;
} else { } else {
connected = 1; connected = 1;
@ -245,12 +244,12 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
event_del(&bufev->ev_write); event_del(&bufev->ev_write);
bufferevent_async_set_connected_(bufev); bufferevent_async_set_connected_(bufev);
bufferevent_run_eventcb_(bufev, bufferevent_run_eventcb_(bufev,
BEV_EVENT_CONNECTED); BEV_EVENT_CONNECTED, 0);
goto done; goto done;
} }
#endif #endif
bufferevent_run_eventcb_(bufev, bufferevent_run_eventcb_(bufev,
BEV_EVENT_CONNECTED); BEV_EVENT_CONNECTED, 0);
if (!(bufev->enabled & EV_WRITE) || if (!(bufev->enabled & EV_WRITE) ||
bufev_p->write_suspended) { bufev_p->write_suspended) {
event_del(&bufev->ev_write); event_del(&bufev->ev_write);
@ -294,9 +293,8 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
* Invoke the user callback if our buffer is drained or below the * Invoke the user callback if our buffer is drained or below the
* low watermark. * low watermark.
*/ */
if ((res || !connected) && if (res || !connected) {
evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);
bufferevent_run_writecb_(bufev);
} }
goto done; goto done;
@ -309,7 +307,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
error: error:
bufferevent_disable(bufev, EV_WRITE); bufferevent_disable(bufev, EV_WRITE);
bufferevent_run_eventcb_(bufev, what); bufferevent_run_eventcb_(bufev, what, 0);
done: done:
bufferevent_decref_and_unlock_(bufev); bufferevent_decref_and_unlock_(bufev);
@ -426,7 +424,7 @@ bufferevent_socket_connect(struct bufferevent *bev,
goto done; goto done;
freesock: freesock:
bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR); bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
if (ownfd) if (ownfd)
evutil_closesocket(fd); evutil_closesocket(fd);
/* do something about the error? */ /* do something about the error? */
@ -450,7 +448,7 @@ bufferevent_connect_getaddrinfo_cb(int result, struct evutil_addrinfo *ai,
if (result != 0) { if (result != 0) {
bev_p->dns_error = result; bev_p->dns_error = result;
bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR); bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
bufferevent_decref_and_unlock_(bev); bufferevent_decref_and_unlock_(bev);
if (ai) if (ai)
evutil_freeaddrinfo(ai); evutil_freeaddrinfo(ai);

View File

@ -138,6 +138,9 @@ typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
The event callback is triggered if either an EOF condition or another The event callback is triggered if either an EOF condition or another
unrecoverable error was encountered. unrecoverable error was encountered.
For bufferevents with deferred callbacks, this is a bitwise OR of all errors
that have happened on the bufferevent since the last callback invocation.
@param bev the bufferevent for which the error condition was reached @param bev the bufferevent for which the error condition was reached
@param what a conjunction of flags: BEV_EVENT_READING or BEV_EVENT_WRITING @param what a conjunction of flags: BEV_EVENT_READING or BEV_EVENT_WRITING
to indicate if the error was encountered on the read or write path, to indicate if the error was encountered on the read or write path,
@ -504,6 +507,18 @@ int bufferevent_set_timeouts(struct bufferevent *bufev,
void bufferevent_setwatermark(struct bufferevent *bufev, short events, void bufferevent_setwatermark(struct bufferevent *bufev, short events,
size_t lowmark, size_t highmark); size_t lowmark, size_t highmark);
/**
Retrieves the watermarks for read or write events. Result is undefined if
events contains both EV_READ and EV_WRITE.
@param bufev the bufferevent to be examined
@param events EV_READ or EV_WRITE
@param lowmark receives the lower watermark if not NULL
@param highmark receives the high watermark if not NULL
*/
void bufferevent_getwatermark(struct bufferevent *bufev, short events,
size_t *lowmark, size_t *highmark);
/** /**
Acquire the lock on a bufferevent. Has no effect if locking was not Acquire the lock on a bufferevent. Has no effect if locking was not
enabled with BEV_OPT_THREADSAFE. enabled with BEV_OPT_THREADSAFE.
@ -543,6 +558,44 @@ int bufferevent_flush(struct bufferevent *bufev,
short iotype, short iotype,
enum bufferevent_flush_mode mode); enum bufferevent_flush_mode mode);
/**
Flags for bufferevent_trigger(_event) that modify when and how to trigger
the callback.
*/
enum bufferevent_trigger_options {
/** trigger the callback regardless of the watermarks */
BEV_TRIG_IGNORE_WATERMARKS = (1<<0),
/** defer even if the callbacks are not */
BEV_TRIG_DEFER_CALLBACKS = (1<<1),
};
/**
Triggers bufferevent data callbacks.
The function will honor watermarks unless options contain
BEV_TRIG_IGNORE_WATERMARKS. If the options contain BEV_OPT_DEFER_CALLBACKS,
the callbacks are deferred.
@param bufev the bufferevent object
@param iotype either EV_READ or EV_WRITE or both.
@param options
*/
void bufferevent_trigger(struct bufferevent *bufev, short iotype,
int options);
/**
Triggers the bufferevent event callback.
If the options contain BEV_OPT_DEFER_CALLBACKS, the callbacks are deferred.
@param bufev the bufferevent object
@param what the flags to pass onto the event callback
@param options
*/
void bufferevent_trigger_event(struct bufferevent *bufev, short what,
int options);
/** /**
@name Filtering support @name Filtering support

View File

@ -518,7 +518,7 @@ enum event_base_config_flag {
if you have any fds cloned by dup() or its variants. Doing so if you have any fds cloned by dup() or its variants. Doing so
will produce strange and hard-to-diagnose bugs. will produce strange and hard-to-diagnose bugs.
This flag can also be activated by settnig the This flag can also be activated by setting the
EVENT_EPOLL_USE_CHANGELIST environment variable. EVENT_EPOLL_USE_CHANGELIST environment variable.
This flag has no effect if you wind up using a backend other than This flag has no effect if you wind up using a backend other than

View File

@ -243,6 +243,7 @@ test_bufferevent_watermarks_impl(int use_pair)
{ {
struct bufferevent *bev1 = NULL, *bev2 = NULL; struct bufferevent *bev1 = NULL, *bev2 = NULL;
char buffer[65000]; char buffer[65000];
size_t low, high;
int i; int i;
test_ok = 0; test_ok = 0;
@ -262,16 +263,30 @@ test_bufferevent_watermarks_impl(int use_pair)
bufferevent_disable(bev1, EV_READ); bufferevent_disable(bev1, EV_READ);
bufferevent_enable(bev2, EV_READ); bufferevent_enable(bev2, EV_READ);
/* By default, low watermarks are set to 0 */
bufferevent_getwatermark(bev1, EV_READ, &low, NULL);
tt_int_op(low, ==, 0);
bufferevent_getwatermark(bev2, EV_WRITE, &low, NULL);
tt_int_op(low, ==, 0);
for (i = 0; i < (int)sizeof(buffer); i++) for (i = 0; i < (int)sizeof(buffer); i++)
buffer[i] = (char)i; buffer[i] = (char)i;
/* limit the reading on the receiving bufferevent */ /* limit the reading on the receiving bufferevent */
bufferevent_setwatermark(bev2, EV_READ, 10, 20); bufferevent_setwatermark(bev2, EV_READ, 10, 20);
bufferevent_getwatermark(bev2, EV_READ, &low, &high);
tt_int_op(low, ==, 10);
tt_int_op(high, ==, 20);
/* Tell the sending bufferevent not to notify us till it's down to /* Tell the sending bufferevent not to notify us till it's down to
100 bytes. */ 100 bytes. */
bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000); bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000);
bufferevent_getwatermark(bev1, EV_WRITE, &low, &high);
tt_int_op(low, ==, 100);
tt_int_op(high, ==, 2000);
bufferevent_write(bev1, buffer, sizeof(buffer)); bufferevent_write(bev1, buffer, sizeof(buffer));
event_dispatch(); event_dispatch();
@ -432,6 +447,7 @@ sender_errorcb(struct bufferevent *bev, short what, void *ctx)
} }
static int bufferevent_connect_test_flags = 0; static int bufferevent_connect_test_flags = 0;
static int bufferevent_trigger_test_flags = 0;
static int n_strings_read = 0; static int n_strings_read = 0;
static int n_reads_invoked = 0; static int n_reads_invoked = 0;
@ -797,6 +813,132 @@ end:
bufferevent_free(bev2); bufferevent_free(bev2);
} }
static void
trigger_failure_cb(evutil_socket_t fd, short what, void *ctx)
{
TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout)."));
}
static void
trigger_eventcb(struct bufferevent *bev, short what, void *ctx)
{
struct event_base *base = ctx;
if (what == ~0) {
TT_BLATHER(("Event successfully triggered."));
event_base_loopexit(base, NULL);
return;
}
reader_eventcb(bev, what, ctx);
}
static void
trigger_readcb_triggered(struct bufferevent *bev, void *ctx)
{
TT_BLATHER(("Read successfully triggered."));
n_reads_invoked++;
bufferevent_trigger_event(bev, ~0, bufferevent_trigger_test_flags);
}
static void
trigger_readcb(struct bufferevent *bev, void *ctx)
{
struct timeval timeout = { 30, 0 };
struct event_base *base = ctx;
size_t low, high, len;
int expected_reads;
TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
expected_reads = ++n_reads_invoked;
bufferevent_setcb(bev, trigger_readcb_triggered, NULL, trigger_eventcb, ctx);
bufferevent_getwatermark(bev, EV_READ, &low, &high);
len = evbuffer_get_length(bufferevent_get_input(bev));
bufferevent_setwatermark(bev, EV_READ, len + 1, 0);
bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags);
/* no callback expected */
tt_int_op(n_reads_invoked, ==, expected_reads);
if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) ||
(bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) {
/* will be deferred */
} else {
expected_reads++;
}
event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout);
bufferevent_trigger(bev, EV_READ,
bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS);
tt_int_op(n_reads_invoked, ==, expected_reads);
bufferevent_setwatermark(bev, EV_READ, low, high);
end:
;
}
static void
test_bufferevent_trigger(void *arg)
{
struct basic_test_data *data = arg;
struct evconnlistener *lev=NULL;
struct bufferevent *bev=NULL;
struct sockaddr_in localhost;
struct sockaddr_storage ss;
struct sockaddr *sa;
ev_socklen_t slen;
int be_flags=BEV_OPT_CLOSE_ON_FREE;
int trig_flags=0;
if (strstr((char*)data->setup_data, "defer")) {
be_flags |= BEV_OPT_DEFER_CALLBACKS;
}
bufferevent_connect_test_flags = be_flags;
if (strstr((char*)data->setup_data, "postpone")) {
trig_flags |= BEV_TRIG_DEFER_CALLBACKS;
}
bufferevent_trigger_test_flags = trig_flags;
memset(&localhost, 0, sizeof(localhost));
localhost.sin_port = 0; /* pick-a-port */
localhost.sin_addr.s_addr = htonl(0x7f000001L);
localhost.sin_family = AF_INET;
sa = (struct sockaddr *)&localhost;
lev = evconnlistener_new_bind(data->base, listen_cb, data->base,
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
16, sa, sizeof(localhost));
tt_assert(lev);
sa = (struct sockaddr *)&ss;
slen = sizeof(ss);
if (regress_get_listener_addr(lev, sa, &slen) < 0) {
tt_abort_perror("getsockname");
}
tt_assert(!evconnlistener_enable(lev));
bev = bufferevent_socket_new(data->base, -1, be_flags);
tt_assert(bev);
bufferevent_setcb(bev, trigger_readcb, NULL, trigger_eventcb, data->base);
bufferevent_enable(bev, EV_READ);
tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost)));
event_base_dispatch(data->base);
tt_int_op(n_reads_invoked, ==, 2);
end:
if (lev)
evconnlistener_free(lev);
if (bev)
bufferevent_free(bev);
}
struct testcase_t bufferevent_testcases[] = { struct testcase_t bufferevent_testcases[] = {
LEGACY(bufferevent, TT_ISOLATED), LEGACY(bufferevent, TT_ISOLATED),
@ -827,6 +969,16 @@ struct testcase_t bufferevent_testcases[] = {
TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" }, TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" },
{ "bufferevent_timeout_filter_pair", test_bufferevent_timeouts, { "bufferevent_timeout_filter_pair", test_bufferevent_timeouts,
TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" }, TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" },
{ "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE,
&basic_setup, (void*)"" },
{ "bufferevent_trigger_defer", test_bufferevent_trigger,
TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
{ "bufferevent_trigger_postpone", test_bufferevent_trigger,
TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
(void*)"postpone" },
{ "bufferevent_trigger_defer_postpone", test_bufferevent_trigger,
TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
(void*)"defer postpone" },
#ifdef EVENT__HAVE_LIBZ #ifdef EVENT__HAVE_LIBZ
LEGACY(bufferevent_zlib, TT_ISOLATED), LEGACY(bufferevent_zlib, TT_ISOLATED),
#else #else