mirror of
https://github.com/cuberite/libevent.git
synced 2025-09-18 08:49:57 -04:00
Use finalization feature so bufferevents can avoid deadlocks
Since the bufferevents' events are now EV_FINALIZE (name pending), they won't deadlock. To clean up properly, though, we must use the finalization feature. This patch also split bufferevent deallocation into an "unlink" step that happens fast, and a "destruct" step that happens after finalization. More work is needed: there needs to be a way to specify a finalizer for the bufferevent's argument itself. Also, this finalizer business makes lots of the reference counting we were doing unnecessary. Also, more testing is needed.
This commit is contained in:
parent
9d893c97fa
commit
02fbf68770
18
buffer.c
18
buffer.c
@ -3345,3 +3345,21 @@ evbuffer_cb_unsuspend(struct evbuffer *buffer, struct evbuffer_cb_entry *cb)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
int
|
||||||
|
evbuffer_get_callbacks_(struct evbuffer *buffer, struct event_callback **cbs,
|
||||||
|
int max_cbs)
|
||||||
|
{
|
||||||
|
int r = 0;
|
||||||
|
EVBUFFER_LOCK(buffer);
|
||||||
|
if (buffer->deferred_cbs) {
|
||||||
|
if (max_cbs < 1) {
|
||||||
|
r = -1;
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
cbs[0] = &buffer->deferred;
|
||||||
|
r = 1;
|
||||||
|
}
|
||||||
|
done:
|
||||||
|
EVBUFFER_UNLOCK(buffer);
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
@ -252,8 +252,11 @@ struct bufferevent_ops {
|
|||||||
*/
|
*/
|
||||||
int (*disable)(struct bufferevent *, short);
|
int (*disable)(struct bufferevent *, short);
|
||||||
|
|
||||||
|
/** DOCUMENT */
|
||||||
|
void (*unlink)(struct bufferevent *);
|
||||||
|
|
||||||
/** Free any storage and deallocate any extra data or structures used
|
/** Free any storage and deallocate any extra data or structures used
|
||||||
in this implementation.
|
in this implementation. DOCUMENT
|
||||||
*/
|
*/
|
||||||
void (*destruct)(struct bufferevent *);
|
void (*destruct)(struct bufferevent *);
|
||||||
|
|
||||||
|
@ -54,6 +54,7 @@
|
|||||||
#include "event2/bufferevent_struct.h"
|
#include "event2/bufferevent_struct.h"
|
||||||
#include "event2/bufferevent_compat.h"
|
#include "event2/bufferevent_compat.h"
|
||||||
#include "event2/event.h"
|
#include "event2/event.h"
|
||||||
|
#include "event-internal.h"
|
||||||
#include "log-internal.h"
|
#include "log-internal.h"
|
||||||
#include "mm-internal.h"
|
#include "mm-internal.h"
|
||||||
#include "bufferevent-internal.h"
|
#include "bufferevent-internal.h"
|
||||||
@ -61,7 +62,7 @@
|
|||||||
#include "util-internal.h"
|
#include "util-internal.h"
|
||||||
|
|
||||||
static void bufferevent_cancel_all_(struct bufferevent *bev);
|
static void bufferevent_cancel_all_(struct bufferevent *bev);
|
||||||
|
static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_);
|
||||||
|
|
||||||
void
|
void
|
||||||
bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
|
bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
|
||||||
@ -640,7 +641,9 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev)
|
|||||||
{
|
{
|
||||||
struct bufferevent_private *bufev_private =
|
struct bufferevent_private *bufev_private =
|
||||||
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
|
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
|
||||||
struct bufferevent *underlying;
|
int n_cbs = 0;
|
||||||
|
#define MAX_CBS 16
|
||||||
|
struct event_callback *cbs[MAX_CBS];
|
||||||
|
|
||||||
EVUTIL_ASSERT(bufev_private->refcnt > 0);
|
EVUTIL_ASSERT(bufev_private->refcnt > 0);
|
||||||
|
|
||||||
@ -649,6 +652,41 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (bufev->be_ops->unlink)
|
||||||
|
bufev->be_ops->unlink(bufev);
|
||||||
|
|
||||||
|
/* Okay, we're out of references. Let's finalize this once all the
|
||||||
|
* callbacks are done running. */
|
||||||
|
cbs[0] = &bufev->ev_read.ev_evcallback;
|
||||||
|
cbs[1] = &bufev->ev_write.ev_evcallback;
|
||||||
|
cbs[2] = &bufev_private->deferred;
|
||||||
|
n_cbs = 3;
|
||||||
|
if (bufev_private->rate_limiting) {
|
||||||
|
struct event *e = &bufev_private->rate_limiting->refill_bucket_event;
|
||||||
|
if (event_initialized(e))
|
||||||
|
cbs[n_cbs++] = &e->ev_evcallback;
|
||||||
|
}
|
||||||
|
n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs);
|
||||||
|
n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs);
|
||||||
|
|
||||||
|
event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs,
|
||||||
|
bufferevent_finalize_cb_);
|
||||||
|
|
||||||
|
#undef MAX_CBS
|
||||||
|
BEV_UNLOCK(bufev);
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_)
|
||||||
|
{
|
||||||
|
struct bufferevent *bufev = arg_;
|
||||||
|
struct bufferevent *underlying;
|
||||||
|
struct bufferevent_private *bufev_private =
|
||||||
|
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
|
||||||
|
|
||||||
|
BEV_LOCK(bufev);
|
||||||
underlying = bufferevent_get_underlying(bufev);
|
underlying = bufferevent_get_underlying(bufev);
|
||||||
|
|
||||||
/* Clean up the shared info */
|
/* Clean up the shared info */
|
||||||
@ -665,17 +703,13 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev)
|
|||||||
if (bufev_private->rate_limiting) {
|
if (bufev_private->rate_limiting) {
|
||||||
if (bufev_private->rate_limiting->group)
|
if (bufev_private->rate_limiting->group)
|
||||||
bufferevent_remove_from_rate_limit_group_internal_(bufev,0);
|
bufferevent_remove_from_rate_limit_group_internal_(bufev,0);
|
||||||
if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event))
|
|
||||||
event_del(&bufev_private->rate_limiting->refill_bucket_event);
|
|
||||||
event_debug_unassign(&bufev_private->rate_limiting->refill_bucket_event);
|
|
||||||
mm_free(bufev_private->rate_limiting);
|
mm_free(bufev_private->rate_limiting);
|
||||||
bufev_private->rate_limiting = NULL;
|
bufev_private->rate_limiting = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
event_debug_unassign(&bufev->ev_read);
|
|
||||||
event_debug_unassign(&bufev->ev_write);
|
|
||||||
|
|
||||||
BEV_UNLOCK(bufev);
|
BEV_UNLOCK(bufev);
|
||||||
|
|
||||||
if (bufev_private->own_lock)
|
if (bufev_private->own_lock)
|
||||||
EVTHREAD_FREE_LOCK(bufev_private->lock,
|
EVTHREAD_FREE_LOCK(bufev_private->lock,
|
||||||
EVTHREAD_LOCKTYPE_RECURSIVE);
|
EVTHREAD_LOCKTYPE_RECURSIVE);
|
||||||
@ -695,8 +729,6 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev)
|
|||||||
*/
|
*/
|
||||||
if (underlying)
|
if (underlying)
|
||||||
bufferevent_decref_(underlying);
|
bufferevent_decref_(underlying);
|
||||||
|
|
||||||
return 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
@ -844,9 +876,9 @@ bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
|
|||||||
void
|
void
|
||||||
bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev)
|
bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev)
|
||||||
{
|
{
|
||||||
evtimer_assign(&bev->ev_read, bev->ev_base,
|
event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE,
|
||||||
bufferevent_generic_read_timeout_cb, bev);
|
bufferevent_generic_read_timeout_cb, bev);
|
||||||
evtimer_assign(&bev->ev_write, bev->ev_base,
|
event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE,
|
||||||
bufferevent_generic_write_timeout_cb, bev);
|
bufferevent_generic_write_timeout_cb, bev);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,6 +93,7 @@ const struct bufferevent_ops bufferevent_ops_async = {
|
|||||||
evutil_offsetof(struct bufferevent_async, bev.bev),
|
evutil_offsetof(struct bufferevent_async, bev.bev),
|
||||||
be_async_enable,
|
be_async_enable,
|
||||||
be_async_disable,
|
be_async_disable,
|
||||||
|
NULL, /* Unlink */
|
||||||
be_async_destruct,
|
be_async_destruct,
|
||||||
bufferevent_generic_adj_timeouts_,
|
bufferevent_generic_adj_timeouts_,
|
||||||
be_async_flush,
|
be_async_flush,
|
||||||
@ -384,11 +385,6 @@ be_async_destruct(struct bufferevent *bev)
|
|||||||
/* XXXX possible double-close */
|
/* XXXX possible double-close */
|
||||||
evutil_closesocket(fd);
|
evutil_closesocket(fd);
|
||||||
}
|
}
|
||||||
/* delete this in case non-blocking connect was used */
|
|
||||||
if (event_initialized(&bev->ev_write)) {
|
|
||||||
event_del(&bev->ev_write);
|
|
||||||
bufferevent_del_generic_timeout_cbs_(bev);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
|
/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
|
||||||
|
@ -61,6 +61,7 @@
|
|||||||
/* prototypes */
|
/* prototypes */
|
||||||
static int be_filter_enable(struct bufferevent *, short);
|
static int be_filter_enable(struct bufferevent *, short);
|
||||||
static int be_filter_disable(struct bufferevent *, short);
|
static int be_filter_disable(struct bufferevent *, short);
|
||||||
|
static void be_filter_unlink(struct bufferevent *);
|
||||||
static void be_filter_destruct(struct bufferevent *);
|
static void be_filter_destruct(struct bufferevent *);
|
||||||
|
|
||||||
static void be_filter_readcb(struct bufferevent *, void *);
|
static void be_filter_readcb(struct bufferevent *, void *);
|
||||||
@ -99,6 +100,7 @@ const struct bufferevent_ops bufferevent_ops_filter = {
|
|||||||
evutil_offsetof(struct bufferevent_filtered, bev.bev),
|
evutil_offsetof(struct bufferevent_filtered, bev.bev),
|
||||||
be_filter_enable,
|
be_filter_enable,
|
||||||
be_filter_disable,
|
be_filter_disable,
|
||||||
|
be_filter_unlink,
|
||||||
be_filter_destruct,
|
be_filter_destruct,
|
||||||
bufferevent_generic_adj_timeouts_,
|
bufferevent_generic_adj_timeouts_,
|
||||||
be_filter_flush,
|
be_filter_flush,
|
||||||
@ -214,12 +216,10 @@ bufferevent_filter_new(struct bufferevent *underlying,
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
be_filter_destruct(struct bufferevent *bev)
|
be_filter_unlink(struct bufferevent *bev)
|
||||||
{
|
{
|
||||||
struct bufferevent_filtered *bevf = upcast(bev);
|
struct bufferevent_filtered *bevf = upcast(bev);
|
||||||
EVUTIL_ASSERT(bevf);
|
EVUTIL_ASSERT(bevf);
|
||||||
if (bevf->free_context)
|
|
||||||
bevf->free_context(bevf->context);
|
|
||||||
|
|
||||||
if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
|
if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
|
||||||
/* Yes, there is also a decref in bufferevent_decref_.
|
/* Yes, there is also a decref in bufferevent_decref_.
|
||||||
@ -242,8 +242,15 @@ be_filter_destruct(struct bufferevent *bev)
|
|||||||
BEV_SUSPEND_FILT_READ);
|
BEV_SUSPEND_FILT_READ);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bufferevent_del_generic_timeout_cbs_(bev);
|
static void
|
||||||
|
be_filter_destruct(struct bufferevent *bev)
|
||||||
|
{
|
||||||
|
struct bufferevent_filtered *bevf = upcast(bev);
|
||||||
|
EVUTIL_ASSERT(bevf);
|
||||||
|
if (bevf->free_context)
|
||||||
|
bevf->free_context(bevf->context);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
|
@ -326,6 +326,7 @@ struct bufferevent_openssl {
|
|||||||
|
|
||||||
static int be_openssl_enable(struct bufferevent *, short);
|
static int be_openssl_enable(struct bufferevent *, short);
|
||||||
static int be_openssl_disable(struct bufferevent *, short);
|
static int be_openssl_disable(struct bufferevent *, short);
|
||||||
|
static void be_openssl_unlink(struct bufferevent *);
|
||||||
static void be_openssl_destruct(struct bufferevent *);
|
static void be_openssl_destruct(struct bufferevent *);
|
||||||
static int be_openssl_adj_timeouts(struct bufferevent *);
|
static int be_openssl_adj_timeouts(struct bufferevent *);
|
||||||
static int be_openssl_flush(struct bufferevent *bufev,
|
static int be_openssl_flush(struct bufferevent *bufev,
|
||||||
@ -337,6 +338,7 @@ const struct bufferevent_ops bufferevent_ops_openssl = {
|
|||||||
evutil_offsetof(struct bufferevent_openssl, bev.bev),
|
evutil_offsetof(struct bufferevent_openssl, bev.bev),
|
||||||
be_openssl_enable,
|
be_openssl_enable,
|
||||||
be_openssl_disable,
|
be_openssl_disable,
|
||||||
|
be_openssl_unlink,
|
||||||
be_openssl_destruct,
|
be_openssl_destruct,
|
||||||
be_openssl_adj_timeouts,
|
be_openssl_adj_timeouts,
|
||||||
be_openssl_flush,
|
be_openssl_flush,
|
||||||
@ -977,9 +979,11 @@ set_open_callbacks(struct bufferevent_openssl *bev_ssl, evutil_socket_t fd)
|
|||||||
event_del(&bev->ev_write);
|
event_del(&bev->ev_write);
|
||||||
}
|
}
|
||||||
event_assign(&bev->ev_read, bev->ev_base, fd,
|
event_assign(&bev->ev_read, bev->ev_base, fd,
|
||||||
EV_READ|EV_PERSIST, be_openssl_readeventcb, bev_ssl);
|
EV_READ|EV_PERSIST|EV_FINALIZE,
|
||||||
|
be_openssl_readeventcb, bev_ssl);
|
||||||
event_assign(&bev->ev_write, bev->ev_base, fd,
|
event_assign(&bev->ev_write, bev->ev_base, fd,
|
||||||
EV_WRITE|EV_PERSIST, be_openssl_writeeventcb, bev_ssl);
|
EV_WRITE|EV_PERSIST|EV_FINALIZE,
|
||||||
|
be_openssl_writeeventcb, bev_ssl);
|
||||||
if (rpending)
|
if (rpending)
|
||||||
r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read);
|
r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read);
|
||||||
if (wpending)
|
if (wpending)
|
||||||
@ -1079,9 +1083,11 @@ set_handshake_callbacks(struct bufferevent_openssl *bev_ssl, evutil_socket_t fd)
|
|||||||
event_del(&bev->ev_write);
|
event_del(&bev->ev_write);
|
||||||
}
|
}
|
||||||
event_assign(&bev->ev_read, bev->ev_base, fd,
|
event_assign(&bev->ev_read, bev->ev_base, fd,
|
||||||
EV_READ|EV_PERSIST, be_openssl_handshakeeventcb, bev_ssl);
|
EV_READ|EV_PERSIST|EV_FINALIZE,
|
||||||
|
be_openssl_handshakeeventcb, bev_ssl);
|
||||||
event_assign(&bev->ev_write, bev->ev_base, fd,
|
event_assign(&bev->ev_write, bev->ev_base, fd,
|
||||||
EV_WRITE|EV_PERSIST, be_openssl_handshakeeventcb, bev_ssl);
|
EV_WRITE|EV_PERSIST|EV_FINALIZE,
|
||||||
|
be_openssl_handshakeeventcb, bev_ssl);
|
||||||
if (fd >= 0) {
|
if (fd >= 0) {
|
||||||
r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read);
|
r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read);
|
||||||
r2 = bufferevent_add_event_(&bev->ev_write, &bev->timeout_write);
|
r2 = bufferevent_add_event_(&bev->ev_write, &bev->timeout_write);
|
||||||
@ -1176,17 +1182,10 @@ be_openssl_disable(struct bufferevent *bev, short events)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
be_openssl_destruct(struct bufferevent *bev)
|
be_openssl_unlink(struct bufferevent *bev)
|
||||||
{
|
{
|
||||||
struct bufferevent_openssl *bev_ssl = upcast(bev);
|
struct bufferevent_openssl *bev_ssl = upcast(bev);
|
||||||
|
|
||||||
if (bev_ssl->underlying) {
|
|
||||||
bufferevent_del_generic_timeout_cbs_(bev);
|
|
||||||
} else {
|
|
||||||
event_del(&bev->ev_read);
|
|
||||||
event_del(&bev->ev_write);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (bev_ssl->bev.options & BEV_OPT_CLOSE_ON_FREE) {
|
if (bev_ssl->bev.options & BEV_OPT_CLOSE_ON_FREE) {
|
||||||
if (bev_ssl->underlying) {
|
if (bev_ssl->underlying) {
|
||||||
if (BEV_UPCAST(bev_ssl->underlying)->refcnt < 2) {
|
if (BEV_UPCAST(bev_ssl->underlying)->refcnt < 2) {
|
||||||
@ -1194,17 +1193,11 @@ be_openssl_destruct(struct bufferevent *bev)
|
|||||||
"bufferevent with too few references");
|
"bufferevent with too few references");
|
||||||
} else {
|
} else {
|
||||||
bufferevent_free(bev_ssl->underlying);
|
bufferevent_free(bev_ssl->underlying);
|
||||||
bev_ssl->underlying = NULL;
|
/* We still have a reference to it, since DOCUMENT. So we don't
|
||||||
|
* drop this. */
|
||||||
|
// bev_ssl->underlying = NULL;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
evutil_socket_t fd = -1;
|
|
||||||
BIO *bio = SSL_get_wbio(bev_ssl->ssl);
|
|
||||||
if (bio)
|
|
||||||
fd = BIO_get_fd(bio, NULL);
|
|
||||||
if (fd >= 0)
|
|
||||||
evutil_closesocket(fd);
|
|
||||||
}
|
}
|
||||||
SSL_free(bev_ssl->ssl);
|
|
||||||
} else {
|
} else {
|
||||||
if (bev_ssl->underlying) {
|
if (bev_ssl->underlying) {
|
||||||
if (bev_ssl->underlying->errorcb == be_openssl_eventcb)
|
if (bev_ssl->underlying->errorcb == be_openssl_eventcb)
|
||||||
@ -1216,6 +1209,24 @@ be_openssl_destruct(struct bufferevent *bev)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
be_openssl_destruct(struct bufferevent *bev)
|
||||||
|
{
|
||||||
|
struct bufferevent_openssl *bev_ssl = upcast(bev);
|
||||||
|
|
||||||
|
if (bev_ssl->bev.options & BEV_OPT_CLOSE_ON_FREE) {
|
||||||
|
if (! bev_ssl->underlying) {
|
||||||
|
evutil_socket_t fd = -1;
|
||||||
|
BIO *bio = SSL_get_wbio(bev_ssl->ssl);
|
||||||
|
if (bio)
|
||||||
|
fd = BIO_get_fd(bio, NULL);
|
||||||
|
if (fd >= 0)
|
||||||
|
evutil_closesocket(fd);
|
||||||
|
}
|
||||||
|
SSL_free(bev_ssl->ssl);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
be_openssl_adj_timeouts(struct bufferevent *bev)
|
be_openssl_adj_timeouts(struct bufferevent *bev)
|
||||||
{
|
{
|
||||||
|
@ -267,7 +267,7 @@ be_pair_disable(struct bufferevent *bev, short events)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
be_pair_destruct(struct bufferevent *bev)
|
be_pair_unlink(struct bufferevent *bev)
|
||||||
{
|
{
|
||||||
struct bufferevent_pair *bev_p = upcast(bev);
|
struct bufferevent_pair *bev_p = upcast(bev);
|
||||||
|
|
||||||
@ -275,8 +275,6 @@ be_pair_destruct(struct bufferevent *bev)
|
|||||||
bev_p->partner->partner = NULL;
|
bev_p->partner->partner = NULL;
|
||||||
bev_p->partner = NULL;
|
bev_p->partner = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
bufferevent_del_generic_timeout_cbs_(bev);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
@ -327,7 +325,8 @@ const struct bufferevent_ops bufferevent_ops_pair = {
|
|||||||
evutil_offsetof(struct bufferevent_pair, bev.bev),
|
evutil_offsetof(struct bufferevent_pair, bev.bev),
|
||||||
be_pair_enable,
|
be_pair_enable,
|
||||||
be_pair_disable,
|
be_pair_disable,
|
||||||
be_pair_destruct,
|
be_pair_unlink,
|
||||||
|
NULL, /* be_pair_destruct, */
|
||||||
bufferevent_generic_adj_timeouts_,
|
bufferevent_generic_adj_timeouts_,
|
||||||
be_pair_flush,
|
be_pair_flush,
|
||||||
NULL, /* ctrl */
|
NULL, /* ctrl */
|
||||||
|
@ -609,8 +609,8 @@ bufferevent_set_rate_limit(struct bufferevent *bev,
|
|||||||
EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
|
EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
|
||||||
event_del(&rlim->refill_bucket_event);
|
event_del(&rlim->refill_bucket_event);
|
||||||
}
|
}
|
||||||
evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
|
event_assign(&rlim->refill_bucket_event, bev->ev_base,
|
||||||
bev_refill_callback_, bevp);
|
-1, EV_FINALIZE, bev_refill_callback_, bevp);
|
||||||
|
|
||||||
if (rlim->limit.read_limit > 0) {
|
if (rlim->limit.read_limit > 0) {
|
||||||
bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
|
bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
|
||||||
@ -654,7 +654,7 @@ bufferevent_rate_limit_group_new(struct event_base *base,
|
|||||||
|
|
||||||
ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
|
ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
|
||||||
|
|
||||||
event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
|
event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
|
||||||
bev_group_refill_callback_, g);
|
bev_group_refill_callback_, g);
|
||||||
/*XXXX handle event_add failure */
|
/*XXXX handle event_add failure */
|
||||||
event_add(&g->master_refill_event, &cfg->tick_timeout);
|
event_add(&g->master_refill_event, &cfg->tick_timeout);
|
||||||
@ -748,8 +748,8 @@ bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
|
|||||||
BEV_UNLOCK(bev);
|
BEV_UNLOCK(bev);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
|
event_assign(&rlim->refill_bucket_event, bev->ev_base,
|
||||||
bev_refill_callback_, bevp);
|
-1, EV_FINALIZE, bev_refill_callback_, bevp);
|
||||||
bevp->rate_limiting = rlim;
|
bevp->rate_limiting = rlim;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,6 +90,7 @@ const struct bufferevent_ops bufferevent_ops_socket = {
|
|||||||
evutil_offsetof(struct bufferevent_private, bev),
|
evutil_offsetof(struct bufferevent_private, bev),
|
||||||
be_socket_enable,
|
be_socket_enable,
|
||||||
be_socket_disable,
|
be_socket_disable,
|
||||||
|
NULL, /* unlink */
|
||||||
be_socket_destruct,
|
be_socket_destruct,
|
||||||
be_socket_adj_timeouts,
|
be_socket_adj_timeouts,
|
||||||
be_socket_flush,
|
be_socket_flush,
|
||||||
@ -338,9 +339,9 @@ bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
|
|||||||
evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
|
evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
|
||||||
|
|
||||||
event_assign(&bufev->ev_read, bufev->ev_base, fd,
|
event_assign(&bufev->ev_read, bufev->ev_base, fd,
|
||||||
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
|
EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);
|
||||||
event_assign(&bufev->ev_write, bufev->ev_base, fd,
|
event_assign(&bufev->ev_write, bufev->ev_base, fd,
|
||||||
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
|
EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);
|
||||||
|
|
||||||
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
|
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
|
||||||
|
|
||||||
@ -399,7 +400,7 @@ bufferevent_socket_connect(struct bufferevent *bev,
|
|||||||
* on a non-blocking connect() when ConnectEx() is unavailable. */
|
* on a non-blocking connect() when ConnectEx() is unavailable. */
|
||||||
if (BEV_IS_ASYNC(bev)) {
|
if (BEV_IS_ASYNC(bev)) {
|
||||||
event_assign(&bev->ev_write, bev->ev_base, fd,
|
event_assign(&bev->ev_write, bev->ev_base, fd,
|
||||||
EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);
|
EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bev);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
bufferevent_setfd(bev, fd);
|
bufferevent_setfd(bev, fd);
|
||||||
@ -589,9 +590,6 @@ be_socket_destruct(struct bufferevent *bufev)
|
|||||||
|
|
||||||
fd = event_get_fd(&bufev->ev_read);
|
fd = event_get_fd(&bufev->ev_read);
|
||||||
|
|
||||||
event_del(&bufev->ev_read);
|
|
||||||
event_del(&bufev->ev_write);
|
|
||||||
|
|
||||||
if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0)
|
if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0)
|
||||||
EVUTIL_CLOSESOCKET(fd);
|
EVUTIL_CLOSESOCKET(fd);
|
||||||
}
|
}
|
||||||
@ -637,9 +635,9 @@ be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd)
|
|||||||
event_del(&bufev->ev_write);
|
event_del(&bufev->ev_write);
|
||||||
|
|
||||||
event_assign(&bufev->ev_read, bufev->ev_base, fd,
|
event_assign(&bufev->ev_read, bufev->ev_base, fd,
|
||||||
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
|
EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);
|
||||||
event_assign(&bufev->ev_write, bufev->ev_base, fd,
|
event_assign(&bufev->ev_write, bufev->ev_base, fd,
|
||||||
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
|
EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);
|
||||||
|
|
||||||
if (fd >= 0)
|
if (fd >= 0)
|
||||||
bufferevent_enable(bufev, bufev->enabled);
|
bufferevent_enable(bufev, bufev->enabled);
|
||||||
|
@ -327,6 +327,11 @@ void evbuffer_set_parent_(struct evbuffer *buf, struct bufferevent *bev);
|
|||||||
|
|
||||||
void evbuffer_invoke_callbacks_(struct evbuffer *buf);
|
void evbuffer_invoke_callbacks_(struct evbuffer *buf);
|
||||||
|
|
||||||
|
|
||||||
|
int evbuffer_get_callbacks_(struct evbuffer *buffer,
|
||||||
|
struct event_callback **cbs,
|
||||||
|
int max_cbs);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
Loading…
x
Reference in New Issue
Block a user