Merge remote branch 'chrisd/iocp-fixes4'

Conflicts:
	test/regress_thread.c
This commit is contained in:
Nick Mathewson 2010-09-08 14:12:12 -04:00
commit 3658b1696d
15 changed files with 638 additions and 193 deletions

122
buffer.c
View File

@ -575,7 +575,8 @@ evbuffer_reserve_space(struct evbuffer *buf, ev_ssize_t size,
} else {
if (_evbuffer_expand_fast(buf, size, n_vecs)<0)
goto done;
n = _evbuffer_read_setup_vecs(buf, size, vec, n_vecs, &chainp, 0);
n = _evbuffer_read_setup_vecs(buf, size, vec, n_vecs,
&chainp, 0);
}
done:
@ -670,6 +671,12 @@ done:
return result;
}
static inline int
HAS_PINNED_R(struct evbuffer *buf)
{
return (buf->last && CHAIN_PINNED_R(buf->last));
}
static inline void
ZERO_CHAIN(struct evbuffer *dst)
{
@ -680,6 +687,71 @@ ZERO_CHAIN(struct evbuffer *dst)
dst->total_len = 0;
}
/* Prepares the contents of src to be moved to another buffer by removing
* read-pinned chains. The first pinned chain is saved in first, and the
* last in last. If src has no read-pinned chains, first and last are set
* to NULL. */
static int
PRESERVE_PINNED(struct evbuffer *src, struct evbuffer_chain **first,
    struct evbuffer_chain **last)
{
	struct evbuffer_chain *chain, **pinned;

	ASSERT_EVBUFFER_LOCKED(src);

	/* Nothing pinned: report empty run and succeed. */
	if (!HAS_PINNED_R(src)) {
		*first = *last = NULL;
		return 0;
	}

	/* last_with_datap points at the pointer to the last chain holding
	 * data.  If that chain itself is not read-pinned, the pinned run
	 * begins at the chain after it. */
	pinned = src->last_with_datap;
	if (!CHAIN_PINNED_R(*pinned))
		pinned = &(*pinned)->next;
	EVUTIL_ASSERT(CHAIN_PINNED_R(*pinned));
	chain = *first = *pinned;
	*last = src->last;

	/* If there's data in the first pinned chain, we need to allocate
	 * a new chain and copy the data over. */
	if (chain->off) {
		struct evbuffer_chain *tmp;

		/* A pinned chain with data must be the last one with data. */
		EVUTIL_ASSERT(pinned == src->last_with_datap);
		tmp = evbuffer_chain_new(chain->off);
		if (!tmp)
			return -1;
		memcpy(tmp->buffer, chain->buffer + chain->misalign,
			chain->off);
		tmp->off = chain->off;
		/* The copy replaces the pinned chain at the tail of src;
		 * the pinned chain keeps its storage (presumably for a
		 * pending overlapped read) but is emptied. */
		*src->last_with_datap = tmp;
		src->last = tmp;
		chain->misalign += chain->off;
		chain->off = 0;
	} else {
		/* No data to save: detach the pinned run from src. */
		src->last = *src->last_with_datap;
		*pinned = NULL;
	}

	return 0;
}
static inline void
RESTORE_PINNED(struct evbuffer *src, struct evbuffer_chain *pinned,
struct evbuffer_chain *last)
{
ASSERT_EVBUFFER_LOCKED(src);
if (!pinned) {
ZERO_CHAIN(src);
return;
}
src->first = pinned;
src->last = last;
src->last_with_datap = &src->first;
src->total_len = 0;
}
static inline void
COPY_CHAIN(struct evbuffer *dst, struct evbuffer *src)
{
@ -729,6 +801,7 @@ PREPEND_CHAIN(struct evbuffer *dst, struct evbuffer *src)
int
evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
{
struct evbuffer_chain *pinned, *last;
size_t in_total_len, out_total_len;
int result = 0;
@ -744,6 +817,11 @@ evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
goto done;
}
if (PRESERVE_PINNED(inbuf, &pinned, &last) < 0) {
result = -1;
goto done;
}
if (out_total_len == 0) {
/* There might be an empty chain at the start of outbuf; free
* it. */
@ -753,8 +831,8 @@ evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
APPEND_CHAIN(outbuf, inbuf);
}
/* remove everything from inbuf */
ZERO_CHAIN(inbuf);
RESTORE_PINNED(inbuf, pinned, last);
inbuf->n_del_for_cb += in_total_len;
outbuf->n_add_for_cb += in_total_len;
@ -769,6 +847,7 @@ done:
int
evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
{
struct evbuffer_chain *pinned, *last;
size_t in_total_len, out_total_len;
int result = 0;
@ -785,6 +864,11 @@ evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
goto done;
}
if (PRESERVE_PINNED(inbuf, &pinned, &last) < 0) {
result = -1;
goto done;
}
if (out_total_len == 0) {
/* There might be an empty chain at the start of outbuf; free
* it. */
@ -794,8 +878,8 @@ evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
PREPEND_CHAIN(outbuf, inbuf);
}
/* remove everything from inbuf */
ZERO_CHAIN(inbuf);
RESTORE_PINNED(inbuf, pinned, last);
inbuf->n_del_for_cb += in_total_len;
outbuf->n_add_for_cb += in_total_len;
@ -810,7 +894,7 @@ int
evbuffer_drain(struct evbuffer *buf, size_t len)
{
struct evbuffer_chain *chain, *next;
size_t old_len;
size_t remaining, old_len;
int result = 0;
EVBUFFER_LOCK(buf);
@ -824,12 +908,10 @@ evbuffer_drain(struct evbuffer *buf, size_t len)
goto done;
}
if (len >= old_len && !(buf->last && CHAIN_PINNED_R(buf->last))) {
if (len >= old_len && !HAS_PINNED_R(buf)) {
len = old_len;
for (chain = buf->first; chain != NULL; chain = next) {
next = chain->next;
evbuffer_chain_free(chain);
}
@ -839,10 +921,12 @@ evbuffer_drain(struct evbuffer *buf, size_t len)
len = old_len;
buf->total_len -= len;
for (chain = buf->first; len >= chain->off; chain = next) {
remaining = len;
for (chain = buf->first;
remaining >= chain->off;
chain = next) {
next = chain->next;
len -= chain->off;
remaining -= chain->off;
if (chain == *buf->last_with_datap) {
buf->last_with_datap = &buf->first;
@ -850,14 +934,20 @@ evbuffer_drain(struct evbuffer *buf, size_t len)
if (&chain->next == buf->last_with_datap)
buf->last_with_datap = &buf->first;
if (len == 0 && CHAIN_PINNED_R(chain))
if (CHAIN_PINNED_R(chain)) {
EVUTIL_ASSERT(remaining == 0);
chain->misalign += chain->off;
chain->off = 0;
break;
evbuffer_chain_free(chain);
} else
evbuffer_chain_free(chain);
}
buf->first = chain;
chain->misalign += len;
chain->off -= len;
if (chain) {
chain->misalign += remaining;
chain->off -= remaining;
}
}
buf->n_del_for_cb += len;

View File

@ -82,12 +82,13 @@ static void
pin_release(struct evbuffer_overlapped *eo, unsigned flag)
{
	int i;
	struct evbuffer_chain *next, *chain = eo->first_pinned;

	/* Unpin one chain per launched buffer.  Fetch "next" before
	 * unpinning: releasing the pin may free the chain, so
	 * chain->next must not be read afterwards. */
	for (i = 0; i < eo->n_buffers; ++i) {
		EVUTIL_ASSERT(chain);
		next = chain->next;
		_evbuffer_chain_unpin(chain, flag);
		chain = next;
	}
}
@ -95,8 +96,9 @@ void
evbuffer_commit_read(struct evbuffer *evbuf, ev_ssize_t nBytes)
{
struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
struct evbuffer_iovec iov[2];
int n_vec;
struct evbuffer_chain **chainp;
size_t remaining, len;
unsigned i;
EVBUFFER_LOCK(evbuf);
EVUTIL_ASSERT(buf->read_in_progress && !buf->write_in_progress);
@ -104,24 +106,27 @@ evbuffer_commit_read(struct evbuffer *evbuf, ev_ssize_t nBytes)
evbuffer_unfreeze(evbuf, 0);
iov[0].iov_base = buf->buffers[0].buf;
if ((size_t)nBytes <= buf->buffers[0].len) {
iov[0].iov_len = nBytes;
n_vec = 1;
} else {
iov[0].iov_len = buf->buffers[0].len;
iov[1].iov_base = buf->buffers[1].buf;
iov[1].iov_len = nBytes - iov[0].iov_len;
n_vec = 2;
chainp = evbuf->last_with_datap;
if (!((*chainp)->flags & EVBUFFER_MEM_PINNED_R))
chainp = &(*chainp)->next;
remaining = nBytes;
for (i = 0; remaining > 0 && i < buf->n_buffers; ++i) {
EVUTIL_ASSERT(*chainp);
len = buf->buffers[i].len;
if (remaining < len)
len = remaining;
(*chainp)->off += len;
evbuf->last_with_datap = chainp;
remaining -= len;
chainp = &(*chainp)->next;
}
if (evbuffer_commit_space(evbuf, iov, n_vec) < 0)
EVUTIL_ASSERT(0); /* XXXX fail nicer. */
pin_release(buf, EVBUFFER_MEM_PINNED_R);
buf->read_in_progress = 0;
evbuf->total_len += nBytes;
_evbuffer_decref_and_unlock(evbuf);
}
@ -184,7 +189,7 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most,
}
evbuffer_freeze(buf, 1);
buf_o->first_pinned = 0;
buf_o->first_pinned = NULL;
buf_o->n_buffers = 0;
memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
@ -246,19 +251,16 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most,
if (buf->freeze_end || buf_o->read_in_progress)
goto done;
buf_o->first_pinned = 0;
buf_o->first_pinned = NULL;
buf_o->n_buffers = 0;
memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
if (_evbuffer_expand_fast(buf, at_most, 2) == -1)
if (_evbuffer_expand_fast(buf, at_most, MAX_WSABUFS) == -1)
goto done;
evbuffer_freeze(buf, 0);
/* XXX This and evbuffer_read_setup_vecs() should say MAX_WSABUFS,
* not "2". But commit_read() above can't handle more than two
* buffers yet. */
nvecs = _evbuffer_read_setup_vecs(buf, at_most,
vecs, 2, &chainp, 1);
vecs, MAX_WSABUFS, &chainp, 1);
for (i=0;i<nvecs;++i) {
WSABUF_FROM_EVBUFFER_IOV(
&buf_o->buffers[i],
@ -266,7 +268,8 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most,
}
buf_o->n_buffers = nvecs;
buf_o->first_pinned = chain= *chainp;
buf_o->first_pinned = chain = *chainp;
npin=0;
for ( ; chain; chain = chain->next) {
_evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_R);

View File

@ -206,10 +206,11 @@ bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)
/* Queue this bufferevent's deferred callback, taking a reference that
 * the deferred handler releases.  No trailing semicolon inside the
 * do/while so call sites supply their own. */
#define SCHEDULE_DEFERRED(bevp) \
	do { \
		bufferevent_incref(&(bevp)->bev); \
		event_deferred_cb_schedule( \
			event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
			&(bevp)->deferred); \
	} while (0)
void
@ -222,10 +223,8 @@ _bufferevent_run_readcb(struct bufferevent *bufev)
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->readcb_pending = 1;
if (!p->deferred.queued) {
bufferevent_incref(bufev);
if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
}
} else {
bufev->readcb(bufev, bufev->cbarg);
}
@ -241,10 +240,8 @@ _bufferevent_run_writecb(struct bufferevent *bufev)
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->writecb_pending = 1;
if (!p->deferred.queued) {
bufferevent_incref(bufev);
if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
}
} else {
bufev->writecb(bufev, bufev->cbarg);
}
@ -261,10 +258,8 @@ _bufferevent_run_eventcb(struct bufferevent *bufev, short what)
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->eventcb_pending |= what;
p->errno_pending = EVUTIL_SOCKET_ERROR();
if (!p->deferred.queued) {
bufferevent_incref(bufev);
if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
}
} else {
bufev->errorcb(bufev, what, bufev->cbarg);
}

