Merge branch '21_event_callback_v3'

This commit is contained in:
Nick Mathewson 2012-06-28 11:36:52 -04:00
commit 8a739b3492
19 changed files with 671 additions and 336 deletions

View File

@ -142,7 +142,7 @@
static void evbuffer_chain_align(struct evbuffer_chain *chain); static void evbuffer_chain_align(struct evbuffer_chain *chain);
static int evbuffer_chain_should_realign(struct evbuffer_chain *chain, static int evbuffer_chain_should_realign(struct evbuffer_chain *chain,
size_t datalen); size_t datalen);
static void evbuffer_deferred_callback(struct deferred_cb *cb, void *arg); static void evbuffer_deferred_callback(struct event_callback *cb, void *arg);
static int evbuffer_ptr_memcmp(const struct evbuffer *buf, static int evbuffer_ptr_memcmp(const struct evbuffer *buf,
const struct evbuffer_ptr *pos, const char *mem, size_t len); const struct evbuffer_ptr *pos, const char *mem, size_t len);
static struct evbuffer_chain *evbuffer_expand_singlechain(struct evbuffer *buf, static struct evbuffer_chain *evbuffer_expand_singlechain(struct evbuffer *buf,
@ -402,9 +402,10 @@ int
evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base) evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base)
{ {
EVBUFFER_LOCK(buffer); EVBUFFER_LOCK(buffer);
buffer->cb_queue = event_base_get_deferred_cb_queue_(base); buffer->cb_queue = base;
buffer->deferred_cbs = 1; buffer->deferred_cbs = 1;
event_deferred_cb_init_(&buffer->deferred, event_deferred_cb_init_(&buffer->deferred,
event_base_get_npriorities(base) / 2,
evbuffer_deferred_callback, buffer); evbuffer_deferred_callback, buffer);
EVBUFFER_UNLOCK(buffer); EVBUFFER_UNLOCK(buffer);
return 0; return 0;
@ -509,20 +510,19 @@ evbuffer_invoke_callbacks_(struct evbuffer *buffer)
} }
if (buffer->deferred_cbs) { if (buffer->deferred_cbs) {
if (buffer->deferred.queued) if (event_deferred_cb_schedule_(buffer->cb_queue, &buffer->deferred)) {
return; evbuffer_incref_and_lock_(buffer);
evbuffer_incref_and_lock_(buffer); if (buffer->parent)
if (buffer->parent) bufferevent_incref_(buffer->parent);
bufferevent_incref_(buffer->parent); }
EVBUFFER_UNLOCK(buffer); EVBUFFER_UNLOCK(buffer);
event_deferred_cb_schedule_(buffer->cb_queue, &buffer->deferred);
} }
evbuffer_run_callbacks(buffer, 0); evbuffer_run_callbacks(buffer, 0);
} }
static void static void
evbuffer_deferred_callback(struct deferred_cb *cb, void *arg) evbuffer_deferred_callback(struct event_callback *cb, void *arg)
{ {
struct bufferevent *parent = NULL; struct bufferevent *parent = NULL;
struct evbuffer *buffer = arg; struct evbuffer *buffer = arg;

View File

@ -31,6 +31,7 @@ extern "C" {
#endif #endif
#include "event2/event-config.h" #include "event2/event-config.h"
#include "event2/event_struct.h"
#include "evconfig-private.h" #include "evconfig-private.h"
#include "event2/util.h" #include "event2/util.h"
#include "defer-internal.h" #include "defer-internal.h"
@ -182,7 +183,7 @@ struct bufferevent_private {
int dns_error; int dns_error;
/** Used to implement deferred callbacks */ /** Used to implement deferred callbacks */
struct deferred_cb deferred; struct event_callback deferred;
/** The options this bufferevent was constructed with */ /** The options this bufferevent was constructed with */
enum bufferevent_options options; enum bufferevent_options options;

View File

@ -131,7 +131,7 @@ bufferevent_inbuf_wm_cb(struct evbuffer *buf,
} }
static void static void
bufferevent_run_deferred_callbacks_locked(struct deferred_cb *cb, void *arg) bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg)
{ {
struct bufferevent_private *bufev_private = arg; struct bufferevent_private *bufev_private = arg;
struct bufferevent *bufev = &bufev_private->bev; struct bufferevent *bufev = &bufev_private->bev;
@ -164,7 +164,7 @@ bufferevent_run_deferred_callbacks_locked(struct deferred_cb *cb, void *arg)
} }
static void static void
bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *cb, void *arg) bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg)
{ {
struct bufferevent_private *bufev_private = arg; struct bufferevent_private *bufev_private = arg;
struct bufferevent *bufev = &bufev_private->bev; struct bufferevent *bufev = &bufev_private->bev;
@ -210,10 +210,10 @@ bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *cb, void *arg)
#define SCHEDULE_DEFERRED(bevp) \ #define SCHEDULE_DEFERRED(bevp) \
do { \ do { \
bufferevent_incref_(&(bevp)->bev); \ if (event_deferred_cb_schedule_( \
event_deferred_cb_schedule_( \ (bevp)->bev.ev_base, \
event_base_get_deferred_cb_queue_((bevp)->bev.ev_base), \ &(bevp)->deferred)) \
&(bevp)->deferred); \ bufferevent_incref_(&(bevp)->bev); \
} while (0) } while (0)
@ -227,8 +227,7 @@ bufferevent_run_readcb_(struct bufferevent *bufev)
return; return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) { if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->readcb_pending = 1; p->readcb_pending = 1;
if (!p->deferred.queued) SCHEDULE_DEFERRED(p);
SCHEDULE_DEFERRED(p);
} else { } else {
bufev->readcb(bufev, bufev->cbarg); bufev->readcb(bufev, bufev->cbarg);
} }
@ -244,8 +243,7 @@ bufferevent_run_writecb_(struct bufferevent *bufev)
return; return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) { if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->writecb_pending = 1; p->writecb_pending = 1;
if (!p->deferred.queued) SCHEDULE_DEFERRED(p);
SCHEDULE_DEFERRED(p);
} else { } else {
bufev->writecb(bufev, bufev->cbarg); bufev->writecb(bufev, bufev->cbarg);
} }
@ -262,8 +260,7 @@ bufferevent_run_eventcb_(struct bufferevent *bufev, short what)
if (p->options & BEV_OPT_DEFER_CALLBACKS) { if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->eventcb_pending |= what; p->eventcb_pending |= what;
p->errno_pending = EVUTIL_SOCKET_ERROR(); p->errno_pending = EVUTIL_SOCKET_ERROR();
if (!p->deferred.queued) SCHEDULE_DEFERRED(p);
SCHEDULE_DEFERRED(p);
} else { } else {
bufev->errorcb(bufev, what, bufev->cbarg); bufev->errorcb(bufev, what, bufev->cbarg);
} }
@ -326,11 +323,15 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private,
} }
if (options & BEV_OPT_DEFER_CALLBACKS) { if (options & BEV_OPT_DEFER_CALLBACKS) {
if (options & BEV_OPT_UNLOCK_CALLBACKS) if (options & BEV_OPT_UNLOCK_CALLBACKS)
event_deferred_cb_init_(&bufev_private->deferred, event_deferred_cb_init_(
&bufev_private->deferred,
event_base_get_npriorities(base) / 2,
bufferevent_run_deferred_callbacks_unlocked, bufferevent_run_deferred_callbacks_unlocked,
bufev_private); bufev_private);
else else
event_deferred_cb_init_(&bufev_private->deferred, event_deferred_cb_init_(
&bufev_private->deferred,
event_base_get_npriorities(base) / 2,
bufferevent_run_deferred_callbacks_locked, bufferevent_run_deferred_callbacks_locked,
bufev_private); bufev_private);
} }
@ -396,6 +397,16 @@ bufferevent_get_base(struct bufferevent *bufev)
return bufev->ev_base; return bufev->ev_base;
} }
int
bufferevent_get_priority(struct bufferevent *bufev)
{
if (event_initialized(&bufev->ev_read)) {
return event_get_priority(&bufev->ev_read);
} else {
return event_base_get_npriorities(bufev->ev_base) / 2;
}
}
int int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{ {

View File

@ -260,8 +260,9 @@ be_pair_disable(struct bufferevent *bev, short events)
if (events & EV_READ) { if (events & EV_READ) {
BEV_DEL_GENERIC_READ_TIMEOUT(bev); BEV_DEL_GENERIC_READ_TIMEOUT(bev);
} }
if (events & EV_WRITE) if (events & EV_WRITE) {
BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
}
return 0; return 0;
} }

View File

