Fix remaining AcceptEx issues.

svn:r1492
This commit is contained in:
Nick Mathewson 2009-11-02 20:59:13 +00:00
parent e794d716a3
commit 0aa6f5136a
2 changed files with 62 additions and 48 deletions

View File

@ -301,6 +301,7 @@ struct accepting_socket {
static void accepted_socket_cb(struct event_overlapped *o, uintptr_t key, static void accepted_socket_cb(struct event_overlapped *o, uintptr_t key,
ev_ssize_t n, int ok); ev_ssize_t n, int ok);
static void accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg);
static struct accepting_socket * static struct accepting_socket *
new_accepting_socket(struct evconnlistener_iocp *lev, int family) new_accepting_socket(struct evconnlistener_iocp *lev, int family)
@ -327,6 +328,9 @@ new_accepting_socket(struct evconnlistener_iocp *lev, int family)
res->buflen = buflen; res->buflen = buflen;
res->family = family; res->family = family;
event_deferred_cb_init(&res->deferred,
accepted_socket_invoke_user_cb, res);
InitializeCriticalSection(&res->lock); InitializeCriticalSection(&res->lock);
return res; return res;
@ -386,7 +390,6 @@ start_accepting(struct accepting_socket *as)
} }
done: done:
LeaveCriticalSection(&as->lock);
return result; return result;
} }
@ -402,39 +405,40 @@ stop_accepting(struct accepting_socket *as)
static void static void
accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg) accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg)
{ {
struct *as = arg; struct accepting_socket *as = arg;
evconnlistener_cb cb;
struct sockaddr *sa_local=NULL, *sa_remote=NULL;
int socklen_local=0, socklen_remote=0;
const struct win32_extension_fns *ext =
event_get_win32_extension_fns();
EVUTIL_ASSERT(ext->GetAcceptExSockaddrs);
EnterCriticalSection(&as->lock); EnterCriticalSection(&as->lock);
if (as->free_on_cb) {
free_and_unlock_accepting_socket(as);
return;
}
ext->GetAcceptExSockaddrs(as->addrbuf, 0, ext->GetAcceptExSockaddrs(as->addrbuf, 0,
as->buflen/2, as->buflen/2, as->buflen/2, as->buflen/2,
&sa_local, &socklen_local, &sa_local, &socklen_local,
&sa_remote, &socklen_remote); &sa_remote, &socklen_remote);
/* XXXX should we/can we release the lock here? */
as->lev->base.cb(&as->lev->base, as->s, sa_remote, as->lev->base.cb(&as->lev->base, as->s, sa_remote,
socklen_remote, as->lev->base.user_data); socklen_remote, as->lev->base.user_data);
as->s = INVALID_SOCKET; as->s = INVALID_SOCKET;
if (as->free_on_cb) { start_accepting(as);/*XXX handle error */
free_and_unlock_accepting_socket(as); LeaveCriticalSection(&as->lock);
} else {
start_accepting(as);/*XXX handle error */
LeaveCriticalSection(&as->lock);
}
} }
static void static void
accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int ok) accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int ok)
{ {
struct sockaddr *sa_local=NULL, *sa_remote=NULL;
int socklen_local=0, socklen_remote=0;
struct accepting_socket *as = struct accepting_socket *as =
EVUTIL_UPCAST(o, struct accepting_socket, overlapped); EVUTIL_UPCAST(o, struct accepting_socket, overlapped);
const struct win32_extension_fns *ext =
event_get_win32_extension_fns();
EVUTIL_ASSERT(ext->GetAcceptExSockaddrs);
EnterCriticalSection(&as->lock); EnterCriticalSection(&as->lock);
if (ok) { if (ok) {
@ -443,14 +447,14 @@ accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int
event_base_get_deferred_cb_queue(as->lev->event_base), event_base_get_deferred_cb_queue(as->lev->event_base),
&as->deferred); &as->deferred);
LeaveCriticalSection(&as->lock); LeaveCriticalSection(&as->lock);
} else if (free_on_cb) { } else if (as->free_on_cb) {
free_and_unlock_accepting_socket(as); free_and_unlock_accepting_socket(as);
} else if (as->s == INVALID_SOCKET) { } else if (as->s == INVALID_SOCKET) {
/* This is okay; we were disabled by iocp_listener_disable. */ /* This is okay; we were disabled by iocp_listener_disable. */
LeaveCriticalSection(&as->lock); LeaveCriticalSection(&as->lock);
} else { } else {
/* Some error on accept that we couldn't actually handle. */ /* Some error on accept that we couldn't actually handle. */
event_sock_warn(as->fd, "Unexpected error on AcceptEx"); event_sock_warn(as->s, "Unexpected error on AcceptEx");
LeaveCriticalSection(&as->lock); LeaveCriticalSection(&as->lock);
/* XXXX recover better. */ /* XXXX recover better. */
} }
@ -463,8 +467,8 @@ iocp_listener_enable(struct evconnlistener *lev)
struct evconnlistener_iocp *lev_iocp = struct evconnlistener_iocp *lev_iocp =
EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base); EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
EnterCriticalSection(&lev->lock); EnterCriticalSection(&lev_iocp->lock);
for (i = 0; i < n_accepting; ++i) { for (i = 0; i < lev_iocp->n_accepting; ++i) {
struct accepting_socket *as = lev_iocp->accepting[i]; struct accepting_socket *as = lev_iocp->accepting[i];
if (!as) if (!as)
continue; continue;
@ -473,7 +477,7 @@ iocp_listener_enable(struct evconnlistener *lev)
start_accepting(as); /* detect failure. */ start_accepting(as); /* detect failure. */
LeaveCriticalSection(&as->lock); LeaveCriticalSection(&as->lock);
} }
LeaveCriticalSection(&lev->lock); LeaveCriticalSection(&lev_iocp->lock);
return 0; return 0;
} }
@ -484,8 +488,8 @@ iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown)
struct evconnlistener_iocp *lev_iocp = struct evconnlistener_iocp *lev_iocp =
EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base); EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
EnterCriticalSection(&lev->lock); EnterCriticalSection(&lev_iocp->lock);
for (i = 0; i < n_accepting; ++i) { for (i = 0; i < lev_iocp->n_accepting; ++i) {
struct accepting_socket *as = lev_iocp->accepting[i]; struct accepting_socket *as = lev_iocp->accepting[i];
if (!as) if (!as)
continue; continue;
@ -497,12 +501,12 @@ iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown)
} }
LeaveCriticalSection(&as->lock); LeaveCriticalSection(&as->lock);
} }
LeaveCriticalSection(&lev->lock); LeaveCriticalSection(&lev_iocp->lock);
return 0; return 0;
} }
static int static int
iocp_listener_disable_impl(struct evconnlistener *lev) iocp_listener_disable(struct evconnlistener *lev)
{ {
return iocp_listener_disable_impl(lev,0); return iocp_listener_disable_impl(lev,0);
} }
@ -535,6 +539,9 @@ static const struct evconnlistener_ops evconnlistener_iocp_ops = {
iocp_listener_getbase iocp_listener_getbase
}; };
/* XXX define some way to override this. */
#define N_SOCKETS_PER_LISTENER 4
struct evconnlistener * struct evconnlistener *
evconnlistener_new_async(struct event_base *base, evconnlistener_new_async(struct event_base *base,
evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
@ -543,6 +550,8 @@ evconnlistener_new_async(struct event_base *base,
struct sockaddr_storage ss; struct sockaddr_storage ss;
int socklen = sizeof(ss); int socklen = sizeof(ss);
struct evconnlistener_iocp *lev; struct evconnlistener_iocp *lev;
int i;
if (!event_base_get_iocp(base)) if (!event_base_get_iocp(base))
return NULL; return NULL;
@ -575,34 +584,39 @@ evconnlistener_new_async(struct event_base *base,
if (event_iocp_port_associate(lev->port, fd, 1) < 0) if (event_iocp_port_associate(lev->port, fd, 1) < 0)
return NULL; return NULL;
lev->n_accepting = 1; InitializeCriticalSection(&lev->lock);
lev->accepting = mm_calloc(1, sizeof(struct accepting_socket *));
lev->n_accepting = N_SOCKETS_PER_LISTENER;
lev->accepting = mm_calloc(lev->n_accepting,
sizeof(struct accepting_socket *));
if (!lev->accepting) { if (!lev->accepting) {
event_warn("calloc"); event_warn("calloc");
mm_free(lev); mm_free(lev);
closesocket(fd); closesocket(fd);
return NULL; return NULL;
} }
lev->accepting[0] = new_accepting_socket(lev, ss.ss_family); for (i = 0; i < lev->n_accepting; ++i) {
if (!lev->accepting[0]) { lev->accepting[i] = new_accepting_socket(lev, ss.ss_family);
event_warnx("Couldn't create accepting socket"); if (!lev->accepting[i]) {
mm_free(lev->accepting); event_warnx("Couldn't create accepting socket");
mm_free(lev); mm_free(lev->accepting);
closesocket(fd); /* DeleteCriticalSection on lev->lock XXXX */
return NULL; /* XXXX free the other elements. */
} mm_free(lev);
closesocket(fd);
return NULL;
}
InitializeCriticalSection(&lev->lock); if (start_accepting(lev->accepting[i]) < 0) {
event_warnx("Couldn't start accepting on socket");
if (start_accepting(lev->accepting[0]) < 0) { EnterCriticalSection(&lev->accepting[i]->lock);
event_warnx("Couldn't start accepting on socket"); free_and_unlock_accepting_socket(lev->accepting[i]);
EnterCriticalSection(&lev->accepting[0]->lock); mm_free(lev->accepting);
free_and_unlock_accepting_socket(lev->accepting[0]); mm_free(lev);
mm_free(lev->accepting); closesocket(fd);
mm_free(lev); DeleteCriticalSection(&lev->lock);
closesocket(fd); return NULL;
DeleteCriticalSection(&lev->lock); }
return NULL;
} }
return &lev->base; return &lev->base;

View File

@ -111,10 +111,10 @@ regress_pick_a_port(void *arg)
evutil_socket_connect(&fd2, (struct sockaddr*)&ss1, slen1); evutil_socket_connect(&fd2, (struct sockaddr*)&ss1, slen1);
evutil_socket_connect(&fd3, (struct sockaddr*)&ss2, slen2); evutil_socket_connect(&fd3, (struct sockaddr*)&ss2, slen2);
event_base_dispatch(base);
#ifdef WIN32 #ifdef WIN32
Sleep(1000); /* XXXX this is a stupid stopgap. */ Sleep(100); /* XXXX this is a stupid stopgap. */
#endif #endif
event_base_dispatch(base);
tt_int_op(count1, ==, 0); tt_int_op(count1, ==, 0);
tt_int_op(count2, ==, 0); tt_int_op(count2, ==, 0);
@ -138,7 +138,7 @@ struct testcase_t listener_testcases[] = {
}; };
struct testcase_t listener_iocp_testcases[] = { struct testcase_t listener_iocp_testcases[] = {
{ "iocp/randport", regress_pick_a_port, { "randport", regress_pick_a_port,
TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP,
&basic_setup, NULL}, &basic_setup, NULL},