View File

@ -48,11 +48,15 @@
#include <ws2tcpip.h>
#endif
#include <sys/queue.h>
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/bufferevent_struct.h"
#include "event2/event.h"
#include "event2/util.h"
#include "event-internal.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
@ -74,6 +78,8 @@ struct bufferevent_async {
unsigned read_in_progress : 1;
unsigned write_in_progress : 1;
unsigned ok : 1;
unsigned read_added : 1;
unsigned write_added : 1;
};
const struct bufferevent_ops bufferevent_ops_async = {
@ -125,74 +131,143 @@ upcast_write(struct event_overlapped *eo)
}
static void
bev_async_consider_writing(struct bufferevent_async *b)
bev_async_del_write(struct bufferevent_async *beva)
{
size_t at_most;
int limit;
/* Don't write if there's a write in progress, or we do not
* want to write. */
if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
return;
/* Don't write if there's nothing to write */
if (!evbuffer_get_length(b->bev.bev.output))
return;
struct bufferevent *bev = &beva->bev.bev;
at_most = evbuffer_get_length(b->bev.bev.output);
/* XXXX This over-commits. */
limit = _bufferevent_get_write_max(&b->bev);
if (at_most >= limit)
at_most = limit;
if (b->bev.write_suspended)
return;
/* XXXX doesn't respect low-water mark very well. */
if (evbuffer_launch_write(b->bev.bev.output, at_most,
&b->write_overlapped)) {
EVUTIL_ASSERT(0);/* XXX act sensibly. */
} else {
b->write_in_progress = 1;
if (beva->write_added) {
beva->write_added = 0;
event_base_del_virtual(bev->ev_base);
}
}
static void
bev_async_consider_reading(struct bufferevent_async *b)
bev_async_del_read(struct bufferevent_async *beva)
{
struct bufferevent *bev = &beva->bev.bev;
if (beva->read_added) {
beva->read_added = 0;
event_base_del_virtual(bev->ev_base);
}
}
/* Register one virtual event for the pending/desired write so the
 * event loop keeps running while overlapped I/O is in flight. */
static void
bev_async_add_write(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (beva->write_added)
		return;
	beva->write_added = 1;
	event_base_add_virtual(bev->ev_base);
}
/* Register one virtual event for the pending/desired read so the
 * event loop keeps running while overlapped I/O is in flight. */
static void
bev_async_add_read(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (beva->read_added)
		return;
	beva->read_added = 1;
	event_base_add_virtual(bev->ev_base);
}
/* Launch an overlapped write for as much buffered output as the write
 * rate limit allows, if writing is enabled and nothing is in flight.
 * Adds/removes a virtual event to mirror whether we still want to write. */
static void
bev_async_consider_writing(struct bufferevent_async *beva)
{
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't write if there's a write in progress, or we do not
	 * want to write, or when there's nothing left to write. */
	if (beva->write_in_progress)
		return;
	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
	    !evbuffer_get_length(bev->output)) {
		bev_async_del_write(beva);
		return;
	}

	at_most = evbuffer_get_length(bev->output);

	/* XXXX This over-commits. */
	/* NOTE(review): at_most is size_t but limit is int; the comparison
	 * presumes the write max is non-negative -- confirm upstream. */
	limit = _bufferevent_get_write_max(&beva->bev);
	if (at_most >= limit)
		at_most = limit;

	if (beva->bev.write_suspended) {
		bev_async_del_write(beva);
		return;
	}

	/* XXXX doesn't respect low-water mark very well. */
	/* Hold a reference across the async write; the completion path
	 * (or the error path below) releases it. */
	bufferevent_incref(bev);
	if (evbuffer_launch_write(bev->output, at_most,
	    &beva->write_overlapped)) {
		bufferevent_decref(bev);
		beva->ok = 0;
		_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
	} else {
		beva->write_in_progress = 1;
		bev_async_add_write(beva);
	}
}
/* Launch an overlapped read for as much space as the high watermark
 * and read rate limit allow, if reading is enabled and nothing is in
 * flight.  Adds/removes a virtual event to mirror whether we still
 * want to read. */