@ -636,6 +636,8 @@ int
bufferevent_priority_set(struct bufferevent *bufev, int priority) bufferevent_priority_set(struct bufferevent *bufev, int priority)
{ {
int r = -1; int r = -1;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev); BEV_LOCK(bufev);
if (bufev->be_ops != &bufferevent_ops_socket) if (bufev->be_ops != &bufferevent_ops_socket)
@ -646,6 +648,8 @@ bufferevent_priority_set(struct bufferevent *bufev, int priority)
if (event_priority_set(&bufev->ev_write, priority) == -1) if (event_priority_set(&bufev->ev_write, priority) == -1)
goto done; goto done;
event_deferred_cb_set_priority_(&bufev_p->deferred, priority);
r = 0; r = 0;
done: done:
BEV_UNLOCK(bufev); BEV_UNLOCK(bufev);

View File

@ -35,72 +35,36 @@ extern "C" {
#include <sys/queue.h> #include <sys/queue.h>
struct deferred_cb; struct event_callback;
typedef void (*deferred_cb_fn)(struct event_callback *, void *);
typedef void (*deferred_cb_fn)(struct deferred_cb *, void *);
/** A deferred_cb is a callback that can be scheduled to run as part of
* an event_base's event_loop, rather than running immediately. */
struct deferred_cb {
/** Links to the adjacent active (pending) deferred_cb objects. */
TAILQ_ENTRY (deferred_cb) cb_next;
/** True iff this deferred_cb is pending in an event_base. */
unsigned queued : 1;
/** The function to execute when the callback runs. */
deferred_cb_fn cb;
/** The function's second argument. */
void *arg;
};
/** A deferred_cb_queue is a list of deferred_cb that we can add to and run. */
struct deferred_cb_queue {
/** Lock used to protect the queue. */
void *lock;
/** Which event_base does this queue associate itself with?
* (Used for timing) */
struct event_base *base;
/** How many entries are in the queue? */
int active_count;
/** Function called when adding to the queue from another thread. */
void (*notify_fn)(struct deferred_cb_queue *, void *);
void *notify_arg;
/** Deferred callback management: a list of deferred callbacks to
* run active the active events. */
TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list;
};
/** /**
Initialize an empty, non-pending deferred_cb. Initialize an empty, non-pending event_callback.
@param deferred The deferred_cb structure to initialize. @param deferred The struct event_callback structure to initialize.
@param cb The function to run when the deferred_cb executes. @param priority The priority that the callback should run at.
@param cb The function to run when the struct event_callback executes.
@param arg The function's second argument. @param arg The function's second argument.
*/ */
void event_deferred_cb_init_(struct deferred_cb *, deferred_cb_fn, void *); void event_deferred_cb_init_(struct event_callback *, ev_uint8_t, deferred_cb_fn, void *);
/** /**
Cancel a deferred_cb if it is currently scheduled in an event_base. Change the priority of a non-pending event_callback.
*/ */
void event_deferred_cb_cancel_(struct deferred_cb_queue *, struct deferred_cb *); void event_deferred_cb_set_priority_(struct event_callback *, ev_uint8_t);
/** /**
Activate a deferred_cb if it is not currently scheduled in an event_base. Cancel a struct event_callback if it is currently scheduled in an event_base.
*/ */
void event_deferred_cb_schedule_(struct deferred_cb_queue *, struct deferred_cb *); void event_deferred_cb_cancel_(struct event_base *, struct event_callback *);
/**
Activate a struct event_callback if it is not currently scheduled in an event_base.
#define LOCK_DEFERRED_QUEUE(q) \ Return true iff it was not previously scheduled.
EVLOCK_LOCK((q)->lock, 0) */
#define UNLOCK_DEFERRED_QUEUE(q) \ int event_deferred_cb_schedule_(struct event_base *, struct event_callback *);
EVLOCK_UNLOCK((q)->lock, 0)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
void event_deferred_cb_queue_init_(struct deferred_cb_queue *);
struct deferred_cb_queue *event_base_get_deferred_cb_queue_(struct event_base *);
#endif /* EVENT_INTERNAL_H_INCLUDED_ */ #endif /* EVENT_INTERNAL_H_INCLUDED_ */

View File

@ -34,6 +34,7 @@ extern "C" {
#include "event2/event-config.h" #include "event2/event-config.h"
#include "evconfig-private.h" #include "evconfig-private.h"
#include "event2/util.h" #include "event2/util.h"
#include "event2/event_struct.h"
#include "util-internal.h" #include "util-internal.h"
#include "defer-internal.h" #include "defer-internal.h"
@ -134,7 +135,7 @@ struct evbuffer {
ev_uint32_t flags; ev_uint32_t flags;
/** Used to implement deferred callbacks. */ /** Used to implement deferred callbacks. */
struct deferred_cb_queue *cb_queue; struct event_base *cb_queue;
/** A reference count on this evbuffer. When the reference count /** A reference count on this evbuffer. When the reference count
* reaches 0, the buffer is destroyed. Manipulated with * reaches 0, the buffer is destroyed. Manipulated with
@ -142,9 +143,9 @@ struct evbuffer {
* evbuffer_free. */ * evbuffer_free. */
int refcnt; int refcnt;
/** A deferred_cb handle to make all of this buffer's callbacks /** A struct event_callback handle to make all of this buffer's callbacks
* invoked from the event loop. */ * invoked from the event loop. */
struct deferred_cb deferred; struct event_callback deferred;
/** A doubly-linked-list of callback functions */ /** A doubly-linked-list of callback functions */
LIST_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks; LIST_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks;

11
evdns.c
View File

@ -754,7 +754,7 @@ evdns_requests_pump_waiting_queue(struct evdns_base *base) {
/* TODO(nickm) document */ /* TODO(nickm) document */
struct deferred_reply_callback { struct deferred_reply_callback {
struct deferred_cb deferred; struct event_callback deferred;
struct evdns_request *handle; struct evdns_request *handle;
u8 request_type; u8 request_type;
u8 have_reply; u8 have_reply;
@ -765,7 +765,7 @@ struct deferred_reply_callback {
}; };
static void static void
reply_run_callback(struct deferred_cb *d, void *user_pointer) reply_run_callback(struct event_callback *d, void *user_pointer)
{ {
struct deferred_reply_callback *cb = struct deferred_reply_callback *cb =
EVUTIL_UPCAST(d, struct deferred_reply_callback, deferred); EVUTIL_UPCAST(d, struct deferred_reply_callback, deferred);
@ -836,10 +836,13 @@ reply_schedule_callback(struct request *const req, u32 ttl, u32 err, struct repl
d->handle = req->handle; d->handle = req->handle;
} }
event_deferred_cb_init_(&d->deferred, reply_run_callback, event_deferred_cb_init_(
&d->deferred,
event_get_priority(&req->timeout_event),
reply_run_callback,
req->user_pointer); req->user_pointer);
event_deferred_cb_schedule_( event_deferred_cb_schedule_(
event_base_get_deferred_cb_queue_(req->base->event_base), req->base->event_base,
&d->deferred); &d->deferred);
} }

View File

@ -53,10 +53,17 @@ extern "C" {
#define ev_ncalls ev_.ev_signal.ev_ncalls #define ev_ncalls ev_.ev_signal.ev_ncalls
#define ev_pncalls ev_.ev_signal.ev_pncalls #define ev_pncalls ev_.ev_signal.ev_pncalls
/* Possible values for ev_closure in struct event. */ #define ev_pri ev_evcallback.evcb_pri
#define EV_CLOSURE_NONE 0 #define ev_flags ev_evcallback.evcb_flags
#define EV_CLOSURE_SIGNAL 1 #define ev_closure ev_evcallback.evcb_closure
#define EV_CLOSURE_PERSIST 2 #define ev_callback ev_evcallback.evcb_cb_union.evcb_callback
#define ev_arg ev_evcallback.evcb_arg
/* Possible values for evcb_closure in struct event_callback */
#define EV_CLOSURE_EVENT 0
#define EV_CLOSURE_EVENT_SIGNAL 1
#define EV_CLOSURE_EVENT_PERSIST 2
#define EV_CLOSURE_CB_SELF 3
/** Structure to define the backend of a given event_base. */ /** Structure to define the backend of a given event_base. */
struct eventop { struct eventop {
@ -170,6 +177,8 @@ extern int event_debug_mode_on_;
#define EVENT_DEBUG_MODE_IS_ON() (0) #define EVENT_DEBUG_MODE_IS_ON() (0)
#endif #endif
TAILQ_HEAD(evcallback_list, event_callback);
struct event_base { struct event_base {
/** Function pointers and other data to describe this event_base's /** Function pointers and other data to describe this event_base's
* backend. */ * backend. */
@ -209,14 +218,23 @@ struct event_base {
* reentrant invocation. */ * reentrant invocation. */
int running_loop; int running_loop;
/** Set to the number of deferred_cbs we've made 'active' in the
* loop. This is a hack to prevent starvation; it would be smarter
* to just use event_config_set_max_dispatch_interval's max_callbacks
* feature */
int n_deferreds_queued;
/* Active event management. */ /* Active event management. */
/** An array of nactivequeues queues for active events (ones that /** An array of nactivequeues queues for active event_callbacks (ones
* have triggered, and whose callbacks need to be called). Low * that have triggered, and whose callbacks need to be called). Low
* priority numbers are more important, and stall higher ones. * priority numbers are more important, and stall higher ones.
*/ */
struct event_list *activequeues; struct evcallback_list *activequeues;
/** The length of the activequeues array */ /** The length of the activequeues array */
int nactivequeues; int nactivequeues;
/** A list of event_callbacks that should become active the next time
* we process events, but not this time. */
struct evcallback_list active_later_queue;
/* common timeout logic */ /* common timeout logic */
@ -228,10 +246,6 @@ struct event_base {
/** The total size of common_timeout_queues. */ /** The total size of common_timeout_queues. */
int n_common_timeouts_allocated; int n_common_timeouts_allocated;
/** List of defered_cb that are active. We run these after the active
* events. */
struct deferred_cb_queue defer_queue;
/** Mapping from file descriptors to enabled (added) events */ /** Mapping from file descriptors to enabled (added) events */
struct event_io_map io; struct event_io_map io;
@ -266,7 +280,7 @@ struct event_base {
int current_event_waiters; int current_event_waiters;
#endif #endif
/** The event whose callback is executing right now */ /** The event whose callback is executing right now */
struct event *current_event; struct event_callback *current_event;
#ifdef _WIN32 #ifdef _WIN32
/** IOCP support structure, if IOCP is enabled. */ /** IOCP support structure, if IOCP is enabled. */
@ -347,7 +361,7 @@ struct event_config {
#endif /* TAILQ_FOREACH */ #endif /* TAILQ_FOREACH */
#define N_ACTIVE_CALLBACKS(base) \ #define N_ACTIVE_CALLBACKS(base) \
((base)->event_count_active + (base)->defer_queue.active_count) ((base)->event_count_active)
int evsig_set_handler_(struct event_base *base, int evsignal, int evsig_set_handler_(struct event_base *base, int evsignal,
void (*fn)(int)); void (*fn)(int));
@ -355,6 +369,19 @@ int evsig_restore_handler_(struct event_base *base, int evsignal);
void event_active_nolock_(struct event *ev, int res, short count); void event_active_nolock_(struct event *ev, int res, short count);
int event_callback_activate_(struct event_base *, struct event_callback *);
int event_callback_activate_nolock_(struct event_base *, struct event_callback *);
int event_callback_cancel_(struct event_base *base,
struct event_callback *evcb);
void event_active_later_(struct event *ev, int res);
void event_active_later_nolock_(struct event *ev, int res);
void event_callback_activate_later_nolock_(struct event_base *base,
struct event_callback *evcb);
int event_callback_cancel_nolock_(struct event_base *base,
struct event_callback *evcb);
void event_callback_init_(struct event_base *base,
struct event_callback *cb);
/* FIXME document. */ /* FIXME document. */
void event_base_add_virtual_(struct event_base *base); void event_base_add_virtual_(struct event_base *base);

607
event.c
View File

@ -135,12 +135,16 @@ static inline int event_add_internal(struct event *ev,
const struct timeval *tv, int tv_is_absolute); const struct timeval *tv, int tv_is_absolute);
static inline int event_del_internal(struct event *ev); static inline int event_del_internal(struct event *ev);
static void event_queue_insert_active(struct event_base *, struct event *); static void event_queue_insert_active(struct event_base *, struct event_callback *);
static void event_queue_insert_active_later(struct event_base *, struct event_callback *);
static void event_queue_insert_timeout(struct event_base *, struct event *); static void event_queue_insert_timeout(struct event_base *, struct event *);
static void event_queue_insert_inserted(struct event_base *, struct event *); static void event_queue_insert_inserted(struct event_base *, struct event *);
static void event_queue_remove_active(struct event_base *, struct event *); static void event_queue_remove_active(struct event_base *, struct event_callback *);
static void event_queue_remove_active_later(struct event_base *, struct event_callback *);
static void event_queue_remove_timeout(struct event_base *, struct event *); static void event_queue_remove_timeout(struct event_base *, struct event *);
static void event_queue_remove_inserted(struct event_base *, struct event *); static void event_queue_remove_inserted(struct event_base *, struct event *);
static void event_queue_make_later_events_active(struct event_base *base);
#ifdef USE_REINSERT_TIMEOUT #ifdef USE_REINSERT_TIMEOUT
/* This code seems buggy; only turn it on if we find out what the trouble is. */ /* This code seems buggy; only turn it on if we find out what the trouble is. */
static void event_queue_reinsert_timeout(struct event_base *,struct event *, int was_common, int is_common, int old_timeout_idx); static void event_queue_reinsert_timeout(struct event_base *,struct event *, int was_common, int is_common, int old_timeout_idx);
@ -424,6 +428,19 @@ event_base_update_cache_time(struct event_base *base)
return 0; return 0;
} }
static inline struct event *
event_callback_to_event(struct event_callback *evcb)
{
EVUTIL_ASSERT((evcb->evcb_flags & EVLIST_INIT));
return EVUTIL_UPCAST(evcb, struct event, ev_evcallback);
}
static inline struct event_callback *
event_to_event_callback(struct event *ev)
{
return &ev->ev_evcallback;
}
struct event_base * struct event_base *
event_init(void) event_init(void)
{ {
@ -489,28 +506,6 @@ event_base_get_features(const struct event_base *base)
return base->evsel->features; return base->evsel->features;
} }
void
event_deferred_cb_queue_init_(struct deferred_cb_queue *cb)
{
memset(cb, 0, sizeof(struct deferred_cb_queue));
TAILQ_INIT(&cb->deferred_cb_list);
}
/** Helper for the deferred_cb queue: wake up the event base. */
static void
notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr)
{
struct event_base *base = baseptr;
if (EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
}
struct deferred_cb_queue *
event_base_get_deferred_cb_queue_(struct event_base *base)
{
return base ? &base->defer_queue : NULL;
}
void void
event_enable_debug_mode(void) event_enable_debug_mode(void)
{ {
@ -588,10 +583,7 @@ event_base_new_with_config(const struct event_config *cfg)
base->th_notify_fd[0] = -1; base->th_notify_fd[0] = -1;
base->th_notify_fd[1] = -1; base->th_notify_fd[1] = -1;
event_deferred_cb_queue_init_(&base->defer_queue); TAILQ_INIT(&base->active_later_queue);
base->defer_queue.base = base;
base->defer_queue.notify_fn = notify_base_cbq_callback;
base->defer_queue.notify_arg = base;
evmap_io_initmap_(&base->io); evmap_io_initmap_(&base->io);
evmap_signal_initmap_(&base->sigmap); evmap_signal_initmap_(&base->sigmap);
@ -663,7 +655,6 @@ event_base_new_with_config(const struct event_config *cfg)
int r; int r;
EVTHREAD_ALLOC_LOCK(base->th_base_lock, EVTHREAD_ALLOC_LOCK(base->th_base_lock,
EVTHREAD_LOCKTYPE_RECURSIVE); EVTHREAD_LOCKTYPE_RECURSIVE);
base->defer_queue.lock = base->th_base_lock;
EVTHREAD_ALLOC_COND(base->current_event_cond); EVTHREAD_ALLOC_COND(base->current_event_cond);
r = evthread_make_base_notifiable(base); r = evthread_make_base_notifiable(base);
if (r<0) { if (r<0) {
@ -777,15 +768,36 @@ event_base_free(struct event_base *base)
mm_free(base->common_timeout_queues); mm_free(base->common_timeout_queues);
for (i = 0; i < base->nactivequeues; ++i) { for (i = 0; i < base->nactivequeues; ++i) {
for (ev = TAILQ_FIRST(&base->activequeues[i]); ev; ) { struct event_callback *evcb, *next;
struct event *next = TAILQ_NEXT(ev, ev_active_next); for (evcb = TAILQ_FIRST(&base->activequeues[i]); evcb; ) {
if (!(ev->ev_flags & EVLIST_INTERNAL)) { next = TAILQ_NEXT(evcb, evcb_active_next);
event_del(ev); if (evcb->evcb_flags & EVLIST_INIT) {
ev = event_callback_to_event(evcb);
if (!(ev->ev_flags & EVLIST_INTERNAL)) {
event_del(ev);
++n_deleted;
}
} else {
event_callback_cancel_(base, evcb);
++n_deleted; ++n_deleted;
} }
ev = next; evcb = next;
} }
} }
{
struct event_callback *evcb;
while ((evcb = TAILQ_FIRST(&base->active_later_queue))) {
if (evcb->evcb_flags & EVLIST_INIT) {
ev = event_callback_to_event(evcb);
event_del(ev);
++n_deleted;
} else {
event_callback_cancel_(base, evcb);
++n_deleted;
}
}
}
if (n_deleted) if (n_deleted)
event_debug(("%s: %d events were still set in base", event_debug(("%s: %d events were still set in base",
@ -1060,8 +1072,8 @@ event_config_set_max_dispatch_interval(struct event_config *cfg,
cfg->max_dispatch_interval.tv_sec = -1; cfg->max_dispatch_interval.tv_sec = -1;
cfg->max_dispatch_callbacks = cfg->max_dispatch_callbacks =
max_callbacks >= 0 ? max_callbacks : INT_MAX; max_callbacks >= 0 ? max_callbacks : INT_MAX;
if (min_priority <= 0) if (min_priority < 0)
min_priority = 1; min_priority = 0;
cfg->limit_callbacks_after_prio = min_priority; cfg->limit_callbacks_after_prio = min_priority;
return (0); return (0);
} }
@ -1093,8 +1105,8 @@ event_base_priority_init(struct event_base *base, int npriorities)
} }
/* Allocate our priority queues */ /* Allocate our priority queues */
base->activequeues = (struct event_list *) base->activequeues = (struct evcallback_list *)
mm_calloc(npriorities, sizeof(struct event_list)); mm_calloc(npriorities, sizeof(struct evcallback_list));
if (base->activequeues == NULL) { if (base->activequeues == NULL) {
event_warn("%s: calloc", __func__); event_warn("%s: calloc", __func__);
goto err; goto err;
@ -1117,6 +1129,9 @@ event_base_get_npriorities(struct event_base *base)
{ {
int n; int n;
if (base == NULL)
base = current_base;
EVBASE_ACQUIRE_LOCK(base, th_base_lock); EVBASE_ACQUIRE_LOCK(base, th_base_lock);
n = base->nactivequeues; n = base->nactivequeues;
EVBASE_RELEASE_LOCK(base, th_base_lock); EVBASE_RELEASE_LOCK(base, th_base_lock);
@ -1393,51 +1408,67 @@ event_persist_closure(struct event_base *base, struct event *ev)
releasing the lock as we go. This function requires that the lock be held releasing the lock as we go. This function requires that the lock be held
when it's invoked. Returns -1 if we get a signal or an event_break that when it's invoked. Returns -1 if we get a signal or an event_break that
means we should stop processing any active events now. Otherwise returns means we should stop processing any active events now. Otherwise returns
the number of non-internal events that we processed. the number of non-internal event_callbacks that we processed.
*/ */
static int static int
event_process_active_single_queue(struct event_base *base, event_process_active_single_queue(struct event_base *base,
struct event_list *activeq, struct evcallback_list *activeq,
int max_to_process, const struct timeval *endtime) int max_to_process, const struct timeval *endtime)
{ {
struct event *ev; struct event_callback *evcb;
int count = 0; int count = 0;
EVUTIL_ASSERT(activeq != NULL); EVUTIL_ASSERT(activeq != NULL);
for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) { for (evcb = TAILQ_FIRST(activeq); evcb; evcb = TAILQ_FIRST(activeq)) {
if (ev->ev_events & EV_PERSIST) struct event *ev=NULL;
event_queue_remove_active(base, ev); if (evcb->evcb_flags & EVLIST_INIT) {
else ev = event_callback_to_event(evcb);
event_del_internal(ev);
if (!(ev->ev_flags & EVLIST_INTERNAL)) if (ev->ev_events & EV_PERSIST)
event_queue_remove_active(base, evcb);
else
event_del_internal(ev);
event_debug((
"event_process_active: event: %p, %s%scall %p",
ev,
ev->ev_res & EV_READ ? "EV_READ " : " ",
ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
ev->ev_callback));
} else {
event_queue_remove_active(base, evcb);
event_debug(("event_process_active: event_callback %p, "
"closure %d, call %p",
evcb, evcb->evcb_closure, evcb->evcb_cb_union.evcb_callback));
}
if (!(evcb->evcb_flags & EVLIST_INTERNAL))
++count; ++count;
event_debug((
"event_process_active: event: %p, %s%scall %p",
ev,
ev->ev_res & EV_READ ? "EV_READ " : " ",
ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
ev->ev_callback));
base->current_event = ev; base->current_event = evcb;
#ifndef EVENT__DISABLE_THREAD_SUPPORT #ifndef EVENT__DISABLE_THREAD_SUPPORT
base->current_event_waiters = 0; base->current_event_waiters = 0;
#endif #endif
switch (ev->ev_closure) { switch (evcb->evcb_closure) {
case EV_CLOSURE_SIGNAL: case EV_CLOSURE_EVENT_SIGNAL:
event_signal_closure(base, ev); event_signal_closure(base, ev);
break; break;
case EV_CLOSURE_PERSIST: case EV_CLOSURE_EVENT_PERSIST:
event_persist_closure(base, ev); event_persist_closure(base, ev);
break; break;
default: case EV_CLOSURE_EVENT:
case EV_CLOSURE_NONE:
EVBASE_RELEASE_LOCK(base, th_base_lock); EVBASE_RELEASE_LOCK(base, th_base_lock);
(*ev->ev_callback)( (*ev->ev_callback)(
ev->ev_fd, ev->ev_res, ev->ev_arg); ev->ev_fd, ev->ev_res, ev->ev_arg);
break; break;
case EV_CLOSURE_CB_SELF:
EVBASE_RELEASE_LOCK(base, th_base_lock);
evcb->evcb_cb_union.evcb_selfcb(evcb, evcb->evcb_arg);
break;
default:
EVUTIL_ASSERT(0);
} }
EVBASE_ACQUIRE_LOCK(base, th_base_lock); EVBASE_ACQUIRE_LOCK(base, th_base_lock);
@ -1466,47 +1497,6 @@ event_process_active_single_queue(struct event_base *base,
return count; return count;
} }
/*
Process up to MAX_DEFERRED of the defered_cb entries in 'queue'. If
*breakptr becomes set to 1, stop. Requires that we start out holding
the lock on 'queue'; releases the lock around 'queue' for each deferred_cb
we process.
*/
static int
event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr,
int max_to_process, const struct timeval *endtime)
{
int count = 0;
struct deferred_cb *cb;
#define MAX_DEFERRED 16
if (max_to_process > MAX_DEFERRED)
max_to_process = MAX_DEFERRED;
#undef MAX_DEFERRED
while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
cb->queued = 0;
TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
--queue->active_count;
UNLOCK_DEFERRED_QUEUE(queue);
cb->cb(cb, cb->arg);
LOCK_DEFERRED_QUEUE(queue);
if (*breakptr)
return -1;
if (++count >= max_to_process)
break;
if (endtime) {
struct timeval now;
update_time_cache(queue->base);
gettime(queue->base, &now);
if (evutil_timercmp(&now, endtime, >=))
return count;
}
}
return count;
}
/* /*
* Active events are stored in priority queues. Lower priorities are always * Active events are stored in priority queues. Lower priorities are always
* process before higher priorities. Low priority events can starve high * process before higher priorities. Low priority events can starve high
@ -1517,7 +1507,7 @@ static int
event_process_active(struct event_base *base) event_process_active(struct event_base *base)
{ {
/* Caller must hold th_base_lock */ /* Caller must hold th_base_lock */
struct event_list *activeq = NULL; struct evcallback_list *activeq = NULL;
int i, c = 0; int i, c = 0;
const struct timeval *endtime; const struct timeval *endtime;
struct timeval tv; struct timeval tv;
@ -1543,8 +1533,7 @@ event_process_active(struct event_base *base)
c = event_process_active_single_queue(base, activeq, c = event_process_active_single_queue(base, activeq,
maxcb, endtime); maxcb, endtime);
if (c < 0) { if (c < 0) {
base->event_running_priority = -1; goto done;
return -1;
} else if (c > 0) } else if (c > 0)
break; /* Processed a real event; do not break; /* Processed a real event; do not
* consider lower-priority events */ * consider lower-priority events */
@ -1553,9 +1542,9 @@ event_process_active(struct event_base *base)
} }
} }
event_process_deferred_callbacks(&base->defer_queue,&base->event_break, done:
maxcb-c, endtime);
base->event_running_priority = -1; base->event_running_priority = -1;
return c; return c;
} }
@ -1630,6 +1619,25 @@ event_base_loopbreak(struct event_base *event_base)
return r; return r;
} }
int
event_base_loopcontinue(struct event_base *event_base)
{
int r = 0;
if (event_base == NULL)
return (-1);
EVBASE_ACQUIRE_LOCK(event_base, th_base_lock);
event_base->event_continue = 1;
if (EVBASE_NEED_NOTIFY(event_base)) {
r = evthread_notify_base(event_base);
} else {
r = (0);
}
EVBASE_RELEASE_LOCK(event_base, th_base_lock);
return r;
}
int int
event_base_got_break(struct event_base *event_base) event_base_got_break(struct event_base *event_base)
{ {
@ -1694,6 +1702,7 @@ event_base_loop(struct event_base *base, int flags)
while (!done) { while (!done) {
base->event_continue = 0; base->event_continue = 0;
base->n_deferreds_queued = 0;
/* Terminate the loop if we have been asked to */ /* Terminate the loop if we have been asked to */
if (base->event_gotterm) { if (base->event_gotterm) {
@ -1723,6 +1732,8 @@ event_base_loop(struct event_base *base, int flags)
goto done; goto done;
} }
event_queue_make_later_events_active(base);
clear_time_cache(base); clear_time_cache(base);
res = evsel->dispatch(base, tv_p); res = evsel->dispatch(base, tv_p);
@ -1865,13 +1876,13 @@ event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, shor
"EV_READ or EV_WRITE", __func__); "EV_READ or EV_WRITE", __func__);
return -1; return -1;
} }
ev->ev_closure = EV_CLOSURE_SIGNAL; ev->ev_closure = EV_CLOSURE_EVENT_SIGNAL;
} else { } else {
if (events & EV_PERSIST) { if (events & EV_PERSIST) {
evutil_timerclear(&ev->ev_io_timeout); evutil_timerclear(&ev->ev_io_timeout);
ev->ev_closure = EV_CLOSURE_PERSIST; ev->ev_closure = EV_CLOSURE_EVENT_PERSIST;
} else { } else {
ev->ev_closure = EV_CLOSURE_NONE; ev->ev_closure = EV_CLOSURE_EVENT;
} }
} }
@ -1922,8 +1933,11 @@ event_base_get_running_event(struct event_base *base)
{ {
struct event *ev = NULL; struct event *ev = NULL;
EVBASE_ACQUIRE_LOCK(base, th_base_lock); EVBASE_ACQUIRE_LOCK(base, th_base_lock);
if (EVBASE_IN_THREAD(base)) if (EVBASE_IN_THREAD(base)) {
ev = base->current_event; struct event_callback *evcb = base->current_event;
if (evcb->evcb_flags & EVLIST_INIT)
ev = event_callback_to_event(evcb);
}
EVBASE_RELEASE_LOCK(base, th_base_lock); EVBASE_RELEASE_LOCK(base, th_base_lock);
return ev; return ev;
} }
@ -1997,7 +2011,7 @@ event_pending(const struct event *ev, short event, struct timeval *tv)
if (ev->ev_flags & EVLIST_INSERTED) if (ev->ev_flags & EVLIST_INSERTED)
flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)); flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL));
if (ev->ev_flags & EVLIST_ACTIVE) if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))
flags |= ev->ev_res; flags |= ev->ev_res;
if (ev->ev_flags & EVLIST_TIMEOUT) if (ev->ev_flags & EVLIST_TIMEOUT)
flags |= EV_TIMEOUT; flags |= EV_TIMEOUT;
@ -2082,6 +2096,13 @@ event_get_callback_arg(const struct event *ev)
return ev->ev_arg; return ev->ev_arg;
} }
int
event_get_priority(const struct event *ev)
{
event_debug_assert_is_setup_(ev);
return ev->ev_pri;
}
int int
event_add(struct event *ev, const struct timeval *tv) event_add(struct event *ev, const struct timeval *tv)
{ {
@ -2138,7 +2159,7 @@ evthread_notify_base_eventfd(struct event_base *base)
/** Tell the thread currently running the event_loop for base (if any) that it /** Tell the thread currently running the event_loop for base (if any) that it
* needs to stop waiting in its dispatch function (if it is) and process all * needs to stop waiting in its dispatch function (if it is) and process all
* active events and deferred callbacks (if there are any). */ * active callbacks. */
static int static int
evthread_notify_base(struct event_base *base) evthread_notify_base(struct event_base *base)
{ {
@ -2192,7 +2213,8 @@ event_add_internal(struct event *ev, const struct timeval *tv,
* until the callback is done before we mess with the event, or else * until the callback is done before we mess with the event, or else
* we can race on ev_ncalls and ev_pncalls below. */ * we can race on ev_ncalls and ev_pncalls below. */
#ifndef EVENT__DISABLE_THREAD_SUPPORT #ifndef EVENT__DISABLE_THREAD_SUPPORT
if (base->current_event == ev && (ev->ev_events & EV_SIGNAL) if (base->current_event == event_to_event_callback(ev) &&
(ev->ev_events & EV_SIGNAL)
&& !EVBASE_IN_THREAD(base)) { && !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters; ++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
@ -2200,7 +2222,7 @@ event_add_internal(struct event *ev, const struct timeval *tv,
#endif #endif
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) && if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
if (ev->ev_events & (EV_READ|EV_WRITE)) if (ev->ev_events & (EV_READ|EV_WRITE))
res = evmap_io_add_(base, ev->ev_fd, ev); res = evmap_io_add_(base, ev->ev_fd, ev);
else if (ev->ev_events & EV_SIGNAL) else if (ev->ev_events & EV_SIGNAL)
@ -2232,7 +2254,7 @@ event_add_internal(struct event *ev, const struct timeval *tv,
* *
* If tv_is_absolute, this was already set. * If tv_is_absolute, this was already set.
*/ */
if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute) if (ev->ev_closure == EV_CLOSURE_EVENT_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv; ev->ev_io_timeout = *tv;
#ifndef USE_REINSERT_TIMEOUT #ifndef USE_REINSERT_TIMEOUT
@ -2256,7 +2278,7 @@ event_add_internal(struct event *ev, const struct timeval *tv,
} }
} }
event_queue_remove_active(base, ev); event_queue_remove_active(base, event_to_event_callback(ev));
} }
gettime(base, &now); gettime(base, &now);
@ -2356,7 +2378,8 @@ event_del_internal(struct event *ev)
* user-supplied argument. */ * user-supplied argument. */
base = ev->ev_base; base = ev->ev_base;
#ifndef EVENT__DISABLE_THREAD_SUPPORT #ifndef EVENT__DISABLE_THREAD_SUPPORT
if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { if (base->current_event == event_to_event_callback(ev) &&
!EVBASE_IN_THREAD(base)) {
++base->current_event_waiters; ++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
} }
@ -2384,7 +2407,9 @@ event_del_internal(struct event *ev)
} }
if (ev->ev_flags & EVLIST_ACTIVE) if (ev->ev_flags & EVLIST_ACTIVE)
event_queue_remove_active(base, ev); event_queue_remove_active(base, event_to_event_callback(ev));
else if (ev->ev_flags & EVLIST_ACTIVE_LATER)
event_queue_remove_active_later(base, event_to_event_callback(ev));
if (ev->ev_flags & EVLIST_INSERTED) { if (ev->ev_flags & EVLIST_INSERTED) {
event_queue_remove_inserted(base, ev); event_queue_remove_inserted(base, ev);
@ -2434,25 +2459,33 @@ event_active_nolock_(struct event *ev, int res, short ncalls)
event_debug(("event_active: %p (fd %d), res %d, callback %p", event_debug(("event_active: %p (fd %d), res %d, callback %p",
ev, (int)ev->ev_fd, (int)res, ev->ev_callback)); ev, (int)ev->ev_fd, (int)res, ev->ev_callback));
/* We get different kinds of events, add them together */
if (ev->ev_flags & EVLIST_ACTIVE) {
ev->ev_res |= res;
return;
}
base = ev->ev_base; base = ev->ev_base;
EVENT_BASE_ASSERT_LOCKED(base); EVENT_BASE_ASSERT_LOCKED(base);
ev->ev_res = res; switch ((ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
default:
case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER:
EVUTIL_ASSERT(0);
break;
case EVLIST_ACTIVE:
/* We get different kinds of events, add them together */
ev->ev_res |= res;
return;
case EVLIST_ACTIVE_LATER:
ev->ev_res |= res;
break;
case 0:
ev->ev_res = res;
break;
}
if (ev->ev_pri < base->event_running_priority) if (ev->ev_pri < base->event_running_priority)
base->event_continue = 1; base->event_continue = 1;
if (ev->ev_events & EV_SIGNAL) { if (ev->ev_events & EV_SIGNAL) {
#ifndef EVENT__DISABLE_THREAD_SUPPORT #ifndef EVENT__DISABLE_THREAD_SUPPORT
if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { if (base->current_event == event_to_event_callback(ev) &&
!EVBASE_IN_THREAD(base)) {
++base->current_event_waiters; ++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
} }
@ -2461,60 +2494,171 @@ event_active_nolock_(struct event *ev, int res, short ncalls)
ev->ev_pncalls = NULL; ev->ev_pncalls = NULL;
} }
event_queue_insert_active(base, ev); event_callback_activate_nolock_(base, event_to_event_callback(ev));
}
void
event_active_later_(struct event *ev, int res)
{
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
event_active_later_nolock_(ev, res);
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
}
void
event_active_later_nolock_(struct event *ev, int res)
{
struct event_base *base = ev->ev_base;
EVENT_BASE_ASSERT_LOCKED(base);
if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) {
/* We get different kinds of events, add them together */
ev->ev_res |= res;
return;
}
ev->ev_res = res;
event_callback_activate_later_nolock_(base, event_to_event_callback(ev));
}
int
event_callback_activate_(struct event_base *base,
struct event_callback *evcb)
{
int r;
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
r = event_callback_activate_nolock_(base, evcb);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return r;
}
int
event_callback_activate_nolock_(struct event_base *base,
struct event_callback *evcb)
{
int r = 1;
switch (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) {
default:
EVUTIL_ASSERT(0);
case EVLIST_ACTIVE_LATER:
event_queue_remove_active_later(base, evcb);
r = 0;
break;
case EVLIST_ACTIVE:
return 0;
case 0:
break;
}
event_queue_insert_active(base, evcb);
if (EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
return r;
}
void
event_callback_activate_later_nolock_(struct event_base *base,
struct event_callback *evcb)
{
if (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))
return;
event_queue_insert_active_later(base, evcb);
if (EVBASE_NEED_NOTIFY(base)) if (EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base); evthread_notify_base(base);
} }
void void
event_deferred_cb_init_(struct deferred_cb *cb, deferred_cb_fn fn, void *arg) event_callback_init_(struct event_base *base,
struct event_callback *cb)
{ {
memset(cb, 0, sizeof(struct deferred_cb)); memset(cb, 0, sizeof(*cb));
cb->cb = fn; cb->evcb_pri = base->nactivequeues - 1;
cb->arg = arg; }
int
event_callback_cancel_(struct event_base *base,
struct event_callback *evcb)
{
int r;
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
r = event_callback_cancel_nolock_(base, evcb);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return r;
}
int
event_callback_cancel_nolock_(struct event_base *base,
struct event_callback *evcb)
{
if (evcb->evcb_flags & EVLIST_INIT)
return event_del_internal(event_callback_to_event(evcb));
switch ((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
default:
case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER:
EVUTIL_ASSERT(0);
break;
case EVLIST_ACTIVE:
/* We get different kinds of events, add them together */
event_queue_remove_active(base, evcb);
return 0;
case EVLIST_ACTIVE_LATER:
event_queue_remove_active_later(base, evcb);
break;
case 0:
break;
}
event_base_assert_ok_(base);
return 0;
} }
void void
event_deferred_cb_cancel_(struct deferred_cb_queue *queue, event_deferred_cb_init_(struct event_callback *cb, ev_uint8_t priority, deferred_cb_fn fn, void *arg)
struct deferred_cb *cb)
{ {
if (!queue) { memset(cb, 0, sizeof(*cb));
if (current_base) cb->evcb_cb_union.evcb_selfcb = fn;
queue = &current_base->defer_queue; cb->evcb_arg = arg;
else cb->evcb_pri = priority;
return; cb->evcb_closure = EV_CLOSURE_CB_SELF;
}
LOCK_DEFERRED_QUEUE(queue);
if (cb->queued) {
TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
--queue->active_count;
cb->queued = 0;
}
UNLOCK_DEFERRED_QUEUE(queue);
} }
void void
event_deferred_cb_schedule_(struct deferred_cb_queue *queue, event_deferred_cb_set_priority_(struct event_callback *cb, ev_uint8_t priority)
struct deferred_cb *cb)
{ {
if (!queue) { cb->evcb_pri = priority;
if (current_base) }
queue = &current_base->defer_queue;
else
return;
}
LOCK_DEFERRED_QUEUE(queue); void
if (!cb->queued) { event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb)
cb->queued = 1; {
TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next); if (!base)
++queue->active_count; base = current_base;
if (queue->notify_fn) event_callback_cancel_(base, cb);
queue->notify_fn(queue, queue->notify_arg); }
#define MAX_DEFERREDS_QUEUED 32
int
event_deferred_cb_schedule_(struct event_base *base, struct event_callback *cb)
{
int r = 1;
if (!base)
base = current_base;
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
if (base->n_deferreds_queued > MAX_DEFERREDS_QUEUED) {
event_callback_activate_later_nolock_(base, cb);
} else {
++base->n_deferreds_queued;
r = event_callback_activate_nolock_(base, cb);
} }
UNLOCK_DEFERRED_QUEUE(queue); EVBASE_RELEASE_LOCK(base, th_base_lock);
return r;
} }
static int static int
@ -2585,13 +2729,13 @@ timeout_process(struct event_base *base)
#error "Mismatch for value of EVLIST_INTERNAL" #error "Mismatch for value of EVLIST_INTERNAL"
#endif #endif
/* These are a fancy way to spell /* These are a fancy way to spell
if (~ev->ev_flags & EVLIST_INTERNAL) if (flags & EVLIST_INTERNAL)
base->event_count--/++; base->event_count--/++;
*/ */
#define DECR_EVENT_COUNT(base,ev) \ #define DECR_EVENT_COUNT(base,flags) \
((base)->event_count -= (~((ev)->ev_flags >> 4) & 1)) ((base)->event_count -= (~((flags) >> 4) & 1))
#define INCR_EVENT_COUNT(base, ev) \ #define INCR_EVENT_COUNT(base,flags) \
((base)->event_count += (~((ev)->ev_flags >> 4) & 1)) ((base)->event_count += (~((flags) >> 4) & 1))
static void static void
event_queue_remove_inserted(struct event_base *base, struct event *ev) event_queue_remove_inserted(struct event_base *base, struct event *ev)
@ -2602,23 +2746,39 @@ event_queue_remove_inserted(struct event_base *base, struct event *ev)
ev, ev->ev_fd, EVLIST_INSERTED); ev, ev->ev_fd, EVLIST_INSERTED);
return; return;
} }
DECR_EVENT_COUNT(base, ev); DECR_EVENT_COUNT(base, ev->ev_flags);
ev->ev_flags &= ~EVLIST_INSERTED; ev->ev_flags &= ~EVLIST_INSERTED;
} }
static void static void
event_queue_remove_active(struct event_base *base, struct event *ev) event_queue_remove_active(struct event_base *base, struct event_callback *evcb)
{ {
EVENT_BASE_ASSERT_LOCKED(base); EVENT_BASE_ASSERT_LOCKED(base);
if (EVUTIL_FAILURE_CHECK(!(ev->ev_flags & EVLIST_ACTIVE))) { if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE))) {
event_errx(1, "%s: %p(fd %d) not on queue %x", __func__, event_errx(1, "%s: %p not on queue %x", __func__,
ev, ev->ev_fd, EVLIST_ACTIVE); evcb, EVLIST_ACTIVE);
return; return;
} }
DECR_EVENT_COUNT(base, ev); DECR_EVENT_COUNT(base, evcb->evcb_flags);
ev->ev_flags &= ~EVLIST_ACTIVE; evcb->evcb_flags &= ~EVLIST_ACTIVE;
base->event_count_active--; base->event_count_active--;
TAILQ_REMOVE(&base->activequeues[ev->ev_pri],
ev, ev_active_next); TAILQ_REMOVE(&base->activequeues[evcb->evcb_pri],
evcb, evcb_active_next);
}
static void
event_queue_remove_active_later(struct event_base *base, struct event_callback *evcb)
{
EVENT_BASE_ASSERT_LOCKED(base);
if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE_LATER))) {
event_errx(1, "%s: %p not on queue %x", __func__,
evcb, EVLIST_ACTIVE_LATER);
return;
}
DECR_EVENT_COUNT(base, evcb->evcb_flags);
evcb->evcb_flags &= ~EVLIST_ACTIVE_LATER;
base->event_count_active--;
TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next);
} }
static void static void
event_queue_remove_timeout(struct event_base *base, struct event *ev) event_queue_remove_timeout(struct event_base *base, struct event *ev)
@ -2629,7 +2789,7 @@ event_queue_remove_timeout(struct event_base *base, struct event *ev)
ev, ev->ev_fd, EVLIST_TIMEOUT); ev, ev->ev_fd, EVLIST_TIMEOUT);
return; return;
} }
DECR_EVENT_COUNT(base, ev); DECR_EVENT_COUNT(base, ev->ev_flags);
ev->ev_flags &= ~EVLIST_TIMEOUT; ev->ev_flags &= ~EVLIST_TIMEOUT;
if (is_common_timeout(&ev->ev_timeout, base)) { if (is_common_timeout(&ev->ev_timeout, base)) {
@ -2725,28 +2885,45 @@ event_queue_insert_inserted(struct event_base *base, struct event *ev)
return; return;
} }
INCR_EVENT_COUNT(base, ev); INCR_EVENT_COUNT(base, ev->ev_flags);
ev->ev_flags |= EVLIST_INSERTED; ev->ev_flags |= EVLIST_INSERTED;
} }
static void static void
event_queue_insert_active(struct event_base *base, struct event *ev) event_queue_insert_active(struct event_base *base, struct event_callback *evcb)
{ {
EVENT_BASE_ASSERT_LOCKED(base); EVENT_BASE_ASSERT_LOCKED(base);
if (ev->ev_flags & EVLIST_ACTIVE) { if (evcb->evcb_flags & EVLIST_ACTIVE) {
/* Double insertion is possible for active events */ /* Double insertion is possible for active events */
return; return;
} }
INCR_EVENT_COUNT(base, ev); INCR_EVENT_COUNT(base, evcb->evcb_flags);
ev->ev_flags |= EVLIST_ACTIVE; evcb->evcb_flags |= EVLIST_ACTIVE;
base->event_count_active++; base->event_count_active++;
TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri], EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
ev,ev_active_next); TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri],
evcb, evcb_active_next);
}
static void
event_queue_insert_active_later(struct event_base *base, struct event_callback *evcb)
{
EVENT_BASE_ASSERT_LOCKED(base);
if (evcb->evcb_flags & (EVLIST_ACTIVE_LATER|EVLIST_ACTIVE)) {
/* Double insertion is possible */
return;
}
INCR_EVENT_COUNT(base, evcb->evcb_flags);
evcb->evcb_flags |= EVLIST_ACTIVE_LATER;
base->event_count_active++;
EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next);
} }
static void static void
@ -2760,7 +2937,7 @@ event_queue_insert_timeout(struct event_base *base, struct event *ev)
return; return;
} }
INCR_EVENT_COUNT(base, ev); INCR_EVENT_COUNT(base, ev->ev_flags);
ev->ev_flags |= EVLIST_TIMEOUT; ev->ev_flags |= EVLIST_TIMEOUT;
@ -2773,6 +2950,21 @@ event_queue_insert_timeout(struct event_base *base, struct event *ev)
} }
} }
static void
event_queue_make_later_events_active(struct event_base *base)
{
struct event_callback *evcb;
EVENT_BASE_ASSERT_LOCKED(base);
while ((evcb = TAILQ_FIRST(&base->active_later_queue))) {
TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next);
evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE;
EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next);
base->n_deferreds_queued += (evcb->evcb_closure == EV_CLOSURE_CB_SELF);
}
}
/* Functions for debugging */ /* Functions for debugging */
const char * const char *
@ -3034,11 +3226,15 @@ event_base_foreach_event_(struct event_base *base,
/* Finally, we deal wit all the active events that we haven't touched /* Finally, we deal wit all the active events that we haven't touched
* yet. */ * yet. */
for (i = 0; i < base->nactivequeues; ++i) { for (i = 0; i < base->nactivequeues; ++i) {
TAILQ_FOREACH(ev, &base->activequeues[i], ev_active_next) { struct event_callback *evcb;
if (ev->ev_flags & (EVLIST_INSERTED|EVLIST_TIMEOUT)) { TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) {
/* we already processed this one */ if ((evcb->evcb_flags & (EVLIST_INIT|EVLIST_INSERTED|EVLIST_TIMEOUT)) != EVLIST_INIT) {
/* This isn't an event (evlist_init clear), or
* we already processed it. (inserted or
* timeout set */
continue; continue;
} }
ev = event_callback_to_event(evcb);
if ((r = fn(base, ev, arg))) if ((r = fn(base, ev, arg)))
return r; return r;
} }
@ -3088,16 +3284,17 @@ dump_active_event_fn(struct event_base *base, struct event *e, void *arg)
const char *gloss = (e->ev_events & EV_SIGNAL) ? const char *gloss = (e->ev_events & EV_SIGNAL) ?
"sig" : "fd "; "sig" : "fd ";
if (! (e->ev_flags & EVLIST_ACTIVE)) if (! (e->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)))
return 0; return 0;
fprintf(output, " %p [%s %ld, priority=%d]%s%s%s%s active%s\n", fprintf(output, " %p [%s %ld, priority=%d]%s%s%s%s active%s%s\n",
(void*)e, gloss, (long)e->ev_fd, e->ev_pri, (void*)e, gloss, (long)e->ev_fd, e->ev_pri,
(e->ev_res&EV_READ)?" Read":"", (e->ev_res&EV_READ)?" Read":"",
(e->ev_res&EV_WRITE)?" Write":"", (e->ev_res&EV_WRITE)?" Write":"",
(e->ev_res&EV_SIGNAL)?" Signal":"", (e->ev_res&EV_SIGNAL)?" Signal":"",
(e->ev_res&EV_TIMEOUT)?" Timeout":"", (e->ev_res&EV_TIMEOUT)?" Timeout":"",
(e->ev_flags&EVLIST_INTERNAL)?" [Internal]":""); (e->ev_flags&EVLIST_INTERNAL)?" [Internal]":"",
(e->ev_flags&EVLIST_ACTIVE_LATER)?" [NextTime]":"");
return 0; return 0;
} }
@ -3196,6 +3393,7 @@ void
event_base_assert_ok_(struct event_base *base) event_base_assert_ok_(struct event_base *base)
{ {
int i; int i;
int count;
EVBASE_ACQUIRE_LOCK(base, th_base_lock); EVBASE_ACQUIRE_LOCK(base, th_base_lock);
/* First do checks on the per-fd and per-signal lists */ /* First do checks on the per-fd and per-signal lists */
@ -3230,14 +3428,25 @@ event_base_assert_ok_(struct event_base *base)
} }
/* Check the active queues. */ /* Check the active queues. */
count = 0;
for (i = 0; i < base->nactivequeues; ++i) { for (i = 0; i < base->nactivequeues; ++i) {
struct event *ev; struct event_callback *evcb;
EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event, ev_active_next); EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event_callback, evcb_active_next);
TAILQ_FOREACH(ev, &base->activequeues[i], ev_active_next) { TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) {
EVUTIL_ASSERT(ev->ev_pri == i); EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE);
EVUTIL_ASSERT(ev->ev_flags & EVLIST_ACTIVE); EVUTIL_ASSERT(evcb->evcb_pri == i);
++count;
} }
} }
{
struct event_callback *evcb;
TAILQ_FOREACH(evcb, &base->active_later_queue, evcb_active_next) {
EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE_LATER);
++count;
}
}
EVUTIL_ASSERT(count == base->event_count_active);
EVBASE_RELEASE_LOCK(base, th_base_lock); EVBASE_RELEASE_LOCK(base, th_base_lock);
} }

