mirror of
https://github.com/cuberite/libevent.git
synced 2025-09-09 04:19:10 -04:00
Locking support for bufferevents.
svn:r1170
This commit is contained in:
parent
1becc4c4e6
commit
915193e7df
@ -33,6 +33,8 @@ extern "C" {
|
||||
#include "event-config.h"
|
||||
#include "evutil.h"
|
||||
#include "defer-internal.h"
|
||||
#include "evthread-internal.h"
|
||||
#include "event2/thread.h"
|
||||
|
||||
struct bufferevent_private {
|
||||
struct bufferevent bev;
|
||||
@ -42,6 +44,7 @@ struct bufferevent_private {
|
||||
|
||||
/** If set, read is suspended until evbuffer some. */
|
||||
unsigned read_suspended : 1;
|
||||
unsigned own_lock : 1;
|
||||
|
||||
enum bufferevent_options options;
|
||||
|
||||
@ -106,6 +109,22 @@ void bufferevent_wm_suspend_read(struct bufferevent *bufev);
|
||||
* read buffer is too full. */
|
||||
void bufferevent_wm_unsuspend_read(struct bufferevent *bufev);
|
||||
|
||||
int bufferevent_enable_locking(struct bufferevent *bufev, void *lock);
|
||||
|
||||
#define BEV_UPCAST(b) EVUTIL_UPCAST((b), struct bufferevent_private, bev)
|
||||
|
||||
#define BEV_LOCK(b) do { \
|
||||
struct bufferevent_private *locking = BEV_UPCAST(b); \
|
||||
if (locking->lock) \
|
||||
EVLOCK_LOCK(locking->lock, EVTHREAD_WRITE); \
|
||||
} while(0)
|
||||
|
||||
#define BEV_UNLOCK(b) do { \
|
||||
struct bufferevent_private *locking = BEV_UPCAST(b); \
|
||||
if (locking->lock) \
|
||||
EVLOCK_UNLOCK(locking->lock, EVTHREAD_WRITE); \
|
||||
} while(0)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
@ -59,16 +59,17 @@
|
||||
#include "bufferevent-internal.h"
|
||||
#include "util-internal.h"
|
||||
|
||||
|
||||
void
|
||||
bufferevent_wm_suspend_read(struct bufferevent *bufev)
|
||||
{
|
||||
struct bufferevent_private *bufev_private =
|
||||
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
|
||||
BEV_LOCK(bufev);
|
||||
if (!bufev_private->read_suspended) {
|
||||
bufev->be_ops->disable(bufev, EV_READ);
|
||||
bufev_private->read_suspended = 1;
|
||||
}
|
||||
BEV_LOCK(bufev);
|
||||
}
|
||||
|
||||
void
|
||||
@ -76,11 +77,14 @@ bufferevent_wm_unsuspend_read(struct bufferevent *bufev)
|
||||
{
|
||||
struct bufferevent_private *bufev_private =
|
||||
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
|
||||
|
||||
BEV_LOCK(bufev);
|
||||
if (bufev_private->read_suspended) {
|
||||
bufev_private->read_suspended = 0;
|
||||
if (bufev->enabled & EV_READ)
|
||||
bufev->be_ops->enable(bufev, EV_READ);
|
||||
}
|
||||
BEV_LOCK(bufev);
|
||||
}
|
||||
|
||||
/* Callback to implement watermarks on the input buffer. Only enabled
|
||||
@ -91,7 +95,9 @@ bufferevent_inbuf_wm_cb(struct evbuffer *buf,
|
||||
void *arg)
|
||||
{
|
||||
struct bufferevent *bufev = arg;
|
||||
size_t size = evbuffer_get_length(buf);
|
||||
size_t size;
|
||||
|
||||
size = evbuffer_get_length(buf);
|
||||
|
||||
if (cbinfo->n_added > cbinfo->n_deleted) {
|
||||
/* Data got added. If it put us over the watermark, stop
|
||||
@ -137,6 +143,15 @@ bufferevent_init_common(struct bufferevent_private *bufev_private,
|
||||
*/
|
||||
bufev->enabled = EV_WRITE;
|
||||
|
||||
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
|
||||
if (options & BEV_OPT_THREADSAFE) {
|
||||
if (bufferevent_enable_locking(bufev, NULL) < 0) {
|
||||
/* cleanup */
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
bufev_private->options = options;
|
||||
|
||||
return 0;
|
||||
@ -146,11 +161,14 @@ void
|
||||
bufferevent_setcb(struct bufferevent *bufev,
|
||||
evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
|
||||
{
|
||||
BEV_LOCK(bufev);
|
||||
|
||||
bufev->readcb = readcb;
|
||||
bufev->writecb = writecb;
|
||||
bufev->errorcb = errorcb;
|
||||
|
||||
bufev->cbarg = cbarg;
|
||||
BEV_UNLOCK(bufev);
|
||||
}
|
||||
|
||||
struct evbuffer *
|
||||
@ -206,15 +224,19 @@ bufferevent_enable(struct bufferevent *bufev, short event)
|
||||
struct bufferevent_private *bufev_private =
|
||||
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
|
||||
short impl_events = event;
|
||||
int r = 0;
|
||||
|
||||
BEV_LOCK(bufev);
|
||||
if (bufev_private->read_suspended)
|
||||
impl_events &= ~EV_READ;
|
||||
|
||||
bufev->enabled |= event;
|
||||
|
||||
if (bufev->be_ops->enable(bufev, impl_events) < 0)
|
||||
return -1;
|
||||
r = -1;
|
||||
|
||||
return (0);
|
||||
BEV_UNLOCK(bufev);
|
||||
return r;
|
||||
}
|
||||
|
||||
void
|
||||
@ -222,6 +244,7 @@ bufferevent_set_timeouts(struct bufferevent *bufev,
|
||||
const struct timeval *tv_read,
|
||||
const struct timeval *tv_write)
|
||||
{
|
||||
BEV_LOCK(bufev);
|
||||
if (tv_read) {
|
||||
bufev->timeout_read = *tv_read;
|
||||
} else {
|
||||
@ -235,6 +258,7 @@ bufferevent_set_timeouts(struct bufferevent *bufev,
|
||||
|
||||
if (bufev->be_ops->adj_timeouts)
|
||||
bufev->be_ops->adj_timeouts(bufev);
|
||||
BEV_UNLOCK(bufev);
|
||||
}
|
||||
|
||||
|
||||
@ -265,12 +289,16 @@ bufferevent_settimeout(struct bufferevent *bufev,
|
||||
int
|
||||
bufferevent_disable(struct bufferevent *bufev, short event)
|
||||
{
|
||||
int r = 0;
|
||||
|
||||
BEV_LOCK(bufev);
|
||||
bufev->enabled &= ~event;
|
||||
|
||||
if (bufev->be_ops->disable(bufev, event) < 0)
|
||||
return (-1);
|
||||
r = -1;
|
||||
|
||||
return (0);
|
||||
BEV_UNLOCK(bufev);
|
||||
return r;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -284,6 +312,7 @@ bufferevent_setwatermark(struct bufferevent *bufev, short events,
|
||||
struct bufferevent_private *bufev_private =
|
||||
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
|
||||
|
||||
BEV_LOCK(bufev);
|
||||
if (events & EV_WRITE) {
|
||||
bufev->wm_write.low = lowmark;
|
||||
bufev->wm_write.high = highmark;
|
||||
@ -321,6 +350,7 @@ bufferevent_setwatermark(struct bufferevent *bufev, short events,
|
||||
bufferevent_wm_unsuspend_read(bufev);
|
||||
}
|
||||
}
|
||||
BEV_UNLOCK(bufev);
|
||||
}
|
||||
|
||||
int
|
||||
@ -328,10 +358,12 @@ bufferevent_flush(struct bufferevent *bufev,
|
||||
short iotype,
|
||||
enum bufferevent_flush_mode mode)
|
||||
{
|
||||
int r = -1;
|
||||
BEV_LOCK(bufev);
|
||||
if (bufev->be_ops->flush)
|
||||
return bufev->be_ops->flush(bufev, iotype, mode);
|
||||
else
|
||||
return -1;
|
||||
r = bufev->be_ops->flush(bufev, iotype, mode);
|
||||
BEV_UNLOCK(bufev);
|
||||
return r;
|
||||
}
|
||||
|
||||
void
|
||||
@ -347,5 +379,32 @@ bufferevent_free(struct bufferevent *bufev)
|
||||
|
||||
/* Free the actual allocated memory. */
|
||||
mm_free(bufev - bufev->be_ops->mem_offset);
|
||||
/* Free lock XXX */
|
||||
}
|
||||
|
||||
int
|
||||
bufferevent_enable_locking(struct bufferevent *bufev, void *lock)
|
||||
{
|
||||
#ifdef _EVENT_DISABLE_THREAD_SUPPORT
|
||||
return -1;
|
||||
#else
|
||||
if (BEV_UPCAST(bufev)->lock)
|
||||
return -1;
|
||||
|
||||
if (!lock) {
|
||||
EVTHREAD_ALLOC_LOCK(lock);
|
||||
if (!lock)
|
||||
return -1;
|
||||
BEV_UPCAST(bufev)->lock = lock;
|
||||
BEV_UPCAST(bufev)->own_lock = 1;
|
||||
} else {
|
||||
BEV_UPCAST(bufev)->lock = lock;
|
||||
BEV_UPCAST(bufev)->own_lock = 0;
|
||||
}
|
||||
evbuffer_enable_locking(bufev->input, lock);
|
||||
evbuffer_enable_locking(bufev->output, lock);
|
||||
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -170,6 +170,7 @@ bufferevent_filter_new(struct bufferevent *underlying,
|
||||
void *ctx)
|
||||
{
|
||||
struct bufferevent_filtered *bufev_f;
|
||||
enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE;
|
||||
|
||||
if (!input_filter)
|
||||
input_filter = be_null_filter;
|
||||
@ -181,10 +182,18 @@ bufferevent_filter_new(struct bufferevent *underlying,
|
||||
return NULL;
|
||||
|
||||
if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base,
|
||||
&bufferevent_ops_filter, options) < 0) {
|
||||
&bufferevent_ops_filter, tmp_options) < 0) {
|
||||
mm_free(bufev_f);
|
||||
return NULL;
|
||||
}
|
||||
if (options & BEV_OPT_THREADSAFE) {
|
||||
void *lock = BEV_UPCAST(underlying)->lock;
|
||||
if (!lock) {
|
||||
bufferevent_enable_locking(underlying, NULL);
|
||||
lock = BEV_UPCAST(underlying)->lock;
|
||||
}
|
||||
bufferevent_enable_locking(downcast(bufev_f), lock);
|
||||
}
|
||||
|
||||
bufev_f->underlying = underlying;
|
||||
bufev_f->process_in = input_filter;
|
||||
|
@ -75,6 +75,7 @@ run_callback(struct deferred_cb *cb, void *arg)
|
||||
struct bufferevent_pair *bufev = arg;
|
||||
struct bufferevent *bev = downcast(bufev);
|
||||
|
||||
BEV_LOCK(bev);
|
||||
if (cb == &bufev->deferred_read_cb) {
|
||||
if (bev->readcb) {
|
||||
bev->readcb(bev, bev->cbarg);
|
||||
@ -84,6 +85,7 @@ run_callback(struct deferred_cb *cb, void *arg)
|
||||
bev->writecb(bev, bev->cbarg);
|
||||
}
|
||||
}
|
||||
BEV_UNLOCK(bev);
|
||||
}
|
||||
|
||||
static struct bufferevent_pair *
|
||||
@ -115,16 +117,22 @@ bufferevent_pair_new(struct event_base *base, enum bufferevent_options options,
|
||||
struct bufferevent *pair[2])
|
||||
{
|
||||
struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
|
||||
enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE;
|
||||
|
||||
bufev1 = bufferevent_pair_elt_new(base, options);
|
||||
if (!bufev1)
|
||||
return -1;
|
||||
bufev2 = bufferevent_pair_elt_new(base, options);
|
||||
bufev2 = bufferevent_pair_elt_new(base, tmp_options);
|
||||
if (!bufev2) {
|
||||
bufferevent_free(downcast(bufev1));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (options & BEV_OPT_THREADSAFE) {
|
||||
/*XXXX check return */
|
||||
bufferevent_enable_locking(downcast(bufev2), bufev1->bev.lock);
|
||||
}
|
||||
|
||||
bufev1->partner = bufev2;
|
||||
bufev2->partner = bufev1;
|
||||
|
||||
|
@ -342,6 +342,7 @@ be_socket_flush(struct bufferevent *bev, short iotype,
|
||||
void
|
||||
bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
|
||||
{
|
||||
BEV_LOCK(bufev);
|
||||
assert(bufev->be_ops == &bufferevent_ops_socket);
|
||||
|
||||
event_del(&bufev->ev_read);
|
||||
@ -351,37 +352,48 @@ bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
|
||||
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
|
||||
event_assign(&bufev->ev_write, bufev->ev_base, fd,
|
||||
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
|
||||
BEV_UNLOCK(bufev);
|
||||
}
|
||||
|
||||
/* XXXX Should non-socket buffferevents support this? */
|
||||
int
|
||||
bufferevent_priority_set(struct bufferevent *bufev, int priority)
|
||||
{
|
||||
int r = -1;
|
||||
|
||||
BEV_LOCK(bufev);
|
||||
if (bufev->be_ops != &bufferevent_ops_socket)
|
||||
return -1;
|
||||
goto done;
|
||||
|
||||
if (event_priority_set(&bufev->ev_read, priority) == -1)
|
||||
return (-1);
|
||||
goto done;
|
||||
if (event_priority_set(&bufev->ev_write, priority) == -1)
|
||||
return (-1);
|
||||
goto done;
|
||||
|
||||
return (0);
|
||||
r = 0;
|
||||
done:
|
||||
BEV_UNLOCK(bufev);
|
||||
return r;
|
||||
}
|
||||
|
||||
/* XXXX Should non-socket buffferevents support this? */
|
||||
int
|
||||
bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
|
||||
{
|
||||
int res;
|
||||
int res = -1;
|
||||
|
||||
BEV_LOCK(bufev);
|
||||
if (bufev->be_ops != &bufferevent_ops_socket)
|
||||
return -1;
|
||||
goto done;
|
||||
|
||||
bufev->ev_base = base;
|
||||
|
||||
res = event_base_set(base, &bufev->ev_read);
|
||||
if (res == -1)
|
||||
return (res);
|
||||
goto done;
|
||||
|
||||
res = event_base_set(base, &bufev->ev_write);
|
||||
return (res);
|
||||
done:
|
||||
BEV_UNLOCK(bufev);
|
||||
return res;
|
||||
}
|
||||
|
@ -118,6 +118,7 @@ typedef void (*everrorcb)(struct bufferevent *bev, short what, void *ctx);
|
||||
|
||||
enum bufferevent_options {
|
||||
BEV_OPT_CLOSE_ON_FREE = (1<<0),
|
||||
BEV_OPT_THREADSAFE = (1<<1),
|
||||
};
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user