diff --git a/evthread-internal.h b/evthread-internal.h index 2be7c7ac..d710939a 100644 --- a/evthread-internal.h +++ b/evthread-internal.h @@ -40,6 +40,7 @@ struct event_base; /* Global function pointers to lock-related functions. NULL if locking isn't enabled. */ extern struct evthread_lock_callbacks _evthread_lock_fns; +extern struct evthread_condition_callbacks _evthread_cond_fns; extern unsigned long (*_evthread_id_fn)(void); extern int _evthread_lock_debugging_enabled; @@ -126,6 +127,9 @@ extern int _evthread_lock_debugging_enabled; EVLOCK_UNLOCK((base)->lockvar, 0); \ } while (0) +int _evthread_is_debug_lock_held(void *lock); +void *_evthread_debug_get_real_lock(void *lock); + /** If lock debugging is enabled, and lock is non-null, assert that 'lock' is * locked and held by us. */ #define EVLOCK_ASSERT_LOCKED(lock) \ @@ -135,8 +139,6 @@ extern int _evthread_lock_debugging_enabled; } \ } while (0) -int _evthread_is_debug_lock_held(void *lock); - /** Try to grab the lock for 'lockvar' without blocking, and return 1 if we * manage to get it. */ static inline int EVLOCK_TRY_LOCK(void *lock); @@ -153,6 +155,35 @@ EVLOCK_TRY_LOCK(void *lock) } } +/** Allocate a new condition variable and store it in the void *, condvar */ +#define EVTHREAD_ALLOC_COND(condvar) \ + do { \ + (condvar) = _evthread_cond_fns.alloc_condition ? \ + _evthread_cond_fns.alloc_condition(0) : NULL; \ + } while (0) +/** Deallocate and free a condition variable in condvar */ +#define EVTHREAD_FREE_COND(cond) \ + do { \ + if (cond) \ + _evthread_cond_fns.free_condition((cond)); \ + } while (0) +/** Signal one thread waiting on cond */ +#define EVTHREAD_COND_SIGNAL(cond) \ + ( (cond) ? _evthread_cond_fns.signal_condition((cond), 0) : 0 ) +/** Signal all threads waiting on cond */ +#define EVTHREAD_COND_BROADCAST(cond) \ + ( (cond) ? _evthread_cond_fns.signal_condition((cond), 1) : 0 ) +/** Wait until the condition 'cond' is signalled. Must be called while + * holding 'lock'. The lock will be released until the condition is + * signalled, at which point it will be acquired again. Returns 0 for + * success, -1 for failure. */ +#define EVTHREAD_COND_WAIT(cond, lock) \ + ( (cond) ? _evthread_cond_fns.wait_condition((cond), (lock), NULL) : 0 ) +/** As EVTHREAD_COND_WAIT, but gives up after 'tv' has elapsed. Returns 1 + * on timeout. */ +#define EVTHREAD_COND_WAIT_TIMED(cond, lock, tv) \ + ( (cond) ? _evthread_cond_fns.wait_condition((cond), (lock), (tv)) : 0 ) + #else /* _EVENT_DISABLE_THREAD_SUPPORT */ #define EVTHREAD_GET_ID() 1 @@ -170,6 +201,14 @@ EVLOCK_TRY_LOCK(void *lock) #define EVLOCK_ASSERT_LOCKED(lock) _EVUTIL_NIL_STMT #define EVLOCK_TRY_LOCK(lock) 1 + +#define EVTHREAD_ALLOC_COND(condvar) _EVUTIL_NIL_STMT +#define EVTHREAD_FREE_COND(cond) _EVUTIL_NIL_STMT +#define EVTHREAD_COND_SIGNAL(cond) _EVUTIL_NIL_STMT +#define EVTHREAD_COND_BROADCAST(cond) _EVUTIL_NIL_STMT +#define EVTHREAD_COND_WAIT(cond, lock) _EVUTIL_NIL_STMT +#define EVTHREAD_COND_WAIT_TIMED(cond, lock, howlong) _EVUTIL_NIL_STMT + #endif #ifdef __cplusplus diff --git a/evthread.c b/evthread.c index 0868d941..1e895c09 100644 --- a/evthread.c +++ b/evthread.c @@ -43,11 +43,18 @@ int _evthread_lock_debugging_enabled = 0; struct evthread_lock_callbacks _evthread_lock_fns = { 0, 0, NULL, NULL, NULL, NULL }; +unsigned long (*_evthread_id_fn)(void) = NULL; +struct evthread_condition_callbacks _evthread_cond_fns = { + 0, NULL, NULL, NULL, NULL +}; + /* Used for debugging */ static struct evthread_lock_callbacks _original_lock_fns = { 0, 0, NULL, NULL, NULL, NULL }; -unsigned long (*_evthread_id_fn)(void) = NULL; +static struct evthread_condition_callbacks _original_cond_fns = { + 0, NULL, NULL, NULL, NULL +}; void evthread_set_id_callback(unsigned long (*id_fn)(void)) @@ -74,6 +81,27 @@ evthread_set_lock_callbacks(const struct evthread_lock_callbacks *cbs) } } +int +evthread_set_condition_callbacks(const struct evthread_condition_callbacks *cbs) +{ + struct evthread_condition_callbacks *target = + _evthread_lock_debugging_enabled + ? &_original_cond_fns : &_evthread_cond_fns; + + if (!cbs) { + memset(target, 0, sizeof(_evthread_cond_fns)); + } else if (cbs->alloc_condition && cbs->free_condition && + cbs->signal_condition && cbs->wait_condition) { + memcpy(target, cbs, sizeof(_evthread_cond_fns)); + } + if (_evthread_lock_debugging_enabled) { + _evthread_cond_fns.alloc_condition = cbs->alloc_condition; + _evthread_cond_fns.free_condition = cbs->free_condition; + _evthread_cond_fns.signal_condition = cbs->signal_condition; + } + return 0; +} + struct debug_lock { unsigned locktype; unsigned long held_by; @@ -119,6 +147,21 @@ debug_lock_free(void *lock_, unsigned locktype) mm_free(lock); } +static void +evthread_debug_lock_mark_locked(unsigned mode, struct debug_lock *lock) +{ + ++lock->count; + if (!(lock->locktype & EVTHREAD_LOCKTYPE_RECURSIVE)) + EVUTIL_ASSERT(lock->count == 1); + if (_evthread_id_fn) { + unsigned long me; + me = _evthread_id_fn(); + if (lock->count > 1) + EVUTIL_ASSERT(lock->held_by == me); + lock->held_by = me; + } +} + static int debug_lock_lock(unsigned mode, void *lock_) { @@ -131,25 +174,14 @@ debug_lock_lock(unsigned mode, void *lock_) if (_original_lock_fns.lock) res = _original_lock_fns.lock(mode, lock->lock); if (!res) { - ++lock->count; - if (!(lock->locktype & EVTHREAD_LOCKTYPE_RECURSIVE)) - EVUTIL_ASSERT(lock->count == 1); - if (_evthread_id_fn) { - unsigned long me; - me = _evthread_id_fn(); - if (lock->count > 1) - EVUTIL_ASSERT(lock->held_by == me); - lock->held_by = me; - } + evthread_debug_lock_mark_locked(mode, lock); } return res; } -static int -debug_lock_unlock(unsigned mode, void *lock_) +static void +evthread_debug_lock_mark_unlocked(unsigned mode, struct debug_lock *lock) { - struct debug_lock *lock = lock_; - int res = 0; if (lock->locktype & EVTHREAD_LOCKTYPE_READWRITE) EVUTIL_ASSERT(mode & (EVTHREAD_READ|EVTHREAD_WRITE)); else @@ -162,11 +194,31 @@ debug_lock_unlock(unsigned mode, void *lock_) } --lock->count; EVUTIL_ASSERT(lock->count >= 0); +} + +static int +debug_lock_unlock(unsigned mode, void *lock_) +{ + struct debug_lock *lock = lock_; + int res = 0; + evthread_debug_lock_mark_unlocked(mode, lock); if (_original_lock_fns.unlock) res = _original_lock_fns.unlock(mode, lock->lock); return res; } +static int +debug_cond_wait(void *_cond, void *_lock, const struct timeval *tv) +{ + int r; + struct debug_lock *lock = _lock; + EVLOCK_ASSERT_LOCKED(_lock); + evthread_debug_lock_mark_unlocked(0, lock); + r = _original_cond_fns.wait_condition(_cond, lock->lock, tv); + evthread_debug_lock_mark_locked(0, lock); + return r; +} + void evthread_enable_lock_debuging(void) { @@ -184,6 +236,10 @@ evthread_enable_lock_debuging(void) sizeof(struct evthread_lock_callbacks)); memcpy(&_evthread_lock_fns, &cbs, sizeof(struct evthread_lock_callbacks)); + + memcpy(&_original_cond_fns, &_evthread_cond_fns, + sizeof(struct evthread_condition_callbacks)); + _evthread_cond_fns.wait_condition = debug_cond_wait; _evthread_lock_debugging_enabled = 1; } @@ -201,4 +257,11 @@ _evthread_is_debug_lock_held(void *lock_) return 1; } +void * +_evthread_debug_get_real_lock(void *lock_) +{ + struct debug_lock *lock = lock_; + return lock->lock; +} + #endif diff --git a/evthread_pthread.c b/evthread_pthread.c index c5f4c537..59433737 100644 --- a/evthread_pthread.c +++ b/evthread_pthread.c @@ -34,6 +34,7 @@ struct event_base; #include #include "mm-internal.h" +#include "evthread-internal.h" static pthread_mutexattr_t attr_recursive; @@ -89,6 +90,66 @@ evthread_posix_get_id(void) return r.id; } +static void * +evthread_posix_cond_alloc(unsigned condflags) +{ + pthread_cond_t *cond = mm_malloc(sizeof(pthread_cond_t)); + if (!cond) + return NULL; + if (pthread_cond_init(cond, NULL)) { + mm_free(cond); + return NULL; + } + return cond; +} + +static void +evthread_posix_cond_free(void *_cond) +{ + pthread_cond_t *cond = _cond; + pthread_cond_destroy(cond); + mm_free(cond); +} + +static int +evthread_posix_cond_signal(void *_cond, int broadcast) +{ + pthread_cond_t *cond = _cond; + int r; + if (broadcast) + r = pthread_cond_broadcast(cond); + else + r = pthread_cond_signal(cond); + return r ? -1 : 0; +} + +static int +evthread_posix_cond_wait(void *_cond, void *_lock, const struct timeval *tv) +{ + int r; + pthread_cond_t *cond = _cond; + pthread_mutex_t *lock = _lock; + + if (tv) { + struct timeval now, abstime; + struct timespec ts; + evutil_gettimeofday(&now, NULL); + evutil_timeradd(&now, tv, &abstime); + ts.tv_sec = abstime.tv_sec; + ts.tv_nsec = abstime.tv_usec*1000; + r = pthread_cond_timedwait(cond, lock, &ts); + if (r == ETIMEDOUT) + return 1; + else if (r) + return -1; + else + return 0; + } else { + r = pthread_cond_wait(cond, lock); + return r ? -1 : 0; + } +} + int evthread_use_pthreads(void) { @@ -100,6 +161,13 @@ evthread_use_pthreads(void) evthread_posix_lock, evthread_posix_unlock }; + struct evthread_condition_callbacks cond_cbs = { + EVTHREAD_CONDITION_API_VERSION, + evthread_posix_cond_alloc, + evthread_posix_cond_free, + evthread_posix_cond_signal, + evthread_posix_cond_wait + }; /* Set ourselves up to get recursive locks. */ if (pthread_mutexattr_init(&attr_recursive)) return -1; @@ -107,6 +175,7 @@ evthread_use_pthreads(void) return -1; evthread_set_lock_callbacks(&cbs); + evthread_set_condition_callbacks(&cond_cbs); evthread_set_id_callback(evthread_posix_get_id); return 0; } diff --git a/evthread_win32.c b/evthread_win32.c index 7b226a0c..12b8c1e6 100644 --- a/evthread_win32.c +++ b/evthread_win32.c @@ -37,6 +37,7 @@ struct event_base; #include #include "mm-internal.h" +#include "evthread-internal.h" #define SPIN_COUNT 2000 @@ -87,6 +88,198 @@ evthread_win32_get_id(void) return (unsigned long) GetCurrentThreadId(); } +#ifdef WIN32_HAVE_CONDITION_VARIABLES +static void WINAPI (*InitializeConditionVariable_fn)(PCONDITION_VARIABLE) + = NULL; +static BOOL WINAPI (*SleepConditionVariableCS_fn)( + PCONDITION_VARIABLE, PCRITICAL_SECTION, DWORD) = NULL; +static void WINAPI (*WakeAllConditionVariable_fn)(PCONDITION_VARIABLE) = NULL; +static void WINAPI (*WakeConditionVariable_fn)(PCONDITION_VARIABLE) = NULL; + +static int +evthread_win32_condvar_init(void) +{ + HANDLE lib; + + lib = GetModuleHandle(TEXT("kernel32.dll")); + if (lib == NULL) + return 0; + +#define LOAD(name) \ + name##_fn = GetProcAddress(lib, #name) + LOAD(InitializeConditionVariable); + LOAD(SleepConditionVariable); + LOAD(WakeAllConditionVariable); + LOAD(WakeConditionVariable); + + return InitializeConditionVariable_fn && SleepConditionVariableCS_fn && + WakeAllConditionVariable_fn && WakeConditionVariable_fn; +} + +/* XXXX Even if we can build this, we don't necessarily want to: the functions + * in question didn't exist before Vista, so we'd better LoadProc them. */ +static void * +evthread_win32_condvar_alloc(unsigned condflags) +{ + CONDITION_VARIABLE *cond = mm_malloc(sizeof(CONDITION_VARIABLE)); + if (!cond) + return NULL; + InitializeConditionVariable_fn(cond); + return cond; +} + +static void +evthread_win32_condvar_free(void *_cond) +{ + CONDITION_VARIABLE *cond = _cond; + /* There doesn't _seem_ to be a cleaup fn here... */ + mm_free(cond); +} + +static int +evthread_win32_condvar_signal(void *_cond, int broadcast) +{ + CONDITION_VARIABLE *cond = _cond; + if (broadcast) + WakeAllConditionVariable_fn(cond); + else + WakeConditionVariable_fn(cond); + return 0; +} + +static int +evthread_win32_condvar_wait(void *_cond, void *_lock, const struct timeval *tv) +{ + CONDITION_VARIABLE *cond = _cond; + CRITICAL_SECTION *lock = _lock; + DWORD ms, err; + BOOL result; + + if (tv) + ms = evutil_tv_to_msec(tv); + else + ms = INFINITE; + result = SleepConditionVariableCS_fn(cond, lock, ms); + if (result) { + if (GetLastError() == WAIT_TIMEOUT) + return 1; + else + return -1; + } else { + return 0; + } +} +#endif + +struct evthread_win32_cond { + HANDLE event; + + CRITICAL_SECTION lock; + int n_waiting; + int n_to_wake; + int generation; +}; + +static void * +evthread_win32_cond_alloc(unsigned flags) +{ + struct evthread_win32_cond *cond; + if (!(cond = mm_malloc(sizeof(struct evthread_win32_cond)))) + return NULL; + if (InitializeCriticalSectionAndSpinCount(&cond->lock, SPIN_COUNT)==0) { + mm_free(cond); + return NULL; + } + if ((cond->event = CreateEvent(NULL,TRUE,FALSE,NULL)) == NULL) { + DeleteCriticalSection(&cond->lock); + mm_free(cond); + return NULL; + } + cond->n_waiting = cond->n_to_wake = cond->generation = 0; + return cond; +} + +static void +evthread_win32_cond_free(void *_cond) +{ + struct evthread_win32_cond *cond = _cond; + DeleteCriticalSection(&cond->lock); + CloseHandle(cond->event); + mm_free(cond); +} + +static int +evthread_win32_cond_signal(void *_cond, int broadcast) +{ + struct evthread_win32_cond *cond = _cond; + EnterCriticalSection(&cond->lock); + if (broadcast) + cond->n_to_wake = cond->n_waiting; + else + ++cond->n_to_wake; + cond->generation++; + SetEvent(cond->event); + LeaveCriticalSection(&cond->lock); + return 0; +} + +static int +evthread_win32_cond_wait(void *_cond, void *_lock, const struct timeval *tv) +{ + struct evthread_win32_cond *cond = _cond; + CRITICAL_SECTION *lock = _lock; + int generation_at_start; + int waiting = 1; + int result = -1; + DWORD ms = INFINITE, ms_orig = INFINITE, startTime, endTime; + if (tv) + ms_orig = ms = evutil_tv_to_msec(tv); + + EnterCriticalSection(&cond->lock); + ++cond->n_waiting; + generation_at_start = cond->generation; + LeaveCriticalSection(&cond->lock); + + LeaveCriticalSection(lock); + + startTime = GetTickCount(); + do { + DWORD res; + res = WaitForSingleObject(cond->event, ms); + EnterCriticalSection(&cond->lock); + if (cond->n_to_wake && + cond->generation != generation_at_start) { + --cond->n_to_wake; + --cond->n_waiting; + result = 0; + waiting = 0; + } else if (res != WAIT_OBJECT_0) { + result = (res==WAIT_TIMEOUT) ? 1 : -1; + --cond->n_waiting; + waiting = 0; + } else if (ms != INFINITE) { + endTime = GetTickCount(); + if (startTime + ms_orig <= endTime) { + result = 1; /* Timeout */ + --cond->n_waiting; + waiting = 0; + } else { + ms = startTime + ms_orig - endTime; + } + } + LeaveCriticalSection(&cond->lock); + } while (waiting); + + EnterCriticalSection(lock); + + EnterCriticalSection(&cond->lock); + if (!cond->n_waiting) + ResetEvent(cond->event); + LeaveCriticalSection(&cond->lock); + + return result; +} + int evthread_use_windows_threads(void) { @@ -99,8 +292,34 @@ evthread_use_windows_threads(void) evthread_win32_unlock }; + + struct evthread_condition_callbacks cond_cbs = { + EVTHREAD_CONDITION_API_VERSION, + evthread_win32_cond_alloc, + evthread_win32_cond_free, + evthread_win32_cond_signal, + evthread_win32_cond_wait + }; +#ifdef WIN32_HAVE_CONDITION_VARIABLES + struct evthread_condition_callbacks condvar_cbs = { + EVTHREAD_CONDITION_API_VERSION, + evthread_win32_condvar_alloc, + evthread_win32_condvar_free, + evthread_win32_condvar_signal, + evthread_win32_condvar_wait + }; +#endif + evthread_set_lock_callbacks(&cbs); evthread_set_id_callback(evthread_win32_get_id); +#ifdef WIN32_HAVE_CONDITION_VARIABLES + if (evthread_win32_condvar_init()) { + evthread_set_condition_callbacks(&condvar_cbs); + return 0; + } +#endif + evthread_set_condition_callbacks(&cond_cbs); + return 0; } diff --git a/include/event2/thread.h b/include/event2/thread.h index 5e299e65..f51bf3ef 100644 --- a/include/event2/thread.h +++ b/include/event2/thread.h @@ -118,10 +118,61 @@ struct evthread_lock_callbacks { * * Note that if you're using Windows or the Pthreads threading library, you * probably shouldn't call this function; instead, use - * evthread_use_windos_threads() or evthread_use_posix_threads() if you can. + * evthread_use_windows_threads() or evthread_use_posix_threads() if you can. */ int evthread_set_lock_callbacks(const struct evthread_lock_callbacks *); +#define EVTHREAD_CONDITION_API_VERSION 1 + +struct timeval; + +/** This structure describes the interface a threading library uses for + * condition variables. It's used to tell evthread_set_condition_callbacks + * how to use locking on this platform. + */ +struct evthread_condition_callbacks { + /** The current version of the conditions API. Set this to + * EVTHREAD_CONDITION_API_VERSION */ + int condition_api_version; + /** Function to allocate and initialize a new condition variable. + * Returns the condition variable on success, and NULL on failure. + * The 'condtype' argument will be 0 with this API version. + */ + void *(*alloc_condition)(unsigned condtype); + /** Function to free a condition variable. */ + void (*free_condition)(void *cond); + /** Function to signal a condition variable. If 'broadcast' is 1, all + * threads waiting on 'cond' should be woken; otherwise, only on one + * thread is worken. Should return 0 on success, -1 on failure. + * This function will only be called while holding the associated + * lock for the condition. + */ + int (*signal_condition)(void *cond, int broadcast); + /** Function to wait for a condition variable. The lock 'lock' + * will be held when this function is called; should be released + * while waiting for the condition to be come signalled, and + * should be held again when this function returns. + * If timeout is provided, it is interval of seconds to wait for + * the event to become signalled; if it is NULL, the function + * should wait indefinitely. + * + * The function should return -1 on error; 0 if the condition + * was signalled, or 1 on a timeout. */ + int (*wait_condition)(void *cond, void *lock, + const struct timeval *timeout); +}; + +/** Sets a group of functions that Libevent should use for condition variables. + * For full information on the required callback API, see the + * documentation for the individual members of evthread_condition_callbacks. + * + * Note that if you're using Windows or the Pthreads threading library, you + * probably shouldn't call this function; instead, use + * evthread_use_windows_threads() or evthread_use_posix_threads() if you can. + */ +int evthread_set_condition_callbacks( + const struct evthread_condition_callbacks *); + /** Sets the function for determining the thread id.