View File

@ -103,7 +103,7 @@ struct evhttp_connection {
void (*closecb)(struct evhttp_connection *, void *); void (*closecb)(struct evhttp_connection *, void *);
void *closecb_arg; void *closecb_arg;
struct deferred_cb read_more_deferred_cb; struct event_callback read_more_deferred_cb;
struct event_base *base; struct event_base *base;
struct evdns_base *dns_base; struct evdns_base *dns_base;

8
http.c
View File

@ -1017,7 +1017,7 @@ evhttp_read_body(struct evhttp_connection *evcon, struct evhttp_request *req)
} }
#define get_deferred_queue(evcon) \ #define get_deferred_queue(evcon) \
(event_base_get_deferred_cb_queue_((evcon)->base)) ((evcon)->base)
/* /*
* Gets called when more data becomes available * Gets called when more data becomes available
@ -1079,7 +1079,7 @@ evhttp_read_cb(struct bufferevent *bufev, void *arg)
} }
static void static void
evhttp_deferred_read_cb(struct deferred_cb *cb, void *data) evhttp_deferred_read_cb(struct event_callback *cb, void *data)
{ {
struct evhttp_connection *evcon = data; struct evhttp_connection *evcon = data;
evhttp_read_cb(evcon->bufev, evcon); evhttp_read_cb(evcon->bufev, evcon);
@ -2197,7 +2197,9 @@ evhttp_connection_base_bufferevent_new(struct event_base *base, struct evdns_bas
bufferevent_base_set(base, evcon->bufev); bufferevent_base_set(base, evcon->bufev);
} }
event_deferred_cb_init_(&evcon->read_more_deferred_cb, event_deferred_cb_init_(
&evcon->read_more_deferred_cb,
bufferevent_get_priority(bev),
evhttp_deferred_read_cb, evcon); evhttp_deferred_read_cb, evcon);
evcon->dns_base = dnsbase; evcon->dns_base = dnsbase;

View File

@ -273,6 +273,12 @@ struct event_base *bufferevent_get_base(struct bufferevent *bev);
*/ */
int bufferevent_priority_set(struct bufferevent *bufev, int pri); int bufferevent_priority_set(struct bufferevent *bufev, int pri);
/**
Return the priority of a bufferevent.
Only supported for socket bufferevents
*/
int bufferevent_get_priority(struct bufferevent *bufev);
/** /**
Deallocate the storage associated with a bufferevent structure. Deallocate the storage associated with a bufferevent structure.

View File

@ -758,6 +758,25 @@ int event_base_loopexit(struct event_base *, const struct timeval *);
*/ */
int event_base_loopbreak(struct event_base *); int event_base_loopbreak(struct event_base *);
/**
Tell the active event_base_loop() to scan for new events immediately.
Calling this function makes the currently active event_base_loop()
start the loop over again (scanning for new events) after the current
event callback finishes. If the event loop is not running, this
function has no effect.
event_base_loopbreak() is typically invoked from this event's callback.
This behavior is analogous to the "continue;" statement.
Subsequent invocations of event loop will proceed normally.
@param eb the event_base structure returned by event_init()
@return 0 if successful, or -1 if an error occurred
@see event_base_loopbreak()
*/
int event_base_loopcontinue(struct event_base *);
/** /**
Checks if the event loop was told to exit by event_loopexit(). Checks if the event loop was told to exit by event_loopexit().
@ -1128,6 +1147,12 @@ event_callback_fn event_get_callback(const struct event *ev);
*/ */
void *event_get_callback_arg(const struct event *ev); void *event_get_callback_arg(const struct event *ev);
/**
Return the priority of an event.
@see event_priority_init(), event_get_priority()
*/
int event_get_priority(const struct event *ev);
/** /**
Extract _all_ of arguments given to construct a given event. The Extract _all_ of arguments given to construct a given event. The
event_base is copied into *base_out, the fd is copied into *fd_out, and so event_base is copied into *base_out, the fd is copied into *fd_out, and so
@ -1230,7 +1255,7 @@ int event_base_get_npriorities(struct event_base *eb);
@param ev an event struct @param ev an event struct
@param priority the new priority to be assigned @param priority the new priority to be assigned
@return 0 if successful, or -1 if an error occurred @return 0 if successful, or -1 if an error occurred
@see event_priority_init() @see event_priority_init(), event_get_priority()
*/ */
int event_priority_set(struct event *, int); int event_priority_set(struct event *, int);

