provide hooks for outgoing pools; associate a base with a pool

svn:r468
This commit is contained in:
Niels Provos 2007-11-03 22:51:26 +00:00
parent 30ae40cc52
commit 1d3a008af3
5 changed files with 120 additions and 37 deletions

View File

@ -31,3 +31,4 @@ Changes in current version:
o Fix evport implementation: port_disassociate called on unassociated events resulting in bogus errors; more efficient memory management; from Trond Norbye and Prakash Sangappa o Fix evport implementation: port_disassociate called on unassociated events resulting in bogus errors; more efficient memory management; from Trond Norbye and Prakash Sangappa
o support for hooks on rpc input and output; can be used to implement rpc independent processing such as compression or authentication. o support for hooks on rpc input and output; can be used to implement rpc independent processing such as compression or authentication.
o use a min heap instead of a red-black tree for timeouts; as a result finding the min is a O(1) operation now; from Maxim Yegorushkin o use a min heap instead of a red-black tree for timeouts; as a result finding the min is a O(1) operation now; from Maxim Yegorushkin
o associate an event base with an rpc pool

View File

@ -41,16 +41,30 @@ struct evrpc_hook {
void *process_arg; void *process_arg;
}; };
TAILQ_HEAD(evrpc_hook_list, evrpc_hook);
/*
* this is shared between the base and the pool, so that we can reuse
* the hook adding functions; we alias both evrpc_pool and evrpc_base
* to this common structure.
*/
struct _evrpc_hooks {
/* hooks for processing outbound and inbound rpcs */
struct evrpc_hook_list in_hooks;
struct evrpc_hook_list out_hooks;
};
#define input_hooks common.in_hooks
#define output_hooks common.out_hooks
struct evrpc_base { struct evrpc_base {
struct _evrpc_hooks common;
/* the HTTP server under which we register our RPC calls */ /* the HTTP server under which we register our RPC calls */
struct evhttp* http_server; struct evhttp* http_server;
/* a list of all RPCs registered with us */ /* a list of all RPCs registered with us */
TAILQ_HEAD(evrpc_list, evrpc) registered_rpcs; TAILQ_HEAD(evrpc_list, evrpc) registered_rpcs;
/* hooks for processing outbound and inbound rpcs */
TAILQ_HEAD(evrpc_hook_list, evrpc_hook) input_hooks;
struct evrpc_hook_list output_hooks;
}; };
struct evrpc_req_generic; struct evrpc_req_generic;
@ -58,6 +72,10 @@ void evrpc_reqstate_free(struct evrpc_req_generic* rpc_state);
/* A pool for holding evhttp_connection objects */ /* A pool for holding evhttp_connection objects */
struct evrpc_pool { struct evrpc_pool {
struct _evrpc_hooks common;
struct event_base *base;
struct evconq connections; struct evconq connections;
int timeout; int timeout;

95
evrpc.c
View File

@ -100,19 +100,20 @@ evrpc_free(struct evrpc_base *base)
} }
void * void *
evrpc_add_hook(struct evrpc_base *base, evrpc_add_hook(void *vbase,
enum EVRPC_HOOK_TYPE hook_type, enum EVRPC_HOOK_TYPE hook_type,
int (*cb)(struct evhttp_request *, struct evbuffer *, void *), int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
void *cb_arg) void *cb_arg)
{ {
struct _evrpc_hooks *base = vbase;
struct evrpc_hook_list *head = NULL; struct evrpc_hook_list *head = NULL;
struct evrpc_hook *hook = NULL; struct evrpc_hook *hook = NULL;
switch (hook_type) { switch (hook_type) {
case INPUT: case INPUT:
head = &base->input_hooks; head = &base->in_hooks;
break; break;
case OUTPUT: case OUTPUT:
head = &base->output_hooks; head = &base->out_hooks;
break; break;
default: default:
assert(hook_type == INPUT || hook_type == OUTPUT); assert(hook_type == INPUT || hook_type == OUTPUT);
@ -128,28 +129,10 @@ evrpc_add_hook(struct evrpc_base *base,
return (hook); return (hook);
} }
/* static int
* remove the hook specified by the handle evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
*/
int
evrpc_remove_hook(struct evrpc_base *base,
enum EVRPC_HOOK_TYPE hook_type,
void *handle)
{ {
struct evrpc_hook_list *head = NULL;
struct evrpc_hook *hook = NULL; struct evrpc_hook *hook = NULL;
switch (hook_type) {
case INPUT:
head = &base->input_hooks;
break;
case OUTPUT:
head = &base->output_hooks;
break;
default:
assert(hook_type == INPUT || hook_type == OUTPUT);
}
TAILQ_FOREACH(hook, head, next) { TAILQ_FOREACH(hook, head, next) {
if (hook == handle) { if (hook == handle) {
TAILQ_REMOVE(head, hook, next); TAILQ_REMOVE(head, hook, next);
@ -161,6 +144,29 @@ evrpc_remove_hook(struct evrpc_base *base,
return (0); return (0);
} }
/*
* remove the hook specified by the handle
*/
int
evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
{
struct _evrpc_hooks *base = vbase;
struct evrpc_hook_list *head = NULL;
switch (hook_type) {
case INPUT:
head = &base->in_hooks;
break;
case OUTPUT:
head = &base->out_hooks;
break;
default:
assert(hook_type == INPUT || hook_type == OUTPUT);
}
return (evrpc_remove_hook_internal(head, handle));
}
static int static int
evrpc_process_hooks(struct evrpc_hook_list *head, evrpc_process_hooks(struct evrpc_hook_list *head,
struct evhttp_request *req, struct evbuffer *evbuf) struct evhttp_request *req, struct evbuffer *evbuf)
@ -371,7 +377,7 @@ static int evrpc_schedule_request(struct evhttp_connection *connection,
struct evrpc_request_wrapper *ctx); struct evrpc_request_wrapper *ctx);
struct evrpc_pool * struct evrpc_pool *
evrpc_pool_new(void) evrpc_pool_new(struct event_base *base)
{ {
struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool)); struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
if (pool == NULL) if (pool == NULL)
@ -380,6 +386,10 @@ evrpc_pool_new(void)
TAILQ_INIT(&pool->connections); TAILQ_INIT(&pool->connections);
TAILQ_INIT(&pool->requests); TAILQ_INIT(&pool->requests);
TAILQ_INIT(&pool->input_hooks);
TAILQ_INIT(&pool->output_hooks);
pool->base = base;
pool->timeout = -1; pool->timeout = -1;
return (pool); return (pool);
@ -397,6 +407,7 @@ evrpc_pool_free(struct evrpc_pool *pool)
{ {
struct evhttp_connection *connection; struct evhttp_connection *connection;
struct evrpc_request_wrapper *request; struct evrpc_request_wrapper *request;
struct evrpc_hook *hook;
while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
TAILQ_REMOVE(&pool->requests, request, next); TAILQ_REMOVE(&pool->requests, request, next);
@ -409,6 +420,14 @@ evrpc_pool_free(struct evrpc_pool *pool)
evhttp_connection_free(connection); evhttp_connection_free(connection);
} }
while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
assert(evrpc_remove_hook(pool, INPUT, hook));
}
while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
assert(evrpc_remove_hook(pool, OUTPUT, hook));
}
free(pool); free(pool);
} }
@ -423,6 +442,12 @@ evrpc_pool_add_connection(struct evrpc_pool *pool,
assert(connection->http_server == NULL); assert(connection->http_server == NULL);
TAILQ_INSERT_TAIL(&pool->connections, connection, next); TAILQ_INSERT_TAIL(&pool->connections, connection, next);
/*
* associate an event base with this connection
*/
if (pool->base != NULL)
evhttp_connection_set_base(connection, pool->base);
/* /*
* unless a timeout was specifically set for a connection, * unless a timeout was specifically set for a connection,
* the connection inherits the timeout from the pool. * the connection inherits the timeout from the pool.
@ -499,6 +524,11 @@ evrpc_schedule_request(struct evhttp_connection *connection,
/* we need to know the connection that we might have to abort */ /* we need to know the connection that we might have to abort */
ctx->evcon = connection; ctx->evcon = connection;
/* apply hooks to the outgoing request */
if (evrpc_process_hooks(&pool->output_hooks,
req, req->output_buffer) == -1)
goto error;
if (pool->timeout > 0) { if (pool->timeout > 0) {
/* /*
* a timeout after which the whole rpc is going to be aborted. * a timeout after which the whole rpc is going to be aborted.
@ -533,6 +563,8 @@ evrpc_make_request(struct evrpc_request_wrapper *ctx)
/* initialize the event structure for this rpc */ /* initialize the event structure for this rpc */
evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx); evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
if (pool->base != NULL)
event_base_set(pool->base, &ctx->ev_timeout);
/* we better have some available connections on the pool */ /* we better have some available connections on the pool */
assert(TAILQ_FIRST(&pool->connections) != NULL); assert(TAILQ_FIRST(&pool->connections) != NULL);
@ -564,13 +596,22 @@ evrpc_reply_done(struct evhttp_request *req, void *arg)
/* we need to get the reply now */ /* we need to get the reply now */
if (req != NULL) { if (req != NULL) {
res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); /* apply hooks to the incoming request */
if (res == -1) { if (evrpc_process_hooks(&pool->input_hooks,
status.error = EVRPC_STATUS_ERR_BADPAYLOAD; req, req->input_buffer) == -1) {
status.error = EVRPC_STATUS_ERR_HOOKABORTED;
res = -1;
} else {
res = ctx->reply_unmarshal(ctx->reply,
req->input_buffer);
if (res == -1) {
status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
}
} }
} else { } else {
status.error = EVRPC_STATUS_ERR_TIMEOUT; status.error = EVRPC_STATUS_ERR_TIMEOUT;
} }
if (res == -1) { if (res == -1) {
/* clear everything that we might have written previously */ /* clear everything that we might have written previously */
ctx->reply_clear(ctx->reply); ctx->reply_clear(ctx->reply);

14
evrpc.h
View File

@ -66,6 +66,7 @@ extern "C" {
*/ */
struct evbuffer; struct evbuffer;
struct event_base;
struct evrpc_req_generic; struct evrpc_req_generic;
/* Encapsulates a request */ /* Encapsulates a request */
@ -260,6 +261,7 @@ struct evrpc_status {
#define EVRPC_STATUS_ERR_TIMEOUT 1 #define EVRPC_STATUS_ERR_TIMEOUT 1
#define EVRPC_STATUS_ERR_BADPAYLOAD 2 #define EVRPC_STATUS_ERR_BADPAYLOAD 2
#define EVRPC_STATUS_ERR_UNSTARTED 3 #define EVRPC_STATUS_ERR_UNSTARTED 3
#define EVRPC_STATUS_ERR_HOOKABORTED 4
int error; int error;
/* for looking at headers or other information */ /* for looking at headers or other information */
@ -307,8 +309,12 @@ int evrpc_make_request(struct evrpc_request_wrapper *);
* a pool has a number of connections associated with it. * a pool has a number of connections associated with it.
* rpc requests are always made via a pool. * rpc requests are always made via a pool.
*/ */
struct evrpc_pool *evrpc_pool_new(void); struct evrpc_pool *evrpc_pool_new(struct event_base *);
void evrpc_pool_free(struct evrpc_pool *); void evrpc_pool_free(struct evrpc_pool *);
/*
* adds a connection over which rpc can be dispatched. the connection
* object must have been newly created.
*/
void evrpc_pool_add_connection(struct evrpc_pool *, void evrpc_pool_add_connection(struct evrpc_pool *,
struct evhttp_connection *); struct evhttp_connection *);
@ -329,6 +335,8 @@ void evrpc_pool_set_timeout(struct evrpc_pool *, int timeout_in_secs);
* Hooks for changing the input and output of RPCs; this can be used to * Hooks for changing the input and output of RPCs; this can be used to
* implement compression, authentication, encryption, ... * implement compression, authentication, encryption, ...
* *
* vbase may either be a pointer to struct evrpc_base or to struct evrpc_pool
*
* If a hook returns -1, the processing is aborted. * If a hook returns -1, the processing is aborted.
* *
* The add functions return handles that can be used for removing hooks. * The add functions return handles that can be used for removing hooks.
@ -338,12 +346,12 @@ enum EVRPC_HOOK_TYPE {
INPUT, OUTPUT INPUT, OUTPUT
}; };
void *evrpc_add_hook(struct evrpc_base *base, void *evrpc_add_hook(void *vbase,
enum EVRPC_HOOK_TYPE hook_type, enum EVRPC_HOOK_TYPE hook_type,
int (*cb)(struct evhttp_request *, struct evbuffer *, void *), int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
void *cb_arg); void *cb_arg);
int evrpc_remove_hook(struct evrpc_base *base, int evrpc_remove_hook(void *vbase,
enum EVRPC_HOOK_TYPE hook_type, enum EVRPC_HOOK_TYPE hook_type,
void *handle); void *handle);

View File

@ -325,7 +325,7 @@ rpc_pool_with_connection(short port)
struct evhttp_connection *evcon; struct evhttp_connection *evcon;
struct evrpc_pool *pool; struct evrpc_pool *pool;
pool = evrpc_pool_new(); pool = evrpc_pool_new(NULL);
assert(pool != NULL); assert(pool != NULL);
evcon = evhttp_connection_new("127.0.0.1", port); evcon = evhttp_connection_new("127.0.0.1", port);
@ -346,8 +346,8 @@ GotKillCb(struct evrpc_status *status,
if (need_output_hook) { if (need_output_hook) {
struct evhttp_request *req = status->http_req; struct evhttp_request *req = status->http_req;
const char *header = evhttp_find_header( const char *header = evhttp_find_header(
req->input_headers, "X-Hook"); req->input_headers, "X-Pool-Hook");
assert(strcmp(header, "output") == 0); assert(strcmp(header, "ran") == 0);
} }
if (status->error != EVRPC_STATUS_ERR_NONE) if (status->error != EVRPC_STATUS_ERR_NONE)
@ -418,6 +418,19 @@ rpc_hook_add_header(struct evhttp_request *req,
return (0); return (0);
} }
static int
rpc_hook_remove_header(struct evhttp_request *req,
struct evbuffer *evbuf, void *arg)
{
const char *header = evhttp_find_header(req->input_headers, "X-Hook");
assert(header != NULL);
assert(strcmp(header, arg) == 0);
evhttp_remove_header(req->input_headers, "X-Hook");
evhttp_add_header(req->input_headers, "X-Pool-Hook", "ran");
return (0);
}
static void static void
rpc_basic_client(void) rpc_basic_client(void)
{ {
@ -442,6 +455,8 @@ rpc_basic_client(void)
pool = rpc_pool_with_connection(port); pool = rpc_pool_with_connection(port);
assert(evrpc_add_hook(pool, INPUT, rpc_hook_remove_header, "output"));
/* set up the basic message */ /* set up the basic message */
msg = msg_new(); msg = msg_new();
EVTAG_ASSIGN(msg, from_name, "niels"); EVTAG_ASSIGN(msg, from_name, "niels");