mirror of
https://github.com/cuberite/libevent.git
synced 2025-09-08 11:53:00 -04:00
fix a bug where rpc would not be scheduled when they were queued; test for it.
allow a configurable timeout for connections and RPCs. svn:r274
This commit is contained in:
parent
ce436242ad
commit
2d028ef6c1
4
evhttp.h
4
evhttp.h
@ -150,6 +150,10 @@ struct evhttp_connection *evhttp_connection_new(
|
||||
/* Frees an http connection */
|
||||
void evhttp_connection_free(struct evhttp_connection *evcon);
|
||||
|
||||
/* Sets the timeout for events related to this connection */
|
||||
void evhttp_connection_set_timeout(struct evhttp_connection *evcon,
|
||||
int timeout_in_secs);
|
||||
|
||||
/* The connection gets ownership of the request */
|
||||
int evhttp_make_request(struct evhttp_connection *evcon,
|
||||
struct evhttp_request *req,
|
||||
|
@ -48,6 +48,8 @@ void evrpc_reqstate_free(struct evrpc_req_generic* rpc_state);
|
||||
struct evrpc_pool {
|
||||
struct evconq connections;
|
||||
|
||||
int timeout;
|
||||
|
||||
TAILQ_HEAD(evrpc_requestq, evrpc_request_wrapper) requests;
|
||||
};
|
||||
|
||||
|
92
evrpc.c
92
evrpc.c
@ -85,7 +85,8 @@ evrpc_free(struct evrpc_base *base)
|
||||
|
||||
}
|
||||
|
||||
void evrpc_request_cb(struct evhttp_request *, void *);
|
||||
static void evrpc_pool_schedule(struct evrpc_pool *pool);
|
||||
static void evrpc_request_cb(struct evhttp_request *, void *);
|
||||
void evrpc_request_done(struct evrpc_req_generic*);
|
||||
|
||||
/*
|
||||
@ -132,7 +133,7 @@ evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
|
||||
return (0);
|
||||
}
|
||||
|
||||
void
|
||||
static void
|
||||
evrpc_request_cb(struct evhttp_request *req, void *arg)
|
||||
{
|
||||
struct evrpc *rpc = arg;
|
||||
@ -244,6 +245,8 @@ evrpc_pool_new()
|
||||
TAILQ_INIT(&pool->connections);
|
||||
TAILQ_INIT(&pool->requests);
|
||||
|
||||
pool->timeout = -1;
|
||||
|
||||
return (pool);
|
||||
}
|
||||
|
||||
@ -285,6 +288,13 @@ evrpc_pool_add_connection(struct evrpc_pool *pool,
|
||||
assert(connection->http_server == NULL);
|
||||
TAILQ_INSERT_TAIL(&pool->connections, connection, next);
|
||||
|
||||
/*
|
||||
* unless a timeout was specifically set for a connection,
|
||||
* the connection inherits the timeout from the pool.
|
||||
*/
|
||||
if (connection->timeout == -1)
|
||||
connection->timeout = pool->timeout;
|
||||
|
||||
/*
|
||||
* if we have any requests pending, schedule them with the new
|
||||
* connections.
|
||||
@ -298,8 +308,19 @@ evrpc_pool_add_connection(struct evrpc_pool *pool,
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
|
||||
{
|
||||
struct evhttp_connection *evcon;
|
||||
TAILQ_FOREACH(evcon, &pool->connections, next) {
|
||||
evcon->timeout = timeout_in_secs;
|
||||
}
|
||||
pool->timeout = timeout_in_secs;
|
||||
}
|
||||
|
||||
|
||||
static void evrpc_reply_done(struct evhttp_request *, void *);
|
||||
static void evrpc_request_timeout(int, short, void *);
|
||||
|
||||
/*
|
||||
* Finds a connection object associated with the pool that is currently
|
||||
@ -325,6 +346,7 @@ evrpc_schedule_request(struct evhttp_connection *connection,
|
||||
struct evrpc_request_wrapper *ctx)
|
||||
{
|
||||
struct evhttp_request *req = NULL;
|
||||
struct evrpc_pool *pool = ctx->pool;
|
||||
char *uri = NULL;
|
||||
int res = 0;
|
||||
|
||||
@ -338,6 +360,19 @@ evrpc_schedule_request(struct evhttp_connection *connection,
|
||||
if (uri == NULL)
|
||||
goto error;
|
||||
|
||||
/* we need to know the connection that we might have to abort */
|
||||
ctx->evcon = connection;
|
||||
|
||||
if (pool->timeout > 0) {
|
||||
/*
|
||||
* a timeout after which the whole rpc is going to be aborted.
|
||||
*/
|
||||
struct timeval tv;
|
||||
timerclear(&tv);
|
||||
tv.tv_sec = pool->timeout;
|
||||
evtimer_add(&ctx->ev_timeout, &tv);
|
||||
}
|
||||
|
||||
/* start the request over the connection */
|
||||
res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
|
||||
free(uri);
|
||||
@ -357,24 +392,21 @@ int
|
||||
evrpc_make_request(struct evrpc_request_wrapper *ctx)
|
||||
{
|
||||
struct evrpc_pool *pool = ctx->pool;
|
||||
struct evhttp_connection *connection;
|
||||
|
||||
/* initialize the event structure for this rpc */
|
||||
evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
|
||||
|
||||
/* we better have some available connections on the pool */
|
||||
assert(TAILQ_FIRST(&pool->connections) != NULL);
|
||||
|
||||
|
||||
/* even if a connection might be available, we do FIFO */
|
||||
if (TAILQ_FIRST(&pool->requests) == NULL) {
|
||||
connection = evrpc_pool_find_connection(pool);
|
||||
if (connection != NULL)
|
||||
return evrpc_schedule_request(connection, ctx);
|
||||
}
|
||||
|
||||
/*
|
||||
* if no connection is available, we queue the request on the pool,
|
||||
* the next time a connection is empty, the rpc will be send on that.
|
||||
*/
|
||||
TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
|
||||
|
||||
evrpc_pool_schedule(pool);
|
||||
|
||||
return (0);
|
||||
}
|
||||
|
||||
@ -382,10 +414,15 @@ static void
|
||||
evrpc_reply_done(struct evhttp_request *req, void *arg)
|
||||
{
|
||||
struct evrpc_request_wrapper *ctx = arg;
|
||||
int res;
|
||||
struct evrpc_pool *pool = ctx->pool;
|
||||
int res = -1;
|
||||
|
||||
/* cancel any timeout we might have scheduled */
|
||||
event_del(&ctx->ev_timeout);
|
||||
|
||||
/* we need to get the reply now */
|
||||
res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
|
||||
if (req != NULL)
|
||||
res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
|
||||
if (res == -1) {
|
||||
/* clear everything that we might have written previously */
|
||||
ctx->reply_clear(ctx->reply);
|
||||
@ -396,4 +433,33 @@ evrpc_reply_done(struct evhttp_request *req, void *arg)
|
||||
evrpc_request_wrapper_free(ctx);
|
||||
|
||||
/* the http layer owns the request structure */
|
||||
|
||||
/* see if we can schedule another request */
|
||||
evrpc_pool_schedule(pool);
|
||||
}
|
||||
|
||||
static void
|
||||
evrpc_pool_schedule(struct evrpc_pool *pool)
|
||||
{
|
||||
struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
|
||||
struct evhttp_connection *evcon;
|
||||
|
||||
/* if no requests are pending, we have no work */
|
||||
if (ctx == NULL)
|
||||
return;
|
||||
|
||||
if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
|
||||
TAILQ_REMOVE(&pool->requests, ctx, next);
|
||||
evrpc_schedule_request(evcon, ctx);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
evrpc_request_timeout(int fd, short what, void *arg)
|
||||
{
|
||||
struct evrpc_request_wrapper *ctx = arg;
|
||||
struct evhttp_connection *evcon = ctx->evcon;
|
||||
assert(evcon != NULL);
|
||||
|
||||
evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
|
||||
}
|
||||
|
20
evrpc.h
20
evrpc.h
@ -154,6 +154,7 @@ int evrpc_send_request_##rpcname(struct evrpc_pool *pool, \
|
||||
return (-1); \
|
||||
} \
|
||||
ctx->pool = pool; \
|
||||
ctx->evcon = NULL; \
|
||||
ctx->name = strdup(#rpcname); \
|
||||
if (ctx->name == NULL) { \
|
||||
free(ctx); \
|
||||
@ -228,6 +229,12 @@ struct evrpc_request_wrapper {
|
||||
/* pool on which this rpc request is being made */
|
||||
struct evrpc_pool *pool;
|
||||
|
||||
/* connection on which the request is being sent */
|
||||
struct evhttp_connection *evcon;
|
||||
|
||||
/* event for implementing request timeouts */
|
||||
struct event ev_timeout;
|
||||
|
||||
/* the name of the rpc */
|
||||
char *name;
|
||||
|
||||
@ -262,4 +269,17 @@ void evrpc_pool_free(struct evrpc_pool *);
|
||||
void evrpc_pool_add_connection(struct evrpc_pool *,
|
||||
struct evhttp_connection *);
|
||||
|
||||
/*
|
||||
* Sets the timeout in secs after which a request has to complete. The
|
||||
* RPC is completely aborted if it does not complete by then. Setting
|
||||
* the timeout to 0 means that it never timeouts and can be used to
|
||||
* implement callback type RPCs.
|
||||
*
|
||||
* Any connection already in the pool will be updated with the new
|
||||
* timeout. Connections added to the pool after set_timeout has be
|
||||
* called receive the pool timeout only if no timeout has been set
|
||||
* for the connection itself.
|
||||
*/
|
||||
void evrpc_pool_set_timeout(struct evrpc_pool *, int timeout_in_secs);
|
||||
|
||||
#endif /* _EVRPC_H_ */
|
||||
|
@ -50,6 +50,8 @@ struct evhttp_connection {
|
||||
int flags;
|
||||
#define EVHTTP_CON_INCOMING 0x0001 /* only one request on it ever */
|
||||
#define EVHTTP_CON_OUTGOING 0x0002 /* multiple requests possible */
|
||||
|
||||
int timeout; /* timeout in seconds for events */
|
||||
|
||||
enum evhttp_connection_state state;
|
||||
|
||||
|
68
http.c
68
http.c
@ -218,12 +218,24 @@ evhttp_method(enum evhttp_cmd_type type)
|
||||
return (method);
|
||||
}
|
||||
|
||||
static void
|
||||
evhttp_add_event(struct event *ev, int timeout, int default_timeout)
|
||||
{
|
||||
if (timeout != 0) {
|
||||
struct timeval tv;
|
||||
|
||||
timerclear(&tv);
|
||||
tv.tv_sec = timeout != -1 ? timeout : default_timeout;
|
||||
event_add(ev, &tv);
|
||||
} else {
|
||||
event_add(ev, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
evhttp_write_buffer(struct evhttp_connection *evcon,
|
||||
void (*cb)(struct evhttp_connection *, void *), void *arg)
|
||||
{
|
||||
struct timeval tv;
|
||||
|
||||
event_debug(("%s: preparing to write buffer\n", __func__));
|
||||
|
||||
/* Set call back */
|
||||
@ -232,9 +244,7 @@ evhttp_write_buffer(struct evhttp_connection *evcon,
|
||||
|
||||
/* xxx: maybe check if the event is still pending? */
|
||||
event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_write, evcon);
|
||||
timerclear(&tv);
|
||||
tv.tv_sec = HTTP_WRITE_TIMEOUT;
|
||||
event_add(&evcon->ev, &tv);
|
||||
evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_WRITE_TIMEOUT);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -464,7 +474,6 @@ void
|
||||
evhttp_write(int fd, short what, void *arg)
|
||||
{
|
||||
struct evhttp_connection *evcon = arg;
|
||||
struct timeval tv;
|
||||
int n;
|
||||
|
||||
if (what == EV_TIMEOUT) {
|
||||
@ -486,9 +495,8 @@ evhttp_write(int fd, short what, void *arg)
|
||||
}
|
||||
|
||||
if (EVBUFFER_LENGTH(evcon->output_buffer) != 0) {
|
||||
timerclear(&tv);
|
||||
tv.tv_sec = HTTP_WRITE_TIMEOUT;
|
||||
event_add(&evcon->ev, &tv);
|
||||
evhttp_add_event(&evcon->ev,
|
||||
evcon->timeout, HTTP_WRITE_TIMEOUT);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -549,7 +557,6 @@ evhttp_read(int fd, short what, void *arg)
|
||||
{
|
||||
struct evhttp_connection *evcon = arg;
|
||||
struct evhttp_request *req = TAILQ_FIRST(&evcon->requests);
|
||||
struct timeval tv;
|
||||
int n;
|
||||
|
||||
if (what == EV_TIMEOUT) {
|
||||
@ -574,10 +581,8 @@ evhttp_read(int fd, short what, void *arg)
|
||||
evhttp_connection_done(evcon);
|
||||
return;
|
||||
}
|
||||
|
||||
timerclear(&tv);
|
||||
tv.tv_sec = HTTP_READ_TIMEOUT;
|
||||
event_add(&evcon->ev, &tv);
|
||||
|
||||
evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
|
||||
}
|
||||
|
||||
void
|
||||
@ -971,7 +976,6 @@ evhttp_parse_lines(struct evhttp_request *req, struct evbuffer* buffer)
|
||||
void
|
||||
evhttp_get_body(struct evhttp_connection *evcon, struct evhttp_request *req)
|
||||
{
|
||||
struct timeval tv;
|
||||
const char *content_length;
|
||||
const char *connection;
|
||||
struct evkeyvalq *headers = req->input_headers;
|
||||
@ -1013,16 +1017,12 @@ evhttp_get_body(struct evhttp_connection *evcon, struct evhttp_request *req)
|
||||
}
|
||||
|
||||
event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read, evcon);
|
||||
timerclear(&tv);
|
||||
tv.tv_sec = HTTP_READ_TIMEOUT;
|
||||
event_add(&evcon->ev, &tv);
|
||||
return;
|
||||
evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
|
||||
}
|
||||
|
||||
void
|
||||
evhttp_read_header(int fd, short what, void *arg)
|
||||
{
|
||||
struct timeval tv;
|
||||
struct evhttp_connection *evcon = arg;
|
||||
struct evhttp_request *req = TAILQ_FIRST(&evcon->requests);
|
||||
int n, res;
|
||||
@ -1053,9 +1053,8 @@ evhttp_read_header(int fd, short what, void *arg)
|
||||
return;
|
||||
} else if (res == 0) {
|
||||
/* Need more header lines */
|
||||
timerclear(&tv);
|
||||
tv.tv_sec = HTTP_READ_TIMEOUT;
|
||||
event_add(&evcon->ev, &tv);
|
||||
evhttp_add_event(&evcon->ev,
|
||||
evcon->timeout, HTTP_READ_TIMEOUT);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -1105,6 +1104,8 @@ evhttp_connection_new(const char *address, unsigned short port)
|
||||
evcon->fd = -1;
|
||||
evcon->port = port;
|
||||
|
||||
evcon->timeout = -1;
|
||||
|
||||
if ((evcon->address = strdup(address)) == NULL) {
|
||||
event_warn("%s: strdup failed", __func__);
|
||||
goto error;
|
||||
@ -1131,11 +1132,16 @@ evhttp_connection_new(const char *address, unsigned short port)
|
||||
return (NULL);
|
||||
}
|
||||
|
||||
void
|
||||
evhttp_connection_set_timeout(struct evhttp_connection *evcon,
|
||||
int timeout_in_secs)
|
||||
{
|
||||
evcon->timeout = timeout_in_secs;
|
||||
}
|
||||
|
||||
int
|
||||
evhttp_connection_connect(struct evhttp_connection *evcon)
|
||||
{
|
||||
struct timeval tv;
|
||||
|
||||
if (evcon->state == EVCON_CONNECTING)
|
||||
return (0);
|
||||
|
||||
@ -1154,9 +1160,7 @@ evhttp_connection_connect(struct evhttp_connection *evcon)
|
||||
|
||||
/* Set up a callback for successful connection setup */
|
||||
event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_connectioncb, evcon);
|
||||
timerclear(&tv);
|
||||
tv.tv_sec = HTTP_CONNECT_TIMEOUT;
|
||||
event_add(&evcon->ev, &tv);
|
||||
evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_CONNECT_TIMEOUT);
|
||||
|
||||
evcon->state = EVCON_CONNECTING;
|
||||
|
||||
@ -1217,16 +1221,12 @@ evhttp_make_request(struct evhttp_connection *evcon,
|
||||
void
|
||||
evhttp_start_read(struct evhttp_connection *evcon)
|
||||
{
|
||||
struct timeval tv;
|
||||
|
||||
/* Set up an event to read the headers */
|
||||
if (event_initialized(&evcon->ev))
|
||||
event_del(&evcon->ev);
|
||||
event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read_header, evcon);
|
||||
|
||||
timerclear(&tv);
|
||||
tv.tv_sec = HTTP_READ_TIMEOUT;
|
||||
event_add(&evcon->ev, &tv);
|
||||
|
||||
evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -315,10 +315,39 @@ GotKillCb(struct msg *msg, struct kill *kill, void *arg)
|
||||
goto done;
|
||||
|
||||
test_ok += 1;
|
||||
|
||||
done:
|
||||
event_loopexit(NULL);
|
||||
}
|
||||
|
||||
static void
|
||||
GotKillCbTwo(struct msg *msg, struct kill *kill, void *arg)
|
||||
{
|
||||
char *weapon;
|
||||
char *action;
|
||||
|
||||
if (EVTAG_GET(kill, weapon, &weapon) == -1) {
|
||||
fprintf(stderr, "get weapon\n");
|
||||
goto done;
|
||||
}
|
||||
if (EVTAG_GET(kill, action, &action) == -1) {
|
||||
fprintf(stderr, "get action\n");
|
||||
goto done;
|
||||
}
|
||||
|
||||
if (strcmp(weapon, "dagger"))
|
||||
goto done;
|
||||
|
||||
if (strcmp(action, "wave around like an idiot"))
|
||||
goto done;
|
||||
|
||||
test_ok += 1;
|
||||
|
||||
done:
|
||||
if (test_ok == 2)
|
||||
event_loopexit(NULL);
|
||||
}
|
||||
|
||||
static void
|
||||
rpc_basic_client(void)
|
||||
{
|
||||
@ -374,10 +403,62 @@ rpc_basic_client(void)
|
||||
evhttp_free(http);
|
||||
}
|
||||
|
||||
/*
|
||||
* We are testing that the second requests gets send over the same
|
||||
* connection after the first RPCs completes.
|
||||
*/
|
||||
static void
|
||||
rpc_basic_queued_client(void)
|
||||
{
|
||||
short port;
|
||||
struct evhttp *http = NULL;
|
||||
struct evrpc_base *base = NULL;
|
||||
struct evrpc_pool *pool = NULL;
|
||||
struct msg *msg;
|
||||
struct kill *kill_one, *kill_two;
|
||||
|
||||
fprintf(stdout, "Testing RPC (Queued) Client: ");
|
||||
|
||||
rpc_setup(&http, &port, &base);
|
||||
|
||||
pool = rpc_pool_with_connection(port);
|
||||
|
||||
/* set up the basic message */
|
||||
msg = msg_new();
|
||||
EVTAG_ASSIGN(msg, from_name, "niels");
|
||||
EVTAG_ASSIGN(msg, to_name, "tester");
|
||||
|
||||
kill_one = kill_new();
|
||||
kill_two = kill_new();
|
||||
|
||||
EVRPC_MAKE_REQUEST(Message, msg, kill_one, GotKillCbTwo, NULL);
|
||||
EVRPC_MAKE_REQUEST(Message, msg, kill_two, GotKillCb, NULL);
|
||||
|
||||
test_ok = 0;
|
||||
|
||||
event_dispatch();
|
||||
|
||||
if (test_ok != 2) {
|
||||
fprintf(stdout, "FAILED (1)\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
fprintf(stdout, "OK\n");
|
||||
|
||||
msg_free(msg);
|
||||
kill_free(kill_one);
|
||||
kill_free(kill_two);
|
||||
|
||||
evrpc_pool_free(pool);
|
||||
evhttp_free(http);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
rpc_suite(void)
|
||||
{
|
||||
rpc_basic_test();
|
||||
rpc_basic_message();
|
||||
rpc_basic_client();
|
||||
rpc_basic_queued_client();
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user