View File

@ -54,15 +54,15 @@ extern "C" {
/* For evkeyvalq */ /* For evkeyvalq */
#include <event2/keyvalq_struct.h> #include <event2/keyvalq_struct.h>
#define EVLIST_TIMEOUT 0x01 #define EVLIST_TIMEOUT 0x01
#define EVLIST_INSERTED 0x02 #define EVLIST_INSERTED 0x02
#define EVLIST_SIGNAL 0x04 #define EVLIST_SIGNAL 0x04
#define EVLIST_ACTIVE 0x08 #define EVLIST_ACTIVE 0x08
#define EVLIST_INTERNAL 0x10 #define EVLIST_INTERNAL 0x10
#define EVLIST_INIT 0x80 #define EVLIST_ACTIVE_LATER 0x20
#define EVLIST_INIT 0x80
/* EVLIST_X_ Private space: 0x1000-0xf000 */ #define EVLIST_ALL 0xbf
#define EVLIST_ALL (0xf000 | 0x9f)
/* Fix so that people don't have to run with <sys/queue.h> */ /* Fix so that people don't have to run with <sys/queue.h> */
#ifndef TAILQ_ENTRY #ifndef TAILQ_ENTRY
@ -93,9 +93,22 @@ struct { \
} }
#endif /* !TAILQ_ENTRY */ #endif /* !TAILQ_ENTRY */
struct event_callback {
TAILQ_ENTRY(event_callback) evcb_active_next;
short evcb_flags;
ev_uint8_t evcb_pri; /* smaller numbers are higher priority */
ev_uint8_t evcb_closure;
/* allows us to adopt for different types of events */
union {
void (*evcb_callback)(evutil_socket_t, short, void *arg);
void (*evcb_selfcb)(struct event_callback *, void *arg);
} evcb_cb_union;
void *evcb_arg;
};
struct event_base; struct event_base;
struct event { struct event {
TAILQ_ENTRY(event) ev_active_next; struct event_callback ev_evcallback;
/* for managing timeouts */ /* for managing timeouts */
union { union {
@ -124,14 +137,7 @@ struct event {
short ev_events; short ev_events;
short ev_res; /* result passed to event callback */ short ev_res; /* result passed to event callback */
short ev_flags;
ev_uint8_t ev_pri; /* smaller numbers are higher priority */
ev_uint8_t ev_closure;
struct timeval ev_timeout; struct timeval ev_timeout;
/* allows us to adopt for different types of events */
void (*ev_callback)(evutil_socket_t, short, void *arg);
void *ev_arg;
}; };
TAILQ_HEAD (event_list, event); TAILQ_HEAD (event_list, event);

View File

@ -440,7 +440,7 @@ struct accepting_socket {
struct event_overlapped overlapped; struct event_overlapped overlapped;
SOCKET s; SOCKET s;
int error; int error;
struct deferred_cb deferred; struct event_callback deferred;
struct evconnlistener_iocp *lev; struct evconnlistener_iocp *lev;
ev_uint8_t buflen; ev_uint8_t buflen;
ev_uint8_t family; ev_uint8_t family;
@ -450,7 +450,7 @@ struct accepting_socket {
static void accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, static void accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key,
ev_ssize_t n, int ok); ev_ssize_t n, int ok);
static void accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg); static void accepted_socket_invoke_user_cb(struct event_callback *cb, void *arg);
static void static void
iocp_listener_event_add(struct evconnlistener_iocp *lev) iocp_listener_event_add(struct evconnlistener_iocp *lev)
@ -498,7 +498,8 @@ new_accepting_socket(struct evconnlistener_iocp *lev, int family)
res->family = family; res->family = family;
event_deferred_cb_init_(&res->deferred, event_deferred_cb_init_(&res->deferred,
accepted_socket_invoke_user_cb, res); event_base_get_npriorities(base) / 2,
accepted_socket_invoke_user_cb, res);
InitializeCriticalSectionAndSpinCount(&res->lock, 1000); InitializeCriticalSectionAndSpinCount(&res->lock, 1000);
@ -566,7 +567,7 @@ start_accepting(struct accepting_socket *as)
report_err: report_err:
as->error = error; as->error = error;
event_deferred_cb_schedule_( event_deferred_cb_schedule_(
event_base_get_deferred_cb_queue_(as->lev->event_base), as->lev->event_base,
&as->deferred); &as->deferred);
return 0; return 0;
} }
@ -581,7 +582,7 @@ stop_accepting(struct accepting_socket *as)
} }
static void static void
accepted_socket_invoke_user_cb(struct deferred_cb *dcb, void *arg) accepted_socket_invoke_user_cb(struct event_callback *dcb, void *arg)
{ {
struct accepting_socket *as = arg; struct accepting_socket *as = arg;
@ -658,7 +659,7 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i
if (ok) { if (ok) {
/* XXXX Don't do this if some EV_MT flag is set. */ /* XXXX Don't do this if some EV_MT flag is set. */
event_deferred_cb_schedule_( event_deferred_cb_schedule_(
event_base_get_deferred_cb_queue_(as->lev->event_base), as->lev->event_base,
&as->deferred); &as->deferred);
LeaveCriticalSection(&as->lock); LeaveCriticalSection(&as->lock);
} else if (as->free_on_cb) { } else if (as->free_on_cb) {
@ -683,7 +684,7 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i
as->error = WSAGetLastError(); as->error = WSAGetLastError();
} }
event_deferred_cb_schedule_( event_deferred_cb_schedule_(
event_base_get_deferred_cb_queue_(as->lev->event_base), as->lev->event_base,
&as->deferred); &as->deferred);
LeaveCriticalSection(&as->lock); LeaveCriticalSection(&as->lock);
} }

