allow hooks to get access to the connection object

svn:r623
This commit is contained in:
Niels Provos 2007-12-31 19:33:30 +00:00
parent 5a5609c753
commit 2460aa5939
5 changed files with 179 additions and 97 deletions

View File

@ -33,6 +33,7 @@ Changes in current version:
o change evrpc hooking to allow pausing of RPCs; this will make it possible for the hook to do some meaning ful work; this is not backwards compatible.
o allow an http request callback to take ownership of a request structure
o allow association of meta data with RPC requests for hook processing
o associate more context for hooks to query such as the connection object
Changes in 1.4.0:
o allow \r or \n individually to separate HTTP headers instead of the standard "\r\n"; from Charles Kerr.

View File

@ -110,7 +110,19 @@ struct evrpc_meta {
TAILQ_HEAD(evrpc_meta_list, evrpc_meta);
struct evrpc_hook_meta {
struct evrpc_meta_list meta_data;
struct evhttp_connection *evcon;
};
/* allows association of meta data with a request */
static void evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx,
struct evhttp_connection *evcon);
/* creates a new meta data store */
static struct evrpc_hook_meta *evrpc_hook_meta_new(void);
/* frees the meta data associated with a request */
static void evrpc_meta_data_free(struct evrpc_meta_list *meta_data);
static void evrpc_hook_context_free(struct evrpc_hook_meta *ctx);
#endif /* _EVRPC_INTERNAL_H_ */

241
evrpc.c
View File

