allow association of meta data with RPC requests for hook processing

svn:r622
This commit is contained in:
Niels Provos 2007-12-29 22:45:54 +00:00
parent 024804cce7
commit 5a5609c753
5 changed files with 200 additions and 21 deletions

View File

@ -32,6 +32,7 @@ Changes in current version:
o support string arrays in event_rpcgen o support string arrays in event_rpcgen
o change evrpc hooking to allow pausing of RPCs; this will make it possible for the hook to do some meaning ful work; this is not backwards compatible. o change evrpc hooking to allow pausing of RPCs; this will make it possible for the hook to do some meaning ful work; this is not backwards compatible.
o allow an http request callback to take ownership of a request structure o allow an http request callback to take ownership of a request structure
o allow association of meta data with RPC requests for hook processing
Changes in 1.4.0: Changes in 1.4.0:
o allow \r or \n individually to separate HTTP headers instead of the standard "\r\n"; from Charles Kerr. o allow \r or \n individually to separate HTTP headers instead of the standard "\r\n"; from Charles Kerr.

View File

@ -36,7 +36,9 @@ struct evrpc;
struct evrpc_hook { struct evrpc_hook {
TAILQ_ENTRY(evrpc_hook) (next); TAILQ_ENTRY(evrpc_hook) (next);
/* returns -1; if the rpc should be aborted, is allowed to rewrite */ /* returns EVRPC_TERMINATE; if the rpc should be aborted.
* a hook is is allowed to rewrite the evbuffer
*/
int (*process)(void *, struct evhttp_request *, int (*process)(void *, struct evhttp_request *,
struct evbuffer *, void *); struct evbuffer *, void *);
void *process_arg; void *process_arg;
@ -98,4 +100,17 @@ struct evrpc_hook_ctx {
void (*cb)(void *, enum EVRPC_HOOK_RESULT); void (*cb)(void *, enum EVRPC_HOOK_RESULT);
}; };
struct evrpc_meta {
TAILQ_ENTRY(evrpc_meta) (next);
char *key;
void *data;
size_t data_size;
};
TAILQ_HEAD(evrpc_meta_list, evrpc_meta);
/* frees the meta data associated with a request */
static void evrpc_meta_data_free(struct evrpc_meta_list *meta_data);
#endif /* _EVRPC_INTERNAL_H_ */ #endif /* _EVRPC_INTERNAL_H_ */

102
evrpc.c
View File

@ -365,6 +365,8 @@ evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
rpc = rpc_state->rpc; rpc = rpc_state->rpc;
/* clean up all memory */ /* clean up all memory */
if (rpc_state->meta_data != NULL)
evrpc_meta_data_free(rpc_state->meta_data);
if (rpc_state->request != NULL) if (rpc_state->request != NULL)
rpc->request_free(rpc_state->request); rpc->request_free(rpc_state->request);
if (rpc_state->reply != NULL) if (rpc_state->reply != NULL)
@ -477,6 +479,8 @@ evrpc_pool_new(struct event_base *base)
static void static void
evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
{ {
if (request->meta_data != NULL)
evrpc_meta_data_free(request->meta_data);
event_free(request->name); event_free(request->name);
event_free(request); event_free(request);
} }
@ -749,6 +753,41 @@ evrpc_make_request(struct evrpc_request_wrapper *ctx)
return (0); return (0);
} }
struct evrpc_request_wrapper *
evrpc_send_request_generic(
struct evrpc_pool *pool, void *request, void *reply,
const char *rpcname,
void (*req_marshal)(struct evbuffer*, void *),
void (*rpl_clear)(void *),
int (*rpl_unmarshal)(void *, struct evbuffer *),
void (*cb)(struct evrpc_status *, void *, void *, void *),
void *cbarg)
{
struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *)
event_malloc(sizeof(struct evrpc_request_wrapper));
if (ctx == NULL)
return (NULL);
ctx->pool = pool;
ctx->meta_data = NULL;
ctx->evcon = NULL;
ctx->name = event_strdup(rpcname);
if (ctx->name == NULL) {
event_free(ctx);
return (NULL);
}
ctx->cb = cb;
ctx->cb_arg = cbarg;
ctx->request = request;
ctx->reply = reply;
ctx->request_marshal = req_marshal;
ctx->reply_clear = rpl_clear;
ctx->reply_unmarshal = rpl_unmarshal;
return (ctx);
}
static void static void
evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT);
@ -864,3 +903,66 @@ evrpc_request_timeout(evutil_socket_t fd, short what, void *arg)
evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT); evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
} }
/*
* frees potential meta data associated with a request.
*/
static void
evrpc_meta_data_free(struct evrpc_meta_list *meta_data)
{
struct evrpc_meta *entry;
assert(meta_data != NULL);
while ((entry = TAILQ_FIRST(meta_data)) != NULL) {
TAILQ_REMOVE(meta_data, entry, next);
event_free(entry->key);
event_free(entry->data);
event_free(entry);
}
event_free(meta_data);
}
/* adds meta data */
void
evrpc_hook_add_meta(void *ctx, const char *key,
const void *data, size_t data_size)
{
struct evrpc_request_wrapper *req = ctx;
struct evrpc_meta *meta = NULL;
if (req->meta_data == NULL) {
req->meta_data = event_malloc(sizeof(struct evrpc_meta_list));
assert(req->meta_data != NULL);
TAILQ_INIT(req->meta_data);
}
assert((meta = event_malloc(sizeof(struct evrpc_meta))) != NULL);
assert((meta->key = event_strdup(key)) != NULL);
meta->data_size = data_size;
assert((meta->data = event_malloc(data_size)) != NULL);
memcpy(meta->data, data, data_size);
TAILQ_INSERT_TAIL(req->meta_data, meta, next);
}
int
evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
{
struct evrpc_request_wrapper *req = ctx;
struct evrpc_meta *meta = NULL;
if (req->meta_data == NULL)
return (-1);
TAILQ_FOREACH(meta, req->meta_data, next) {
if (strcmp(meta->key, key) == 0) {
*data = meta->data;
*data_size = meta->data_size;
return (0);
}
}
return (-1);
}

