Clean up acceptex code some more: add locking, single-threading, enable/disable.

svn:r1491
This commit is contained in:
Nick Mathewson 2009-11-02 20:20:40 +00:00
parent a84c87d76a
commit e794d716a3
2 changed files with 101 additions and 25 deletions

View File

@ -55,6 +55,7 @@
#include "log-internal.h" #include "log-internal.h"
#ifdef WIN32 #ifdef WIN32
#include "iocp-internal.h" #include "iocp-internal.h"
#include "defer-internal.h"
#endif #endif
struct evconnlistener_ops { struct evconnlistener_ops {
@ -83,6 +84,7 @@ struct evconnlistener_iocp {
evutil_socket_t fd; evutil_socket_t fd;
struct event_base *event_base; struct event_base *event_base;
struct event_iocp_port *port; struct event_iocp_port *port;
CRITICAL_SECTION lock;
int n_accepting; int n_accepting;
struct accepting_socket **accepting; struct accepting_socket **accepting;
}; };
@ -289,6 +291,7 @@ struct accepting_socket {
CRITICAL_SECTION lock; CRITICAL_SECTION lock;
struct event_overlapped overlapped; struct event_overlapped overlapped;
SOCKET s; SOCKET s;
struct deferred_cb deferred;
struct evconnlistener_iocp *lev; struct evconnlistener_iocp *lev;
ev_uint8_t buflen; ev_uint8_t buflen;
ev_uint8_t family; ev_uint8_t family;
@ -344,6 +347,7 @@ free_and_unlock_accepting_socket(struct accepting_socket *as)
static int static int
start_accepting(struct accepting_socket *as) start_accepting(struct accepting_socket *as)
{ {
/* requires lock */
int result = -1; int result = -1;
const struct win32_extension_fns *ext = const struct win32_extension_fns *ext =
event_get_win32_extension_fns(); event_get_win32_extension_fns();
@ -386,21 +390,44 @@ done:
return result; return result;
} }
#if 0
static void static void
stop_accepting(struct accepting_socket *as) stop_accepting(struct accepting_socket *as)
{ {
/* XXX */ /* requires lock. */
SOCKET s = as->s;
as->s = INVALID_SOCKET;
closesocket(s);
}
static void
accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg)
{
struct *as = arg;
evconnlistener_cb cb;
EnterCriticalSection(&as->lock);
ext->GetAcceptExSockaddrs(as->addrbuf, 0,
as->buflen/2, as->buflen/2,
&sa_local, &socklen_local,
&sa_remote, &socklen_remote);
/* XXXX should we/can we release the lock here? */
as->lev->base.cb(&as->lev->base, as->s, sa_remote,
socklen_remote, as->lev->base.user_data);
as->s = INVALID_SOCKET;
if (as->free_on_cb) {
free_and_unlock_accepting_socket(as);
} else {
start_accepting(as);/*XXX handle error */
LeaveCriticalSection(&as->lock);
}
} }
#endif
static void static void
accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int ok) accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int ok)
{ {
/* Run this whole thing deferred unless some MT flag is set */
/* XXX needs locking. */
/* XXX use ok */
struct sockaddr *sa_local=NULL, *sa_remote=NULL; struct sockaddr *sa_local=NULL, *sa_remote=NULL;
int socklen_local=0, socklen_remote=0; int socklen_local=0, socklen_remote=0;
struct accepting_socket *as = struct accepting_socket *as =
@ -409,38 +436,82 @@ accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int
event_get_win32_extension_fns(); event_get_win32_extension_fns();
EVUTIL_ASSERT(ext->GetAcceptExSockaddrs); EVUTIL_ASSERT(ext->GetAcceptExSockaddrs);
ext->GetAcceptExSockaddrs(as->addrbuf, 0, EnterCriticalSection(&as->lock);
as->buflen/2, as->buflen/2, if (ok) {
&sa_local, &socklen_local, /* XXXX Don't do this if some EV_MT flag is set. */
&sa_remote, &socklen_remote); event_deferred_cb_schedule(
event_base_get_deferred_cb_queue(as->lev->event_base),
as->lev->base.cb(&as->lev->base, as->s, sa_remote, socklen_remote, &as->deferred);
as->lev->base.user_data); LeaveCriticalSection(&as->lock);
} else if (free_on_cb) {
as->s = INVALID_SOCKET; free_and_unlock_accepting_socket(as);
} else if (as->s == INVALID_SOCKET) {
/* Avoid stack overflow XXXX */ /* This is okay; we were disabled by iocp_listener_disable. */
start_accepting(as); LeaveCriticalSection(&as->lock);
} else {
/* Some error on accept that we couldn't actually handle. */
event_sock_warn(as->fd, "Unexpected error on AcceptEx");
LeaveCriticalSection(&as->lock);
/* XXXX recover better. */
}
} }
static int static int
iocp_listener_enable(struct evconnlistener *lev) iocp_listener_enable(struct evconnlistener *lev)
{ {
/* XXXX */ int i;
struct evconnlistener_iocp *lev_iocp =
EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
EnterCriticalSection(&lev->lock);
for (i = 0; i < n_accepting; ++i) {
struct accepting_socket *as = lev_iocp->accepting[i];
if (!as)
continue;
EnterCriticalSection(&as->lock);
if (!as->free_on_cb && as->s == INVALID_SOCKET)
start_accepting(as); /* detect failure. */
LeaveCriticalSection(&as->lock);
}
LeaveCriticalSection(&lev->lock);
return 0; return 0;
} }
static int static int
iocp_listener_disable(struct evconnlistener *lev) iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown)
{ {
/* XXXX */ int i;
struct evconnlistener_iocp *lev_iocp =
EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
EnterCriticalSection(&lev->lock);
for (i = 0; i < n_accepting; ++i) {
struct accepting_socket *as = lev_iocp->accepting[i];
if (!as)
continue;
EnterCriticalSection(&as->lock);
if (!as->free_on_cb && as->s != INVALID_SOCKET) {
if (shutdown)
as->free_on_cb = 1;
stop_accepting(as);
}
LeaveCriticalSection(&as->lock);
}
LeaveCriticalSection(&lev->lock);
return 0; return 0;
} }
static int
iocp_listener_disable_impl(struct evconnlistener *lev)
{
return iocp_listener_disable_impl(lev,0);
}
static void static void
iocp_listener_destroy(struct evconnlistener *lev) iocp_listener_destroy(struct evconnlistener *lev)
{ {
/* XXXX */ iocp_listener_disable_impl(lev,1);
} }
static evutil_socket_t static evutil_socket_t
iocp_listener_getfd(struct evconnlistener *lev) iocp_listener_getfd(struct evconnlistener *lev)
{ {
@ -521,6 +592,8 @@ evconnlistener_new_async(struct event_base *base,
return NULL; return NULL;
} }
InitializeCriticalSection(&lev->lock);
if (start_accepting(lev->accepting[0]) < 0) { if (start_accepting(lev->accepting[0]) < 0) {
event_warnx("Couldn't start accepting on socket"); event_warnx("Couldn't start accepting on socket");
EnterCriticalSection(&lev->accepting[0]->lock); EnterCriticalSection(&lev->accepting[0]->lock);
@ -528,6 +601,7 @@ evconnlistener_new_async(struct event_base *base,
mm_free(lev->accepting); mm_free(lev->accepting);
mm_free(lev); mm_free(lev);
closesocket(fd); closesocket(fd);
DeleteCriticalSection(&lev->lock);
return NULL; return NULL;
} }

View File

@ -112,7 +112,9 @@ regress_pick_a_port(void *arg)
evutil_socket_connect(&fd3, (struct sockaddr*)&ss2, slen2); evutil_socket_connect(&fd3, (struct sockaddr*)&ss2, slen2);
event_base_dispatch(base); event_base_dispatch(base);
// Sleep(2000); #ifdef WIN32
Sleep(1000); /* XXXX this is a stupid stopgap. */
#endif
tt_int_op(count1, ==, 0); tt_int_op(count1, ==, 0);
tt_int_op(count2, ==, 0); tt_int_op(count2, ==, 0);