Merge branch '21_evport_improved'

This commit is contained in:
Nick Mathewson 2011-05-30 12:10:26 -04:00
commit 5926d601bb
2 changed files with 85 additions and 117 deletions

197
evport.c
View File

@ -73,20 +73,8 @@
#include "evsignal-internal.h" #include "evsignal-internal.h"
#include "evmap-internal.h" #include "evmap-internal.h"
/* #define INITIAL_EVENTS_PER_GETN 8
* Default value for ed_nevents, which is the maximum file descriptor number we #define MAX_EVENTS_PER_GETN 4096
* can handle. If an event comes in for a file descriptor F > nevents, we will
* grow the array of file descriptors, doubling its size.
*/
#define DEFAULT_NFDS 16
/*
* EVENTS_PER_GETN is the maximum number of events to retrieve from port_getn on
* any particular call. You can speed things up by increasing this, but it will
* (obviously) require more memory.
*/
#define EVENTS_PER_GETN 8
/* /*
* Per-file-descriptor information about what events we're subscribed to. These * Per-file-descriptor information about what events we're subscribed to. These
@ -94,7 +82,12 @@
*/ */
struct fd_info { struct fd_info {
short fdi_what; /* combinations of EV_READ and EV_WRITE */ /* combinations of EV_READ and EV_WRITE */
short fdi_what;
/* Index of this fd within ed_pending, plus 1. Zero if this fd is
* not in ed_pending. (The +1 is a hack so that memset(0) will set
* it to a nil index. */
int pending_idx_plus_1;
}; };
#define FDI_HAS_READ(fdi) ((fdi)->fdi_what & EV_READ) #define FDI_HAS_READ(fdi) ((fdi)->fdi_what & EV_READ)
@ -105,10 +98,15 @@ struct fd_info {
struct evport_data { struct evport_data {
int ed_port; /* event port for system events */ int ed_port; /* event port for system events */
int ed_nevents; /* number of allocated fdi's */ /* How many elements of ed_pending should we look at? */
struct fd_info *ed_fds; /* allocated fdi table */ int ed_npending;
/* How many elements are allocated in ed_pending and pevtlist? */
int ed_maxevents;
/* fdi's that we need to reassoc */ /* fdi's that we need to reassoc */
int ed_pending[EVENTS_PER_GETN]; /* fd's with pending events */ int *ed_pending;
/* storage space for incoming events. */
port_event_t *ed_pevtlist;
}; };
static void* evport_init(struct event_base *); static void* evport_init(struct event_base *);
@ -116,6 +114,7 @@ static int evport_add(struct event_base *, int fd, short old, short events, void
static int evport_del(struct event_base *, int fd, short old, short events, void *); static int evport_del(struct event_base *, int fd, short old, short events, void *);
static int evport_dispatch(struct event_base *, struct timeval *); static int evport_dispatch(struct event_base *, struct timeval *);
static void evport_dealloc(struct event_base *); static void evport_dealloc(struct event_base *);
static int grow(struct evport_data *, int min_events);
const struct eventop evportops = { const struct eventop evportops = {
"evport", "evport",
@ -126,7 +125,7 @@ const struct eventop evportops = {
evport_dealloc, evport_dealloc,
1, /* need reinit */ 1, /* need reinit */
0, /* features */ 0, /* features */
0, /* fdinfo length */ sizeof(struct fd_info), /* fdinfo length */
}; };
/* /*
@ -137,7 +136,6 @@ static void*
evport_init(struct event_base *base) evport_init(struct event_base *base)
{ {
struct evport_data *evpd; struct evport_data *evpd;
int i;
if (!(evpd = mm_calloc(1, sizeof(struct evport_data)))) if (!(evpd = mm_calloc(1, sizeof(struct evport_data))))
return (NULL); return (NULL);
@ -147,24 +145,47 @@ evport_init(struct event_base *base)
return (NULL); return (NULL);
} }
/* if (grow(evpd, INITIAL_EVENTS_PER_GETN) < 0) {
* Initialize file descriptor structure
*/
evpd->ed_fds = mm_calloc(DEFAULT_NFDS, sizeof(struct fd_info));
if (evpd->ed_fds == NULL) {
close(evpd->ed_port); close(evpd->ed_port);
mm_free(evpd); mm_free(evpd);
return (NULL); return NULL;
} }
evpd->ed_nevents = DEFAULT_NFDS;
for (i = 0; i < EVENTS_PER_GETN; i++) evpd->ed_npending = 0;
evpd->ed_pending[i] = -1;
evsig_init(base); evsig_init(base);
return (evpd); return (evpd);
} }
static int
grow(struct evport_data *data, int min_events)
{
int newsize;
int *new_pending;
port_event_t *new_pevtlist;
if (data->ed_maxevents) {
newsize = data->ed_maxevents;
do {
newsize *= 2;
} while (newsize < min_events);
} else {
newsize = min_events;
}
new_pending = mm_realloc(data->ed_pending, sizeof(int)*newsize);
if (new_pending == NULL)
return -1;
data->ed_pending = new_pending;
new_pevtlist = mm_realloc(data->ed_pevtlist, sizeof(port_event_t)*newsize);
if (new_pevtlist == NULL)
return -1;
data->ed_pevtlist = new_pevtlist;
data->ed_maxevents = newsize;
return 0;
}
#ifdef CHECK_INVARIANTS #ifdef CHECK_INVARIANTS
/* /*
* Checks some basic properties about the evport_data structure. Because it * Checks some basic properties about the evport_data structure. Because it
@ -176,9 +197,7 @@ static void
check_evportop(struct evport_data *evpd) check_evportop(struct evport_data *evpd)
{ {
EVUTIL_ASSERT(evpd); EVUTIL_ASSERT(evpd);
EVUTIL_ASSERT(evpd->ed_nevents > 0);
EVUTIL_ASSERT(evpd->ed_port > 0); EVUTIL_ASSERT(evpd->ed_port > 0);
EVUTIL_ASSERT(evpd->ed_fds > 0);
} }
/* /*
@ -194,7 +213,6 @@ check_event(port_event_t* pevt)
* PORT_SOURCE_FD. * PORT_SOURCE_FD.
*/ */
EVUTIL_ASSERT(pevt->portev_source == PORT_SOURCE_FD); EVUTIL_ASSERT(pevt->portev_source == PORT_SOURCE_FD);
EVUTIL_ASSERT(pevt->portev_user == NULL);
} }
#else #else
@ -202,33 +220,6 @@ check_event(port_event_t* pevt)
#define check_event(pevt) #define check_event(pevt)
#endif /* CHECK_INVARIANTS */ #endif /* CHECK_INVARIANTS */
/*
* Doubles the size of the allocated file descriptor array.
*/
static int
grow(struct evport_data *epdp, int factor)
{
struct fd_info *tmp;
int oldsize = epdp->ed_nevents;
int newsize = factor * oldsize;
EVUTIL_ASSERT(factor > 1);
check_evportop(epdp);
tmp = mm_realloc(epdp->ed_fds, sizeof(struct fd_info) * newsize);
if (NULL == tmp)
return -1;
epdp->ed_fds = tmp;
memset((char*) (epdp->ed_fds + oldsize), 0,
(newsize - oldsize)*sizeof(struct fd_info));
epdp->ed_nevents = newsize;
check_evportop(epdp);
return 0;
}
/* /*
* (Re)associates the given file descriptor with the event port. The OS events * (Re)associates the given file descriptor with the event port. The OS events
* are specified (implicitly) from the fd_info struct. * are specified (implicitly) from the fd_info struct.
@ -240,7 +231,7 @@ reassociate(struct evport_data *epdp, struct fd_info *fdip, int fd)
if (sysevents != 0) { if (sysevents != 0) {
if (port_associate(epdp->ed_port, PORT_SOURCE_FD, if (port_associate(epdp->ed_port, PORT_SOURCE_FD,
fd, sysevents, NULL) == -1) { fd, sysevents, fdip) == -1) {
event_warn("port_associate"); event_warn("port_associate");
return (-1); return (-1);
} }
@ -261,12 +252,12 @@ evport_dispatch(struct event_base *base, struct timeval *tv)
{ {
int i, res; int i, res;
struct evport_data *epdp = base->evbase; struct evport_data *epdp = base->evbase;
port_event_t pevtlist[EVENTS_PER_GETN]; port_event_t *pevtlist = epdp->ed_pevtlist;
/* /*
* port_getn will block until it has at least nevents events. It will * port_getn will block until it has at least nevents events. It will
* also return how many it's given us (which may be more than we asked * also return how many it's given us (which may be more than we asked
* for, as long as it's less than our maximum (EVENTS_PER_GETN)) in * for, as long as it's less than our maximum (ed_maxevents)) in
* nevents. * nevents.
*/ */
int nevents = 1; int nevents = 1;
@ -289,22 +280,25 @@ evport_dispatch(struct event_base *base, struct timeval *tv)
* last time which need reassociation. See comment at the end of the * last time which need reassociation. See comment at the end of the
* loop below. * loop below.
*/ */
for (i = 0; i < EVENTS_PER_GETN; ++i) { for (i = 0; i < epdp->ed_npending; ++i) {
struct fd_info *fdi = NULL; struct fd_info *fdi = NULL;
if (epdp->ed_pending[i] != -1) { const int fd = epdp->ed_pending[i];
fdi = &(epdp->ed_fds[epdp->ed_pending[i]]); if (fd != -1) {
/* We might have cleared out this event; we need
* to be sure that it's still set. */
fdi = evmap_io_get_fdinfo(&base->io, fd);
} }
if (fdi != NULL && FDI_HAS_EVENTS(fdi)) { if (fdi != NULL && FDI_HAS_EVENTS(fdi)) {
int fd = epdp->ed_pending[i];
reassociate(epdp, fdi, fd); reassociate(epdp, fdi, fd);
epdp->ed_pending[i] = -1; // epdp->ed_pending[i] = -1;
fdi->pending_idx_plus_1 = 0;
} }
} }
EVBASE_RELEASE_LOCK(base, th_base_lock); EVBASE_RELEASE_LOCK(base, th_base_lock);
res = port_getn(epdp->ed_port, pevtlist, EVENTS_PER_GETN, res = port_getn(epdp->ed_port, pevtlist, epdp->ed_maxevents,
(unsigned int *) &nevents, ts_p); (unsigned int *) &nevents, ts_p);
EVBASE_ACQUIRE_LOCK(base, th_base_lock); EVBASE_ACQUIRE_LOCK(base, th_base_lock);
@ -324,13 +318,15 @@ evport_dispatch(struct event_base *base, struct timeval *tv)
event_debug(("%s: port_getn reports %d events", __func__, nevents)); event_debug(("%s: port_getn reports %d events", __func__, nevents));
for (i = 0; i < nevents; ++i) { for (i = 0; i < nevents; ++i) {
struct fd_info *fdi;
port_event_t *pevt = &pevtlist[i]; port_event_t *pevt = &pevtlist[i];
int fd = (int) pevt->portev_object; int fd = (int) pevt->portev_object;
struct fd_info *fdi = pevt->portev_user;
//EVUTIL_ASSERT(evmap_io_get_fdinfo(&base->io, fd) == fdi);
check_evportop(epdp); check_evportop(epdp);
check_event(pevt); check_event(pevt);
epdp->ed_pending[i] = fd; epdp->ed_pending[i] = fd;
fdi->pending_idx_plus_1 = i + 1;
/* /*
* Figure out what kind of event it was * Figure out what kind of event it was
@ -352,11 +348,16 @@ evport_dispatch(struct event_base *base, struct timeval *tv)
if (pevt->portev_events & (POLLERR|POLLHUP|POLLNVAL)) if (pevt->portev_events & (POLLERR|POLLHUP|POLLNVAL))
res |= EV_READ|EV_WRITE; res |= EV_READ|EV_WRITE;
EVUTIL_ASSERT(epdp->ed_nevents > fd);
fdi = &(epdp->ed_fds[fd]);
evmap_io_active(base, fd, res); evmap_io_active(base, fd, res);
} /* end of all events gotten */ } /* end of all events gotten */
epdp->ed_npending = nevents;
if (nevents == epdp->ed_maxevents &&
epdp->ed_maxevents < MAX_EVENTS_PER_GETN) {
/* we used all the space this time. We should be ready
* for more events next time around. */
grow(epdp, epdp->ed_maxevents * 2);
}
check_evportop(epdp); check_evportop(epdp);
@ -373,27 +374,10 @@ static int
evport_add(struct event_base *base, int fd, short old, short events, void *p) evport_add(struct event_base *base, int fd, short old, short events, void *p)
{ {
struct evport_data *evpd = base->evbase; struct evport_data *evpd = base->evbase;
struct fd_info *fdi; struct fd_info *fdi = p;
int factor;
(void)p;
check_evportop(evpd); check_evportop(evpd);
/*
* If necessary, grow the file descriptor info table
*/
factor = 1;
while (fd >= factor * evpd->ed_nevents)
factor *= 2;
if (factor > 1) {
if (-1 == grow(evpd, factor)) {
return (-1);
}
}
fdi = &evpd->ed_fds[fd];
fdi->fdi_what |= events; fdi->fdi_what |= events;
return reassociate(evpd, fdi, fd); return reassociate(evpd, fdi, fd);
@ -407,29 +391,12 @@ static int
evport_del(struct event_base *base, int fd, short old, short events, void *p) evport_del(struct event_base *base, int fd, short old, short events, void *p)
{ {
struct evport_data *evpd = base->evbase; struct evport_data *evpd = base->evbase;
struct fd_info *fdi; struct fd_info *fdi = p;
int i; int associated = ! fdi->pending_idx_plus_1;
int associated = 1;
(void)p;
check_evportop(evpd); check_evportop(evpd);
if (evpd->ed_nevents < fd) { fdi->fdi_what &= ~(events &(EV_READ|EV_WRITE));
return (-1);
}
for (i = 0; i < EVENTS_PER_GETN; ++i) {
if (evpd->ed_pending[i] == fd) {
associated = 0;
break;
}
}
fdi = &evpd->ed_fds[fd];
if (events & EV_READ)
fdi->fdi_what &= ~EV_READ;
if (events & EV_WRITE)
fdi->fdi_what &= ~EV_WRITE;
if (associated) { if (associated) {
if (!FDI_HAS_EVENTS(fdi) && if (!FDI_HAS_EVENTS(fdi) &&
@ -449,7 +416,10 @@ evport_del(struct event_base *base, int fd, short old, short events, void *p)
} }
} else { } else {
if ((fdi->fdi_what & (EV_READ|EV_WRITE)) == 0) { if ((fdi->fdi_what & (EV_READ|EV_WRITE)) == 0) {
const int i = fdi->pending_idx_plus_1 - 1;
EVUTIL_ASSERT(evpd->ed_pending[i] == fd);
evpd->ed_pending[i] = -1; evpd->ed_pending[i] = -1;
fdi->pending_idx_plus_1 = 0;
} }
} }
return 0; return 0;
@ -465,7 +435,10 @@ evport_dealloc(struct event_base *base)
close(evpd->ed_port); close(evpd->ed_port);
if (evpd->ed_fds) if (evpd->ed_pending)
mm_free(evpd->ed_fds); mm_free(evpd->ed_pending);
if (evpd->ed_pevtlist)
mm_free(evpd->ed_pevtlist);
mm_free(evpd); mm_free(evpd);
} }

View File

@ -2218,7 +2218,6 @@ test_many_events(void *arg)
int called[MANY]; int called[MANY];
int i; int i;
int loopflags = EVLOOP_NONBLOCK, evflags=0; int loopflags = EVLOOP_NONBLOCK, evflags=0;
const int is_evport = !strcmp(event_base_get_method(base),"evport");
if (one_at_a_time) { if (one_at_a_time) {
loopflags |= EVLOOP_ONCE; loopflags |= EVLOOP_ONCE;
evflags = EV_PERSIST; evflags = EV_PERSIST;
@ -2227,10 +2226,6 @@ test_many_events(void *arg)
memset(sock, 0xff, sizeof(sock)); memset(sock, 0xff, sizeof(sock));
memset(ev, 0, sizeof(ev)); memset(ev, 0, sizeof(ev));
memset(called, 0, sizeof(called)); memset(called, 0, sizeof(called));
if (is_evport && one_at_a_time) {
TT_DECLARE("NOTE", ("evport can't pass this in 2.0; skipping\n"));
tt_skip();
}
for (i = 0; i < MANY; ++i) { for (i = 0; i < MANY; ++i) {
/* We need an event that will hit the backend, and that will /* We need an event that will hit the backend, and that will