checkpoint work on big bufferevent refactoring

svn:r1095
This commit is contained in:
Nick Mathewson 2009-02-02 19:22:13 +00:00
parent e84c765615
commit ea4b8724c0
18 changed files with 1441 additions and 793 deletions

View File

@ -87,7 +87,8 @@ event-config.h: config.h
-e 's/#ifndef /#ifndef _EVENT_/' < config.h >> $@
echo "#endif" >> $@
CORE_SRC = event.c buffer.c bufferevent.c \
CORE_SRC = event.c buffer.c \
bufferevent.c bufferevent_sock.c bufferevent_filter.c \
evmap.c log.c evutil.c strlcpy.c $(SYS_SRC)
EXTRA_SRC = event_tagging.c http.c evdns.c evrpc.c

View File

@ -33,26 +33,62 @@ extern "C" {
#include "event-config.h"
#include "evutil.h"
struct bufferevent_filter {
/** allows chaining of filters */
TAILQ_ENTRY(bufferevent_filter) (next);
/**
Implementation table for a bufferevent: holds function pointers and other
information to make the various bufferevent types work.
*/
struct bufferevent_ops {
/** The name of the bufferevent's type. */
const char *type;
/** At what offset into the implementation type will we find a
bufferevent structure?
/** used for intermediary state either on the input or output path */
struct evbuffer *buffer;
Example: if the type is implemented as
struct bufferevent_x {
int extra_data;
struct bufferevent bev;
}
then mem_offset should be offsetof(struct bufferevent_x, bev)
*/
off_t mem_offset;
/** initializes the context provided to process */
void (*init_context)(void *);
/** Enables one or more of EV_READ|EV_WRITE on a bufferevent. Does
not need to adjust the 'enabled' field. Returns 0 on success, -1
on failure.
*/
int (*enable)(struct bufferevent *, short);
/** frees any context related to ctx */
void (*free_context)(void *);
/** Disables one or more of EV_READ|EV_WRITE on a bufferevent. Does
not need to adjust the 'enabled' field. Returns 0 on success, -1
on failure.
*/
int (*disable)(struct bufferevent *, short);
enum bufferevent_filter_result (*process)(
struct evbuffer *src, struct evbuffer *dst,
enum bufferevent_filter_state flags, void *ctx);
/** Free any storage and deallocate any extra data or structures used
in this implementation.
*/
void (*destruct)(struct bufferevent *);
void *ctx;
/** Called when the timeouts on the bufferevent have changed.*/
void (*adj_timeouts)(struct bufferevent *);
/** Called to flush data. */
int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode);
};
extern const struct bufferevent_ops be_ops_socket;
extern const struct bufferevent_ops be_ops_filter;
/** Initialize the shared parts of a bufferevent. */
int bufferevent_init_common(struct bufferevent *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options);
/** For internal use: temporarily stop all reads on bufev, because its
* read buffer is too full. */
void bufferevent_wm_suspend_read(struct bufferevent *bufev);
/** For internal use: temporarily stop all reads on bufev, because its
* read buffer is too full. */
void bufferevent_wm_unsuspend_read(struct bufferevent *bufev);
#ifdef __cplusplus
}
#endif

View File

