mirror of
https://github.com/cuberite/libevent.git
synced 2025-08-04 01:36:23 -04:00
Fix serious bugs in per-bufferevent rate-limiting code
Our old code was too zealous about deleting the refill events that would actually make connections able to read or write again after they had run out of bandwidth. Under some circumstances, this could cause a bufferevent to never actually refill one of its rate-limiting buckets. Also, the code treated setting a per-connection rate-limit on a connection that already had a group-limit as if it were changing the limit on a connection whose allocation had already run out. This patch fixes both of those problems.
This commit is contained in:
parent
819b171572
commit
34d64f8a34
@ -543,15 +543,19 @@ bufferevent_set_rate_limit(struct bufferevent *bev,
|
|||||||
struct bufferevent_rate_limit *rlim;
|
struct bufferevent_rate_limit *rlim;
|
||||||
struct timeval now;
|
struct timeval now;
|
||||||
ev_uint32_t tick;
|
ev_uint32_t tick;
|
||||||
|
int reinit = 0, suspended = 0;
|
||||||
/* XXX reference-count cfg */
|
/* XXX reference-count cfg */
|
||||||
|
|
||||||
BEV_LOCK(bev);
|
BEV_LOCK(bev);
|
||||||
|
|
||||||
if (cfg == NULL) {
|
if (cfg == NULL) {
|
||||||
if (bevp->rate_limiting) {
|
if (bevp->rate_limiting) {
|
||||||
bevp->rate_limiting->cfg = NULL;
|
rlim = bevp->rate_limiting;
|
||||||
|
rlim->cfg = NULL;
|
||||||
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
|
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
|
||||||
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
|
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
|
||||||
|
if (event_initialized(&rlim->refill_bucket_event))
|
||||||
|
event_del(&rlim->refill_bucket_event);
|
||||||
}
|
}
|
||||||
r = 0;
|
r = 0;
|
||||||
goto done;
|
goto done;
|
||||||
@ -561,29 +565,48 @@ bufferevent_set_rate_limit(struct bufferevent *bev,
|
|||||||
tick = ev_token_bucket_get_tick(&now, cfg);
|
tick = ev_token_bucket_get_tick(&now, cfg);
|
||||||
|
|
||||||
if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
|
if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
|
||||||
;
|
/* no-op */
|
||||||
} else if (bevp->rate_limiting) {
|
r = 0;
|
||||||
bevp->rate_limiting->cfg = cfg;
|
goto done;
|
||||||
ev_token_bucket_init(&bevp->rate_limiting->limit, cfg, tick, 1);
|
}
|
||||||
if (bevp->rate_limiting->limit.read_limit > 0)
|
if (bevp->rate_limiting == NULL) {
|
||||||
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
|
|
||||||
else
|
|
||||||
bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
|
|
||||||
if (bevp->rate_limiting->limit.write_limit > 0)
|
|
||||||
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
|
|
||||||
else
|
|
||||||
bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
|
|
||||||
} else {
|
|
||||||
rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
|
rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
|
||||||
if (!rlim)
|
if (!rlim)
|
||||||
goto done;
|
goto done;
|
||||||
rlim->cfg = cfg;
|
|
||||||
ev_token_bucket_init(&rlim->limit, cfg, tick, 0);
|
|
||||||
evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
|
|
||||||
_bev_refill_callback, bevp);
|
|
||||||
bevp->rate_limiting = rlim;
|
bevp->rate_limiting = rlim;
|
||||||
|
} else {
|
||||||
|
rlim = bevp->rate_limiting;
|
||||||
}
|
}
|
||||||
|
reinit = rlim->cfg != NULL;
|
||||||
|
|
||||||
|
rlim->cfg = cfg;
|
||||||
|
ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);
|
||||||
|
|
||||||
|
if (reinit) {
|
||||||
|
EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
|
||||||
|
event_del(&rlim->refill_bucket_event);
|
||||||
|
}
|
||||||
|
evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
|
||||||
|
_bev_refill_callback, bevp);
|
||||||
|
|
||||||
|
if (rlim->limit.read_limit > 0) {
|
||||||
|
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
|
||||||
|
} else {
|
||||||
|
bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
|
||||||
|
suspended=1;
|
||||||
|
}
|
||||||
|
if (rlim->limit.write_limit > 0) {
|
||||||
|
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
|
||||||
|
} else {
|
||||||
|
bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
|
||||||
|
suspended = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (suspended)
|
||||||
|
event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
|
||||||
|
|
||||||
r = 0;
|
r = 0;
|
||||||
|
|
||||||
done:
|
done:
|
||||||
BEV_UNLOCK(bev);
|
BEV_UNLOCK(bev);
|
||||||
return r;
|
return r;
|
||||||
@ -856,7 +879,8 @@ bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
|
|||||||
&bevp->rate_limiting->cfg->tick_timeout) < 0)
|
&bevp->rate_limiting->cfg->tick_timeout) < 0)
|
||||||
r = -1;
|
r = -1;
|
||||||
} else if (old_limit <= 0 && new_limit > 0) {
|
} else if (old_limit <= 0 && new_limit > 0) {
|
||||||
event_del(&bevp->rate_limiting->refill_bucket_event);
|
if (!(bevp->write_suspended & BEV_SUSPEND_BW))
|
||||||
|
event_del(&bevp->rate_limiting->refill_bucket_event);
|
||||||
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
|
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -884,7 +908,8 @@ bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
|
|||||||
&bevp->rate_limiting->cfg->tick_timeout) < 0)
|
&bevp->rate_limiting->cfg->tick_timeout) < 0)
|
||||||
r = -1;
|
r = -1;
|
||||||
} else if (old_limit <= 0 && new_limit > 0) {
|
} else if (old_limit <= 0 && new_limit > 0) {
|
||||||
event_del(&bevp->rate_limiting->refill_bucket_event);
|
if (!(bevp->read_suspended & BEV_SUSPEND_BW))
|
||||||
|
event_del(&bevp->rate_limiting->refill_bucket_event);
|
||||||
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
|
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user