From ea4b8724c088934412401bdf32f52f2e2f1840b1 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 2 Feb 2009 19:22:13 +0000 Subject: [PATCH] checkpoint work on big bufferevent refactoring svn:r1095 --- Makefile.am | 3 +- bufferevent-internal.h | 62 ++- bufferevent.c | 693 ++++++---------------------- bufferevent_filter.c | 435 +++++++++++++++++ bufferevent_sock.c | 375 +++++++++++++++ event.h | 1 + http.c | 1 + include/Makefile.am | 5 +- include/event2/buffer.h | 5 +- include/event2/bufferevent.h | 228 ++++----- include/event2/bufferevent_compat.h | 81 ++++ include/event2/bufferevent_struct.h | 44 +- test/Makefile.am | 1 + test/regress.c | 78 ++-- test/regress.h | 7 + test/regress_bufferevent.c | 79 ++++ test/regress_main.c | 1 + test/regress_zlib.c | 135 ++++-- 18 files changed, 1441 insertions(+), 793 deletions(-) create mode 100644 bufferevent_filter.c create mode 100644 bufferevent_sock.c create mode 100644 include/event2/bufferevent_compat.h create mode 100644 test/regress_bufferevent.c diff --git a/Makefile.am b/Makefile.am index 7b56870e..71b2fa84 100644 --- a/Makefile.am +++ b/Makefile.am @@ -87,7 +87,8 @@ event-config.h: config.h -e 's/#ifndef /#ifndef _EVENT_/' < config.h >> $@ echo "#endif" >> $@ -CORE_SRC = event.c buffer.c bufferevent.c \ +CORE_SRC = event.c buffer.c \ + bufferevent.c bufferevent_sock.c bufferevent_filter.c \ evmap.c log.c evutil.c strlcpy.c $(SYS_SRC) EXTRA_SRC = event_tagging.c http.c evdns.c evrpc.c diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 1fdb2be7..a08dfe86 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -33,26 +33,62 @@ extern "C" { #include "event-config.h" #include "evutil.h" -struct bufferevent_filter { - /** allows chaining of filters */ - TAILQ_ENTRY(bufferevent_filter) (next); +/** + Implementation table for a bufferevent: holds function pointers and other + information to make the various bufferevent types work. +*/ +struct bufferevent_ops { + /** The name of the bufferevent's type. */ + const char *type; + /** At what offset into the implementation type will we find a + bufferevent structure? - /** used for intermediary state either on the input or output path */ - struct evbuffer *buffer; + Example: if the type is implemented as + struct bufferevent_x { + int extra_data; + struct bufferevent bev; + } + then mem_offset should be offsetof(struct bufferevent_x, bev) + */ + off_t mem_offset; - /** initializes the context provided to process */ - void (*init_context)(void *); + /** Enables one or more of EV_READ|EV_WRITE on a bufferevent. Does + not need to adjust the 'enabled' field. Returns 0 on success, -1 + on failure. + */ + int (*enable)(struct bufferevent *, short); - /** frees any context related to ctx */ - void (*free_context)(void *); + /** Disables one or more of EV_READ|EV_WRITE on a bufferevent. Does + not need to adjust the 'enabled' field. Returns 0 on success, -1 + on failure. + */ + int (*disable)(struct bufferevent *, short); - enum bufferevent_filter_result (*process)( - struct evbuffer *src, struct evbuffer *dst, - enum bufferevent_filter_state flags, void *ctx); + /** Free any storage and deallocate any extra data or structures used + in this implementation. + */ + void (*destruct)(struct bufferevent *); - void *ctx; + /** Called when the timeouts on the bufferevent have changed.*/ + void (*adj_timeouts)(struct bufferevent *); + + /** Called to flush data. */ + int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode); }; +extern const struct bufferevent_ops be_ops_socket; +extern const struct bufferevent_ops be_ops_filter; + +/** Initialize the shared parts of a bufferevent. */ +int bufferevent_init_common(struct bufferevent *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options); + +/** For internal use: temporarily stop all reads on bufev, because its + * read buffer is too full. */ +void bufferevent_wm_suspend_read(struct bufferevent *bufev); +/** For internal use: temporarily stop all reads on bufev, because its + * read buffer is too full. */ +void bufferevent_wm_unsuspend_read(struct bufferevent *bufev); + #ifdef __cplusplus } #endif diff --git a/bufferevent.c b/bufferevent.c index 8bf1d750..a0dd4b83 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -34,7 +34,6 @@ #ifdef _EVENT_HAVE_SYS_TIME_H #include #endif -#include #include #include @@ -53,253 +52,74 @@ #include "event2/buffer.h" #include "event2/buffer_compat.h" #include "event2/bufferevent_struct.h" +#include "event2/bufferevent_compat.h" #include "event2/event.h" #include "log-internal.h" #include "mm-internal.h" #include "bufferevent-internal.h" #include "util-internal.h" -/* prototypes */ -static void bufferevent_read_pressure_cb( - struct evbuffer *, size_t, size_t, void *); -static int bufferevent_process_filters( - struct bufferevent_filter *, struct evbuffer *, - enum bufferevent_filter_state); - -static int -bufferevent_add(struct event *ev, int timeout) +void +bufferevent_wm_suspend_read(struct bufferevent *bufev) { - struct timeval tv, *ptv = NULL; - - if (timeout) { - evutil_timerclear(&tv); - tv.tv_sec = timeout; - ptv = &tv; + if (!bufev->read_suspended) { + bufev->be_ops->disable(bufev, EV_READ); + bufev->read_suspended = 1; } - - return (event_add(ev, ptv)); } -/* - * This callback is executed when the size of the input buffer changes. - * We use it to apply back pressure on the reading side. - */ - -static void -bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, - void *arg) { - struct bufferevent *bufev = arg; - /* - * If we are now below the watermark, then we don't need the callback any - * more, and we can read again. - */ - if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { - evbuffer_setcb(buf, NULL, NULL); - +void +bufferevent_wm_unsuspend_read(struct bufferevent *bufev) +{ + if (bufev->read_suspended) { if (bufev->enabled & EV_READ) - bufferevent_add(&bufev->ev_read, bufev->timeout_read); + bufev->be_ops->enable(bufev, EV_READ); + bufev->read_suspended = 0; } } +/* Callback to implement watermarks on the input buffer. Only enabled + * if the watermark is set. */ static void -bufferevent_read_closure(struct bufferevent *bufev, int progress) -{ - size_t len; - - /* nothing user visible changed? */ - if (!progress) - return; - - /* See if this callbacks meets the water marks */ - len = EVBUFFER_LENGTH(bufev->input); - if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) - return; - - /* For read pressure, we use the buffer exposed to the users. - * Filters can arbitrarily change the data that users get to see, - * in particular, a user might select a watermark that is smaller - * then what a filter needs to make progress. - */ - if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) { - event_del(&bufev->ev_read); - - /* Now schedule a callback for us when the buffer changes */ - evbuffer_setcb(bufev->input, - bufferevent_read_pressure_cb, bufev); - } - - /* Invoke the user callback - must always be called last */ - if (bufev->readcb != NULL) - (*bufev->readcb)(bufev, bufev->cbarg); -} - -static void -bufferevent_readcb(evutil_socket_t fd, short event, void *arg) +bufferevent_inbuf_wm_cb(struct evbuffer *buf, size_t old, size_t now, + void *arg) { struct bufferevent *bufev = arg; - struct evbuffer *input; - int res = 0, progress = 1; - short what = EVBUFFER_READ; - int howmuch = -1; - if (event == EV_TIMEOUT) { - what |= EVBUFFER_TIMEOUT; - goto error; + if (now > old) { + /* Data got added. If it put us over the watermark, stop + * reading. */ + if (now >= bufev->wm_read.high) + bufferevent_wm_suspend_read(bufev); + } else { + /* Data got removed. If it puts us under the watermark, + stop reading. */ + if (now < bufev->wm_read.high) + bufferevent_wm_unsuspend_read(bufev); } - - if (TAILQ_FIRST(&bufev->input_filters) != NULL) - input = TAILQ_FIRST(&bufev->input_filters)->buffer; - else - input = bufev->input; - - /* - * If we have a high watermark configured then we don't want to - * read more data than would make us reach the watermark. - */ - if (bufev->wm_read.high != 0) { - howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(input); - /* we might have lowered the watermark, stop reading */ - if (howmuch <= 0) { - event_del(&bufev->ev_read); - evbuffer_setcb(input, - bufferevent_read_pressure_cb, bufev); - return; - } - } - - res = evbuffer_read(input, fd, howmuch); - - if (res == -1) { - int err = evutil_socket_geterror(fd); - if (EVUTIL_ERR_RW_RETRIABLE(err)) - goto reschedule; - /* error case */ - what |= EVBUFFER_ERROR; - } else if (res == 0) { - /* eof case */ - what |= EVBUFFER_EOF; - } - - if (TAILQ_FIRST(&bufev->input_filters) != NULL) { - int state = BEV_NORMAL; - if (what & EVBUFFER_EOF) - state = BEV_FLUSH; - /* XXX(niels): what to do about EVBUFFER_ERROR? */ - progress = bufferevent_process_filters( - TAILQ_FIRST(&bufev->input_filters), - bufev->input, - state); - - /* propagate potential errors to the user */ - if (progress == -1) { - res = -1; - what |= EVBUFFER_ERROR; - } - } - - if (res <= 0) - goto error; - - bufferevent_read_closure(bufev, progress); - return; - - reschedule: - return; - - error: - event_del(&bufev->ev_read); - (*bufev->errorcb)(bufev, what, bufev->cbarg); - } -static void -bufferevent_writecb(evutil_socket_t fd, short event, void *arg) +int +bufferevent_init_common(struct bufferevent *bufev, struct event_base *base, + const struct bufferevent_ops *ops, + enum bufferevent_options options) { - struct bufferevent *bufev = arg; - int res = 0; - short what = EVBUFFER_WRITE; - - if (event == EV_TIMEOUT) { - what |= EVBUFFER_TIMEOUT; - goto error; - } - - if (EVBUFFER_LENGTH(bufev->output)) { - res = evbuffer_write(bufev->output, fd); - if (res == -1) { - int err = evutil_socket_geterror(fd); - /* XXXX we used to check for EINPROGRESS here too, but I - don't think write can set that. -nick */ - if (EVUTIL_ERR_RW_RETRIABLE(err)) - goto reschedule; - what |= EVBUFFER_ERROR; - } else if (res == 0) { - /* eof case */ - what |= EVBUFFER_EOF; - } - if (res <= 0) - goto error; - } - - if (EVBUFFER_LENGTH(bufev->output) == 0) - event_del(&bufev->ev_write); - - /* - * Invoke the user callback if our buffer is drained or below the - * low watermark. - */ - if (bufev->writecb != NULL && - EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) - (*bufev->writecb)(bufev, bufev->cbarg); - - return; - - reschedule: - if (EVBUFFER_LENGTH(bufev->output) == 0) - event_del(&bufev->ev_write); - return; - - error: - event_del(&bufev->ev_write); - (*bufev->errorcb)(bufev, what, bufev->cbarg); -} - -/* - * Create a new buffered event object. - * - * The read callback is invoked whenever we read new data. - * The write callback is invoked whenever the output buffer is drained. - * The error callback is invoked on a write/read error or on EOF. - * - * Both read and write callbacks maybe NULL. The error callback is not - * allowed to be NULL and have to be provided always. - */ - -struct bufferevent * -bufferevent_new(evutil_socket_t fd, evbuffercb readcb, evbuffercb writecb, - everrorcb errorcb, void *cbarg) -{ - struct bufferevent *bufev; - - if ((bufev = mm_calloc(1, sizeof(struct bufferevent))) == NULL) - return (NULL); - - if ((bufev->input = evbuffer_new()) == NULL) { - mm_free(bufev); - return (NULL); - } + if ((bufev->input = evbuffer_new()) == NULL) + return -1; if ((bufev->output = evbuffer_new()) == NULL) { evbuffer_free(bufev->input); - mm_free(bufev); - return (NULL); + return -1; } - event_set(&bufev->ev_read, fd, EV_READ|EV_PERSIST, bufferevent_readcb, bufev); - event_set(&bufev->ev_write, fd, EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); + bufev->ev_base = base; - bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg); + /* Disable timeouts. */ + evutil_timerclear(&bufev->timeout_read); + evutil_timerclear(&bufev->timeout_write); + + bufev->be_ops = ops; /* * Set to EV_WRITE so that using bufferevent_write is going to @@ -308,10 +128,9 @@ bufferevent_new(evutil_socket_t fd, evbuffercb readcb, evbuffercb writecb, */ bufev->enabled = EV_WRITE; - TAILQ_INIT(&bufev->input_filters); - TAILQ_INIT(&bufev->output_filters); + bufev->options = options; - return (bufev); + return 0; } void @@ -325,114 +144,16 @@ bufferevent_setcb(struct bufferevent *bufev, bufev->cbarg = cbarg; } -void -bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd) -{ - struct bufferevent_filter *filter; - - event_del(&bufev->ev_read); - event_del(&bufev->ev_write); - - event_assign(&bufev->ev_read, bufev->ev_base, fd, EV_READ|EV_PERSIST, bufferevent_readcb, bufev); - event_assign(&bufev->ev_write, bufev->ev_base, fd, EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); - - /* we need to free all filter contexts and then init them again */ - TAILQ_FOREACH(filter, &bufev->input_filters, next) { - if (filter->free_context) - filter->free_context(filter->ctx); - if (filter->init_context) - filter->init_context(filter->ctx); - } - - TAILQ_FOREACH(filter, &bufev->output_filters, next) { - if (filter->free_context) - filter->free_context(filter->ctx); - if (filter->init_context) - filter->init_context(filter->ctx); - } - - /* might have to manually trigger event registration */ -} - struct evbuffer * bufferevent_get_input(struct bufferevent *bufev) { - return (bufev->input); + return bufev->input; } struct evbuffer * bufferevent_get_output(struct bufferevent *bufev) { - return TAILQ_FIRST(&bufev->output_filters) != NULL ? - TAILQ_FIRST(&bufev->output_filters)->buffer : - bufev->output; -} - -int -bufferevent_priority_set(struct bufferevent *bufev, int priority) -{ - if (event_priority_set(&bufev->ev_read, priority) == -1) - return (-1); - if (event_priority_set(&bufev->ev_write, priority) == -1) - return (-1); - - return (0); -} - -/* Closing the file descriptor is the responsibility of the caller */ - -void -bufferevent_free(struct bufferevent *bufev) -{ - struct bufferevent_filter *filter; - - event_del(&bufev->ev_read); - event_del(&bufev->ev_write); - - evbuffer_free(bufev->input); - evbuffer_free(bufev->output); - - /* free input and output filters */ - while ((filter = TAILQ_FIRST(&bufev->input_filters)) != NULL) { - bufferevent_filter_remove(bufev, BEV_INPUT, filter); - - bufferevent_filter_free(filter); - } - - while ((filter = TAILQ_FIRST(&bufev->output_filters)) != NULL) { - bufferevent_filter_remove(bufev, BEV_OUTPUT, filter); - - bufferevent_filter_free(filter); - } - - mm_free(bufev); -} -/* - * Executes filters on the written data and schedules a network write if - * necessary. - */ -static inline int -bufferevent_write_closure(struct bufferevent *bufev, int progress) -{ - /* if no data was written, we do not need to do anything */ - if (!progress) - return (0); - - if (TAILQ_FIRST(&bufev->output_filters) != NULL) { - progress = bufferevent_process_filters( - TAILQ_FIRST(&bufev->output_filters), - bufev->output, BEV_NORMAL); - if (progress == -1) { - (*bufev->errorcb)(bufev, EVBUFFER_ERROR, bufev->cbarg); - return (-1); - } - } - - /* If everything is okay, we need to schedule a write */ - if (bufev->enabled & EV_WRITE) - bufferevent_add(&bufev->ev_write, bufev->timeout_write); - - return (0); + return bufev->output; } /* @@ -443,34 +164,19 @@ bufferevent_write_closure(struct bufferevent *bufev, int progress) int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) { - struct evbuffer *output; - - if (TAILQ_FIRST(&bufev->output_filters) != NULL) - output = TAILQ_FIRST(&bufev->output_filters)->buffer; - else - output = bufev->output; - - if (evbuffer_add(output, data, size) == -1) + if (evbuffer_add(bufev->output, data, size) == -1) return (-1); - return (bufferevent_write_closure(bufev, size > 0)); + return 0; } int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) { - int len = EVBUFFER_LENGTH(buf); - struct evbuffer *output; - - if (TAILQ_FIRST(&bufev->output_filters) != NULL) - output = TAILQ_FIRST(&bufev->output_filters)->buffer; - else - output = bufev->output; - - if (evbuffer_add_buffer(output, buf) == -1) + if (evbuffer_add_buffer(bufev->output, buf) == -1) return (-1); - return (bufferevent_write_closure(bufev, len > 0)); + return 0; } size_t @@ -488,51 +194,74 @@ bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf) int bufferevent_enable(struct bufferevent *bufev, short event) { - if (event & EV_READ) { - if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1) - return (-1); - } - if (event & EV_WRITE) { - if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1) - return (-1); - } + short impl_events = event; + if (bufev->read_suspended) + impl_events &= ~EV_READ; bufev->enabled |= event; + + if (bufev->be_ops->enable(bufev, impl_events) < 0) + return -1; + return (0); } +void +bufferevent_set_timeouts(struct bufferevent *bufev, + const struct timeval *tv_read, + const struct timeval *tv_write) +{ + if (tv_read) { + bufev->timeout_read = *tv_read; + } else { + evutil_timerclear(&bufev->timeout_read); + } + if (tv_write) { + bufev->timeout_write = *tv_write; + } else { + evutil_timerclear(&bufev->timeout_write); + } + + if (bufev->be_ops->adj_timeouts) + bufev->be_ops->adj_timeouts(bufev); +} + + +/* Obsolete; use bufferevent_set_timeouts */ +void +bufferevent_settimeout(struct bufferevent *bufev, + int timeout_read, int timeout_write) +{ + struct timeval tv_read, tv_write; + struct timeval *ptv_read = NULL, *ptv_write = NULL; + + memset(&tv_read, 0, sizeof(tv_read)); + memset(&tv_write, 0, sizeof(tv_write)); + + if (timeout_read) { + tv_read.tv_sec = timeout_read; + ptv_read = &tv_read; + } + if (timeout_write) { + tv_write.tv_sec = timeout_write; + ptv_write = &tv_write; + } + + bufferevent_set_timeouts(bufev, ptv_read, ptv_write); +} + + int bufferevent_disable(struct bufferevent *bufev, short event) { - if (event & EV_READ) { - if (event_del(&bufev->ev_read) == -1) - return (-1); - } - if (event & EV_WRITE) { - if (event_del(&bufev->ev_write) == -1) - return (-1); - } - bufev->enabled &= ~event; + + if (bufev->be_ops->disable(bufev, event) < 0) + return (-1); + return (0); } -/* - * Sets the read and write timeout for a buffered event. - */ - -void -bufferevent_settimeout(struct bufferevent *bufev, - int timeout_read, int timeout_write) { - bufev->timeout_read = timeout_read; - bufev->timeout_write = timeout_write; - - if (event_pending(&bufev->ev_read, EV_READ, NULL)) - bufferevent_add(&bufev->ev_read, timeout_read); - if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) - bufferevent_add(&bufev->ev_write, timeout_write); -} - /* * Sets the water marks */ @@ -541,185 +270,69 @@ void bufferevent_setwatermark(struct bufferevent *bufev, short events, size_t lowmark, size_t highmark) { - if (events & EV_READ) { - bufev->wm_read.low = lowmark; - bufev->wm_read.high = highmark; - } if (events & EV_WRITE) { bufev->wm_write.low = lowmark; bufev->wm_write.high = highmark; } - /* If the watermarks changed then see if we should call read again */ - bufferevent_read_pressure_cb(bufev->input, - 0, EVBUFFER_LENGTH(bufev->input), bufev); + if (events & EV_READ) { + bufev->wm_read.low = lowmark; + bufev->wm_read.high = highmark; + + if (highmark) { + /* There is now a new high-water mark for read. + enable the callback if needed, and see if we should + suspend/bufferevent_wm_unsuspend. */ + + if (bufev->read_watermarks_cb == NULL) { + bufev->read_watermarks_cb = + evbuffer_add_cb(bufev->input, + bufferevent_inbuf_wm_cb, + bufev); + } + evbuffer_cb_set_flags(bufev->input, + bufev->read_watermarks_cb, + EVBUFFER_CB_ENABLED); + + if (EVBUFFER_LENGTH(bufev->input) > highmark) + bufferevent_wm_suspend_read(bufev); + else if (EVBUFFER_LENGTH(bufev->input) < highmark) + bufferevent_wm_unsuspend_read(bufev); + } else { + /* There is now no high-water mark for read. */ + if (bufev->read_watermarks_cb) + evbuffer_cb_set_flags(bufev->input, + bufev->read_watermarks_cb, + EVBUFFER_CB_DISABLED); + bufferevent_wm_unsuspend_read(bufev); + } + } } int -bufferevent_base_set(struct event_base *base, struct bufferevent *bufev) +bufferevent_flush(struct bufferevent *bufev, + short iotype, + enum bufferevent_flush_mode mode) { - int res; - - bufev->ev_base = base; - - res = event_base_set(base, &bufev->ev_read); - if (res == -1) - return (res); - - res = event_base_set(base, &bufev->ev_write); - return (res); -} - -/* - * Filtering stuff - */ - -struct bufferevent_filter * -bufferevent_filter_new( - void (*init_context)(void *ctx), - void (*free_context)(void *ctx), - enum bufferevent_filter_result (*process)( - struct evbuffer *src, struct evbuffer *dst, - enum bufferevent_filter_state flags, void *ctx), void *ctx) -{ - struct bufferevent_filter *filter; - - if ((filter = mm_malloc(sizeof(struct bufferevent_filter))) == NULL) - return (NULL); - - if ((filter->buffer = evbuffer_new()) == NULL) { - mm_free(filter); - return (NULL); - } - - filter->init_context = init_context; - filter->free_context = free_context; - filter->process = process; - filter->ctx = ctx; - - return (filter); + if (bufev->be_ops->flush) + return bufev->be_ops->flush(bufev, iotype, mode); + else + return -1; } void -bufferevent_filter_free(struct bufferevent_filter *filter) +bufferevent_free(struct bufferevent *bufev) { - evbuffer_free(filter->buffer); - mm_free(filter); + /* Clean up the shared info */ + if (bufev->be_ops->destruct) + bufev->be_ops->destruct(bufev); + + /* evbuffer will free the callbacks */ + evbuffer_free(bufev->input); + evbuffer_free(bufev->output); + + /* Free the actual allocated memory. */ + mm_free(bufev - bufev->be_ops->mem_offset); } -void -bufferevent_filter_insert(struct bufferevent *bufev, - enum bufferevent_filter_type filter_type, - struct bufferevent_filter *filter) -{ - switch (filter_type) { - case BEV_INPUT: - TAILQ_INSERT_TAIL(&bufev->input_filters, filter, next); - break; - case BEV_OUTPUT: - TAILQ_INSERT_HEAD(&bufev->output_filters, filter, next); - break; - default: - event_errx(1, "illegal filter type %d", filter_type); - } - - if (filter->init_context) - filter->init_context(filter->ctx); -} - -void -bufferevent_filter_remove(struct bufferevent *bufev, - enum bufferevent_filter_type filter_type, - struct bufferevent_filter *filter) -{ - switch (filter_type) { - case BEV_INPUT: - TAILQ_REMOVE(&bufev->input_filters, filter, next); - break; - case BEV_OUTPUT: - TAILQ_REMOVE(&bufev->output_filters, filter, next); - break; - default: - event_errx(1, "illegal filter type %d", filter_type); - } - - evbuffer_drain(filter->buffer, -1); - - if (filter->free_context) - filter->free_context(filter->ctx); - -} - -static int -bufferevent_process_filters( - struct bufferevent_filter *filter, struct evbuffer *final, - enum bufferevent_filter_state state) -{ - struct evbuffer *src, *dst; - struct bufferevent_filter *next; - int len = EVBUFFER_LENGTH(final); - - for (; filter != NULL; filter = next) { - int res; - - next = TAILQ_NEXT(filter, next); - src = filter->buffer; - dst = next != NULL ? next->buffer : final; - - res = (*filter->process)(src, dst, state, filter->ctx); - - /* an error causes complete termination of the bufferevent */ - if (res == BEV_ERROR) - return (-1); - - /* a read filter indicated that it cannot produce - * further data, we do not need to invoke any - * subsequent filters. Unless, a flush or something - * similar was specifically requested. - */ - if (res == BEV_NEED_MORE && state == BEV_NORMAL) - return (0); - } - - /* we made user visible progress if the buffer size changed */ - return (EVBUFFER_LENGTH(final) != len); -} - -int -bufferevent_trigger_filter(struct bufferevent *bufev, - struct bufferevent_filter *filter, int iotype, - enum bufferevent_filter_state state) -{ - struct evbuffer *dst = iotype == BEV_INPUT ? - bufev->input : bufev->output; - int progress; - - /* trigger all filters if filter is not specified */ - if (filter == NULL) { - struct bufferevent_filterq *head = BEV_INPUT ? - &bufev->input_filters : &bufev->output_filters; - filter = TAILQ_FIRST(head); - } - - progress = bufferevent_process_filters(filter, dst, state); - if (progress == -1) { - (*bufev->errorcb)(bufev, EVBUFFER_ERROR, bufev->cbarg); - return (-1); - } - - switch (iotype) { - case BEV_INPUT: - bufferevent_read_closure(bufev, progress); - break; - case BEV_OUTPUT: - if (progress && (bufev->enabled & EV_WRITE)) - bufferevent_add( - &bufev->ev_write, bufev->timeout_write); - break; - default: - event_errx(1, "Illegal bufferevent iotype: %d", iotype); - } - - return (0); -} diff --git a/bufferevent_filter.c b/bufferevent_filter.c new file mode 100644 index 00000000..9031b042 --- /dev/null +++ b/bufferevent_filter.c @@ -0,0 +1,435 @@ +/* + * Copyright (c) 2002-2004 Niels Provos + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#ifdef HAVE_SYS_TIME_H +#include +#endif + +#include +#include +#include +#include +#include +#ifdef HAVE_STDARG_H +#include +#endif + +#ifdef WIN32 +#include +#endif + +#include "event2/util.h" +#include "event2/bufferevent.h" +#include "event2/buffer.h" +#include "event2/buffer_compat.h" +#include "event2/bufferevent_struct.h" +#include "event2/bufferevent_compat.h" +#include "event2/event.h" +#include "log-internal.h" +#include "mm-internal.h" +#include "bufferevent-internal.h" +#include "util-internal.h" + +/* prototypes */ +static int be_filter_enable(struct bufferevent *, short); +static int be_filter_disable(struct bufferevent *, short); +static void be_filter_destruct(struct bufferevent *); +static void be_filter_adj_timeouts(struct bufferevent *); + +static void be_filter_readcb(struct bufferevent *, void *); +static void be_filter_writecb(struct bufferevent *, void *); +static void be_filter_errorcb(struct bufferevent *, short, void *); +static int be_filter_flush(struct bufferevent *bufev, + short iotype, enum bufferevent_flush_mode mode); + +static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf, + size_t old, size_t now, void *arg); + +struct bufferevent_filtered { + struct bufferevent bev; + + /** The bufferevent that we read/write filterd data from/to. */ + struct bufferevent *underlying; + /** A callback on our outbuf to notice when somebody adds data */ + struct evbuffer_cb_entry *outbuf_cb; + /** True iff we have received an EOF callback from the underlying + * bufferevent. */ + unsigned got_eof; + + /** Function to free context when we're done. */ + void (*free_context)(void *); + /** Input filter */ + bufferevent_filter_cb process_in; + /** Output filter */ + bufferevent_filter_cb process_out; + + /** User-supplied argument to the filters. */ + void *context; +}; + +struct bufferevent_ops bufferevent_ops_filter = { + "filter", + evutil_offsetof(struct bufferevent_filtered, bev), + be_filter_enable, + be_filter_disable, + be_filter_destruct, + be_filter_adj_timeouts, + be_filter_flush, +}; + +/* Given a bufferevent that's really the bev filter of a bufferevent_filtered, + * return that bufferevent_filtered. Returns NULL otherwise.*/ +static inline struct bufferevent_filtered * +upcast(struct bufferevent *bev) +{ + struct bufferevent_filtered *bev_f; + if (bev->be_ops != &bufferevent_ops_filter) + return NULL; + bev_f = (void*)( ((char*)bev) - + evutil_offsetof(struct bufferevent_filtered, bev) ); + assert(bev_f->bev.be_ops == &bufferevent_ops_filter); + return bev_f; +} + +#define downcast(bev_f) (&(bev_f)->bev) + +/** Return 1 iff bevf's underlying bufferevent's output buffer is at or + * over its high watermark such that we should not write to it in a given + * flush mode. */ +static int +be_underlying_writebuf_full(struct bufferevent_filtered *bevf, + enum bufferevent_flush_mode state) +{ + struct bufferevent *u = bevf->underlying; + return state == BEV_NORMAL && + u->wm_write.high && + EVBUFFER_LENGTH(u->output) >= u->wm_write.high; +} + +/** Return 1 if our input buffer is at or over its high watermark such that we + * should not write to it in a given flush mode. */ +static int +be_readbuf_full(struct bufferevent_filtered *bevf, + enum bufferevent_flush_mode state) +{ + struct bufferevent *bufev = downcast(bevf); + return state == BEV_NORMAL && + bufev->wm_read.high && + EVBUFFER_LENGTH(bufev->input) >= bufev->wm_read.high; +} + + +/* Filter to use when we're created with a NULL filter. */ +static enum bufferevent_filter_result +be_null_filter(struct evbuffer *src, struct evbuffer *dst, ssize_t lim, + enum bufferevent_flush_mode state, void *ctx) +{ + /* XXX respect lim. */ + + (void)state; + if (evbuffer_add_buffer(dst, src) == 0) + return BEV_OK; + else + return BEV_ERROR; +} + +struct bufferevent * +bufferevent_filter_new(struct bufferevent *underlying, + bufferevent_filter_cb input_filter, + bufferevent_filter_cb output_filter, + enum bufferevent_options options, + void (*free_context)(void *), + void *ctx) +{ + struct bufferevent_filtered *bufev_f; + + if (!input_filter) + input_filter = be_null_filter; + if (!output_filter) + output_filter = be_null_filter; + + bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered)); + if (!bufev_f) + return NULL; + + if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base, + &bufferevent_ops_filter, options) < 0) { + mm_free(bufev_f); + return NULL; + } + + bufev_f->underlying = underlying; + bufev_f->process_in = input_filter; + bufev_f->process_out = output_filter; + bufev_f->free_context = free_context; + bufev_f->context = ctx; + + bufferevent_setcb(bufev_f->underlying, + be_filter_readcb, be_filter_writecb, be_filter_errorcb, bufev_f); + + bufev_f->outbuf_cb = evbuffer_add_cb(bufev_f->bev.output, + bufferevent_filtered_outbuf_cb, bufev_f); + + return &bufev_f->bev; +} + +static void +be_filter_destruct(struct bufferevent *bev) +{ + struct bufferevent_filtered *bevf = upcast(bev); + assert(bevf); + if (bevf->free_context) + bevf->free_context(bevf->context); + + if (bev->options & BEV_OPT_CLOSE_ON_FREE) + bufferevent_free(bevf->underlying); +} + +static int +be_filter_enable(struct bufferevent *bev, short event) +{ + struct bufferevent_filtered *bevf = upcast(bev); + return bufferevent_enable(bevf->underlying, event); +} + +static int +be_filter_disable(struct bufferevent *bev, short event) +{ + struct bufferevent_filtered *bevf = upcast(bev); + return bufferevent_disable(bevf->underlying, event); +} + +static void +be_filter_adj_timeouts(struct bufferevent *bev) +{ + struct bufferevent_filtered *bevf = upcast(bev); + struct timeval *r = NULL, *w = NULL; + + if (bev->timeout_read.tv_sec >= 0) + r = &bev->timeout_read; + if (bev->timeout_write.tv_sec >= 0) + w = &bev->timeout_write; + + bufferevent_set_timeouts(bevf->underlying, r, w); +} + +static enum bufferevent_filter_result +be_filter_process_input(struct bufferevent_filtered *bevf, + enum bufferevent_flush_mode state, + int *processed_out) +{ + enum bufferevent_filter_result res; + + if (state == BEV_NORMAL) { + /* If we're in 'normal' mode, don't urge data on the filter + * unless we're reading data and under our high-water mark.*/ + if (!(bevf->bev.enabled & EV_READ) || + be_readbuf_full(bevf, state)) + return BEV_OK; + } + + do { + ssize_t limit = -1; + if (state == BEV_NORMAL && bevf->bev.wm_read.high) + limit = bevf->bev.wm_read.high - + EVBUFFER_LENGTH(bevf->bev.input); + + res = bevf->process_in(bevf->underlying->input, + bevf->bev.input, limit, state, bevf->context); + + if (res == BEV_OK) + *processed_out = 1; + } while (res == BEV_OK && + (bevf->bev.enabled & EV_READ) && + EVBUFFER_LENGTH(bevf->underlying->input) && + !be_readbuf_full(bevf, state)); + + return res; +} + + +static enum bufferevent_filter_result +be_filter_process_output(struct bufferevent_filtered *bevf, + enum bufferevent_flush_mode state, + int *processed_out) +{ + enum bufferevent_filter_result res = BEV_OK; + struct bufferevent *bufev = downcast(bevf); + int again = 0; + + if (state == BEV_NORMAL) { + /* If we're in 'normal' mode, don't urge data on the + * filter unless we're writing data, and the underlying + * bufferevent is accepting data, and we have data to + * give the filter. If we're in 'flush' or 'finish', + * call the filter no matter what. */ + if (!(bufev->enabled & EV_WRITE) || + be_underlying_writebuf_full(bevf, state) || + !EVBUFFER_LENGTH(bufev->output)) + return BEV_OK; + } + + /* disable the callback that calls this function + when the user adds to the output buffer. */ + evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0); + + do { + int processed = 0; + again = 0; + + do { + ssize_t limit = -1; + if (state == BEV_NORMAL && + bevf->underlying->wm_write.high) + limit = bevf->underlying->wm_write.high - + EVBUFFER_LENGTH(bevf->underlying->output); + + res = bevf->process_out(bevf->bev.output, + bevf->underlying->output, + limit, + state, + bevf->context); + + if (res == BEV_OK) + processed = *processed_out = 1; + } while (/* Stop if the filter wasn't successful...*/ + res == BEV_OK && + /* Or if we aren't writing any more. */ + (bufev->enabled & EV_WRITE) && + /* Of if we have nothing more to write and we are + * not flushing. */ + EVBUFFER_LENGTH(bufev->output) && + /* Or if we have filled the underlying output buffer. */ + !be_underlying_writebuf_full(bevf,state)); + + if (processed && bufev->writecb && + EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) { + /* call the write callback.*/ + (*bufev->writecb)(bufev, bufev->cbarg); + + if (res == BEV_OK && + (bufev->enabled & EV_WRITE) && + EVBUFFER_LENGTH(bufev->output) && + !be_underlying_writebuf_full(bevf, state)) { + again = 1; + } + } + } while (again); + + /* reenable the outbuf_cb */ + evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb, + EVBUFFER_CB_ENABLED); + + return res; +} + +/* Called when the size of our outbuf changes. */ +static void +bufferevent_filtered_outbuf_cb(struct evbuffer *buf, + size_t old, size_t now, void *arg) +{ + struct bufferevent_filtered *bevf = arg; + + if (now > old) { + int processed_any = 0; + /* Somebody added more data to the output buffer. Try to + * process it, if we should. */ + be_filter_process_output(bevf, BEV_NORMAL, &processed_any); + } +} + +/* Called when the underlying socket has read. */ +static void +be_filter_readcb(struct bufferevent *underlying, void *_me) +{ + struct bufferevent_filtered *bevf = _me; + enum bufferevent_filter_result res; + enum bufferevent_flush_mode state; + struct bufferevent *bufev = downcast(bevf); + int processed_any = 0; + + if (bevf->got_eof) + state = BEV_FINISHED; + else + state = BEV_NORMAL; + + res = be_filter_process_input(bevf, state, &processed_any); + + if (processed_any && + EVBUFFER_LENGTH(bufev->input) >= bufev->wm_read.low && + bufev->readcb != NULL) + (*bufev->readcb)(bufev, bufev->cbarg); +} + +/* Called when the underlying socket has drained enough that we can write to + it. */ +static void +be_filter_writecb(struct bufferevent *underlying, void *_me) +{ + struct bufferevent_filtered *bevf = _me; + int processed_any = 0; + + be_filter_process_output(bevf, BEV_NORMAL, &processed_any); +} + +/* Called when the underlying socket has given us an error */ +static void +be_filter_errorcb(struct bufferevent *underlying, short what, void *_me) +{ + struct bufferevent_filtered *bevf = _me; + + /* All we can really to is tell our own errorcb. */ + if (bevf->bev.errorcb) + bevf->bev.errorcb(&bevf->bev, what, bevf->bev.cbarg); +} + +static int +be_filter_flush(struct bufferevent *bufev, + short iotype, enum bufferevent_flush_mode mode) +{ + struct bufferevent_filtered *bevf = upcast(bufev); + int processed_any = 0; + assert(bevf); + + if (iotype & EV_READ) { + be_filter_process_input(bevf, mode, &processed_any); + } + if (iotype & EV_WRITE) { + be_filter_process_output(bevf, mode, &processed_any); + } + /* XXX check the return value? */ + /* XXX does this want to recursively call lower-level flushes? */ + bufferevent_flush(bevf->underlying, iotype, mode); + + return processed_any; +} diff --git a/bufferevent_sock.c b/bufferevent_sock.c new file mode 100644 index 00000000..233cfe39 --- /dev/null +++ b/bufferevent_sock.c @@ -0,0 +1,375 @@ +/* + * Copyright (c) 2002-2004 Niels Provos + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#ifdef HAVE_SYS_TIME_H +#include +#endif + +#include +#include +#include +#include +#include +#ifdef HAVE_STDARG_H +#include +#endif +#ifdef HAVE_UNISTD_H +#include +#endif + +#ifdef WIN32 +#include +#endif + +#include "event2/util.h" +#include "event2/bufferevent.h" +#include "event2/buffer.h" +#include "event2/buffer_compat.h" +#include "event2/bufferevent_struct.h" +#include "event2/bufferevent_compat.h" +#include "event2/event.h" +#include "log-internal.h" +#include "mm-internal.h" +#include "bufferevent-internal.h" +#include "util-internal.h" + +/* prototypes */ +static int be_socket_enable(struct bufferevent *, short); +static int be_socket_disable(struct bufferevent *, short); +static void be_socket_destruct(struct bufferevent *); +static void be_socket_adj_timeouts(struct bufferevent *); +static int be_socket_flush(struct bufferevent *, short, enum bufferevent_flush_mode); + +struct bufferevent_ops bufferevent_ops_socket = { + "socket", + 0, + be_socket_enable, + be_socket_disable, + be_socket_destruct, + be_socket_adj_timeouts, + be_socket_flush, +}; + +static int +be_socket_add(struct event *ev, const struct timeval *tv) +{ + if (tv->tv_sec == 0 && tv->tv_usec == 0) + return event_add(ev, NULL); + else + return event_add(ev, tv); +} + +static void +bufferevent_socket_outbuf_cb(struct evbuffer *buf, + size_t old, size_t now, void *arg) +{ + struct bufferevent *bufev = arg; + + if (now > old && + (bufev->enabled & EV_WRITE) && + !event_pending(&bufev->ev_write, EV_WRITE, NULL)) { + /* Somebody added data to the buffer, and we would like to + * write, and we were not writing. So, start writing. */ + be_socket_add(&bufev->ev_write, &bufev->timeout_write); + } +} + +static void +bufferevent_readcb(evutil_socket_t fd, short event, void *arg) +{ + struct bufferevent *bufev = arg; + struct evbuffer *input; + int res = 0; + short what = EVBUFFER_READ; + int howmuch = -1; + + if (event == EV_TIMEOUT) { + what |= EVBUFFER_TIMEOUT; + goto error; + } + + input = bufev->input; + + /* + * If we have a high watermark configured then we don't want to + * read more data than would make us reach the watermark. + */ + if (bufev->wm_read.high != 0) { + howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(input); + /* we somehow lowered the watermark, stop reading */ + if (howmuch <= 0) { + bufferevent_wm_suspend_read(bufev); + return; + } + } + + res = evbuffer_read(input, fd, howmuch); + + if (res == -1) { + int err = evutil_socket_geterror(fd); + if (EVUTIL_ERR_RW_RETRIABLE(err)) + goto reschedule; + /* error case */ + what |= EVBUFFER_ERROR; + } else if (res == 0) { + /* eof case */ + what |= EVBUFFER_EOF; + } + + if (res <= 0) + goto error; + + + /* Invoke the user callback - must always be called last */ + if (EVBUFFER_LENGTH(input) >= bufev->wm_read.low && + bufev->readcb != NULL) + (*bufev->readcb)(bufev, bufev->cbarg); + + return; + + reschedule: + return; + + error: + event_del(&bufev->ev_read); + (*bufev->errorcb)(bufev, what, bufev->cbarg); + +} + +static void +bufferevent_writecb(evutil_socket_t fd, short event, void *arg) +{ + struct bufferevent *bufev = arg; + int res = 0; + short what = EVBUFFER_WRITE; + + if (event == EV_TIMEOUT) { + what |= EVBUFFER_TIMEOUT; + goto error; + } + + if (EVBUFFER_LENGTH(bufev->output)) { + res = evbuffer_write(bufev->output, fd); + if (res == -1) { + int err = evutil_socket_geterror(fd); + if (EVUTIL_ERR_RW_RETRIABLE(err)) + goto reschedule; + what |= EVBUFFER_ERROR; + } else if (res == 0) { + /* eof case */ + what |= EVBUFFER_EOF; + } + if (res <= 0) + goto error; + } + + if (EVBUFFER_LENGTH(bufev->output) == 0) + event_del(&bufev->ev_write); + + /* + * Invoke the user callback if our buffer is drained or below the + * low watermark. + */ + if (bufev->writecb != NULL && + EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) + (*bufev->writecb)(bufev, bufev->cbarg); + + return; + + reschedule: + if (EVBUFFER_LENGTH(bufev->output) == 0) + event_del(&bufev->ev_write); + return; + + error: + event_del(&bufev->ev_write); + (*bufev->errorcb)(bufev, what, bufev->cbarg); +} + +struct bufferevent * +bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, + enum bufferevent_options options) +{ + struct bufferevent *bufev; + + if ((bufev = mm_calloc(1, sizeof(struct bufferevent))) == NULL) + return NULL; + + if (bufferevent_init_common(bufev, base, &bufferevent_ops_socket, + options) < 0) { + mm_free(bufev); + return NULL; + } + + event_assign(&bufev->ev_read, bufev->ev_base, fd, + EV_READ|EV_PERSIST, bufferevent_readcb, bufev); + event_assign(&bufev->ev_write, bufev->ev_base, fd, + EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); + + evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev); + + return bufev; +} + +/* + * Create a new buffered event object. + * + * The read callback is invoked whenever we read new data. + * The write callback is invoked whenever the output buffer is drained. + * The error callback is invoked on a write/read error or on EOF. + * + * Both read and write callbacks maybe NULL. The error callback is not + * allowed to be NULL and have to be provided always. + */ + +struct bufferevent * +bufferevent_new(evutil_socket_t fd, evbuffercb readcb, evbuffercb writecb, + everrorcb errorcb, void *cbarg) +{ + struct bufferevent *bufev; + + if (!(bufev = bufferevent_socket_new(NULL, fd, 0))) + return NULL; + + bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg); + + return bufev; +} + + +static int +be_socket_enable(struct bufferevent *bufev, short event) +{ + if (event & EV_READ) { + if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1) + return -1; + } + if (event & EV_WRITE) { + if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1) + return -1; + } + return 0; +} + +static int +be_socket_disable(struct bufferevent *bufev, short event) +{ + if (event & EV_READ) { + if (event_del(&bufev->ev_read) == -1) + return -1; + } + if (event & EV_WRITE) { + if (event_del(&bufev->ev_write) == -1) + return -1; + } + return 0; +} + +static void +be_socket_destruct(struct bufferevent *bufev) +{ + evutil_socket_t fd; + assert(bufev->be_ops == &bufferevent_ops_socket); + + fd = event_get_fd(&bufev->ev_read); + + event_del(&bufev->ev_read); + event_del(&bufev->ev_write); + + if (bufev->options & BEV_OPT_CLOSE_ON_FREE) + EVUTIL_CLOSESOCKET(fd); +} + +static void +be_socket_adj_timeouts(struct bufferevent *bufev) +{ + if (event_pending(&bufev->ev_read, EV_READ, NULL)) + be_socket_add(&bufev->ev_read, &bufev->timeout_read); + if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) + be_socket_add(&bufev->ev_write, &bufev->timeout_write); +} + +static int +be_socket_flush(struct bufferevent *bev, short iotype, + enum bufferevent_flush_mode mode) +{ + return 0; +} + + +void +bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd) +{ + assert(bufev->be_ops == &bufferevent_ops_socket); + + event_del(&bufev->ev_read); + event_del(&bufev->ev_write); + + event_assign(&bufev->ev_read, bufev->ev_base, fd, + EV_READ|EV_PERSIST, bufferevent_readcb, bufev); + event_assign(&bufev->ev_write, bufev->ev_base, fd, + EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); +} + +/* XXXX Should non-socket buffferevents support this? */ +int +bufferevent_priority_set(struct bufferevent *bufev, int priority) +{ + if (bufev->be_ops != &bufferevent_ops_socket) + return -1; + + if (event_priority_set(&bufev->ev_read, priority) == -1) + return (-1); + if (event_priority_set(&bufev->ev_write, priority) == -1) + return (-1); + + return (0); +} + +/* XXXX Should non-socket buffferevents support this? */ +int +bufferevent_base_set(struct event_base *base, struct bufferevent *bufev) +{ + int res; + if (bufev->be_ops != &bufferevent_ops_socket) + return -1; + + bufev->ev_base = base; + + res = event_base_set(base, &bufev->ev_read); + if (res == -1) + return (res); + + res = event_base_set(base, &bufev->ev_write); + return (res); +} diff --git a/event.h b/event.h index 9b1814b5..c7bad607 100644 --- a/event.h +++ b/event.h @@ -192,6 +192,7 @@ typedef unsigned short u_short; #include #include #include +#include #include #ifdef __cplusplus diff --git a/http.c b/http.c index 5a8063db..abee38e3 100644 --- a/http.c +++ b/http.c @@ -88,6 +88,7 @@ #include "event2/event.h" #include "event2/buffer.h" #include "event2/bufferevent.h" +#include "event2/bufferevent_compat.h" #include "event2/http_struct.h" #include "event2/http_compat.h" #include "event2/util.h" diff --git a/include/Makefile.am b/include/Makefile.am index 64a9799a..a069840f 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -1,7 +1,9 @@ AUTOMAKE_OPTIONS = foreign +# XXXX Don't do all this twice. + EXTRA_SRC = event2/buffer.h event2/buffer_compat.h \ - event2/thread.h event2/bufferevent.h \ + event2/thread.h event2/bufferevent.h event2/bufferevent_compat.h \ event2/bufferevent_struct.h event2/event.h event2/event_compat.h \ event2/event_struct.h event2/tag.h event2/util.h \ event2/http.h event2/http_struct.h event2/http_compat.h @@ -9,6 +11,7 @@ EXTRA_SRC = event2/buffer.h event2/buffer_compat.h \ nobase_include_HEADERS = \ event2/buffer.h event2/buffer_compat.h \ event2/thread.h event2/bufferevent.h \ + event2/bufferevent_compat.h \ event2/bufferevent_struct.h event2/event.h event2/event_compat.h \ event2/event_struct.h event2/tag.h event2/util.h \ event2/http.h event2/http_struct.h event2/http_compat.h \ diff --git a/include/event2/buffer.h b/include/event2/buffer.h index c5491b08..d29dfd0d 100644 --- a/include/event2/buffer.h +++ b/include/event2/buffer.h @@ -413,14 +413,15 @@ int evbuffer_remove_cb_entry(struct evbuffer *buffer, */ int evbuffer_remove_cb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg); +#define EVBUFFER_CB_DISABLED 0 #define EVBUFFER_CB_ENABLED 1 /** Change whether a given callback is enabled on a buffer or not. A disabled callback is not invoked even when the buffer size changes. @param buffer the evbuffer that the callback is watching. @param cb the callback whose status we want to change. - @param flags EVBUFFER_CB_ENABLED to enable the callback, or 0 to - disable it. + @param flags EVBUFFER_CB_ENABLED to enable the callback, or + EVBUFFER_CB_DISABLEDD to disable it. @return 0 on success, -1 on failure. */ int evbuffer_cb_set_flags(struct evbuffer *buffer, diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index f3039a64..6cb75330 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -34,6 +34,20 @@ and one for writing, and callbacks that are invoked under certain circumstances. + libevent provides an abstraction on top of the regular event callbacks. + This abstraction is called a buffered event. A buffered event provides + input and output buffers that get filled and drained automatically. The + user of a buffered event no longer deals directly with the I/O, but + instead is reading from input and writing to output buffers. + + Once initialized, the bufferevent structure can be used repeatedly with + bufferevent_enable() and bufferevent_disable(). + + When read enabled the bufferevent will try to read from the file descriptor + and call the read callback. The write callback is executed whenever the + output buffer is drained below the write low watermark, which is 0 by + default. + */ #ifdef __cplusplus @@ -102,43 +116,21 @@ typedef void (*evbuffercb)(struct bufferevent *bev, void *ctx); typedef void (*everrorcb)(struct bufferevent *bev, short what, void *ctx); +enum bufferevent_options { + BEV_OPT_CLOSE_ON_FREE = (1<<0), +}; + /** - Create a new bufferevent. - - libevent provides an abstraction on top of the regular event callbacks. - This abstraction is called a buffered event. A buffered event provides - input and output buffers that get filled and drained automatically. The - user of a buffered event no longer deals directly with the I/O, but - instead is reading from input and writing to output buffers. - - Once initialized, the bufferevent structure can be used repeatedly with - bufferevent_enable() and bufferevent_disable(). - - When read enabled the bufferevent will try to read from the file descriptor - and call the read callback. The write callback is executed whenever the - output buffer is drained below the write low watermark, which is 0 by - default. - - If multiple bases are in use, bufferevent_base_set() must be called before - enabling the bufferevent for the first time. + Create a new socket bufferevent over an existing socket. + @param base the event base to associate with the new bufferevent. @param fd the file descriptor from which data is read and written to. This file descriptor is not allowed to be a pipe(2). - @param readcb callback to invoke when there is data to be read, or NULL if - no callback is desired - @param writecb callback to invoke when the file descriptor is ready for - writing, or NULL if no callback is desired - @param errorcb callback to invoke when there is an error on the file - descriptor - @param cbarg an argument that will be supplied to each of the callbacks - (readcb, writecb, and errorcb) @return a pointer to a newly allocated bufferevent struct, or NULL if an error occurred - @see bufferevent_base_set(), bufferevent_free() + @see bufferevent_free() */ -struct bufferevent *bufferevent_new(evutil_socket_t fd, - evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg); - +struct bufferevent *bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, enum bufferevent_options options); /** Assign a bufferevent to a specific event_base. @@ -295,11 +287,11 @@ int bufferevent_disable(struct bufferevent *bufev, short event); Set the read and write timeout for a buffered event. @param bufev the bufferevent to be modified - @param timeout_read the read timeout - @param timeout_write the write timeout + @param timeout_read the read timeout, or NULL + @param timeout_write the write timeout, or NULL */ -void bufferevent_settimeout(struct bufferevent *bufev, - int timeout_read, int timeout_write); +void bufferevent_set_timeouts(struct bufferevent *bufev, + const struct timeval *timeout_read, const struct timeval *timeout_write); /** Sets the watermarks for read and write events. @@ -309,7 +301,9 @@ void bufferevent_settimeout(struct bufferevent *bufev, is beyond the high watermark, the buffevent stops reading from the network. On output, the user write callback is invoked whenever the buffered data - falls below the low watermark. + falls below the low watermark. Filters that write to this bufev will try + not to write more bytes to this buffer than the high watermark would allow, + except when flushing. @param bufev the bufferevent to be modified @param events EV_READ, EV_WRITE or both @@ -325,22 +319,38 @@ void bufferevent_setwatermark(struct bufferevent *bufev, short events, /** macro for getting access to the output buffer of a bufferevent */ #define EVBUFFER_OUTPUT(x) bufferevent_get_output(x) -/** - Support for filtering input and output of bufferevents. - */ - /** Flags that can be passed into filters to let them know how to deal with the incoming data. */ -enum bufferevent_filter_state { +enum bufferevent_flush_mode { /** usually set when processing data */ BEV_NORMAL = 0, - /** encountered EOF on read or done sending data */ + /** want to checkpoint all data sent. */ BEV_FLUSH = 1, + + /** encountered EOF on read or done sending data */ + BEV_FINISHED = 2, }; +/** + Triggers the bufferevent to produce more + data if possible. + + @param bufev the bufferevent object + @param iotype either EV_READ or EV_WRITE or both. + @param state either BEV_NORMAL or BEV_FLUSH or BEV_FINISHED + @return -1 on failure, 0 if no data was produces, 1 if data was produced + */ +int bufferevent_flush(struct bufferevent *bufev, + short iotype, + enum bufferevent_flush_mode state); + +/** + Support for filtering input and output of bufferevents. + */ + /** Values that filters can return. */ @@ -356,116 +366,44 @@ enum bufferevent_filter_result { BEV_ERROR = 2 }; +/** A callback function to implement a filter for a bufferevent. + + @param src An evbuffer to drain data from. + @param dst An evbuffer to add data to. + @param limit A suggested upper bound of bytes to write to dst. + The filter may ignore this value, but doing so means that + it will overflow the high-water mark associated with dst. + -1 means "no limit". + @param state Whether we should write data as may be convenient + (BEV_NORMAL), or flush as much data as we can (BEV_FLUSH), + or flush as much as we can, possibly including an end-of-stream + marker (BEF_FINISH). + @param ctx A user-supplied pointer. + + @return BEV_OK if we wrote some data; BEV_NEED_MORE if we can't + produce any more output until we get some input; and BEV_ERROR + on an error. + */ +typedef enum bufferevent_filter_result (*bufferevent_filter_cb)( + struct evbuffer *src, struct evbuffer *dst, ssize_t dst_limit, + enum bufferevent_flush_mode mode, void *ctx); + struct bufferevent_filter; -/** - Creates a new filtering object for a bufferevent. - - Filters can be used to implement compression, authentication, rate limiting, - etc. for bufferevents. Filters can be associated with the input or output - path or both. Filters need to be inserted with bufferevent_filter_insert() - on either the input or output path. - - For example, when implementing compression, both an input and an - output filters are required. The output filter compress all output - as it passes along whereas the input filter decompresses all input as - it is being read from the network. - - Some filters may require specificaly behavior such as flushing their buffers - on EOF. To allom them to do that, a bufferevent will invoke the filter - with BEV_FLUSH to let it know that EOF has been reached. - - When a filter needs more data before it can output any data, it may return - BEV_NEED_MORE in which case the filter chain is being interrupted until - more data arrives. A filter can indicate a fatal error by returning - BEV_ERROR. Otherwise, it should return BEV_OK. - - @param init_context an optional function that initializes the ctx parameter. - @param free_context an optional function to free memory associated with the - ctx parameter. - @param process the filtering function that should be invokved either during - input or output depending on where the filter should be attached. - @param ctx additional context that can be passed to the process function - @return a bufferevent_filter object that can subsequently be installed -*/ -struct bufferevent_filter *bufferevent_filter_new( - void (*init_context)(void *), - void (*free_context)(void *), - enum bufferevent_filter_result (*process)( - struct evbuffer *src, struct evbuffer *dst, - enum bufferevent_filter_state state, void *ctx), void *ctx); - -/** - Frees the filter object. - - It must have been removed from the bufferevent before it can be freed. - - @param filter the filter to be freed - @see bufferevent_filter_remove() -*/ -void bufferevent_filter_free(struct bufferevent_filter *filter); - -/** Filter types for inserting or removing filters */ -enum bufferevent_filter_type { - /** filter is being used for input */ - BEV_INPUT = 0, - - /** filter is being used for output */ - BEV_OUTPUT = 1 +enum bufferevent_filter_options { + BEV_FILT_FREE_UNDERLYING = (1<<0), }; -/** - Inserts a filter into the processing of data for bufferevent. - A filter can be inserted only once. It can not be used again for - another insert unless it have been removed via - bufferevent_filter_remove() first. - - Input filters are inserted at the end, output filters at the - beginning of the queue. - - @param bufev the bufferevent object into which to install the filter - @param filter_type either BEV_INPUT or BEV_OUTPUT - @param filter the filter object - @see bufferevent_filter_remove() +/** Allocate a new filtering bufferevent on top of an existing bufferevent. */ -void bufferevent_filter_insert(struct bufferevent *bufev, - enum bufferevent_filter_type filter_type, - struct bufferevent_filter *filter); - -/** - Removes a filter from the bufferevent. - - A filter should be flushed via buffervent_trigger_filter before removing - it from a bufferevent. Any remaining intermediate buffer data is going - to be lost. - - @param bufev the bufferevent object from which to remove the filter - @param filter_type either BEV_INPUT or BEV_OUTPUT - @param filter the filter object or NULL to trigger all filters - @see bufferevent_trigger_filter() -*/ -void bufferevent_filter_remove(struct bufferevent *bufev, - enum bufferevent_filter_type filter_type, - struct bufferevent_filter *filter); - -/** - Triggers the filter chain the specified filter to produce more - data is possible. This is primarily for time-based filters such - as rate-limiting to produce more data as time passes. - - @param bufev the bufferevent object to which the filter belongs - @param filter the bufferevent filter at which to start - @param iotype either BEV_INPUT or BEV_OUTPUT depending on where the filter - was installed - @param state either BEV_NORMAL or BEV_FLUSH - @return -1 on failure, 0 if no data was produces, 1 if data was produced - */ - -int -bufferevent_trigger_filter(struct bufferevent *bufev, - struct bufferevent_filter *filter, int iotype, - enum bufferevent_filter_state state); +struct bufferevent * +bufferevent_filter_new(struct bufferevent *underlying, + bufferevent_filter_cb input_filter, + bufferevent_filter_cb output_filter, + enum bufferevent_options options, + void (*free_context)(void *), + void *ctx); #ifdef __cplusplus } diff --git a/include/event2/bufferevent_compat.h b/include/event2/bufferevent_compat.h new file mode 100644 index 00000000..df00169c --- /dev/null +++ b/include/event2/bufferevent_compat.h @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2000-2007 Niels Provos + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef _EVENT2_BUFFEREVENT_COMPAT_H_ +#define _EVENT2_BUFFEREVENT_COMPAT_H_ + +/** + Create a new bufferevent for an fd. + + This function is deprecated. Use buffevent_socket_new and + bufferevent_set_callbacks instead. + + libevent provides an abstraction on top of the regular event callbacks. + This abstraction is called a buffered event. A buffered event provides + input and output buffers that get filled and drained automatically. The + user of a buffered event no longer deals directly with the I/O, but + instead is reading from input and writing to output buffers. + + Once initialized, the bufferevent structure can be used repeatedly with + bufferevent_enable() and bufferevent_disable(). + + When read enabled the bufferevent will try to read from the file descriptor + and call the read callback. The write callback is executed whenever the + output buffer is drained below the write low watermark, which is 0 by + default. + + If multiple bases are in use, bufferevent_base_set() must be called before + enabling the bufferevent for the first time. + + @param fd the file descriptor from which data is read and written to. + This file descriptor is not allowed to be a pipe(2). + @param readcb callback to invoke when there is data to be read, or NULL if + no callback is desired + @param writecb callback to invoke when the file descriptor is ready for + writing, or NULL if no callback is desired + @param errorcb callback to invoke when there is an error on the file + descriptor + @param cbarg an argument that will be supplied to each of the callbacks + (readcb, writecb, and errorcb) + @return a pointer to a newly allocated bufferevent struct, or NULL if an + error occurred + @see bufferevent_base_set(), bufferevent_free() + */ +struct bufferevent *bufferevent_new(evutil_socket_t fd, + evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg); + + +/** + Set the read and write timeout for a buffered event. + + @param bufev the bufferevent to be modified + @param timeout_read the read timeout + @param timeout_write the write timeout + */ +void bufferevent_settimeout(struct bufferevent *bufev, + int timeout_read, int timeout_write); + +#endif diff --git a/include/event2/bufferevent_struct.h b/include/event2/bufferevent_struct.h index 80a7cbe7..322a4383 100644 --- a/include/event2/bufferevent_struct.h +++ b/include/event2/bufferevent_struct.h @@ -62,6 +62,7 @@ struct event_watermark { size_t high; }; +#if 0 struct bufferevent_filter; /* Fix so that ppl dont have to run with */ @@ -74,20 +75,42 @@ struct name { \ } #endif /* !TAILQ_HEAD */ -TAILQ_HEAD(bufferevent_filterq, bufferevent_filter); - #ifdef _EVENT_DEFINED_TQHEAD #undef TAILQ_HEAD #undef _EVENT_DEFINED_TQHEAD #endif /* _EVENT_DEFINED_TQHEAD */ +#endif +/** + Shared implementation of a bufferevent. + + This type is exposed only because it was exposed in previous versions, + and some people's code may rely on manipulating it. Otherwise, you + should really not rely on the layout, size, or contents of this structure: + it is fairly volatile, and WILL change in future versions of the code. +**/ struct bufferevent { + /** Event base for which this bufferevent was created. */ struct event_base *ev_base; + /** Pointer to a table of function pointers to set up how this + bufferevent behaves. */ + const struct bufferevent_ops *be_ops; + /** A read event that triggers when a timeout has happened or a socket + is ready to read data. Only used by some subtypes of + bufferevent. */ struct event ev_read; + /** A write event that triggers when a timeout has happened or a socket + is ready to write data. Only used by some subtypes of + bufferevent. */ struct event ev_write; + /** An input buffer. Only the bufferevent is allowed to add data to + this buffer, though the user is allowed to drain it. */ struct evbuffer *input; + + /** An input buffer. Only the bufferevent is allowed to drain data + from this buffer, though the user is allowed to add it. */ struct evbuffer *output; struct event_watermark wm_read; @@ -98,14 +121,19 @@ struct bufferevent { everrorcb errorcb; void *cbarg; - int timeout_read; /* in seconds */ - int timeout_write; /* in seconds */ + struct timeval timeout_read; + struct timeval timeout_write; - short enabled; /* events that are currently enabled */ + /** Evbuffer callback to enforce watermarks on input. */ + struct evbuffer_cb_entry *read_watermarks_cb; - /** the list of input and output filters */ - struct bufferevent_filterq input_filters; - struct bufferevent_filterq output_filters; + /** Events that are currently enabled: currently EV_READ and EV_WRITE + are supported. */ + short enabled; + /** If set, read is suspended until evbuffer some. */ + unsigned read_suspended : 1; /* */ + + enum bufferevent_options options; }; #ifdef __cplusplus diff --git a/test/Makefile.am b/test/Makefile.am index f178879c..a691f30b 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -20,6 +20,7 @@ test_time_LDADD = ../libevent_core.la regress_SOURCES = regress.c regress_buffer.c regress_http.c regress_dns.c \ regress_rpc.c regress.gen.c regress.gen.h regress_et.c \ + regress_bufferevent.c \ regress_util.c tinytest.c regress_main.c \ $(regress_pthread_SOURCES) $(regress_zlib_SOURCES) if PTHREADS diff --git a/test/regress.c b/test/regress.c index 7ef31d8a..8d3493e8 100644 --- a/test/regress.c +++ b/test/regress.c @@ -61,6 +61,7 @@ #include "event2/tag.h" #include "event2/buffer.h" #include "event2/bufferevent.h" +#include "event2/bufferevent_compat.h" #include "event2/bufferevent_struct.h" #include "event2/util.h" #include "event-internal.h" @@ -150,7 +151,7 @@ multiple_write_cb(int fd, short event, void *arg) } woff += len; - + if (woff >= sizeof(wbuf)) { shutdown(fd, SHUT_WR); if (usepersist) @@ -1098,8 +1099,9 @@ readcb(struct bufferevent *bev, void *arg) bufferevent_disable(bev, EV_READ); - if (EVBUFFER_LENGTH(evbuf) == 8333) + if (EVBUFFER_LENGTH(evbuf) == 8333) { test_ok++; + } evbuffer_free(evbuf); } @@ -1108,8 +1110,9 @@ readcb(struct bufferevent *bev, void *arg) static void writecb(struct bufferevent *bev, void *arg) { - if (EVBUFFER_LENGTH(bev->output) == 0) + if (EVBUFFER_LENGTH(bev->output) == 0) { test_ok++; + } } static void @@ -1118,7 +1121,8 @@ errorcb(struct bufferevent *bev, short what, void *arg) test_ok = -2; } -static void +/*XXXX move to regress_bufferevent.c; make static again. */ +void test_bufferevent(void) { struct bufferevent *bev1, *bev2; @@ -1179,8 +1183,11 @@ wm_readcb(struct bufferevent *bev, void *arg) static void wm_writecb(struct bufferevent *bev, void *arg) { - if (EVBUFFER_LENGTH(bev->output) == 0) + assert(EVBUFFER_LENGTH(bev->output) <= 100); + if (EVBUFFER_LENGTH(bev->output) == 0) { + evbuffer_drain(bev->output, EVBUFFER_LENGTH(bev->output)); test_ok++; + } } static void @@ -1189,7 +1196,8 @@ wm_errorcb(struct bufferevent *bev, short what, void *arg) test_ok = -2; } -static void +/*XXXX move to regress_bufferevent.c; make static again. */ +void test_bufferevent_watermarks(void) { struct bufferevent *bev1, *bev2; @@ -1205,21 +1213,29 @@ test_bufferevent_watermarks(void) bufferevent_enable(bev2, EV_READ); for (i = 0; i < sizeof(buffer); i++) - buffer[i] = i; + buffer[i] = (char)i; bufferevent_write(bev1, buffer, sizeof(buffer)); /* limit the reading on the receiving bufferevent */ bufferevent_setwatermark(bev2, EV_READ, 10, 20); + /* Tell the sending bufferevent not to notify us till it's down to + 100 bytes. */ + bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000); + event_dispatch(); + tt_int_op(test_ok, ==, 2); + + /* The write callback drained all the data from outbuf, so we + * should have removed the write event... */ + tt_assert(!event_pending(&bev2->ev_write, EV_WRITE, NULL)); + +end: bufferevent_free(bev1); bufferevent_free(bev2); - if (test_ok != 2) - test_ok = 0; - cleanup_test(); } @@ -1231,7 +1247,7 @@ test_bufferevent_watermarks(void) static enum bufferevent_filter_result bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst, - enum bufferevent_filter_state state, void *ctx) + ssize_t lim, enum bufferevent_flush_mode state, void *ctx) { const unsigned char *buffer; int i; @@ -1253,7 +1269,7 @@ bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst, static enum bufferevent_filter_result bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst, - enum bufferevent_filter_state state, void *ctx) + ssize_t lim, enum bufferevent_flush_mode state, void *ctx) { const unsigned char *buffer; int i; @@ -1268,41 +1284,39 @@ bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst, return (BEV_OK); } -static void + +/*XXXX move to regress_bufferevent.c; make static again. */ +void test_bufferevent_filters(void) { struct bufferevent *bev1, *bev2; - struct bufferevent_filter *finput, *foutput; char buffer[8333]; int i; + test_ok = 0; setup_test("Bufferevent Filters: "); - bev1 = bufferevent_new(pair[0], NULL, writecb, errorcb, NULL); - bev2 = bufferevent_new(pair[1], readcb, NULL, errorcb, NULL); - - bufferevent_disable(bev1, EV_READ); - bufferevent_enable(bev2, EV_READ); + bev1 = bufferevent_socket_new(NULL, pair[0], 0); + bev2 = bufferevent_socket_new(NULL, pair[1], 0); for (i = 0; i < sizeof(buffer); i++) buffer[i] = i; + bev1 = bufferevent_filter_new(bev1, NULL, bufferevent_output_filter, + 0, NULL, NULL); + + bev2 = bufferevent_filter_new(bev2, bufferevent_input_filter, + NULL, 0, NULL, NULL); + bufferevent_setcb(bev1, NULL, writecb, errorcb, NULL); + bufferevent_setcb(bev2, readcb, NULL, errorcb, NULL); + + bufferevent_disable(bev1, EV_READ); + bufferevent_enable(bev2, EV_READ); /* insert some filters */ - finput = bufferevent_filter_new( - NULL, NULL,bufferevent_input_filter, NULL); - foutput = bufferevent_filter_new( - NULL, NULL, bufferevent_output_filter, NULL); - - bufferevent_filter_insert(bev1, BEV_OUTPUT, foutput); - bufferevent_filter_insert(bev2, BEV_INPUT, finput); - bufferevent_write(bev1, buffer, sizeof(buffer)); event_dispatch(); - bufferevent_filter_remove(bev1, BEV_OUTPUT, foutput); - bufferevent_filter_free(foutput); - bufferevent_free(bev1); bufferevent_free(bev2); @@ -1630,10 +1644,6 @@ struct testcase_t legacy_testcases[] = { LEGACY(persistent_timeout, TT_FORK|TT_NEED_BASE), LEGACY(priorities, TT_FORK|TT_NEED_BASE), - LEGACY(bufferevent, TT_ISOLATED), - LEGACY(bufferevent_watermarks, TT_ISOLATED), - LEGACY(bufferevent_filters, TT_ISOLATED), - LEGACY(free_active_base, TT_FORK|TT_NEED_BASE), LEGACY(event_base_new, TT_FORK|TT_NEED_SOCKETPAIR), diff --git a/test/regress.h b/test/regress.h index 196d3896..7cc3ca86 100644 --- a/test/regress.h +++ b/test/regress.h @@ -34,8 +34,15 @@ extern "C" { #include "tinytest.h" #include "tinytest_macros.h" +/*XXX move these to regress_bufferevent.c when bufferevent changes are + * merged. */ +void test_bufferevent(void); +void test_bufferevent_watermarks(void); +void test_bufferevent_filters(void); + extern struct testcase_t legacy_testcases[]; extern struct testcase_t evbuffer_testcases[]; +extern struct testcase_t bufferevent_testcases[]; extern struct testcase_t util_testcases[]; extern struct testcase_t signal_testcases[]; extern struct testcase_t http_testcases[]; diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c new file mode 100644 index 00000000..afa714c8 --- /dev/null +++ b/test/regress_bufferevent.c @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2003-2007 Niels Provos + * Copyright (c) 2007-2009 Niels Provos and Nick Mathewson + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifdef WIN32 +#include +#include +#endif + +#ifdef HAVE_CONFIG_H +#include "event-config.h" +#endif + +#include +#include +#ifdef _EVENT_HAVE_SYS_TIME_H +#include +#endif +#include +#ifndef WIN32 +#include +#include +#include +#include +#include +#endif +#include +#include +#include +#include +#include +#include +#include + +#include "event2/event.h" +#include "event2/event_struct.h" +#include "event2/event_compat.h" +#include "event2/tag.h" +#include "event2/buffer.h" +#include "event2/bufferevent.h" +#include "event2/bufferevent_compat.h" +#include "event2/bufferevent_struct.h" +#include "event2/util.h" + +#include "bufferevent-internal.h" + +#include "regress.h" + +struct testcase_t bufferevent_testcases[] = { + + LEGACY(bufferevent, TT_ISOLATED), + LEGACY(bufferevent_watermarks, TT_ISOLATED), + LEGACY(bufferevent_filters, TT_ISOLATED), + + END_OF_TESTCASES, +}; diff --git a/test/regress_main.c b/test/regress_main.c index d0c4a288..a6dac356 100644 --- a/test/regress_main.c +++ b/test/regress_main.c @@ -189,6 +189,7 @@ struct testgroup_t testgroups[] = { { "evbuffer/", evbuffer_testcases }, { "signal/", signal_testcases }, { "util/", util_testcases }, + { "bufferevent/", bufferevent_testcases }, { "http/", http_testcases }, { "dns/", dns_testcases }, { "rpc/", rpc_testcases }, diff --git a/test/regress_zlib.c b/test/regress_zlib.c index 0e7d3081..4e4af464 100644 --- a/test/regress_zlib.c +++ b/test/regress_zlib.c @@ -56,21 +56,16 @@ void regress_zlib(void); -static int test_ok; +static int infilter_calls; +static int outfilter_calls; +static int readcb_finished; +static int writecb_finished; +static int errorcb_invoked; /* * Zlib filters */ -static void -zlib_deflate_init(void *ctx) -{ - z_streamp p = ctx; - - memset(p, 0, sizeof(z_stream)); - assert(deflateInit(p, Z_DEFAULT_COMPRESSION) == Z_OK); -} - static void zlib_deflate_free(void *ctx) { @@ -79,15 +74,6 @@ zlib_deflate_free(void *ctx) assert(deflateEnd(p) == Z_OK); } -static void -zlib_inflate_init(void *ctx) -{ - z_streamp p = ctx; - - memset(p, 0, sizeof(z_stream)); - assert(inflateInit(p) == Z_OK); -} - static void zlib_inflate_free(void *ctx) { @@ -96,6 +82,20 @@ zlib_inflate_free(void *ctx) assert(inflateEnd(p) == Z_OK); } +static int +getstate(enum bufferevent_flush_mode state) +{ + switch (state) { + case BEV_FINISHED: + return Z_FINISH; + case BEV_FLUSH: + return Z_SYNC_FLUSH; + case BEV_NORMAL: + default: + return Z_NO_FLUSH; + } +} + /* * The input filter is triggered only on new input read from the network. * That means all input data needs to be consumed or the filter needs to @@ -103,7 +103,7 @@ zlib_inflate_free(void *ctx) */ static enum bufferevent_filter_result zlib_input_filter(struct evbuffer *src, struct evbuffer *dst, - enum bufferevent_filter_state state, void *ctx) + ssize_t lim, enum bufferevent_flush_mode state, void *ctx) { int nread, nwrite; int res; @@ -119,9 +119,7 @@ zlib_input_filter(struct evbuffer *src, struct evbuffer *dst, p->avail_out = 4096; /* we need to flush zlib if we got a flush */ - res = inflate(p, state == BEV_FLUSH ? - Z_FINISH : Z_NO_FLUSH); - assert(res == Z_OK || res == Z_STREAM_END); + res = inflate(p, getstate(state)); /* let's figure out how much was compressed */ nread = evbuffer_get_contiguous_space(src) - p->avail_in; @@ -129,16 +127,27 @@ zlib_input_filter(struct evbuffer *src, struct evbuffer *dst, evbuffer_drain(src, nread); evbuffer_commit_space(dst, nwrite); + + if (res==Z_BUF_ERROR) { + /* We're out of space, or out of decodeable input. + Only if nwrite == 0 assume the latter. + */ + if (nwrite == 0) + return BEV_NEED_MORE; + } else { + assert(res == Z_OK || res == Z_STREAM_END); + } + } while (EVBUFFER_LENGTH(src) > 0); - test_ok++; + ++infilter_calls; return (BEV_OK); } static enum bufferevent_filter_result zlib_output_filter(struct evbuffer *src, struct evbuffer *dst, - enum bufferevent_filter_state state, void *ctx) + ssize_t lim, enum bufferevent_flush_mode state, void *ctx) { int nread, nwrite; int res; @@ -153,9 +162,9 @@ zlib_output_filter(struct evbuffer *src, struct evbuffer *dst, p->next_out = evbuffer_reserve_space(dst, 4096); p->avail_out = 4096; + /* we need to flush zlib if we got a flush */ - res = deflate(p, state == BEV_FLUSH ? Z_FINISH : Z_NO_FLUSH); - assert(res == Z_OK || res == Z_STREAM_END); + res = deflate(p, getstate(state)); /* let's figure out how much was compressed */ nread = evbuffer_get_contiguous_space(src) - p->avail_in; @@ -163,9 +172,20 @@ zlib_output_filter(struct evbuffer *src, struct evbuffer *dst, evbuffer_drain(src, nread); evbuffer_commit_space(dst, nwrite); + + if (res==Z_BUF_ERROR) { + /* We're out of space, or out of decodeable input. + Only if nwrite == 0 assume the latter. + */ + if (nwrite == 0) + return BEV_NEED_MORE; + } else { + assert(res == Z_OK || res == Z_STREAM_END); + } + } while (EVBUFFER_LENGTH(src) > 0); - test_ok++; + ++outfilter_calls; return (BEV_OK); } @@ -186,8 +206,9 @@ readcb(struct bufferevent *bev, void *arg) bufferevent_disable(bev, EV_READ); - if (EVBUFFER_LENGTH(evbuf) == 8333) - test_ok++; + if (EVBUFFER_LENGTH(evbuf) == 8333) { + ++readcb_finished; + } evbuffer_free(evbuf); } @@ -196,26 +217,29 @@ readcb(struct bufferevent *bev, void *arg) static void writecb(struct bufferevent *bev, void *arg) { - if (EVBUFFER_LENGTH(bufferevent_get_output(bev)) == 0) - test_ok++; + if (EVBUFFER_LENGTH(bufferevent_get_output(bev)) == 0) { + ++writecb_finished; + } } static void errorcb(struct bufferevent *bev, short what, void *arg) { - test_ok = -2; + errorcb_invoked = 1; } static void test_bufferevent_zlib(void) { - struct bufferevent *bev1, *bev2; - struct bufferevent_filter *finput, *foutput; + struct bufferevent *bev1, *bev2, *bev1_orig, *bev2_orig; char buffer[8333]; z_stream z_input, z_output; - int i, pair[2]; + int i, pair[2], r; + int test_ok; + + infilter_calls = outfilter_calls = readcb_finished = writecb_finished + = errorcb_invoked = 0; - test_ok = 0; fprintf(stdout, "Testing Zlib Filter: "); if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, pair) == -1) { @@ -226,21 +250,27 @@ test_bufferevent_zlib(void) evutil_make_socket_nonblocking(pair[0]); evutil_make_socket_nonblocking(pair[1]); - bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL); - bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL); + bev1_orig = bev1 = bufferevent_socket_new(NULL, pair[0], 0); + bev2_orig = bev2 = bufferevent_socket_new(NULL, pair[1], 0); + + memset(&z_output, 0, sizeof(z_output)); + r = deflateInit(&z_output, Z_DEFAULT_COMPRESSION); + assert(r == Z_OK); + memset(&z_input, 0, sizeof(z_input)); + r = inflateInit(&z_input); /* initialize filters */ - finput = bufferevent_filter_new( - zlib_inflate_init, zlib_inflate_free, - zlib_input_filter, &z_input); - bufferevent_filter_insert(bev2, BEV_INPUT, finput); + bev1 = bufferevent_filter_new(bev1, NULL, zlib_output_filter, 0, + zlib_deflate_free, &z_output); + bev2 = bufferevent_filter_new(bev2, zlib_input_filter, + NULL, 0, zlib_inflate_free, &z_input); + bufferevent_setcb(bev1, readcb, writecb, errorcb, NULL); + bufferevent_setcb(bev2, readcb, writecb, errorcb, NULL); - foutput = bufferevent_filter_new( - zlib_deflate_init, zlib_deflate_free, - zlib_output_filter, &z_output); - bufferevent_filter_insert(bev1, BEV_OUTPUT, foutput); bufferevent_disable(bev1, EV_READ); + bufferevent_enable(bev1, EV_WRITE); + bufferevent_enable(bev2, EV_READ); for (i = 0; i < sizeof(buffer); i++) @@ -251,14 +281,21 @@ test_bufferevent_zlib(void) bufferevent_write(bev1, buffer + 1800, sizeof(buffer) - 1800); /* we are done writing - we need to flush everything */ - bufferevent_trigger_filter(bev1, NULL, BEV_OUTPUT, BEV_FLUSH); + bufferevent_flush(bev1, EV_WRITE, BEV_FINISHED); event_dispatch(); bufferevent_free(bev1); bufferevent_free(bev2); - if (test_ok != 6) { + + test_ok = infilter_calls && + outfilter_calls && + readcb_finished && + writecb_finished && + !errorcb_invoked; + + if (! test_ok) { fprintf(stdout, "FAILED: %d\n", test_ok); exit(1); }