From 03afa209de96d67f1a4b935460aeabe63f5ef24f Mon Sep 17 00:00:00 2001 From: Christopher Davis Date: Mon, 16 Aug 2010 01:23:57 -0700 Subject: [PATCH 1/6] IOCP-related evbuffer fixes. - Prevent evbuffer_{add,prepend}_buffer from moving read-pinned chains. - Fix evbuffer_drain to handle read-pinned chains better. - Raise the limit on WSABUFs from two to MAX_WSABUFS for overlapped reads. --- buffer.c | 122 ++++++++++++++++++++++++++++++++++++++------ buffer_iocp.c | 51 +++++++++--------- evbuffer-internal.h | 3 +- 3 files changed, 135 insertions(+), 41 deletions(-) diff --git a/buffer.c b/buffer.c index 098afec3..8d0038c7 100644 --- a/buffer.c +++ b/buffer.c @@ -575,7 +575,8 @@ evbuffer_reserve_space(struct evbuffer *buf, ev_ssize_t size, } else { if (_evbuffer_expand_fast(buf, size, n_vecs)<0) goto done; - n = _evbuffer_read_setup_vecs(buf, size, vec, n_vecs, &chainp, 0); + n = _evbuffer_read_setup_vecs(buf, size, vec, n_vecs, + &chainp, 0); } done: @@ -670,6 +671,12 @@ done: return result; } +static inline int +HAS_PINNED_R(struct evbuffer *buf) +{ + return (buf->last && CHAIN_PINNED_R(buf->last)); +} + static inline void ZERO_CHAIN(struct evbuffer *dst) { @@ -680,6 +687,71 @@ ZERO_CHAIN(struct evbuffer *dst) dst->total_len = 0; } +/* Prepares the contents of src to be moved to another buffer by removing + * read-pinned chains. The first pinned chain is saved in first, and the + * last in last. If src has no read-pinned chains, first and last are set + * to NULL. */ +static int +PRESERVE_PINNED(struct evbuffer *src, struct evbuffer_chain **first, + struct evbuffer_chain **last) +{ + struct evbuffer_chain *chain, **pinned; + + ASSERT_EVBUFFER_LOCKED(src); + + if (!HAS_PINNED_R(src)) { + *first = *last = NULL; + return 0; + } + + pinned = src->last_with_datap; + if (!CHAIN_PINNED_R(*pinned)) + pinned = &(*pinned)->next; + EVUTIL_ASSERT(CHAIN_PINNED_R(*pinned)); + chain = *first = *pinned; + *last = src->last; + + /* If there's data in the first pinned chain, we need to allocate + * a new chain and copy the data over. */ + if (chain->off) { + struct evbuffer_chain *tmp; + + EVUTIL_ASSERT(pinned == src->last_with_datap); + tmp = evbuffer_chain_new(chain->off); + if (!tmp) + return -1; + memcpy(tmp->buffer, chain->buffer + chain->misalign, + chain->off); + tmp->off = chain->off; + *src->last_with_datap = tmp; + src->last = tmp; + chain->misalign += chain->off; + chain->off = 0; + } else { + src->last = *src->last_with_datap; + *pinned = NULL; + } + + return 0; +} + +static inline void +RESTORE_PINNED(struct evbuffer *src, struct evbuffer_chain *pinned, + struct evbuffer_chain *last) +{ + ASSERT_EVBUFFER_LOCKED(src); + + if (!pinned) { + ZERO_CHAIN(src); + return; + } + + src->first = pinned; + src->last = last; + src->last_with_datap = &src->first; + src->total_len = 0; +} + static inline void COPY_CHAIN(struct evbuffer *dst, struct evbuffer *src) { @@ -729,6 +801,7 @@ PREPEND_CHAIN(struct evbuffer *dst, struct evbuffer *src) int evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) { + struct evbuffer_chain *pinned, *last; size_t in_total_len, out_total_len; int result = 0; @@ -744,6 +817,11 @@ evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) goto done; } + if (PRESERVE_PINNED(inbuf, &pinned, &last) < 0) { + result = -1; + goto done; + } + if (out_total_len == 0) { /* There might be an empty chain at the start of outbuf; free * it. */ @@ -753,8 +831,8 @@ evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) APPEND_CHAIN(outbuf, inbuf); } - /* remove everything from inbuf */ - ZERO_CHAIN(inbuf); + RESTORE_PINNED(inbuf, pinned, last); + inbuf->n_del_for_cb += in_total_len; outbuf->n_add_for_cb += in_total_len; @@ -769,6 +847,7 @@ done: int evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) { + struct evbuffer_chain *pinned, *last; size_t in_total_len, out_total_len; int result = 0; @@ -785,6 +864,11 @@ evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) goto done; } + if (PRESERVE_PINNED(inbuf, &pinned, &last) < 0) { + result = -1; + goto done; + } + if (out_total_len == 0) { /* There might be an empty chain at the start of outbuf; free * it. */ @@ -794,8 +878,8 @@ evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) PREPEND_CHAIN(outbuf, inbuf); } - /* remove everything from inbuf */ - ZERO_CHAIN(inbuf); + RESTORE_PINNED(inbuf, pinned, last); + inbuf->n_del_for_cb += in_total_len; outbuf->n_add_for_cb += in_total_len; @@ -810,7 +894,7 @@ int evbuffer_drain(struct evbuffer *buf, size_t len) { struct evbuffer_chain *chain, *next; - size_t old_len; + size_t remaining, old_len; int result = 0; EVBUFFER_LOCK(buf); @@ -824,12 +908,10 @@ evbuffer_drain(struct evbuffer *buf, size_t len) goto done; } - - if (len >= old_len && !(buf->last && CHAIN_PINNED_R(buf->last))) { + if (len >= old_len && !HAS_PINNED_R(buf)) { len = old_len; for (chain = buf->first; chain != NULL; chain = next) { next = chain->next; - evbuffer_chain_free(chain); } @@ -839,10 +921,12 @@ evbuffer_drain(struct evbuffer *buf, size_t len) len = old_len; buf->total_len -= len; - - for (chain = buf->first; len >= chain->off; chain = next) { + remaining = len; + for (chain = buf->first; + remaining >= chain->off; + chain = next) { next = chain->next; - len -= chain->off; + remaining -= chain->off; if (chain == *buf->last_with_datap) { buf->last_with_datap = &buf->first; @@ -850,14 +934,20 @@ evbuffer_drain(struct evbuffer *buf, size_t len) if (&chain->next == buf->last_with_datap) buf->last_with_datap = &buf->first; - if (len == 0 && CHAIN_PINNED_R(chain)) + if (CHAIN_PINNED_R(chain)) { + EVUTIL_ASSERT(remaining == 0); + chain->misalign += chain->off; + chain->off = 0; break; - evbuffer_chain_free(chain); + } else + evbuffer_chain_free(chain); } buf->first = chain; - chain->misalign += len; - chain->off -= len; + if (chain) { + chain->misalign += remaining; + chain->off -= remaining; + } } buf->n_del_for_cb += len; diff --git a/buffer_iocp.c b/buffer_iocp.c index 8255599c..e211e980 100644 --- a/buffer_iocp.c +++ b/buffer_iocp.c @@ -82,12 +82,13 @@ static void pin_release(struct evbuffer_overlapped *eo, unsigned flag) { int i; - struct evbuffer_chain *chain = eo->first_pinned; + struct evbuffer_chain *next, *chain = eo->first_pinned; for (i = 0; i < eo->n_buffers; ++i) { EVUTIL_ASSERT(chain); + next = chain->next; _evbuffer_chain_unpin(chain, flag); - chain = chain->next; + chain = next; } } @@ -95,8 +96,9 @@ void evbuffer_commit_read(struct evbuffer *evbuf, ev_ssize_t nBytes) { struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf); - struct evbuffer_iovec iov[2]; - int n_vec; + struct evbuffer_chain **chainp; + size_t remaining, len; + unsigned i; EVBUFFER_LOCK(evbuf); EVUTIL_ASSERT(buf->read_in_progress && !buf->write_in_progress); @@ -104,24 +106,27 @@ evbuffer_commit_read(struct evbuffer *evbuf, ev_ssize_t nBytes) evbuffer_unfreeze(evbuf, 0); - iov[0].iov_base = buf->buffers[0].buf; - if ((size_t)nBytes <= buf->buffers[0].len) { - iov[0].iov_len = nBytes; - n_vec = 1; - } else { - iov[0].iov_len = buf->buffers[0].len; - iov[1].iov_base = buf->buffers[1].buf; - iov[1].iov_len = nBytes - iov[0].iov_len; - n_vec = 2; + chainp = evbuf->last_with_datap; + if (!((*chainp)->flags & EVBUFFER_MEM_PINNED_R)) + chainp = &(*chainp)->next; + remaining = nBytes; + for (i = 0; remaining > 0 && i < buf->n_buffers; ++i) { + EVUTIL_ASSERT(*chainp); + len = buf->buffers[i].len; + if (remaining < len) + len = remaining; + (*chainp)->off += len; + evbuf->last_with_datap = chainp; + remaining -= len; + chainp = &(*chainp)->next; } - if (evbuffer_commit_space(evbuf, iov, n_vec) < 0) - EVUTIL_ASSERT(0); /* XXXX fail nicer. */ - pin_release(buf, EVBUFFER_MEM_PINNED_R); buf->read_in_progress = 0; + evbuf->total_len += nBytes; + _evbuffer_decref_and_unlock(evbuf); } @@ -184,7 +189,7 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most, } evbuffer_freeze(buf, 1); - buf_o->first_pinned = 0; + buf_o->first_pinned = NULL; buf_o->n_buffers = 0; memset(buf_o->buffers, 0, sizeof(buf_o->buffers)); @@ -246,19 +251,16 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most, if (buf->freeze_end || buf_o->read_in_progress) goto done; - buf_o->first_pinned = 0; + buf_o->first_pinned = NULL; buf_o->n_buffers = 0; memset(buf_o->buffers, 0, sizeof(buf_o->buffers)); - if (_evbuffer_expand_fast(buf, at_most, 2) == -1) + if (_evbuffer_expand_fast(buf, at_most, MAX_WSABUFS) == -1) goto done; evbuffer_freeze(buf, 0); - /* XXX This and evbuffer_read_setup_vecs() should say MAX_WSABUFS, - * not "2". But commit_read() above can't handle more than two - * buffers yet. */ nvecs = _evbuffer_read_setup_vecs(buf, at_most, - vecs, 2, &chainp, 1); + vecs, MAX_WSABUFS, &chainp, 1); for (i=0;ibuffers[i], @@ -266,7 +268,8 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most, } buf_o->n_buffers = nvecs; - buf_o->first_pinned = chain= *chainp; + buf_o->first_pinned = chain = *chainp; + npin=0; for ( ; chain; chain = chain->next) { _evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_R); diff --git a/evbuffer-internal.h b/evbuffer-internal.h index 90fd1b55..719e85dd 100644 --- a/evbuffer-internal.h +++ b/evbuffer-internal.h @@ -256,7 +256,8 @@ int _evbuffer_expand_fast(struct evbuffer *, size_t, int); * Returns the number of vecs used. */ int _evbuffer_read_setup_vecs(struct evbuffer *buf, ev_ssize_t howmuch, - struct evbuffer_iovec *vecs, int n_vecs, struct evbuffer_chain ***chainp, int exact); + struct evbuffer_iovec *vecs, int n_vecs, struct evbuffer_chain ***chainp, + int exact); /* Helper macro: copies an evbuffer_iovec in ei to a win32 WSABUF in i. */ #define WSABUF_FROM_EVBUFFER_IOV(i,ei) do { \ From d844242f9b138be4896942ded25d74a91bf29901 Mon Sep 17 00:00:00 2001 From: Christopher Davis Date: Sat, 28 Aug 2010 02:08:27 -0700 Subject: [PATCH 2/6] Stop IOCP when freeing the event_base. --- event.c | 18 ++++++++++++++++++ event_iocp.c | 10 ++++++++-- iocp-internal.h | 8 +++++--- test/regress_iocp.c | 1 - 4 files changed, 31 insertions(+), 6 deletions(-) diff --git a/event.c b/event.c index dbf1c2f3..5f4656fb 100644 --- a/event.c +++ b/event.c @@ -638,6 +638,20 @@ event_base_start_iocp(struct event_base *base) #endif } +void +event_base_stop_iocp(struct event_base *base) +{ +#ifdef WIN32 + int rv; + + if (!base->iocp) + return; + rv = event_iocp_shutdown(base->iocp, -1); + EVUTIL_ASSERT(rv >= 0); + base->iocp = NULL; +#endif +} + void event_base_free(struct event_base *base) { @@ -654,6 +668,10 @@ event_base_free(struct event_base *base) /* XXX(niels) - check for internal events first */ EVUTIL_ASSERT(base); +#ifdef WIN32 + event_base_stop_iocp(base); +#endif + /* threading fds if we have them */ if (base->th_notify_fd[0] != -1) { event_del(&base->th_notify); diff --git a/event_iocp.c b/event_iocp.c index fe9ea571..82fa9aee 100644 --- a/event_iocp.c +++ b/event_iocp.c @@ -73,7 +73,8 @@ loop(void *_port) EnterCriticalSection(&port->lock); if (port->shutdown) { if (--port->n_live_threads == 0) - ReleaseSemaphore(port->shutdownSemaphore, 1, NULL); + ReleaseSemaphore(port->shutdownSemaphore, 1, + NULL); LeaveCriticalSection(&port->lock); return; } @@ -233,13 +234,18 @@ event_iocp_notify_all(struct event_iocp_port *port) int event_iocp_shutdown(struct event_iocp_port *port, long waitMsec) { + DWORD ms = INFINITE; int n; + EnterCriticalSection(&port->lock); port->shutdown = 1; LeaveCriticalSection(&port->lock); event_iocp_notify_all(port); - WaitForSingleObject(port->shutdownSemaphore, waitMsec); + if (waitMsec >= 0) + ms = waitMsec; + + WaitForSingleObject(port->shutdownSemaphore, ms); EnterCriticalSection(&port->lock); n = port->n_live_threads; LeaveCriticalSection(&port->lock); diff --git a/iocp-internal.h b/iocp-internal.h index c444cc39..2b740bcc 100644 --- a/iocp-internal.h +++ b/iocp-internal.h @@ -164,9 +164,10 @@ int event_iocp_port_associate(struct event_iocp_port *port, evutil_socket_t fd, ev_uintptr_t key); /** Tell all threads serving an iocp to stop. Wait for up to waitMsec for all - the threads to finish whatever they're doing. If all the threads are - done, free the port and return 0. Otherwise, return -1. If you get a -1 - return value, it is safe to call this function again. + the threads to finish whatever they're doing. If waitMsec is -1, wait + as long as required. If all the threads are done, free the port and return + 0. Otherwise, return -1. If you get a -1 return value, it is safe to call + this function again. */ int event_iocp_shutdown(struct event_iocp_port *port, long waitMsec); @@ -181,6 +182,7 @@ struct event_iocp_port *event_base_get_iocp(struct event_base *base); /* FIXME document. */ int event_base_start_iocp(struct event_base *base); +void event_base_stop_iocp(struct event_base *base); /* FIXME document. */ struct bufferevent *bufferevent_async_new(struct event_base *base, diff --git a/test/regress_iocp.c b/test/regress_iocp.c index 64e86320..59aeab0b 100644 --- a/test/regress_iocp.c +++ b/test/regress_iocp.c @@ -255,7 +255,6 @@ test_iocp_bufferevent_async(void *ptr) buf[n]='\0'; tt_str_op(buf, ==, "Hello world"); - tt_want(!event_iocp_shutdown(port, 2000)); end: /* FIXME: free stuff. */; } From 76f7e7ae745ad730795f0a4a3bc1299a00137cc2 Mon Sep 17 00:00:00 2001 From: Christopher Davis Date: Tue, 17 Aug 2010 05:02:00 -0700 Subject: [PATCH 3/6] Some IOCP bufferevent tweaks. - Increment reference count of bufferevents before initiating overlapped operations to prevent the destructor from being called while operations are pending. The only portable way of canceling overlapped ops is to close the socket. - Translate error codes to WSA* codes. - Better handling of errors. - Add an interface to add and del "virtual" events. Because IOCP bufferevents don't register any events with the base, the event loop has no way of knowing they exist. This causes the loop to terminate prematurely. event_base_{add,del}_virtual increment/decrement base's event count so the loop runs while there are any enabled IOCP bufferevents. --- bufferevent_async.c | 286 +++++++++++++++++++++++++++++++------------- event-internal.h | 6 + event.c | 18 ++- event_iocp.c | 2 +- 4 files changed, 227 insertions(+), 85 deletions(-) diff --git a/bufferevent_async.c b/bufferevent_async.c index 23b636d2..fa1bbc92 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -48,11 +48,15 @@ #include #endif +#include + #include "event2/util.h" #include "event2/bufferevent.h" #include "event2/buffer.h" #include "event2/bufferevent_struct.h" #include "event2/event.h" +#include "event2/util.h" +#include "event-internal.h" #include "log-internal.h" #include "mm-internal.h" #include "bufferevent-internal.h" @@ -74,6 +78,8 @@ struct bufferevent_async { unsigned read_in_progress : 1; unsigned write_in_progress : 1; unsigned ok : 1; + unsigned read_added : 1; + unsigned write_added : 1; }; const struct bufferevent_ops bufferevent_ops_async = { @@ -125,74 +131,143 @@ upcast_write(struct event_overlapped *eo) } static void -bev_async_consider_writing(struct bufferevent_async *b) +bev_async_del_write(struct bufferevent_async *beva) { - size_t at_most; - int limit; - /* Don't write if there's a write in progress, or we do not - * want to write. */ - if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE)) - return; - /* Don't write if there's nothing to write */ - if (!evbuffer_get_length(b->bev.bev.output)) - return; + struct bufferevent *bev = &beva->bev.bev; - at_most = evbuffer_get_length(b->bev.bev.output); - - /* XXXX This over-commits. */ - limit = _bufferevent_get_write_max(&b->bev); - if (at_most >= limit) - at_most = limit; - - if (b->bev.write_suspended) - return; - - /* XXXX doesn't respect low-water mark very well. */ - if (evbuffer_launch_write(b->bev.bev.output, at_most, - &b->write_overlapped)) { - EVUTIL_ASSERT(0);/* XXX act sensibly. */ - } else { - b->write_in_progress = 1; + if (beva->write_added) { + beva->write_added = 0; + event_base_del_virtual(bev->ev_base); } } static void -bev_async_consider_reading(struct bufferevent_async *b) +bev_async_del_read(struct bufferevent_async *beva) +{ + struct bufferevent *bev = &beva->bev.bev; + + if (beva->read_added) { + beva->read_added = 0; + event_base_del_virtual(bev->ev_base); + } +} + +static void +bev_async_add_write(struct bufferevent_async *beva) +{ + struct bufferevent *bev = &beva->bev.bev; + + if (!beva->write_added) { + beva->write_added = 1; + event_base_add_virtual(bev->ev_base); + } +} + +static void +bev_async_add_read(struct bufferevent_async *beva) +{ + struct bufferevent *bev = &beva->bev.bev; + + if (!beva->read_added) { + beva->read_added = 1; + event_base_add_virtual(bev->ev_base); + } +} + +static void +bev_async_consider_writing(struct bufferevent_async *beva) +{ + size_t at_most; + int limit; + struct bufferevent *bev = &beva->bev.bev; + + /* Don't write if there's a write in progress, or we do not + * want to write, or when there's nothing left to write. */ + if (beva->write_in_progress) + return; + if (!beva->ok || !(bev->enabled&EV_WRITE) || + !evbuffer_get_length(bev->output)) { + bev_async_del_write(beva); + return; + } + + at_most = evbuffer_get_length(bev->output); + + /* XXXX This over-commits. */ + limit = _bufferevent_get_write_max(&beva->bev); + if (at_most >= limit) + at_most = limit; + + if (beva->bev.write_suspended) { + bev_async_del_write(beva); + return; + } + + /* XXXX doesn't respect low-water mark very well. */ + bufferevent_incref(bev); + if (evbuffer_launch_write(bev->output, at_most, + &beva->write_overlapped)) { + bufferevent_decref(bev); + beva->ok = 0; + _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); + } else { + beva->write_in_progress = 1; + bev_async_add_write(beva); + } +} + +static void +bev_async_consider_reading(struct bufferevent_async *beva) { size_t cur_size; size_t read_high; size_t at_most; int limit; + struct bufferevent *bev = &beva->bev.bev; + /* Don't read if there is a read in progress, or we do not * want to read. */ - if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ)) + if (beva->read_in_progress) return; + if (!beva->ok || !(bev->enabled&EV_READ)) { + bev_async_del_read(beva); + return; + } /* Don't read if we're full */ - cur_size = evbuffer_get_length(b->bev.bev.input); - read_high = b->bev.bev.wm_read.high; + cur_size = evbuffer_get_length(bev->input); + read_high = bev->wm_read.high; if (read_high) { - if (cur_size >= read_high) + if (cur_size >= read_high) { + bev_async_del_read(beva); return; + } at_most = read_high - cur_size; } else { at_most = 16384; /* FIXME totally magic. */ } /* XXXX This over-commits. */ - limit = _bufferevent_get_read_max(&b->bev); + limit = _bufferevent_get_read_max(&beva->bev); if (at_most >= limit) at_most = limit; - if (b->bev.read_suspended) + if (beva->bev.read_suspended) { + bev_async_del_read(beva); return; - - if (evbuffer_launch_read(b->bev.bev.input, at_most, - &b->read_overlapped)) { - EVUTIL_ASSERT(0); - } else { - b->read_in_progress = 1; } + + bufferevent_incref(bev); + if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) { + beva->ok = 0; + bufferevent_decref(bev); + _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); + } else { + beva->read_in_progress = 1; + bev_async_add_read(beva); + } + + return; } static void @@ -260,14 +335,19 @@ be_async_enable(struct bufferevent *buf, short what) static int be_async_disable(struct bufferevent *bev, short what) { + struct bufferevent_async *bev_async = upcast(bev); /* XXXX If we disable reading or writing, we may want to consider * canceling any in-progress read or write operation, though it might * not work. */ - if (what & EV_READ) + if (what & EV_READ) { BEV_DEL_GENERIC_READ_TIMEOUT(bev); - if (what & EV_WRITE) + bev_async_del_read(bev_async); + } + if (what & EV_WRITE) { BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); + bev_async_del_write(bev_async); + } return 0; } @@ -275,18 +355,36 @@ be_async_disable(struct bufferevent *bev, short what) static void be_async_destruct(struct bufferevent *bev) { + struct bufferevent_async *bev_async = upcast(bev); struct bufferevent_private *bev_p = BEV_UPCAST(bev); evutil_socket_t fd; - EVUTIL_ASSERT(!upcast(bev)->write_in_progress && !upcast(bev)->read_in_progress); + EVUTIL_ASSERT(!upcast(bev)->write_in_progress && + !upcast(bev)->read_in_progress); + + bev_async_del_read(bev_async); + bev_async_del_write(bev_async); - /* XXX cancel any outstanding I/O operations */ fd = _evbuffer_overlapped_get_fd(bev->input); - /* delete this in case non-blocking connect was used */ - event_del(&bev->ev_write); if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) evutil_closesocket(fd); - _bufferevent_del_generic_timeout_cbs(bev); + /* delete this in case non-blocking connect was used */ + if (event_initialized(&bev->ev_write)) { + event_del(&bev->ev_write); + _bufferevent_del_generic_timeout_cbs(bev); + } +} + +/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so + * we use WSAGetOverlappedResult to translate. */ +static void +bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo) +{ + DWORD bytes, flags; + evutil_socket_t fd; + + fd = _evbuffer_overlapped_get_fd(bev->input); + WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags); } static int @@ -303,15 +401,22 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key, struct bufferevent_async *bev_a = upcast_connect(eo); struct bufferevent *bev = &bev_a->bev.bev; - _bufferevent_incref_and_lock(bev); + BEV_LOCK(bev); EVUTIL_ASSERT(bev_a->bev.connecting); bev_a->bev.connecting = 0; + event_base_del_virtual(bev->ev_base); + + if (ok) + bufferevent_async_set_connected(bev); + else + bev_async_set_wsa_error(bev, eo); - bufferevent_async_set_connected(bev); _bufferevent_run_eventcb(bev, ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR); + event_base_del_virtual(bev->ev_base); + _bufferevent_decref_and_unlock(bev); } @@ -323,26 +428,32 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key, struct bufferevent *bev = &bev_a->bev.bev; short what = BEV_EVENT_READING; - _bufferevent_incref_and_lock(bev); - EVUTIL_ASSERT(bev_a->ok && bev_a->read_in_progress); + BEV_LOCK(bev); + EVUTIL_ASSERT(bev_a->read_in_progress); evbuffer_commit_read(bev->input, nbytes); bev_a->read_in_progress = 0; - if (ok && nbytes) { - BEV_RESET_GENERIC_READ_TIMEOUT(bev); - _bufferevent_decrement_read_buckets(&bev_a->bev, nbytes); - if (evbuffer_get_length(bev->input) >= bev->wm_read.low) - _bufferevent_run_readcb(bev); - bev_async_consider_reading(bev_a); - } else if (!ok) { - what |= BEV_EVENT_ERROR; - bev_a->ok = 0; - _bufferevent_run_eventcb(bev, what); - } else if (!nbytes) { - what |= BEV_EVENT_EOF; - bev_a->ok = 0; - _bufferevent_run_eventcb(bev, what); + if (!ok) + bev_async_set_wsa_error(bev, eo); + + if (bev_a->ok) { + if (ok && nbytes) { + BEV_RESET_GENERIC_READ_TIMEOUT(bev); + _bufferevent_decrement_read_buckets(&bev_a->bev, + nbytes); + if (evbuffer_get_length(bev->input) >= bev->wm_read.low) + _bufferevent_run_readcb(bev); + bev_async_consider_reading(bev_a); + } else if (!ok) { + what |= BEV_EVENT_ERROR; + bev_a->ok = 0; + _bufferevent_run_eventcb(bev, what); + } else if (!nbytes) { + what |= BEV_EVENT_EOF; + bev_a->ok = 0; + _bufferevent_run_eventcb(bev, what); + } } _bufferevent_decref_and_unlock(bev); @@ -356,26 +467,32 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key, struct bufferevent *bev = &bev_a->bev.bev; short what = BEV_EVENT_WRITING; - _bufferevent_incref_and_lock(bev); - EVUTIL_ASSERT(bev_a->ok && bev_a->write_in_progress); - + BEV_LOCK(bev); + EVUTIL_ASSERT(bev_a->write_in_progress); evbuffer_commit_write(bev->output, nbytes); bev_a->write_in_progress = 0; - if (ok && nbytes) { - BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); - _bufferevent_decrement_write_buckets(&bev_a->bev, nbytes); - if (evbuffer_get_length(bev->output) <= bev->wm_write.low) - _bufferevent_run_writecb(bev); - bev_async_consider_writing(bev_a); - } else if (!ok) { - what |= BEV_EVENT_ERROR; - bev_a->ok = 0; - _bufferevent_run_eventcb(bev, what); - } else if (!nbytes) { - what |= BEV_EVENT_EOF; - bev_a->ok = 0; - _bufferevent_run_eventcb(bev, what); + if (!ok) + bev_async_set_wsa_error(bev, eo); + + if (bev_a->ok) { + if (ok && nbytes) { + BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); + _bufferevent_decrement_write_buckets(&bev_a->bev, + nbytes); + if (evbuffer_get_length(bev->output) <= + bev->wm_write.low) + _bufferevent_run_writecb(bev); + bev_async_consider_writing(bev_a); + } else if (!ok) { + what |= BEV_EVENT_ERROR; + bev_a->ok = 0; + _bufferevent_run_eventcb(bev, what); + } else if (!nbytes) { + what |= BEV_EVENT_EOF; + bev_a->ok = 0; + _bufferevent_run_eventcb(bev, what); + } } _bufferevent_decref_and_unlock(bev); @@ -423,8 +540,6 @@ bufferevent_async_new(struct event_base *base, evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); - evbuffer_defer_callbacks(bev->input, base); - evbuffer_defer_callbacks(bev->output, base); event_overlapped_init(&bev_a->connect_overlapped, connect_complete); event_overlapped_init(&bev_a->read_overlapped, read_complete); @@ -497,11 +612,16 @@ bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd, WSAGetLastError() != WSAEINVAL) return -1; + event_base_add_virtual(bev->ev_base); + bufferevent_incref(bev); rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, &bev_async->connect_overlapped.overlapped); if (rc || WSAGetLastError() == ERROR_IO_PENDING) return 0; + event_base_del_virtual(bev->ev_base); + bufferevent_decref(bev); + return -1; } diff --git a/event-internal.h b/event-internal.h index e52c129b..7d97d98e 100644 --- a/event-internal.h +++ b/event-internal.h @@ -182,6 +182,8 @@ struct event_base { /** Data to implement the common signal handelr code. */ struct evsig_info sig; + /** Number of virtual events */ + int virtual_event_count; /** Number of total events added to this event_base */ int event_count; /** Number of total events active in this event_base */ @@ -313,6 +315,10 @@ int _evsig_restore_handler(struct event_base *base, int evsignal); void event_active_nolock(struct event *ev, int res, short count); +/* FIXME document. */ +void event_base_add_virtual(struct event_base *base); +void event_base_del_virtual(struct event_base *base); + #ifdef __cplusplus } #endif diff --git a/event.c b/event.c index 5f4656fb..aaf8f28b 100644 --- a/event.c +++ b/event.c @@ -962,7 +962,7 @@ static int event_haveevents(struct event_base *base) { /* Caller must hold th_base_lock */ - return (base->event_count > 0); + return (base->virtual_event_count > 0 || base->event_count > 0); } /* "closure" function called when processing active signal events */ @@ -2707,3 +2707,19 @@ event_base_dump_events(struct event_base *base, FILE *output) } } } + +void +event_base_add_virtual(struct event_base *base) +{ + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + base->virtual_event_count++; + EVBASE_RELEASE_LOCK(base, th_base_lock); +} + +void +event_base_del_virtual(struct event_base *base) +{ + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + base->virtual_event_count--; + EVBASE_RELEASE_LOCK(base, th_base_lock); +} diff --git a/event_iocp.c b/event_iocp.c index 82fa9aee..19c7bffc 100644 --- a/event_iocp.c +++ b/event_iocp.c @@ -36,6 +36,7 @@ #include "log-internal.h" #include "mm-internal.h" #include "event-internal.h" +#include "evthread-internal.h" #define NOTIFICATION_KEY ((ULONG_PTR)-1) @@ -277,4 +278,3 @@ event_base_get_iocp(struct event_base *base) return NULL; #endif } - From 499452f4c24f172bf6846599e3886838a4207aca Mon Sep 17 00:00:00 2001 From: Christopher Davis Date: Sat, 28 Aug 2010 02:44:11 -0700 Subject: [PATCH 4/6] IOCP-related unit test tweaks --- test/regress_bufferevent.c | 17 +----- test/regress_iocp.c | 112 +++++++++++++++++++++++++++++++------ 2 files changed, 97 insertions(+), 32 deletions(-) diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c index 370eee01..de97909f 100644 --- a/test/regress_bufferevent.c +++ b/test/regress_bufferevent.c @@ -426,9 +426,8 @@ listen_cb(struct evconnlistener *listener, evutil_socket_t fd, TT_BLATHER(("Got a request on socket %d", (int)fd )); bev = bufferevent_socket_new(base, fd, bufferevent_connect_test_flags); tt_assert(bev); - bufferevent_write(bev, s, sizeof(s)); bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL); - bufferevent_enable(bev, EV_WRITE); + bufferevent_write(bev, s, sizeof(s)); end: ; } @@ -527,13 +526,6 @@ test_bufferevent_connect(void *arg) tt_want(!bufferevent_socket_connect(bev1, sa, sizeof(localhost))); tt_want(!bufferevent_socket_connect(bev2, sa, sizeof(localhost))); -#ifdef WIN32 - /* FIXME this is to get IOCP to work. it shouldn't be required. */ - { - struct timeval tv = {5000,0}; - event_base_loopexit(data->base, &tv); - } -#endif event_base_dispatch(data->base); tt_int_op(n_strings_read, ==, 2); @@ -622,13 +614,6 @@ test_bufferevent_connect_fail(void *arg) event_add(&close_listener_event, &one_second); close_listener_event_added = 1; -#ifdef WIN32 - /* FIXME this is to get IOCP to work. it shouldn't be required. */ - { - struct timeval tv = {5000,0}; - event_base_loopexit(data->base, &tv); - } -#endif event_base_dispatch(data->base); tt_int_op(test_ok, ==, 1); diff --git a/test/regress_iocp.c b/test/regress_iocp.c index 59aeab0b..96d8eb3e 100644 --- a/test/regress_iocp.c +++ b/test/regress_iocp.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -44,6 +45,7 @@ #undef WIN32_LEAN_AND_MEAN #include "iocp-internal.h" +#include "evbuffer-internal.h" #include "evthread-internal.h" /* FIXME remove these ones */ @@ -52,6 +54,62 @@ #include "event-internal.h" #define MAX_CALLS 16 + +static void *count_lock = NULL, *count_cond = NULL; +static int count = 0; + +static void +count_init(void) +{ + EVTHREAD_ALLOC_LOCK(count_lock, 0); + EVTHREAD_ALLOC_COND(count_cond); + + tt_assert(count_lock); + tt_assert(count_cond); + +end: + ; +} + +static void +count_free(void) +{ + EVTHREAD_FREE_LOCK(count_lock, 0); + EVTHREAD_FREE_COND(count_cond); +} + +static void +count_incr(void) +{ + EVLOCK_LOCK(count_lock, 0); + count++; + EVTHREAD_COND_BROADCAST(count_cond); + EVLOCK_UNLOCK(count_lock, 0); +} + +static int +count_wait_for(int i, int ms) +{ + struct timeval tv; + DWORD elapsed; + int rv = -1; + + EVLOCK_LOCK(count_lock, 0); + while (ms > 0 && count != i) { + tv.tv_sec = 0; + tv.tv_usec = ms * 1000; + elapsed = GetTickCount(); + EVTHREAD_COND_WAIT_TIMED(count_cond, count_lock, &tv); + elapsed = GetTickCount() - elapsed; + ms -= elapsed; + } + if (count == i) + rv = 0; + EVLOCK_UNLOCK(count_lock, 0); + + return rv; +} + struct dummy_overlapped { struct event_overlapped eo; void *lock; @@ -73,6 +131,8 @@ dummy_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int ok) } d_o->call_count++; EVLOCK_UNLOCK(d_o->lock, 0); + + count_incr(); } static int @@ -100,6 +160,7 @@ test_iocp_port(void *ptr) memset(&o1, 0, sizeof(o1)); memset(&o2, 0, sizeof(o2)); + count_init(); EVTHREAD_ALLOC_LOCK(o1.lock, EVTHREAD_LOCKTYPE_RECURSIVE); EVTHREAD_ALLOC_LOCK(o2.lock, EVTHREAD_LOCKTYPE_RECURSIVE); @@ -124,10 +185,7 @@ test_iocp_port(void *ptr) tt_assert(!event_iocp_activate_overlapped(port, &o1.eo, 13, 103)); tt_assert(!event_iocp_activate_overlapped(port, &o2.eo, 23, 203)); -#ifdef WIN32 - /* FIXME Be smarter. */ - Sleep(1000); -#endif + tt_int_op(count_wait_for(8, 2000), ==, 0); tt_want(!event_iocp_shutdown(port, 2000)); @@ -145,8 +203,9 @@ test_iocp_port(void *ptr) tt_want(pair_is_in(&o2, 23, 203)); end: - /* FIXME free the locks. */ - ; + EVTHREAD_FREE_LOCK(o1.lock, EVTHREAD_LOCKTYPE_RECURSIVE); + EVTHREAD_FREE_LOCK(o2.lock, EVTHREAD_LOCKTYPE_RECURSIVE); + count_free(); } static struct evbuffer *rbuf = NULL, *wbuf = NULL; @@ -157,6 +216,7 @@ read_complete(struct event_overlapped *eo, uintptr_t key, { tt_assert(ok); evbuffer_commit_read(rbuf, nbytes); + count_incr(); end: ; } @@ -167,6 +227,7 @@ write_complete(struct event_overlapped *eo, uintptr_t key, { tt_assert(ok); evbuffer_commit_write(wbuf, nbytes); + count_incr(); end: ; } @@ -177,9 +238,12 @@ test_iocp_evbuffer(void *ptr) struct event_overlapped rol, wol; struct basic_test_data *data = ptr; struct event_iocp_port *port = NULL; + struct evbuffer *buf; + struct evbuffer_chain *chain; char junk[1024]; int i; + count_init(); event_overlapped_init(&rol, read_complete); event_overlapped_init(&wol, write_complete); @@ -202,14 +266,18 @@ test_iocp_evbuffer(void *ptr) for (i=0;i<10;++i) evbuffer_add(wbuf, junk, sizeof(junk)); + buf = evbuffer_new(); + tt_assert(buf != NULL); + evbuffer_add(rbuf, junk, sizeof(junk)); + tt_assert(!evbuffer_launch_read(rbuf, 2048, &rol)); + evbuffer_add_buffer(buf, rbuf); + tt_int_op(evbuffer_get_length(buf), ==, sizeof(junk)); + for (chain = buf->first; chain; chain = chain->next) + tt_int_op(chain->flags & EVBUFFER_MEM_PINNED_ANY, ==, 0); tt_assert(!evbuffer_get_length(rbuf)); tt_assert(!evbuffer_launch_write(wbuf, 512, &wol)); - tt_assert(!evbuffer_launch_read(rbuf, 2048, &rol)); -#ifdef WIN32 - /* FIXME this is stupid. */ - Sleep(1000); -#endif + tt_int_op(count_wait_for(2, 2000), ==, 0); tt_int_op(evbuffer_get_length(rbuf),==,512); @@ -217,8 +285,20 @@ test_iocp_evbuffer(void *ptr) tt_want(!event_iocp_shutdown(port, 2000)); end: + count_free(); evbuffer_free(rbuf); evbuffer_free(wbuf); + evbuffer_free(buf); +} + +static int got_readcb = 0; + +static void +async_readcb(struct bufferevent *bev, void *arg) +{ + /* Disabling read should cause the loop to quit */ + bufferevent_disable(bev, EV_READ); + got_readcb++; } static void @@ -229,7 +309,6 @@ test_iocp_bufferevent_async(void *ptr) struct bufferevent *bea1=NULL, *bea2=NULL; char buf[128]; size_t n; - struct timeval one_sec = {1,0}; event_base_start_iocp(data->base); port = event_base_get_iocp(data->base); @@ -242,26 +321,27 @@ test_iocp_bufferevent_async(void *ptr) tt_assert(bea1); tt_assert(bea2); - /*FIXME set some callbacks */ + bufferevent_setcb(bea2, async_readcb, NULL, NULL, NULL); bufferevent_enable(bea1, EV_WRITE); bufferevent_enable(bea2, EV_READ); bufferevent_write(bea1, "Hello world", strlen("Hello world")+1); - event_base_loopexit(data->base, &one_sec); event_base_dispatch(data->base); + tt_int_op(got_readcb, ==, 1); n = bufferevent_read(bea2, buf, sizeof(buf)-1); buf[n]='\0'; tt_str_op(buf, ==, "Hello world"); end: - /* FIXME: free stuff. */; + bufferevent_free(bea1); + bufferevent_free(bea2); } struct testcase_t iocp_testcases[] = { - { "port", test_iocp_port, TT_FORK|TT_NEED_THREADS, NULL, NULL }, + { "port", test_iocp_port, TT_FORK|TT_NEED_THREADS, &basic_setup, NULL }, { "evbuffer", test_iocp_evbuffer, TT_FORK|TT_NEED_SOCKETPAIR|TT_NEED_THREADS, &basic_setup, NULL }, From 2447fe88860c40e968261ad8ea059166cb84a280 Mon Sep 17 00:00:00 2001 From: Christopher Davis Date: Sat, 28 Aug 2010 04:07:48 -0700 Subject: [PATCH 5/6] Add event_config_set_num_cpus_hint for tuning thread pools, etc. --- event-internal.h | 1 + event.c | 15 ++++++++++++--- event_iocp.c | 12 +++++++++--- include/event2/event.h | 10 ++++++++++ iocp-internal.h | 4 ++-- test/bench_http.c | 2 +- test/regress_iocp.c | 6 +++--- test/regress_main.c | 2 +- 8 files changed, 39 insertions(+), 13 deletions(-) diff --git a/event-internal.h b/event-internal.h index 7d97d98e..256f3086 100644 --- a/event-internal.h +++ b/event-internal.h @@ -285,6 +285,7 @@ struct event_config_entry { struct event_config { TAILQ_HEAD(event_configq, event_config_entry) entries; + int n_cpus_hint; enum event_method_feature require_features; enum event_base_config_flag flags; }; diff --git a/event.c b/event.c index aaf8f28b..8e8c324b 100644 --- a/event.c +++ b/event.c @@ -615,19 +615,19 @@ event_base_new_with_config(const struct event_config *cfg) #ifdef WIN32 if (cfg && (cfg->flags & EVENT_BASE_FLAG_STARTUP_IOCP)) - event_base_start_iocp(base); + event_base_start_iocp(base, cfg->n_cpus_hint); #endif return (base); } int -event_base_start_iocp(struct event_base *base) +event_base_start_iocp(struct event_base *base, int n_cpus) { #ifdef WIN32 if (base->iocp) return 0; - base->iocp = event_iocp_port_launch(); + base->iocp = event_iocp_port_launch(n_cpus); if (!base->iocp) { event_warnx("%s: Couldn't launch IOCP", __func__); return -1; @@ -918,6 +918,15 @@ event_config_require_features(struct event_config *cfg, return (0); } +int +event_config_set_num_cpus_hint(struct event_config *cfg, int cpus) +{ + if (!cfg) + return (-1); + cfg->n_cpus_hint = cpus; + return (0); +} + int event_priority_init(int npriorities) { diff --git a/event_iocp.c b/event_iocp.c index 19c7bffc..254ed90d 100644 --- a/event_iocp.c +++ b/event_iocp.c @@ -162,8 +162,10 @@ event_get_win32_extension_fns(void) return &the_extension_fns; } +#define N_CPUS_DEFAULT 2 + struct event_iocp_port * -event_iocp_port_launch(void) +event_iocp_port_launch(int n_cpus) { struct event_iocp_port *port; int i; @@ -173,12 +175,16 @@ event_iocp_port_launch(void) if (!(port = mm_calloc(1, sizeof(struct event_iocp_port)))) return NULL; - port->n_threads = 2; + + if (n_cpus <= 0) + n_cpus = N_CPUS_DEFAULT; + port->n_threads = n_cpus * 2; port->threads = calloc(port->n_threads, sizeof(HANDLE)); if (!port->threads) goto err; - port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, port->n_threads); + port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, + n_cpus); port->ms = -1; if (!port->port) goto err; diff --git a/include/event2/event.h b/include/event2/event.h index e9d0048f..fa0f625d 100644 --- a/include/event2/event.h +++ b/include/event2/event.h @@ -226,6 +226,16 @@ int event_config_require_features(struct event_config *cfg, int feature); * will be initialized, and how they'll work. */ int event_config_set_flag(struct event_config *cfg, int flag); +/** + * Records a hint for the number of CPUs in the system. This is used for + * tuning thread pools, etc, for optimal performance. + * + * @param cfg the event configuration object + * @param cpus the number of cpus + * @return 0 on success, -1 on failure. + */ +int event_config_set_num_cpus_hint(struct event_config *cfg, int cpus); + /** Initialize the event API. diff --git a/iocp-internal.h b/iocp-internal.h index 2b740bcc..eb2eae49 100644 --- a/iocp-internal.h +++ b/iocp-internal.h @@ -155,7 +155,7 @@ void evbuffer_commit_write(struct evbuffer *, ev_ssize_t); This interface is unstable, and will change. */ -struct event_iocp_port *event_iocp_port_launch(void); +struct event_iocp_port *event_iocp_port_launch(int n_cpus); /** Associate a file descriptor with an iocp, such that overlapped IO on the fd will happen on one of the iocp's worker threads. @@ -181,7 +181,7 @@ struct event_base; struct event_iocp_port *event_base_get_iocp(struct event_base *base); /* FIXME document. */ -int event_base_start_iocp(struct event_base *base); +int event_base_start_iocp(struct event_base *base, int n_cpus); void event_base_stop_iocp(struct event_base *base); /* FIXME document. */ diff --git a/test/bench_http.c b/test/bench_http.c index fdcf49dd..4b2eb3cd 100644 --- a/test/bench_http.c +++ b/test/bench_http.c @@ -133,7 +133,7 @@ main(int argc, char **argv) case 'i': use_iocp = 1; evthread_use_windows_threads(); - event_base_start_iocp(base); + event_base_start_iocp(base, 0); break; #endif default: diff --git a/test/regress_iocp.c b/test/regress_iocp.c index 96d8eb3e..920da2a7 100644 --- a/test/regress_iocp.c +++ b/test/regress_iocp.c @@ -170,7 +170,7 @@ test_iocp_port(void *ptr) event_overlapped_init(&o1.eo, dummy_cb); event_overlapped_init(&o2.eo, dummy_cb); - port = event_iocp_port_launch(); + port = event_iocp_port_launch(0); tt_assert(port); tt_assert(!event_iocp_activate_overlapped(port, &o1.eo, 10, 100)); @@ -255,7 +255,7 @@ test_iocp_evbuffer(void *ptr) evbuffer_enable_locking(rbuf, NULL); evbuffer_enable_locking(wbuf, NULL); - port = event_iocp_port_launch(); + port = event_iocp_port_launch(0); tt_assert(port); tt_assert(rbuf); tt_assert(wbuf); @@ -310,7 +310,7 @@ test_iocp_bufferevent_async(void *ptr) char buf[128]; size_t n; - event_base_start_iocp(data->base); + event_base_start_iocp(data->base, 0); port = event_base_get_iocp(data->base); tt_assert(port); diff --git a/test/regress_main.c b/test/regress_main.c index 7d18938b..3732f098 100644 --- a/test/regress_main.c +++ b/test/regress_main.c @@ -209,7 +209,7 @@ basic_test_setup(const struct testcase_t *testcase) exit(1); } if (testcase->flags & TT_ENABLE_IOCP_FLAG) { - if (event_base_start_iocp(base)<0) { + if (event_base_start_iocp(base, 0)<0) { event_base_free(base); return (void*)TT_SKIP; } From 17a14f1af2ace0201baa1b5bbba031296e62d879 Mon Sep 17 00:00:00 2001 From: Christopher Davis Date: Wed, 1 Sep 2010 11:04:57 -0700 Subject: [PATCH 6/6] Only process up to MAX_DEFERRED deferred_cbs at a time. If threads queue callbacks while event_process_deferred_callbacks is running, the loop may spin long enough to significantly skew timers. A unit test stressing this behavior is also in this commit. --- bufferevent.c | 15 +++---- event.c | 16 +++++--- test/regress_thread.c | 94 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 16 deletions(-) diff --git a/bufferevent.c b/bufferevent.c index 9923bbe0..53b07f1f 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -206,10 +206,11 @@ bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg) #define SCHEDULE_DEFERRED(bevp) \ do { \ + bufferevent_incref(&(bevp)->bev); \ event_deferred_cb_schedule( \ event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \ &(bevp)->deferred); \ - } while (0); + } while (0) void @@ -222,10 +223,8 @@ _bufferevent_run_readcb(struct bufferevent *bufev) return; if (p->options & BEV_OPT_DEFER_CALLBACKS) { p->readcb_pending = 1; - if (!p->deferred.queued) { - bufferevent_incref(bufev); + if (!p->deferred.queued) SCHEDULE_DEFERRED(p); - } } else { bufev->readcb(bufev, bufev->cbarg); } @@ -241,10 +240,8 @@ _bufferevent_run_writecb(struct bufferevent *bufev) return; if (p->options & BEV_OPT_DEFER_CALLBACKS) { p->writecb_pending = 1; - if (!p->deferred.queued) { - bufferevent_incref(bufev); + if (!p->deferred.queued) SCHEDULE_DEFERRED(p); - } } else { bufev->writecb(bufev, bufev->cbarg); } @@ -261,10 +258,8 @@ _bufferevent_run_eventcb(struct bufferevent *bufev, short what) if (p->options & BEV_OPT_DEFER_CALLBACKS) { p->eventcb_pending |= what; p->errno_pending = EVUTIL_SOCKET_ERROR(); - if (!p->deferred.queued) { - bufferevent_incref(bufev); + if (!p->deferred.queued) SCHEDULE_DEFERRED(p); - } } else { bufev->errorcb(bufev, what, bufev->cbarg); } diff --git a/event.c b/event.c index 8e8c324b..d71932ce 100644 --- a/event.c +++ b/event.c @@ -1285,9 +1285,10 @@ event_process_active_single_queue(struct event_base *base, } /* - Process all the defered_cb entries in 'queue'. If *breakptr becomes set to - 1, stop. Requires that we start out holding the lock on 'queue'; releases - the lock around 'queue' for each deferred_cb we process. + Process up to MAX_DEFERRED of the defered_cb entries in 'queue'. If + *breakptr becomes set to 1, stop. Requires that we start out holding + the lock on 'queue'; releases the lock around 'queue' for each deferred_cb + we process. */ static int event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr) @@ -1295,6 +1296,7 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr) int count = 0; struct deferred_cb *cb; +#define MAX_DEFERRED 16 while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) { cb->queued = 0; TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next); @@ -1302,12 +1304,14 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr) UNLOCK_DEFERRED_QUEUE(queue); cb->cb(cb, cb->arg); - ++count; - if (*breakptr) - return -1; LOCK_DEFERRED_QUEUE(queue); + if (*breakptr) + return -1; + if (++count == MAX_DEFERRED) + break; } +#undef MAX_DEFERRED return count; } diff --git a/test/regress_thread.c b/test/regress_thread.c index 34cf64b1..675e350e 100644 --- a/test/regress_thread.c +++ b/test/regress_thread.c @@ -33,6 +33,9 @@ #include #include #include +#ifndef WIN32 +#include +#endif #ifdef _EVENT_HAVE_PTHREADS #include @@ -46,6 +49,7 @@ #include "event2/event_struct.h" #include "event2/thread.h" #include "evthread-internal.h" +#include "defer-internal.h" #include "regress.h" #include "tinytest_macros.h" @@ -312,12 +316,102 @@ end: ; } +#define CB_COUNT 128 +#define QUEUE_THREAD_COUNT 8 + +#ifdef WIN32 +#define SLEEP_MS(ms) Sleep(ms) +#else +#define SLEEP_MS(ms) usleep((ms) * 1000) +#endif + +struct deferred_test_data { + struct deferred_cb cbs[CB_COUNT]; + struct deferred_cb_queue *queue; +}; + +static time_t timer_start = 0; +static time_t timer_end = 0; +static unsigned callback_count = 0; +static THREAD_T load_threads[QUEUE_THREAD_COUNT]; +static struct deferred_test_data deferred_data[QUEUE_THREAD_COUNT]; + +static void +deferred_callback(struct deferred_cb *cb, void *arg) +{ + SLEEP_MS(1); + callback_count += 1; +} + +static THREAD_FN +load_deferred_queue(void *arg) +{ + struct deferred_test_data *data = arg; + size_t i; + + for (i = 0; i < CB_COUNT; ++i) { + event_deferred_cb_init(&data->cbs[i], deferred_callback, NULL); + event_deferred_cb_schedule(data->queue, &data->cbs[i]); + SLEEP_MS(1); + } + + THREAD_RETURN(); +} + +static void +timer_callback(evutil_socket_t fd, short what, void *arg) +{ + timer_end = time(NULL); +} + +static void +start_threads_callback(evutil_socket_t fd, short what, void *arg) +{ + int i; + + for (i = 0; i < QUEUE_THREAD_COUNT; ++i) { + THREAD_START(load_threads[i], load_deferred_queue, + &deferred_data[i]); + } +} + +static void +thread_deferred_cb_skew(void *arg) +{ + struct basic_test_data *data = arg; + struct timeval tv_timer = {4, 0}; + struct event event_threads; + struct deferred_cb_queue *queue; + int i; + + queue = event_base_get_deferred_cb_queue(data->base); + tt_assert(queue); + + for (i = 0; i < QUEUE_THREAD_COUNT; ++i) + deferred_data[i].queue = queue; + + timer_start = time(NULL); + event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL, + &tv_timer); + event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback, + NULL, NULL); + event_base_dispatch(data->base); + + TT_BLATHER(("callback count, %u", callback_count)); + tt_int_op(timer_end - timer_start, ==, 4); + +end: + for (i = 0; i < QUEUE_THREAD_COUNT; ++i) + THREAD_JOIN(load_threads[i]); +} + #define TEST(name) \ { #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, \ &basic_setup, NULL } struct testcase_t thread_testcases[] = { TEST(basic), TEST(conditions_simple), + TEST(deferred_cb_skew), END_OF_TESTCASES };