diff options
Diffstat (limited to 'net/openvswitch/files/threaded.diff')
-rw-r--r-- | net/openvswitch/files/threaded.diff | 1292 |
1 files changed, 0 insertions, 1292 deletions
diff --git a/net/openvswitch/files/threaded.diff b/net/openvswitch/files/threaded.diff deleted file mode 100644 index a3bd811ca151..000000000000 --- a/net/openvswitch/files/threaded.diff +++ /dev/null @@ -1,1292 +0,0 @@ -diff --git configure.ac configure.ac -index 5692b86..ff62627 100644 ---- configure.ac -+++ configure.ac -@@ -43,6 +43,7 @@ AC_SEARCH_LIBS([clock_gettime], [rt]) - AC_SEARCH_LIBS([timer_create], [rt]) - AC_SEARCH_LIBS([pcap_open_live], [pcap]) - -+OVS_CHECK_THREADED - OVS_CHECK_COVERAGE - OVS_CHECK_NDEBUG - OVS_CHECK_NETLINK -diff --git lib/automake.mk lib/automake.mk -index 13622b3..87bdd8d 100644 ---- lib/automake.mk -+++ lib/automake.mk -@@ -37,6 +37,7 @@ lib_libopenvswitch_a_SOURCES = \ - lib/daemon.c \ - lib/daemon.h \ - lib/dhcp.h \ -+ lib/dispatch.h \ - lib/dummy.c \ - lib/dummy.h \ - lib/dhparams.h \ -diff --git lib/dispatch.h lib/dispatch.h -new file mode 100644 -index 0000000..80ac9c7 ---- /dev/null -+++ lib/dispatch.h -@@ -0,0 +1,9 @@ -+#include <sys/types.h> -+#include "ofpbuf.h" -+ -+#ifndef DISPATCH_H -+#define DISPATCH_H 1 -+ -+typedef void (*pkt_handler)(u_char *user, struct ofpbuf* buf); -+ -+#endif /* DISPATCH_H */ -diff --git lib/dpif-netdev.c lib/dpif-netdev.c -index cade79e..509e2ef 100644 ---- lib/dpif-netdev.c -+++ lib/dpif-netdev.c -@@ -32,6 +32,15 @@ - #include <sys/stat.h> - #include <unistd.h> - -+#ifdef THREADED -+#include <signal.h> -+#include <pthread.h> -+ -+#include "socket-util.h" -+#include "fatal-signal.h" -+#include "dispatch.h" -+#endif -+ - #include "csum.h" - #include "dpif.h" - #include "dpif-provider.h" -@@ -55,6 +64,16 @@ - #include "vlog.h" - - VLOG_DEFINE_THIS_MODULE(dpif_netdev); -+/* We could use these macros instead of using #ifdef and #endif every time we -+ * need to call the pthread_mutex_lock/unlock. -+#ifdef THREADED -+#define LOCK(mutex) pthread_mutex_lock(mutex) -+#define UNLOCK(mutex) pthread_mutex_unlock(mutex) -+#else -+#define LOCK(mutex) -+#define UNLOCK(mutex) -+#endif -+*/ - - /* Configuration parameters. */ - enum { MAX_PORTS = 256 }; /* Maximum number of ports. */ -@@ -82,6 +101,21 @@ struct dp_netdev { - int open_cnt; - bool destroyed; - -+#ifdef THREADED -+ /* The pipe is used to signal the presence of a packet on the queue. -+ * - dpif_netdev_recv_wait() waits on p[0] -+ * - dpif_netdev_recv() extract from queue and read p[0] -+ * - dp_netdev_output_control() send to queue and write p[1] -+ */ -+ -+ int pipe[2]; /* signal a packet on the queue */ -+ struct pollfd *pipe_fd; -+ -+ pthread_mutex_t table_mutex; /* mutex for the flow table */ -+ pthread_mutex_t port_list_mutex; /* port list mutex */ -+ -+ /* The access to this queue is protected by the table_mutex mutex */ -+#endif - struct dp_netdev_queue queues[N_QUEUES]; - struct hmap flow_table; /* Flow table. */ - -@@ -102,6 +136,9 @@ struct dp_netdev_port { - struct list node; /* Element in dp_netdev's 'port_list'. */ - struct netdev *netdev; - char *type; /* Port type as requested by user. */ -+#ifdef THREADED -+ struct pollfd *poll_fd; /* To manage the poll loop in the thread. */ -+#endif - }; - - /* A flow in dp_netdev's 'flow_table'. */ -@@ -127,6 +164,11 @@ struct dpif_netdev { - unsigned int dp_serial; - }; - -+#ifdef THREADED -+/* XXX global Descriptor of the thread that manages the datapaths. */ -+pthread_t thread_p; -+#endif -+ - /* All netdev-based datapaths. */ - static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs); - -@@ -204,6 +246,23 @@ create_dp_netdev(const char *name, const struct dpif_class *class, - dp->class = class; - dp->name = xstrdup(name); - dp->open_cnt = 0; -+#ifdef THREADED -+ error = pipe(dp->pipe); -+ if (error) { -+ VLOG_ERR("Unable to create datapath thread pipe: %s", strerror(errno)); -+ return errno; -+ } -+ if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) { -+ VLOG_ERR("Unable to set nonblocking on datapath thread pipe: %s", -+ strerror(errno)); -+ return errno; -+ } -+ dp->pipe_fd = NULL; -+ VLOG_DBG("Datapath thread pipe created (%d, %d)", dp->pipe[0], dp->pipe[1]); -+ -+ pthread_mutex_init(&dp->table_mutex, NULL); -+ pthread_mutex_init(&dp->port_list_mutex, NULL); -+#endif - for (i = 0; i < N_QUEUES; i++) { - dp->queues[i].head = dp->queues[i].tail = 0; - } -@@ -221,6 +280,38 @@ create_dp_netdev(const char *name, const struct dpif_class *class, - return 0; - } - -+#ifdef THREADED -+static void * dp_thread_body(void *args OVS_UNUSED); -+ -+/* This is the function that is called in response of a fatal signal (e.g. -+ * SIGTERM) */ -+static void -+dpif_netdev_exit_hook(void *aux OVS_UNUSED) -+{ -+ if (pthread_cancel(thread_p) == 0) { -+ pthread_join(thread_p, NULL); -+ } -+} -+ -+static int -+dpif_netdev_init(void) -+{ -+ static int error = -1; -+ -+ if (error < 0) { -+ fatal_signal_add_hook(dpif_netdev_exit_hook, NULL, NULL, true); -+ error = pthread_create(&thread_p, NULL, dp_thread_body, NULL); -+ if (error != 0) { -+ VLOG_ERR("Unable to create datapath thread: %s", strerror(errno)); -+ error = errno; -+ } else { -+ VLOG_DBG("Datapath thread started"); -+ } -+ } -+ return error; -+} -+#endif -+ - static int - dpif_netdev_open(const struct dpif_class *class, const char *name, - bool create, struct dpif **dpifp) -@@ -247,9 +338,14 @@ dpif_netdev_open(const struct dpif_class *class, const char *name, - } - - *dpifp = create_dpif_netdev(dp); -+#ifdef THREADED -+ dpif_netdev_init(); -+#endif - return 0; - } - -+/* table_mutex must be locked in THREADED mode. -+ */ - static void - dp_netdev_purge_queues(struct dp_netdev *dp) - { -@@ -273,11 +369,23 @@ dp_netdev_free(struct dp_netdev *dp) - struct dp_netdev_port *port, *next; - - dp_netdev_flow_flush(dp); -+#ifdef THREADED -+ pthread_mutex_lock(&dp->port_list_mutex); -+#endif - LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) { - do_del_port(dp, port->port_no); - } -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->port_list_mutex); -+ pthread_mutex_lock(&dp->table_mutex); -+#endif - dp_netdev_purge_queues(dp); - hmap_destroy(&dp->flow_table); -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->table_mutex); -+ pthread_mutex_destroy(&dp->table_mutex); -+ pthread_mutex_destroy(&dp->port_list_mutex); -+#endif - free(dp->name); - free(dp); - } -@@ -306,7 +414,13 @@ static int - dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats) - { - struct dp_netdev *dp = get_dp_netdev(dpif); -+#ifdef THREADED -+ pthread_mutex_lock(&dp->table_mutex); -+#endif - stats->n_flows = hmap_count(&dp->flow_table); -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->table_mutex); -+#endif - stats->n_hit = dp->n_hit; - stats->n_missed = dp->n_missed; - stats->n_lost = dp->n_lost; -@@ -354,13 +468,22 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, - port->port_no = port_no; - port->netdev = netdev; - port->type = xstrdup(type); -+#ifdef THREADED -+ port->poll_fd = NULL; -+#endif - - error = netdev_get_mtu(netdev, &mtu); - if (!error) { - max_mtu = mtu; - } - -+#ifdef THREADED -+ pthread_mutex_lock(&dp->port_list_mutex); -+#endif - list_push_back(&dp->port_list, &port->node); -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->port_list_mutex); -+#endif - dp->ports[port_no] = port; - dp->serial++; - -@@ -448,15 +571,25 @@ get_port_by_name(struct dp_netdev *dp, - { - struct dp_netdev_port *port; - -+#ifdef THREADED -+ pthread_mutex_lock(&dp->port_list_mutex); -+#endif - LIST_FOR_EACH (port, node, &dp->port_list) { - if (!strcmp(netdev_get_name(port->netdev), devname)) { - *portp = port; -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->port_list_mutex); -+#endif - return 0; - } - } -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->port_list_mutex); -+#endif - return ENOENT; - } - -+/* In THREADED mode, must be called with port_list_mutex held. */ - static int - do_del_port(struct dp_netdev *dp, uint16_t port_no) - { -@@ -531,7 +664,13 @@ dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED) - static void - dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow) - { -+#ifdef THREADED -+ pthread_mutex_lock(&dp->table_mutex); -+#endif - hmap_remove(&dp->flow_table, &flow->node); -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->table_mutex); -+#endif - free(flow->actions); - free(flow); - } -@@ -620,7 +759,11 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_) - } - - static struct dp_netdev_flow * --dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key) -+#ifdef THREADED -+dp_netdev_lookup_flow_locked(struct dp_netdev *dp, const struct flow *key) -+#else -+dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key) -+#endif - { - struct dp_netdev_flow *flow; - -@@ -632,6 +775,19 @@ dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key) - return NULL; - } - -+#ifdef THREADED -+static struct dp_netdev_flow * -+dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key) -+{ -+ struct dp_netdev_flow *flow; -+ -+ pthread_mutex_lock(&dp->table_mutex); -+ flow = dp_netdev_lookup_flow_locked(dp, key); -+ pthread_mutex_unlock(&dp->table_mutex); -+ return flow; -+} -+#endif -+ - static void - get_dpif_flow_stats(struct dp_netdev_flow *flow, struct dpif_flow_stats *stats) - { -@@ -729,7 +885,13 @@ add_flow(struct dpif *dpif, const struct flow *key, - return error; - } - -+#ifdef THREADED -+ pthread_mutex_lock(&dp->table_mutex); -+#endif - hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0)); -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->table_mutex); -+#endif - return 0; - } - -@@ -749,6 +911,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) - struct dp_netdev_flow *flow; - struct flow key; - int error; -+ int n_flows; - - error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key); - if (error) { -@@ -758,7 +921,14 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) - flow = dp_netdev_lookup_flow(dp, &key); - if (!flow) { - if (put->flags & DPIF_FP_CREATE) { -- if (hmap_count(&dp->flow_table) < MAX_FLOWS) { -+#ifdef THREADED -+ pthread_mutex_lock(&dp->table_mutex); -+#endif -+ n_flows = hmap_count(&dp->flow_table); -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->table_mutex); -+#endif -+ if (n_flows < MAX_FLOWS) { - if (put->stats) { - memset(put->stats, 0, sizeof *put->stats); - } -@@ -843,7 +1013,13 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_, - struct dp_netdev_flow *flow; - struct hmap_node *node; - -+#ifdef THREADED -+ pthread_mutex_lock(&dp->table_mutex); -+#endif - node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset); -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->table_mutex); -+#endif - if (!node) { - return EOF; - } -@@ -949,7 +1125,13 @@ static int - dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, - struct ofpbuf *buf) - { -- struct dp_netdev_queue *q = find_nonempty_queue(dpif); -+ struct dp_netdev_queue *q; -+#ifdef THREADED -+ struct dp_netdev *dp = get_dp_netdev(dpif); -+ char c; -+ pthread_mutex_lock(&dp->table_mutex); -+#endif -+ q = find_nonempty_queue(dpif); - if (q) { - struct dpif_upcall *u = q->upcalls[q->tail++ & QUEUE_MASK]; - *upcall = *u; -@@ -958,8 +1140,19 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, - ofpbuf_uninit(buf); - *buf = *upcall->packet; - -+#ifdef THREADED -+ /* Read a byte from the pipe to signal that a packet has been -+ * received. */ -+ if (read(dp->pipe[0], &c, 1) < 0) { -+ VLOG_ERR("Pipe read error (from datapath): %s", strerror(errno)); -+ } -+ pthread_mutex_unlock(&dp->table_mutex); -+#endif - return 0; - } else { -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->table_mutex); -+#endif - return EAGAIN; - } - } -@@ -967,19 +1160,32 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, - static void - dpif_netdev_recv_wait(struct dpif *dpif) - { -+#ifdef THREADED -+ struct dp_netdev *dp = get_dp_netdev(dpif); -+ -+ poll_fd_wait(dp->pipe[0], POLLIN); -+#else - if (find_nonempty_queue(dpif)) { - poll_immediate_wake(); - } else { - /* No messages ready to be received, and dp_wait() will ensure that we - * wake up to queue new messages, so there is nothing to do. */ - } -+#endif - } - - static void - dpif_netdev_recv_purge(struct dpif *dpif) - { - struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif); -+#ifdef THREADED -+ struct dp_netdev *dp = get_dp_netdev(dpif); -+ pthread_mutex_lock(&dp->table_mutex); -+#endif - dp_netdev_purge_queues(dpif_netdev->dp); -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->table_mutex); -+#endif - } - - static void -@@ -1003,7 +1209,12 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port, - return; - } - flow_extract(packet, 0, 0, odp_port_to_ofp_port(port->port_no), &key); -+#ifdef THREADED -+ pthread_mutex_lock(&dp->table_mutex); -+ flow = dp_netdev_lookup_flow_locked(dp, &key); -+#else - flow = dp_netdev_lookup_flow(dp, &key); -+#endif - if (flow) { - dp_netdev_flow_used(flow, &key, packet); - dp_netdev_execute_actions(dp, packet, &key, -@@ -1013,8 +1224,22 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port, - dp->n_missed++; - dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, 0); - } -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->table_mutex); -+#endif - } - -+#ifdef THREADED -+static void -+dpif_netdev_run(struct dpif *dpif OVS_UNUSED) -+{ -+} -+ -+static void -+dpif_netdev_wait(struct dpif *dpif OVS_UNUSED) -+{ -+} -+#else - static void - dpif_netdev_run(struct dpif *dpif) - { -@@ -1053,6 +1278,144 @@ dpif_netdev_wait(struct dpif *dpif) - netdev_recv_wait(port->netdev); - } - } -+#endif -+ -+#ifdef THREADED -+/* -+ * pcap callback argument -+ */ -+struct dispatch_arg { -+ struct dp_netdev *dp; /* update statistics */ -+ struct dp_netdev_port *port; /* argument to flow identifier function */ -+}; -+ -+/* Process a packet. -+ * -+ * The port_input function will send immediately if it finds a flow match and -+ * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP. -+ * If a flow is not found or for the other actions, the packet is copied. -+ */ -+static void -+process_pkt(u_char *user, struct ofpbuf *buf) -+{ -+ struct dispatch_arg *arg = (struct dispatch_arg *)user; -+ -+ ofpbuf_padto(buf, ETH_TOTAL_MIN); -+ dp_netdev_port_input(arg->dp, arg->port, buf); -+} -+ -+/* Body of the thread that manages the datapaths */ -+static void* -+dp_thread_body(void *args OVS_UNUSED) -+{ -+ struct dp_netdev *dp; -+ struct dp_netdev_port *port; -+ struct dispatch_arg arg; -+ int error; -+ int n_fds; -+ uint32_t batch = 50; /* max number of pkts processed by the dispatch */ -+ int processed; /* actual number of pkts processed by the dispatch */ -+ char readbuf[1024]; -+ -+ sigset_t sigmask; -+ -+ /*XXX Since the poll involves all ports of all datapaths, the right fds -+ * size should be MAX_PORTS * max_number_of_datapaths */ -+ struct pollfd fds[MAX_PORTS + 1]; -+ -+ /* mask the fatal signals. In this way the main thread is delegate to -+ * manage this them. */ -+ sigemptyset(&sigmask); -+ sigaddset(&sigmask, SIGTERM); -+ sigaddset(&sigmask, SIGALRM); -+ sigaddset(&sigmask, SIGINT); -+ sigaddset(&sigmask, SIGHUP); -+ -+ if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) { -+ VLOG_ERR("Error setting thread sigmask: %s", strerror(errno)); -+ } -+ -+ for(;;) { -+ struct shash_node *node; -+ n_fds = 0; -+ /* build the structure for poll */ -+ SHASH_FOR_EACH(node, &dp_netdevs) { -+ dp = (struct dp_netdev *)node->data; -+ fds[n_fds].fd = dp->pipe[1]; -+ fds[n_fds].events = POLLIN; -+ dp->pipe_fd = &fds[n_fds]; -+ n_fds++; -+ if (n_fds >= sizeof(fds) / sizeof(fds[0])) { -+ VLOG_ERR("Too many fds for poll adding pipe_fd"); -+ break; -+ } -+ pthread_mutex_lock(&dp->port_list_mutex); -+ LIST_FOR_EACH (port, node, &dp->port_list) { -+ /* insert an element in the fds structure */ -+ fds[n_fds].fd = netdev_get_fd(port->netdev); -+ fds[n_fds].events = POLLIN; -+ port->poll_fd = &fds[n_fds]; -+ n_fds++; -+ if (n_fds >= sizeof(fds) / sizeof(fds[0])) { -+ VLOG_ERR("Too many fds for poll adding port fd"); -+ break; -+ } -+ } -+ pthread_mutex_unlock(&dp->port_list_mutex); -+ } -+ -+ error = poll(fds, n_fds, 2000); -+ VLOG_DBG("dp_thread_body poll wakeup with cnt=%d", error); -+ -+ if (error < 0) { -+ if (errno == EINTR) { -+ /* XXX get this case in detach mode */ -+ continue; -+ } -+ VLOG_ERR("Datapath thread poll() error: %s\n", strerror(errno)); -+ /* XXX terminating the thread is probably not right */ -+ break; -+ } -+ pthread_testcancel(); -+ -+ SHASH_FOR_EACH (node, &dp_netdevs) { -+ dp = (struct dp_netdev *)node->data; -+ if (dp->pipe_fd && (dp->pipe_fd->revents & POLLIN)) { -+ VLOG_DBG("Signalled from main thread"); -+ while ((error = read(dp->pipe[1], readbuf, sizeof(readbuf))) > 0) -+ ; -+ if (error < 0 && errno != EAGAIN) { -+ VLOG_ERR("Pipe read error (to datapath): %s", strerror(errno)); -+ } -+ } -+ arg.dp = dp; -+ pthread_mutex_lock(&dp->port_list_mutex); -+ LIST_FOR_EACH (port, node, &dp->port_list) { -+ arg.port = port; -+ if (port->poll_fd) { -+ VLOG_DBG("fd %d revents 0x%x", port->poll_fd->fd, port->poll_fd->revents); -+ } -+ if (port->poll_fd && (port->poll_fd->revents & POLLIN)) { -+ /* call the dispatch and process the packet into -+ * its callback. We process 'batch' packets at time */ -+ processed = netdev_dispatch(port->netdev, batch, -+ process_pkt, (u_char *)&arg); -+ if (processed < 0) { /* pcap returns error */ -+ static struct vlog_rate_limit rl = -+ VLOG_RATE_LIMIT_INIT(1, 5); -+ VLOG_ERR_RL(&rl, -+ "error receiving data from XXX \n"); -+ } -+ } /* end of if poll */ -+ } /* end of port loop */ -+ pthread_mutex_unlock(&dp->port_list_mutex); -+ } /* end of dp loop */ -+ } /* for ;; */ -+ -+ return NULL; -+} -+ -+#endif /* THREADED */ - - static void - dp_netdev_set_dl(struct ofpbuf *packet, const struct ovs_key_ethernet *eth_key) -@@ -1068,11 +1431,19 @@ dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet, - uint16_t out_port) - { - struct dp_netdev_port *p = dp->ports[out_port]; -+ char c = 0; -+ - if (p) { - netdev_send(p->netdev, packet); -+#ifdef THREADED -+ if (write(dp->pipe[0], &c, 1) < 0) { -+ VLOG_ERR("Pipe write error (to datapath): %s", strerror(errno)); -+ } -+#endif - } - } - -+/* In THREADED mode, must be called with table_lock_mutex held. */ - static int - dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, - int queue_no, const struct flow *flow, uint64_t arg) -@@ -1081,6 +1452,9 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, - struct dpif_upcall *upcall; - struct ofpbuf *buf; - size_t key_len; -+#ifdef THREADED -+ char c = 0; -+#endif - - if (q->head - q->tail >= MAX_QUEUE_LEN) { - dp->n_lost++; -@@ -1102,6 +1476,12 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, - upcall->userdata = arg; - - q->upcalls[q->head++ & QUEUE_MASK] = upcall; -+#ifdef THREADED -+ /* Write a byte on the pipe to advertise that a packet is ready. */ -+ if (write(dp->pipe[1], &c, 1) < 0) { -+ VLOG_ERR("Pipe write error (from datapath): %s", strerror(errno)); -+ } -+#endif - - return 0; - } -@@ -1150,7 +1530,13 @@ dp_netdev_action_userspace(struct dp_netdev *dp, - - userdata_attr = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA); - userdata = userdata_attr ? nl_attr_get_u64(userdata_attr) : 0; -+#ifdef THREADED -+ pthread_mutex_lock(&dp->table_mutex); -+#endif - dp_netdev_output_userspace(dp, packet, DPIF_UC_ACTION, key, userdata); -+#ifdef THREADED -+ pthread_mutex_unlock(&dp->table_mutex); -+#endif - } - - static void -diff --git lib/netdev-bsd.c lib/netdev-bsd.c -index 0b1a37c..ff79367 100644 ---- lib/netdev-bsd.c -+++ lib/netdev-bsd.c -@@ -667,6 +667,89 @@ netdev_bsd_recv_wait(struct netdev *netdev_) - } - } - -+#ifdef THREADED -+ -+struct dispatch_arg { -+ pkt_handler h; -+ u_char *user; -+}; -+ -+static void -+dispatch_handler(u_char *user, const struct pcap_pkthdr *phdr, const u_char *pdata) -+{ -+ struct ofpbuf buf; -+ struct dispatch_arg *parg = (struct dispatch_arg*)user; -+ -+ ofpbuf_use_stub(&buf, (void*)pdata, phdr->caplen); -+ buf.size = phdr->caplen; -+ (*parg->h)(parg->user, &buf); -+ ofpbuf_uninit(&buf); -+} -+ -+static int -+netdev_bsd_dispatch_system(struct netdev_bsd *netdev, int batch, pkt_handler h, -+ u_char *user) -+{ -+ int ret; -+ struct dispatch_arg arg; -+ -+ arg.h = h; -+ arg.user = user; -+ ret = pcap_dispatch(netdev->pcap_handle, batch, dispatch_handler, (u_char*)&arg); -+ return ret; -+} -+ -+static int -+netdev_bsd_dispatch_tap(struct netdev_bsd *netdev, int batch, pkt_handler h, -+ u_char *user) -+{ -+ int ret; -+ int i; -+ const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX; -+ OFPBUF_STACK_BUFFER(buf_, size); -+ -+ struct ofpbuf buf; -+ ofpbuf_use_stub(&buf, buf_, size); -+ for (i = 0; i < batch; i++) { -+ ret = netdev_bsd_recv_tap(netdev, buf.data, ofpbuf_tailroom(&buf)); -+ if (ret >= 0) { -+ buf.size += ret; -+ h(user, &buf); -+ } else if (ret != -EAGAIN) { -+ return -1; -+ } else { /* ret = EAGAIN */ -+ break; -+ } -+ ofpbuf_clear(&buf); -+ } -+ ofpbuf_uninit(&buf); -+ return i; -+} -+ -+static int -+netdev_bsd_dispatch(struct netdev *netdev_, int batch, pkt_handler h, -+ u_char *user) -+{ -+ struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); -+ struct netdev_dev_bsd * netdev_dev = -+ netdev_dev_bsd_cast(netdev_get_dev(netdev_)); -+ -+ if (!strcmp(netdev_get_type(netdev_), "tap") && -+ netdev->netdev_fd == netdev_dev->tap_fd) { -+ return netdev_bsd_dispatch_tap(netdev, batch, h, user); -+ } else { -+ return netdev_bsd_dispatch_system(netdev, batch, h, user); -+ } -+} -+ -+static int -+netdev_bsd_get_fd(struct netdev *netdev_) -+{ -+ struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); -+ return netdev->netdev_fd; -+} -+#endif -+ - /* Discards all packets waiting to be received from 'netdev'. */ - static int - netdev_bsd_drain(struct netdev *netdev_) -@@ -1263,6 +1346,10 @@ const struct netdev_class netdev_bsd_class = { - - netdev_bsd_recv, - netdev_bsd_recv_wait, -+#ifdef THREADED -+ netdev_bsd_dispatch, -+ netdev_bsd_get_fd, -+#endif - netdev_bsd_drain, - - netdev_bsd_send, -@@ -1323,6 +1410,10 @@ const struct netdev_class netdev_tap_class = { - - netdev_bsd_recv, - netdev_bsd_recv_wait, -+#ifdef THREADED -+ netdev_bsd_dispatch, -+ netdev_bsd_get_fd, -+#endif - netdev_bsd_drain, - - netdev_bsd_send, -diff --git lib/netdev-dummy.c lib/netdev-dummy.c -index b8c23c5..4e4801c 100644 ---- lib/netdev-dummy.c -+++ lib/netdev-dummy.c -@@ -20,6 +20,12 @@ - - #include <errno.h> - -+#ifdef THREADED -+#include <pthread.h> -+#include <unistd.h> -+#include "socket-util.h" -+#endif -+ - #include "flow.h" - #include "list.h" - #include "netdev-provider.h" -@@ -51,6 +57,10 @@ struct netdev_dummy { - struct list node; /* In netdev_dev_dummy's "devs" list. */ - struct list recv_queue; - bool listening; -+#ifdef THREADED -+ pthread_mutex_t queue_mutex; -+ int s_pipe[2]; /* used to signal packet arrivals */ -+#endif - }; - - static struct shash dummy_netdev_devs = SHASH_INITIALIZER(&dummy_netdev_devs); -@@ -124,11 +134,30 @@ netdev_dummy_open(struct netdev_dev *netdev_dev_, struct netdev **netdevp) - { - struct netdev_dev_dummy *netdev_dev = netdev_dev_dummy_cast(netdev_dev_); - struct netdev_dummy *netdev; -+#ifdef THREADED -+ int error; -+#endif - - netdev = xmalloc(sizeof *netdev); - netdev_init(&netdev->netdev, netdev_dev_); - list_init(&netdev->recv_queue); - netdev->listening = false; -+#ifdef THREADED -+ error = pipe(netdev->s_pipe); -+ if (error) { -+ VLOG_ERR("Unable to create dummy pipe: %s", strerror(errno)); -+ free(netdev); -+ return errno; -+ } -+ if (set_nonblocking(netdev->s_pipe[0]) || -+ set_nonblocking(netdev->s_pipe[1])) { -+ VLOG_ERR("Unable to set nonblocking on dummy pipe: %s", -+ strerror(errno)); -+ free(netdev); -+ return errno; -+ } -+ pthread_mutex_init(&netdev->queue_mutex, NULL); -+#endif - - *netdevp = &netdev->netdev; - list_push_back(&netdev_dev->devs, &netdev->node); -@@ -141,6 +170,13 @@ netdev_dummy_close(struct netdev *netdev_) - struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); - list_remove(&netdev->node); - ofpbuf_list_delete(&netdev->recv_queue); -+#ifdef THREADED -+ if (netdev->listening) { -+ close(netdev->s_pipe[0]); -+ close(netdev->s_pipe[1]); -+ } -+ pthread_mutex_destroy(&netdev->queue_mutex); -+#endif - free(netdev); - } - -@@ -158,12 +194,29 @@ netdev_dummy_recv(struct netdev *netdev_, void *buffer, size_t size) - struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); - struct ofpbuf *packet; - size_t packet_size; -+#ifdef THREADED -+ char c; -+#endif - -+#ifdef THREADED -+ pthread_mutex_lock(&netdev->queue_mutex); -+#endif - if (list_is_empty(&netdev->recv_queue)) { -+#ifdef THREADED -+ pthread_mutex_unlock(&netdev->queue_mutex); -+#endif - return -EAGAIN; - } -+#ifdef THREADED -+ if (read(netdev->s_pipe[0], &c, 1) < 0) { -+ VLOG_ERR("Error reading dummy pipe: %s", strerror(errno)); -+ } -+#endif - - packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue)); -+#ifdef THREADED -+ pthread_mutex_unlock(&netdev->queue_mutex); -+#endif - if (packet->size > size) { - return -EMSGSIZE; - } -@@ -179,11 +232,60 @@ static void - netdev_dummy_recv_wait(struct netdev *netdev_) - { - struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); -- if (!list_is_empty(&netdev->recv_queue)) { -+ int empty; -+ -+#ifdef THREADED -+ pthread_mutex_lock(&netdev->queue_mutex); -+#endif -+ empty = list_is_empty(&netdev->recv_queue); -+#ifdef THREADED -+ pthread_mutex_unlock(&netdev->queue_mutex); -+#endif -+ if (!empty) { - poll_immediate_wake(); - } - } - -+#ifdef THREADED -+static int -+netdev_dummy_dispatch(struct netdev *netdev_, int batch, pkt_handler h, -+ u_char *user) -+{ -+ int i; -+ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); -+ struct ofpbuf *packet; -+ VLOG_DBG("dispatch %d", batch); -+ -+ for (i = 0; i < batch; i++) { -+ char c; -+ if (read(netdev->s_pipe[0], &c, 1) < 0) { -+ if (errno == EAGAIN) -+ break; -+ VLOG_ERR("%s: error reading from the pipe: %s", -+ netdev_get_name(netdev_), strerror(errno)); -+ return -1; -+ } -+ pthread_mutex_lock(&netdev->queue_mutex); -+ if (list_is_empty(&netdev->recv_queue)) { -+ pthread_mutex_unlock(&netdev->queue_mutex); -+ return -EAGAIN; -+ } -+ packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue)); -+ pthread_mutex_unlock(&netdev->queue_mutex); -+ h(user, packet); -+ ofpbuf_delete(packet); -+ } -+ return i; -+} -+ -+static int -+netdev_dummy_get_fd(struct netdev *netdev_) -+{ -+ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); -+ return netdev->s_pipe[0]; -+} -+#endif -+ - static int - netdev_dummy_drain(struct netdev *netdev_) - { -@@ -316,6 +418,10 @@ static const struct netdev_class dummy_class = { - netdev_dummy_listen, - netdev_dummy_recv, - netdev_dummy_recv_wait, -+#ifdef THREADED -+ netdev_dummy_dispatch, /* dispatch */ -+ netdev_dummy_get_fd, /* get_fd */ -+#endif - netdev_dummy_drain, - - NULL, /* send */ -@@ -407,6 +513,9 @@ netdev_dummy_receive(struct unixctl_conn *conn, - struct netdev_dev_dummy *dummy_dev; - int n_listeners; - int i; -+#ifdef THREADED -+ char c = 0; -+#endif - - dummy_dev = shash_find_data(&dummy_netdev_devs, argv[1]); - if (!dummy_dev) { -@@ -414,6 +523,7 @@ netdev_dummy_receive(struct unixctl_conn *conn, - return; - } - -+ - n_listeners = 0; - for (i = 2; i < argc; i++) { - struct netdev_dummy *dev; -@@ -429,7 +539,16 @@ netdev_dummy_receive(struct unixctl_conn *conn, - LIST_FOR_EACH (dev, node, &dummy_dev->devs) { - if (dev->listening) { - struct ofpbuf *copy = ofpbuf_clone(packet); -+#ifdef THREADED -+ pthread_mutex_lock(&dev->queue_mutex); -+#endif - list_push_back(&dev->recv_queue, ©->list_node); -+#ifdef THREADED -+ pthread_mutex_unlock(&dev->queue_mutex); -+ if (write(dev->s_pipe[1], &c, 1) < 0) { -+ VLOG_ERR("Error writing dummy pipe: %s", strerror(errno)); -+ } -+#endif - n_listeners++; - } - } -diff --git lib/netdev-linux.c lib/netdev-linux.c -index 4d2f3ac..c33a801 100644 ---- lib/netdev-linux.c -+++ lib/netdev-linux.c -@@ -891,6 +891,43 @@ netdev_linux_recv_wait(struct netdev *netdev_) - } - } - -+#ifdef THREADED -+static int -+netdev_linux_dispatch(struct netdev *netdev_, int batch, pkt_handler h, -+ u_char *user) -+{ -+ int ret; -+ int i; -+ const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX; -+ OFPBUF_STACK_BUFFER(buf_, size); -+ struct ofpbuf buf; -+ VLOG_DBG("dispatch %d", batch); -+ -+ ofpbuf_use_stub(&buf, buf_, size); -+ for (i = 0; i < batch; i++) { -+ ret = netdev_linux_recv(netdev_, buf.data, ofpbuf_tailroom(&buf)); -+ if (ret >= 0) { -+ buf.size += ret; -+ h(user, &buf); -+ } else if (ret != -EAGAIN) { -+ return -1; -+ } else { -+ break; -+ } -+ ofpbuf_clear(&buf); -+ } -+ ofpbuf_uninit(&buf); -+ return i; -+} -+ -+static int -+netdev_linux_get_fd(struct netdev *netdev_) -+{ -+ struct netdev_linux *netdev = netdev_linux_cast(netdev_); -+ return netdev->fd; -+} -+#endif -+ - /* Discards all packets waiting to be received from 'netdev'. */ - static int - netdev_linux_drain(struct netdev *netdev_) -@@ -2376,6 +2413,12 @@ netdev_linux_change_seq(const struct netdev *netdev) - return netdev_dev_linux_cast(netdev_get_dev(netdev))->change_seq; - } - -+#ifdef THREADED -+#define THREADED_NULL NULL, NULL, -+#else -+#define THREADED_NULL -+#endif -+ - #define NETDEV_LINUX_CLASS(NAME, CREATE, GET_STATS, SET_STATS, \ - GET_FEATURES, GET_STATUS) \ - { \ -@@ -2396,6 +2439,7 @@ netdev_linux_change_seq(const struct netdev *netdev) - netdev_linux_listen, \ - netdev_linux_recv, \ - netdev_linux_recv_wait, \ -+ THREADED_NULL \ - netdev_linux_drain, \ - \ - netdev_linux_send, \ -diff --git lib/netdev-provider.h lib/netdev-provider.h -index 2a91f05..ee4e757 100644 ---- lib/netdev-provider.h -+++ lib/netdev-provider.h -@@ -24,6 +24,9 @@ - #include "netdev.h" - #include "list.h" - #include "shash.h" -+#ifdef THREADED -+#include "dispatch.h" -+#endif - - #ifdef __cplusplus - extern "C" { -@@ -190,6 +193,22 @@ struct netdev_class { - * implement packet reception through the 'recv' member function. */ - void (*recv_wait)(struct netdev *netdev); - -+#ifdef THREADED -+ /* Attempts to receive 'batch' packets from 'netdev' and process them -+ * through the 'handler' callback. This function is used in the 'THREADED' -+ * version in order to optimize the forwarding process, since it permits to -+ * process packets directly in the netdev memory. -+ * -+ * Returns the number of packets processed on success; this can be 0 if no -+ * packets are available to be read. Returns -1 if an error occurred. -+ */ -+ int (*dispatch)(struct netdev *netdev, int batch, pkt_handler handler, -+ u_char *user); -+ -+ /* Return the file descriptor of the device */ -+ int (*get_fd)(struct netdev *netdev); -+#endif -+ - /* Discards all packets waiting to be received from 'netdev'. - * - * May be null if not needed, such as for a network device that does not -diff --git lib/netdev-vport.c lib/netdev-vport.c -index 1721f6b..39b26de 100644 ---- lib/netdev-vport.c -+++ lib/netdev-vport.c -@@ -889,6 +889,13 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED, - return 0; - } - -+ -+#ifdef THREADED -+# define THREADED_NULL NULL, NULL, -+#else -+# define THREADED_NULL -+#endif -+ - #define VPORT_FUNCTIONS(GET_STATUS) \ - NULL, \ - netdev_vport_run, \ -@@ -905,6 +912,7 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED, - NULL, /* listen */ \ - NULL, /* recv */ \ - NULL, /* recv_wait */ \ -+ THREADED_NULL \ - NULL, /* drain */ \ - \ - netdev_vport_send, /* send */ \ -diff --git lib/netdev.c lib/netdev.c -index be7cdd2..0c54e1e 100644 ---- lib/netdev.c -+++ lib/netdev.c -@@ -423,6 +423,28 @@ netdev_recv_wait(struct netdev *netdev) - } - } - -+#ifdef THREADED -+/* Attempts to receive and process 'batch' packets from 'netdev'. */ -+int -+netdev_dispatch(struct netdev *netdev, int batch, pkt_handler h, u_char *user) -+{ -+ int (*dispatch)(struct netdev*, int, pkt_handler, u_char *); -+ -+ dispatch = netdev_get_dev(netdev)->netdev_class->dispatch; -+ return dispatch ? dispatch(netdev, batch, h, user) : 0; -+} -+ -+/* Returns the file descriptor */ -+int -+netdev_get_fd(struct netdev *netdev) -+{ -+ int (*get_fd)(struct netdev *); -+ -+ get_fd = netdev_get_dev(netdev)->netdev_class->get_fd; -+ return get_fd ? get_fd(netdev) : 0; -+} -+#endif -+ - /* Discards all packets waiting to be received from 'netdev'. */ - int - netdev_drain(struct netdev *netdev) -diff --git lib/netdev.h lib/netdev.h -index 4b86c21..4fad796 100644 ---- lib/netdev.h -+++ lib/netdev.h -@@ -21,6 +21,9 @@ - #include <stddef.h> - #include <stdint.h> - #include "openvswitch/types.h" -+#ifdef THREADED -+#include "dispatch.h" -+#endif - - #ifdef __cplusplus - extern "C" { -@@ -107,6 +110,10 @@ int netdev_get_ifindex(const struct netdev *); - int netdev_listen(struct netdev *); - int netdev_recv(struct netdev *, struct ofpbuf *); - void netdev_recv_wait(struct netdev *); -+#ifdef THREADED -+int netdev_dispatch(struct netdev *, int, pkt_handler, u_char *); -+int netdev_get_fd(struct netdev *); -+#endif - int netdev_drain(struct netdev *); - - int netdev_send(struct netdev *, const struct ofpbuf *); -diff --git lib/vlog.c lib/vlog.c -index 899072e..b6bd4ef 100644 ---- lib/vlog.c -+++ lib/vlog.c -@@ -34,6 +34,9 @@ - #include "timeval.h" - #include "unixctl.h" - #include "util.h" -+#ifdef THREADED -+#include <pthread.h> -+#endif - - VLOG_DEFINE_THIS_MODULE(vlog); - -@@ -89,6 +92,10 @@ static FILE *log_file; - /* vlog initialized? */ - static bool vlog_inited; - -+#ifdef THREADED -+static pthread_mutex_t vlog_mutex; -+#endif -+ - static void format_log_message(const struct vlog_module *, enum vlog_level, - enum vlog_facility, unsigned int msg_num, - const char *message, va_list, struct ds *) -@@ -484,6 +491,9 @@ vlog_init(void) - return; - } - vlog_inited = true; -+#ifdef THREADED -+ pthread_mutex_init(&vlog_mutex, NULL); -+#endif - - /* openlog() is allowed to keep the pointer passed in, without making a - * copy. The daemonize code sometimes frees and replaces 'program_name', -@@ -691,6 +701,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level, - - ds_init(&s); - ds_reserve(&s, 1024); -+#ifdef THREADED -+ pthread_mutex_lock(&vlog_mutex); -+#endif - msg_num++; - - if (log_to_console) { -@@ -721,6 +734,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level, - fflush(log_file); - } - -+#ifdef THREADED -+ pthread_mutex_unlock(&vlog_mutex); -+#endif - ds_destroy(&s); - errno = save_errno; - } -diff --git m4/openvswitch.m4 m4/openvswitch.m4 -index dca9f5f..084ceff 100644 ---- m4/openvswitch.m4 -+++ m4/openvswitch.m4 -@@ -14,6 +14,25 @@ - # See the License for the specific language governing permissions and - # limitations under the License. - -+dnl Check for --enable-threaded and updates CFLAGS. -+AC_DEFUN([OVS_CHECK_THREADED], -+ [AC_REQUIRE([AC_PROG_CC]) -+ AC_ARG_ENABLE( -+ [threaded], -+ [AC_HELP_STRING([--enable-threaded], -+ [Enable threaded version of userspace implementation])], -+ [case "${enableval}" in -+ (yes) threaded=true ;; -+ (no) threaded=false ;; -+ (*) AC_MSG_ERROR([bad value ${enableval} for --enable-threaded]) ;; -+ esac], -+ [threaded=false]) -+ if $threaded; then -+ AC_DEFINE([THREADED], [1], -+ [Define to 1 if the threaded version of userspace -+ implementation is enabled.]) -+ fi]) -+ - dnl Checks for --enable-coverage and updates CFLAGS and LDFLAGS appropriately. - AC_DEFUN([OVS_CHECK_COVERAGE], - [AC_REQUIRE([AC_PROG_CC]) |