@ -34,7 +34,6 @@
#ifdef _EVENT_HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include <sys/queue.h>
#include <errno.h>
#include <stdio.h>
@ -53,253 +52,74 @@
#include "event2/buffer.h"
#include "event2/buffer_compat.h"
#include "event2/bufferevent_struct.h"
#include "event2/bufferevent_compat.h"
#include "event2/event.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
#include "util-internal.h"
/* prototypes */
static void bufferevent_read_pressure_cb(
struct evbuffer *, size_t, size_t, void *);
static int bufferevent_process_filters(
struct bufferevent_filter *, struct evbuffer *,
enum bufferevent_filter_state);
static int
bufferevent_add(struct event *ev, int timeout)
void
bufferevent_wm_suspend_read(struct bufferevent *bufev)
{
struct timeval tv, *ptv = NULL;
if (timeout) {
evutil_timerclear(&tv);
tv.tv_sec = timeout;
ptv = &tv;
if (!bufev->read_suspended) {
bufev->be_ops->disable(bufev, EV_READ);
bufev->read_suspended = 1;
}
return (event_add(ev, ptv));
}
/*
* This callback is executed when the size of the input buffer changes.
* We use it to apply back pressure on the reading side.
*/
static void
bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
void *arg) {
struct bufferevent *bufev = arg;
/*
* If we are now below the watermark, then we don't need the callback any
* more, and we can read again.
*/
if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
evbuffer_setcb(buf, NULL, NULL);
void
bufferevent_wm_unsuspend_read(struct bufferevent *bufev)
{
if (bufev->read_suspended) {
if (bufev->enabled & EV_READ)
bufferevent_add(&bufev->ev_read, bufev->timeout_read);
bufev->be_ops->enable(bufev, EV_READ);
bufev->read_suspended = 0;
}
}
/* Callback to implement watermarks on the input buffer. Only enabled
* if the watermark is set. */
static void
bufferevent_read_closure(struct bufferevent *bufev, int progress)
{
size_t len;
/* nothing user visible changed? */
if (!progress)
return;
/* See if this callbacks meets the water marks */
len = EVBUFFER_LENGTH(bufev->input);
if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
return;
/* For read pressure, we use the buffer exposed to the users.
* Filters can arbitrarily change the data that users get to see,
* in particular, a user might select a watermark that is smaller
* then what a filter needs to make progress.
*/
if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
event_del(&bufev->ev_read);
/* Now schedule a callback for us when the buffer changes */
evbuffer_setcb(bufev->input,
bufferevent_read_pressure_cb, bufev);
}
/* Invoke the user callback - must always be called last */
if (bufev->readcb != NULL)
(*bufev->readcb)(bufev, bufev->cbarg);
}
static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
bufferevent_inbuf_wm_cb(struct evbuffer *buf, size_t old, size_t now,
void *arg)
{
struct bufferevent *bufev = arg;
struct evbuffer *input;
int res = 0, progress = 1;
short what = EVBUFFER_READ;
int howmuch = -1;
if (event == EV_TIMEOUT) {
what |= EVBUFFER_TIMEOUT;
goto error;
if (now > old) {
/* Data got added. If it put us over the watermark, stop
* reading. */
if (now >= bufev->wm_read.high)
bufferevent_wm_suspend_read(bufev);
} else {
/* Data got removed. If it puts us under the watermark,
stop reading. */
if (now < bufev->wm_read.high)
bufferevent_wm_unsuspend_read(bufev);
}
if (TAILQ_FIRST(&bufev->input_filters) != NULL)
input = TAILQ_FIRST(&bufev->input_filters)->buffer;
else
input = bufev->input;
/*
* If we have a high watermark configured then we don't want to
* read more data than would make us reach the watermark.
*/
if (bufev->wm_read.high != 0) {
howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(input);
/* we might have lowered the watermark, stop reading */
if (howmuch <= 0) {
event_del(&bufev->ev_read);
evbuffer_setcb(input,
bufferevent_read_pressure_cb, bufev);
return;
}
}
res = evbuffer_read(input, fd, howmuch);
if (res == -1) {
int err = evutil_socket_geterror(fd);
if (EVUTIL_ERR_RW_RETRIABLE(err))
goto reschedule;
/* error case */
what |= EVBUFFER_ERROR;
} else if (res == 0) {
/* eof case */
what |= EVBUFFER_EOF;
}
if (TAILQ_FIRST(&bufev->input_filters) != NULL) {
int state = BEV_NORMAL;
if (what & EVBUFFER_EOF)
state = BEV_FLUSH;
/* XXX(niels): what to do about EVBUFFER_ERROR? */
progress = bufferevent_process_filters(
TAILQ_FIRST(&bufev->input_filters),
bufev->input,
state);
/* propagate potential errors to the user */
if (progress == -1) {
res = -1;
what |= EVBUFFER_ERROR;
}
}
if (res <= 0)
goto error;
bufferevent_read_closure(bufev, progress);
return;
reschedule:
return;
error:
event_del(&bufev->ev_read);
(*bufev->errorcb)(bufev, what, bufev->cbarg);
}
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
int
bufferevent_init_common(struct bufferevent *bufev, struct event_base *base,
const struct bufferevent_ops *ops,
enum bufferevent_options options)
{
struct bufferevent *bufev = arg;
int res = 0;
short what = EVBUFFER_WRITE;
if (event == EV_TIMEOUT) {
what |= EVBUFFER_TIMEOUT;
goto error;
}
if (EVBUFFER_LENGTH(bufev->output)) {
res = evbuffer_write(bufev->output, fd);
if (res == -1) {
int err = evutil_socket_geterror(fd);
/* XXXX we used to check for EINPROGRESS here too, but I
don't think write can set that. -nick */
if (EVUTIL_ERR_RW_RETRIABLE(err))
goto reschedule;
what |= EVBUFFER_ERROR;
} else if (res == 0) {
/* eof case */
what |= EVBUFFER_EOF;
}
if (res <= 0)
goto error;
}
if (EVBUFFER_LENGTH(bufev->output) == 0)
event_del(&bufev->ev_write);
/*
* Invoke the user callback if our buffer is drained or below the
* low watermark.
*/
if (bufev->writecb != NULL &&
EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
(*bufev->writecb)(bufev, bufev->cbarg);
return;
reschedule:
if (EVBUFFER_LENGTH(bufev->output) == 0)
event_del(&bufev->ev_write);
return;
error:
event_del(&bufev->ev_write);
(*bufev->errorcb)(bufev, what, bufev->cbarg);
}
/*
* Create a new buffered event object.
*
* The read callback is invoked whenever we read new data.
* The write callback is invoked whenever the output buffer is drained.
* The error callback is invoked on a write/read error or on EOF.
*
* Both read and write callbacks maybe NULL. The error callback is not
* allowed to be NULL and have to be provided always.
*/
struct bufferevent *
bufferevent_new(evutil_socket_t fd, evbuffercb readcb, evbuffercb writecb,
everrorcb errorcb, void *cbarg)
{
struct bufferevent *bufev;
if ((bufev = mm_calloc(1, sizeof(struct bufferevent))) == NULL)
return (NULL);
if ((bufev->input = evbuffer_new()) == NULL) {
mm_free(bufev);
return (NULL);
}
if ((bufev->input = evbuffer_new()) == NULL)
return -1;
if ((bufev->output = evbuffer_new()) == NULL) {
evbuffer_free(bufev->input);
mm_free(bufev);
return (NULL);
return -1;
}
event_set(&bufev->ev_read, fd, EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
event_set(&bufev->ev_write, fd, EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
bufev->ev_base = base;
bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
/* Disable timeouts. */
evutil_timerclear(&bufev->timeout_read);
evutil_timerclear(&bufev->timeout_write);
bufev->be_ops = ops;
/*
* Set to EV_WRITE so that using bufferevent_write is going to
@ -308,10 +128,9 @@ bufferevent_new(evutil_socket_t fd, evbuffercb readcb, evbuffercb writecb,
*/
bufev->enabled = EV_WRITE;
TAILQ_INIT(&bufev->input_filters);
TAILQ_INIT(&bufev->output_filters);
bufev->options = options;
return (bufev);
return 0;
}
void
@ -325,114 +144,16 @@ bufferevent_setcb(struct bufferevent *bufev,
bufev->cbarg = cbarg;
}
void
bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
{
struct bufferevent_filter *filter;
event_del(&bufev->ev_read);
event_del(&bufev->ev_write);
event_assign(&bufev->ev_read, bufev->ev_base, fd, EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd, EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
/* we need to free all filter contexts and then init them again */
TAILQ_FOREACH(filter, &bufev->input_filters, next) {
if (filter->free_context)
filter->free_context(filter->ctx);
if (filter->init_context)
filter->init_context(filter->ctx);
}
TAILQ_FOREACH(filter, &bufev->output_filters, next) {
if (filter->free_context)
filter->free_context(filter->ctx);
if (filter->init_context)
filter->init_context(filter->ctx);
}
/* might have to manually trigger event registration */
}
struct evbuffer *
bufferevent_get_input(struct bufferevent *bufev)
{
return (bufev->input);
return bufev->input;
}
struct evbuffer *
bufferevent_get_output(struct bufferevent *bufev)
{
return TAILQ_FIRST(&bufev->output_filters) != NULL ?
TAILQ_FIRST(&bufev->output_filters)->buffer :
bufev->output;
}
int
bufferevent_priority_set(struct bufferevent *bufev, int priority)
{
if (event_priority_set(&bufev->ev_read, priority) == -1)
return (-1);
if (event_priority_set(&bufev->ev_write, priority) == -1)
return (-1);
return (0);
}
/* Closing the file descriptor is the responsibility of the caller */
void
bufferevent_free(struct bufferevent *bufev)
{
struct bufferevent_filter *filter;
event_del(&bufev->ev_read);
event_del(&bufev->ev_write);
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
/* free input and output filters */
while ((filter = TAILQ_FIRST(&bufev->input_filters)) != NULL) {
bufferevent_filter_remove(bufev, BEV_INPUT, filter);
bufferevent_filter_free(filter);
}
while ((filter = TAILQ_FIRST(&bufev->output_filters)) != NULL) {
bufferevent_filter_remove(bufev, BEV_OUTPUT, filter);
bufferevent_filter_free(filter);
}
mm_free(bufev);
}
/*
* Executes filters on the written data and schedules a network write if
* necessary.
*/
static inline int
bufferevent_write_closure(struct bufferevent *bufev, int progress)
{
/* if no data was written, we do not need to do anything */
if (!progress)
return (0);
if (TAILQ_FIRST(&bufev->output_filters) != NULL) {
progress = bufferevent_process_filters(
TAILQ_FIRST(&bufev->output_filters),
bufev->output, BEV_NORMAL);
if (progress == -1) {
(*bufev->errorcb)(bufev, EVBUFFER_ERROR, bufev->cbarg);
return (-1);
}
}
/* If everything is okay, we need to schedule a write */
if (bufev->enabled & EV_WRITE)
bufferevent_add(&bufev->ev_write, bufev->timeout_write);
return (0);
return bufev->output;
}
/*
@ -443,34 +164,19 @@ bufferevent_write_closure(struct bufferevent *bufev, int progress)
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
struct evbuffer *output;
if (TAILQ_FIRST(&bufev->output_filters) != NULL)
output = TAILQ_FIRST(&bufev->output_filters)->buffer;
else
output = bufev->output;
if (evbuffer_add(output, data, size) == -1)
if (evbuffer_add(bufev->output, data, size) == -1)
return (-1);
return (bufferevent_write_closure(bufev, size > 0));
return 0;
}
int
bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
{
int len = EVBUFFER_LENGTH(buf);
struct evbuffer *output;
if (TAILQ_FIRST(&bufev->output_filters) != NULL)
output = TAILQ_FIRST(&bufev->output_filters)->buffer;
else
output = bufev->output;
if (evbuffer_add_buffer(output, buf) == -1)
if (evbuffer_add_buffer(bufev->output, buf) == -1)
return (-1);
return (bufferevent_write_closure(bufev, len > 0));
return 0;
}
size_t
@ -488,51 +194,74 @@ bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf)
int
bufferevent_enable(struct bufferevent *bufev, short event)
{
if (event & EV_READ) {
if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
return (-1);
}
if (event & EV_WRITE) {
if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
return (-1);
}
short impl_events = event;
if (bufev->read_suspended)
impl_events &= ~EV_READ;
bufev->enabled |= event;
if (bufev->be_ops->enable(bufev, impl_events) < 0)
return -1;
return (0);
}
void
bufferevent_set_timeouts(struct bufferevent *bufev,
const struct timeval *tv_read,
const struct timeval *tv_write)
{
if (tv_read) {
bufev->timeout_read = *tv_read;
} else {
evutil_timerclear(&bufev->timeout_read);
}
if (tv_write) {
bufev->timeout_write = *tv_write;
} else {
evutil_timerclear(&bufev->timeout_write);
}
if (bufev->be_ops->adj_timeouts)
bufev->be_ops->adj_timeouts(bufev);
}
/* Obsolete; use bufferevent_set_timeouts */
void
bufferevent_settimeout(struct bufferevent *bufev,
int timeout_read, int timeout_write)
{
struct timeval tv_read, tv_write;
struct timeval *ptv_read = NULL, *ptv_write = NULL;
memset(&tv_read, 0, sizeof(tv_read));
memset(&tv_write, 0, sizeof(tv_write));
if (timeout_read) {
tv_read.tv_sec = timeout_read;
ptv_read = &tv_read;
}
if (timeout_write) {
tv_write.tv_sec = timeout_write;
ptv_write = &tv_write;
}
bufferevent_set_timeouts(bufev, ptv_read, ptv_write);
}
int
bufferevent_disable(struct bufferevent *bufev, short event)
{
if (event & EV_READ) {
if (event_del(&bufev->ev_read) == -1)
return (-1);
}
if (event & EV_WRITE) {
if (event_del(&bufev->ev_write) == -1)
return (-1);
}
bufev->enabled &= ~event;
if (bufev->be_ops->disable(bufev, event) < 0)
return (-1);
return (0);
}
/*
* Sets the read and write timeout for a buffered event.
*/
void
bufferevent_settimeout(struct bufferevent *bufev,
int timeout_read, int timeout_write) {
bufev->timeout_read = timeout_read;
bufev->timeout_write = timeout_write;
if (event_pending(&bufev->ev_read, EV_READ, NULL))
bufferevent_add(&bufev->ev_read, timeout_read);
if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
bufferevent_add(&bufev->ev_write, timeout_write);
}
/*
* Sets the water marks
*/
@ -541,185 +270,69 @@ void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
size_t lowmark, size_t highmark)
{
if (events & EV_READ) {
bufev->wm_read.low = lowmark;
bufev->wm_read.high = highmark;
}
if (events & EV_WRITE) {
bufev->wm_write.low = lowmark;
bufev->wm_write.high = highmark;
}
/* If the watermarks changed then see if we should call read again */
bufferevent_read_pressure_cb(bufev->input,
0, EVBUFFER_LENGTH(bufev->input), bufev);
if (events & EV_READ) {
bufev->wm_read.low = lowmark;
bufev->wm_read.high = highmark;
if (highmark) {
/* There is now a new high-water mark for read.
enable the callback if needed, and see if we should
suspend/bufferevent_wm_unsuspend. */
if (bufev->read_watermarks_cb == NULL) {
bufev->read_watermarks_cb =
evbuffer_add_cb(bufev->input,
bufferevent_inbuf_wm_cb,
bufev);
}
evbuffer_cb_set_flags(bufev->input,
bufev->read_watermarks_cb,
EVBUFFER_CB_ENABLED);
if (EVBUFFER_LENGTH(bufev->input) > highmark)
bufferevent_wm_suspend_read(bufev);
else if (EVBUFFER_LENGTH(bufev->input) < highmark)
bufferevent_wm_unsuspend_read(bufev);
} else {
/* There is now no high-water mark for read. */
if (bufev->read_watermarks_cb)
evbuffer_cb_set_flags(bufev->input,
bufev->read_watermarks_cb,
EVBUFFER_CB_DISABLED);
bufferevent_wm_unsuspend_read(bufev);
}
}
}
int
bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
bufferevent_flush(struct bufferevent *bufev,
short iotype,
enum bufferevent_flush_mode mode)
{
int res;
bufev->ev_base = base;
res = event_base_set(base, &bufev->ev_read);
if (res == -1)
return (res);
res = event_base_set(base, &bufev->ev_write);
return (res);
}
/*
* Filtering stuff
*/
struct bufferevent_filter *
bufferevent_filter_new(
void (*init_context)(void *ctx),
void (*free_context)(void *ctx),
enum bufferevent_filter_result (*process)(
struct evbuffer *src, struct evbuffer *dst,
enum bufferevent_filter_state flags, void *ctx), void *ctx)
{
struct bufferevent_filter *filter;
if ((filter = mm_malloc(sizeof(struct bufferevent_filter))) == NULL)
return (NULL);
if ((filter->buffer = evbuffer_new()) == NULL) {
mm_free(filter);
return (NULL);
}
filter->init_context = init_context;
filter->free_context = free_context;
filter->process = process;
filter->ctx = ctx;
return (filter);
if (bufev->be_ops->flush)
return bufev->be_ops->flush(bufev, iotype, mode);
else
return -1;
}
void
bufferevent_filter_free(struct bufferevent_filter *filter)
bufferevent_free(struct bufferevent *bufev)
{
evbuffer_free(filter->buffer);
mm_free(filter);
/* Clean up the shared info */
if (bufev->be_ops->destruct)
bufev->be_ops->destruct(bufev);
/* evbuffer will free the callbacks */
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
/* Free the actual allocated memory. */
mm_free(bufev - bufev->be_ops->mem_offset);
}
void
bufferevent_filter_insert(struct bufferevent *bufev,
enum bufferevent_filter_type filter_type,
struct bufferevent_filter *filter)
{
switch (filter_type) {
case BEV_INPUT:
TAILQ_INSERT_TAIL(&bufev->input_filters, filter, next);
break;
case BEV_OUTPUT:
TAILQ_INSERT_HEAD(&bufev->output_filters, filter, next);
break;
default:
event_errx(1, "illegal filter type %d", filter_type);
}
if (filter->init_context)
filter->init_context(filter->ctx);
}
void
bufferevent_filter_remove(struct bufferevent *bufev,
enum bufferevent_filter_type filter_type,
struct bufferevent_filter *filter)
{
switch (filter_type) {
case BEV_INPUT:
TAILQ_REMOVE(&bufev->input_filters, filter, next);
break;
case BEV_OUTPUT:
TAILQ_REMOVE(&bufev->output_filters, filter, next);
break;
default:
event_errx(1, "illegal filter type %d", filter_type);
}
evbuffer_drain(filter->buffer, -1);
if (filter->free_context)
filter->free_context(filter->ctx);
}
static int
bufferevent_process_filters(
struct bufferevent_filter *filter, struct evbuffer *final,
enum bufferevent_filter_state state)
{
struct evbuffer *src, *dst;
struct bufferevent_filter *next;
int len = EVBUFFER_LENGTH(final);
for (; filter != NULL; filter = next) {
int res;
next = TAILQ_NEXT(filter, next);
src = filter->buffer;
dst = next != NULL ? next->buffer : final;
res = (*filter->process)(src, dst, state, filter->ctx);
/* an error causes complete termination of the bufferevent */
if (res == BEV_ERROR)
return (-1);
/* a read filter indicated that it cannot produce
* further data, we do not need to invoke any
* subsequent filters. Unless, a flush or something
* similar was specifically requested.
*/
if (res == BEV_NEED_MORE && state == BEV_NORMAL)
return (0);
}
/* we made user visible progress if the buffer size changed */
return (EVBUFFER_LENGTH(final) != len);
}
int
bufferevent_trigger_filter(struct bufferevent *bufev,
struct bufferevent_filter *filter, int iotype,
enum bufferevent_filter_state state)
{
struct evbuffer *dst = iotype == BEV_INPUT ?
bufev->input : bufev->output;
int progress;
/* trigger all filters if filter is not specified */
if (filter == NULL) {
struct bufferevent_filterq *head = BEV_INPUT ?
&bufev->input_filters : &bufev->output_filters;
filter = TAILQ_FIRST(head);
}
progress = bufferevent_process_filters(filter, dst, state);
if (progress == -1) {
(*bufev->errorcb)(bufev, EVBUFFER_ERROR, bufev->cbarg);
return (-1);
}
switch (iotype) {
case BEV_INPUT:
bufferevent_read_closure(bufev, progress);
break;
case BEV_OUTPUT:
if (progress && (bufev->enabled & EV_WRITE))
bufferevent_add(
&bufev->ev_write, bufev->timeout_write);
break;
default:
event_errx(1, "Illegal bufferevent iotype: %d", iotype);
}
return (0);
}

435
bufferevent_filter.c Normal file
View File

@ -0,0 +1,435 @@
/*
* Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <sys/types.h>
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#ifdef HAVE_STDARG_H
#include <stdarg.h>
#endif
#ifdef WIN32
#include <winsock2.h>
#endif
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/buffer_compat.h"
#include "event2/bufferevent_struct.h"
#include "event2/bufferevent_compat.h"
#include "event2/event.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
#include "util-internal.h"
/* prototypes */
static int be_filter_enable(struct bufferevent *, short);
static int be_filter_disable(struct bufferevent *, short);
static void be_filter_destruct(struct bufferevent *);
static void be_filter_adj_timeouts(struct bufferevent *);
static void be_filter_readcb(struct bufferevent *, void *);
static void be_filter_writecb(struct bufferevent *, void *);
static void be_filter_errorcb(struct bufferevent *, short, void *);
static int be_filter_flush(struct bufferevent *bufev,
short iotype, enum bufferevent_flush_mode mode);
static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
size_t old, size_t now, void *arg);
struct bufferevent_filtered {
struct bufferevent bev;
/** The bufferevent that we read/write filterd data from/to. */
struct bufferevent *underlying;
/** A callback on our outbuf to notice when somebody adds data */
struct evbuffer_cb_entry *outbuf_cb;
/** True iff we have received an EOF callback from the underlying
* bufferevent. */
unsigned got_eof;
/** Function to free context when we're done. */
void (*free_context)(void *);
/** Input filter */
bufferevent_filter_cb process_in;
/** Output filter */
bufferevent_filter_cb process_out;
/** User-supplied argument to the filters. */
void *context;
};
struct bufferevent_ops bufferevent_ops_filter = {
"filter",
evutil_offsetof(struct bufferevent_filtered, bev),
be_filter_enable,
be_filter_disable,
be_filter_destruct,
be_filter_adj_timeouts,
be_filter_flush,
};
/* Given a bufferevent that's really the bev filter of a bufferevent_filtered,
* return that bufferevent_filtered. Returns NULL otherwise.*/
static inline struct bufferevent_filtered *
upcast(struct bufferevent *bev)
{
struct bufferevent_filtered *bev_f;
if (bev->be_ops != &bufferevent_ops_filter)
return NULL;
bev_f = (void*)( ((char*)bev) -
evutil_offsetof(struct bufferevent_filtered, bev) );
assert(bev_f->bev.be_ops == &bufferevent_ops_filter);
return bev_f;
}
#define downcast(bev_f) (&(bev_f)->bev)
/** Return 1 iff bevf's underlying bufferevent's output buffer is at or
* over its high watermark such that we should not write to it in a given
* flush mode. */
static int
be_underlying_writebuf_full(struct bufferevent_filtered *bevf,
enum bufferevent_flush_mode state)
{
struct bufferevent *u = bevf->underlying;
return state == BEV_NORMAL &&
u->wm_write.high &&
EVBUFFER_LENGTH(u->output) >= u->wm_write.high;
}
/** Return 1 if our input buffer is at or over its high watermark such that we
* should not write to it in a given flush mode. */
static int
be_readbuf_full(struct bufferevent_filtered *bevf,
enum bufferevent_flush_mode state)
{
struct bufferevent *bufev = downcast(bevf);
return state == BEV_NORMAL &&
bufev->wm_read.high &&
EVBUFFER_LENGTH(bufev->input) >= bufev->wm_read.high;
}
/* Filter to use when we're created with a NULL filter. */
static enum bufferevent_filter_result
be_null_filter(struct evbuffer *src, struct evbuffer *dst, ssize_t lim,
enum bufferevent_flush_mode state, void *ctx)
{
/* XXX respect lim. */
(void)state;
if (evbuffer_add_buffer(dst, src) == 0)
return BEV_OK;
else
return BEV_ERROR;
}
struct bufferevent *
bufferevent_filter_new(struct bufferevent *underlying,
bufferevent_filter_cb input_filter,
bufferevent_filter_cb output_filter,
enum bufferevent_options options,
void (*free_context)(void *),
void *ctx)
{
struct bufferevent_filtered *bufev_f;
if (!input_filter)
input_filter = be_null_filter;
if (!output_filter)
output_filter = be_null_filter;
bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered));
if (!bufev_f)
return NULL;
if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base,
&bufferevent_ops_filter, options) < 0) {
mm_free(bufev_f);
return NULL;
}
bufev_f->underlying = underlying;
bufev_f->process_in = input_filter;
bufev_f->process_out = output_filter;
bufev_f->free_context = free_context;
bufev_f->context = ctx;
bufferevent_setcb(bufev_f->underlying,
be_filter_readcb, be_filter_writecb, be_filter_errorcb, bufev_f);
bufev_f->outbuf_cb = evbuffer_add_cb(bufev_f->bev.output,
bufferevent_filtered_outbuf_cb, bufev_f);
return &bufev_f->bev;
}
static void
be_filter_destruct(struct bufferevent *bev)
{
struct bufferevent_filtered *bevf = upcast(bev);
assert(bevf);
if (bevf->free_context)
bevf->free_context(bevf->context);
if (bev->options & BEV_OPT_CLOSE_ON_FREE)
bufferevent_free(bevf->underlying);
}
static int
be_filter_enable(struct bufferevent *bev, short event)
{
struct bufferevent_filtered *bevf = upcast(bev);
return bufferevent_enable(bevf->underlying, event);
}
static int
be_filter_disable(struct bufferevent *bev, short event)
{
struct bufferevent_filtered *bevf = upcast(bev);
return bufferevent_disable(bevf->underlying, event);
}
static void
be_filter_adj_timeouts(struct bufferevent *bev)
{
struct bufferevent_filtered *bevf = upcast(bev);
struct timeval *r = NULL, *w = NULL;
if (bev->timeout_read.tv_sec >= 0)
r = &bev->timeout_read;
if (bev->timeout_write.tv_sec >= 0)
w = &bev->timeout_write;
bufferevent_set_timeouts(bevf->underlying, r, w);
}
static enum bufferevent_filter_result
be_filter_process_input(struct bufferevent_filtered *bevf,
enum bufferevent_flush_mode state,
int *processed_out)
{
enum bufferevent_filter_result res;
if (state == BEV_NORMAL) {
/* If we're in 'normal' mode, don't urge data on the filter
* unless we're reading data and under our high-water mark.*/
if (!(bevf->bev.enabled & EV_READ) ||
be_readbuf_full(bevf, state))
return BEV_OK;
}
do {
ssize_t limit = -1;
if (state == BEV_NORMAL && bevf->bev.wm_read.high)
limit = bevf->bev.wm_read.high -
EVBUFFER_LENGTH(bevf->bev.input);
res = bevf->process_in(bevf->underlying->input,
bevf->bev.input, limit, state, bevf->context);
if (res == BEV_OK)
*processed_out = 1;
} while (res == BEV_OK &&
(bevf->bev.enabled & EV_READ) &&
EVBUFFER_LENGTH(bevf->underlying->input) &&
!be_readbuf_full(bevf, state));
return res;
}
static enum bufferevent_filter_result
be_filter_process_output(struct bufferevent_filtered *bevf,
enum bufferevent_flush_mode state,
int *processed_out)
{
enum bufferevent_filter_result res = BEV_OK;
struct bufferevent *bufev = downcast(bevf);
int again = 0;
if (state == BEV_NORMAL) {
/* If we're in 'normal' mode, don't urge data on the
* filter unless we're writing data, and the underlying
* bufferevent is accepting data, and we have data to
* give the filter. If we're in 'flush' or 'finish',
* call the filter no matter what. */
if (!(bufev->enabled & EV_WRITE) ||
be_underlying_writebuf_full(bevf, state) ||
!EVBUFFER_LENGTH(bufev->output))
return BEV_OK;
}
/* disable the callback that calls this function
when the user adds to the output buffer. */
evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0);
do {
int processed = 0;
again = 0;
do {
ssize_t limit = -1;
if (state == BEV_NORMAL &&
bevf->underlying->wm_write.high)
limit = bevf->underlying->wm_write.high -
EVBUFFER_LENGTH(bevf->underlying->output);
res = bevf->process_out(bevf->bev.output,
bevf->underlying->output,
limit,
state,
bevf->context);
if (res == BEV_OK)
processed = *processed_out = 1;
} while (/* Stop if the filter wasn't successful...*/
res == BEV_OK &&
/* Or if we aren't writing any more. */
(bufev->enabled & EV_WRITE) &&
/* Of if we have nothing more to write and we are
* not flushing. */
EVBUFFER_LENGTH(bufev->output) &&
/* Or if we have filled the underlying output buffer. */
!be_underlying_writebuf_full(bevf,state));
if (processed && bufev->writecb &&
EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) {
/* call the write callback.*/
(*bufev->writecb)(bufev, bufev->cbarg);
if (res == BEV_OK &&
(bufev->enabled & EV_WRITE) &&
EVBUFFER_LENGTH(bufev->output) &&
!be_underlying_writebuf_full(bevf, state)) {
again = 1;
}
}
} while (again);
/* reenable the outbuf_cb */
evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
EVBUFFER_CB_ENABLED);
return res;
}
/* Called when the size of our outbuf changes. */
static void
bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
size_t old, size_t now, void *arg)
{
struct bufferevent_filtered *bevf = arg;
if (now > old) {
int processed_any = 0;
/* Somebody added more data to the output buffer. Try to
* process it, if we should. */
be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
}
}
/* Called when the underlying socket has read. */
static void
be_filter_readcb(struct bufferevent *underlying, void *_me)
{
struct bufferevent_filtered *bevf = _me;
enum bufferevent_filter_result res;
enum bufferevent_flush_mode state;
struct bufferevent *bufev = downcast(bevf);
int processed_any = 0;
if (bevf->got_eof)
state = BEV_FINISHED;
else
state = BEV_NORMAL;
res = be_filter_process_input(bevf, state, &processed_any);
if (processed_any &&
EVBUFFER_LENGTH(bufev->input) >= bufev->wm_read.low &&
bufev->readcb != NULL)
(*bufev->readcb)(bufev, bufev->cbarg);
}
/* Called when the underlying socket has drained enough that we can write to
it. */
static void
be_filter_writecb(struct bufferevent *underlying, void *_me)
{
struct bufferevent_filtered *bevf = _me;
int processed_any = 0;
be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
}
/* Called when the underlying socket has given us an error */
static void
be_filter_errorcb(struct bufferevent *underlying, short what, void *_me)
{
struct bufferevent_filtered *bevf = _me;
/* All we can really to is tell our own errorcb. */
if (bevf->bev.errorcb)
bevf->bev.errorcb(&bevf->bev, what, bevf->bev.cbarg);
}
static int
be_filter_flush(struct bufferevent *bufev,
short iotype, enum bufferevent_flush_mode mode)
{
struct bufferevent_filtered *bevf = upcast(bufev);
int processed_any = 0;
assert(bevf);
if (iotype & EV_READ) {
be_filter_process_input(bevf, mode, &processed_any);
}
if (iotype & EV_WRITE) {
be_filter_process_output(bevf, mode, &processed_any);
}
/* XXX check the return value? */
/* XXX does this want to recursively call lower-level flushes? */
bufferevent_flush(bevf->underlying, iotype, mode);
return processed_any;
}