static void
bev_async_consider_reading(struct bufferevent_async *beva)
{
	size_t cur_size;
	size_t read_high;
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't read if there is a read in progress, or we do not
	 * want to read. */
	if (beva->read_in_progress)
		return;
	if (!beva->ok || !(bev->enabled&EV_READ)) {
		bev_async_del_read(beva);
		return;
	}

	/* Don't read if we're full */
	cur_size = evbuffer_get_length(bev->input);
	read_high = bev->wm_read.high;
	if (read_high) {
		if (cur_size >= read_high) {
			bev_async_del_read(beva);
			return;
		}
		at_most = read_high - cur_size;
	} else {
		at_most = 16384; /* FIXME totally magic. */
	}

	/* XXXX This over-commits. */
	limit = _bufferevent_get_read_max(&beva->bev);
	if (at_most >= limit)
		at_most = limit;

	if (beva->bev.read_suspended) {
		bev_async_del_read(beva);
		return;
	}

	/* Hold a reference across the async read; the completion path
	 * (or the error path below) releases it. */
	bufferevent_incref(bev);
	if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
		beva->ok = 0;
		bufferevent_decref(bev);
		_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
	} else {
		beva->read_in_progress = 1;
		bev_async_add_read(beva);
	}
	return;
}
static void
@ -260,14 +335,19 @@ be_async_enable(struct bufferevent *buf, short what)
static int
be_async_disable(struct bufferevent *bev, short what)
{
	struct bufferevent_async *bev_async = upcast(bev);

	/* XXXX If we disable reading or writing, we may want to consider
	 * canceling any in-progress read or write operation, though it might
	 * not work. */

	if (what & EV_READ) {
		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
		bev_async_del_read(bev_async);
	}
	if (what & EV_WRITE) {
		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
		bev_async_del_write(bev_async);
	}

	return 0;
}
@ -275,18 +355,36 @@ be_async_disable(struct bufferevent *bev, short what)
static void
be_async_destruct(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
	evutil_socket_t fd;

	/* Destruction with I/O still in flight would leave dangling
	 * overlapped pointers. */
	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
			!upcast(bev)->read_in_progress);

	bev_async_del_read(bev_async);
	bev_async_del_write(bev_async);

	/* XXX cancel any outstanding I/O operations */
	fd = _evbuffer_overlapped_get_fd(bev->input);
	if (bev_p->options & BEV_OPT_CLOSE_ON_FREE)
		evutil_closesocket(fd);
	/* delete this in case non-blocking connect was used */
	if (event_initialized(&bev->ev_write)) {
		event_del(&bev->ev_write);
		_bufferevent_del_generic_timeout_cbs(bev);
	}
}
/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
* we use WSAGetOverlappedResult to translate. */
static void
bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
{
	DWORD bytes, flags;
	evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input);

	/* Called only for its side effect of setting the per-thread
	 * WSA error; the byte count and flags are discarded. */
	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
}
static int
@ -303,15 +401,22 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
struct bufferevent_async *bev_a = upcast_connect(eo);
struct bufferevent *bev = &bev_a->bev.bev;
_bufferevent_incref_and_lock(bev);
BEV_LOCK(bev);
EVUTIL_ASSERT(bev_a->bev.connecting);
bev_a->bev.connecting = 0;
event_base_del_virtual(bev->ev_base);
if (ok)
bufferevent_async_set_connected(bev);
else
bev_async_set_wsa_error(bev, eo);
bufferevent_async_set_connected(bev);
_bufferevent_run_eventcb(bev,
ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
event_base_del_virtual(bev->ev_base);
_bufferevent_decref_and_unlock(bev);
}
@ -323,26 +428,32 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_READING;
_bufferevent_incref_and_lock(bev);
EVUTIL_ASSERT(bev_a->ok && bev_a->read_in_progress);
BEV_LOCK(bev);
EVUTIL_ASSERT(bev_a->read_in_progress);
evbuffer_commit_read(bev->input, nbytes);
bev_a->read_in_progress = 0;
if (ok && nbytes) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
_bufferevent_decrement_read_buckets(&bev_a->bev, nbytes);
if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
_bufferevent_run_readcb(bev);
bev_async_consider_reading(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
if (!ok)
bev_async_set_wsa_error(bev, eo);
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
_bufferevent_decrement_read_buckets(&bev_a->bev,
nbytes);
if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
_bufferevent_run_readcb(bev);
bev_async_consider_reading(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
}
}
_bufferevent_decref_and_unlock(bev);
@ -356,26 +467,32 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_WRITING;
_bufferevent_incref_and_lock(bev);
EVUTIL_ASSERT(bev_a->ok && bev_a->write_in_progress);
BEV_LOCK(bev);
EVUTIL_ASSERT(bev_a->write_in_progress);
evbuffer_commit_write(bev->output, nbytes);
bev_a->write_in_progress = 0;
if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
_bufferevent_decrement_write_buckets(&bev_a->bev, nbytes);
if (evbuffer_get_length(bev->output) <= bev->wm_write.low)
_bufferevent_run_writecb(bev);
bev_async_consider_writing(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
if (!ok)
bev_async_set_wsa_error(bev, eo);
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
_bufferevent_decrement_write_buckets(&bev_a->bev,
nbytes);
if (evbuffer_get_length(bev->output) <=
bev->wm_write.low)
_bufferevent_run_writecb(bev);
bev_async_consider_writing(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
}
}
_bufferevent_decref_and_unlock(bev);
@ -423,8 +540,6 @@ bufferevent_async_new(struct event_base *base,
evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
evbuffer_defer_callbacks(bev->input, base);
evbuffer_defer_callbacks(bev->output, base);
event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
event_overlapped_init(&bev_a->read_overlapped, read_complete);
@ -497,11 +612,16 @@ bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
WSAGetLastError() != WSAEINVAL)
return -1;
event_base_add_virtual(bev->ev_base);
bufferevent_incref(bev);
rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
&bev_async->connect_overlapped.overlapped);
if (rc || WSAGetLastError() == ERROR_IO_PENDING)
return 0;
event_base_del_virtual(bev->ev_base);
bufferevent_decref(bev);
return -1;
}

View File

@ -256,7 +256,8 @@ int _evbuffer_expand_fast(struct evbuffer *, size_t, int);
* Returns the number of vecs used.
*/
int _evbuffer_read_setup_vecs(struct evbuffer *buf, ev_ssize_t howmuch,
struct evbuffer_iovec *vecs, int n_vecs, struct evbuffer_chain ***chainp, int exact);
struct evbuffer_iovec *vecs, int n_vecs, struct evbuffer_chain ***chainp,
int exact);
/* Helper macro: copies an evbuffer_iovec in ei to a win32 WSABUF in i. */
#define WSABUF_FROM_EVBUFFER_IOV(i,ei) do { \

View File