78
evrpc.h
View File

@ -114,9 +114,16 @@ struct evrpc {
struct evhttp_request; struct evhttp_request;
struct evrpc_status; struct evrpc_status;
struct evrpc_meta_list;
/* We alias the RPC specific structs to this voided one */ /* We alias the RPC specific structs to this voided one */
struct evrpc_req_generic { struct evrpc_req_generic {
/*
* allows association of meta data via hooks - needs to be
* synchronized with evrpc_request_wrapper
*/
struct evrpc_meta_list *meta_data;
/* the unmarshaled request object */ /* the unmarshaled request object */
void *request; void *request;
@ -158,6 +165,7 @@ struct evrpc_req_generic {
*/ */
#define EVRPC_HEADER(rpcname, reqstruct, rplystruct) \ #define EVRPC_HEADER(rpcname, reqstruct, rplystruct) \
EVRPC_STRUCT(rpcname) { \ EVRPC_STRUCT(rpcname) { \
struct evrpc_meta_list *meta_data; \
struct reqstruct* request; \ struct reqstruct* request; \
struct rplystruct* reply; \ struct rplystruct* reply; \
struct evrpc* rpc; \ struct evrpc* rpc; \
@ -172,6 +180,18 @@ int evrpc_send_request_##rpcname(struct evrpc_pool *, \
struct reqstruct *, struct rplystruct *, void *cbarg), \ struct reqstruct *, struct rplystruct *, void *cbarg), \
void *); void *);
struct evrpc_pool;
/** use EVRPC_GENERATE instead */
struct evrpc_request_wrapper *evrpc_send_request_generic(
struct evrpc_pool *pool, void *request, void *reply,
const char *rpcname,
void (*req_marshal)(struct evbuffer*, void *),
void (*rpl_clear)(void *),
int (*rpl_unmarshal)(void *, struct evbuffer *),
void (*cb)(struct evrpc_status *, void *, void *, void *),
void *cbarg);
/** Generates the code for receiving and sending an RPC message /** Generates the code for receiving and sending an RPC message
* *
* EVRPC_GENERATE is used to create the code corresponding to sending * EVRPC_GENERATE is used to create the code corresponding to sending
@ -190,25 +210,15 @@ int evrpc_send_request_##rpcname(struct evrpc_pool *pool, \
void *cbarg) { \ void *cbarg) { \
struct evrpc_status status; \ struct evrpc_status status; \
struct evrpc_request_wrapper *ctx; \ struct evrpc_request_wrapper *ctx; \
ctx = (struct evrpc_request_wrapper *) \ ctx = evrpc_send_request_generic(pool, request, reply, \
malloc(sizeof(struct evrpc_request_wrapper)); \ #rpcname, \
(void (*)(struct evbuffer *, void *))reqstruct##_marshal, \
(void (*)(void *))rplystruct##_clear, \
(int (*)(void *, struct evbuffer *))rplystruct##_unmarshal, \
(void (*)(struct evrpc_status *, void *, void *, void *))cb, \
cbarg); \
if (ctx == NULL) \ if (ctx == NULL) \
goto error; \ goto error; \
ctx->pool = pool; \
ctx->evcon = NULL; \
ctx->name = strdup(#rpcname); \
if (ctx->name == NULL) { \
free(ctx); \
goto error; \
} \
ctx->cb = (void (*)(struct evrpc_status *, \
void *, void *, void *))cb; \
ctx->cb_arg = cbarg; \
ctx->request = (void *)request; \
ctx->reply = (void *)reply; \
ctx->request_marshal = (void (*)(struct evbuffer *, void *))reqstruct##_marshal; \
ctx->reply_clear = (void (*)(void *))rplystruct##_clear; \
ctx->reply_unmarshal = (int (*)(void *, struct evbuffer *))rplystruct##_unmarshal; \
return (evrpc_make_request(ctx)); \ return (evrpc_make_request(ctx)); \
error: \ error: \
memset(&status, 0, sizeof(status)); \ memset(&status, 0, sizeof(status)); \
@ -325,7 +335,6 @@ int evrpc_unregister_rpc(struct evrpc_base *base, const char *name);
* Client-side RPC support * Client-side RPC support
*/ */
struct evrpc_pool;
struct evhttp_connection; struct evhttp_connection;
/** /**
@ -344,6 +353,12 @@ struct evrpc_status {
}; };
struct evrpc_request_wrapper { struct evrpc_request_wrapper {
/*
* allows association of meta data via hooks - needs to be
* synchronized with evrpc_req_generic.
*/
struct evrpc_meta_list *meta_data;
TAILQ_ENTRY(evrpc_request_wrapper) next; TAILQ_ENTRY(evrpc_request_wrapper) next;
/* pool on which this rpc request is being made */ /* pool on which this rpc request is being made */
@ -500,6 +515,33 @@ int evrpc_remove_hook(void *vbase,
int int
evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res); evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res);
/** adds meta data to request
*
* evrpc_hook_add_meta() allows hooks to add meta data to a request. for
* a client requet, the meta data can be inserted by an outgoing request hook
* and retrieved by the incoming request hook.
*
* @param ctx the context provided to the hook call
* @param key a NUL-terminated c-string
* @param data the data to be associated with the key
* @param data_size the size of the data
*/
void evrpc_hook_add_meta(void *ctx, const char *key,
const void *data, size_t data_size);
/** retrieves meta data previously associated
*
* evrpc_hook_find_meta() can be used to retrieve meta data associated to a
* request by a previous hook.
* @param ctx the context provided to the hook call
* @param key a NUL-terminated c-string
* @param data pointer to a data pointer that will contain the retrieved data
* @param data_size pointer tothe size of the data
* @return 0 on success or -1 on failure
*/
int evrpc_hook_find_meta(void *ctx, const char *key,
void **data, size_t *data_size);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -417,7 +417,17 @@ rpc_hook_add_header(void *ctx, struct evhttp_request *req,
evhttp_add_header(req->input_headers, "X-Hook", hook_type); evhttp_add_header(req->input_headers, "X-Hook", hook_type);
else else
evhttp_add_header(req->output_headers, "X-Hook", hook_type); evhttp_add_header(req->output_headers, "X-Hook", hook_type);
return (0);
return (EVRPC_CONTINUE);
}
static int
rpc_hook_add_meta(void *ctx, struct evhttp_request *req,
struct evbuffer *evbuf, void *arg)
{
evrpc_hook_add_meta(ctx, "meta", "test", 5);
return (EVRPC_CONTINUE);
} }
static int static int
@ -425,12 +435,20 @@ rpc_hook_remove_header(void *ctx, struct evhttp_request *req,
struct evbuffer *evbuf, void *arg) struct evbuffer *evbuf, void *arg)
{ {
const char *header = evhttp_find_header(req->input_headers, "X-Hook"); const char *header = evhttp_find_header(req->input_headers, "X-Hook");
void *data = NULL;
size_t data_len = 0;
assert(header != NULL); assert(header != NULL);
assert(strcmp(header, arg) == 0); assert(strcmp(header, arg) == 0);
evhttp_remove_header(req->input_headers, "X-Hook"); evhttp_remove_header(req->input_headers, "X-Hook");
evhttp_add_header(req->input_headers, "X-Pool-Hook", "ran"); evhttp_add_header(req->input_headers, "X-Pool-Hook", "ran");
return (0); assert(evrpc_hook_find_meta(ctx, "meta", &data, &data_len) == 0);
assert(data != NULL);
assert(data_len == 5);
return (EVRPC_CONTINUE);
} }
static void static void
@ -457,6 +475,7 @@ rpc_basic_client(void)
pool = rpc_pool_with_connection(port); pool = rpc_pool_with_connection(port);
assert(evrpc_add_hook(pool, OUTPUT, rpc_hook_add_meta, NULL));
assert(evrpc_add_hook(pool, INPUT, rpc_hook_remove_header, (void*)"output")); assert(evrpc_add_hook(pool, INPUT, rpc_hook_remove_header, (void*)"output"));
/* set up the basic message */ /* set up the basic message */