From 659d54d5304c476f5cf0ec502235e376e1d2d8a1 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 5 May 2009 02:59:26 +0000 Subject: [PATCH] Add new code to make and accept connections. This is stuff that it's easy to get wrong (as I noticed when writing bench_http), and that takes up a fair amount of space (see http.c). Also, it's something that we'll eventually want to abstract to use IOCP, where available. svn:r1272 --- ChangeLog | 2 + Makefile.am | 2 +- bufferevent-internal.h | 1 + bufferevent_sock.c | 63 ++++++++++++- include/event2/bufferevent.h | 21 ++++- include/event2/listener.h | 103 +++++++++++++++++++++ listener.c | 168 +++++++++++++++++++++++++++++++++++ test/regress_bufferevent.c | 109 +++++++++++++++++++++++ 8 files changed, 466 insertions(+), 3 deletions(-) create mode 100644 include/event2/listener.h create mode 100644 listener.c diff --git a/ChangeLog b/ChangeLog index fdceb498..1baf92b2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -16,6 +16,8 @@ Changes in 2.0.2-alpha: o Move event_set() and its allies to event2/event_compat.h where they belong. o Remove the event_gotsig code, which has long been deprecated and unused. o Add an event_get_base() function to return the base assigned to an event. + o New function to automate connecting on a socket-based bufferevent. + o New functions to automate listening for incoming TCP connections. Changes in 2.0.1-alpha: diff --git a/Makefile.am b/Makefile.am index d8d8031c..7701dffd 100644 --- a/Makefile.am +++ b/Makefile.am @@ -108,7 +108,7 @@ event-config.h: config.h CORE_SRC = event.c buffer.c \ bufferevent.c bufferevent_sock.c bufferevent_filter.c \ - bufferevent_pair.c \ + bufferevent_pair.c listener.c \ evmap.c log.c evutil.c strlcpy.c $(SYS_SRC) EXTRA_SRC = event_tagging.c http.c evdns.c evrpc.c diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 57748ebc..d937db3c 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -49,6 +49,7 @@ struct bufferevent_private { unsigned readcb_pending : 1; unsigned writecb_pending : 1; + unsigned connecting : 1; short errorcb_pending; int errno_pending; struct deferred_cb deferred; diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 877c912c..77469b41 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -52,6 +52,10 @@ #include #endif +#ifdef _EVENT_HAVE_SYS_SOCKET_H +#include +#endif + #include "event2/util.h" #include "event2/bufferevent.h" #include "event2/buffer.h" @@ -172,6 +176,8 @@ static void bufferevent_writecb(evutil_socket_t fd, short event, void *arg) { struct bufferevent *bufev = arg; + struct bufferevent_private *bufev_p = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); int res = 0; short what = EVBUFFER_WRITE; @@ -179,6 +185,14 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) what |= EVBUFFER_TIMEOUT; goto error; } + if (bufev_p->connecting) { + bufev_p->connecting = 0; + _bufferevent_run_errorcb(bufev, EVBUFFER_CONNECTED); + if (!(bufev->enabled & EV_WRITE)) { + event_del(&bufev->ev_write); + return; + } + } if (evbuffer_get_length(bufev->output)) { evbuffer_unfreeze(bufev->output, 1); @@ -250,6 +264,50 @@ bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, return bufev; } +int +bufferevent_socket_connect(struct bufferevent *bev, + struct sockaddr *sa, int socklen) +{ + struct bufferevent_private *bufev_p = + EVUTIL_UPCAST(bev, struct bufferevent_private, bev); + + int family = sa->sa_family; + evutil_socket_t fd; + int made_socket = 0; + + if (!bufev_p) + return -1; + + fd = event_get_fd(&bev->ev_read); + if (fd < 0) { + made_socket = 1; + if ((fd = socket(family, SOCK_STREAM, 0)) < 0) + return -1; + if (evutil_make_socket_nonblocking(fd) < 0) { + EVUTIL_CLOSESOCKET(fd); + return -1; + } + bufferevent_setfd(bev, fd); + } + + if (connect(fd, sa, socklen)<0) { + int e = evutil_socket_geterror(fd); + if (EVUTIL_ERR_CONNECT_RETRIABLE(e)) { + if (! be_socket_enable(bev, EV_WRITE)) { + bufev_p->connecting = 1; + return 0; + } + } + _bufferevent_run_errorcb(bev, EVBUFFER_ERROR); + /* do something about the error? */ + } else { + /* The connect succeeded already. How odd. */ + _bufferevent_run_errorcb(bev, EVBUFFER_CONNECTED); + } + + return 0; +} + /* * Create a new buffered event object. * @@ -293,11 +351,14 @@ be_socket_enable(struct bufferevent *bufev, short event) static int be_socket_disable(struct bufferevent *bufev, short event) { + struct bufferevent_private *bufev_p = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); if (event & EV_READ) { if (event_del(&bufev->ev_read) == -1) return -1; } - if (event & EV_WRITE) { + /* Don't actually disable the write if we are trying to connect. */ + if ((event & EV_WRITE) && ! bufev_p->connecting) { if (event_del(&bufev->ev_write) == -1) return -1; } diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index 0130f285..893f1594 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -78,9 +78,11 @@ extern "C" { #define EVBUFFER_EOF 0x10 /**< eof file reached */ #define EVBUFFER_ERROR 0x20 /**< unrecoverable error encountered */ #define EVBUFFER_TIMEOUT 0x40 /**< user specified timeout reached */ +#define EVBUFFER_CONNECTED 0x80 /**< connect operation finished. */ struct bufferevent; struct event_base; struct evbuffer; +struct sockaddr; /** type definition for the read or write callback. @@ -134,13 +136,30 @@ enum bufferevent_options { @param base the event base to associate with the new bufferevent. @param fd the file descriptor from which data is read and written to. - This file descriptor is not allowed to be a pipe(2). + This file descriptor is not allowed to be a pipe(2). + It is safe to set the fd to -1, so long as you later + set it with bufferevent_setfd or bufferevent_socket_connect(). @return a pointer to a newly allocated bufferevent struct, or NULL if an error occurred @see bufferevent_free() */ struct bufferevent *bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, enum bufferevent_options options); +/** + Launch a connect() attempt with a socket. When the connect succeeds, + the errorcb will be invoked with EVBUFFER_CONNECTED set. + + If the bufferevent does not already have a socket set, we allocate a new + socket here and make it nonblocking before we begin. + + @param bufev an existing bufferevent allocated with + bufferevent_socket_new(). + @param addr the address we should connect to + @param socklen The length of the address + @return 0 on success, -1 on failure. + */ +int bufferevent_socket_connect(struct bufferevent *, struct sockaddr *, int); + /** Assign a bufferevent to a specific event_base. diff --git a/include/event2/listener.h b/include/event2/listener.h new file mode 100644 index 00000000..9abbb9ad --- /dev/null +++ b/include/event2/listener.h @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2000-2007 Niels Provos + * Copyright (c) 2007-2009 Niels Provos and Nick Mathewson + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef _EVENT2_LISTENER_H_ +#define _EVENT2_LISTENER_H_ + +#include + +struct sockaddr; +struct evconnlistener; + +/** + A callback that we invoke when a listener has a new connection. + + @param fd The new file descriptor + @param addr The source address of the connection + @param socklen The length of addr + @param user_arg the pointer passed to evconnlistener_new() + */ +typedef void (*evconnlistener_cb)(evutil_socket_t, struct sockaddr *, int socklen, void *); + +/** Flag: Indicates that we should not make incoming sockets nonblocking + * before passing them to the callback. */ +#define LEV_OPT_LEAVE_SOCKETS_BLOCKING (1u<<0) +/** Flag: Indicates that freeing the listener should close the underlying + * socket. */ +#define LEV_OPT_CLOSE_ON_FREE (1u<<1) +/** Flag: Indicates that we should set the close-on-exec flag, if possible */ +#define LEV_OPT_CLOSE_ON_EXEC (1u<<2) +/** Flag: Indicates that we should disable the timeout (if any) between when + * this socket is closed and when we can listen again on the same port. */ +#define LEV_OPT_REUSEABLE (1u<<3) + +/** + Allocate a new evconnlistener object to listen for incoming TCP connections + on a given file descriptor. + + @param base The event base to associate the listener with. + @param cb A callback to be invoked when a new connection arrives. + @param ptr A user-supplied pointer to give to the callback. + @param flags Any number of LEV_OPT_* flags + @param backlog Passed to the listen() call to determine the length of the + acceptable connection backlog. Set to -1 for a reasonable default. + @param fd The file descriptor to listen on. It must be a nonblocking + file descriptor, and it should already be bound to an appropriate + port and address. +*/ +struct evconnlistener *evconnlistener_new(struct event_base *base, + evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, + evutil_socket_t fd); +/** + Allocate a new evconnlistener object to listen for incoming TCP connections + on a given address. + + @param base The event base to associate the listener with. + @param cb A callback to be invoked when a new connection arrives. + @param ptr A user-supplied pointer to give to the callback. + @param flags Any number of LEV_OPT_* flags + @param backlog Passed to the listen() call to determine the length of the + acceptable connection backlog. Set to -1 for a reasonable default. + @param addr The address to listen for connections on. + @param socklen The length of the address. + */ +struct evconnlistener *evconnlistener_new_bind(struct event_base *base, + evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, + const struct sockaddr *sa, int socklen); +/** + Disable and deallocate an evconnlistener. + */ +void evconnlistener_free(struct evconnlistener *lev); +/** + Re-enable an evconnlistener that has been disabled. + */ +int evconnlistener_enable(struct evconnlistener *lev); +/** + Stop listening for connections on an evconnlistener. + */ +int evconnlistener_disable(struct evconnlistener *lev); + +#endif diff --git a/listener.c b/listener.c new file mode 100644 index 00000000..a84bbeb8 --- /dev/null +++ b/listener.c @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2009 Niels Provos, Nick Mathewson + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#ifdef HAVE_CONFIG_H +#include "event-config.h" +#endif + +#ifdef WIN32 +#include +#endif +#include +#ifdef _EVENT_HAVE_SYS_SOCKET_H +#include +#endif +#ifdef _EVENT_HAVE_FCNTL_H +#include +#endif +#ifdef _EVENT_HAVE_UNISTD_H +#include +#endif + +#include +#include +#include +#include +#include "mm-internal.h" +#include "util-internal.h" +#include "log-internal.h" + +struct evconnlistener { + struct event listener; + evconnlistener_cb cb; + void *user_data; + unsigned flags; +}; + +static void listener_read_cb(evutil_socket_t, short, void *); + +struct evconnlistener * +evconnlistener_new(struct event_base *base, + evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, + evutil_socket_t fd) +{ + struct evconnlistener *lev; + if (backlog > 0) { + if (listen(fd, backlog) < 0) + return NULL; + } + lev = mm_calloc(1, sizeof(struct evconnlistener)); + if (!lev) + return NULL; + lev->cb = cb; + lev->user_data = ptr; + lev->flags = flags; + event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST, + listener_read_cb, lev); + evconnlistener_enable(lev); + return lev; +} + +struct evconnlistener * +evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb, void *ptr, + unsigned flags, int backlog, const struct sockaddr *sa, int socklen) +{ + evutil_socket_t fd; + int on = 1; + int family = sa ? sa->sa_family : AF_UNSPEC; + + fd = socket(family, SOCK_STREAM, 0); + if (fd == -1) + return NULL; + if (evutil_make_socket_nonblocking(fd) < 0) + return NULL; + +#ifndef WIN32 + if (flags & LEV_OPT_CLOSE_ON_EXEC) { + if (fcntl(fd, F_SETFD, 1) == -1) { + EVUTIL_CLOSESOCKET(fd); + return NULL; + } + } +#endif + + setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&on, sizeof(on)); + if (flags & LEV_OPT_REUSEABLE) { + evutil_make_listen_socket_reuseable(fd); + } + + if (sa) { + if (bind(fd, sa, socklen)<0) { + EVUTIL_CLOSESOCKET(fd); + return NULL; + } + } + + return evconnlistener_new(base, cb, ptr, flags, backlog, fd); +} + +void +evconnlistener_free(struct evconnlistener *lev) +{ + event_del(&lev->listener); + if (lev->flags & LEV_OPT_CLOSE_ON_FREE) + EVUTIL_CLOSESOCKET(event_get_fd(&lev->listener)); + mm_free(lev); +} + +int +evconnlistener_enable(struct evconnlistener *lev) +{ + return event_add(&lev->listener, NULL); +} + +int +evconnlistener_disable(struct evconnlistener *lev) +{ + return event_del(&lev->listener); +} + +static void +listener_read_cb(evutil_socket_t fd, short what, void *p) +{ + struct evconnlistener *lev = p; + int err; + while (1) { + struct sockaddr_storage ss; + socklen_t socklen = sizeof(ss); + + evutil_socket_t new_fd = accept(fd, (struct sockaddr*)&ss, &socklen); + if (new_fd < 0) + break; + + if (!(lev->flags & LEV_OPT_LEAVE_SOCKETS_BLOCKING)) + evutil_make_socket_nonblocking(new_fd); + + lev->cb(new_fd, (struct sockaddr*)&ss, (int)socklen, + lev->user_data); + } + err = evutil_socket_geterror(fd); + if (EVUTIL_ERR_ACCEPT_RETRIABLE(err)) + return; + event_sock_warn(fd, "Error from accept() call"); +} diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c index c8753b08..1b01ee19 100644 --- a/test/regress_bufferevent.c +++ b/test/regress_bufferevent.c @@ -55,6 +55,10 @@ #include #include +#ifdef _EVENT_HAVE_ARPA_INET_H +#include +#endif + #include "event-config.h" #include "event2/event.h" #include "event2/event_struct.h" @@ -64,6 +68,7 @@ #include "event2/bufferevent.h" #include "event2/bufferevent_compat.h" #include "event2/bufferevent_struct.h" +#include "event2/listener.h" #include "event2/util.h" #include "bufferevent-internal.h" @@ -364,6 +369,108 @@ test_bufferevent_pair_filters(void) test_bufferevent_filters_impl(1); } + +static void +sender_writecb(struct bufferevent *bev, void *ctx) +{ + if (evbuffer_get_length(bufferevent_get_output(bev)) == 0) { + bufferevent_disable(bev,EV_READ|EV_WRITE); + bufferevent_free(bev); + } +} + +static void +sender_errorcb(struct bufferevent *bev, short what, void *ctx) +{ + TT_FAIL(("Got sender error %d",(int)what)); +} + +static int n_strings_read = 0; + +#define TEST_STR "Now is the time for all good events to signal for " \ + "the good of their protocol" +static void +listen_cb(evutil_socket_t fd, struct sockaddr *sa, int socklen, void *arg) +{ + struct event_base *base = arg; + struct bufferevent *bev; + const char s[] = TEST_STR; + bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); + bufferevent_write(bev, s, sizeof(s)); + bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL); + bufferevent_enable(bev, EV_WRITE); +} + +static void +reader_eventcb(struct bufferevent *bev, short what, void *ctx) +{ + struct event_base *base = ctx; + if (what & EVBUFFER_ERROR) { + perror("foobar"); + TT_FAIL(("got connector error %d", (int)what)); + return; + } + if (what & EVBUFFER_CONNECTED) { + bufferevent_enable(bev, EV_READ); + } + if (what & EVBUFFER_EOF) { + char buf[512]; + size_t n; + n = bufferevent_read(bev, buf, sizeof(buf)-1); + buf[n] = '\0'; + tt_str_op(buf, ==, TEST_STR); + if (++n_strings_read == 2) + event_base_loopexit(base, NULL); + } +end: + ; +} + +static void +test_bufferevent_connect(void *arg) +{ + struct basic_test_data *data = arg; + struct evconnlistener *lev=NULL; + struct bufferevent *bev1=NULL, *bev2=NULL; + struct sockaddr_in localhost; + struct sockaddr *sa = (struct sockaddr*)&localhost; + + memset(&localhost, 0, sizeof(localhost)); + + localhost.sin_port = htons(27015); + localhost.sin_addr.s_addr = htonl(0x7f000001L); + localhost.sin_family = AF_INET; + + lev = evconnlistener_new_bind(data->base, listen_cb, data->base, + LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, + 16, sa, sizeof(localhost)); + tt_assert(lev); + tt_assert(!evconnlistener_enable(lev)); + bev1 = bufferevent_socket_new(data->base, -1, BEV_OPT_CLOSE_ON_FREE); + bev2 = bufferevent_socket_new(data->base, -1, BEV_OPT_CLOSE_ON_FREE); + bufferevent_setcb(bev1, NULL, NULL, reader_eventcb, data->base); + bufferevent_setcb(bev2, NULL, NULL, reader_eventcb, data->base); + + tt_want(!bufferevent_socket_connect(bev1, sa, sizeof(localhost))); + tt_want(!bufferevent_socket_connect(bev2, sa, sizeof(localhost))); + + bufferevent_enable(bev1, EV_READ); + bufferevent_enable(bev2, EV_READ); + + event_base_dispatch(data->base); + + tt_int_op(n_strings_read, ==, 2); +end: + if (lev) + evconnlistener_free(lev); + + if (bev1) + bufferevent_free(bev1); + + if (bev2) + bufferevent_free(bev2); +} + struct testcase_t bufferevent_testcases[] = { LEGACY(bufferevent, TT_ISOLATED), @@ -372,6 +479,8 @@ struct testcase_t bufferevent_testcases[] = { LEGACY(bufferevent_pair_watermarks, TT_ISOLATED), LEGACY(bufferevent_filters, TT_ISOLATED), LEGACY(bufferevent_pair_filters, TT_ISOLATED), + { "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE, + &basic_setup, NULL }, #ifdef _EVENT_HAVE_LIBZ LEGACY(bufferevent_zlib, TT_ISOLATED), #else