From 0aa6f5136a10d4d6426ab4abb10ed4cbbde208bd Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 2 Nov 2009 20:59:13 +0000 Subject: [PATCH] Fix remaining AcceptEx issues. svn:r1492 --- listener.c | 104 +++++++++++++++++++++++----------------- test/regress_listener.c | 6 +-- 2 files changed, 62 insertions(+), 48 deletions(-) diff --git a/listener.c b/listener.c index 553eb4f4..7e49ed0d 100644 --- a/listener.c +++ b/listener.c @@ -301,6 +301,7 @@ struct accepting_socket { static void accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int ok); +static void accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg); static struct accepting_socket * new_accepting_socket(struct evconnlistener_iocp *lev, int family) @@ -327,6 +328,9 @@ new_accepting_socket(struct evconnlistener_iocp *lev, int family) res->buflen = buflen; res->family = family; + event_deferred_cb_init(&res->deferred, + accepted_socket_invoke_user_cb, res); + InitializeCriticalSection(&res->lock); return res; @@ -386,7 +390,6 @@ start_accepting(struct accepting_socket *as) } done: - LeaveCriticalSection(&as->lock); return result; } @@ -402,39 +405,40 @@ stop_accepting(struct accepting_socket *as) static void accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg) { - struct *as = arg; - evconnlistener_cb cb; + struct accepting_socket *as = arg; + + struct sockaddr *sa_local=NULL, *sa_remote=NULL; + int socklen_local=0, socklen_remote=0; + const struct win32_extension_fns *ext = + event_get_win32_extension_fns(); + + EVUTIL_ASSERT(ext->GetAcceptExSockaddrs); EnterCriticalSection(&as->lock); + if (as->free_on_cb) { + free_and_unlock_accepting_socket(as); + return; + } + ext->GetAcceptExSockaddrs(as->addrbuf, 0, as->buflen/2, as->buflen/2, &sa_local, &socklen_local, &sa_remote, &socklen_remote); - /* XXXX should we/can we release the lock here? */ as->lev->base.cb(&as->lev->base, as->s, sa_remote, socklen_remote, as->lev->base.user_data); as->s = INVALID_SOCKET; - if (as->free_on_cb) { - free_and_unlock_accepting_socket(as); - } else { - start_accepting(as);/*XXX handle error */ - LeaveCriticalSection(&as->lock); - } + start_accepting(as);/*XXX handle error */ + LeaveCriticalSection(&as->lock); } static void accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int ok) { - struct sockaddr *sa_local=NULL, *sa_remote=NULL; - int socklen_local=0, socklen_remote=0; struct accepting_socket *as = EVUTIL_UPCAST(o, struct accepting_socket, overlapped); - const struct win32_extension_fns *ext = - event_get_win32_extension_fns(); - EVUTIL_ASSERT(ext->GetAcceptExSockaddrs); EnterCriticalSection(&as->lock); if (ok) { @@ -443,14 +447,14 @@ accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int event_base_get_deferred_cb_queue(as->lev->event_base), &as->deferred); LeaveCriticalSection(&as->lock); - } else if (free_on_cb) { + } else if (as->free_on_cb) { free_and_unlock_accepting_socket(as); } else if (as->s == INVALID_SOCKET) { /* This is okay; we were disabled by iocp_listener_disable. */ LeaveCriticalSection(&as->lock); } else { /* Some error on accept that we couldn't actually handle. */ - event_sock_warn(as->fd, "Unexpected error on AcceptEx"); + event_sock_warn(as->s, "Unexpected error on AcceptEx"); LeaveCriticalSection(&as->lock); /* XXXX recover better. */ } @@ -463,8 +467,8 @@ iocp_listener_enable(struct evconnlistener *lev) struct evconnlistener_iocp *lev_iocp = EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base); - EnterCriticalSection(&lev->lock); - for (i = 0; i < n_accepting; ++i) { + EnterCriticalSection(&lev_iocp->lock); + for (i = 0; i < lev_iocp->n_accepting; ++i) { struct accepting_socket *as = lev_iocp->accepting[i]; if (!as) continue; @@ -473,7 +477,7 @@ iocp_listener_enable(struct evconnlistener *lev) start_accepting(as); /* detect failure. */ LeaveCriticalSection(&as->lock); } - LeaveCriticalSection(&lev->lock); + LeaveCriticalSection(&lev_iocp->lock); return 0; } @@ -484,8 +488,8 @@ iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown) struct evconnlistener_iocp *lev_iocp = EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base); - EnterCriticalSection(&lev->lock); - for (i = 0; i < n_accepting; ++i) { + EnterCriticalSection(&lev_iocp->lock); + for (i = 0; i < lev_iocp->n_accepting; ++i) { struct accepting_socket *as = lev_iocp->accepting[i]; if (!as) continue; @@ -497,12 +501,12 @@ iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown) } LeaveCriticalSection(&as->lock); } - LeaveCriticalSection(&lev->lock); + LeaveCriticalSection(&lev_iocp->lock); return 0; } static int -iocp_listener_disable_impl(struct evconnlistener *lev) +iocp_listener_disable(struct evconnlistener *lev) { return iocp_listener_disable_impl(lev,0); } @@ -535,6 +539,9 @@ static const struct evconnlistener_ops evconnlistener_iocp_ops = { iocp_listener_getbase }; +/* XXX define some way to override this. */ +#define N_SOCKETS_PER_LISTENER 4 + struct evconnlistener * evconnlistener_new_async(struct event_base *base, evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, @@ -543,6 +550,8 @@ evconnlistener_new_async(struct event_base *base, struct sockaddr_storage ss; int socklen = sizeof(ss); struct evconnlistener_iocp *lev; + int i; + if (!event_base_get_iocp(base)) return NULL; @@ -575,34 +584,39 @@ evconnlistener_new_async(struct event_base *base, if (event_iocp_port_associate(lev->port, fd, 1) < 0) return NULL; - lev->n_accepting = 1; - lev->accepting = mm_calloc(1, sizeof(struct accepting_socket *)); + InitializeCriticalSection(&lev->lock); + + lev->n_accepting = N_SOCKETS_PER_LISTENER; + lev->accepting = mm_calloc(lev->n_accepting, + sizeof(struct accepting_socket *)); if (!lev->accepting) { event_warn("calloc"); mm_free(lev); closesocket(fd); return NULL; } - lev->accepting[0] = new_accepting_socket(lev, ss.ss_family); - if (!lev->accepting[0]) { - event_warnx("Couldn't create accepting socket"); - mm_free(lev->accepting); - mm_free(lev); - closesocket(fd); - return NULL; - } + for (i = 0; i < lev->n_accepting; ++i) { + lev->accepting[i] = new_accepting_socket(lev, ss.ss_family); + if (!lev->accepting[i]) { + event_warnx("Couldn't create accepting socket"); + mm_free(lev->accepting); + /* DeleteCriticalSection on lev->lock XXXX */ + /* XXXX free the other elements. */ + mm_free(lev); + closesocket(fd); + return NULL; + } - InitializeCriticalSection(&lev->lock); - - if (start_accepting(lev->accepting[0]) < 0) { - event_warnx("Couldn't start accepting on socket"); - EnterCriticalSection(&lev->accepting[0]->lock); - free_and_unlock_accepting_socket(lev->accepting[0]); - mm_free(lev->accepting); - mm_free(lev); - closesocket(fd); - DeleteCriticalSection(&lev->lock); - return NULL; + if (start_accepting(lev->accepting[i]) < 0) { + event_warnx("Couldn't start accepting on socket"); + EnterCriticalSection(&lev->accepting[i]->lock); + free_and_unlock_accepting_socket(lev->accepting[i]); + mm_free(lev->accepting); + mm_free(lev); + closesocket(fd); + DeleteCriticalSection(&lev->lock); + return NULL; + } } return &lev->base; diff --git a/test/regress_listener.c b/test/regress_listener.c index 050a91a5..14e7a2b5 100644 --- a/test/regress_listener.c +++ b/test/regress_listener.c @@ -111,10 +111,10 @@ regress_pick_a_port(void *arg) evutil_socket_connect(&fd2, (struct sockaddr*)&ss1, slen1); evutil_socket_connect(&fd3, (struct sockaddr*)&ss2, slen2); - event_base_dispatch(base); #ifdef WIN32 - Sleep(1000); /* XXXX this is a stupid stopgap. */ + Sleep(100); /* XXXX this is a stupid stopgap. */ #endif + event_base_dispatch(base); tt_int_op(count1, ==, 0); tt_int_op(count2, ==, 0); @@ -138,7 +138,7 @@ struct testcase_t listener_testcases[] = { }; struct testcase_t listener_iocp_testcases[] = { - { "iocp/randport", regress_pick_a_port, + { "randport", regress_pick_a_port, TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL},