mirror of
https://github.com/cuberite/libevent.git
synced 2025-09-10 13:04:23 -04:00
Add a LEV_OPT_THREADSAFE option for threadsafe evconnlisteners
This commit is contained in:
parent
5b7a370636
commit
127d4f2195
@ -66,6 +66,9 @@ typedef void (*evconnlistener_errorcb)(struct evconnlistener *, void *);
|
|||||||
/** Flag: Indicates that we should disable the timeout (if any) between when
|
/** Flag: Indicates that we should disable the timeout (if any) between when
|
||||||
* this socket is closed and when we can listen again on the same port. */
|
* this socket is closed and when we can listen again on the same port. */
|
||||||
#define LEV_OPT_REUSEABLE (1u<<3)
|
#define LEV_OPT_REUSEABLE (1u<<3)
|
||||||
|
/** Flag: Indicates that the listener should be locked so it's safe to use
|
||||||
|
* from multiple threadcs at once. */
|
||||||
|
#define LEV_OPT_THREADSAFE (1u<<4)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Allocate a new evconnlistener object to listen for incoming TCP connections
|
Allocate a new evconnlistener object to listen for incoming TCP connections
|
||||||
|
233
listener.c
233
listener.c
@ -51,6 +51,7 @@
|
|||||||
#include "mm-internal.h"
|
#include "mm-internal.h"
|
||||||
#include "util-internal.h"
|
#include "util-internal.h"
|
||||||
#include "log-internal.h"
|
#include "log-internal.h"
|
||||||
|
#include "evthread-internal.h"
|
||||||
#ifdef WIN32
|
#ifdef WIN32
|
||||||
#include "iocp-internal.h"
|
#include "iocp-internal.h"
|
||||||
#include "defer-internal.h"
|
#include "defer-internal.h"
|
||||||
@ -60,16 +61,19 @@ struct evconnlistener_ops {
|
|||||||
int (*enable)(struct evconnlistener *);
|
int (*enable)(struct evconnlistener *);
|
||||||
int (*disable)(struct evconnlistener *);
|
int (*disable)(struct evconnlistener *);
|
||||||
void (*destroy)(struct evconnlistener *);
|
void (*destroy)(struct evconnlistener *);
|
||||||
|
void (*shutdown)(struct evconnlistener *);
|
||||||
evutil_socket_t (*getfd)(struct evconnlistener *);
|
evutil_socket_t (*getfd)(struct evconnlistener *);
|
||||||
struct event_base *(*getbase)(struct evconnlistener *);
|
struct event_base *(*getbase)(struct evconnlistener *);
|
||||||
};
|
};
|
||||||
|
|
||||||
struct evconnlistener {
|
struct evconnlistener {
|
||||||
const struct evconnlistener_ops *ops;
|
const struct evconnlistener_ops *ops;
|
||||||
|
void *lock;
|
||||||
evconnlistener_cb cb;
|
evconnlistener_cb cb;
|
||||||
evconnlistener_errorcb errorcb;
|
evconnlistener_errorcb errorcb;
|
||||||
void *user_data;
|
void *user_data;
|
||||||
unsigned flags;
|
unsigned flags;
|
||||||
|
int refcnt;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct evconnlistener_event {
|
struct evconnlistener_event {
|
||||||
@ -83,12 +87,15 @@ struct evconnlistener_iocp {
|
|||||||
evutil_socket_t fd;
|
evutil_socket_t fd;
|
||||||
struct event_base *event_base;
|
struct event_base *event_base;
|
||||||
struct event_iocp_port *port;
|
struct event_iocp_port *port;
|
||||||
CRITICAL_SECTION lock;
|
short n_accepting;
|
||||||
int n_accepting;
|
short shutting_down;
|
||||||
struct accepting_socket **accepting;
|
struct accepting_socket **accepting;
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define LOCK(listener) EVLOCK_LOCK((listener)->lock, 0)
|
||||||
|
#define UNLOCK(listener) EVLOCK_UNLOCK((listener)->lock, 0)
|
||||||
|
|
||||||
struct evconnlistener *
|
struct evconnlistener *
|
||||||
evconnlistener_new_async(struct event_base *base,
|
evconnlistener_new_async(struct event_base *base,
|
||||||
evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
|
evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
|
||||||
@ -100,10 +107,36 @@ static void event_listener_destroy(struct evconnlistener *);
|
|||||||
static evutil_socket_t event_listener_getfd(struct evconnlistener *);
|
static evutil_socket_t event_listener_getfd(struct evconnlistener *);
|
||||||
static struct event_base *event_listener_getbase(struct evconnlistener *);
|
static struct event_base *event_listener_getbase(struct evconnlistener *);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
static void
|
||||||
|
listener_incref_and_lock(struct evconnlistener *listener)
|
||||||
|
{
|
||||||
|
LOCK(listener);
|
||||||
|
++listener->refcnt;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
static int
|
||||||
|
listener_decref_and_unlock(struct evconnlistener *listener)
|
||||||
|
{
|
||||||
|
int refcnt = --listener->refcnt;
|
||||||
|
if (refcnt == 0) {
|
||||||
|
listener->ops->destroy(listener);
|
||||||
|
UNLOCK(listener);
|
||||||
|
EVTHREAD_FREE_LOCK(listener->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
|
||||||
|
mm_free(listener);
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
UNLOCK(listener);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static const struct evconnlistener_ops evconnlistener_event_ops = {
|
static const struct evconnlistener_ops evconnlistener_event_ops = {
|
||||||
event_listener_enable,
|
event_listener_enable,
|
||||||
event_listener_disable,
|
event_listener_disable,
|
||||||
event_listener_destroy,
|
event_listener_destroy,
|
||||||
|
NULL, /* shutdown */
|
||||||
event_listener_getfd,
|
event_listener_getfd,
|
||||||
event_listener_getbase
|
event_listener_getbase
|
||||||
};
|
};
|
||||||
@ -143,6 +176,11 @@ evconnlistener_new(struct event_base *base,
|
|||||||
lev->base.cb = cb;
|
lev->base.cb = cb;
|
||||||
lev->base.user_data = ptr;
|
lev->base.user_data = ptr;
|
||||||
lev->base.flags = flags;
|
lev->base.flags = flags;
|
||||||
|
lev->base.refcnt = 1;
|
||||||
|
|
||||||
|
if (flags & LEV_OPT_THREADSAFE) {
|
||||||
|
EVTHREAD_ALLOC_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
|
||||||
|
}
|
||||||
|
|
||||||
event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST,
|
event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST,
|
||||||
listener_read_cb, lev);
|
listener_read_cb, lev);
|
||||||
@ -204,8 +242,12 @@ evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb,
|
|||||||
void
|
void
|
||||||
evconnlistener_free(struct evconnlistener *lev)
|
evconnlistener_free(struct evconnlistener *lev)
|
||||||
{
|
{
|
||||||
lev->ops->destroy(lev);
|
LOCK(lev);
|
||||||
mm_free(lev);
|
lev->cb = NULL;
|
||||||
|
lev->errorcb = NULL;
|
||||||
|
if (lev->ops->shutdown)
|
||||||
|
lev->ops->shutdown(lev);
|
||||||
|
listener_decref_and_unlock(lev);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -223,13 +265,21 @@ event_listener_destroy(struct evconnlistener *lev)
|
|||||||
int
|
int
|
||||||
evconnlistener_enable(struct evconnlistener *lev)
|
evconnlistener_enable(struct evconnlistener *lev)
|
||||||
{
|
{
|
||||||
return lev->ops->enable(lev);
|
int r;
|
||||||
|
LOCK(lev);
|
||||||
|
r = lev->ops->enable(lev);
|
||||||
|
UNLOCK(lev);
|
||||||
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
evconnlistener_disable(struct evconnlistener *lev)
|
evconnlistener_disable(struct evconnlistener *lev)
|
||||||
{
|
{
|
||||||
return lev->ops->disable(lev);
|
int r;
|
||||||
|
LOCK(lev);
|
||||||
|
r = lev->ops->disable(lev);
|
||||||
|
UNLOCK(lev);
|
||||||
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
@ -251,7 +301,11 @@ event_listener_disable(struct evconnlistener *lev)
|
|||||||
evutil_socket_t
|
evutil_socket_t
|
||||||
evconnlistener_get_fd(struct evconnlistener *lev)
|
evconnlistener_get_fd(struct evconnlistener *lev)
|
||||||
{
|
{
|
||||||
return lev->ops->getfd(lev);
|
evutil_socket_t fd;
|
||||||
|
LOCK(lev);
|
||||||
|
fd = lev->ops->getfd(lev);
|
||||||
|
UNLOCK(lev);
|
||||||
|
return fd;
|
||||||
}
|
}
|
||||||
|
|
||||||
static evutil_socket_t
|
static evutil_socket_t
|
||||||
@ -265,7 +319,11 @@ event_listener_getfd(struct evconnlistener *lev)
|
|||||||
struct event_base *
|
struct event_base *
|
||||||
evconnlistener_get_base(struct evconnlistener *lev)
|
evconnlistener_get_base(struct evconnlistener *lev)
|
||||||
{
|
{
|
||||||
return lev->ops->getbase(lev);
|
struct event_base *base;
|
||||||
|
LOCK(lev);
|
||||||
|
base = lev->ops->getbase(lev);
|
||||||
|
UNLOCK(lev);
|
||||||
|
return base;
|
||||||
}
|
}
|
||||||
|
|
||||||
static struct event_base *
|
static struct event_base *
|
||||||
@ -279,7 +337,9 @@ event_listener_getbase(struct evconnlistener *lev)
|
|||||||
void evconnlistener_set_error_cb(struct evconnlistener *lev,
|
void evconnlistener_set_error_cb(struct evconnlistener *lev,
|
||||||
evconnlistener_errorcb errorcb)
|
evconnlistener_errorcb errorcb)
|
||||||
{
|
{
|
||||||
|
LOCK(lev);
|
||||||
lev->errorcb = errorcb;
|
lev->errorcb = errorcb;
|
||||||
|
UNLOCK(lev);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -287,6 +347,10 @@ listener_read_cb(evutil_socket_t fd, short what, void *p)
|
|||||||
{
|
{
|
||||||
struct evconnlistener *lev = p;
|
struct evconnlistener *lev = p;
|
||||||
int err;
|
int err;
|
||||||
|
evconnlistener_cb cb;
|
||||||
|
evconnlistener_errorcb errorcb;
|
||||||
|
void *user_data;
|
||||||
|
LOCK(lev);
|
||||||
while (1) {
|
while (1) {
|
||||||
struct sockaddr_storage ss;
|
struct sockaddr_storage ss;
|
||||||
#ifdef WIN32
|
#ifdef WIN32
|
||||||
@ -301,16 +365,40 @@ listener_read_cb(evutil_socket_t fd, short what, void *p)
|
|||||||
if (!(lev->flags & LEV_OPT_LEAVE_SOCKETS_BLOCKING))
|
if (!(lev->flags & LEV_OPT_LEAVE_SOCKETS_BLOCKING))
|
||||||
evutil_make_socket_nonblocking(new_fd);
|
evutil_make_socket_nonblocking(new_fd);
|
||||||
|
|
||||||
lev->cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen,
|
if (lev->cb == NULL) {
|
||||||
lev->user_data);
|
UNLOCK(lev);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
++lev->refcnt;
|
||||||
|
cb = lev->cb;
|
||||||
|
user_data = lev->user_data;
|
||||||
|
UNLOCK(lev);
|
||||||
|
cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen,
|
||||||
|
user_data);
|
||||||
|
LOCK(lev);
|
||||||
|
if (lev->refcnt == 1) {
|
||||||
|
int freed = listener_decref_and_unlock(lev);
|
||||||
|
EVUTIL_ASSERT(freed);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
--lev->refcnt;
|
||||||
}
|
}
|
||||||
err = evutil_socket_geterror(fd);
|
err = evutil_socket_geterror(fd);
|
||||||
if (EVUTIL_ERR_ACCEPT_RETRIABLE(err))
|
if (EVUTIL_ERR_ACCEPT_RETRIABLE(err)) {
|
||||||
|
UNLOCK(lev);
|
||||||
return;
|
return;
|
||||||
if (lev->errorcb != NULL)
|
}
|
||||||
lev->errorcb(lev, lev->user_data);
|
if (lev->errorcb != NULL) {
|
||||||
else
|
++lev->refcnt;
|
||||||
|
errorcb = lev->errorcb;
|
||||||
|
user_data = lev->user_data;
|
||||||
|
UNLOCK(lev);
|
||||||
|
errorcb(lev, user_data);
|
||||||
|
LOCK(lev);
|
||||||
|
listener_decref_and_unlock(lev);
|
||||||
|
} else {
|
||||||
event_sock_warn(fd, "Error from accept() call");
|
event_sock_warn(fd, "Error from accept() call");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef WIN32
|
#ifdef WIN32
|
||||||
@ -318,6 +406,7 @@ struct accepting_socket {
|
|||||||
CRITICAL_SECTION lock;
|
CRITICAL_SECTION lock;
|
||||||
struct event_overlapped overlapped;
|
struct event_overlapped overlapped;
|
||||||
SOCKET s;
|
SOCKET s;
|
||||||
|
int error;
|
||||||
struct deferred_cb deferred;
|
struct deferred_cb deferred;
|
||||||
struct evconnlistener_iocp *lev;
|
struct evconnlistener_iocp *lev;
|
||||||
ev_uint8_t buflen;
|
ev_uint8_t buflen;
|
||||||
@ -382,8 +471,11 @@ start_accepting(struct accepting_socket *as)
|
|||||||
const struct win32_extension_fns *ext = event_get_win32_extension_fns();
|
const struct win32_extension_fns *ext = event_get_win32_extension_fns();
|
||||||
DWORD pending = 0;
|
DWORD pending = 0;
|
||||||
SOCKET s = socket(as->family, SOCK_STREAM, 0);
|
SOCKET s = socket(as->family, SOCK_STREAM, 0);
|
||||||
if (s == INVALID_SOCKET)
|
int error = 0;
|
||||||
return -1;
|
if (s == INVALID_SOCKET) {
|
||||||
|
error = WSAGetLastError();
|
||||||
|
goto report_err;
|
||||||
|
}
|
||||||
|
|
||||||
setsockopt(s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
|
setsockopt(s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
|
||||||
(char *)&as->lev->fd, sizeof(&as->lev->fd));
|
(char *)&as->lev->fd, sizeof(&as->lev->fd));
|
||||||
@ -404,14 +496,20 @@ start_accepting(struct accepting_socket *as)
|
|||||||
/* Immediate success! */
|
/* Immediate success! */
|
||||||
accepted_socket_cb(&as->overlapped, 1, 0, 1);
|
accepted_socket_cb(&as->overlapped, 1, 0, 1);
|
||||||
} else {
|
} else {
|
||||||
int err = WSAGetLastError();
|
error = WSAGetLastError();
|
||||||
if (err != ERROR_IO_PENDING) {
|
if (error != ERROR_IO_PENDING) {
|
||||||
event_warnx("AcceptEx: %s", evutil_socket_error_to_string(err));
|
goto report_err;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
report_err:
|
||||||
|
as->error = error;
|
||||||
|
event_deferred_cb_schedule(
|
||||||
|
event_base_get_deferred_cb_queue(as->lev->event_base),
|
||||||
|
&as->deferred);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -424,32 +522,63 @@ stop_accepting(struct accepting_socket *as)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg)
|
accepted_socket_invoke_user_cb(struct deferred_cb *dcb, void *arg)
|
||||||
{
|
{
|
||||||
struct accepting_socket *as = arg;
|
struct accepting_socket *as = arg;
|
||||||
|
|
||||||
struct sockaddr *sa_local=NULL, *sa_remote=NULL;
|
struct sockaddr *sa_local=NULL, *sa_remote=NULL;
|
||||||
int socklen_local=0, socklen_remote=0;
|
int socklen_local=0, socklen_remote=0;
|
||||||
const struct win32_extension_fns *ext = event_get_win32_extension_fns();
|
const struct win32_extension_fns *ext = event_get_win32_extension_fns();
|
||||||
|
struct evconnlistener *lev = &as->lev->base;
|
||||||
|
evutil_socket_t sock=-1;
|
||||||
|
void *data;
|
||||||
|
evconnlistener_cb cb=NULL;
|
||||||
|
evconnlistener_errorcb errorcb=NULL;
|
||||||
|
int error;
|
||||||
|
|
||||||
EVUTIL_ASSERT(ext->GetAcceptExSockaddrs);
|
EVUTIL_ASSERT(ext->GetAcceptExSockaddrs);
|
||||||
|
|
||||||
|
LOCK(lev);
|
||||||
EnterCriticalSection(&as->lock);
|
EnterCriticalSection(&as->lock);
|
||||||
if (as->free_on_cb) {
|
if (as->free_on_cb) {
|
||||||
free_and_unlock_accepting_socket(as);
|
free_and_unlock_accepting_socket(as);
|
||||||
|
listener_decref_and_unlock(lev);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ext->GetAcceptExSockaddrs(
|
++lev->refcnt;
|
||||||
as->addrbuf, 0, as->buflen/2, as->buflen/2,
|
|
||||||
&sa_local, &socklen_local, &sa_remote, &socklen_remote);
|
|
||||||
|
|
||||||
as->lev->base.cb(&as->lev->base, as->s, sa_remote,
|
error = as->error;
|
||||||
socklen_remote, as->lev->base.user_data);
|
if (error) {
|
||||||
|
as->error = 0;
|
||||||
|
errorcb = lev->errorcb;
|
||||||
|
} else {
|
||||||
|
ext->GetAcceptExSockaddrs(
|
||||||
|
as->addrbuf, 0, as->buflen/2, as->buflen/2,
|
||||||
|
&sa_local, &socklen_local, &sa_remote,
|
||||||
|
&socklen_remote);
|
||||||
|
sock = as->s;
|
||||||
|
cb = lev->cb;
|
||||||
|
as->s = INVALID_SOCKET;
|
||||||
|
}
|
||||||
|
data = lev->user_data;
|
||||||
|
|
||||||
as->s = INVALID_SOCKET;
|
LeaveCriticalSection(&as->lock);
|
||||||
|
UNLOCK(lev);
|
||||||
|
|
||||||
start_accepting(as); /* XXXX handle error */
|
if (errorcb) {
|
||||||
|
WSASetLastError(error);
|
||||||
|
errorcb(lev, data);
|
||||||
|
} else {
|
||||||
|
cb(lev, sock, sa_remote, socklen_remote, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOCK(lev);
|
||||||
|
if (listener_decref_and_unlock(lev))
|
||||||
|
return;
|
||||||
|
|
||||||
|
EnterCriticalSection(&as->lock);
|
||||||
|
start_accepting(as);
|
||||||
LeaveCriticalSection(&as->lock);
|
LeaveCriticalSection(&as->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -459,6 +588,7 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i
|
|||||||
struct accepting_socket *as =
|
struct accepting_socket *as =
|
||||||
EVUTIL_UPCAST(o, struct accepting_socket, overlapped);
|
EVUTIL_UPCAST(o, struct accepting_socket, overlapped);
|
||||||
|
|
||||||
|
LOCK(&as->lev->base);
|
||||||
EnterCriticalSection(&as->lock);
|
EnterCriticalSection(&as->lock);
|
||||||
if (ok) {
|
if (ok) {
|
||||||
/* XXXX Don't do this if some EV_MT flag is set. */
|
/* XXXX Don't do this if some EV_MT flag is set. */
|
||||||
@ -467,16 +597,32 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i
|
|||||||
&as->deferred);
|
&as->deferred);
|
||||||
LeaveCriticalSection(&as->lock);
|
LeaveCriticalSection(&as->lock);
|
||||||
} else if (as->free_on_cb) {
|
} else if (as->free_on_cb) {
|
||||||
|
struct evconnlistener *lev = &as->lev->base;
|
||||||
free_and_unlock_accepting_socket(as);
|
free_and_unlock_accepting_socket(as);
|
||||||
|
listener_decref_and_unlock(lev);
|
||||||
|
return;
|
||||||
} else if (as->s == INVALID_SOCKET) {
|
} else if (as->s == INVALID_SOCKET) {
|
||||||
/* This is okay; we were disabled by iocp_listener_disable. */
|
/* This is okay; we were disabled by iocp_listener_disable. */
|
||||||
LeaveCriticalSection(&as->lock);
|
LeaveCriticalSection(&as->lock);
|
||||||
} else {
|
} else {
|
||||||
/* Some error on accept that we couldn't actually handle. */
|
/* Some error on accept that we couldn't actually handle. */
|
||||||
|
BOOL ok;
|
||||||
|
DWORD transfer = 0, flags=0;
|
||||||
event_sock_warn(as->s, "Unexpected error on AcceptEx");
|
event_sock_warn(as->s, "Unexpected error on AcceptEx");
|
||||||
|
ok = WSAGetOverlappedResult(as->s, &o->overlapped,
|
||||||
|
&transfer, FALSE, &flags);
|
||||||
|
if (ok) {
|
||||||
|
/* well, that was confusing! */
|
||||||
|
as->error = 1;
|
||||||
|
} else {
|
||||||
|
as->error = WSAGetLastError();
|
||||||
|
}
|
||||||
|
event_deferred_cb_schedule(
|
||||||
|
event_base_get_deferred_cb_queue(as->lev->event_base),
|
||||||
|
&as->deferred);
|
||||||
LeaveCriticalSection(&as->lock);
|
LeaveCriticalSection(&as->lock);
|
||||||
/* XXXX send error to user */
|
|
||||||
}
|
}
|
||||||
|
UNLOCK(&as->lev->base);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
@ -486,17 +632,17 @@ iocp_listener_enable(struct evconnlistener *lev)
|
|||||||
struct evconnlistener_iocp *lev_iocp =
|
struct evconnlistener_iocp *lev_iocp =
|
||||||
EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
|
EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
|
||||||
|
|
||||||
EnterCriticalSection(&lev_iocp->lock);
|
LOCK(lev);
|
||||||
for (i = 0; i < lev_iocp->n_accepting; ++i) {
|
for (i = 0; i < lev_iocp->n_accepting; ++i) {
|
||||||
struct accepting_socket *as = lev_iocp->accepting[i];
|
struct accepting_socket *as = lev_iocp->accepting[i];
|
||||||
if (!as)
|
if (!as)
|
||||||
continue;
|
continue;
|
||||||
EnterCriticalSection(&as->lock);
|
EnterCriticalSection(&as->lock);
|
||||||
if (!as->free_on_cb && as->s == INVALID_SOCKET)
|
if (!as->free_on_cb && as->s == INVALID_SOCKET)
|
||||||
start_accepting(as); /* XXXX handle error */
|
start_accepting(as);
|
||||||
LeaveCriticalSection(&as->lock);
|
LeaveCriticalSection(&as->lock);
|
||||||
}
|
}
|
||||||
LeaveCriticalSection(&lev_iocp->lock);
|
UNLOCK(lev);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -507,7 +653,7 @@ iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown)
|
|||||||
struct evconnlistener_iocp *lev_iocp =
|
struct evconnlistener_iocp *lev_iocp =
|
||||||
EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
|
EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
|
||||||
|
|
||||||
EnterCriticalSection(&lev_iocp->lock);
|
LOCK(lev);
|
||||||
for (i = 0; i < lev_iocp->n_accepting; ++i) {
|
for (i = 0; i < lev_iocp->n_accepting; ++i) {
|
||||||
struct accepting_socket *as = lev_iocp->accepting[i];
|
struct accepting_socket *as = lev_iocp->accepting[i];
|
||||||
if (!as)
|
if (!as)
|
||||||
@ -520,7 +666,7 @@ iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown)
|
|||||||
}
|
}
|
||||||
LeaveCriticalSection(&as->lock);
|
LeaveCriticalSection(&as->lock);
|
||||||
}
|
}
|
||||||
LeaveCriticalSection(&lev_iocp->lock);
|
UNLOCK(lev);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -529,10 +675,18 @@ iocp_listener_disable(struct evconnlistener *lev)
|
|||||||
{
|
{
|
||||||
return iocp_listener_disable_impl(lev,0);
|
return iocp_listener_disable_impl(lev,0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
iocp_listener_destroy(struct evconnlistener *lev)
|
iocp_listener_destroy(struct evconnlistener *lev)
|
||||||
{
|
{
|
||||||
iocp_listener_disable_impl(lev,1);
|
struct evconnlistener_iocp *lev_iocp =
|
||||||
|
EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
|
||||||
|
|
||||||
|
if (! lev_iocp->shutting_down) {
|
||||||
|
lev_iocp->shutting_down = 1;
|
||||||
|
iocp_listener_disable_impl(lev,1);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static evutil_socket_t
|
static evutil_socket_t
|
||||||
@ -554,6 +708,7 @@ static const struct evconnlistener_ops evconnlistener_iocp_ops = {
|
|||||||
iocp_listener_enable,
|
iocp_listener_enable,
|
||||||
iocp_listener_disable,
|
iocp_listener_disable,
|
||||||
iocp_listener_destroy,
|
iocp_listener_destroy,
|
||||||
|
iocp_listener_destroy, /* shutdown */
|
||||||
iocp_listener_getfd,
|
iocp_listener_getfd,
|
||||||
iocp_listener_getbase
|
iocp_listener_getbase
|
||||||
};
|
};
|
||||||
@ -571,6 +726,8 @@ evconnlistener_new_async(struct event_base *base,
|
|||||||
struct evconnlistener_iocp *lev;
|
struct evconnlistener_iocp *lev;
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
|
flags |= LEV_OPT_THREADSAFE;
|
||||||
|
|
||||||
if (!base || !event_base_get_iocp(base))
|
if (!base || !event_base_get_iocp(base))
|
||||||
goto err;
|
goto err;
|
||||||
|
|
||||||
@ -595,6 +752,7 @@ evconnlistener_new_async(struct event_base *base,
|
|||||||
lev->base.cb = cb;
|
lev->base.cb = cb;
|
||||||
lev->base.user_data = ptr;
|
lev->base.user_data = ptr;
|
||||||
lev->base.flags = flags;
|
lev->base.flags = flags;
|
||||||
|
lev->base.refcnt = 1;
|
||||||
|
|
||||||
lev->port = event_base_get_iocp(base);
|
lev->port = event_base_get_iocp(base);
|
||||||
lev->fd = fd;
|
lev->fd = fd;
|
||||||
@ -603,7 +761,7 @@ evconnlistener_new_async(struct event_base *base,
|
|||||||
if (event_iocp_port_associate(lev->port, fd, 1) < 0)
|
if (event_iocp_port_associate(lev->port, fd, 1) < 0)
|
||||||
goto err_free_lev;
|
goto err_free_lev;
|
||||||
|
|
||||||
InitializeCriticalSectionAndSpinCount(&lev->lock, 1000);
|
EVTHREAD_ALLOC_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
|
||||||
|
|
||||||
lev->n_accepting = N_SOCKETS_PER_LISTENER;
|
lev->n_accepting = N_SOCKETS_PER_LISTENER;
|
||||||
lev->accepting = mm_calloc(lev->n_accepting,
|
lev->accepting = mm_calloc(lev->n_accepting,
|
||||||
@ -624,6 +782,7 @@ evconnlistener_new_async(struct event_base *base,
|
|||||||
free_and_unlock_accepting_socket(lev->accepting[i]);
|
free_and_unlock_accepting_socket(lev->accepting[i]);
|
||||||
goto err_free_accepting;
|
goto err_free_accepting;
|
||||||
}
|
}
|
||||||
|
++lev->base.refcnt;
|
||||||
}
|
}
|
||||||
|
|
||||||
return &lev->base;
|
return &lev->base;
|
||||||
@ -632,7 +791,7 @@ err_free_accepting:
|
|||||||
mm_free(lev->accepting);
|
mm_free(lev->accepting);
|
||||||
/* XXXX free the other elements. */
|
/* XXXX free the other elements. */
|
||||||
err_delete_lock:
|
err_delete_lock:
|
||||||
DeleteCriticalSection(&lev->lock);
|
EVTHREAD_FREE_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
|
||||||
err_free_lev:
|
err_free_lev:
|
||||||
mm_free(lev);
|
mm_free(lev);
|
||||||
err:
|
err:
|
||||||
|
@ -77,6 +77,10 @@ regress_pick_a_port(void *arg)
|
|||||||
|
|
||||||
evutil_socket_t fd1 = -1, fd2 = -1, fd3 = -1;
|
evutil_socket_t fd1 = -1, fd2 = -1, fd3 = -1;
|
||||||
|
|
||||||
|
if (data->setup_data && strstr((char*)data->setup_data, "ts")) {
|
||||||
|
flags |= LEV_OPT_THREADSAFE;
|
||||||
|
}
|
||||||
|
|
||||||
memset(&sin, 0, sizeof(sin));
|
memset(&sin, 0, sizeof(sin));
|
||||||
sin.sin_family = AF_INET;
|
sin.sin_family = AF_INET;
|
||||||
sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
|
sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
|
||||||
@ -148,13 +152,18 @@ regress_listener_error(void *arg)
|
|||||||
struct event_base *base = data->base;
|
struct event_base *base = data->base;
|
||||||
struct evconnlistener *listener = NULL;
|
struct evconnlistener *listener = NULL;
|
||||||
int count = 1;
|
int count = 1;
|
||||||
|
unsigned int flags = LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE;
|
||||||
|
|
||||||
|
if (data->setup_data && strstr((char*)data->setup_data, "ts")) {
|
||||||
|
flags |= LEV_OPT_THREADSAFE;
|
||||||
|
}
|
||||||
|
|
||||||
/* send, so that pair[0] will look 'readable'*/
|
/* send, so that pair[0] will look 'readable'*/
|
||||||
send(data->pair[1], "hello", 5, 0);
|
send(data->pair[1], "hello", 5, 0);
|
||||||
|
|
||||||
/* Start a listener with a bogus socket. */
|
/* Start a listener with a bogus socket. */
|
||||||
listener = evconnlistener_new(base, acceptcb, &count,
|
listener = evconnlistener_new(base, acceptcb, &count,
|
||||||
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, 0,
|
flags, 0,
|
||||||
data->pair[0]);
|
data->pair[0]);
|
||||||
tt_assert(listener);
|
tt_assert(listener);
|
||||||
|
|
||||||
@ -166,7 +175,8 @@ regress_listener_error(void *arg)
|
|||||||
tt_int_op(count,==,1000); /* set by error cb */
|
tt_int_op(count,==,1000); /* set by error cb */
|
||||||
|
|
||||||
end:
|
end:
|
||||||
evconnlistener_free(listener);
|
if (listener)
|
||||||
|
evconnlistener_free(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct testcase_t listener_testcases[] = {
|
struct testcase_t listener_testcases[] = {
|
||||||
@ -174,10 +184,17 @@ struct testcase_t listener_testcases[] = {
|
|||||||
{ "randport", regress_pick_a_port, TT_FORK|TT_NEED_BASE,
|
{ "randport", regress_pick_a_port, TT_FORK|TT_NEED_BASE,
|
||||||
&basic_setup, NULL},
|
&basic_setup, NULL},
|
||||||
|
|
||||||
|
{ "randport_ts", regress_pick_a_port, TT_FORK|TT_NEED_BASE,
|
||||||
|
&basic_setup, (char*)"ts"},
|
||||||
|
|
||||||
{ "error", regress_listener_error,
|
{ "error", regress_listener_error,
|
||||||
TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR,
|
TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR,
|
||||||
&basic_setup, NULL},
|
&basic_setup, NULL},
|
||||||
|
|
||||||
|
{ "error_ts", regress_listener_error,
|
||||||
|
TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR,
|
||||||
|
&basic_setup, (char*)"ts"},
|
||||||
|
|
||||||
END_OF_TESTCASES,
|
END_OF_TESTCASES,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -187,7 +204,8 @@ struct testcase_t listener_iocp_testcases[] = {
|
|||||||
&basic_setup, NULL},
|
&basic_setup, NULL},
|
||||||
|
|
||||||
{ "error", regress_listener_error,
|
{ "error", regress_listener_error,
|
||||||
TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR|TT_ENABLE_IOCP,
|
TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR|TT_ENABLE_IOCP
|
||||||
|
|TT_SKIP/*Remove once err-handling on IOCP listeners is ok*/,
|
||||||
&basic_setup, NULL},
|
&basic_setup, NULL},
|
||||||
|
|
||||||
END_OF_TESTCASES,
|
END_OF_TESTCASES,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user