diff --git a/buffer_iocp.c b/buffer_iocp.c index b52f84d6..cab782a3 100644 --- a/buffer_iocp.c +++ b/buffer_iocp.c @@ -316,3 +316,14 @@ _evbuffer_overlapped_get_fd(struct evbuffer *buf) struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf); return buf_o ? buf_o->fd : -1; } + +void +_evbuffer_overlapped_set_fd(struct evbuffer *buf, evutil_socket_t fd) +{ + struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf); + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); + /* XXX is this right?, should it cancel current I/O operations? */ + if (buf_o) + buf_o->fd = fd; + EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); +} diff --git a/bufferevent-internal.h b/bufferevent-internal.h index eec60e1a..4ee6d908 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -141,6 +141,17 @@ extern const struct bufferevent_ops bufferevent_ops_socket; extern const struct bufferevent_ops bufferevent_ops_filter; extern const struct bufferevent_ops bufferevent_ops_pair; +#define BEV_IS_SOCKET(bevp) ((bevp)->be_ops == &bufferevent_ops_socket) +#define BEV_IS_FILTER(bevp) ((bevp)->be_ops == &bufferevent_ops_filter) +#define BEV_IS_PAIR(bevp) ((bevp)->be_ops == &bufferevent_ops_pair) + +#ifdef WIN32 +extern const struct bufferevent_ops bufferevent_ops_async; +#define BEV_IS_ASYNC(bevp) ((bevp)->be_ops == &bufferevent_ops_async) +#else +#define BEV_IS_ASYNC(bevp) 0 +#endif + /** Initialize the shared parts of a bufferevent. */ int bufferevent_init_common(struct bufferevent_private *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options); diff --git a/bufferevent_async.c b/bufferevent_async.c index 89c674a6..d34051ce 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -45,6 +45,7 @@ #ifdef WIN32 #include +#include #endif #include "event2/util.h" @@ -78,6 +79,7 @@ const struct bufferevent_ops bufferevent_ops_async = { struct bufferevent_async { struct bufferevent_private bev; + struct event_overlapped connect_overlapped; unsigned read_in_progress : 1; unsigned write_in_progress : 1; }; @@ -93,6 +95,15 @@ upcast(struct bufferevent *bev) return bev_a; } +static inline struct bufferevent_async * +upcast_overlapped(struct event_overlapped *eo) +{ + struct bufferevent_async *bev_a; + bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped); + EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async); + return bev_a; +} + static void bev_async_consider_writing(struct bufferevent_async *b) { @@ -244,6 +255,24 @@ be_async_flush(struct bufferevent *bev, short what, return 0; } +static void +connect_complete(struct event_overlapped *eo, uintptr_t key, + ev_ssize_t nbytes, int ok) +{ + struct bufferevent_async *bev_a = upcast_overlapped(eo); + struct bufferevent *bev = &bev_a->bev.bev; /* XXX locking issue ? */ + + _bufferevent_incref_and_lock(bev); + + EVUTIL_ASSERT(bev_a->bev.connecting); + bev_a->bev.connecting = 0; + + _bufferevent_run_eventcb(bev, + ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR); + + _bufferevent_decref_and_unlock(bev); +} + struct bufferevent * bufferevent_async_new(struct event_base *base, evutil_socket_t fd, int options) @@ -257,8 +286,14 @@ bufferevent_async_new(struct event_base *base, if (!(iocp = event_base_get_iocp(base))) return NULL; - if (event_iocp_port_associate(iocp, fd, 1)<0) - return NULL; + if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1)<0) { + int err = GetLastError(); + /* We may have alrady associated this fd with a port. + * Let's hope it's this port, and that the error code + * for doing this neer changes. */ + if (err != ERROR_INVALID_PARAMETER) + return NULL; + } if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async)))) return NULL; @@ -283,14 +318,72 @@ bufferevent_async_new(struct event_base *base, evbuffer_defer_callbacks(bev->input, base); evbuffer_defer_callbacks(bev->output, base); + evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); _bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev); + event_overlapped_init(&bev_a->connect_overlapped, connect_complete); + return bev; err: bufferevent_free(&bev_a->bev.bev); return NULL; } +int +bufferevent_async_can_connect(struct bufferevent *bev) +{ + const struct win32_extension_fns *ext = + event_get_win32_extension_fns(); + + if (BEV_IS_ASYNC(bev) && + event_base_get_iocp(bev->ev_base) && + ext && ext->ConnectEx) + return 1; + + return 0; +} + +int +bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd, + const struct sockaddr *sa, int socklen) +{ + BOOL rc; + struct bufferevent_async *bev_async = upcast(bev); + struct sockaddr_storage ss; + const struct win32_extension_fns *ext = + event_get_win32_extension_fns(); + + EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL); + + /* ConnectEx() requires that the socket be bound to an address + * with bind() before using, otherwise it will fail. We attempt + * to issue a bind() here, taking into account that the error + * code is set to WSAEINVAL when the socket is already bound. */ + memset(&ss, 0, sizeof(ss)); + if (sa->sa_family == AF_INET) { + struct sockaddr_in *sin = (struct sockaddr_in *)&ss; + sin->sin_family = AF_INET; + sin->sin_addr.s_addr = INADDR_ANY; + } else if (sa->sa_family == AF_INET6) { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss; + sin6->sin6_family = AF_INET6; + sin6->sin6_addr = in6addr_any; + } else { + /* XXX: what to do? */ + return -1; + } + if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 && + WSAGetLastError() != WSAEINVAL) + return -1; + + rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, + &bev_async->connect_overlapped.overlapped); + if (rc || WSAGetLastError() == ERROR_IO_PENDING) + return 0; + + return -1; +} + static int be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, union bufferevent_ctrl_data *data) @@ -299,7 +392,19 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, case BEV_CTRL_GET_FD: data->fd = _evbuffer_overlapped_get_fd(bev->input); return 0; - case BEV_CTRL_SET_FD: + case BEV_CTRL_SET_FD: { + struct event_iocp_port *iocp; + + if (data->fd == _evbuffer_overlapped_get_fd(bev->input)) + return 0; + if (!(iocp = event_base_get_iocp(bev->ev_base))) + return -1; + if (event_iocp_port_associate(iocp, data->fd, 1) < 0) + return -1; + _evbuffer_overlapped_set_fd(bev->input, data->fd); + _evbuffer_overlapped_set_fd(bev->output, data->fd); + return 0; + } case BEV_CTRL_GET_UNDERLYING: default: return -1; diff --git a/bufferevent_sock.c b/bufferevent_sock.c index c8fd1c7b..0b15b6b1 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -215,7 +215,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) } else { connected = 1; _bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED); - if (!(bufev->enabled & EV_WRITE)) { + if (!(bufev->enabled & EV_WRITE) || BEV_IS_ASYNC(bufev)) { event_del(&bufev->ev_write); goto done; } @@ -332,18 +332,33 @@ bufferevent_socket_connect(struct bufferevent *bev, ownfd = 1; } if (sa) { - r = evutil_socket_connect(&fd, sa, socklen); - if (r < 0) { - _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); - if (ownfd) - EVUTIL_CLOSESOCKET(fd); - /* do something about the error? */ +#ifdef WIN32 + if (bufferevent_async_can_connect(bev)) { + bufferevent_setfd(bev, fd); + r = bufferevent_async_connect(bev, fd, sa, socklen); + if (r < 0) + goto freesock; + bufev_p->connecting = 1; + result = 0; goto done; - } + } else +#endif + r = evutil_socket_connect(&fd, sa, socklen); + if (r < 0) + goto freesock; } +#ifdef WIN32 + /* ConnectEx() isn't always around, even when IOCP is enabled. + * Here, we borrow the socket object's write handler to fall back + * on a non-blocking connect() when ConnectEx() is unavailable. */ + if (BEV_IS_ASYNC(bev)) { + event_assign(&bev->ev_write, bev->ev_base, fd, + EV_WRITE|EV_PERSIST, bufferevent_writecb, bev); + } +#endif bufferevent_setfd(bev, fd); if (r == 0) { - if (! bufferevent_enable(bev, EV_WRITE)) { + if (! be_socket_enable(bev, EV_WRITE)) { bufev_p->connecting = 1; result = 0; goto done; @@ -354,6 +369,14 @@ bufferevent_socket_connect(struct bufferevent *bev, _bufferevent_run_eventcb(bev, BEV_EVENT_CONNECTED); } + goto done; + +freesock: + _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); + if (ownfd) + EVUTIL_CLOSESOCKET(fd); + /* do something about the error? */ + done: _bufferevent_decref_and_unlock(bev); return result; diff --git a/iocp-internal.h b/iocp-internal.h index 18b89d25..c0500ace 100644 --- a/iocp-internal.h +++ b/iocp-internal.h @@ -117,6 +117,8 @@ struct evbuffer *evbuffer_overlapped_new(evutil_socket_t fd); /** XXXX Document (nickm) */ evutil_socket_t _evbuffer_overlapped_get_fd(struct evbuffer *buf); +void _evbuffer_overlapped_set_fd(struct evbuffer *buf, evutil_socket_t fd); + /** Start reading data onto the end of an overlapped evbuffer. An evbuffer can only have one read pending at a time. While the read @@ -176,6 +178,10 @@ int event_base_start_iocp(struct event_base *base); struct bufferevent *bufferevent_async_new(struct event_base *base, evutil_socket_t fd, int options); +/* FIXME document. */ +int bufferevent_async_can_connect(struct bufferevent *bev); +int bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd, + const struct sockaddr *sa, int socklen); #ifdef __cplusplus } diff --git a/test/regress.h b/test/regress.h index a586c4f7..0593a69b 100644 --- a/test/regress.h +++ b/test/regress.h @@ -84,7 +84,8 @@ void run_legacy_test_fn(void *ptr); #define TT_LEGACY (TT_FIRST_USER_FLAG<<3) #define TT_NEED_THREADS (TT_FIRST_USER_FLAG<<4) #define TT_NO_LOGS (TT_FIRST_USER_FLAG<<5) -#define TT_ENABLE_IOCP (TT_FIRST_USER_FLAG<<6) +#define TT_ENABLE_IOCP_FLAG (TT_FIRST_USER_FLAG<<6) +#define TT_ENABLE_IOCP (TT_ENABLE_IOCP_FLAG|TT_NEED_THREADS) /* All the flags that a legacy test needs. */ #define TT_ISOLATED TT_FORK|TT_NEED_SOCKETPAIR|TT_NEED_BASE diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c index 7658741c..257abe56 100644 --- a/test/regress_bufferevent.c +++ b/test/regress_bufferevent.c @@ -73,6 +73,9 @@ #include "event2/util.h" #include "bufferevent-internal.h" +#ifdef WIN32 +#include "iocp-internal.h" +#endif #include "regress.h" @@ -412,10 +415,14 @@ listen_cb(struct evconnlistener *listener, evutil_socket_t fd, struct event_base *base = arg; struct bufferevent *bev; const char s[] = TEST_STR; + TT_BLATHER(("Got a request on socket %d", (int)fd )); bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); + tt_assert(bev); bufferevent_write(bev, s, sizeof(s)); bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL); bufferevent_enable(bev, EV_WRITE); +end: + ; } static void @@ -459,6 +466,14 @@ test_bufferevent_connect(void *arg) if (strstr((char*)data->setup_data, "lock")) { be_flags |= BEV_OPT_THREADSAFE; } +#ifdef WIN32 + if (!strcmp((char*)data->setup_data, "unset_connectex")) { + struct win32_extension_fns *ext = + (struct win32_extension_fns *) + event_get_win32_extension_fns(); + ext->ConnectEx = NULL; + } +#endif memset(&localhost, 0, sizeof(localhost)); @@ -616,17 +631,23 @@ struct testcase_t bufferevent_iocp_testcases[] = { LEGACY(bufferevent_watermarks, TT_ISOLATED|TT_ENABLE_IOCP), LEGACY(bufferevent_filters, TT_ISOLATED|TT_ENABLE_IOCP), #if 0 - { "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE, - &basic_setup, (void*)"" }, + { "bufferevent_connect", test_bufferevent_connect, + TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"" }, { "bufferevent_connect_defer", test_bufferevent_connect, - TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" }, + TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"defer" }, { "bufferevent_connect_lock", test_bufferevent_connect, - TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, (void*)"lock" }, + TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup, + (void*)"lock" }, { "bufferevent_connect_lock_defer", test_bufferevent_connect, - TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, + TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup, (void*)"defer lock" }, +#endif { "bufferevent_connect_fail", test_bufferevent_connect_fail, - TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, + TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL }, +#if 0 + { "bufferevent_connect_nonblocking", test_bufferevent_connect, + TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, + (void*)"unset_connectex" }, #endif END_OF_TESTCASES, diff --git a/test/regress_iocp.c b/test/regress_iocp.c index 2841f4a3..f1e9af6e 100644 --- a/test/regress_iocp.c +++ b/test/regress_iocp.c @@ -38,6 +38,11 @@ #include "tinytest.h" #include "tinytest_macros.h" +#define WIN32_LEAN_AND_MEAN +#include +#include +#undef WIN32_LEAN_AND_MEAN + #include "iocp-internal.h" #include "evthread-internal.h" diff --git a/test/regress_main.c b/test/regress_main.c index 2e383971..f2528025 100644 --- a/test/regress_main.c +++ b/test/regress_main.c @@ -135,7 +135,7 @@ basic_test_setup(const struct testcase_t *testcase) struct basic_test_data *data = NULL; #ifndef WIN32 - if (testcase->flags & TT_ENABLE_IOCP) + if (testcase->flags & TT_ENABLE_IOCP_FLAG) return (void*)TT_SKIP; #endif @@ -177,7 +177,7 @@ basic_test_setup(const struct testcase_t *testcase) if (!base) exit(1); } - if (testcase->flags & TT_ENABLE_IOCP) { + if (testcase->flags & TT_ENABLE_IOCP_FLAG) { if (event_base_start_iocp(base)<0) { event_base_free(base); return (void*)TT_SKIP;