From 1d3a008af3433485256d8386366ceaac4d48af5f Mon Sep 17 00:00:00 2001 From: Niels Provos Date: Sat, 3 Nov 2007 22:51:26 +0000 Subject: [PATCH] provide hooks for outgoing pools; associate a base with a pool svn:r468 --- ChangeLog | 1 + evrpc-internal.h | 26 +++++++++++-- evrpc.c | 95 +++++++++++++++++++++++++++++++++------------- evrpc.h | 14 +++++-- test/regress_rpc.c | 21 ++++++++-- 5 files changed, 120 insertions(+), 37 deletions(-) diff --git a/ChangeLog b/ChangeLog index ff5a535e..4a819f5f 100644 --- a/ChangeLog +++ b/ChangeLog @@ -31,3 +31,4 @@ Changes in current version: o Fix evport implementation: port_disassociate called on unassociated events resulting in bogus errors; more efficient memory management; from Trond Norbye and Prakash Sangappa o support for hooks on rpc input and output; can be used to implement rpc independent processing such as compression or authentication. o use a min heap instead of a red-black tree for timeouts; as a result finding the min is a O(1) operation now; from Maxim Yegorushkin + o associate an event base with an rpc pool diff --git a/evrpc-internal.h b/evrpc-internal.h index 8b8dd691..c900f959 100644 --- a/evrpc-internal.h +++ b/evrpc-internal.h @@ -41,16 +41,30 @@ struct evrpc_hook { void *process_arg; }; +TAILQ_HEAD(evrpc_hook_list, evrpc_hook); + +/* + * this is shared between the base and the pool, so that we can reuse + * the hook adding functions; we alias both evrpc_pool and evrpc_base + * to this common structure. + */ +struct _evrpc_hooks { + /* hooks for processing outbound and inbound rpcs */ + struct evrpc_hook_list in_hooks; + struct evrpc_hook_list out_hooks; +}; + +#define input_hooks common.in_hooks +#define output_hooks common.out_hooks + struct evrpc_base { + struct _evrpc_hooks common; + /* the HTTP server under which we register our RPC calls */ struct evhttp* http_server; /* a list of all RPCs registered with us */ TAILQ_HEAD(evrpc_list, evrpc) registered_rpcs; - - /* hooks for processing outbound and inbound rpcs */ - TAILQ_HEAD(evrpc_hook_list, evrpc_hook) input_hooks; - struct evrpc_hook_list output_hooks; }; struct evrpc_req_generic; @@ -58,6 +72,10 @@ void evrpc_reqstate_free(struct evrpc_req_generic* rpc_state); /* A pool for holding evhttp_connection objects */ struct evrpc_pool { + struct _evrpc_hooks common; + + struct event_base *base; + struct evconq connections; int timeout; diff --git a/evrpc.c b/evrpc.c index b1fb4765..6b5138d2 100644 --- a/evrpc.c +++ b/evrpc.c @@ -100,19 +100,20 @@ evrpc_free(struct evrpc_base *base) } void * -evrpc_add_hook(struct evrpc_base *base, +evrpc_add_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, int (*cb)(struct evhttp_request *, struct evbuffer *, void *), void *cb_arg) { + struct _evrpc_hooks *base = vbase; struct evrpc_hook_list *head = NULL; struct evrpc_hook *hook = NULL; switch (hook_type) { case INPUT: - head = &base->input_hooks; + head = &base->in_hooks; break; case OUTPUT: - head = &base->output_hooks; + head = &base->out_hooks; break; default: assert(hook_type == INPUT || hook_type == OUTPUT); @@ -128,28 +129,10 @@ evrpc_add_hook(struct evrpc_base *base, return (hook); } -/* - * remove the hook specified by the handle - */ - -int -evrpc_remove_hook(struct evrpc_base *base, - enum EVRPC_HOOK_TYPE hook_type, - void *handle) +static int +evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) { - struct evrpc_hook_list *head = NULL; struct evrpc_hook *hook = NULL; - switch (hook_type) { - case INPUT: - head = &base->input_hooks; - break; - case OUTPUT: - head = &base->output_hooks; - break; - default: - assert(hook_type == INPUT || hook_type == OUTPUT); - } - TAILQ_FOREACH(hook, head, next) { if (hook == handle) { TAILQ_REMOVE(head, hook, next); @@ -161,6 +144,29 @@ evrpc_remove_hook(struct evrpc_base *base, return (0); } +/* + * remove the hook specified by the handle + */ + +int +evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) +{ + struct _evrpc_hooks *base = vbase; + struct evrpc_hook_list *head = NULL; + switch (hook_type) { + case INPUT: + head = &base->in_hooks; + break; + case OUTPUT: + head = &base->out_hooks; + break; + default: + assert(hook_type == INPUT || hook_type == OUTPUT); + } + + return (evrpc_remove_hook_internal(head, handle)); +} + static int evrpc_process_hooks(struct evrpc_hook_list *head, struct evhttp_request *req, struct evbuffer *evbuf) @@ -371,7 +377,7 @@ static int evrpc_schedule_request(struct evhttp_connection *connection, struct evrpc_request_wrapper *ctx); struct evrpc_pool * -evrpc_pool_new(void) +evrpc_pool_new(struct event_base *base) { struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool)); if (pool == NULL) @@ -380,6 +386,10 @@ evrpc_pool_new(void) TAILQ_INIT(&pool->connections); TAILQ_INIT(&pool->requests); + TAILQ_INIT(&pool->input_hooks); + TAILQ_INIT(&pool->output_hooks); + + pool->base = base; pool->timeout = -1; return (pool); @@ -397,6 +407,7 @@ evrpc_pool_free(struct evrpc_pool *pool) { struct evhttp_connection *connection; struct evrpc_request_wrapper *request; + struct evrpc_hook *hook; while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { TAILQ_REMOVE(&pool->requests, request, next); @@ -409,6 +420,14 @@ evrpc_pool_free(struct evrpc_pool *pool) evhttp_connection_free(connection); } + while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { + assert(evrpc_remove_hook(pool, INPUT, hook)); + } + + while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { + assert(evrpc_remove_hook(pool, OUTPUT, hook)); + } + free(pool); } @@ -423,6 +442,12 @@ evrpc_pool_add_connection(struct evrpc_pool *pool, assert(connection->http_server == NULL); TAILQ_INSERT_TAIL(&pool->connections, connection, next); + /* + * associate an event base with this connection + */ + if (pool->base != NULL) + evhttp_connection_set_base(connection, pool->base); + /* * unless a timeout was specifically set for a connection, * the connection inherits the timeout from the pool. @@ -499,6 +524,11 @@ evrpc_schedule_request(struct evhttp_connection *connection, /* we need to know the connection that we might have to abort */ ctx->evcon = connection; + /* apply hooks to the outgoing request */ + if (evrpc_process_hooks(&pool->output_hooks, + req, req->output_buffer) == -1) + goto error; + if (pool->timeout > 0) { /* * a timeout after which the whole rpc is going to be aborted. @@ -533,6 +563,8 @@ evrpc_make_request(struct evrpc_request_wrapper *ctx) /* initialize the event structure for this rpc */ evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx); + if (pool->base != NULL) + event_base_set(pool->base, &ctx->ev_timeout); /* we better have some available connections on the pool */ assert(TAILQ_FIRST(&pool->connections) != NULL); @@ -564,13 +596,22 @@ evrpc_reply_done(struct evhttp_request *req, void *arg) /* we need to get the reply now */ if (req != NULL) { - res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); - if (res == -1) { - status.error = EVRPC_STATUS_ERR_BADPAYLOAD; + /* apply hooks to the incoming request */ + if (evrpc_process_hooks(&pool->input_hooks, + req, req->input_buffer) == -1) { + status.error = EVRPC_STATUS_ERR_HOOKABORTED; + res = -1; + } else { + res = ctx->reply_unmarshal(ctx->reply, + req->input_buffer); + if (res == -1) { + status.error = EVRPC_STATUS_ERR_BADPAYLOAD; + } } } else { status.error = EVRPC_STATUS_ERR_TIMEOUT; } + if (res == -1) { /* clear everything that we might have written previously */ ctx->reply_clear(ctx->reply); diff --git a/evrpc.h b/evrpc.h index 45911146..99b24b0a 100644 --- a/evrpc.h +++ b/evrpc.h @@ -66,6 +66,7 @@ extern "C" { */ struct evbuffer; +struct event_base; struct evrpc_req_generic; /* Encapsulates a request */ @@ -260,6 +261,7 @@ struct evrpc_status { #define EVRPC_STATUS_ERR_TIMEOUT 1 #define EVRPC_STATUS_ERR_BADPAYLOAD 2 #define EVRPC_STATUS_ERR_UNSTARTED 3 +#define EVRPC_STATUS_ERR_HOOKABORTED 4 int error; /* for looking at headers or other information */ @@ -307,8 +309,12 @@ int evrpc_make_request(struct evrpc_request_wrapper *); * a pool has a number of connections associated with it. * rpc requests are always made via a pool. */ -struct evrpc_pool *evrpc_pool_new(void); +struct evrpc_pool *evrpc_pool_new(struct event_base *); void evrpc_pool_free(struct evrpc_pool *); +/* + * adds a connection over which rpc can be dispatched. the connection + * object must have been newly created. + */ void evrpc_pool_add_connection(struct evrpc_pool *, struct evhttp_connection *); @@ -329,6 +335,8 @@ void evrpc_pool_set_timeout(struct evrpc_pool *, int timeout_in_secs); * Hooks for changing the input and output of RPCs; this can be used to * implement compression, authentication, encryption, ... * + * vbase may either be a pointer to struct evrpc_base or to struct evrpc_pool + * * If a hook returns -1, the processing is aborted. * * The add functions return handles that can be used for removing hooks. @@ -338,12 +346,12 @@ enum EVRPC_HOOK_TYPE { INPUT, OUTPUT }; -void *evrpc_add_hook(struct evrpc_base *base, +void *evrpc_add_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, int (*cb)(struct evhttp_request *, struct evbuffer *, void *), void *cb_arg); -int evrpc_remove_hook(struct evrpc_base *base, +int evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle); diff --git a/test/regress_rpc.c b/test/regress_rpc.c index 341cf991..3fa11f79 100644 --- a/test/regress_rpc.c +++ b/test/regress_rpc.c @@ -325,7 +325,7 @@ rpc_pool_with_connection(short port) struct evhttp_connection *evcon; struct evrpc_pool *pool; - pool = evrpc_pool_new(); + pool = evrpc_pool_new(NULL); assert(pool != NULL); evcon = evhttp_connection_new("127.0.0.1", port); @@ -346,8 +346,8 @@ GotKillCb(struct evrpc_status *status, if (need_output_hook) { struct evhttp_request *req = status->http_req; const char *header = evhttp_find_header( - req->input_headers, "X-Hook"); - assert(strcmp(header, "output") == 0); + req->input_headers, "X-Pool-Hook"); + assert(strcmp(header, "ran") == 0); } if (status->error != EVRPC_STATUS_ERR_NONE) @@ -418,6 +418,19 @@ rpc_hook_add_header(struct evhttp_request *req, return (0); } +static int +rpc_hook_remove_header(struct evhttp_request *req, + struct evbuffer *evbuf, void *arg) +{ + const char *header = evhttp_find_header(req->input_headers, "X-Hook"); + assert(header != NULL); + assert(strcmp(header, arg) == 0); + evhttp_remove_header(req->input_headers, "X-Hook"); + evhttp_add_header(req->input_headers, "X-Pool-Hook", "ran"); + + return (0); +} + static void rpc_basic_client(void) { @@ -442,6 +455,8 @@ rpc_basic_client(void) pool = rpc_pool_with_connection(port); + assert(evrpc_add_hook(pool, INPUT, rpc_hook_remove_header, "output")); + /* set up the basic message */ msg = msg_new(); EVTAG_ASSIGN(msg, from_name, "niels");