375
bufferevent_sock.c Normal file
View File

@ -0,0 +1,375 @@
/*
* Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <sys/types.h>
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#ifdef HAVE_STDARG_H
#include <stdarg.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef WIN32
#include <winsock2.h>
#endif
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/buffer_compat.h"
#include "event2/bufferevent_struct.h"
#include "event2/bufferevent_compat.h"
#include "event2/event.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
#include "util-internal.h"
/* prototypes */
static int be_socket_enable(struct bufferevent *, short);
static int be_socket_disable(struct bufferevent *, short);
static void be_socket_destruct(struct bufferevent *);
static void be_socket_adj_timeouts(struct bufferevent *);
static int be_socket_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
struct bufferevent_ops bufferevent_ops_socket = {
"socket",
0,
be_socket_enable,
be_socket_disable,
be_socket_destruct,
be_socket_adj_timeouts,
be_socket_flush,
};
static int
be_socket_add(struct event *ev, const struct timeval *tv)
{
if (tv->tv_sec == 0 && tv->tv_usec == 0)
return event_add(ev, NULL);
else
return event_add(ev, tv);
}
static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
size_t old, size_t now, void *arg)
{
struct bufferevent *bufev = arg;
if (now > old &&
(bufev->enabled & EV_WRITE) &&
!event_pending(&bufev->ev_write, EV_WRITE, NULL)) {
/* Somebody added data to the buffer, and we would like to
* write, and we were not writing. So, start writing. */
be_socket_add(&bufev->ev_write, &bufev->timeout_write);
}
}
static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
struct evbuffer *input;
int res = 0;
short what = EVBUFFER_READ;
int howmuch = -1;
if (event == EV_TIMEOUT) {
what |= EVBUFFER_TIMEOUT;
goto error;
}
input = bufev->input;
/*
* If we have a high watermark configured then we don't want to
* read more data than would make us reach the watermark.
*/
if (bufev->wm_read.high != 0) {
howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(input);
/* we somehow lowered the watermark, stop reading */
if (howmuch <= 0) {
bufferevent_wm_suspend_read(bufev);
return;
}
}
res = evbuffer_read(input, fd, howmuch);
if (res == -1) {
int err = evutil_socket_geterror(fd);
if (EVUTIL_ERR_RW_RETRIABLE(err))
goto reschedule;
/* error case */
what |= EVBUFFER_ERROR;
} else if (res == 0) {
/* eof case */
what |= EVBUFFER_EOF;
}
if (res <= 0)
goto error;
/* Invoke the user callback - must always be called last */
if (EVBUFFER_LENGTH(input) >= bufev->wm_read.low &&
bufev->readcb != NULL)
(*bufev->readcb)(bufev, bufev->cbarg);
return;
reschedule:
return;
error:
event_del(&bufev->ev_read);
(*bufev->errorcb)(bufev, what, bufev->cbarg);
}
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
int res = 0;
short what = EVBUFFER_WRITE;
if (event == EV_TIMEOUT) {
what |= EVBUFFER_TIMEOUT;
goto error;
}
if (EVBUFFER_LENGTH(bufev->output)) {
res = evbuffer_write(bufev->output, fd);
if (res == -1) {
int err = evutil_socket_geterror(fd);
if (EVUTIL_ERR_RW_RETRIABLE(err))
goto reschedule;
what |= EVBUFFER_ERROR;
} else if (res == 0) {
/* eof case */
what |= EVBUFFER_EOF;
}
if (res <= 0)
goto error;
}
if (EVBUFFER_LENGTH(bufev->output) == 0)
event_del(&bufev->ev_write);
/*
* Invoke the user callback if our buffer is drained or below the
* low watermark.
*/
if (bufev->writecb != NULL &&
EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
(*bufev->writecb)(bufev, bufev->cbarg);
return;
reschedule:
if (EVBUFFER_LENGTH(bufev->output) == 0)
event_del(&bufev->ev_write);
return;
error:
event_del(&bufev->ev_write);
(*bufev->errorcb)(bufev, what, bufev->cbarg);
}
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
enum bufferevent_options options)
{
struct bufferevent *bufev;
if ((bufev = mm_calloc(1, sizeof(struct bufferevent))) == NULL)
return NULL;
if (bufferevent_init_common(bufev, base, &bufferevent_ops_socket,
options) < 0) {
mm_free(bufev);
return NULL;
}
event_assign(&bufev->ev_read, bufev->ev_base, fd,
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
return bufev;
}
/*
* Create a new buffered event object.
*
* The read callback is invoked whenever we read new data.
* The write callback is invoked whenever the output buffer is drained.
* The error callback is invoked on a write/read error or on EOF.
*
* Both read and write callbacks maybe NULL. The error callback is not
* allowed to be NULL and have to be provided always.
*/
struct bufferevent *
bufferevent_new(evutil_socket_t fd, evbuffercb readcb, evbuffercb writecb,
everrorcb errorcb, void *cbarg)
{
struct bufferevent *bufev;
if (!(bufev = bufferevent_socket_new(NULL, fd, 0)))
return NULL;
bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
return bufev;
}
static int
be_socket_enable(struct bufferevent *bufev, short event)
{
if (event & EV_READ) {
if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1)
return -1;
}
if (event & EV_WRITE) {
if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1)
return -1;
}
return 0;
}
static int
be_socket_disable(struct bufferevent *bufev, short event)
{
if (event & EV_READ) {
if (event_del(&bufev->ev_read) == -1)
return -1;
}
if (event & EV_WRITE) {
if (event_del(&bufev->ev_write) == -1)
return -1;
}
return 0;
}
static void
be_socket_destruct(struct bufferevent *bufev)
{
evutil_socket_t fd;
assert(bufev->be_ops == &bufferevent_ops_socket);
fd = event_get_fd(&bufev->ev_read);
event_del(&bufev->ev_read);
event_del(&bufev->ev_write);
if (bufev->options & BEV_OPT_CLOSE_ON_FREE)
EVUTIL_CLOSESOCKET(fd);
}
static void
be_socket_adj_timeouts(struct bufferevent *bufev)
{
if (event_pending(&bufev->ev_read, EV_READ, NULL))
be_socket_add(&bufev->ev_read, &bufev->timeout_read);
if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
be_socket_add(&bufev->ev_write, &bufev->timeout_write);
}
static int
be_socket_flush(struct bufferevent *bev, short iotype,
enum bufferevent_flush_mode mode)
{
return 0;
}
void
bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
{
assert(bufev->be_ops == &bufferevent_ops_socket);
event_del(&bufev->ev_read);
event_del(&bufev->ev_write);
event_assign(&bufev->ev_read, bufev->ev_base, fd,
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
}
/* XXXX Should non-socket buffferevents support this? */
int
bufferevent_priority_set(struct bufferevent *bufev, int priority)
{
if (bufev->be_ops != &bufferevent_ops_socket)
return -1;
if (event_priority_set(&bufev->ev_read, priority) == -1)
return (-1);
if (event_priority_set(&bufev->ev_write, priority) == -1)
return (-1);
return (0);
}
/* XXXX Should non-socket buffferevents support this? */
int
bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
{
int res;
if (bufev->be_ops != &bufferevent_ops_socket)
return -1;
bufev->ev_base = base;
res = event_base_set(base, &bufev->ev_read);
if (res == -1)
return (res);
res = event_base_set(base, &bufev->ev_write);
return (res);
}

View File

@ -192,6 +192,7 @@ typedef unsigned short u_short;
#include <event2/buffer_compat.h>
#include <event2/bufferevent.h>
#include <event2/bufferevent_struct.h>
#include <event2/bufferevent_compat.h>
#include <event2/tag.h>
#ifdef __cplusplus

1
http.c
View File

@ -88,6 +88,7 @@
#include "event2/event.h"
#include "event2/buffer.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_compat.h"
#include "event2/http_struct.h"
#include "event2/http_compat.h"
#include "event2/util.h"

View File

@ -1,7 +1,9 @@
AUTOMAKE_OPTIONS = foreign
# XXXX Don't do all this twice.
EXTRA_SRC = event2/buffer.h event2/buffer_compat.h \
event2/thread.h event2/bufferevent.h \
event2/thread.h event2/bufferevent.h event2/bufferevent_compat.h \
event2/bufferevent_struct.h event2/event.h event2/event_compat.h \
event2/event_struct.h event2/tag.h event2/util.h \
event2/http.h event2/http_struct.h event2/http_compat.h
@ -9,6 +11,7 @@ EXTRA_SRC = event2/buffer.h event2/buffer_compat.h \
nobase_include_HEADERS = \
event2/buffer.h event2/buffer_compat.h \
event2/thread.h event2/bufferevent.h \
event2/bufferevent_compat.h \
event2/bufferevent_struct.h event2/event.h event2/event_compat.h \
event2/event_struct.h event2/tag.h event2/util.h \
event2/http.h event2/http_struct.h event2/http_compat.h \

View File

@ -413,14 +413,15 @@ int evbuffer_remove_cb_entry(struct evbuffer *buffer,
*/
int evbuffer_remove_cb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg);
#define EVBUFFER_CB_DISABLED 0
#define EVBUFFER_CB_ENABLED 1
/** Change whether a given callback is enabled on a buffer or not. A
disabled callback is not invoked even when the buffer size changes.
@param buffer the evbuffer that the callback is watching.
@param cb the callback whose status we want to change.
@param flags EVBUFFER_CB_ENABLED to enable the callback, or 0 to
disable it.
@param flags EVBUFFER_CB_ENABLED to enable the callback, or
EVBUFFER_CB_DISABLEDD to disable it.
@return 0 on success, -1 on failure.
*/
int evbuffer_cb_set_flags(struct evbuffer *buffer,

View File

@ -34,6 +34,20 @@
and one for writing, and callbacks that are invoked under certain
circumstances.
libevent provides an abstraction on top of the regular event callbacks.
This abstraction is called a buffered event. A buffered event provides
input and output buffers that get filled and drained automatically. The
user of a buffered event no longer deals directly with the I/O, but
instead is reading from input and writing to output buffers.
Once initialized, the bufferevent structure can be used repeatedly with
bufferevent_enable() and bufferevent_disable().
When read enabled the bufferevent will try to read from the file descriptor
and call the read callback. The write callback is executed whenever the
output buffer is drained below the write low watermark, which is 0 by
default.
*/
#ifdef __cplusplus
@ -102,43 +116,21 @@ typedef void (*evbuffercb)(struct bufferevent *bev, void *ctx);
typedef void (*everrorcb)(struct bufferevent *bev, short what, void *ctx);
enum bufferevent_options {
BEV_OPT_CLOSE_ON_FREE = (1<<0),
};
/**
Create a new bufferevent.
libevent provides an abstraction on top of the regular event callbacks.
This abstraction is called a buffered event. A buffered event provides
input and output buffers that get filled and drained automatically. The
user of a buffered event no longer deals directly with the I/O, but
instead is reading from input and writing to output buffers.
Once initialized, the bufferevent structure can be used repeatedly with
bufferevent_enable() and bufferevent_disable().
When read enabled the bufferevent will try to read from the file descriptor
and call the read callback. The write callback is executed whenever the
output buffer is drained below the write low watermark, which is 0 by
default.
If multiple bases are in use, bufferevent_base_set() must be called before
enabling the bufferevent for the first time.
Create a new socket bufferevent over an existing socket.
@param base the event base to associate with the new bufferevent.
@param fd the file descriptor from which data is read and written to.
This file descriptor is not allowed to be a pipe(2).
@param readcb callback to invoke when there is data to be read, or NULL if
no callback is desired
@param writecb callback to invoke when the file descriptor is ready for
writing, or NULL if no callback is desired
@param errorcb callback to invoke when there is an error on the file
descriptor
@param cbarg an argument that will be supplied to each of the callbacks
(readcb, writecb, and errorcb)
@return a pointer to a newly allocated bufferevent struct, or NULL if an
error occurred
@see bufferevent_base_set(), bufferevent_free()
@see bufferevent_free()
*/
struct bufferevent *bufferevent_new(evutil_socket_t fd,
evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg);
struct bufferevent *bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, enum bufferevent_options options);
/**
Assign a bufferevent to a specific event_base.
@ -295,11 +287,11 @@ int bufferevent_disable(struct bufferevent *bufev, short event);
Set the read and write timeout for a buffered event.
@param bufev the bufferevent to be modified
@param timeout_read the read timeout
@param timeout_write the write timeout
@param timeout_read the read timeout, or NULL
@param timeout_write the write timeout, or NULL
*/
void bufferevent_settimeout(struct bufferevent *bufev,
int timeout_read, int timeout_write);
void bufferevent_set_timeouts(struct bufferevent *bufev,
const struct timeval *timeout_read, const struct timeval *timeout_write);
/**
Sets the watermarks for read and write events.
@ -309,7 +301,9 @@ void bufferevent_settimeout(struct bufferevent *bufev,
is beyond the high watermark, the buffevent stops reading from the network.
On output, the user write callback is invoked whenever the buffered data
falls below the low watermark.
falls below the low watermark. Filters that write to this bufev will try
not to write more bytes to this buffer than the high watermark would allow,
except when flushing.
@param bufev the bufferevent to be modified
@param events EV_READ, EV_WRITE or both
@ -325,22 +319,38 @@ void bufferevent_setwatermark(struct bufferevent *bufev, short events,
/** macro for getting access to the output buffer of a bufferevent */
#define EVBUFFER_OUTPUT(x) bufferevent_get_output(x)
/**
Support for filtering input and output of bufferevents.
*/
/**
Flags that can be passed into filters to let them know how to
deal with the incoming data.
*/
enum bufferevent_filter_state {
enum bufferevent_flush_mode {
/** usually set when processing data */
BEV_NORMAL = 0,
/** encountered EOF on read or done sending data */
/** want to checkpoint all data sent. */
BEV_FLUSH = 1,
/** encountered EOF on read or done sending data */
BEV_FINISHED = 2,
};
/**
Triggers the bufferevent to produce more
data if possible.
@param bufev the bufferevent object
@param iotype either EV_READ or EV_WRITE or both.
@param state either BEV_NORMAL or BEV_FLUSH or BEV_FINISHED
@return -1 on failure, 0 if no data was produces, 1 if data was produced
*/
int bufferevent_flush(struct bufferevent *bufev,
short iotype,
enum bufferevent_flush_mode state);
/**
Support for filtering input and output of bufferevents.
*/
/**
Values that filters can return.
*/
@ -356,116 +366,44 @@ enum bufferevent_filter_result {
BEV_ERROR = 2
};
/** A callback function to implement a filter for a bufferevent.
@param src An evbuffer to drain data from.
@param dst An evbuffer to add data to.
@param limit A suggested upper bound of bytes to write to dst.
The filter may ignore this value, but doing so means that
it will overflow the high-water mark associated with dst.
-1 means "no limit".
@param state Whether we should write data as may be convenient
(BEV_NORMAL), or flush as much data as we can (BEV_FLUSH),
or flush as much as we can, possibly including an end-of-stream
marker (BEF_FINISH).
@param ctx A user-supplied pointer.
@return BEV_OK if we wrote some data; BEV_NEED_MORE if we can't
produce any more output until we get some input; and BEV_ERROR
on an error.
*/
typedef enum bufferevent_filter_result (*bufferevent_filter_cb)(
struct evbuffer *src, struct evbuffer *dst, ssize_t dst_limit,
enum bufferevent_flush_mode mode, void *ctx);
struct bufferevent_filter;
/**
Creates a new filtering object for a bufferevent.
Filters can be used to implement compression, authentication, rate limiting,
etc. for bufferevents. Filters can be associated with the input or output
path or both. Filters need to be inserted with bufferevent_filter_insert()
on either the input or output path.
For example, when implementing compression, both an input and an
output filters are required. The output filter compress all output
as it passes along whereas the input filter decompresses all input as
it is being read from the network.
Some filters may require specificaly behavior such as flushing their buffers
on EOF. To allom them to do that, a bufferevent will invoke the filter
with BEV_FLUSH to let it know that EOF has been reached.
When a filter needs more data before it can output any data, it may return
BEV_NEED_MORE in which case the filter chain is being interrupted until
more data arrives. A filter can indicate a fatal error by returning
BEV_ERROR. Otherwise, it should return BEV_OK.
@param init_context an optional function that initializes the ctx parameter.
@param free_context an optional function to free memory associated with the
ctx parameter.
@param process the filtering function that should be invokved either during
input or output depending on where the filter should be attached.
@param ctx additional context that can be passed to the process function
@return a bufferevent_filter object that can subsequently be installed
*/
struct bufferevent_filter *bufferevent_filter_new(
void (*init_context)(void *),
void (*free_context)(void *),
enum bufferevent_filter_result (*process)(
struct evbuffer *src, struct evbuffer *dst,
enum bufferevent_filter_state state, void *ctx), void *ctx);
/**
Frees the filter object.
It must have been removed from the bufferevent before it can be freed.
@param filter the filter to be freed
@see bufferevent_filter_remove()
*/
void bufferevent_filter_free(struct bufferevent_filter *filter);
/** Filter types for inserting or removing filters */
enum bufferevent_filter_type {
/** filter is being used for input */
BEV_INPUT = 0,
/** filter is being used for output */
BEV_OUTPUT = 1
enum bufferevent_filter_options {
BEV_FILT_FREE_UNDERLYING = (1<<0),
};
/**
Inserts a filter into the processing of data for bufferevent.
A filter can be inserted only once. It can not be used again for
another insert unless it have been removed via
bufferevent_filter_remove() first.
Input filters are inserted at the end, output filters at the
beginning of the queue.
@param bufev the bufferevent object into which to install the filter
@param filter_type either BEV_INPUT or BEV_OUTPUT
@param filter the filter object
@see bufferevent_filter_remove()
/** Allocate a new filtering bufferevent on top of an existing bufferevent.
*/
void bufferevent_filter_insert(struct bufferevent *bufev,
enum bufferevent_filter_type filter_type,
struct bufferevent_filter *filter);
/**
Removes a filter from the bufferevent.
A filter should be flushed via buffervent_trigger_filter before removing
it from a bufferevent. Any remaining intermediate buffer data is going
to be lost.
@param bufev the bufferevent object from which to remove the filter
@param filter_type either BEV_INPUT or BEV_OUTPUT
@param filter the filter object or NULL to trigger all filters
@see bufferevent_trigger_filter()
*/
void bufferevent_filter_remove(struct bufferevent *bufev,
enum bufferevent_filter_type filter_type,
struct bufferevent_filter *filter);
/**
Triggers the filter chain the specified filter to produce more
data is possible. This is primarily for time-based filters such
as rate-limiting to produce more data as time passes.
@param bufev the bufferevent object to which the filter belongs
@param filter the bufferevent filter at which to start
@param iotype either BEV_INPUT or BEV_OUTPUT depending on where the filter
was installed
@param state either BEV_NORMAL or BEV_FLUSH
@return -1 on failure, 0 if no data was produces, 1 if data was produced
*/
int
bufferevent_trigger_filter(struct bufferevent *bufev,
struct bufferevent_filter *filter, int iotype,
enum bufferevent_filter_state state);
struct bufferevent *
bufferevent_filter_new(struct bufferevent *underlying,
bufferevent_filter_cb input_filter,
bufferevent_filter_cb output_filter,
enum bufferevent_options options,
void (*free_context)(void *),
void *ctx);
#ifdef __cplusplus
}

View File

@ -0,0 +1,81 @@
/*
* Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _EVENT2_BUFFEREVENT_COMPAT_H_
#define _EVENT2_BUFFEREVENT_COMPAT_H_
/**
Create a new bufferevent for an fd.
This function is deprecated. Use buffevent_socket_new and
bufferevent_set_callbacks instead.
libevent provides an abstraction on top of the regular event callbacks.
This abstraction is called a buffered event. A buffered event provides
input and output buffers that get filled and drained automatically. The
user of a buffered event no longer deals directly with the I/O, but
instead is reading from input and writing to output buffers.
Once initialized, the bufferevent structure can be used repeatedly with
bufferevent_enable() and bufferevent_disable().
When read enabled the bufferevent will try to read from the file descriptor
and call the read callback. The write callback is executed whenever the
output buffer is drained below the write low watermark, which is 0 by
default.
If multiple bases are in use, bufferevent_base_set() must be called before
enabling the bufferevent for the first time.
@param fd the file descriptor from which data is read and written to.
This file descriptor is not allowed to be a pipe(2).
@param readcb callback to invoke when there is data to be read, or NULL if
no callback is desired
@param writecb callback to invoke when the file descriptor is ready for
writing, or NULL if no callback is desired
@param errorcb callback to invoke when there is an error on the file
descriptor
@param cbarg an argument that will be supplied to each of the callbacks
(readcb, writecb, and errorcb)
@return a pointer to a newly allocated bufferevent struct, or NULL if an
error occurred
@see bufferevent_base_set(), bufferevent_free()
*/
struct bufferevent *bufferevent_new(evutil_socket_t fd,
evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg);
/**
Set the read and write timeout for a buffered event.
@param bufev the bufferevent to be modified
@param timeout_read the read timeout
@param timeout_write the write timeout
*/
void bufferevent_settimeout(struct bufferevent *bufev,
int timeout_read, int timeout_write);
#endif

View File

@ -62,6 +62,7 @@ struct event_watermark {
size_t high;
};
#if 0
struct bufferevent_filter;
/* Fix so that ppl dont have to run with <sys/queue.h> */
@ -74,20 +75,42 @@ struct name { \
}
#endif /* !TAILQ_HEAD */
TAILQ_HEAD(bufferevent_filterq, bufferevent_filter);
#ifdef _EVENT_DEFINED_TQHEAD
#undef TAILQ_HEAD
#undef _EVENT_DEFINED_TQHEAD
#endif /* _EVENT_DEFINED_TQHEAD */
#endif
/**
Shared implementation of a bufferevent.
This type is exposed only because it was exposed in previous versions,
and some people's code may rely on manipulating it. Otherwise, you
should really not rely on the layout, size, or contents of this structure:
it is fairly volatile, and WILL change in future versions of the code.
**/
struct bufferevent {
/** Event base for which this bufferevent was created. */
struct event_base *ev_base;
/** Pointer to a table of function pointers to set up how this
bufferevent behaves. */
const struct bufferevent_ops *be_ops;
/** A read event that triggers when a timeout has happened or a socket
is ready to read data. Only used by some subtypes of
bufferevent. */
struct event ev_read;
/** A write event that triggers when a timeout has happened or a socket
is ready to write data. Only used by some subtypes of
bufferevent. */
struct event ev_write;
/** An input buffer. Only the bufferevent is allowed to add data to
this buffer, though the user is allowed to drain it. */
struct evbuffer *input;
/** An input buffer. Only the bufferevent is allowed to drain data
from this buffer, though the user is allowed to add it. */
struct evbuffer *output;
struct event_watermark wm_read;
@ -98,14 +121,19 @@ struct bufferevent {
everrorcb errorcb;
void *cbarg;
int timeout_read; /* in seconds */
int timeout_write; /* in seconds */
struct timeval timeout_read;
struct timeval timeout_write;
short enabled; /* events that are currently enabled */
/** Evbuffer callback to enforce watermarks on input. */
struct evbuffer_cb_entry *read_watermarks_cb;
/** the list of input and output filters */
struct bufferevent_filterq input_filters;
struct bufferevent_filterq output_filters;
/** Events that are currently enabled: currently EV_READ and EV_WRITE
are supported. */
short enabled;
/** If set, read is suspended until evbuffer some. */
unsigned read_suspended : 1; /* */
enum bufferevent_options options;
};
#ifdef __cplusplus

View File

@ -20,6 +20,7 @@ test_time_LDADD = ../libevent_core.la
regress_SOURCES = regress.c regress_buffer.c regress_http.c regress_dns.c \
regress_rpc.c regress.gen.c regress.gen.h regress_et.c \
regress_bufferevent.c \
regress_util.c tinytest.c regress_main.c \
$(regress_pthread_SOURCES) $(regress_zlib_SOURCES)
if PTHREADS

View File

@ -61,6 +61,7 @@
#include "event2/tag.h"
#include "event2/buffer.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_compat.h"
#include "event2/bufferevent_struct.h"
#include "event2/util.h"
#include "event-internal.h"
@ -150,7 +151,7 @@ multiple_write_cb(int fd, short event, void *arg)
}
woff += len;
if (woff >= sizeof(wbuf)) {
shutdown(fd, SHUT_WR);
if (usepersist)
@ -1098,8 +1099,9 @@ readcb(struct bufferevent *bev, void *arg)
bufferevent_disable(bev, EV_READ);
if (EVBUFFER_LENGTH(evbuf) == 8333)
if (EVBUFFER_LENGTH(evbuf) == 8333) {
test_ok++;
}
evbuffer_free(evbuf);
}
@ -1108,8 +1110,9 @@ readcb(struct bufferevent *bev, void *arg)
static void
writecb(struct bufferevent *bev, void *arg)
{
if (EVBUFFER_LENGTH(bev->output) == 0)
if (EVBUFFER_LENGTH(bev->output) == 0) {
test_ok++;
}
}
static void
@ -1118,7 +1121,8 @@ errorcb(struct bufferevent *bev, short what, void *arg)
test_ok = -2;
}
static void
/*XXXX move to regress_bufferevent.c; make static again. */
void
test_bufferevent(void)
{
struct bufferevent *bev1, *bev2;
@ -1179,8 +1183,11 @@ wm_readcb(struct bufferevent *bev, void *arg)
static void
wm_writecb(struct bufferevent *bev, void *arg)
{
if (EVBUFFER_LENGTH(bev->output) == 0)
assert(EVBUFFER_LENGTH(bev->output) <= 100);
if (EVBUFFER_LENGTH(bev->output) == 0) {
evbuffer_drain(bev->output, EVBUFFER_LENGTH(bev->output));
test_ok++;
}
}
static void
@ -1189,7 +1196,8 @@ wm_errorcb(struct bufferevent *bev, short what, void *arg)
test_ok = -2;
}
static void
/*XXXX move to regress_bufferevent.c; make static again. */
void
test_bufferevent_watermarks(void)
{
struct bufferevent *bev1, *bev2;
@ -1205,21 +1213,29 @@ test_bufferevent_watermarks(void)
bufferevent_enable(bev2, EV_READ);
for (i = 0; i < sizeof(buffer); i++)
buffer[i] = i;
buffer[i] = (char)i;
bufferevent_write(bev1, buffer, sizeof(buffer));
/* limit the reading on the receiving bufferevent */
bufferevent_setwatermark(bev2, EV_READ, 10, 20);
/* Tell the sending bufferevent not to notify us till it's down to
100 bytes. */
bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000);
event_dispatch();
tt_int_op(test_ok, ==, 2);
/* The write callback drained all the data from outbuf, so we
* should have removed the write event... */
tt_assert(!event_pending(&bev2->ev_write, EV_WRITE, NULL));
end:
bufferevent_free(bev1);
bufferevent_free(bev2);
if (test_ok != 2)
test_ok = 0;
cleanup_test();
}
@ -1231,7 +1247,7 @@ test_bufferevent_watermarks(void)
static enum bufferevent_filter_result
bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst,
enum bufferevent_filter_state state, void *ctx)
ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
{
const unsigned char *buffer;
int i;
@ -1253,7 +1269,7 @@ bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst,
static enum bufferevent_filter_result
bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst,
enum bufferevent_filter_state state, void *ctx)
ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
{
const unsigned char *buffer;
int i;
@ -1268,41 +1284,39 @@ bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst,
return (BEV_OK);
}
static void
/*XXXX move to regress_bufferevent.c; make static again. */
void
test_bufferevent_filters(void)
{
struct bufferevent *bev1, *bev2;
struct bufferevent_filter *finput, *foutput;
char buffer[8333];
int i;
test_ok = 0;
setup_test("Bufferevent Filters: ");
bev1 = bufferevent_new(pair[0], NULL, writecb, errorcb, NULL);
bev2 = bufferevent_new(pair[1], readcb, NULL, errorcb, NULL);
bufferevent_disable(bev1, EV_READ);
bufferevent_enable(bev2, EV_READ);
bev1 = bufferevent_socket_new(NULL, pair[0], 0);
bev2 = bufferevent_socket_new(NULL, pair[1], 0);
for (i = 0; i < sizeof(buffer); i++)
buffer[i] = i;
bev1 = bufferevent_filter_new(bev1, NULL, bufferevent_output_filter,
0, NULL, NULL);
bev2 = bufferevent_filter_new(bev2, bufferevent_input_filter,
NULL, 0, NULL, NULL);
bufferevent_setcb(bev1, NULL, writecb, errorcb, NULL);
bufferevent_setcb(bev2, readcb, NULL, errorcb, NULL);
bufferevent_disable(bev1, EV_READ);
bufferevent_enable(bev2, EV_READ);
/* insert some filters */
finput = bufferevent_filter_new(
NULL, NULL,bufferevent_input_filter, NULL);
foutput = bufferevent_filter_new(
NULL, NULL, bufferevent_output_filter, NULL);
bufferevent_filter_insert(bev1, BEV_OUTPUT, foutput);
bufferevent_filter_insert(bev2, BEV_INPUT, finput);
bufferevent_write(bev1, buffer, sizeof(buffer));
event_dispatch();
bufferevent_filter_remove(bev1, BEV_OUTPUT, foutput);
bufferevent_filter_free(foutput);
bufferevent_free(bev1);
bufferevent_free(bev2);
@ -1630,10 +1644,6 @@ struct testcase_t legacy_testcases[] = {
LEGACY(persistent_timeout, TT_FORK|TT_NEED_BASE),
LEGACY(priorities, TT_FORK|TT_NEED_BASE),
LEGACY(bufferevent, TT_ISOLATED),
LEGACY(bufferevent_watermarks, TT_ISOLATED),
LEGACY(bufferevent_filters, TT_ISOLATED),
LEGACY(free_active_base, TT_FORK|TT_NEED_BASE),
LEGACY(event_base_new, TT_FORK|TT_NEED_SOCKETPAIR),

View File

@ -34,8 +34,15 @@ extern "C" {
#include "tinytest.h"
#include "tinytest_macros.h"
/*XXX move these to regress_bufferevent.c when bufferevent changes are
* merged. */
void test_bufferevent(void);
void test_bufferevent_watermarks(void);
void test_bufferevent_filters(void);
extern struct testcase_t legacy_testcases[];
extern struct testcase_t evbuffer_testcases[];
extern struct testcase_t bufferevent_testcases[];
extern struct testcase_t util_testcases[];
extern struct testcase_t signal_testcases[];
extern struct testcase_t http_testcases[];

View File

@ -0,0 +1,79 @@
/*
* Copyright (c) 2003-2007 Niels Provos <provos@citi.umich.edu>
* Copyright (c) 2007-2009 Niels Provos and Nick Mathewson
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifdef WIN32
#include <winsock2.h>
#include <windows.h>
#endif
#ifdef HAVE_CONFIG_H
#include "event-config.h"
#endif
#include <sys/types.h>
#include <sys/stat.h>
#ifdef _EVENT_HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include <sys/queue.h>
#ifndef WIN32
#include <sys/socket.h>
#include <sys/wait.h>
#include <signal.h>
#include <unistd.h>
#include <netdb.h>
#endif
#include <fcntl.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/event_compat.h"
#include "event2/tag.h"
#include "event2/buffer.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_compat.h"
#include "event2/bufferevent_struct.h"
#include "event2/util.h"
#include "bufferevent-internal.h"
#include "regress.h"
struct testcase_t bufferevent_testcases[] = {
LEGACY(bufferevent, TT_ISOLATED),
LEGACY(bufferevent_watermarks, TT_ISOLATED),
LEGACY(bufferevent_filters, TT_ISOLATED),
END_OF_TESTCASES,
};

View File

@ -189,6 +189,7 @@ struct testgroup_t testgroups[] = {
{ "evbuffer/", evbuffer_testcases },
{ "signal/", signal_testcases },
{ "util/", util_testcases },
{ "bufferevent/", bufferevent_testcases },
{ "http/", http_testcases },
{ "dns/", dns_testcases },
{ "rpc/", rpc_testcases },

View File

@ -56,21 +56,16 @@
void regress_zlib(void);
static int test_ok;
static int infilter_calls;
static int outfilter_calls;
static int readcb_finished;
static int writecb_finished;
static int errorcb_invoked;
/*
* Zlib filters
*/
static void
zlib_deflate_init(void *ctx)
{
z_streamp p = ctx;
memset(p, 0, sizeof(z_stream));
assert(deflateInit(p, Z_DEFAULT_COMPRESSION) == Z_OK);
}
static void
zlib_deflate_free(void *ctx)
{
@ -79,15 +74,6 @@ zlib_deflate_free(void *ctx)
assert(deflateEnd(p) == Z_OK);
}
static void
zlib_inflate_init(void *ctx)
{
z_streamp p = ctx;
memset(p, 0, sizeof(z_stream));
assert(inflateInit(p) == Z_OK);
}
static void
zlib_inflate_free(void *ctx)
{
@ -96,6 +82,20 @@ zlib_inflate_free(void *ctx)
assert(inflateEnd(p) == Z_OK);
}
static int
getstate(enum bufferevent_flush_mode state)
{
switch (state) {
case BEV_FINISHED:
return Z_FINISH;
case BEV_FLUSH:
return Z_SYNC_FLUSH;
case BEV_NORMAL:
default:
return Z_NO_FLUSH;
}
}
/*
* The input filter is triggered only on new input read from the network.
* That means all input data needs to be consumed or the filter needs to
@ -103,7 +103,7 @@ zlib_inflate_free(void *ctx)
*/
static enum bufferevent_filter_result
zlib_input_filter(struct evbuffer *src, struct evbuffer *dst,
enum bufferevent_filter_state state, void *ctx)
ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
{
int nread, nwrite;
int res;
@ -119,9 +119,7 @@ zlib_input_filter(struct evbuffer *src, struct evbuffer *dst,
p->avail_out = 4096;
/* we need to flush zlib if we got a flush */
res = inflate(p, state == BEV_FLUSH ?
Z_FINISH : Z_NO_FLUSH);
assert(res == Z_OK || res == Z_STREAM_END);
res = inflate(p, getstate(state));
/* let's figure out how much was compressed */
nread = evbuffer_get_contiguous_space(src) - p->avail_in;
@ -129,16 +127,27 @@ zlib_input_filter(struct evbuffer *src, struct evbuffer *dst,
evbuffer_drain(src, nread);
evbuffer_commit_space(dst, nwrite);
if (res==Z_BUF_ERROR) {
/* We're out of space, or out of decodeable input.
Only if nwrite == 0 assume the latter.
*/
if (nwrite == 0)
return BEV_NEED_MORE;
} else {
assert(res == Z_OK || res == Z_STREAM_END);
}
} while (EVBUFFER_LENGTH(src) > 0);
test_ok++;
++infilter_calls;
return (BEV_OK);
}
static enum bufferevent_filter_result
zlib_output_filter(struct evbuffer *src, struct evbuffer *dst,
enum bufferevent_filter_state state, void *ctx)
ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
{
int nread, nwrite;
int res;
@ -153,9 +162,9 @@ zlib_output_filter(struct evbuffer *src, struct evbuffer *dst,
p->next_out = evbuffer_reserve_space(dst, 4096);
p->avail_out = 4096;
/* we need to flush zlib if we got a flush */
res = deflate(p, state == BEV_FLUSH ? Z_FINISH : Z_NO_FLUSH);
assert(res == Z_OK || res == Z_STREAM_END);
res = deflate(p, getstate(state));
/* let's figure out how much was compressed */
nread = evbuffer_get_contiguous_space(src) - p->avail_in;
@ -163,9 +172,20 @@ zlib_output_filter(struct evbuffer *src, struct evbuffer *dst,
evbuffer_drain(src, nread);
evbuffer_commit_space(dst, nwrite);
if (res==Z_BUF_ERROR) {
/* We're out of space, or out of decodeable input.
Only if nwrite == 0 assume the latter.
*/
if (nwrite == 0)
return BEV_NEED_MORE;
} else {
assert(res == Z_OK || res == Z_STREAM_END);
}
} while (EVBUFFER_LENGTH(src) > 0);
test_ok++;
++outfilter_calls;
return (BEV_OK);
}
@ -186,8 +206,9 @@ readcb(struct bufferevent *bev, void *arg)
bufferevent_disable(bev, EV_READ);
if (EVBUFFER_LENGTH(evbuf) == 8333)
test_ok++;
if (EVBUFFER_LENGTH(evbuf) == 8333) {
++readcb_finished;
}
evbuffer_free(evbuf);
}
@ -196,26 +217,29 @@ readcb(struct bufferevent *bev, void *arg)
static void
writecb(struct bufferevent *bev, void *arg)
{
if (EVBUFFER_LENGTH(bufferevent_get_output(bev)) == 0)
test_ok++;
if (EVBUFFER_LENGTH(bufferevent_get_output(bev)) == 0) {
++writecb_finished;
}
}
static void
errorcb(struct bufferevent *bev, short what, void *arg)
{
test_ok = -2;
errorcb_invoked = 1;
}
static void
test_bufferevent_zlib(void)
{
struct bufferevent *bev1, *bev2;
struct bufferevent_filter *finput, *foutput;
struct bufferevent *bev1, *bev2, *bev1_orig, *bev2_orig;
char buffer[8333];
z_stream z_input, z_output;
int i, pair[2];
int i, pair[2], r;
int test_ok;
infilter_calls = outfilter_calls = readcb_finished = writecb_finished
= errorcb_invoked = 0;
test_ok = 0;
fprintf(stdout, "Testing Zlib Filter: ");
if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, pair) == -1) {
@ -226,21 +250,27 @@ test_bufferevent_zlib(void)
evutil_make_socket_nonblocking(pair[0]);
evutil_make_socket_nonblocking(pair[1]);
bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL);
bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL);
bev1_orig = bev1 = bufferevent_socket_new(NULL, pair[0], 0);
bev2_orig = bev2 = bufferevent_socket_new(NULL, pair[1], 0);
memset(&z_output, 0, sizeof(z_output));
r = deflateInit(&z_output, Z_DEFAULT_COMPRESSION);
assert(r == Z_OK);
memset(&z_input, 0, sizeof(z_input));
r = inflateInit(&z_input);
/* initialize filters */
finput = bufferevent_filter_new(
zlib_inflate_init, zlib_inflate_free,
zlib_input_filter, &z_input);
bufferevent_filter_insert(bev2, BEV_INPUT, finput);
bev1 = bufferevent_filter_new(bev1, NULL, zlib_output_filter, 0,
zlib_deflate_free, &z_output);
bev2 = bufferevent_filter_new(bev2, zlib_input_filter,
NULL, 0, zlib_inflate_free, &z_input);
bufferevent_setcb(bev1, readcb, writecb, errorcb, NULL);
bufferevent_setcb(bev2, readcb, writecb, errorcb, NULL);
foutput = bufferevent_filter_new(
zlib_deflate_init, zlib_deflate_free,
zlib_output_filter, &z_output);
bufferevent_filter_insert(bev1, BEV_OUTPUT, foutput);
bufferevent_disable(bev1, EV_READ);
bufferevent_enable(bev1, EV_WRITE);
bufferevent_enable(bev2, EV_READ);
for (i = 0; i < sizeof(buffer); i++)
@ -251,14 +281,21 @@ test_bufferevent_zlib(void)
bufferevent_write(bev1, buffer + 1800, sizeof(buffer) - 1800);
/* we are done writing - we need to flush everything */
bufferevent_trigger_filter(bev1, NULL, BEV_OUTPUT, BEV_FLUSH);
bufferevent_flush(bev1, EV_WRITE, BEV_FINISHED);
event_dispatch();
bufferevent_free(bev1);
bufferevent_free(bev2);
if (test_ok != 6) {
test_ok = infilter_calls &&
outfilter_calls &&
readcb_finished &&
writecb_finished &&
!errorcb_invoked;
if (! test_ok) {
fprintf(stdout, "FAILED: %d\n", test_ok);
exit(1);
}