@ -182,6 +182,8 @@ struct event_base {
/** Data to implement the common signal handler code. */
struct evsig_info sig;
/** Number of virtual events */
int virtual_event_count;
/** Number of total events added to this event_base */
int event_count;
/** Number of total events active in this event_base */
@ -286,6 +288,7 @@ struct event_config_entry {
struct event_config {
TAILQ_HEAD(event_configq, event_config_entry) entries;
int n_cpus_hint;
enum event_method_feature require_features;
enum event_base_config_flag flags;
};
@ -316,6 +319,10 @@ int _evsig_restore_handler(struct event_base *base, int evsignal);
void event_active_nolock(struct event *ev, int res, short count);
/* FIXME document. */
void event_base_add_virtual(struct event_base *base);
void event_base_del_virtual(struct event_base *base);
#ifdef __cplusplus
}
#endif

67
event.c
View File

@ -615,19 +615,19 @@ event_base_new_with_config(const struct event_config *cfg)
#ifdef WIN32
if (cfg && (cfg->flags & EVENT_BASE_FLAG_STARTUP_IOCP))
event_base_start_iocp(base);
event_base_start_iocp(base, cfg->n_cpus_hint);
#endif
return (base);
}
int
event_base_start_iocp(struct event_base *base)
event_base_start_iocp(struct event_base *base, int n_cpus)
{
#ifdef WIN32
if (base->iocp)
return 0;
base->iocp = event_iocp_port_launch();
base->iocp = event_iocp_port_launch(n_cpus);
if (!base->iocp) {
event_warnx("%s: Couldn't launch IOCP", __func__);
return -1;
@ -638,6 +638,20 @@ event_base_start_iocp(struct event_base *base)
#endif
}
/* Shut down the IOCP worker pool attached to "base", if one was
 * started, waiting as long as needed (waitMsec == -1) for the threads
 * to exit.  No-op on non-Windows builds or when no port exists. */
void
event_base_stop_iocp(struct event_base *base)
{
#ifdef WIN32
	int rv;

	if (!base->iocp)
		return;
	/* -1: block until every worker thread has finished. */
	rv = event_iocp_shutdown(base->iocp, -1);
	EVUTIL_ASSERT(rv >= 0);
	base->iocp = NULL;
#endif
}
void
event_base_free(struct event_base *base)
{
@ -654,6 +668,10 @@ event_base_free(struct event_base *base)
/* XXX(niels) - check for internal events first */
EVUTIL_ASSERT(base);
#ifdef WIN32
event_base_stop_iocp(base);
#endif
/* threading fds if we have them */
if (base->th_notify_fd[0] != -1) {
event_del(&base->th_notify);
@ -921,6 +939,15 @@ event_config_require_features(struct event_config *cfg,
return (0);
}
int
event_config_set_num_cpus_hint(struct event_config *cfg, int cpus)
{
if (!cfg)
return (-1);
cfg->n_cpus_hint = cpus;
return (0);
}
int
event_priority_init(int npriorities)
{
@ -965,7 +992,7 @@ static int
event_haveevents(struct event_base *base)
{
	/* Caller must hold th_base_lock */
	/* Virtual events (e.g. in-flight IOCP operations) keep the loop
	 * alive just like real ones. */
	return (base->virtual_event_count > 0 || base->event_count > 0);
}
/* "closure" function called when processing active signal events */
@ -1279,9 +1306,10 @@ event_process_active_single_queue(struct event_base *base,
}
/*
Process all the defered_cb entries in 'queue'. If *breakptr becomes set to
1, stop. Requires that we start out holding the lock on 'queue'; releases
the lock around 'queue' for each deferred_cb we process.
Process up to MAX_DEFERRED of the defered_cb entries in 'queue'. If
*breakptr becomes set to 1, stop. Requires that we start out holding
the lock on 'queue'; releases the lock around 'queue' for each deferred_cb
we process.
*/
static int
event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
@ -1289,6 +1317,7 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
int count = 0;
struct deferred_cb *cb;
#define MAX_DEFERRED 16
while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
cb->queued = 0;
TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
@ -1296,12 +1325,14 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
UNLOCK_DEFERRED_QUEUE(queue);
cb->cb(cb, cb->arg);
++count;
if (*breakptr)
return -1;
LOCK_DEFERRED_QUEUE(queue);
if (*breakptr)
return -1;
if (++count == MAX_DEFERRED)
break;
}
#undef MAX_DEFERRED
return count;
}
@ -2723,3 +2754,19 @@ event_base_dump_events(struct event_base *base, FILE *output)
}
}
}
/* Count one "virtual" event as added to the base.  Virtual events keep
 * event_haveevents() true -- and thus the loop running -- even though
 * no struct event is registered (used by the Windows IOCP code). */
void
event_base_add_virtual(struct event_base *base)
{
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);
	base->virtual_event_count++;
	EVBASE_RELEASE_LOCK(base, th_base_lock);
}
/* Remove one "virtual" event from the base's count.  Must pair with a
 * prior event_base_add_virtual() call; the count is not range-checked. */
void
event_base_del_virtual(struct event_base *base)
{
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);
	base->virtual_event_count--;
	EVBASE_RELEASE_LOCK(base, th_base_lock);
}

View File

@ -36,6 +36,7 @@
#include "log-internal.h"
#include "mm-internal.h"
#include "event-internal.h"
#include "evthread-internal.h"
#define NOTIFICATION_KEY ((ULONG_PTR)-1)
@ -73,7 +74,8 @@ loop(void *_port)
EnterCriticalSection(&port->lock);
if (port->shutdown) {
if (--port->n_live_threads == 0)
ReleaseSemaphore(port->shutdownSemaphore, 1, NULL);
ReleaseSemaphore(port->shutdownSemaphore, 1,
NULL);
LeaveCriticalSection(&port->lock);
return;
}
@ -160,8 +162,10 @@ event_get_win32_extension_fns(void)
return &the_extension_fns;
}
#define N_CPUS_DEFAULT 2
struct event_iocp_port *
event_iocp_port_launch(void)
event_iocp_port_launch(int n_cpus)
{
struct event_iocp_port *port;
int i;
@ -171,12 +175,16 @@ event_iocp_port_launch(void)
if (!(port = mm_calloc(1, sizeof(struct event_iocp_port))))
return NULL;
port->n_threads = 2;
if (n_cpus <= 0)
n_cpus = N_CPUS_DEFAULT;
port->n_threads = n_cpus * 2;
port->threads = calloc(port->n_threads, sizeof(HANDLE));
if (!port->threads)
goto err;
port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, port->n_threads);
port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0,
n_cpus);
port->ms = -1;
if (!port->port)
goto err;
@ -233,13 +241,18 @@ event_iocp_notify_all(struct event_iocp_port *port)
int
event_iocp_shutdown(struct event_iocp_port *port, long waitMsec)
{
DWORD ms = INFINITE;
int n;
EnterCriticalSection(&port->lock);
port->shutdown = 1;
LeaveCriticalSection(&port->lock);
event_iocp_notify_all(port);
WaitForSingleObject(port->shutdownSemaphore, waitMsec);
if (waitMsec >= 0)
ms = waitMsec;
WaitForSingleObject(port->shutdownSemaphore, ms);
EnterCriticalSection(&port->lock);
n = port->n_live_threads;
LeaveCriticalSection(&port->lock);
@ -271,4 +284,3 @@ event_base_get_iocp(struct event_base *base)
return NULL;
#endif
}

