diff --git a/bufferevent_async.c b/bufferevent_async.c new file mode 100644 index 00000000..50cf448b --- /dev/null +++ b/bufferevent_async.c @@ -0,0 +1,233 @@ +/* + * Copyright (c) 2009 Niels Provos and Nick Mathewson + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "event-config.h" + +#ifdef _EVENT_HAVE_SYS_TIME_H +#include +#endif + +#include +#include +#include +#include +#include +#ifdef _EVENT_HAVE_STDARG_H +#include +#endif +#ifdef _EVENT_HAVE_UNISTD_H +#include +#endif + +#ifdef WIN32 +#include +#endif + +#include "event2/util.h" +#include "event2/bufferevent.h" +#include "event2/buffer.h" +#include "event2/buffer_compat.h" +#include "event2/bufferevent_struct.h" +#include "event2/bufferevent_compat.h" +#include "event2/event.h" +#include "log-internal.h" +#include "mm-internal.h" +#include "bufferevent-internal.h" +#include "util-internal.h" +#include "iocp-internal.h" + +/* prototypes */ +static int be_async_enable(struct bufferevent *, short); +static int be_async_disable(struct bufferevent *, short); +static void be_async_destruct(struct bufferevent *); +static void be_async_adj_timeouts(struct bufferevent *); +static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode); + +const struct bufferevent_ops bufferevent_ops_async = { + "socket", + 0, + be_async_enable, + be_async_disable, + be_async_destruct, + be_async_adj_timeouts, + be_async_flush, +}; + + +struct bufferevent_async { + struct bufferevent_private bev; + unsigned read_in_progress : 1; + unsigned write_in_progress : 1; +}; + +static inline struct bufferevent_async * +upcast(struct bufferevent *bev) +{ + struct bufferevent_async *bev_a; + if (bev->be_ops != &bufferevent_ops_async) + return NULL; + bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); + assert(bev_a->bev.bev.be_ops == &bufferevent_ops_async); + return bev_a; +} + +static void +bev_async_consider_writing(struct bufferevent_async *b) +{ + /* Don't write if there's a write in progress, or we do not + * want to write. */ + if (b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE)) + return; + /* Don't write if there's nothing to write */ + if (!evbuffer_get_length(b->bev.bev.output)) + return; + + /* XXXX doesn't respect low-water mark very well. */ + if (evbuffer_launch_write(b->bev.bev.output, -1)) { + assert(0);/* XXX act sensibly. */ + } else { + b->write_in_progress = 1; + } +} + +static void +bev_async_consider_reading(struct bufferevent_async *b) +{ + size_t cur_size; + size_t read_high; + /* Don't read if there is a read in progress, or we do not + * want to read. */ + if (b->read_in_progress || !(b->bev.bev.enabled&EV_READ)) + return; + + /* Don't read if we're full */ + cur_size = evbuffer_get_length(b->bev.bev.input); + read_high = b->bev.bev.wm_read.high; + if (cur_size >= read_high) + return; + + if (evbuffer_launch_read(b->bev.bev.input, read_high-cur_size)) { + assert(0); + } else { + b->read_in_progress = 1; + } +} + +static void +be_async_outbuf_callback(struct evbuffer *buf, + const struct evbuffer_cb_info *cbinfo, + void *arg) +{ + struct bufferevent *bev = arg; + struct bufferevent_async *bev_async = upcast(bev); + /* If we successfully wrote from the outbuf, or we added data to the + * outbuf and were not writing before, we may want to write now. */ + + BEV_LOCK(bev); + if (cbinfo->n_deleted) { + /* XXXX can't detect 0-length write completion */ + bev_async->write_in_progress = 0; + } + + if (cbinfo->n_added || cbinfo->n_deleted) + bev_async_consider_writing(bev_async); + + if (cbinfo->n_deleted && + bev->writecb != NULL && + evbuffer_get_length(bev->output) <= bev->wm_write.low) + _bufferevent_run_writecb(bev); + + BEV_UNLOCK(bev); +} + +static void +be_async_inbuf_callback(struct evbuffer *buf, + const struct evbuffer_cb_info *cbinfo, + void *arg) +{ + struct bufferevent *bev = arg; + struct bufferevent_async *bev_async = upcast(bev); + + /* If we successfully read into the inbuf, or we drained data from + * the inbuf and were not reading before, we may want to read now */ + + BEV_UNLOCK(bev); + if (cbinfo->n_added) { + /* XXXX can't detect 0-length read completion */ + bev_async->read_in_progress = 0; + } + + if (cbinfo->n_added || cbinfo->n_deleted) + bev_async_consider_reading(bev_async); + + if (cbinfo->n_added && + evbuffer_get_length(bev->input) >= bev->wm_read.low && + bev->readcb != NULL) + _bufferevent_run_readcb(bev); + + BEV_UNLOCK(bev); +} + +static int +be_async_enable(struct bufferevent *buf, short what) +{ + struct bufferevent_async *bev_async = upcast(buf); + + /* If we newly enable reading or writing, and we aren't reading or + writing already, consider launching a new read or write. */ + + if (what & EV_READ) + bev_async_consider_reading(bev_async); + if (what & EV_WRITE) + bev_async_consider_writing(bev_async); + return 0; +} + +static int +be_async_disable(struct bufferevent *bev, short what) +{ + /* XXXX If we disable reading or writing, we may want to consider + * canceling any in-progress read or write operation, though it might + * not work. */ + return 0; +} + +static void +be_async_destruct(struct bufferevent *bev) +{ +} +static void +be_async_adj_timeouts(struct bufferevent *bev) +{ +} +static int +be_async_flush(struct bufferevent *bev, short what, + enum bufferevent_flush_mode mode) +{ + return 0; +}