View File

@ -1350,6 +1350,66 @@ end:
; ;
} }
static int n_write_a_byte_cb=0;
static int n_read_and_drain_cb=0;
static int n_activate_other_event_cb=0;
static void
write_a_byte_cb(evutil_socket_t fd, short what, void *arg)
{
char buf[] = "x";
write(fd, buf, 1);
++n_write_a_byte_cb;
}
static void
read_and_drain_cb(evutil_socket_t fd, short what, void *arg)
{
char buf[128];
int n;
++n_read_and_drain_cb;
while ((n = read(fd, buf, sizeof(buf))) > 0)
;
}
static void
activate_other_event_cb(evutil_socket_t fd, short what, void *other_)
{
struct event *ev_activate = other_;
++n_activate_other_event_cb;
event_active_later_(ev_activate, EV_READ);
}
static void
test_active_later(void *ptr)
{
struct basic_test_data *data = ptr;
struct event *ev1, *ev2;
struct event ev3, ev4;
struct timeval qsec = {0, 100000};
ev1 = event_new(data->base, data->pair[0], EV_READ|EV_PERSIST, read_and_drain_cb, NULL);
ev2 = event_new(data->base, data->pair[1], EV_WRITE|EV_PERSIST, write_a_byte_cb, NULL);
event_assign(&ev3, data->base, -1, 0, activate_other_event_cb, &ev4);
event_assign(&ev4, data->base, -1, 0, activate_other_event_cb, &ev3);
event_add(ev1, NULL);
event_add(ev2, NULL);
event_active_later_(&ev3, EV_READ);
event_base_loopexit(data->base, &qsec);
event_base_loop(data->base, 0);
TT_BLATHER(("%d write calls, %d read calls, %d activate-other calls.",
n_write_a_byte_cb, n_read_and_drain_cb, n_activate_other_event_cb));
event_del(&ev3);
event_del(&ev4);
tt_int_op(n_write_a_byte_cb, ==, n_activate_other_event_cb);
tt_int_op(n_write_a_byte_cb, >, 100);
tt_int_op(n_read_and_drain_cb, >, 100);
tt_int_op(n_activate_other_event_cb, >, 100);
end:
;
}
static void static void
test_event_base_new(void *ptr) test_event_base_new(void *ptr)
{ {
@ -2468,7 +2528,9 @@ struct testcase_t main_testcases[] = {
BASIC(bad_assign, TT_FORK|TT_NEED_BASE|TT_NO_LOGS), BASIC(bad_assign, TT_FORK|TT_NEED_BASE|TT_NO_LOGS),
BASIC(bad_reentrant, TT_FORK|TT_NEED_BASE|TT_NO_LOGS), BASIC(bad_reentrant, TT_FORK|TT_NEED_BASE|TT_NO_LOGS),
BASIC(active_later, TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR),
/* These are still using the old API */
LEGACY(persistent_timeout, TT_FORK|TT_NEED_BASE), LEGACY(persistent_timeout, TT_FORK|TT_NEED_BASE),
{ "persistent_timeout_jump", test_persistent_timeout_jump, TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, { "persistent_timeout_jump", test_persistent_timeout_jump, TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
{ "persistent_active_timeout", test_persistent_active_timeout, { "persistent_active_timeout", test_persistent_active_timeout,

View File

@ -57,6 +57,7 @@
#include "event2/buffer_compat.h" #include "event2/buffer_compat.h"
#include "event2/util.h" #include "event2/util.h"
#include "defer-internal.h"
#include "evbuffer-internal.h" #include "evbuffer-internal.h"
#include "log-internal.h" #include "log-internal.h"

View File

@ -415,8 +415,8 @@ SLEEP_MS(int ms)
} }
struct deferred_test_data { struct deferred_test_data {
struct deferred_cb cbs[CB_COUNT]; struct event_callback cbs[CB_COUNT];
struct deferred_cb_queue *queue; struct event_base *queue;
}; };
static struct timeval timer_start = {0,0}; static struct timeval timer_start = {0,0};
@ -426,7 +426,7 @@ static THREAD_T load_threads[QUEUE_THREAD_COUNT];
static struct deferred_test_data deferred_data[QUEUE_THREAD_COUNT]; static struct deferred_test_data deferred_data[QUEUE_THREAD_COUNT];
static void static void
deferred_callback(struct deferred_cb *cb, void *arg) deferred_callback(struct event_callback *cb, void *arg)
{ {
SLEEP_MS(1); SLEEP_MS(1);
callback_count += 1; callback_count += 1;
@ -439,7 +439,8 @@ load_deferred_queue(void *arg)
size_t i; size_t i;
for (i = 0; i < CB_COUNT; ++i) { for (i = 0; i < CB_COUNT; ++i) {
event_deferred_cb_init_(&data->cbs[i], deferred_callback, NULL); event_deferred_cb_init_(&data->cbs[i], 0, deferred_callback,
NULL);
event_deferred_cb_schedule_(data->queue, &data->cbs[i]); event_deferred_cb_schedule_(data->queue, &data->cbs[i]);
SLEEP_MS(1); SLEEP_MS(1);
} }
@ -469,23 +470,28 @@ thread_deferred_cb_skew(void *arg)
{ {
struct basic_test_data *data = arg; struct basic_test_data *data = arg;
struct timeval tv_timer = {1, 0}; struct timeval tv_timer = {1, 0};
struct deferred_cb_queue *queue; struct event_base *base = NULL;
struct event_config *cfg = NULL;
struct timeval elapsed; struct timeval elapsed;
int elapsed_usec; int elapsed_usec;
int i; int i;
queue = event_base_get_deferred_cb_queue_(data->base); cfg = event_config_new();
tt_assert(queue); tt_assert(cfg);
event_config_set_max_dispatch_interval(cfg, NULL, 16, 0);
base = event_base_new_with_config(cfg);
tt_assert(base);
for (i = 0; i < QUEUE_THREAD_COUNT; ++i) for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
deferred_data[i].queue = queue; deferred_data[i].queue = base;
evutil_gettimeofday(&timer_start, NULL); evutil_gettimeofday(&timer_start, NULL);
event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL, event_base_once(base, -1, EV_TIMEOUT, timer_callback, NULL,
&tv_timer); &tv_timer);
event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback, event_base_once(base, -1, EV_TIMEOUT, start_threads_callback,
NULL, NULL); NULL, NULL);
event_base_dispatch(data->base); event_base_dispatch(base);
evutil_timersub(&timer_end, &timer_start, &elapsed); evutil_timersub(&timer_end, &timer_start, &elapsed);
TT_BLATHER(("callback count, %u", callback_count)); TT_BLATHER(("callback count, %u", callback_count));
@ -500,6 +506,10 @@ thread_deferred_cb_skew(void *arg)
end: end:
for (i = 0; i < QUEUE_THREAD_COUNT; ++i) for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
THREAD_JOIN(load_threads[i]); THREAD_JOIN(load_threads[i]);
if (base)
event_base_free(base);
if (cfg)
event_config_free(cfg);
} }
static struct event time_events[5]; static struct event time_events[5];
@ -583,7 +593,8 @@ struct testcase_t thread_testcases[] = {
&basic_setup, (char*)"forking" }, &basic_setup, (char*)"forking" },
#endif #endif
TEST(conditions_simple), TEST(conditions_simple),
TEST(deferred_cb_skew), { "deferred_cb_skew", thread_deferred_cb_skew, TT_FORK|TT_NEED_THREADS,
&basic_setup, NULL },
TEST(no_events), TEST(no_events),
END_OF_TESTCASES END_OF_TESTCASES
}; };