Move responsibility for IOCP callback into bufferevent_async.

This patch from Chris Davis saves some callback depth, and adds proper
ref-counting to bufferevents when there's a deferred evbuffer callback
inflight.  It could use a couple more comments to really nail down what
its invariants are.

svn:r1543
This commit is contained in:
Nick Mathewson 2009-11-17 20:31:09 +00:00
parent 201d8d0baf
commit d7d1f1da09
9 changed files with 305 additions and 136 deletions

View File

@ -78,6 +78,9 @@
#include "event2/event.h"
#include "event2/buffer.h"
#include "event2/buffer_compat.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_compat.h"
#include "event2/bufferevent_struct.h"
#include "event2/thread.h"
#include "event-config.h"
#include "log-internal.h"
@ -85,6 +88,7 @@
#include "util-internal.h"
#include "evthread-internal.h"
#include "evbuffer-internal.h"
#include "bufferevent-internal.h"
/* some systems do not have MAP_FAILED */
#ifndef MAP_FAILED
@ -276,6 +280,13 @@ _evbuffer_incref(struct evbuffer *buf)
EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
}
void
_evbuffer_incref_and_lock(struct evbuffer *buf)
{
EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
++buf->refcnt;
}
int
evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base)
{
@ -312,6 +323,14 @@ evbuffer_enable_locking(struct evbuffer *buf, void *lock)
#endif
}
void
evbuffer_set_parent(struct evbuffer *buf, struct bufferevent *bev)
{
EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
buf->parent = bev;
EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
}
static void
evbuffer_run_callbacks(struct evbuffer *buffer)
{
@ -362,7 +381,10 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer)
if (buffer->deferred_cbs) {
if (buffer->deferred.queued)
return;
_evbuffer_incref(buffer);
_evbuffer_incref_and_lock(buffer);
if (buffer->parent)
bufferevent_incref(buffer->parent);
EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE);
event_deferred_cb_schedule(buffer->cb_queue, &buffer->deferred);
} else {
evbuffer_run_callbacks(buffer);
@ -372,13 +394,17 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer)
static void
evbuffer_deferred_callback(struct deferred_cb *cb, void *arg)
{
struct bufferevent *parent = NULL;
struct evbuffer *buffer = arg;
/* XXXX It would be better to run these callbacks without holding the
* lock */
EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
parent = buffer->parent;
evbuffer_run_callbacks(buffer);
_evbuffer_decref_and_unlock(buffer);
if (parent)
bufferevent_free(parent);
}
static void

View File

