From a98a512bc1dc85a87cd00fa7682aeccaeea2859c Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 17 Apr 2009 23:12:34 +0000 Subject: [PATCH] Add a generic way for any bufferevent to make its callback deferred svn:r1197 --- ChangeLog | 5 ++- bufferevent-internal.h | 11 +++++ bufferevent.c | 87 ++++++++++++++++++++++++++++++++++++ bufferevent_filter.c | 6 +-- bufferevent_pair.c | 36 +++------------ bufferevent_sock.c | 9 ++-- include/event2/bufferevent.h | 3 ++ 7 files changed, 118 insertions(+), 39 deletions(-) diff --git a/ChangeLog b/ChangeLog index d94080fb..de6e8f9a 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,4 +1,7 @@ -Changes in current version: +Changes in 2.0.2-alpha: + o Add a new flag to bufferevents to make all callbacks automatically deferred. + +Changes in 2.0.1-alpha: o free minheap on event_base_free(); from Christopher Layne o debug cleanups in signal.c; from Christopher Layne o provide event_base_new() that does not set the current_base global diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 1fb78716..57748ebc 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -44,8 +44,15 @@ struct bufferevent_private { /** If set, read is suspended until evbuffer some. */ unsigned read_suspended : 1; + /** If set, we should free the lock when we free the bufferevent. */ unsigned own_lock : 1; + unsigned readcb_pending : 1; + unsigned writecb_pending : 1; + short errorcb_pending; + int errno_pending; + struct deferred_cb deferred; + enum bufferevent_options options; int refcnt; @@ -113,6 +120,10 @@ int bufferevent_enable_locking(struct bufferevent *bufev, void *lock); void bufferevent_incref(struct bufferevent *bufev); void _bufferevent_decref_and_unlock(struct bufferevent *bufev); +void _bufferevent_run_readcb(struct bufferevent *bufev); +void _bufferevent_run_writecb(struct bufferevent *bufev); +void _bufferevent_run_errorcb(struct bufferevent *bufev, short what); + #define BEV_UPCAST(b) EVUTIL_UPCAST((b), struct bufferevent_private, bev) #define BEV_LOCK(b) do { \ diff --git a/bufferevent.c b/bufferevent.c index 7b6c94f2..7f3c240a 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -46,6 +46,7 @@ #ifdef WIN32 #include #endif +#include #include "event2/util.h" #include "event2/bufferevent.h" @@ -112,6 +113,87 @@ bufferevent_inbuf_wm_cb(struct evbuffer *buf, } } +static void +bufferevent_run_deferred_callbacks(struct deferred_cb *_, void *arg) +{ + struct bufferevent_private *bufev_private = arg; + struct bufferevent *bufev = &bufev_private->bev; + + BEV_LOCK(bufev); + if (bufev_private->readcb_pending && bufev->readcb) { + bufev_private->readcb_pending = 0; + bufev->readcb(bufev, bufev->cbarg); + } + if (bufev_private->writecb_pending && bufev->writecb) { + bufev_private->writecb_pending = 0; + bufev->writecb(bufev, bufev->cbarg); + } + if (bufev_private->errorcb_pending && bufev->errorcb) { + short what = bufev_private->errorcb_pending; + int err = bufev_private->errno_pending; + bufev_private->errorcb_pending = 0; + bufev_private->errno_pending = 0; + EVUTIL_SET_SOCKET_ERROR(err); + bufev->errorcb(bufev, what, bufev->cbarg); + } + _bufferevent_decref_and_unlock(bufev); +} + +void +_bufferevent_run_readcb(struct bufferevent *bufev) +{ + /* Requires lock. */ + struct bufferevent_private *p = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + if (p->options & BEV_OPT_DEFER_CALLBACKS) { + p->readcb_pending = 1; + if (!p->deferred.queued) { + bufferevent_incref(bufev); + event_deferred_cb_schedule( + bufev->ev_base, &p->deferred); + } + } else { + bufev->readcb(bufev, bufev->cbarg); + } +} + +void +_bufferevent_run_writecb(struct bufferevent *bufev) +{ + /* Requires lock. */ + struct bufferevent_private *p = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + if (p->options & BEV_OPT_DEFER_CALLBACKS) { + p->writecb_pending = 1; + if (!p->deferred.queued) { + bufferevent_incref(bufev); + event_deferred_cb_schedule( + bufev->ev_base, &p->deferred); + } + } else { + bufev->writecb(bufev, bufev->cbarg); + } +} + +void +_bufferevent_run_errorcb(struct bufferevent *bufev, short what) +{ + /* Requires lock. */ + struct bufferevent_private *p = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + if (p->options & BEV_OPT_DEFER_CALLBACKS) { + p->errorcb_pending |= what; + p->errno_pending = EVUTIL_SOCKET_ERROR(); + if (!p->deferred.queued) { + bufferevent_incref(bufev); + event_deferred_cb_schedule( + bufev->ev_base, &p->deferred); + } + } else { + bufev->errorcb(bufev, what, bufev->cbarg); + } +} + int bufferevent_init_common(struct bufferevent_private *bufev_private, struct event_base *base, @@ -152,6 +234,11 @@ bufferevent_init_common(struct bufferevent_private *bufev_private, } } #endif + if (options & BEV_OPT_DEFER_CALLBACKS) { + event_deferred_cb_init(&bufev_private->deferred, + bufferevent_run_deferred_callbacks, + bufev_private); + } bufev_private->options = options; diff --git a/bufferevent_filter.c b/bufferevent_filter.c index 9208300e..5cf83f42 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -343,7 +343,7 @@ be_filter_process_output(struct bufferevent_filtered *bevf, if (processed && bufev->writecb && evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { /* call the write callback.*/ - (*bufev->writecb)(bufev, bufev->cbarg); + _bufferevent_run_writecb(bufev); if (res == BEV_OK && (bufev->enabled & EV_WRITE) && @@ -396,7 +396,7 @@ be_filter_readcb(struct bufferevent *underlying, void *_me) if (processed_any && evbuffer_get_length(bufev->input) >= bufev->wm_read.low && bufev->readcb != NULL) - (*bufev->readcb)(bufev, bufev->cbarg); + _bufferevent_run_readcb(bufev); } /* Called when the underlying socket has drained enough that we can write to @@ -419,7 +419,7 @@ be_filter_errorcb(struct bufferevent *underlying, short what, void *_me) /* All we can really to is tell our own errorcb. */ if (bev->errorcb) - bev->errorcb(bev, what, bev->cbarg); + _bufferevent_run_errorcb(bev, what); } static int diff --git a/bufferevent_pair.c b/bufferevent_pair.c index 6080421c..068a80a8 100644 --- a/bufferevent_pair.c +++ b/bufferevent_pair.c @@ -44,8 +44,6 @@ struct bufferevent_pair { struct bufferevent_private bev; struct bufferevent_pair *partner; - struct deferred_cb deferred_write_cb; - struct deferred_cb deferred_read_cb; }; @@ -69,25 +67,6 @@ upcast(struct bufferevent *bev) static void be_pair_outbuf_cb(struct evbuffer *, const struct evbuffer_cb_info *, void *); -static void -run_callback(struct deferred_cb *cb, void *arg) -{ - struct bufferevent_pair *bufev = arg; - struct bufferevent *bev = downcast(bufev); - - BEV_LOCK(bev); - if (cb == &bufev->deferred_read_cb) { - if (bev->readcb) { - bev->readcb(bev, bev->cbarg); - } - } else { - if (bev->writecb) { - bev->writecb(bev, bev->cbarg); - } - } - BEV_UNLOCK(bev); -} - static struct bufferevent_pair * bufferevent_pair_elt_new(struct event_base *base, enum bufferevent_options options) @@ -106,8 +85,6 @@ bufferevent_pair_elt_new(struct event_base *base, bufferevent_free(downcast(bufev)); return NULL; } - event_deferred_cb_init(&bufev->deferred_read_cb, run_callback, bufev); - event_deferred_cb_init(&bufev->deferred_write_cb, run_callback, bufev); return bufev; } @@ -117,7 +94,10 @@ bufferevent_pair_new(struct event_base *base, enum bufferevent_options options, struct bufferevent *pair[2]) { struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL; - enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE; + enum bufferevent_options tmp_options; + + options |= BEV_OPT_DEFER_CALLBACKS; + tmp_options = options & ~BEV_OPT_THREADSAFE; bufev1 = bufferevent_pair_elt_new(base, options); if (!bufev1) @@ -175,12 +155,10 @@ be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, dst_size = evbuffer_get_length(dst->input); if (dst_size >= dst->wm_read.low && dst->readcb) { - event_deferred_cb_schedule(dst->ev_base, - &(upcast(dst)->deferred_read_cb)); + _bufferevent_run_readcb(dst); } if (src_size <= src->wm_write.low && src->writecb) { - event_deferred_cb_schedule(src->ev_base, - &(upcast(src)->deferred_write_cb)); + _bufferevent_run_writecb(src); } done: evbuffer_freeze(src->output, 1); @@ -247,8 +225,6 @@ be_pair_destruct(struct bufferevent *bev) bev_p->partner->partner = NULL; bev_p->partner = NULL; } - event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_write_cb); - event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_read_cb); } static void diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 614e0e56..634da83a 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -156,7 +156,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg) /* Invoke the user callback - must always be called last */ if (evbuffer_get_length(input) >= bufev->wm_read.low && bufev->readcb != NULL) - (*bufev->readcb)(bufev, bufev->cbarg); + _bufferevent_run_readcb(bufev); return; @@ -165,8 +165,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg) error: event_del(&bufev->ev_read); - (*bufev->errorcb)(bufev, what, bufev->cbarg); - + _bufferevent_run_errorcb(bufev, what); } static void @@ -207,7 +206,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) */ if (bufev->writecb != NULL && evbuffer_get_length(bufev->output) <= bufev->wm_write.low) - (*bufev->writecb)(bufev, bufev->cbarg); + _bufferevent_run_writecb(bufev); return; @@ -218,7 +217,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) error: event_del(&bufev->ev_write); - (*bufev->errorcb)(bufev, what, bufev->cbarg); + _bufferevent_run_errorcb(bufev, what); } struct bufferevent * diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index 6f218465..94cebe27 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -124,6 +124,9 @@ enum bufferevent_options { /** If set, and threading is enabled, operations on this bufferevent * are protected by a lock */ BEV_OPT_THREADSAFE = (1<<1), + + /** If set, callbacks are run deferred in the event loop. */ + BEV_OPT_DEFER_CALLBACKS = (1<<2), }; /**