View File

@ -226,6 +226,16 @@ int event_config_require_features(struct event_config *cfg, int feature);
* will be initialized, and how they'll work. */
int event_config_set_flag(struct event_config *cfg, int flag);
/**
* Records a hint for the number of CPUs in the system. This is used for
* tuning thread pools, etc, for optimal performance.
*
* @param cfg the event configuration object
* @param cpus the number of cpus
* @return 0 on success, -1 on failure.
*/
int event_config_set_num_cpus_hint(struct event_config *cfg, int cpus);
/**
Initialize the event API.

View File

@ -155,7 +155,7 @@ void evbuffer_commit_write(struct evbuffer *, ev_ssize_t);
This interface is unstable, and will change.
*/
struct event_iocp_port *event_iocp_port_launch(void);
struct event_iocp_port *event_iocp_port_launch(int n_cpus);
/** Associate a file descriptor with an iocp, such that overlapped IO on the
fd will happen on one of the iocp's worker threads.
@ -164,9 +164,10 @@ int event_iocp_port_associate(struct event_iocp_port *port, evutil_socket_t fd,
ev_uintptr_t key);
/** Tell all threads serving an iocp to stop. Wait for up to waitMsec for all
the threads to finish whatever they're doing. If all the threads are
done, free the port and return 0. Otherwise, return -1. If you get a -1
return value, it is safe to call this function again.
the threads to finish whatever they're doing. If waitMsec is -1, wait
as long as required. If all the threads are done, free the port and return
0. Otherwise, return -1. If you get a -1 return value, it is safe to call
this function again.
*/
int event_iocp_shutdown(struct event_iocp_port *port, long waitMsec);
@ -180,7 +181,8 @@ struct event_base;
struct event_iocp_port *event_base_get_iocp(struct event_base *base);
/* FIXME document. */
int event_base_start_iocp(struct event_base *base);
int event_base_start_iocp(struct event_base *base, int n_cpus);
void event_base_stop_iocp(struct event_base *base);
/* FIXME document. */
struct bufferevent *bufferevent_async_new(struct event_base *base,

View File

@ -133,7 +133,7 @@ main(int argc, char **argv)
case 'i':
use_iocp = 1;
evthread_use_windows_threads();
event_base_start_iocp(base);
event_base_start_iocp(base, 0);
break;
#endif
default:

View File

@ -426,9 +426,8 @@ listen_cb(struct evconnlistener *listener, evutil_socket_t fd,
TT_BLATHER(("Got a request on socket %d", (int)fd ));
bev = bufferevent_socket_new(base, fd, bufferevent_connect_test_flags);
tt_assert(bev);
bufferevent_write(bev, s, sizeof(s));
bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL);
bufferevent_enable(bev, EV_WRITE);
bufferevent_write(bev, s, sizeof(s));
end:
;
}
@ -527,13 +526,6 @@ test_bufferevent_connect(void *arg)
tt_want(!bufferevent_socket_connect(bev1, sa, sizeof(localhost)));
tt_want(!bufferevent_socket_connect(bev2, sa, sizeof(localhost)));
#ifdef WIN32
/* FIXME this is to get IOCP to work. it shouldn't be required. */
{
struct timeval tv = {5000,0};
event_base_loopexit(data->base, &tv);
}
#endif
event_base_dispatch(data->base);
tt_int_op(n_strings_read, ==, 2);
@ -622,13 +614,6 @@ test_bufferevent_connect_fail(void *arg)
event_add(&close_listener_event, &one_second);
close_listener_event_added = 1;
#ifdef WIN32
/* FIXME this is to get IOCP to work. it shouldn't be required. */
{
struct timeval tv = {5000,0};
event_base_loopexit(data->base, &tv);
}
#endif
event_base_dispatch(data->base);
tt_int_op(test_ok, ==, 1);

View File

@ -29,6 +29,7 @@
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/buffer.h>
#include <event2/buffer_compat.h>
#include <event2/bufferevent.h>
#include <winsock2.h>
@ -44,6 +45,7 @@
#undef WIN32_LEAN_AND_MEAN
#include "iocp-internal.h"
#include "evbuffer-internal.h"
#include "evthread-internal.h"
/* FIXME remove these ones */
@ -52,6 +54,62 @@
#include "event-internal.h"
#define MAX_CALLS 16
static void *count_lock = NULL, *count_cond = NULL;
static int count = 0;
/* Allocate the lock and condition variable protecting the shared test
 * counter.  On allocation failure the tt_assert macros jump to the
 * "end" label (presumably tinytest's failure protocol -- the label is
 * required even though nothing follows it). */
static void
count_init(void)
{
	EVTHREAD_ALLOC_LOCK(count_lock, 0);
	EVTHREAD_ALLOC_COND(count_cond);

	tt_assert(count_lock);
	tt_assert(count_cond);

end:
	;
}
/* Release the counter lock and condition variable allocated by
 * count_init(). */
static void
count_free(void)
{
	EVTHREAD_FREE_LOCK(count_lock, 0);
	EVTHREAD_FREE_COND(count_cond);
}
/* Increment the shared test counter under the lock and wake every
 * thread blocked in count_wait_for().  Broadcast (not signal) so
 * waiters watching for different targets all get to re-check. */
static void
count_incr(void)
{
	EVLOCK_LOCK(count_lock, 0);
	count++;
	EVTHREAD_COND_BROADCAST(count_cond);
	EVLOCK_UNLOCK(count_lock, 0);
}
/* Wait up to "ms" milliseconds for the shared counter to reach "i".
 * Returns 0 if the target was reached, -1 on timeout.  Elapsed time is
 * measured with GetTickCount() around each timed condition wait and
 * subtracted from the remaining budget.
 * NOTE(review): assumes ms is small enough that ms * 1000 fits in
 * tv_usec, and ignores GetTickCount() wraparound (~49.7 days) --
 * acceptable for a unit test. */
