support for low and high watermarks

svn:r101
This commit is contained in:
Niels Provos 2004-04-04 02:20:21 +00:00
parent 3772ec8e95
commit fbf01c7f04
4 changed files with 103 additions and 5 deletions

View File

@ -88,6 +88,7 @@ int
evbuffer_add(struct evbuffer *buf, u_char *data, size_t datlen) evbuffer_add(struct evbuffer *buf, u_char *data, size_t datlen)
{ {
size_t need = buf->off + datlen; size_t need = buf->off + datlen;
size_t oldoff = buf->off;
if (buf->totallen < need) { if (buf->totallen < need) {
void *newbuf; void *newbuf;
@ -108,19 +109,30 @@ evbuffer_add(struct evbuffer *buf, u_char *data, size_t datlen)
memcpy(buf->buffer + buf->off, data, datlen); memcpy(buf->buffer + buf->off, data, datlen);
buf->off += datlen; buf->off += datlen;
if (datlen && buf->cb != NULL)
(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
return (0); return (0);
} }
void void
evbuffer_drain(struct evbuffer *buf, size_t len) evbuffer_drain(struct evbuffer *buf, size_t len)
{ {
size_t oldoff = buf->off;
if (len >= buf->off) { if (len >= buf->off) {
buf->off = 0; buf->off = 0;
return; goto done;
} }
memmove(buf->buffer, buf->buffer + len, buf->off - len); memmove(buf->buffer, buf->buffer + len, buf->off - len);
buf->off -= len; buf->off -= len;
done:
/* Tell someone about changes in this buffer */
if (buf->off != oldoff && buf->cb != NULL)
(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
} }
int int
@ -176,3 +188,11 @@ evbuffer_find(struct evbuffer *buffer, u_char *what, size_t len)
return (NULL); return (NULL);
} }
void evbuffer_setcb(struct evbuffer *buffer,
void (*cb)(struct evbuffer *, size_t, size_t, void *),
void *cbarg)
{
buffer->cb = cb;
buffer->cbarg = cbarg;
}

View File

@ -41,12 +41,34 @@ bufferevent_add(struct event *ev, int timeout)
return (event_add(ev, ptv)); return (event_add(ev, ptv));
} }
/*
* This callback is executed when the size of the input buffer changes.
* We use it to apply back pressure on the reading side.
*/
void
bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
void *arg) {
struct bufferevent *bufev = arg;
/*
* If we are below the watermak then reschedule reading if it's
* still enabled.
*/
if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
evbuffer_setcb(buf, NULL, NULL);
if (bufev->enabled & EV_READ)
bufferevent_add(&bufev->ev_read, bufev->timeout_read);
}
}
static void static void
bufferevent_readcb(int fd, short event, void *arg) bufferevent_readcb(int fd, short event, void *arg)
{ {
struct bufferevent *bufev = arg; struct bufferevent *bufev = arg;
int res = 0; int res = 0;
short what = EVBUFFER_READ; short what = EVBUFFER_READ;
size_t len;
if (event == EV_TIMEOUT) { if (event == EV_TIMEOUT) {
what |= EVBUFFER_TIMEOUT; what |= EVBUFFER_TIMEOUT;
@ -69,6 +91,19 @@ bufferevent_readcb(int fd, short event, void *arg)
bufferevent_add(&bufev->ev_read, bufev->timeout_read); bufferevent_add(&bufev->ev_read, bufev->timeout_read);
/* See if this callbacks meets the water marks */
len = EVBUFFER_LENGTH(bufev->input);
if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
return;
if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) {
struct evbuffer *buf = bufev->input;
event_del(&bufev->ev_read);
/* Now schedule a callback for us */
evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
return;
}
/* Invoke the user callback - must always be called last */ /* Invoke the user callback - must always be called last */
(*bufev->readcb)(bufev, bufev->cbarg); (*bufev->readcb)(bufev, bufev->cbarg);
return; return;
@ -111,8 +146,11 @@ bufferevent_writecb(int fd, short event, void *arg)
if (EVBUFFER_LENGTH(bufev->output) != 0) if (EVBUFFER_LENGTH(bufev->output) != 0)
bufferevent_add(&bufev->ev_write, bufev->timeout_write); bufferevent_add(&bufev->ev_write, bufev->timeout_write);
/* Invoke the user callback if our buffer is drained */ /*
if (EVBUFFER_LENGTH(bufev->output) == 0) * Invoke the user callback if our buffer is drained or below the
* low watermark.
*/
if (EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
(*bufev->writecb)(bufev, bufev->cbarg); (*bufev->writecb)(bufev, bufev->cbarg);
return; return;
@ -273,3 +311,26 @@ bufferevent_settimeout(struct bufferevent *bufev,
bufev->timeout_read = timeout_read; bufev->timeout_read = timeout_read;
bufev->timeout_write = timeout_write; bufev->timeout_write = timeout_write;
} }
/*
* Sets the water marks
*/
void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
size_t lowmark, size_t highmark)
{
if (events & EV_READ) {
bufev->wm_read.low = lowmark;
bufev->wm_read.high = highmark;
}
if (events & EV_WRITE) {
bufev->wm_write.low = lowmark;
bufev->wm_write.high = highmark;
}
/* If the watermarks changed then see if we should call read again */
bufferevent_read_pressure_cb(bufev->input,
0, EVBUFFER_LENGTH(bufev->input), bufev);
}

View File

@ -327,6 +327,10 @@ event_once(int fd, short events,
struct event_once *eonce; struct event_once *eonce;
struct timeval etv; struct timeval etv;
/* We cannot support signals that just fire once */
if (events & EV_SIGNAL)
return (-1);
if ((eonce = calloc(1, sizeof(struct event_once))) == NULL) if ((eonce = calloc(1, sizeof(struct event_once))) == NULL)
return (-1); return (-1);
@ -387,8 +391,10 @@ event_pending(struct event *ev, short event, struct timeval *tv)
flags |= ev->ev_res; flags |= ev->ev_res;
if (ev->ev_flags & EVLIST_TIMEOUT) if (ev->ev_flags & EVLIST_TIMEOUT)
flags |= EV_TIMEOUT; flags |= EV_TIMEOUT;
if (ev->ev_flags & EVLIST_SIGNAL)
flags |= EV_SIGNAL;
event &= (EV_TIMEOUT|EV_READ|EV_WRITE); event &= (EV_TIMEOUT|EV_READ|EV_WRITE|EV_SIGNAL);
/* See if there is a timeout that we should report */ /* See if there is a timeout that we should report */
if (tv != NULL && (flags & event & EV_TIMEOUT)) if (tv != NULL && (flags & event & EV_TIMEOUT))

13
event.h
View File

@ -173,6 +173,9 @@ struct evbuffer {
size_t totallen; size_t totallen;
size_t off; size_t off;
void (*cb)(struct evbuffer *, size_t, size_t, void *);
void *cbarg;
}; };
/* Just for error reporting - use other constants otherwise */ /* Just for error reporting - use other constants otherwise */
@ -186,6 +189,11 @@ struct bufferevent;
typedef void (*evbuffercb)(struct bufferevent *, void *); typedef void (*evbuffercb)(struct bufferevent *, void *);
typedef void (*everrorcb)(struct bufferevent *, short what, void *); typedef void (*everrorcb)(struct bufferevent *, short what, void *);
struct event_watermark {
size_t low;
size_t high;
};
struct bufferevent { struct bufferevent {
struct event ev_read; struct event ev_read;
struct event ev_write; struct event ev_write;
@ -193,6 +201,9 @@ struct bufferevent {
struct evbuffer *input; struct evbuffer *input;
struct evbuffer *output; struct evbuffer *output;
struct event_watermark wm_read;
struct event_watermark wm_write;
evbuffercb readcb; evbuffercb readcb;
evbuffercb writecb; evbuffercb writecb;
everrorcb errorcb; everrorcb errorcb;
@ -222,7 +233,6 @@ void bufferevent_settimeout(struct bufferevent *bufev,
struct evbuffer *evbuffer_new(void); struct evbuffer *evbuffer_new(void);
void evbuffer_free(struct evbuffer *); void evbuffer_free(struct evbuffer *);
void evbuffer_free(struct evbuffer *);
int evbuffer_add(struct evbuffer *, u_char *, size_t); int evbuffer_add(struct evbuffer *, u_char *, size_t);
int evbuffer_add_buffer(struct evbuffer *, struct evbuffer *); int evbuffer_add_buffer(struct evbuffer *, struct evbuffer *);
int evbuffer_add_printf(struct evbuffer *, char *fmt, ...); int evbuffer_add_printf(struct evbuffer *, char *fmt, ...);
@ -230,6 +240,7 @@ void evbuffer_drain(struct evbuffer *, size_t);
int evbuffer_write(struct evbuffer *, int); int evbuffer_write(struct evbuffer *, int);
int evbuffer_read(struct evbuffer *, int, int); int evbuffer_read(struct evbuffer *, int, int);
u_char *evbuffer_find(struct evbuffer *, u_char *, size_t); u_char *evbuffer_find(struct evbuffer *, u_char *, size_t);
void evbuffer_setcb(struct evbuffer *, void (*)(struct evbuffer *, size_t, size_t, void *), void *);
#ifdef __cplusplus #ifdef __cplusplus
} }