@ -48,34 +48,23 @@
#define MAX_WSABUFS 16
/** Wrapper for an OVERLAPPED that holds the necessary info to notice
when an overlapped read or write is done on an evbuffer.
**/
struct buffer_overlapped {
struct event_overlapped event_overlapped;
/** The first pinned chain in the buffer. */
struct evbuffer_chain *first_pinned;
/** The buffer itself. */
struct evbuffer_overlapped *buf;
/** How many chains are pinned; how many of the fields in buffers
* are we using. */
int n_buffers;
WSABUF buffers[MAX_WSABUFS];
};
/** An evbuffer that can handle overlapped IO. */
struct evbuffer_overlapped {
struct evbuffer buffer;
/** The socket that we're doing overlapped IO on. */
evutil_socket_t fd;
/** True iff we have scheduled a write. */
unsigned write_in_progress : 1;
/** True iff we have scheduled a read. */
unsigned read_in_progress : 1;
struct buffer_overlapped read_info;
struct buffer_overlapped write_info;
/** pending I/O type */
unsigned read_in_progress : 1;
unsigned write_in_progress : 1;
/** The first pinned chain in the buffer. */
struct evbuffer_chain *first_pinned;
/** How many chains are pinned; how many of the fields in buffers
* are we using. */
int n_buffers;
WSABUF buffers[MAX_WSABUFS];
};
/** Given an evbuffer, return the correponding evbuffer structure, or NULL if
@ -88,52 +77,40 @@ upcast_evbuffer(struct evbuffer *buf)
return EVUTIL_UPCAST(buf, struct evbuffer_overlapped, buffer);
}
static inline struct buffer_overlapped *
upcast_overlapped(struct event_overlapped *o)
{
return EVUTIL_UPCAST(o, struct buffer_overlapped, event_overlapped);
}
/** Unpin all the chains noted as pinned in 'eo'. */
static void
pin_release(struct event_overlapped *eo, unsigned flag)
pin_release(struct evbuffer_overlapped *eo, unsigned flag)
{
int i;
struct buffer_overlapped *bo = upcast_overlapped(eo);
struct evbuffer_chain *chain = bo->first_pinned;
struct evbuffer_chain *chain = eo->first_pinned;
for (i = 0; i < bo->n_buffers; ++i) {
for (i = 0; i < eo->n_buffers; ++i) {
EVUTIL_ASSERT(chain);
_evbuffer_chain_unpin(chain, flag);
chain = chain->next;
}
}
/** IOCP callback invoked when a read operation is finished. */
static void
read_completed(struct event_overlapped *eo, uintptr_t _, ev_ssize_t nBytes, int ok)
void
evbuffer_commit_read(struct evbuffer *evbuf, ev_ssize_t nBytes)
{
struct buffer_overlapped *buf_o = upcast_overlapped(eo);
struct evbuffer_overlapped *buf = buf_o->buf;
struct evbuffer *evbuf = &buf->buffer;
struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
struct evbuffer_iovec iov[2];
int n_vec;
// XXXX use ok
EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE);
EVUTIL_ASSERT(buf->read_in_progress && !buf->write_in_progress);
EVUTIL_ASSERT(nBytes >= 0); // XXXX Can this be false?
EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE);
buf->read_in_progress = 0;
evbuffer_unfreeze(evbuf, 0);
iov[0].iov_base = buf_o->buffers[0].buf;
if ((size_t)nBytes <= buf_o->buffers[0].len) {
iov[0].iov_base = buf->buffers[0].buf;
if ((size_t)nBytes <= buf->buffers[0].len) {
iov[0].iov_len = nBytes;
n_vec = 1;
} else {
iov[0].iov_len = buf_o->buffers[0].len;
iov[1].iov_base = buf_o->buffers[1].buf;
iov[0].iov_len = buf->buffers[0].len;
iov[1].iov_base = buf->buffers[1].buf;
iov[1].iov_len = nBytes - iov[0].iov_len;
n_vec = 2;
}
@ -141,26 +118,24 @@ read_completed(struct event_overlapped *eo, uintptr_t _, ev_ssize_t nBytes, int
if (evbuffer_commit_space(evbuf, iov, n_vec) < 0)
EVUTIL_ASSERT(0); /* XXXX fail nicer. */
pin_release(eo, EVBUFFER_MEM_PINNED_R);
pin_release(buf, EVBUFFER_MEM_PINNED_R);
buf->read_in_progress = 0;
_evbuffer_decref_and_unlock(evbuf);
}
/** IOCP callback invoked when a write operation is finished. */
static void
write_completed(struct event_overlapped *eo, uintptr_t _, ev_ssize_t nBytes, int ok)
void
evbuffer_commit_write(struct evbuffer *evbuf, ev_ssize_t nBytes)
{
// XXX use ok
struct buffer_overlapped *buf_o = upcast_overlapped(eo);
struct evbuffer_overlapped *buf = buf_o->buf;
struct evbuffer *evbuf = &buf->buffer;
struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE);
buf->write_in_progress = 0;
EVUTIL_ASSERT(buf->write_in_progress && !buf->read_in_progress);
evbuffer_unfreeze(evbuf, 1);
evbuffer_drain(evbuf, nBytes);
pin_release(eo,EVBUFFER_MEM_PINNED_W);
pin_release(buf,EVBUFFER_MEM_PINNED_W);
buf->write_in_progress = 0;
_evbuffer_decref_and_unlock(evbuf);
}
@ -181,7 +156,8 @@ evbuffer_overlapped_new(evutil_socket_t fd)
}
int
evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most)
evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most,
struct event_overlapped *ol)
{
struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
int r = -1;
@ -195,6 +171,7 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most)
}
EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
EVUTIL_ASSERT(!buf_o->read_in_progress);
if (buf->freeze_start || buf_o->write_in_progress)
goto done;
if (!buf->total_len) {
@ -206,14 +183,14 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most)
}
evbuffer_freeze(buf, 1);
/* XXX we could move much of this into the constructor. */
memset(&buf_o->write_info, 0, sizeof(buf_o->write_info));
buf_o->write_info.buf = buf_o;
buf_o->write_info.event_overlapped.cb = write_completed;
chain = buf_o->write_info.first_pinned = buf->first;
buf_o->first_pinned = 0;
buf_o->n_buffers = 0;
memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
chain = buf_o->first_pinned = buf->first;
for (i=0; i < MAX_WSABUFS && chain; ++i, chain=chain->next) {
WSABUF *b = &buf_o->write_info.buffers[i];
WSABUF *b = &buf_o->buffers[i];
b->buf = chain->buffer + chain->misalign;
_evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_W);
@ -227,14 +204,14 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most)
}
}
buf_o->write_info.n_buffers = i;
buf_o->n_buffers = i;
_evbuffer_incref(buf);
if (WSASend(buf_o->fd, buf_o->write_info.buffers, i, &bytesSent, 0,
&buf_o->write_info.event_overlapped.overlapped, NULL)) {
if (WSASend(buf_o->fd, buf_o->buffers, i, &bytesSent, 0,
&ol->overlapped, NULL)) {
int error = WSAGetLastError();
if (error != WSA_IO_PENDING) {
/* An actual error. */
pin_release(&buf_o->write_info.event_overlapped, EVBUFFER_MEM_PINNED_W);
pin_release(buf_o, EVBUFFER_MEM_PINNED_W);
evbuffer_unfreeze(buf, 1);
evbuffer_free(buf); /* decref */
goto done;
@ -249,7 +226,8 @@ done:
}
int
evbuffer_launch_read(struct evbuffer *buf, size_t at_most)
evbuffer_launch_read(struct evbuffer *buf, size_t at_most,
struct event_overlapped *ol)
{
struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
int r = -1, i;
@ -263,28 +241,28 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most)
if (!buf_o)
return -1;
EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
EVUTIL_ASSERT(!buf_o->write_in_progress);
if (buf->freeze_end || buf_o->read_in_progress)
goto done;
buf_o->first_pinned = 0;
buf_o->n_buffers = 0;
memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
if (_evbuffer_expand_fast(buf, at_most) == -1)
goto done;
evbuffer_freeze(buf, 0);
/* XXX we could move much of this into the constructor. */
memset(&buf_o->read_info, 0, sizeof(buf_o->read_info));
buf_o->read_info.buf = buf_o;
buf_o->read_info.event_overlapped.cb = read_completed;
nvecs = _evbuffer_read_setup_vecs(buf, at_most,
vecs, &chain, 1);
for (i=0;i<nvecs;++i) {
WSABUF_FROM_EVBUFFER_IOV(
&buf_o->read_info.buffers[i],
&buf_o->buffers[i],
&vecs[i]);
}
buf_o->read_info.n_buffers = nvecs;
buf_o->read_info.first_pinned = chain;
buf_o->n_buffers = nvecs;
buf_o->first_pinned = chain;
npin=0;
for ( ; chain; chain = chain->next) {
_evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_R);
@ -293,11 +271,12 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most)
EVUTIL_ASSERT(npin == nvecs);
_evbuffer_incref(buf);
if (WSARecv(buf_o->fd, buf_o->read_info.buffers, nvecs, &bytesRead, &flags, &buf_o->read_info.event_overlapped.overlapped, NULL)) {
if (WSARecv(buf_o->fd, buf_o->buffers, nvecs, &bytesRead, &flags,
&ol->overlapped, NULL)) {
int error = WSAGetLastError();
if (error != WSA_IO_PENDING) {
/* An actual error. */
pin_release(&buf_o->read_info.event_overlapped, EVBUFFER_MEM_PINNED_R);
pin_release(buf_o, EVBUFFER_MEM_PINNED_R);
evbuffer_unfreeze(buf, 0);
evbuffer_free(buf); /* decref */
goto done;

View File

@ -47,14 +47,16 @@
#include <errno.h>
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/buffer_compat.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/bufferevent_compat.h"
#include "event2/event.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
#include "evbuffer-internal.h"
#include "util-internal.h"
void
@ -257,6 +259,9 @@ bufferevent_init_common(struct bufferevent_private *bufev_private,
bufev_private->options = options;
evbuffer_set_parent(bufev->input, bufev);
evbuffer_set_parent(bufev->output, bufev);
return 0;
}
@ -494,6 +499,9 @@ _bufferevent_decref_and_unlock(struct bufferevent *bufev)
if (bufev->be_ops->destruct)
bufev->be_ops->destruct(bufev);
/* XXX what happens if refcnt for these buffers is > 1?
* The buffers can share a lock with this bufferevent object,
* but the lock might be destroyed below. */
/* evbuffer will free the callbacks */
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
@ -631,7 +639,7 @@ _bufferevent_init_generic_timeout_cbs(struct bufferevent *bev)
{
evtimer_assign(&bev->ev_read, bev->ev_base,
bufferevent_generic_read_timeout_cb, bev);
evtimer_assign(&bev->ev_read, bev->ev_base,
evtimer_assign(&bev->ev_write, bev->ev_base,
bufferevent_generic_write_timeout_cb, bev);
}

View File

@ -80,8 +80,11 @@ const struct bufferevent_ops bufferevent_ops_async = {
struct bufferevent_async {
struct bufferevent_private bev;
struct event_overlapped connect_overlapped;
struct event_overlapped read_overlapped;
struct event_overlapped write_overlapped;
unsigned read_in_progress : 1;
unsigned write_in_progress : 1;
unsigned ok : 1;
};
static inline struct bufferevent_async *
@ -91,16 +94,33 @@ upcast(struct bufferevent *bev)
if (bev->be_ops != &bufferevent_ops_async)
return NULL;
bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async);
return bev_a;
}
static inline struct bufferevent_async *
upcast_overlapped(struct event_overlapped *eo)
upcast_connect(struct event_overlapped *eo)
{
struct bufferevent_async *bev_a;
bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async);
EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
return bev_a;
}
static inline struct bufferevent_async *
upcast_read(struct event_overlapped *eo)
{
struct bufferevent_async *bev_a;
bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
return bev_a;
}
static inline struct bufferevent_async *
upcast_write(struct event_overlapped *eo)
{
struct bufferevent_async *bev_a;
bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
return bev_a;
}
@ -109,14 +129,15 @@ bev_async_consider_writing(struct bufferevent_async *b)
{
/* Don't write if there's a write in progress, or we do not
* want to write. */
if (b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
return;
/* Don't write if there's nothing to write */
if (!evbuffer_get_length(b->bev.bev.output))
return;
/* XXXX doesn't respect low-water mark very well. */
if (evbuffer_launch_write(b->bev.bev.output, -1)) {
if (evbuffer_launch_write(b->bev.bev.output, -1,
&b->write_overlapped)) {
EVUTIL_ASSERT(0);/* XXX act sensibly. */
} else {
b->write_in_progress = 1;
@ -131,7 +152,7 @@ bev_async_consider_reading(struct bufferevent_async *b)
size_t at_most;
/* Don't read if there is a read in progress, or we do not
* want to read. */
if (b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
return;
/* Don't read if we're full */
@ -145,7 +166,8 @@ bev_async_consider_reading(struct bufferevent_async *b)
at_most = 16384; /* FIXME totally magic. */
}
if (evbuffer_launch_read(b->bev.bev.input, at_most)) {
if (evbuffer_launch_read(b->bev.bev.input, at_most,
&b->read_overlapped)) {
EVUTIL_ASSERT(0);
} else {
b->read_in_progress = 1;
@ -159,26 +181,15 @@ be_async_outbuf_callback(struct evbuffer *buf,
{
struct bufferevent *bev = arg;
struct bufferevent_async *bev_async = upcast(bev);
/* If we successfully wrote from the outbuf, or we added data to the
* outbuf and were not writing before, we may want to write now. */
/* If we added data to the outbuf and were not writing before,
* we may want to write now. */
_bufferevent_incref_and_lock(bev);
if (cbinfo->n_deleted) {
/* XXXX can't detect 0-length write completion */
bev_async->write_in_progress = 0;
}
if (cbinfo->n_added || cbinfo->n_deleted)
if (cbinfo->n_added)
bev_async_consider_writing(bev_async);
if (cbinfo->n_deleted) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
if (bev->writecb != NULL &&
evbuffer_get_length(bev->output) <= bev->wm_write.low)
_bufferevent_run_writecb(bev);
}
_bufferevent_decref_and_unlock(bev);
}
@ -190,26 +201,14 @@ be_async_inbuf_callback(struct evbuffer *buf,
struct bufferevent *bev = arg;
struct bufferevent_async *bev_async = upcast(bev);
/* If we successfully read into the inbuf, or we drained data from
* the inbuf and were not reading before, we may want to read now */
/* If we drained data from the inbuf and were not reading before,
* we may want to read now */
_bufferevent_incref_and_lock(bev);
if (cbinfo->n_added) {
/* XXXX can't detect 0-length read completion */
bev_async->read_in_progress = 0;
}
if (cbinfo->n_added || cbinfo->n_deleted)
if (cbinfo->n_deleted)
bev_async_consider_reading(bev_async);
if (cbinfo->n_added) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
if (evbuffer_get_length(bev->input) >= bev->wm_read.low &&
bev->readcb != NULL)
_bufferevent_run_readcb(bev);
}
_bufferevent_decref_and_unlock(bev);
}
@ -218,6 +217,10 @@ be_async_enable(struct bufferevent *buf, short what)
{
struct bufferevent_async *bev_async = upcast(buf);
if (!bev_async->ok)
return -1;
/* NOTE: This interferes with non-blocking connect */
_bufferevent_generic_adj_timeouts(buf);
/* If we newly enable reading or writing, and we aren't reading or
@ -245,6 +248,17 @@ be_async_disable(struct bufferevent *bev, short what)
static void
be_async_destruct(struct bufferevent *bev)
{
struct bufferevent_private *bev_p = BEV_UPCAST(bev);
evutil_socket_t fd;
EVUTIL_ASSERT(!upcast(bev)->write_in_progress && !upcast(bev)->read_in_progress);
/* XXX cancel any outstanding I/O operations */
fd = _evbuffer_overlapped_get_fd(bev->input);
/* delete this in case non-blocking connect was used */
event_del(&bev->ev_write);
if (bev_p->options & BEV_OPT_CLOSE_ON_FREE)
EVUTIL_CLOSESOCKET(fd);
_bufferevent_del_generic_timeout_cbs(bev);
}
@ -259,20 +273,87 @@ static void
connect_complete(struct event_overlapped *eo, uintptr_t key,
ev_ssize_t nbytes, int ok)
{
struct bufferevent_async *bev_a = upcast_overlapped(eo);
struct bufferevent *bev = &bev_a->bev.bev; /* XXX locking issue ? */
struct bufferevent_async *bev_a = upcast_connect(eo);
struct bufferevent *bev = &bev_a->bev.bev;
_bufferevent_incref_and_lock(bev);
EVUTIL_ASSERT(bev_a->bev.connecting);
bev_a->bev.connecting = 0;
bufferevent_async_set_connected(bev);
_bufferevent_run_eventcb(bev,
ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
_bufferevent_decref_and_unlock(bev);
}
static void
read_complete(struct event_overlapped *eo, uintptr_t key,
ev_ssize_t nbytes, int ok)
{
struct bufferevent_async *bev_a = upcast_read(eo);
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_READING;
_bufferevent_incref_and_lock(bev);
EVUTIL_ASSERT(bev_a->ok && bev_a->read_in_progress);
evbuffer_commit_read(bev->input, nbytes);
bev_a->read_in_progress = 0;
if (ok && nbytes) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
if (bev->readcb != NULL &&
evbuffer_get_length(bev->input) >= bev->wm_read.low)
_bufferevent_run_readcb(bev);
bev_async_consider_reading(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
}
_bufferevent_decref_and_unlock(bev);
}
static void
write_complete(struct event_overlapped *eo, uintptr_t key,
ev_ssize_t nbytes, int ok)
{
struct bufferevent_async *bev_a = upcast_write(eo);
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_WRITING;
_bufferevent_incref_and_lock(bev);
EVUTIL_ASSERT(bev_a->ok && bev_a->write_in_progress);
evbuffer_commit_write(bev->output, nbytes);
bev_a->write_in_progress = 0;
if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
if (bev->writecb != NULL &&
evbuffer_get_length(bev->output) <= bev->wm_write.low)
_bufferevent_run_writecb(bev);
bev_async_consider_writing(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
}
_bufferevent_decref_and_unlock(bev);
}
struct bufferevent *
bufferevent_async_new(struct event_base *base,
evutil_socket_t fd, int options)
@ -318,10 +399,11 @@ bufferevent_async_new(struct event_base *base,
evbuffer_defer_callbacks(bev->input, base);
evbuffer_defer_callbacks(bev->output, base);
evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
_bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev);
event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
event_overlapped_init(&bev_a->read_overlapped, read_complete);
event_overlapped_init(&bev_a->write_overlapped, write_complete);
bev_a->ok = fd >= 0;
return bev;
err:
@ -329,6 +411,16 @@ err:
return NULL;
}
void
bufferevent_async_set_connected(struct bufferevent *bev)
{
struct bufferevent_async *bev_async = upcast(bev);
bev_async->ok = 1;
_bufferevent_init_generic_timeout_cbs(bev);
/* Now's a good time to consider reading/writing */
be_async_enable(bev, bev->enabled);
}
int
bufferevent_async_can_connect(struct bufferevent *bev)
{
@ -369,7 +461,7 @@ bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
sin6->sin6_family = AF_INET6;
sin6->sin6_addr = in6addr_any;
} else {
/* XXX: what to do? */
/* Well, the user will have to bind() */
return -1;
}
if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&

View File

@ -212,8 +212,18 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
goto done;
} else {
connected = 1;
_bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED);
if (!(bufev->enabled & EV_WRITE) || BEV_IS_ASYNC(bufev)) {
#ifdef WIN32
if (BEV_IS_ASYNC(bufev)) {
event_del(&bufev->ev_write);
bufferevent_async_set_connected(bufev);
_bufferevent_run_eventcb(bufev,
BEV_EVENT_CONNECTED);
goto done;
}
#endif
_bufferevent_run_eventcb(bufev,
BEV_EVENT_CONNECTED);
if (!(bufev->enabled & EV_WRITE)) {
event_del(&bufev->ev_write);
goto done;
}

View File

@ -66,6 +66,7 @@ struct evbuffer_cb_entry {
#endif
};
struct bufferevent;
struct evbuffer_chain;
struct evbuffer {
/** The first chain in this buffer's linked list of chains. */
@ -135,6 +136,10 @@ struct evbuffer {
/** A doubly-linked-list of callback functions */
TAILQ_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks;
/** The parent bufferevent object this evbuffer belongs to.
* NULL if the evbuffer stands alone. */
struct bufferevent *parent;
};
/** A single item in an evbuffer. */
@ -245,6 +250,8 @@ struct evbuffer_chain_reference {
/** Increase the reference count of buf by one. */
void _evbuffer_incref(struct evbuffer *buf);
/** Increase the reference count of buf by one and acquire the lock. */
void _evbuffer_incref_and_lock(struct evbuffer *buf);
/** Pin a single buffer chain using a given flag. A pinned chunk may not be
* moved or freed until it is unpinned. */
void _evbuffer_chain_pin(struct evbuffer_chain *chain, unsigned flag);
@ -273,6 +280,9 @@ int _evbuffer_read_setup_vecs(struct evbuffer *buf, ev_ssize_t howmuch,
(i)->len = (ei)->iov_len; \
} while(0)
/** Set the parent bufferevent object for buf to bev */
void evbuffer_set_parent(struct evbuffer *buf, struct bufferevent *bev);
#ifdef __cplusplus
}
#endif

View File

@ -124,24 +124,32 @@ void _evbuffer_overlapped_set_fd(struct evbuffer *buf, evutil_socket_t fd);
An evbuffer can only have one read pending at a time. While the read
is in progress, no other data may be added to the end of the buffer.
The buffer must be created with event_overlapped_init().
evbuffer_commit_read() must be called in the completion callback.
@param buf The buffer to read onto
@param n The number of bytes to try to read.
@param ol Overlapped object with associated completion callback.
@return 0 on success, -1 on error.
*/
int evbuffer_launch_read(struct evbuffer *, size_t n);
int evbuffer_launch_read(struct evbuffer *buf, size_t n, struct event_overlapped *ol);
/** Start writing data from the start of an evbuffer.
An evbuffer can only have one write pending at a time. While the write is
in progress, no other data may be removed from the front of the buffer.
The buffer must be created with event_overlapped_init().
evbuffer_commit_write() must be called in the completion callback.
@param buf The buffer to read onto
@param n The number of bytes to try to read.
@param ol Overlapped object with associated completion callback.
@return 0 on success, -1 on error.
*/
int evbuffer_launch_write(struct evbuffer *, ev_ssize_t n);
int evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t n, struct event_overlapped *ol);
/** XXX document */
void evbuffer_commit_read(struct evbuffer *, ev_ssize_t);
void evbuffer_commit_write(struct evbuffer *, ev_ssize_t);
/** Create an IOCP, and launch its worker threads. Internal use only.
@ -179,6 +187,7 @@ struct bufferevent *bufferevent_async_new(struct event_base *base,
evutil_socket_t fd, int options);
/* FIXME document. */
void bufferevent_async_set_connected(struct bufferevent *bev);
int bufferevent_async_can_connect(struct bufferevent *bev);
int bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
const struct sockaddr *sa, int socklen);

View File

@ -497,6 +497,13 @@ test_bufferevent_connect(void *arg)
bufferevent_enable(bev1, EV_READ);
bufferevent_enable(bev2, EV_READ);
#ifdef WIN32
/* FIXME this is to get IOCP to work. it shouldn't be required. */
{
struct timeval tv = {5000,0};
event_base_loopexit(data->base, &tv);
}
#endif
event_base_dispatch(data->base);
tt_int_op(n_strings_read, ==, 2);
@ -580,6 +587,13 @@ test_bufferevent_connect_fail(void *arg)
event_add(&close_listener_event, &one_second);
close_listener_event_added = 1;
#ifdef WIN32
/* FIXME this is to get IOCP to work. it shouldn't be required. */
{
struct timeval tv = {5000,0};
event_base_loopexit(data->base, &tv);
}
#endif
event_base_dispatch(data->base);
tt_int_op(test_ok, ==, 1);
@ -628,7 +642,6 @@ struct testcase_t bufferevent_iocp_testcases[] = {
LEGACY(bufferevent, TT_ISOLATED|TT_ENABLE_IOCP),
LEGACY(bufferevent_watermarks, TT_ISOLATED|TT_ENABLE_IOCP),
LEGACY(bufferevent_filters, TT_ISOLATED|TT_ENABLE_IOCP),
#if 0
{ "bufferevent_connect", test_bufferevent_connect,
TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"" },
{ "bufferevent_connect_defer", test_bufferevent_connect,
@ -639,14 +652,11 @@ struct testcase_t bufferevent_iocp_testcases[] = {
{ "bufferevent_connect_lock_defer", test_bufferevent_connect,
TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup,
(void*)"defer lock" },
#endif
{ "bufferevent_connect_fail", test_bufferevent_connect_fail,
TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL },
#if 0
{ "bufferevent_connect_nonblocking", test_bufferevent_connect,
TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup,
(void*)"unset_connectex" },
#endif
END_OF_TESTCASES,
};

View File

@ -152,15 +152,40 @@ end:
;
}
static struct evbuffer *rbuf = NULL, *wbuf = NULL;
static void
read_complete(struct event_overlapped *eo, uintptr_t key,
ev_ssize_t nbytes, int ok)
{
tt_assert(ok);
evbuffer_commit_read(rbuf, nbytes);
end:
;
}
static void
write_complete(struct event_overlapped *eo, uintptr_t key,
ev_ssize_t nbytes, int ok)
{
tt_assert(ok);
evbuffer_commit_write(wbuf, nbytes);
end:
;
}
static void
test_iocp_evbuffer(void *ptr)
{
struct event_overlapped rol, wol;
struct basic_test_data *data = ptr;
struct event_iocp_port *port = NULL;
struct evbuffer *rbuf = NULL, *wbuf = NULL;
char junk[1024];
int i;
event_overlapped_init(&rol, read_complete);
event_overlapped_init(&wol, write_complete);
#ifdef WIN32
evthread_use_windows_threads();
#endif
@ -185,8 +210,8 @@ test_iocp_evbuffer(void *ptr)
evbuffer_add(wbuf, junk, sizeof(junk));
tt_assert(!evbuffer_get_length(rbuf));
tt_assert(!evbuffer_launch_write(wbuf, 512));
tt_assert(!evbuffer_launch_read(rbuf, 2048));
tt_assert(!evbuffer_launch_write(wbuf, 512, &wol));
tt_assert(!evbuffer_launch_read(rbuf, 2048, &rol));
#ifdef WIN32
/* FIXME this is stupid. */