static int
count_wait_for(int i, int ms)
{
	struct timeval tv;
	DWORD elapsed;
	int rv = -1;
	EVLOCK_LOCK(count_lock, 0);
	while (ms > 0 && count != i) {
		tv.tv_sec = 0;
		tv.tv_usec = ms * 1000;
		elapsed = GetTickCount();
		EVTHREAD_COND_WAIT_TIMED(count_cond, count_lock, &tv);
		elapsed = GetTickCount() - elapsed;
		ms -= elapsed;
	}
	/* Report success even if we fell out of the loop on timeout but
	 * the counter hit the target on the final wakeup. */
	if (count == i)
		rv = 0;
	EVLOCK_UNLOCK(count_lock, 0);
	return rv;
}
struct dummy_overlapped {
struct event_overlapped eo;
void *lock;
@ -73,6 +131,8 @@ dummy_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int ok)
}
d_o->call_count++;
EVLOCK_UNLOCK(d_o->lock, 0);
count_incr();
}
static int
@ -100,6 +160,7 @@ test_iocp_port(void *ptr)
memset(&o1, 0, sizeof(o1));
memset(&o2, 0, sizeof(o2));
count_init();
EVTHREAD_ALLOC_LOCK(o1.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
EVTHREAD_ALLOC_LOCK(o2.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
@ -109,7 +170,7 @@ test_iocp_port(void *ptr)
event_overlapped_init(&o1.eo, dummy_cb);
event_overlapped_init(&o2.eo, dummy_cb);
port = event_iocp_port_launch();
port = event_iocp_port_launch(0);
tt_assert(port);
tt_assert(!event_iocp_activate_overlapped(port, &o1.eo, 10, 100));
@ -124,10 +185,7 @@ test_iocp_port(void *ptr)
tt_assert(!event_iocp_activate_overlapped(port, &o1.eo, 13, 103));
tt_assert(!event_iocp_activate_overlapped(port, &o2.eo, 23, 203));
#ifdef WIN32
/* FIXME Be smarter. */
Sleep(1000);
#endif
tt_int_op(count_wait_for(8, 2000), ==, 0);
tt_want(!event_iocp_shutdown(port, 2000));
@ -145,8 +203,9 @@ test_iocp_port(void *ptr)
tt_want(pair_is_in(&o2, 23, 203));
end:
/* FIXME free the locks. */
;
EVTHREAD_FREE_LOCK(o1.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
EVTHREAD_FREE_LOCK(o2.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
count_free();
}
static struct evbuffer *rbuf = NULL, *wbuf = NULL;
@ -157,6 +216,7 @@ read_complete(struct event_overlapped *eo, uintptr_t key,
{
tt_assert(ok);
evbuffer_commit_read(rbuf, nbytes);
count_incr();
end:
;
}
@ -167,6 +227,7 @@ write_complete(struct event_overlapped *eo, uintptr_t key,
{
tt_assert(ok);
evbuffer_commit_write(wbuf, nbytes);
count_incr();
end:
;
}
@ -177,9 +238,12 @@ test_iocp_evbuffer(void *ptr)
struct event_overlapped rol, wol;
struct basic_test_data *data = ptr;
struct event_iocp_port *port = NULL;
struct evbuffer *buf;
struct evbuffer_chain *chain;
char junk[1024];
int i;
count_init();
event_overlapped_init(&rol, read_complete);
event_overlapped_init(&wol, write_complete);
@ -191,7 +255,7 @@ test_iocp_evbuffer(void *ptr)
evbuffer_enable_locking(rbuf, NULL);
evbuffer_enable_locking(wbuf, NULL);
port = event_iocp_port_launch();
port = event_iocp_port_launch(0);
tt_assert(port);
tt_assert(rbuf);
tt_assert(wbuf);
@ -202,14 +266,18 @@ test_iocp_evbuffer(void *ptr)
for (i=0;i<10;++i)
evbuffer_add(wbuf, junk, sizeof(junk));
buf = evbuffer_new();
tt_assert(buf != NULL);
evbuffer_add(rbuf, junk, sizeof(junk));
tt_assert(!evbuffer_launch_read(rbuf, 2048, &rol));
evbuffer_add_buffer(buf, rbuf);
tt_int_op(evbuffer_get_length(buf), ==, sizeof(junk));
for (chain = buf->first; chain; chain = chain->next)
tt_int_op(chain->flags & EVBUFFER_MEM_PINNED_ANY, ==, 0);
tt_assert(!evbuffer_get_length(rbuf));
tt_assert(!evbuffer_launch_write(wbuf, 512, &wol));
tt_assert(!evbuffer_launch_read(rbuf, 2048, &rol));
#ifdef WIN32
/* FIXME this is stupid. */
Sleep(1000);
#endif
tt_int_op(count_wait_for(2, 2000), ==, 0);
tt_int_op(evbuffer_get_length(rbuf),==,512);
@ -217,8 +285,20 @@ test_iocp_evbuffer(void *ptr)
tt_want(!event_iocp_shutdown(port, 2000));
end:
count_free();
evbuffer_free(rbuf);
evbuffer_free(wbuf);
evbuffer_free(buf);
}
/* Count of times async_readcb has fired; checked by the test body. */
static int got_readcb = 0;

/* Read callback for the async bufferevent test.  Disables further reads
 * so the event loop has nothing pending and dispatch can return. */
static void
async_readcb(struct bufferevent *bev, void *arg)
{
	/* Disabling read should cause the loop to quit */
	bufferevent_disable(bev, EV_READ);
	got_readcb++;
}
static void
@ -229,9 +309,8 @@ test_iocp_bufferevent_async(void *ptr)
struct bufferevent *bea1=NULL, *bea2=NULL;
char buf[128];
size_t n;
struct timeval one_sec = {1,0};
event_base_start_iocp(data->base);
event_base_start_iocp(data->base, 0);
port = event_base_get_iocp(data->base);
tt_assert(port);
@ -242,27 +321,27 @@ test_iocp_bufferevent_async(void *ptr)
tt_assert(bea1);
tt_assert(bea2);
/*FIXME set some callbacks */
bufferevent_setcb(bea2, async_readcb, NULL, NULL, NULL);
bufferevent_enable(bea1, EV_WRITE);
bufferevent_enable(bea2, EV_READ);
bufferevent_write(bea1, "Hello world", strlen("Hello world")+1);
event_base_loopexit(data->base, &one_sec);
event_base_dispatch(data->base);
tt_int_op(got_readcb, ==, 1);
n = bufferevent_read(bea2, buf, sizeof(buf)-1);
buf[n]='\0';
tt_str_op(buf, ==, "Hello world");
tt_want(!event_iocp_shutdown(port, 2000));
end:
/* FIXME: free stuff. */;
bufferevent_free(bea1);
bufferevent_free(bea2);
}
struct testcase_t iocp_testcases[] = {
{ "port", test_iocp_port, TT_FORK|TT_NEED_THREADS, NULL, NULL },
{ "port", test_iocp_port, TT_FORK|TT_NEED_THREADS, &basic_setup, NULL },
{ "evbuffer", test_iocp_evbuffer,
TT_FORK|TT_NEED_SOCKETPAIR|TT_NEED_THREADS,
&basic_setup, NULL },

View File

@ -209,7 +209,7 @@ basic_test_setup(const struct testcase_t *testcase)
exit(1);
}
if (testcase->flags & TT_ENABLE_IOCP_FLAG) {
if (event_base_start_iocp(base)<0) {
if (event_base_start_iocp(base, 0)<0) {
event_base_free(base);
return (void*)TT_SKIP;
}

View File

@ -33,6 +33,9 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef WIN32
#include <unistd.h>
#endif
#ifdef _EVENT_HAVE_PTHREADS
#include <pthread.h>
@ -52,6 +55,7 @@
#include "event2/thread.h"
#include "evthread-internal.h"
#include "event-internal.h"
#include "defer-internal.h"
#include "regress.h"
#include "tinytest_macros.h"
@ -386,6 +390,95 @@ end:
;
}
/* Number of deferred callbacks each loader thread schedules. */
#define CB_COUNT 128
/* Number of threads concurrently loading the deferred-cb queue. */
#define QUEUE_THREAD_COUNT 8

/* Portable millisecond sleep for the test threads. */
#ifdef WIN32
#define SLEEP_MS(ms) Sleep(ms)
#else
#define SLEEP_MS(ms) usleep((ms) * 1000)
#endif

/* Per-thread state: the deferred_cb slots a loader thread schedules and
 * the (shared) queue it schedules them on. */
struct deferred_test_data {
	struct deferred_cb cbs[CB_COUNT];
	struct deferred_cb_queue *queue;
};

/* Wall-clock start/end of the 4-second timer used to detect loop skew. */
static time_t timer_start = 0;
static time_t timer_end = 0;
/* Total deferred callbacks executed (updated from the event-loop thread). */
static unsigned callback_count = 0;
static THREAD_T load_threads[QUEUE_THREAD_COUNT];
static struct deferred_test_data deferred_data[QUEUE_THREAD_COUNT];
/* Deferred-queue callback: simulate a little work with a 1 ms sleep, then
 * bump the global execution counter. */
static void
deferred_callback(struct deferred_cb *cb, void *arg)
{
	SLEEP_MS(1);
	callback_count += 1;
}
/* Loader-thread body: schedule CB_COUNT deferred callbacks on the shared
 * queue, pausing 1 ms between each to keep the queue steadily fed rather
 * than bursting. */
static THREAD_FN
load_deferred_queue(void *arg)
{
	struct deferred_test_data *data = arg;
	size_t i;

	for (i = 0; i < CB_COUNT; ++i) {
		event_deferred_cb_init(&data->cbs[i], deferred_callback, NULL);
		event_deferred_cb_schedule(data->queue, &data->cbs[i]);
		SLEEP_MS(1);
	}

	THREAD_RETURN();
}
/* Timeout callback: record when the 4-second timer actually fired so the
 * test can compare against timer_start. */
static void
timer_callback(evutil_socket_t fd, short what, void *arg)
{
	timer_end = time(NULL);
}
/* Immediate callback (scheduled with a NULL timeout): launch all loader
 * threads from inside the event loop, after dispatch has started. */
static void
start_threads_callback(evutil_socket_t fd, short what, void *arg)
{
	int i;

	for (i = 0; i < QUEUE_THREAD_COUNT; ++i) {
		THREAD_START(load_threads[i], load_deferred_queue,
		    &deferred_data[i]);
	}
}
/* Regression test: heavy deferred-callback load from several threads must
 * not starve the event loop's timers.  A 4-second timer is armed, loader
 * threads flood the deferred-cb queue, and the timer must still fire on
 * schedule. */
static void
thread_deferred_cb_skew(void *arg)
{
	struct basic_test_data *data = arg;
	struct timeval tv_timer = {4, 0};
	struct event event_threads;
	struct deferred_cb_queue *queue;
	int i;

	queue = event_base_get_deferred_cb_queue(data->base);
	tt_assert(queue);

	for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
		deferred_data[i].queue = queue;

	timer_start = time(NULL);
	event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL,
	    &tv_timer);
	/* NULL timeout => fires on the first loop iteration, so the loader
	 * threads start only once dispatch is running. */
	event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback,
	    NULL, NULL);
	event_base_dispatch(data->base);

	TT_BLATHER(("callback count, %u", callback_count));
	/* NOTE(review): strict equality on whole-second timestamps is
	 * timing-sensitive; a loaded machine crossing a second boundary
	 * could make this read 5 -- confirm this is intended. */
	tt_int_op(timer_end - timer_start, ==, 4);

end:
	/* Join the loaders even on failure so the process can exit. */
	for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
		THREAD_JOIN(load_threads[i]);
}
/* Declare a forking, thread-enabled testcase named "name" whose function
 * is thread_<name>. */
#define TEST(name)						\
	{ #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, \
	  &basic_setup, NULL }
@ -398,6 +491,7 @@ struct testcase_t thread_testcases[] = {
&basic_setup, (char*)"forking" },
#endif
TEST(conditions_simple),
TEST(deferred_cb_skew),
END_OF_TESTCASES
};