From be7bf2c76845b5f0ade066f7b4faf11d750296e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Tue, 3 Dec 2013 22:36:45 +0000 Subject: [PATCH 1/6] Fix a typo --- include/event2/event.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/event2/event.h b/include/event2/event.h index 986c009c..c71f8c77 100644 --- a/include/event2/event.h +++ b/include/event2/event.h @@ -518,7 +518,7 @@ enum event_base_config_flag { if you have any fds cloned by dup() or its variants. Doing so will produce strange and hard-to-diagnose bugs. - This flag can also be activated by settnig the + This flag can also be activated by setting the EVENT_EPOLL_USE_CHANGELIST environment variable. This flag has no effect if you wind up using a backend other than From 13a9a020e1361b34c600b7e1f0191c33a32a05c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Tue, 3 Dec 2013 22:50:51 +0000 Subject: [PATCH 2/6] Document deferred eventcb behaviour --- include/event2/bufferevent.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index 513d3285..f3cf1d6e 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -138,6 +138,9 @@ typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx); The event callback is triggered if either an EOF condition or another unrecoverable error was encountered. + For bufferevents with deferred callbacks, this is a bitwise OR of all errors + that have happened on the bufferevent since the last callback invocation. + @param bev the bufferevent for which the error condition was reached @param what a conjunction of flags: BEV_EVENT_READING or BEV_EVENT_WRITING to indicate if the error was encountered on the read or write path, From 4ce242bd0087ed3f6d36c64d0d15094d8a6fc9fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Tue, 3 Dec 2013 22:35:53 +0000 Subject: [PATCH 3/6] Add watermark introspection --- bufferevent.c | 21 +++++++++++++++++++++ include/event2/bufferevent.h | 12 ++++++++++++ test/regress_bufferevent.c | 15 +++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/bufferevent.c b/bufferevent.c index 3cd1ba62..96ce109f 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -596,6 +596,27 @@ bufferevent_setwatermark(struct bufferevent *bufev, short events, BEV_UNLOCK(bufev); } +void +bufferevent_getwatermark(struct bufferevent *bufev, short events, + size_t *lowmark, size_t *highmark) +{ + BEV_LOCK(bufev); + if (events & EV_WRITE) { + if (lowmark) + *lowmark = bufev->wm_write.low; + if (highmark) + *highmark = bufev->wm_write.high; + } + + if (events & EV_READ) { + if (lowmark) + *lowmark = bufev->wm_read.low; + if (highmark) + *highmark = bufev->wm_read.high; + } + BEV_UNLOCK(bufev); +} + int bufferevent_flush(struct bufferevent *bufev, short iotype, diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index f3cf1d6e..efe0617b 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -507,6 +507,18 @@ int bufferevent_set_timeouts(struct bufferevent *bufev, void bufferevent_setwatermark(struct bufferevent *bufev, short events, size_t lowmark, size_t highmark); +/** + Retrieves the watermarks for read or write events. Result is undefined if + events contains both EV_READ and EV_WRITE. + + @param bufev the bufferevent to be examined + @param events EV_READ or EV_WRITE + @param lowmark receives the lower watermark if not NULL + @param highmark receives the high watermark if not NULL +*/ +void bufferevent_getwatermark(struct bufferevent *bufev, short events, + size_t *lowmark, size_t *highmark); + /** Acquire the lock on a bufferevent. Has no effect if locking was not enabled with BEV_OPT_THREADSAFE. diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c index 1be16216..89c405bf 100644 --- a/test/regress_bufferevent.c +++ b/test/regress_bufferevent.c @@ -243,6 +243,7 @@ test_bufferevent_watermarks_impl(int use_pair) { struct bufferevent *bev1 = NULL, *bev2 = NULL; char buffer[65000]; + size_t low, high; int i; test_ok = 0; @@ -262,16 +263,30 @@ test_bufferevent_watermarks_impl(int use_pair) bufferevent_disable(bev1, EV_READ); bufferevent_enable(bev2, EV_READ); + /* By default, low watermarks are set to 0 */ + bufferevent_getwatermark(bev1, EV_READ, &low, NULL); + tt_int_op(low, ==, 0); + bufferevent_getwatermark(bev2, EV_WRITE, &low, NULL); + tt_int_op(low, ==, 0); + for (i = 0; i < (int)sizeof(buffer); i++) buffer[i] = (char)i; /* limit the reading on the receiving bufferevent */ bufferevent_setwatermark(bev2, EV_READ, 10, 20); + bufferevent_getwatermark(bev2, EV_READ, &low, &high); + tt_int_op(low, ==, 10); + tt_int_op(high, ==, 20); + /* Tell the sending bufferevent not to notify us till it's down to 100 bytes. */ bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000); + bufferevent_getwatermark(bev1, EV_WRITE, &low, &high); + tt_int_op(low, ==, 100); + tt_int_op(high, ==, 2000); + bufferevent_write(bev1, buffer, sizeof(buffer)); event_dispatch(); From 61ee18b8b1d2ac0025955b3f949531c712fb7527 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Tue, 3 Dec 2013 22:49:57 +0000 Subject: [PATCH 4/6] Add an option to trigger bufferevent I/O callbacks --- bufferevent-internal.h | 10 ++- bufferevent.c | 55 ++++++++++----- bufferevent_async.c | 7 +- bufferevent_filter.c | 10 ++- bufferevent_openssl.c | 13 +--- bufferevent_pair.c | 13 +--- bufferevent_sock.c | 8 +-- include/event2/bufferevent.h | 26 +++++++ test/regress_bufferevent.c | 127 +++++++++++++++++++++++++++++++++++ 9 files changed, 213 insertions(+), 56 deletions(-) diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 0c4df871..0fa690b9 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -341,14 +341,20 @@ int bufferevent_decref_and_unlock_(struct bufferevent *bufev); /** Internal: If callbacks are deferred and we have a read callback, schedule * a readcb. Otherwise just run the readcb. */ -void bufferevent_run_readcb_(struct bufferevent *bufev); +void bufferevent_run_readcb_(struct bufferevent *bufev, int options); /** Internal: If callbacks are deferred and we have a write callback, schedule * a writecb. Otherwise just run the writecb. */ -void bufferevent_run_writecb_(struct bufferevent *bufev); +void bufferevent_run_writecb_(struct bufferevent *bufev, int options); /** Internal: If callbacks are deferred and we have an eventcb, schedule * it to run with events "what". Otherwise just run the eventcb. */ void bufferevent_run_eventcb_(struct bufferevent *bufev, short what); +/** Internal: Run or schedule (if deferred or options contain + * BEV_TRIG_DEFER_CALLBACKS) I/O callbacks specified in iotype. + * Must already hold the bufev lock. */ +void bufferevent_trigger_nolock_(struct bufferevent *bufev, short iotype, int options); + + /** Internal: Add the event 'ev' with timeout tv, unless tv is set to 0, in * which case add ev with no timeout. */ int bufferevent_add_event_(struct event *ev, const struct timeval *tv); diff --git a/bufferevent.c b/bufferevent.c index 96ce109f..fd95941b 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -219,14 +219,15 @@ bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg void -bufferevent_run_readcb_(struct bufferevent *bufev) +bufferevent_run_readcb_(struct bufferevent *bufev, int options) { /* Requires that we hold the lock and a reference */ struct bufferevent_private *p = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); if (bufev->readcb == NULL) return; - if (p->options & BEV_OPT_DEFER_CALLBACKS) { + if ((p->options & BEV_OPT_DEFER_CALLBACKS) || + (options & BEV_TRIG_DEFER_CALLBACKS)) { p->readcb_pending = 1; SCHEDULE_DEFERRED(p); } else { @@ -235,14 +236,15 @@ bufferevent_run_readcb_(struct bufferevent *bufev) } void -bufferevent_run_writecb_(struct bufferevent *bufev) +bufferevent_run_writecb_(struct bufferevent *bufev, int options) { /* Requires that we hold the lock and a reference */ struct bufferevent_private *p = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); if (bufev->writecb == NULL) return; - if (p->options & BEV_OPT_DEFER_CALLBACKS) { + if ((p->options & BEV_OPT_DEFER_CALLBACKS) || + (options & BEV_TRIG_DEFER_CALLBACKS)) { p->writecb_pending = 1; SCHEDULE_DEFERRED(p); } else { @@ -250,6 +252,25 @@ bufferevent_run_writecb_(struct bufferevent *bufev) } } +void +bufferevent_trigger_nolock_(struct bufferevent *bufev, short iotype, int options) +{ + if ((iotype & EV_READ) && ((options & BEV_TRIG_IGNORE_WATERMARKS) || + evbuffer_get_length(bufev->input) >= bufev->wm_read.low)) + bufferevent_run_readcb_(bufev, options); + if ((iotype & EV_WRITE) && ((options & BEV_TRIG_IGNORE_WATERMARKS) || + evbuffer_get_length(bufev->output) <= bufev->wm_write.low)) + bufferevent_run_writecb_(bufev, options); +} + +void +bufferevent_trigger(struct bufferevent *bufev, short iotype, int options) +{ + bufferevent_incref_and_lock_(bufev); + bufferevent_trigger_nolock_(bufev, iotype, options); + bufferevent_decref_and_unlock_(bufev); +} + void bufferevent_run_eventcb_(struct bufferevent *bufev, short what) { @@ -322,20 +343,18 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private, event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS"); return -1; } - if (options & BEV_OPT_DEFER_CALLBACKS) { - if (options & BEV_OPT_UNLOCK_CALLBACKS) - event_deferred_cb_init_( - &bufev_private->deferred, - event_base_get_npriorities(base) / 2, - bufferevent_run_deferred_callbacks_unlocked, - bufev_private); - else - event_deferred_cb_init_( - &bufev_private->deferred, - event_base_get_npriorities(base) / 2, - bufferevent_run_deferred_callbacks_locked, - bufev_private); - } + if (options & BEV_OPT_UNLOCK_CALLBACKS) + event_deferred_cb_init_( + &bufev_private->deferred, + event_base_get_npriorities(base) / 2, + bufferevent_run_deferred_callbacks_unlocked, + bufev_private); + else + event_deferred_cb_init_( + &bufev_private->deferred, + event_base_get_npriorities(base) / 2, + bufferevent_run_deferred_callbacks_locked, + bufev_private); bufev_private->options = options; diff --git a/bufferevent_async.c b/bufferevent_async.c index 0152fd16..4e686479 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -458,8 +458,7 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key, if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_READ_TIMEOUT(bev); - if (evbuffer_get_length(bev->input) >= bev->wm_read.low) - bufferevent_run_readcb_(bev); + bufferevent_trigger_nolock_(bev, EV_READ, 0); bev_async_consider_reading(bev_a); } else if (!ok) { what |= BEV_EVENT_ERROR; @@ -502,9 +501,7 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key, if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); - if (evbuffer_get_length(bev->output) <= - bev->wm_write.low) - bufferevent_run_writecb_(bev); + bufferevent_trigger_nolock_(bev, EV_WRITE, 0); bev_async_consider_writing(bev_a); } else if (!ok) { what |= BEV_EVENT_ERROR; diff --git a/bufferevent_filter.c b/bufferevent_filter.c index cc02230c..cb1c0097 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -376,10 +376,9 @@ be_filter_process_output(struct bufferevent_filtered *bevf, /* Or if we have filled the underlying output buffer. */ !be_underlying_writebuf_full(bevf,state)); - if (processed && - evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { + if (processed) { /* call the write callback.*/ - bufferevent_run_writecb_(bufev); + bufferevent_trigger_nolock_(bufev, EV_WRITE, 0); if (res == BEV_OK && (bufev->enabled & EV_WRITE) && @@ -442,9 +441,8 @@ be_filter_readcb(struct bufferevent *underlying, void *me_) /* XXX This should be in process_input, not here. There are * other places that can call process-input, and they should * force readcb calls as needed. */ - if (processed_any && - evbuffer_get_length(bufev->input) >= bufev->wm_read.low) - bufferevent_run_readcb_(bufev); + if (processed_any) + bufferevent_trigger_nolock_(bufev, EV_READ, 0); bufferevent_decref_and_unlock_(bufev); } diff --git a/bufferevent_openssl.c b/bufferevent_openssl.c index 1ce124f9..ed9e4a3d 100644 --- a/bufferevent_openssl.c +++ b/bufferevent_openssl.c @@ -709,8 +709,7 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost) if (bev_ssl->underlying) BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); - if (evbuffer_get_length(output) <= bev->wm_write.low) - bufferevent_run_writecb_(bev); + bufferevent_trigger_nolock_(bev, EV_WRITE, 0); } return result; } @@ -824,11 +823,8 @@ consider_reading(struct bufferevent_openssl *bev_ssl) if (all_result_flags & OP_MADE_PROGRESS) { struct bufferevent *bev = &bev_ssl->bev.bev; - struct evbuffer *input = bev->input; - if (evbuffer_get_length(input) >= bev->wm_read.low) { - bufferevent_run_readcb_(bev); - } + bufferevent_trigger_nolock_(bev, EV_READ, 0); } if (!bev_ssl->underlying) { @@ -852,11 +848,8 @@ consider_writing(struct bufferevent_openssl *bev_ssl) r = do_read(bev_ssl, 1024); /* XXXX 1024 is a hack */ if (r & OP_MADE_PROGRESS) { struct bufferevent *bev = &bev_ssl->bev.bev; - struct evbuffer *input = bev->input; - if (evbuffer_get_length(input) >= bev->wm_read.low) { - bufferevent_run_readcb_(bev); - } + bufferevent_trigger_nolock_(bev, EV_READ, 0); } if (r & (OP_ERR|OP_BLOCKED)) break; diff --git a/bufferevent_pair.c b/bufferevent_pair.c index 4d467260..eb3da3e3 100644 --- a/bufferevent_pair.c +++ b/bufferevent_pair.c @@ -151,7 +151,7 @@ static void be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, int ignore_wm) { - size_t src_size, dst_size; + size_t dst_size; size_t n; evbuffer_unfreeze(src->output, 1); @@ -182,15 +182,8 @@ be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, BEV_DEL_GENERIC_WRITE_TIMEOUT(dst); } - src_size = evbuffer_get_length(src->output); - dst_size = evbuffer_get_length(dst->input); - - if (dst_size >= dst->wm_read.low) { - bufferevent_run_readcb_(dst); - } - if (src_size <= src->wm_write.low) { - bufferevent_run_writecb_(src); - } + bufferevent_trigger_nolock_(dst, EV_READ, 0); + bufferevent_trigger_nolock_(src, EV_WRITE, 0); done: evbuffer_freeze(src->output, 1); evbuffer_freeze(dst->input, 0); diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 5ce4953b..82983ed7 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -184,8 +184,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg) bufferevent_decrement_read_buckets_(bufev_p, res); /* Invoke the user callback - must always be called last */ - if (evbuffer_get_length(input) >= bufev->wm_read.low) - bufferevent_run_readcb_(bufev); + bufferevent_trigger_nolock_(bufev, EV_READ, 0); goto done; @@ -294,9 +293,8 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) * Invoke the user callback if our buffer is drained or below the * low watermark. */ - if ((res || !connected) && - evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { - bufferevent_run_writecb_(bufev); + if (res || !connected) { + bufferevent_trigger_nolock_(bufev, EV_WRITE, 0); } goto done; diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index efe0617b..af6f7cde 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -558,6 +558,32 @@ int bufferevent_flush(struct bufferevent *bufev, short iotype, enum bufferevent_flush_mode mode); +/** + Flags for bufferevent_trigger(_event) that modify when and how to trigger + the callback. +*/ +enum bufferevent_trigger_options { + /** trigger the callback regardless of the watermarks */ + BEV_TRIG_IGNORE_WATERMARKS = (1<<0), + + /** defer even if the callbacks are not */ + BEV_TRIG_DEFER_CALLBACKS = (1<<1), +}; + +/** + Triggers bufferevent data callbacks. + + The function will honor watermarks unless options contain + BEV_TRIG_IGNORE_WATERMARKS. If the options contain BEV_OPT_DEFER_CALLBACKS, + the callbacks are deferred. + + @param bufev the bufferevent object + @param iotype either EV_READ or EV_WRITE or both. + @param options + */ +void bufferevent_trigger(struct bufferevent *bufev, short iotype, + int options); + /** @name Filtering support diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c index 89c405bf..874d6018 100644 --- a/test/regress_bufferevent.c +++ b/test/regress_bufferevent.c @@ -447,6 +447,7 @@ sender_errorcb(struct bufferevent *bev, short what, void *ctx) } static int bufferevent_connect_test_flags = 0; +static int bufferevent_trigger_test_flags = 0; static int n_strings_read = 0; static int n_reads_invoked = 0; @@ -812,6 +813,122 @@ end: bufferevent_free(bev2); } +static void +trigger_failure_cb(evutil_socket_t fd, short what, void *ctx) +{ + TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout).")); +} + +static void +trigger_readcb_triggered(struct bufferevent *bev, void *ctx) +{ + struct event_base *base = ctx; + + TT_BLATHER(("Read successfully triggered.")); + n_reads_invoked++; + event_base_loopexit(base, NULL); +} + +static void +trigger_readcb(struct bufferevent *bev, void *ctx) +{ + struct timeval timeout = { 30, 0 }; + struct event_base *base = ctx; + size_t low, high, len; + int expected_reads; + + TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev))); + expected_reads = ++n_reads_invoked; + + bufferevent_setcb(bev, trigger_readcb_triggered, NULL, reader_eventcb, ctx); + + bufferevent_getwatermark(bev, EV_READ, &low, &high); + len = evbuffer_get_length(bufferevent_get_input(bev)); + + bufferevent_setwatermark(bev, EV_READ, len + 1, 0); + bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags); + /* no callback expected */ + tt_int_op(n_reads_invoked, ==, expected_reads); + + if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) || + (bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) { + /* will be deferred */ + } else { + expected_reads++; + } + + event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout); + + bufferevent_trigger(bev, EV_READ, + bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS); + tt_int_op(n_reads_invoked, ==, expected_reads); + + bufferevent_setwatermark(bev, EV_READ, low, high); +end: + ; +} + +static void +test_bufferevent_trigger(void *arg) +{ + struct basic_test_data *data = arg; + struct evconnlistener *lev=NULL; + struct bufferevent *bev=NULL; + struct sockaddr_in localhost; + struct sockaddr_storage ss; + struct sockaddr *sa; + ev_socklen_t slen; + + int be_flags=BEV_OPT_CLOSE_ON_FREE; + int trig_flags=0; + + if (strstr((char*)data->setup_data, "defer")) { + be_flags |= BEV_OPT_DEFER_CALLBACKS; + } + bufferevent_connect_test_flags = be_flags; + + if (strstr((char*)data->setup_data, "postpone")) { + trig_flags |= BEV_TRIG_DEFER_CALLBACKS; + } + bufferevent_trigger_test_flags = trig_flags; + + memset(&localhost, 0, sizeof(localhost)); + + localhost.sin_port = 0; /* pick-a-port */ + localhost.sin_addr.s_addr = htonl(0x7f000001L); + localhost.sin_family = AF_INET; + sa = (struct sockaddr *)&localhost; + lev = evconnlistener_new_bind(data->base, listen_cb, data->base, + LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, + 16, sa, sizeof(localhost)); + tt_assert(lev); + + sa = (struct sockaddr *)&ss; + slen = sizeof(ss); + if (regress_get_listener_addr(lev, sa, &slen) < 0) { + tt_abort_perror("getsockname"); + } + + tt_assert(!evconnlistener_enable(lev)); + bev = bufferevent_socket_new(data->base, -1, be_flags); + tt_assert(bev); + bufferevent_setcb(bev, trigger_readcb, NULL, reader_eventcb, data->base); + + bufferevent_enable(bev, EV_READ); + + tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost))); + + event_base_dispatch(data->base); + + tt_int_op(n_reads_invoked, ==, 2); +end: + if (lev) + evconnlistener_free(lev); + + if (bev) + bufferevent_free(bev); +} + struct testcase_t bufferevent_testcases[] = { LEGACY(bufferevent, TT_ISOLATED), @@ -842,6 +959,16 @@ struct testcase_t bufferevent_testcases[] = { TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" }, { "bufferevent_timeout_filter_pair", test_bufferevent_timeouts, TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" }, + { "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE, + &basic_setup, (void*)"" }, + { "bufferevent_trigger_defer", test_bufferevent_trigger, + TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" }, + { "bufferevent_trigger_postpone", test_bufferevent_trigger, + TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, + (void*)"postpone" }, + { "bufferevent_trigger_defer_postpone", test_bufferevent_trigger, + TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, + (void*)"defer postpone" }, #ifdef EVENT__HAVE_LIBZ LEGACY(bufferevent_zlib, TT_ISOLATED), #else From a7384c782486c293e9cb8c22916eb9ee842a8c1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Tue, 3 Dec 2013 23:01:54 +0000 Subject: [PATCH 5/6] Add an option to trigger bufferevent event callbacks --- bufferevent-internal.h | 5 +++-- bufferevent.c | 17 +++++++++++++---- bufferevent_async.c | 14 +++++++------- bufferevent_filter.c | 2 +- bufferevent_openssl.c | 12 ++++++------ bufferevent_pair.c | 2 +- bufferevent_sock.c | 14 +++++++------- include/event2/bufferevent.h | 12 ++++++++++++ test/regress_bufferevent.c | 18 ++++++++++++++---- 9 files changed, 64 insertions(+), 32 deletions(-) diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 0fa690b9..12ae142b 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -346,8 +346,9 @@ void bufferevent_run_readcb_(struct bufferevent *bufev, int options); * a writecb. Otherwise just run the writecb. */ void bufferevent_run_writecb_(struct bufferevent *bufev, int options); /** Internal: If callbacks are deferred and we have an eventcb, schedule - * it to run with events "what". Otherwise just run the eventcb. */ -void bufferevent_run_eventcb_(struct bufferevent *bufev, short what); + * it to run with events "what". Otherwise just run the eventcb. + * See bufferevent_trigger_event for meaning of "options". */ +void bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options); /** Internal: Run or schedule (if deferred or options contain * BEV_TRIG_DEFER_CALLBACKS) I/O callbacks specified in iotype. diff --git a/bufferevent.c b/bufferevent.c index fd95941b..5f424d7e 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -272,14 +272,15 @@ bufferevent_trigger(struct bufferevent *bufev, short iotype, int options) } void -bufferevent_run_eventcb_(struct bufferevent *bufev, short what) +bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options) { /* Requires that we hold the lock and a reference */ struct bufferevent_private *p = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); if (bufev->errorcb == NULL) return; - if (p->options & BEV_OPT_DEFER_CALLBACKS) { + if ((p->options & BEV_OPT_DEFER_CALLBACKS) || + (options & BEV_TRIG_DEFER_CALLBACKS)) { p->eventcb_pending |= what; p->errno_pending = EVUTIL_SOCKET_ERROR(); SCHEDULE_DEFERRED(p); @@ -288,6 +289,14 @@ bufferevent_run_eventcb_(struct bufferevent *bufev, short what) } } +void +bufferevent_trigger_event(struct bufferevent *bufev, short what, int options) +{ + bufferevent_incref_and_lock_(bufev); + bufferevent_run_eventcb_(bufev, what, options); + bufferevent_decref_and_unlock_(bufev); +} + int bufferevent_init_common_(struct bufferevent_private *bufev_private, struct event_base *base, @@ -914,7 +923,7 @@ bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx) struct bufferevent *bev = ctx; bufferevent_incref_and_lock_(bev); bufferevent_disable(bev, EV_READ); - bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING); + bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0); bufferevent_decref_and_unlock_(bev); } static void @@ -923,7 +932,7 @@ bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx) struct bufferevent *bev = ctx; bufferevent_incref_and_lock_(bev); bufferevent_disable(bev, EV_WRITE); - bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING); + bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0); bufferevent_decref_and_unlock_(bev); } diff --git a/bufferevent_async.c b/bufferevent_async.c index 4e686479..137ad247 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -217,7 +217,7 @@ bev_async_consider_writing(struct bufferevent_async *beva) &beva->write_overlapped)) { bufferevent_decref_(bev); beva->ok = 0; - bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR); + bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); } else { beva->write_in_progress = at_most; bufferevent_decrement_write_buckets_(&beva->bev, at_most); @@ -270,7 +270,7 @@ bev_async_consider_reading(struct bufferevent_async *beva) bufferevent_incref_(bev); if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { beva->ok = 0; - bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR); + bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); bufferevent_decref_(bev); } else { beva->read_in_progress = at_most; @@ -428,7 +428,7 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key, bev_async_set_wsa_error(bev, eo); bufferevent_run_eventcb_(bev, - ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR); + ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); event_base_del_virtual_(bev->ev_base); @@ -463,11 +463,11 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key, } else if (!ok) { what |= BEV_EVENT_ERROR; bev_a->ok = 0; - bufferevent_run_eventcb_(bev, what); + bufferevent_run_eventcb_(bev, what, 0); } else if (!nbytes) { what |= BEV_EVENT_EOF; bev_a->ok = 0; - bufferevent_run_eventcb_(bev, what); + bufferevent_run_eventcb_(bev, what, 0); } } @@ -506,11 +506,11 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key, } else if (!ok) { what |= BEV_EVENT_ERROR; bev_a->ok = 0; - bufferevent_run_eventcb_(bev, what); + bufferevent_run_eventcb_(bev, what, 0); } else if (!nbytes) { what |= BEV_EVENT_EOF; bev_a->ok = 0; - bufferevent_run_eventcb_(bev, what); + bufferevent_run_eventcb_(bev, what, 0); } } diff --git a/bufferevent_filter.c b/bufferevent_filter.c index cb1c0097..af71ebee 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -470,7 +470,7 @@ be_filter_eventcb(struct bufferevent *underlying, short what, void *me_) bufferevent_incref_and_lock_(bev); /* All we can really to is tell our own eventcb. */ - bufferevent_run_eventcb_(bev, what); + bufferevent_run_eventcb_(bev, what, 0); bufferevent_decref_and_unlock_(bev); } diff --git a/bufferevent_openssl.c b/bufferevent_openssl.c index ed9e4a3d..3ce491ef 100644 --- a/bufferevent_openssl.c +++ b/bufferevent_openssl.c @@ -533,7 +533,7 @@ conn_closed(struct bufferevent_openssl *bev_ssl, int when, int errcode, int ret) /* when is BEV_EVENT_{READING|WRITING} */ event = when | event; - bufferevent_run_eventcb_(&bev_ssl->bev.bev, event); + bufferevent_run_eventcb_(&bev_ssl->bev.bev, event, 0); } static void @@ -921,7 +921,7 @@ be_openssl_eventcb(struct bufferevent *bev_base, short what, void *ctx) eat it. */ } if (event) - bufferevent_run_eventcb_(&bev_ssl->bev.bev, event); + bufferevent_run_eventcb_(&bev_ssl->bev.bev, event, 0); } static void @@ -931,7 +931,7 @@ be_openssl_readeventcb(evutil_socket_t fd, short what, void *ptr) bufferevent_incref_and_lock_(&bev_ssl->bev.bev); if (what == EV_TIMEOUT) { bufferevent_run_eventcb_(&bev_ssl->bev.bev, - BEV_EVENT_TIMEOUT|BEV_EVENT_READING); + BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0); } else { consider_reading(bev_ssl); } @@ -945,7 +945,7 @@ be_openssl_writeeventcb(evutil_socket_t fd, short what, void *ptr) bufferevent_incref_and_lock_(&bev_ssl->bev.bev); if (what == EV_TIMEOUT) { bufferevent_run_eventcb_(&bev_ssl->bev.bev, - BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING); + BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0); } else { consider_writing(bev_ssl); } @@ -1012,7 +1012,7 @@ do_handshake(struct bufferevent_openssl *bev_ssl) /* Call do_read and do_write as needed */ bufferevent_enable(&bev_ssl->bev.bev, bev_ssl->bev.bev.enabled); bufferevent_run_eventcb_(&bev_ssl->bev.bev, - BEV_EVENT_CONNECTED); + BEV_EVENT_CONNECTED, 0); return 1; } else { int err = SSL_get_error(bev_ssl->ssl, r); @@ -1051,7 +1051,7 @@ be_openssl_handshakeeventcb(evutil_socket_t fd, short what, void *ptr) bufferevent_incref_and_lock_(&bev_ssl->bev.bev); if (what & EV_TIMEOUT) { - bufferevent_run_eventcb_(&bev_ssl->bev.bev, BEV_EVENT_TIMEOUT); + bufferevent_run_eventcb_(&bev_ssl->bev.bev, BEV_EVENT_TIMEOUT, 0); } else do_handshake(bev_ssl);/* XXX handle failure */ bufferevent_decref_and_unlock_(&bev_ssl->bev.bev); diff --git a/bufferevent_pair.c b/bufferevent_pair.c index eb3da3e3..5e2e2c41 100644 --- a/bufferevent_pair.c +++ b/bufferevent_pair.c @@ -292,7 +292,7 @@ be_pair_flush(struct bufferevent *bev, short iotype, be_pair_transfer(bev, partner, 1); if (mode == BEV_FINISHED) { - bufferevent_run_eventcb_(partner, iotype|BEV_EVENT_EOF); + bufferevent_run_eventcb_(partner, iotype|BEV_EVENT_EOF, 0); } decref_and_unlock(bev); return 0; diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 82983ed7..49ebc0be 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -193,7 +193,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg) error: bufferevent_disable(bufev, EV_READ); - bufferevent_run_eventcb_(bufev, what); + bufferevent_run_eventcb_(bufev, what, 0); done: bufferevent_decref_and_unlock_(bufev); @@ -235,7 +235,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) if (c < 0) { event_del(&bufev->ev_write); event_del(&bufev->ev_read); - bufferevent_run_eventcb_(bufev, BEV_EVENT_ERROR); + bufferevent_run_eventcb_(bufev, BEV_EVENT_ERROR, 0); goto done; } else { connected = 1; @@ -244,12 +244,12 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) event_del(&bufev->ev_write); bufferevent_async_set_connected_(bufev); bufferevent_run_eventcb_(bufev, - BEV_EVENT_CONNECTED); + BEV_EVENT_CONNECTED, 0); goto done; } #endif bufferevent_run_eventcb_(bufev, - BEV_EVENT_CONNECTED); + BEV_EVENT_CONNECTED, 0); if (!(bufev->enabled & EV_WRITE) || bufev_p->write_suspended) { event_del(&bufev->ev_write); @@ -307,7 +307,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) error: bufferevent_disable(bufev, EV_WRITE); - bufferevent_run_eventcb_(bufev, what); + bufferevent_run_eventcb_(bufev, what, 0); done: bufferevent_decref_and_unlock_(bufev); @@ -424,7 +424,7 @@ bufferevent_socket_connect(struct bufferevent *bev, goto done; freesock: - bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR); + bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); if (ownfd) evutil_closesocket(fd); /* do something about the error? */ @@ -448,7 +448,7 @@ bufferevent_connect_getaddrinfo_cb(int result, struct evutil_addrinfo *ai, if (result != 0) { bev_p->dns_error = result; - bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR); + bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); bufferevent_decref_and_unlock_(bev); if (ai) evutil_freeaddrinfo(ai); diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index af6f7cde..aef408b8 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -584,6 +584,18 @@ enum bufferevent_trigger_options { void bufferevent_trigger(struct bufferevent *bufev, short iotype, int options); +/** + Triggers the bufferevent event callback. + + If the options contain BEV_OPT_DEFER_CALLBACKS, the callbacks are deferred. + + @param bufev the bufferevent object + @param what the flags to pass onto the event callback + @param options + */ +void bufferevent_trigger_event(struct bufferevent *bufev, short what, + int options); + /** @name Filtering support diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c index 874d6018..a6a27752 100644 --- a/test/regress_bufferevent.c +++ b/test/regress_bufferevent.c @@ -820,13 +820,23 @@ trigger_failure_cb(evutil_socket_t fd, short what, void *ctx) } static void -trigger_readcb_triggered(struct bufferevent *bev, void *ctx) +trigger_eventcb(struct bufferevent *bev, short what, void *ctx) { struct event_base *base = ctx; + if (what == ~0) { + TT_BLATHER(("Event successfully triggered.")); + event_base_loopexit(base, NULL); + return; + } + reader_eventcb(bev, what, ctx); +} +static void +trigger_readcb_triggered(struct bufferevent *bev, void *ctx) +{ TT_BLATHER(("Read successfully triggered.")); n_reads_invoked++; - event_base_loopexit(base, NULL); + bufferevent_trigger_event(bev, ~0, bufferevent_trigger_test_flags); } static void @@ -840,7 +850,7 @@ trigger_readcb(struct bufferevent *bev, void *ctx) TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev))); expected_reads = ++n_reads_invoked; - bufferevent_setcb(bev, trigger_readcb_triggered, NULL, reader_eventcb, ctx); + bufferevent_setcb(bev, trigger_readcb_triggered, NULL, trigger_eventcb, ctx); bufferevent_getwatermark(bev, EV_READ, &low, &high); len = evbuffer_get_length(bufferevent_get_input(bev)); @@ -912,7 +922,7 @@ test_bufferevent_trigger(void *arg) tt_assert(!evconnlistener_enable(lev)); bev = bufferevent_socket_new(data->base, -1, be_flags); tt_assert(bev); - bufferevent_setcb(bev, trigger_readcb, NULL, reader_eventcb, data->base); + bufferevent_setcb(bev, trigger_readcb, NULL, trigger_eventcb, data->base); bufferevent_enable(bev, EV_READ); From bd41947175f9156d0cd631a0f7a81161a39c66fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Thu, 5 Dec 2013 22:45:45 +0000 Subject: [PATCH 6/6] Clarifications in response to merge req. comments --- bufferevent-internal.h | 7 ++++--- bufferevent.c | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 12ae142b..70b25cd9 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -340,10 +340,10 @@ int bufferevent_decref_(struct bufferevent *bufev); int bufferevent_decref_and_unlock_(struct bufferevent *bufev); /** Internal: If callbacks are deferred and we have a read callback, schedule - * a readcb. Otherwise just run the readcb. */ + * a readcb. Otherwise just run the readcb. Ignores watermarks. */ void bufferevent_run_readcb_(struct bufferevent *bufev, int options); /** Internal: If callbacks are deferred and we have a write callback, schedule - * a writecb. Otherwise just run the writecb. */ + * a writecb. Otherwise just run the writecb. Ignores watermarks. */ void bufferevent_run_writecb_(struct bufferevent *bufev, int options); /** Internal: If callbacks are deferred and we have an eventcb, schedule * it to run with events "what". Otherwise just run the eventcb. @@ -352,7 +352,8 @@ void bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options /** Internal: Run or schedule (if deferred or options contain * BEV_TRIG_DEFER_CALLBACKS) I/O callbacks specified in iotype. - * Must already hold the bufev lock. */ + * Must already hold the bufev lock. Honors watermarks unless + * BEV_TRIG_IGNORE_WATERMARKS is in options. */ void bufferevent_trigger_nolock_(struct bufferevent *bufev, short iotype, int options); diff --git a/bufferevent.c b/bufferevent.c index 5f424d7e..d8c84da4 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -629,14 +629,14 @@ bufferevent_getwatermark(struct bufferevent *bufev, short events, size_t *lowmark, size_t *highmark) { BEV_LOCK(bufev); - if (events & EV_WRITE) { + if (events == EV_WRITE) { if (lowmark) *lowmark = bufev->wm_write.low; if (highmark) *highmark = bufev->wm_write.high; } - if (events & EV_READ) { + if (events == EV_READ) { if (lowmark) *lowmark = bufev->wm_read.low; if (highmark)