Functions to view and manipulate rate-limiting buckets.

We need these for Tor, and other projects probably need them too.  Uses
include:
    - Checking whether bandwidth is mostly-used, and only taking some
      actions when there's plenty of bandwidth.
    - Deducting some non-bufferevent activities from a rate-limit group.
This commit is contained in:
Nick Mathewson 2010-02-03 15:12:04 -05:00
parent aba1fff33a
commit 85047a6983
2 changed files with 278 additions and 34 deletions

View File

@ -108,6 +108,19 @@ ev_token_bucket_update(struct ev_token_bucket *bucket,
return 1;
}
static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
/* Must hold lock on bev. */
struct timeval now;
unsigned tick;
event_base_gettimeofday_cached(bev->bev.ev_base, &now);
tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
if (tick != bev->rate_limiting->limit.last_updated)
ev_token_bucket_update(&bev->rate_limiting->limit,
bev->rate_limiting->cfg, tick);
}
ev_uint32_t
ev_token_bucket_get_tick(const struct timeval *tv,
const struct ev_token_bucket_cfg *cfg)
@ -179,7 +192,6 @@ _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
/* needs lock on bev. */
int max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;
struct timeval now;
#define LIM(x) \
(is_write ? (x).write_limit : (x).read_limit)
@ -203,12 +215,7 @@ _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
*/
if (bev->rate_limiting->cfg) {
unsigned tick;
event_base_gettimeofday_cached(bev->bev.ev_base, &now);
tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
ev_token_bucket_update(&bev->rate_limiting->limit,
bev->rate_limiting->cfg, tick);
bufferevent_update_buckets(bev);
max_so_far = LIM(bev->rate_limiting->limit);
}
if (bev->rate_limiting->group) {
@ -451,6 +458,44 @@ _bev_group_random_element(struct bufferevent_rate_limit_group *group)
} \
} while (0)
static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
int again = 0;
struct bufferevent_private *bev, *first;
g->read_suspended = 0;
FOREACH_RANDOM_ORDER({
if (EVLOCK_TRY_LOCK(bev->lock)) {
bufferevent_unsuspend_read(&bev->bev,
BEV_SUSPEND_BW_GROUP);
EVLOCK_UNLOCK(bev->lock, 0);
} else {
again = 1;
}
});
g->pending_unsuspend_read = again;
}
static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
int again = 0;
struct bufferevent_private *bev, *first;
g->write_suspended = 0;
FOREACH_RANDOM_ORDER({
if (EVLOCK_TRY_LOCK(bev->lock)) {
bufferevent_unsuspend_write(&bev->bev,
BEV_SUSPEND_BW_GROUP);
EVLOCK_UNLOCK(bev->lock, 0);
} else {
again = 1;
}
});
g->pending_unsuspend_write = again;
}
/** Callback invoked every tick to add more elements to the group bucket
and unsuspend group members as needed.
*/
@ -460,8 +505,6 @@ _bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
struct bufferevent_rate_limit_group *g = arg;
unsigned tick;
struct timeval now;
int again = 0;
struct bufferevent_private *bev, *first;
event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
@ -471,33 +514,11 @@ _bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
if (g->pending_unsuspend_read ||
(g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
g->read_suspended = 0;
again = 0;
FOREACH_RANDOM_ORDER({
if (EVLOCK_TRY_LOCK(bev->lock)) {
bufferevent_unsuspend_read(&bev->bev,
BEV_SUSPEND_BW_GROUP);
EVLOCK_UNLOCK(bev->lock, 0);
} else {
again = 1;
}
});
g->pending_unsuspend_read = again;
_bev_group_unsuspend_reading(g);
}
if (g->pending_unsuspend_write ||
(g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
g->write_suspended = 0;
again = 0;
FOREACH_RANDOM_ORDER({
if (EVLOCK_TRY_LOCK(bev->lock)) {
bufferevent_unsuspend_write(&bev->bev,
BEV_SUSPEND_BW_GROUP);
EVLOCK_UNLOCK(bev->lock, 0);
} else {
again = 1;
}
});
g->pending_unsuspend_write = again;
_bev_group_unsuspend_writing(g);
}
/* XXXX Rather than waiting to the next tick to unsuspend stuff
@ -660,3 +681,169 @@ bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
BEV_UNLOCK(bev);
return 0;
}
/* ===
* API functions to expose rate limits.
*
* Don't use these from inside Libevent; they're meant to be for use by
* the program.
* === */
/* Mostly you don't want to use this function from inside libevent;
* _bufferevent_get_read_max() is more likely what you want*/
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
ev_ssize_t r;
struct bufferevent_private *bevp;
BEV_LOCK(bev);
bevp = BEV_UPCAST(bev);
if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
bufferevent_update_buckets(bevp);
r = bevp->rate_limiting->limit.read_limit;
} else {
r = EV_SSIZE_MAX;
}
BEV_UNLOCK(bev);
return r;
}
/* Mostly you don't want to use this function from inside libevent;
* _bufferevent_get_write_max() is more likely what you want*/
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
ev_ssize_t r;
struct bufferevent_private *bevp;
BEV_LOCK(bev);
bevp = BEV_UPCAST(bev);
if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
bufferevent_update_buckets(bevp);
r = bevp->rate_limiting->limit.write_limit;
} else {
r = EV_SSIZE_MAX;
}
BEV_UNLOCK(bev);
return r;
}
/* Mostly you don't want to use this function from inside libevent;
* _bufferevent_get_read_max() is more likely what you want*/
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
struct bufferevent_rate_limit_group *grp)
{
ev_ssize_t r;
LOCK_GROUP(grp);
r = grp->rate_limit.read_limit;
UNLOCK_GROUP(grp);
return r;
}
/* Mostly you don't want to use this function from inside libevent;
* _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
struct bufferevent_rate_limit_group *grp)
{
ev_ssize_t r;
LOCK_GROUP(grp);
r = grp->rate_limit.write_limit;
UNLOCK_GROUP(grp);
return r;
}
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
int r = 0;
ev_int32_t old_limit, new_limit;
struct bufferevent_private *bevp;
BEV_LOCK(bev);
bevp = BEV_UPCAST(bev);
EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
old_limit = bevp->rate_limiting->limit.read_limit;
new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
if (old_limit > 0 && new_limit <= 0) {
bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
if (event_add(&bevp->rate_limiting->refill_bucket_event,
&bevp->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
} else if (old_limit <= 0 && new_limit > 0) {
event_del(&bevp->rate_limiting->refill_bucket_event);
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
}
BEV_UNLOCK(bev);
return r;
}
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
/* XXXX this is mostly copy-and-paste from
* bufferevent_decrement_read_limit */
int r = 0;
ev_int32_t old_limit, new_limit;
struct bufferevent_private *bevp;
BEV_LOCK(bev);
bevp = BEV_UPCAST(bev);
EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
old_limit = bevp->rate_limiting->limit.write_limit;
new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
if (old_limit > 0 && new_limit <= 0) {
bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
if (event_add(&bevp->rate_limiting->refill_bucket_event,
&bevp->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
} else if (old_limit <= 0 && new_limit > 0) {
event_del(&bevp->rate_limiting->refill_bucket_event);
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
}
BEV_UNLOCK(bev);
return r;
}
int
bufferevent_rate_limit_group_decrement_read(
struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
int r = 0;
ev_int32_t old_limit, new_limit;
LOCK_GROUP(grp);
old_limit = grp->rate_limit.read_limit;
new_limit = (grp->rate_limit.read_limit -= decr);
if (old_limit > 0 && new_limit <= 0) {
_bev_group_suspend_reading(grp);
} else if (old_limit <= 0 && new_limit > 0) {
_bev_group_unsuspend_reading(grp);
}
UNLOCK_GROUP(grp);
return r;
}
int
bufferevent_rate_limit_group_decrement_write(
struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
int r = 0;
ev_int32_t old_limit, new_limit;
LOCK_GROUP(grp);
old_limit = grp->rate_limit.write_limit;
new_limit = (grp->rate_limit.write_limit -= decr);
if (old_limit > 0 && new_limit <= 0) {
_bev_group_suspend_writing(grp);
} else if (old_limit <= 0 && new_limit > 0) {
_bev_group_unsuspend_writing(grp);
}
UNLOCK_GROUP(grp);
return r;
}

View File

@ -492,12 +492,12 @@ int
bufferevent_pair_new(struct event_base *base, int options,
struct bufferevent *pair[2]);
/**
Abstract type used to configure rate-limiting on a bufferevent or a group
of bufferevents.
*/
struct ev_token_bucket_cfg;
/**
A group of bufferevents which are configured to respect the same rate
limit.
@ -545,6 +545,7 @@ void ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg);
*/
int bufferevent_set_rate_limit(struct bufferevent *bev,
struct ev_token_bucket_cfg *cfg);
/**
Create a new rate-limit group for bufferevents. A rate-limit group
constrains the maximum number of bytes sent and received, in toto,
@ -584,6 +585,62 @@ int bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
/** Remove 'bev' from its current rate-limit group (if any). */
int bufferevent_remove_from_rate_limit_group(struct bufferevent *bev);
/*@{*/
/**
Return the current read or write bucket size for a bufferevent.
If it is not configured with a per-bufferevent ratelimit, return
EV_SSIZE_MAX. This function does not inspect the group limit, if any.
Note that it can return a negative value if the bufferevent has been
made to read or write more than its limit.
*/
ev_ssize_t bufferevent_get_read_limit(struct bufferevent *bev);
ev_ssize_t bufferevent_get_write_limit(struct bufferevent *bev);
/*@}*/
/*@{*/
/**
Return the read or write bucket size for a bufferevent rate limit
group. Note that it can return a negative value if bufferevents in
the group have been made to read or write more than their limits.
*/
ev_ssize_t bufferevent_rate_limit_group_get_read_limit(
struct bufferevent_rate_limit_group *);
ev_ssize_t bufferevent_rate_limit_group_get_write_limit(
struct bufferevent_rate_limit_group *);
/*@}*/
/*@{*/
/**
Subtract a number of bytes from a bufferevent's read or write bucket.
The decrement value can be negative, if you want to manually refill
the bucket. If the change puts the bucket above or below zero, the
bufferevent will resume or suspend reading writing as appropriate.
These functions make no change in the buckets for the bufferevent's
group, if any.
Returns 0 on success, -1 on internal error.
*/
int bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr);
int bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr);
/*@}*/
/*@{*/
/**
Subtract a number of bytes from a bufferevent rate-limiting group's
read or write bucket. The decrement value can be negative, if you
want to manually refill the bucket. If the change puts the bucket
above or below zero, the bufferevents in the group will resume or
suspend reading writing as appropriate.
Returns 0 on success, -1 on internal error.
*/
int bufferevent_rate_limit_group_decrement_read(
struct bufferevent_rate_limit_group *, ev_ssize_t);
int bufferevent_rate_limit_group_decrement_write(
struct bufferevent_rate_limit_group *, ev_ssize_t);
/*@}*/
#ifdef __cplusplus
}
#endif