@ -270,7 +270,6 @@ evrpc_request_cb(struct evhttp_request *req, void *arg)
{
struct evrpc *rpc = arg;
struct evrpc_req_generic *rpc_state = NULL;
int hook_res;
/* let's verify the outside parameters */
if (req->type != EVHTTP_REQ_POST ||
@ -285,26 +284,30 @@ evrpc_request_cb(struct evhttp_request *req, void *arg)
rpc_state->rpc_data = NULL;
rpc_state->done = evrpc_request_done;
if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) {
int hook_res;
/*
* we might want to allow hooks to suspend the processing,
* but at the moment, we assume that they just act as simple
* filters.
*/
hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
rpc_state, req, req->input_buffer);
switch (hook_res) {
case EVRPC_TERMINATE:
goto error;
case EVRPC_PAUSE:
evrpc_pause_request(rpc->base, rpc_state,
evrpc_request_cb_closure);
return;
case EVRPC_CONTINUE:
break;
default:
assert(hook_res == EVRPC_TERMINATE ||
hook_res == EVRPC_CONTINUE || hook_res == EVRPC_PAUSE);
evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
/*
* allow hooks to modify the outgoing request
*/
hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
rpc_state, req, req->input_buffer);
switch (hook_res) {
case EVRPC_TERMINATE:
goto error;
case EVRPC_PAUSE:
evrpc_pause_request(rpc->base, rpc_state,
evrpc_request_cb_closure);
return;
case EVRPC_CONTINUE:
break;
default:
assert(hook_res == EVRPC_TERMINATE ||
hook_res == EVRPC_CONTINUE ||
hook_res == EVRPC_PAUSE);
}
}
evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE);
@ -365,8 +368,8 @@ evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
rpc = rpc_state->rpc;
/* clean up all memory */
if (rpc_state->meta_data != NULL)
evrpc_meta_data_free(rpc_state->meta_data);
if (rpc_state->hook_meta != NULL)
evrpc_hook_context_free(rpc_state->hook_meta);
if (rpc_state->request != NULL)
rpc->request_free(rpc_state->request);
if (rpc_state->reply != NULL)
@ -384,7 +387,6 @@ evrpc_request_done(struct evrpc_req_generic *rpc_state)
{
struct evhttp_request *req = rpc_state->http_req;
struct evrpc *rpc = rpc_state->rpc;
int hook_res;
if (rpc->reply_complete(rpc_state->reply) == -1) {
/* the reply was not completely filled in. error out */
@ -399,22 +401,29 @@ evrpc_request_done(struct evrpc_req_generic *rpc_state)
/* serialize the reply */
rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply);
/* do hook based tweaks to the request */
hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
rpc_state, req, rpc_state->rpc_data);
switch (hook_res) {
case EVRPC_TERMINATE:
goto error;
case EVRPC_PAUSE:
if (evrpc_pause_request(rpc->base, rpc_state,
evrpc_request_done_closure) == -1)
if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) {
int hook_res;
evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
/* do hook based tweaks to the request */
hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
rpc_state, req, rpc_state->rpc_data);
switch (hook_res) {
case EVRPC_TERMINATE:
goto error;
return;
case EVRPC_CONTINUE:
break;
default:
assert(hook_res == EVRPC_TERMINATE ||
hook_res == EVRPC_CONTINUE || hook_res == EVRPC_PAUSE);
case EVRPC_PAUSE:
if (evrpc_pause_request(rpc->base, rpc_state,
evrpc_request_done_closure) == -1)
goto error;
return;
case EVRPC_CONTINUE:
break;
default:
assert(hook_res == EVRPC_TERMINATE ||
hook_res == EVRPC_CONTINUE ||
hook_res == EVRPC_PAUSE);
}
}
evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE);
@ -479,8 +488,8 @@ evrpc_pool_new(struct event_base *base)
static void
evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
{
if (request->meta_data != NULL)
evrpc_meta_data_free(request->meta_data);
if (request->hook_meta != NULL)
evrpc_hook_context_free(request->hook_meta);
event_free(request->name);
event_free(request);
}
@ -602,7 +611,6 @@ evrpc_schedule_request(struct evhttp_connection *connection,
struct evhttp_request *req = NULL;
struct evrpc_pool *pool = ctx->pool;
struct evrpc_status status;
int hook_res = 0;
if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
goto error;
@ -616,25 +624,32 @@ evrpc_schedule_request(struct evhttp_connection *connection,
/* if we get paused we also need to know the request */
ctx->req = req;
/* apply hooks to the outgoing request */
hook_res = evrpc_process_hooks(&pool->output_hooks,
ctx, req, req->output_buffer);
if (TAILQ_FIRST(&pool->output_hooks) != NULL) {
int hook_res;
switch (hook_res) {
case EVRPC_TERMINATE:
goto error;
case EVRPC_PAUSE:
/* we need to be explicitly resumed */
if (evrpc_pause_request(pool, ctx,
evrpc_schedule_request_closure) == -1)
evrpc_hook_associate_meta(&ctx->hook_meta, connection);
/* apply hooks to the outgoing request */
hook_res = evrpc_process_hooks(&pool->output_hooks,
ctx, req, req->output_buffer);
switch (hook_res) {
case EVRPC_TERMINATE:
goto error;
return (0);
case EVRPC_CONTINUE:
/* we can just continue */
break;
default:
assert(hook_res == EVRPC_TERMINATE ||
hook_res == EVRPC_CONTINUE || hook_res == EVRPC_PAUSE);
case EVRPC_PAUSE:
/* we need to be explicitly resumed */
if (evrpc_pause_request(pool, ctx,
evrpc_schedule_request_closure) == -1)
goto error;
return (0);
case EVRPC_CONTINUE:
/* we can just continue */
break;
default:
assert(hook_res == EVRPC_TERMINATE ||
hook_res == EVRPC_CONTINUE ||
hook_res == EVRPC_PAUSE);
}
}
evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE);
@ -770,7 +785,7 @@ evrpc_send_request_generic(
return (NULL);
ctx->pool = pool;
ctx->meta_data = NULL;
ctx->hook_meta = NULL;
ctx->evcon = NULL;
ctx->name = event_strdup(rpcname);
if (ctx->name == NULL) {
@ -796,7 +811,7 @@ evrpc_reply_done(struct evhttp_request *req, void *arg)
{
struct evrpc_request_wrapper *ctx = arg;
struct evrpc_pool *pool = ctx->pool;
int hook_res;
int hook_res = EVRPC_CONTINUE;
/* cancel any timeout we might have scheduled */
event_del(&ctx->ev_timeout);
@ -809,31 +824,39 @@ evrpc_reply_done(struct evhttp_request *req, void *arg)
return;
}
/* apply hooks to the incoming request */
hook_res = evrpc_process_hooks(&pool->input_hooks,
ctx, req, req->input_buffer);
if (TAILQ_FIRST(&pool->input_hooks) != NULL) {
evrpc_hook_associate_meta(&ctx->hook_meta, req->evcon);
switch (hook_res) {
case EVRPC_TERMINATE:
case EVRPC_CONTINUE:
evrpc_reply_done_closure(ctx, hook_res);
return;
case EVRPC_PAUSE:
/*
* if we get paused we also need to know the request.
* unfortunately, the underlying layer is going to free it.
* we need to request ownership explicitly
*/
if (req != NULL)
evhttp_request_own(req);
/* apply hooks to the incoming request */
hook_res = evrpc_process_hooks(&pool->input_hooks,
ctx, req, req->input_buffer);
evrpc_pause_request(pool, ctx, evrpc_reply_done_closure);
return;
default:
assert(hook_res == EVRPC_TERMINATE ||
hook_res == EVRPC_CONTINUE || hook_res == EVRPC_PAUSE);
switch (hook_res) {
case EVRPC_TERMINATE:
case EVRPC_CONTINUE:
break;
case EVRPC_PAUSE:
/*
* if we get paused we also need to know the
* request. unfortunately, the underlying
* layer is going to free it. we need to
* request ownership explicitly
*/
if (req != NULL)
evhttp_request_own(req);
evrpc_pause_request(pool, ctx,
evrpc_reply_done_closure);
return;
default:
assert(hook_res == EVRPC_TERMINATE ||
hook_res == EVRPC_CONTINUE ||
hook_res == EVRPC_PAUSE);
}
}
evrpc_reply_done_closure(ctx, hook_res);
/* http request is being freed by underlying layer */
}
@ -920,23 +943,49 @@ evrpc_meta_data_free(struct evrpc_meta_list *meta_data)
event_free(entry->data);
event_free(entry);
}
event_free(meta_data);
}
/* adds meta data */
static struct evrpc_hook_meta *
evrpc_hook_meta_new(void)
{
struct evrpc_hook_meta *ctx;
ctx = event_malloc(sizeof(struct evrpc_hook_meta));
assert(ctx != NULL);
TAILQ_INIT(&ctx->meta_data);
ctx->evcon = NULL;
return (ctx);
}
static void
evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx,
struct evhttp_connection *evcon)
{
struct evrpc_hook_meta *ctx = *pctx;
if (ctx == NULL)
*pctx = ctx = evrpc_hook_meta_new();
ctx->evcon = evcon;
}
static void
evrpc_hook_context_free(struct evrpc_hook_meta *ctx)
{
evrpc_meta_data_free(&ctx->meta_data);
event_free(ctx);
}
/* Adds meta data */
void
evrpc_hook_add_meta(void *ctx, const char *key,
const void *data, size_t data_size)
{
struct evrpc_request_wrapper *req = ctx;
struct evrpc_hook_meta *store = NULL;
struct evrpc_meta *meta = NULL;
if (req->meta_data == NULL) {
req->meta_data = event_malloc(sizeof(struct evrpc_meta_list));
assert(req->meta_data != NULL);
TAILQ_INIT(req->meta_data);
}
if ((store = req->hook_meta) == NULL)
store = req->hook_meta = evrpc_hook_meta_new();
assert((meta = event_malloc(sizeof(struct evrpc_meta))) != NULL);
assert((meta->key = event_strdup(key)) != NULL);
@ -944,7 +993,7 @@ evrpc_hook_add_meta(void *ctx, const char *key,
assert((meta->data = event_malloc(data_size)) != NULL);
memcpy(meta->data, data, data_size);
TAILQ_INSERT_TAIL(req->meta_data, meta, next);
TAILQ_INSERT_TAIL(&store->meta_data, meta, next);
}
int
@ -953,10 +1002,10 @@ evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
struct evrpc_request_wrapper *req = ctx;
struct evrpc_meta *meta = NULL;
if (req->meta_data == NULL)
if (req->hook_meta == NULL)
return (-1);
TAILQ_FOREACH(meta, req->meta_data, next) {
TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) {
if (strcmp(meta->key, key) == 0) {
*data = meta->data;
*data_size = meta->data_size;
@ -966,3 +1015,11 @@ evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
return (-1);
}
struct evhttp_connection *
evrpc_hook_get_connection(void *ctx)
{
struct evrpc_request_wrapper *req = ctx;
return (req->evcon);
}

14
evrpc.h
View File

@ -114,7 +114,7 @@ struct evrpc {
struct evhttp_request;
struct evrpc_status;
struct evrpc_meta_list;
struct evrpc_hook_meta;
/* We alias the RPC specific structs to this voided one */
struct evrpc_req_generic {
@ -122,7 +122,7 @@ struct evrpc_req_generic {
* allows association of meta data via hooks - needs to be
* synchronized with evrpc_request_wrapper
*/
struct evrpc_meta_list *meta_data;
struct evrpc_hook_meta *hook_meta;
/* the unmarshaled request object */
void *request;
@ -165,7 +165,7 @@ struct evrpc_req_generic {
*/
#define EVRPC_HEADER(rpcname, reqstruct, rplystruct) \
EVRPC_STRUCT(rpcname) { \
struct evrpc_meta_list *meta_data; \
struct evrpc_hook_meta *hook_meta; \
struct reqstruct* request; \
struct rplystruct* reply; \
struct evrpc* rpc; \
@ -357,7 +357,7 @@ struct evrpc_request_wrapper {
* allows association of meta data via hooks - needs to be
* synchronized with evrpc_req_generic.
*/
struct evrpc_meta_list *meta_data;
struct evrpc_hook_meta *hook_meta;
TAILQ_ENTRY(evrpc_request_wrapper) next;
@ -542,6 +542,12 @@ void evrpc_hook_add_meta(void *ctx, const char *key,
int evrpc_hook_find_meta(void *ctx, const char *key,
void **data, size_t *data_size);
/** returns the connection object associated with the request
*
* @param ctx the context provided to the hook call
* @return a pointer to the evhttp_connection object
*/
struct evhttp_connection *evrpc_hook_get_connection(void *ctx);
#ifdef __cplusplus
}
#endif

View File

@ -418,6 +418,8 @@ rpc_hook_add_header(void *ctx, struct evhttp_request *req,
else
evhttp_add_header(req->output_headers, "X-Hook", hook_type);
assert(evrpc_hook_get_connection(ctx) != NULL);
return (EVRPC_CONTINUE);
}
@ -427,6 +429,8 @@ rpc_hook_add_meta(void *ctx, struct evhttp_request *req,
{
evrpc_hook_add_meta(ctx, "meta", "test", 5);
assert(evrpc_hook_get_connection(ctx) != NULL);
return (EVRPC_CONTINUE);
}
@ -448,6 +452,8 @@ rpc_hook_remove_header(void *ctx, struct evhttp_request *req,
assert(data != NULL);
assert(data_len == 5);
assert(evrpc_hook_get_connection(ctx) != NULL);
return (EVRPC_CONTINUE);
}