summaryrefslogtreecommitdiff
path: root/net/openvswitch/files/threaded.diff
diff options
context:
space:
mode:
Diffstat (limited to 'net/openvswitch/files/threaded.diff')
-rw-r--r--net/openvswitch/files/threaded.diff1292
1 files changed, 0 insertions, 1292 deletions
diff --git a/net/openvswitch/files/threaded.diff b/net/openvswitch/files/threaded.diff
deleted file mode 100644
index a3bd811ca151..000000000000
--- a/net/openvswitch/files/threaded.diff
+++ /dev/null
@@ -1,1292 +0,0 @@
-diff --git configure.ac configure.ac
-index 5692b86..ff62627 100644
---- configure.ac
-+++ configure.ac
-@@ -43,6 +43,7 @@ AC_SEARCH_LIBS([clock_gettime], [rt])
- AC_SEARCH_LIBS([timer_create], [rt])
- AC_SEARCH_LIBS([pcap_open_live], [pcap])
-
-+OVS_CHECK_THREADED
- OVS_CHECK_COVERAGE
- OVS_CHECK_NDEBUG
- OVS_CHECK_NETLINK
-diff --git lib/automake.mk lib/automake.mk
-index 13622b3..87bdd8d 100644
---- lib/automake.mk
-+++ lib/automake.mk
-@@ -37,6 +37,7 @@ lib_libopenvswitch_a_SOURCES = \
- lib/daemon.c \
- lib/daemon.h \
- lib/dhcp.h \
-+ lib/dispatch.h \
- lib/dummy.c \
- lib/dummy.h \
- lib/dhparams.h \
-diff --git lib/dispatch.h lib/dispatch.h
-new file mode 100644
-index 0000000..80ac9c7
---- /dev/null
-+++ lib/dispatch.h
-@@ -0,0 +1,9 @@
-+#include <sys/types.h>
-+#include "ofpbuf.h"
-+
-+#ifndef DISPATCH_H
-+#define DISPATCH_H 1
-+
-+typedef void (*pkt_handler)(u_char *user, struct ofpbuf* buf);
-+
-+#endif /* DISPATCH_H */
-diff --git lib/dpif-netdev.c lib/dpif-netdev.c
-index cade79e..509e2ef 100644
---- lib/dpif-netdev.c
-+++ lib/dpif-netdev.c
-@@ -32,6 +32,15 @@
- #include <sys/stat.h>
- #include <unistd.h>
-
-+#ifdef THREADED
-+#include <signal.h>
-+#include <pthread.h>
-+
-+#include "socket-util.h"
-+#include "fatal-signal.h"
-+#include "dispatch.h"
-+#endif
-+
- #include "csum.h"
- #include "dpif.h"
- #include "dpif-provider.h"
-@@ -55,6 +64,16 @@
- #include "vlog.h"
-
- VLOG_DEFINE_THIS_MODULE(dpif_netdev);
-+/* We could use these macros instead of using #ifdef and #endif every time we
-+ * need to call the pthread_mutex_lock/unlock.
-+#ifdef THREADED
-+#define LOCK(mutex) pthread_mutex_lock(mutex)
-+#define UNLOCK(mutex) pthread_mutex_unlock(mutex)
-+#else
-+#define LOCK(mutex)
-+#define UNLOCK(mutex)
-+#endif
-+*/
-
- /* Configuration parameters. */
- enum { MAX_PORTS = 256 }; /* Maximum number of ports. */
-@@ -82,6 +101,21 @@ struct dp_netdev {
- int open_cnt;
- bool destroyed;
-
-+#ifdef THREADED
-+ /* The pipe is used to signal the presence of a packet on the queue.
-+ * - dpif_netdev_recv_wait() waits on p[0]
-+ * - dpif_netdev_recv() extract from queue and read p[0]
-+ * - dp_netdev_output_control() send to queue and write p[1]
-+ */
-+
-+ int pipe[2]; /* signal a packet on the queue */
-+ struct pollfd *pipe_fd;
-+
-+ pthread_mutex_t table_mutex; /* mutex for the flow table */
-+ pthread_mutex_t port_list_mutex; /* port list mutex */
-+
-+ /* The access to this queue is protected by the table_mutex mutex */
-+#endif
- struct dp_netdev_queue queues[N_QUEUES];
- struct hmap flow_table; /* Flow table. */
-
-@@ -102,6 +136,9 @@ struct dp_netdev_port {
- struct list node; /* Element in dp_netdev's 'port_list'. */
- struct netdev *netdev;
- char *type; /* Port type as requested by user. */
-+#ifdef THREADED
-+ struct pollfd *poll_fd; /* To manage the poll loop in the thread. */
-+#endif
- };
-
- /* A flow in dp_netdev's 'flow_table'. */
-@@ -127,6 +164,11 @@ struct dpif_netdev {
- unsigned int dp_serial;
- };
-
-+#ifdef THREADED
-+/* XXX global Descriptor of the thread that manages the datapaths. */
-+pthread_t thread_p;
-+#endif
-+
- /* All netdev-based datapaths. */
- static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs);
-
-@@ -204,6 +246,23 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
- dp->class = class;
- dp->name = xstrdup(name);
- dp->open_cnt = 0;
-+#ifdef THREADED
-+ error = pipe(dp->pipe);
-+ if (error) {
-+ VLOG_ERR("Unable to create datapath thread pipe: %s", strerror(errno));
-+ return errno;
-+ }
-+ if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) {
-+ VLOG_ERR("Unable to set nonblocking on datapath thread pipe: %s",
-+ strerror(errno));
-+ return errno;
-+ }
-+ dp->pipe_fd = NULL;
-+ VLOG_DBG("Datapath thread pipe created (%d, %d)", dp->pipe[0], dp->pipe[1]);
-+
-+ pthread_mutex_init(&dp->table_mutex, NULL);
-+ pthread_mutex_init(&dp->port_list_mutex, NULL);
-+#endif
- for (i = 0; i < N_QUEUES; i++) {
- dp->queues[i].head = dp->queues[i].tail = 0;
- }
-@@ -221,6 +280,38 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
- return 0;
- }
-
-+#ifdef THREADED
-+static void * dp_thread_body(void *args OVS_UNUSED);
-+
-+/* This is the function that is called in response of a fatal signal (e.g.
-+ * SIGTERM) */
-+static void
-+dpif_netdev_exit_hook(void *aux OVS_UNUSED)
-+{
-+ if (pthread_cancel(thread_p) == 0) {
-+ pthread_join(thread_p, NULL);
-+ }
-+}
-+
-+static int
-+dpif_netdev_init(void)
-+{
-+ static int error = -1;
-+
-+ if (error < 0) {
-+ fatal_signal_add_hook(dpif_netdev_exit_hook, NULL, NULL, true);
-+ error = pthread_create(&thread_p, NULL, dp_thread_body, NULL);
-+ if (error != 0) {
-+ VLOG_ERR("Unable to create datapath thread: %s", strerror(errno));
-+ error = errno;
-+ } else {
-+ VLOG_DBG("Datapath thread started");
-+ }
-+ }
-+ return error;
-+}
-+#endif
-+
- static int
- dpif_netdev_open(const struct dpif_class *class, const char *name,
- bool create, struct dpif **dpifp)
-@@ -247,9 +338,14 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
- }
-
- *dpifp = create_dpif_netdev(dp);
-+#ifdef THREADED
-+ dpif_netdev_init();
-+#endif
- return 0;
- }
-
-+/* table_mutex must be locked in THREADED mode.
-+ */
- static void
- dp_netdev_purge_queues(struct dp_netdev *dp)
- {
-@@ -273,11 +369,23 @@ dp_netdev_free(struct dp_netdev *dp)
- struct dp_netdev_port *port, *next;
-
- dp_netdev_flow_flush(dp);
-+#ifdef THREADED
-+ pthread_mutex_lock(&dp->port_list_mutex);
-+#endif
- LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) {
- do_del_port(dp, port->port_no);
- }
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->port_list_mutex);
-+ pthread_mutex_lock(&dp->table_mutex);
-+#endif
- dp_netdev_purge_queues(dp);
- hmap_destroy(&dp->flow_table);
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->table_mutex);
-+ pthread_mutex_destroy(&dp->table_mutex);
-+ pthread_mutex_destroy(&dp->port_list_mutex);
-+#endif
- free(dp->name);
- free(dp);
- }
-@@ -306,7 +414,13 @@ static int
- dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
- {
- struct dp_netdev *dp = get_dp_netdev(dpif);
-+#ifdef THREADED
-+ pthread_mutex_lock(&dp->table_mutex);
-+#endif
- stats->n_flows = hmap_count(&dp->flow_table);
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->table_mutex);
-+#endif
- stats->n_hit = dp->n_hit;
- stats->n_missed = dp->n_missed;
- stats->n_lost = dp->n_lost;
-@@ -354,13 +468,22 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
- port->port_no = port_no;
- port->netdev = netdev;
- port->type = xstrdup(type);
-+#ifdef THREADED
-+ port->poll_fd = NULL;
-+#endif
-
- error = netdev_get_mtu(netdev, &mtu);
- if (!error) {
- max_mtu = mtu;
- }
-
-+#ifdef THREADED
-+ pthread_mutex_lock(&dp->port_list_mutex);
-+#endif
- list_push_back(&dp->port_list, &port->node);
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->port_list_mutex);
-+#endif
- dp->ports[port_no] = port;
- dp->serial++;
-
-@@ -448,15 +571,25 @@ get_port_by_name(struct dp_netdev *dp,
- {
- struct dp_netdev_port *port;
-
-+#ifdef THREADED
-+ pthread_mutex_lock(&dp->port_list_mutex);
-+#endif
- LIST_FOR_EACH (port, node, &dp->port_list) {
- if (!strcmp(netdev_get_name(port->netdev), devname)) {
- *portp = port;
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->port_list_mutex);
-+#endif
- return 0;
- }
- }
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->port_list_mutex);
-+#endif
- return ENOENT;
- }
-
-+/* In THREADED mode, must be called with port_list_mutex held. */
- static int
- do_del_port(struct dp_netdev *dp, uint16_t port_no)
- {
-@@ -531,7 +664,13 @@ dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED)
- static void
- dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
- {
-+#ifdef THREADED
-+ pthread_mutex_lock(&dp->table_mutex);
-+#endif
- hmap_remove(&dp->flow_table, &flow->node);
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->table_mutex);
-+#endif
- free(flow->actions);
- free(flow);
- }
-@@ -620,7 +759,11 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_)
- }
-
- static struct dp_netdev_flow *
--dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key)
-+#ifdef THREADED
-+dp_netdev_lookup_flow_locked(struct dp_netdev *dp, const struct flow *key)
-+#else
-+dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key)
-+#endif
- {
- struct dp_netdev_flow *flow;
-
-@@ -632,6 +775,19 @@ dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key)
- return NULL;
- }
-
-+#ifdef THREADED
-+static struct dp_netdev_flow *
-+dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key)
-+{
-+ struct dp_netdev_flow *flow;
-+
-+ pthread_mutex_lock(&dp->table_mutex);
-+ flow = dp_netdev_lookup_flow_locked(dp, key);
-+ pthread_mutex_unlock(&dp->table_mutex);
-+ return flow;
-+}
-+#endif
-+
- static void
- get_dpif_flow_stats(struct dp_netdev_flow *flow, struct dpif_flow_stats *stats)
- {
-@@ -729,7 +885,13 @@ add_flow(struct dpif *dpif, const struct flow *key,
- return error;
- }
-
-+#ifdef THREADED
-+ pthread_mutex_lock(&dp->table_mutex);
-+#endif
- hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0));
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->table_mutex);
-+#endif
- return 0;
- }
-
-@@ -749,6 +911,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
- struct dp_netdev_flow *flow;
- struct flow key;
- int error;
-+ int n_flows;
-
- error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key);
- if (error) {
-@@ -758,7 +921,14 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
- flow = dp_netdev_lookup_flow(dp, &key);
- if (!flow) {
- if (put->flags & DPIF_FP_CREATE) {
-- if (hmap_count(&dp->flow_table) < MAX_FLOWS) {
-+#ifdef THREADED
-+ pthread_mutex_lock(&dp->table_mutex);
-+#endif
-+ n_flows = hmap_count(&dp->flow_table);
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->table_mutex);
-+#endif
-+ if (n_flows < MAX_FLOWS) {
- if (put->stats) {
- memset(put->stats, 0, sizeof *put->stats);
- }
-@@ -843,7 +1013,13 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_,
- struct dp_netdev_flow *flow;
- struct hmap_node *node;
-
-+#ifdef THREADED
-+ pthread_mutex_lock(&dp->table_mutex);
-+#endif
- node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset);
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->table_mutex);
-+#endif
- if (!node) {
- return EOF;
- }
-@@ -949,7 +1125,13 @@ static int
- dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
- struct ofpbuf *buf)
- {
-- struct dp_netdev_queue *q = find_nonempty_queue(dpif);
-+ struct dp_netdev_queue *q;
-+#ifdef THREADED
-+ struct dp_netdev *dp = get_dp_netdev(dpif);
-+ char c;
-+ pthread_mutex_lock(&dp->table_mutex);
-+#endif
-+ q = find_nonempty_queue(dpif);
- if (q) {
- struct dpif_upcall *u = q->upcalls[q->tail++ & QUEUE_MASK];
- *upcall = *u;
-@@ -958,8 +1140,19 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
- ofpbuf_uninit(buf);
- *buf = *upcall->packet;
-
-+#ifdef THREADED
-+ /* Read a byte from the pipe to signal that a packet has been
-+ * received. */
-+ if (read(dp->pipe[0], &c, 1) < 0) {
-+ VLOG_ERR("Pipe read error (from datapath): %s", strerror(errno));
-+ }
-+ pthread_mutex_unlock(&dp->table_mutex);
-+#endif
- return 0;
- } else {
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->table_mutex);
-+#endif
- return EAGAIN;
- }
- }
-@@ -967,19 +1160,32 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
- static void
- dpif_netdev_recv_wait(struct dpif *dpif)
- {
-+#ifdef THREADED
-+ struct dp_netdev *dp = get_dp_netdev(dpif);
-+
-+ poll_fd_wait(dp->pipe[0], POLLIN);
-+#else
- if (find_nonempty_queue(dpif)) {
- poll_immediate_wake();
- } else {
- /* No messages ready to be received, and dp_wait() will ensure that we
- * wake up to queue new messages, so there is nothing to do. */
- }
-+#endif
- }
-
- static void
- dpif_netdev_recv_purge(struct dpif *dpif)
- {
- struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
-+#ifdef THREADED
-+ struct dp_netdev *dp = get_dp_netdev(dpif);
-+ pthread_mutex_lock(&dp->table_mutex);
-+#endif
- dp_netdev_purge_queues(dpif_netdev->dp);
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->table_mutex);
-+#endif
- }
-
- static void
-@@ -1003,7 +1209,12 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
- return;
- }
- flow_extract(packet, 0, 0, odp_port_to_ofp_port(port->port_no), &key);
-+#ifdef THREADED
-+ pthread_mutex_lock(&dp->table_mutex);
-+ flow = dp_netdev_lookup_flow_locked(dp, &key);
-+#else
- flow = dp_netdev_lookup_flow(dp, &key);
-+#endif
- if (flow) {
- dp_netdev_flow_used(flow, &key, packet);
- dp_netdev_execute_actions(dp, packet, &key,
-@@ -1013,8 +1224,22 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
- dp->n_missed++;
- dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, 0);
- }
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->table_mutex);
-+#endif
- }
-
-+#ifdef THREADED
-+static void
-+dpif_netdev_run(struct dpif *dpif OVS_UNUSED)
-+{
-+}
-+
-+static void
-+dpif_netdev_wait(struct dpif *dpif OVS_UNUSED)
-+{
-+}
-+#else
- static void
- dpif_netdev_run(struct dpif *dpif)
- {
-@@ -1053,6 +1278,144 @@ dpif_netdev_wait(struct dpif *dpif)
- netdev_recv_wait(port->netdev);
- }
- }
-+#endif
-+
-+#ifdef THREADED
-+/*
-+ * pcap callback argument
-+ */
-+struct dispatch_arg {
-+ struct dp_netdev *dp; /* update statistics */
-+ struct dp_netdev_port *port; /* argument to flow identifier function */
-+};
-+
-+/* Process a packet.
-+ *
-+ * The port_input function will send immediately if it finds a flow match and
-+ * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP.
-+ * If a flow is not found or for the other actions, the packet is copied.
-+ */
-+static void
-+process_pkt(u_char *user, struct ofpbuf *buf)
-+{
-+ struct dispatch_arg *arg = (struct dispatch_arg *)user;
-+
-+ ofpbuf_padto(buf, ETH_TOTAL_MIN);
-+ dp_netdev_port_input(arg->dp, arg->port, buf);
-+}
-+
-+/* Body of the thread that manages the datapaths */
-+static void*
-+dp_thread_body(void *args OVS_UNUSED)
-+{
-+ struct dp_netdev *dp;
-+ struct dp_netdev_port *port;
-+ struct dispatch_arg arg;
-+ int error;
-+ int n_fds;
-+ uint32_t batch = 50; /* max number of pkts processed by the dispatch */
-+ int processed; /* actual number of pkts processed by the dispatch */
-+ char readbuf[1024];
-+
-+ sigset_t sigmask;
-+
-+ /*XXX Since the poll involves all ports of all datapaths, the right fds
-+ * size should be MAX_PORTS * max_number_of_datapaths */
-+ struct pollfd fds[MAX_PORTS + 1];
-+
-+ /* mask the fatal signals. In this way the main thread is delegate to
-+ * manage this them. */
-+ sigemptyset(&sigmask);
-+ sigaddset(&sigmask, SIGTERM);
-+ sigaddset(&sigmask, SIGALRM);
-+ sigaddset(&sigmask, SIGINT);
-+ sigaddset(&sigmask, SIGHUP);
-+
-+ if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) {
-+ VLOG_ERR("Error setting thread sigmask: %s", strerror(errno));
-+ }
-+
-+ for(;;) {
-+ struct shash_node *node;
-+ n_fds = 0;
-+ /* build the structure for poll */
-+ SHASH_FOR_EACH(node, &dp_netdevs) {
-+ dp = (struct dp_netdev *)node->data;
-+ fds[n_fds].fd = dp->pipe[1];
-+ fds[n_fds].events = POLLIN;
-+ dp->pipe_fd = &fds[n_fds];
-+ n_fds++;
-+ if (n_fds >= sizeof(fds) / sizeof(fds[0])) {
-+ VLOG_ERR("Too many fds for poll adding pipe_fd");
-+ break;
-+ }
-+ pthread_mutex_lock(&dp->port_list_mutex);
-+ LIST_FOR_EACH (port, node, &dp->port_list) {
-+ /* insert an element in the fds structure */
-+ fds[n_fds].fd = netdev_get_fd(port->netdev);
-+ fds[n_fds].events = POLLIN;
-+ port->poll_fd = &fds[n_fds];
-+ n_fds++;
-+ if (n_fds >= sizeof(fds) / sizeof(fds[0])) {
-+ VLOG_ERR("Too many fds for poll adding port fd");
-+ break;
-+ }
-+ }
-+ pthread_mutex_unlock(&dp->port_list_mutex);
-+ }
-+
-+ error = poll(fds, n_fds, 2000);
-+ VLOG_DBG("dp_thread_body poll wakeup with cnt=%d", error);
-+
-+ if (error < 0) {
-+ if (errno == EINTR) {
-+ /* XXX get this case in detach mode */
-+ continue;
-+ }
-+ VLOG_ERR("Datapath thread poll() error: %s\n", strerror(errno));
-+ /* XXX terminating the thread is probably not right */
-+ break;
-+ }
-+ pthread_testcancel();
-+
-+ SHASH_FOR_EACH (node, &dp_netdevs) {
-+ dp = (struct dp_netdev *)node->data;
-+ if (dp->pipe_fd && (dp->pipe_fd->revents & POLLIN)) {
-+ VLOG_DBG("Signalled from main thread");
-+ while ((error = read(dp->pipe[1], readbuf, sizeof(readbuf))) > 0)
-+ ;
-+ if (error < 0 && errno != EAGAIN) {
-+ VLOG_ERR("Pipe read error (to datapath): %s", strerror(errno));
-+ }
-+ }
-+ arg.dp = dp;
-+ pthread_mutex_lock(&dp->port_list_mutex);
-+ LIST_FOR_EACH (port, node, &dp->port_list) {
-+ arg.port = port;
-+ if (port->poll_fd) {
-+ VLOG_DBG("fd %d revents 0x%x", port->poll_fd->fd, port->poll_fd->revents);
-+ }
-+ if (port->poll_fd && (port->poll_fd->revents & POLLIN)) {
-+ /* call the dispatch and process the packet into
-+ * its callback. We process 'batch' packets at time */
-+ processed = netdev_dispatch(port->netdev, batch,
-+ process_pkt, (u_char *)&arg);
-+ if (processed < 0) { /* pcap returns error */
-+ static struct vlog_rate_limit rl =
-+ VLOG_RATE_LIMIT_INIT(1, 5);
-+ VLOG_ERR_RL(&rl,
-+ "error receiving data from XXX \n");
-+ }
-+ } /* end of if poll */
-+ } /* end of port loop */
-+ pthread_mutex_unlock(&dp->port_list_mutex);
-+ } /* end of dp loop */
-+ } /* for ;; */
-+
-+ return NULL;
-+}
-+
-+#endif /* THREADED */
-
- static void
- dp_netdev_set_dl(struct ofpbuf *packet, const struct ovs_key_ethernet *eth_key)
-@@ -1068,11 +1431,19 @@ dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet,
- uint16_t out_port)
- {
- struct dp_netdev_port *p = dp->ports[out_port];
-+ char c = 0;
-+
- if (p) {
- netdev_send(p->netdev, packet);
-+#ifdef THREADED
-+ if (write(dp->pipe[0], &c, 1) < 0) {
-+ VLOG_ERR("Pipe write error (to datapath): %s", strerror(errno));
-+ }
-+#endif
- }
- }
-
-+/* In THREADED mode, must be called with table_lock_mutex held. */
- static int
- dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
- int queue_no, const struct flow *flow, uint64_t arg)
-@@ -1081,6 +1452,9 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
- struct dpif_upcall *upcall;
- struct ofpbuf *buf;
- size_t key_len;
-+#ifdef THREADED
-+ char c = 0;
-+#endif
-
- if (q->head - q->tail >= MAX_QUEUE_LEN) {
- dp->n_lost++;
-@@ -1102,6 +1476,12 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
- upcall->userdata = arg;
-
- q->upcalls[q->head++ & QUEUE_MASK] = upcall;
-+#ifdef THREADED
-+ /* Write a byte on the pipe to advertise that a packet is ready. */
-+ if (write(dp->pipe[1], &c, 1) < 0) {
-+ VLOG_ERR("Pipe write error (from datapath): %s", strerror(errno));
-+ }
-+#endif
-
- return 0;
- }
-@@ -1150,7 +1530,13 @@ dp_netdev_action_userspace(struct dp_netdev *dp,
-
- userdata_attr = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
- userdata = userdata_attr ? nl_attr_get_u64(userdata_attr) : 0;
-+#ifdef THREADED
-+ pthread_mutex_lock(&dp->table_mutex);
-+#endif
- dp_netdev_output_userspace(dp, packet, DPIF_UC_ACTION, key, userdata);
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dp->table_mutex);
-+#endif
- }
-
- static void
-diff --git lib/netdev-bsd.c lib/netdev-bsd.c
-index 0b1a37c..ff79367 100644
---- lib/netdev-bsd.c
-+++ lib/netdev-bsd.c
-@@ -667,6 +667,89 @@ netdev_bsd_recv_wait(struct netdev *netdev_)
- }
- }
-
-+#ifdef THREADED
-+
-+struct dispatch_arg {
-+ pkt_handler h;
-+ u_char *user;
-+};
-+
-+static void
-+dispatch_handler(u_char *user, const struct pcap_pkthdr *phdr, const u_char *pdata)
-+{
-+ struct ofpbuf buf;
-+ struct dispatch_arg *parg = (struct dispatch_arg*)user;
-+
-+ ofpbuf_use_stub(&buf, (void*)pdata, phdr->caplen);
-+ buf.size = phdr->caplen;
-+ (*parg->h)(parg->user, &buf);
-+ ofpbuf_uninit(&buf);
-+}
-+
-+static int
-+netdev_bsd_dispatch_system(struct netdev_bsd *netdev, int batch, pkt_handler h,
-+ u_char *user)
-+{
-+ int ret;
-+ struct dispatch_arg arg;
-+
-+ arg.h = h;
-+ arg.user = user;
-+ ret = pcap_dispatch(netdev->pcap_handle, batch, dispatch_handler, (u_char*)&arg);
-+ return ret;
-+}
-+
-+static int
-+netdev_bsd_dispatch_tap(struct netdev_bsd *netdev, int batch, pkt_handler h,
-+ u_char *user)
-+{
-+ int ret;
-+ int i;
-+ const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX;
-+ OFPBUF_STACK_BUFFER(buf_, size);
-+
-+ struct ofpbuf buf;
-+ ofpbuf_use_stub(&buf, buf_, size);
-+ for (i = 0; i < batch; i++) {
-+ ret = netdev_bsd_recv_tap(netdev, buf.data, ofpbuf_tailroom(&buf));
-+ if (ret >= 0) {
-+ buf.size += ret;
-+ h(user, &buf);
-+ } else if (ret != -EAGAIN) {
-+ return -1;
-+ } else { /* ret = EAGAIN */
-+ break;
-+ }
-+ ofpbuf_clear(&buf);
-+ }
-+ ofpbuf_uninit(&buf);
-+ return i;
-+}
-+
-+static int
-+netdev_bsd_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
-+ u_char *user)
-+{
-+ struct netdev_bsd *netdev = netdev_bsd_cast(netdev_);
-+ struct netdev_dev_bsd * netdev_dev =
-+ netdev_dev_bsd_cast(netdev_get_dev(netdev_));
-+
-+ if (!strcmp(netdev_get_type(netdev_), "tap") &&
-+ netdev->netdev_fd == netdev_dev->tap_fd) {
-+ return netdev_bsd_dispatch_tap(netdev, batch, h, user);
-+ } else {
-+ return netdev_bsd_dispatch_system(netdev, batch, h, user);
-+ }
-+}
-+
-+static int
-+netdev_bsd_get_fd(struct netdev *netdev_)
-+{
-+ struct netdev_bsd *netdev = netdev_bsd_cast(netdev_);
-+ return netdev->netdev_fd;
-+}
-+#endif
-+
- /* Discards all packets waiting to be received from 'netdev'. */
- static int
- netdev_bsd_drain(struct netdev *netdev_)
-@@ -1263,6 +1346,10 @@ const struct netdev_class netdev_bsd_class = {
-
- netdev_bsd_recv,
- netdev_bsd_recv_wait,
-+#ifdef THREADED
-+ netdev_bsd_dispatch,
-+ netdev_bsd_get_fd,
-+#endif
- netdev_bsd_drain,
-
- netdev_bsd_send,
-@@ -1323,6 +1410,10 @@ const struct netdev_class netdev_tap_class = {
-
- netdev_bsd_recv,
- netdev_bsd_recv_wait,
-+#ifdef THREADED
-+ netdev_bsd_dispatch,
-+ netdev_bsd_get_fd,
-+#endif
- netdev_bsd_drain,
-
- netdev_bsd_send,
-diff --git lib/netdev-dummy.c lib/netdev-dummy.c
-index b8c23c5..4e4801c 100644
---- lib/netdev-dummy.c
-+++ lib/netdev-dummy.c
-@@ -20,6 +20,12 @@
-
- #include <errno.h>
-
-+#ifdef THREADED
-+#include <pthread.h>
-+#include <unistd.h>
-+#include "socket-util.h"
-+#endif
-+
- #include "flow.h"
- #include "list.h"
- #include "netdev-provider.h"
-@@ -51,6 +57,10 @@ struct netdev_dummy {
- struct list node; /* In netdev_dev_dummy's "devs" list. */
- struct list recv_queue;
- bool listening;
-+#ifdef THREADED
-+ pthread_mutex_t queue_mutex;
-+ int s_pipe[2]; /* used to signal packet arrivals */
-+#endif
- };
-
- static struct shash dummy_netdev_devs = SHASH_INITIALIZER(&dummy_netdev_devs);
-@@ -124,11 +134,30 @@ netdev_dummy_open(struct netdev_dev *netdev_dev_, struct netdev **netdevp)
- {
- struct netdev_dev_dummy *netdev_dev = netdev_dev_dummy_cast(netdev_dev_);
- struct netdev_dummy *netdev;
-+#ifdef THREADED
-+ int error;
-+#endif
-
- netdev = xmalloc(sizeof *netdev);
- netdev_init(&netdev->netdev, netdev_dev_);
- list_init(&netdev->recv_queue);
- netdev->listening = false;
-+#ifdef THREADED
-+ error = pipe(netdev->s_pipe);
-+ if (error) {
-+ VLOG_ERR("Unable to create dummy pipe: %s", strerror(errno));
-+ free(netdev);
-+ return errno;
-+ }
-+ if (set_nonblocking(netdev->s_pipe[0]) ||
-+ set_nonblocking(netdev->s_pipe[1])) {
-+ VLOG_ERR("Unable to set nonblocking on dummy pipe: %s",
-+ strerror(errno));
-+ free(netdev);
-+ return errno;
-+ }
-+ pthread_mutex_init(&netdev->queue_mutex, NULL);
-+#endif
-
- *netdevp = &netdev->netdev;
- list_push_back(&netdev_dev->devs, &netdev->node);
-@@ -141,6 +170,13 @@ netdev_dummy_close(struct netdev *netdev_)
- struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
- list_remove(&netdev->node);
- ofpbuf_list_delete(&netdev->recv_queue);
-+#ifdef THREADED
-+ if (netdev->listening) {
-+ close(netdev->s_pipe[0]);
-+ close(netdev->s_pipe[1]);
-+ }
-+ pthread_mutex_destroy(&netdev->queue_mutex);
-+#endif
- free(netdev);
- }
-
-@@ -158,12 +194,29 @@ netdev_dummy_recv(struct netdev *netdev_, void *buffer, size_t size)
- struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
- struct ofpbuf *packet;
- size_t packet_size;
-+#ifdef THREADED
-+ char c;
-+#endif
-
-+#ifdef THREADED
-+ pthread_mutex_lock(&netdev->queue_mutex);
-+#endif
- if (list_is_empty(&netdev->recv_queue)) {
-+#ifdef THREADED
-+ pthread_mutex_unlock(&netdev->queue_mutex);
-+#endif
- return -EAGAIN;
- }
-+#ifdef THREADED
-+ if (read(netdev->s_pipe[0], &c, 1) < 0) {
-+ VLOG_ERR("Error reading dummy pipe: %s", strerror(errno));
-+ }
-+#endif
-
- packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue));
-+#ifdef THREADED
-+ pthread_mutex_unlock(&netdev->queue_mutex);
-+#endif
- if (packet->size > size) {
- return -EMSGSIZE;
- }
-@@ -179,11 +232,60 @@ static void
- netdev_dummy_recv_wait(struct netdev *netdev_)
- {
- struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
-- if (!list_is_empty(&netdev->recv_queue)) {
-+ int empty;
-+
-+#ifdef THREADED
-+ pthread_mutex_lock(&netdev->queue_mutex);
-+#endif
-+ empty = list_is_empty(&netdev->recv_queue);
-+#ifdef THREADED
-+ pthread_mutex_unlock(&netdev->queue_mutex);
-+#endif
-+ if (!empty) {
- poll_immediate_wake();
- }
- }
-
-+#ifdef THREADED
-+static int
-+netdev_dummy_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
-+ u_char *user)
-+{
-+ int i;
-+ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
-+ struct ofpbuf *packet;
-+ VLOG_DBG("dispatch %d", batch);
-+
-+ for (i = 0; i < batch; i++) {
-+ char c;
-+ if (read(netdev->s_pipe[0], &c, 1) < 0) {
-+ if (errno == EAGAIN)
-+ break;
-+ VLOG_ERR("%s: error reading from the pipe: %s",
-+ netdev_get_name(netdev_), strerror(errno));
-+ return -1;
-+ }
-+ pthread_mutex_lock(&netdev->queue_mutex);
-+ if (list_is_empty(&netdev->recv_queue)) {
-+ pthread_mutex_unlock(&netdev->queue_mutex);
-+ return -EAGAIN;
-+ }
-+ packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue));
-+ pthread_mutex_unlock(&netdev->queue_mutex);
-+ h(user, packet);
-+ ofpbuf_delete(packet);
-+ }
-+ return i;
-+}
-+
-+static int
-+netdev_dummy_get_fd(struct netdev *netdev_)
-+{
-+ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
-+ return netdev->s_pipe[0];
-+}
-+#endif
-+
- static int
- netdev_dummy_drain(struct netdev *netdev_)
- {
-@@ -316,6 +418,10 @@ static const struct netdev_class dummy_class = {
- netdev_dummy_listen,
- netdev_dummy_recv,
- netdev_dummy_recv_wait,
-+#ifdef THREADED
-+ netdev_dummy_dispatch, /* dispatch */
-+ netdev_dummy_get_fd, /* get_fd */
-+#endif
- netdev_dummy_drain,
-
- NULL, /* send */
-@@ -407,6 +513,9 @@ netdev_dummy_receive(struct unixctl_conn *conn,
- struct netdev_dev_dummy *dummy_dev;
- int n_listeners;
- int i;
-+#ifdef THREADED
-+ char c = 0;
-+#endif
-
- dummy_dev = shash_find_data(&dummy_netdev_devs, argv[1]);
- if (!dummy_dev) {
-@@ -414,6 +523,7 @@ netdev_dummy_receive(struct unixctl_conn *conn,
- return;
- }
-
-+
- n_listeners = 0;
- for (i = 2; i < argc; i++) {
- struct netdev_dummy *dev;
-@@ -429,7 +539,16 @@ netdev_dummy_receive(struct unixctl_conn *conn,
- LIST_FOR_EACH (dev, node, &dummy_dev->devs) {
- if (dev->listening) {
- struct ofpbuf *copy = ofpbuf_clone(packet);
-+#ifdef THREADED
-+ pthread_mutex_lock(&dev->queue_mutex);
-+#endif
- list_push_back(&dev->recv_queue, &copy->list_node);
-+#ifdef THREADED
-+ pthread_mutex_unlock(&dev->queue_mutex);
-+ if (write(dev->s_pipe[1], &c, 1) < 0) {
-+ VLOG_ERR("Error writing dummy pipe: %s", strerror(errno));
-+ }
-+#endif
- n_listeners++;
- }
- }
-diff --git lib/netdev-linux.c lib/netdev-linux.c
-index 4d2f3ac..c33a801 100644
---- lib/netdev-linux.c
-+++ lib/netdev-linux.c
-@@ -891,6 +891,43 @@ netdev_linux_recv_wait(struct netdev *netdev_)
- }
- }
-
-+#ifdef THREADED
-+static int
-+netdev_linux_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
-+ u_char *user)
-+{
-+ int ret;
-+ int i;
-+ const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX;
-+ OFPBUF_STACK_BUFFER(buf_, size);
-+ struct ofpbuf buf;
-+ VLOG_DBG("dispatch %d", batch);
-+
-+ ofpbuf_use_stub(&buf, buf_, size);
-+ for (i = 0; i < batch; i++) {
-+ ret = netdev_linux_recv(netdev_, buf.data, ofpbuf_tailroom(&buf));
-+ if (ret >= 0) {
-+ buf.size += ret;
-+ h(user, &buf);
-+ } else if (ret != -EAGAIN) {
-+ return -1;
-+ } else {
-+ break;
-+ }
-+ ofpbuf_clear(&buf);
-+ }
-+ ofpbuf_uninit(&buf);
-+ return i;
-+}
-+
-+static int
-+netdev_linux_get_fd(struct netdev *netdev_)
-+{
-+ struct netdev_linux *netdev = netdev_linux_cast(netdev_);
-+ return netdev->fd;
-+}
-+#endif
-+
- /* Discards all packets waiting to be received from 'netdev'. */
- static int
- netdev_linux_drain(struct netdev *netdev_)
-@@ -2376,6 +2413,12 @@ netdev_linux_change_seq(const struct netdev *netdev)
- return netdev_dev_linux_cast(netdev_get_dev(netdev))->change_seq;
- }
-
-+#ifdef THREADED
-+#define THREADED_NULL NULL, NULL,
-+#else
-+#define THREADED_NULL
-+#endif
-+
- #define NETDEV_LINUX_CLASS(NAME, CREATE, GET_STATS, SET_STATS, \
- GET_FEATURES, GET_STATUS) \
- { \
-@@ -2396,6 +2439,7 @@ netdev_linux_change_seq(const struct netdev *netdev)
- netdev_linux_listen, \
- netdev_linux_recv, \
- netdev_linux_recv_wait, \
-+ THREADED_NULL \
- netdev_linux_drain, \
- \
- netdev_linux_send, \
-diff --git lib/netdev-provider.h lib/netdev-provider.h
-index 2a91f05..ee4e757 100644
---- lib/netdev-provider.h
-+++ lib/netdev-provider.h
-@@ -24,6 +24,9 @@
- #include "netdev.h"
- #include "list.h"
- #include "shash.h"
-+#ifdef THREADED
-+#include "dispatch.h"
-+#endif
-
- #ifdef __cplusplus
- extern "C" {
-@@ -190,6 +193,22 @@ struct netdev_class {
- * implement packet reception through the 'recv' member function. */
- void (*recv_wait)(struct netdev *netdev);
-
-+#ifdef THREADED
-+ /* Attempts to receive 'batch' packets from 'netdev' and process them
-+ * through the 'handler' callback. This function is used in the 'THREADED'
-+ * version in order to optimize the forwarding process, since it permits to
-+ * process packets directly in the netdev memory.
-+ *
-+ * Returns the number of packets processed on success; this can be 0 if no
-+ * packets are available to be read. Returns -1 if an error occurred.
-+ */
-+ int (*dispatch)(struct netdev *netdev, int batch, pkt_handler handler,
-+ u_char *user);
-+
-+ /* Return the file descriptor of the device */
-+ int (*get_fd)(struct netdev *netdev);
-+#endif
-+
- /* Discards all packets waiting to be received from 'netdev'.
- *
- * May be null if not needed, such as for a network device that does not
-diff --git lib/netdev-vport.c lib/netdev-vport.c
-index 1721f6b..39b26de 100644
---- lib/netdev-vport.c
-+++ lib/netdev-vport.c
-@@ -889,6 +889,13 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED,
- return 0;
- }
-
-+
-+#ifdef THREADED
-+# define THREADED_NULL NULL, NULL,
-+#else
-+# define THREADED_NULL
-+#endif
-+
- #define VPORT_FUNCTIONS(GET_STATUS) \
- NULL, \
- netdev_vport_run, \
-@@ -905,6 +912,7 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED,
- NULL, /* listen */ \
- NULL, /* recv */ \
- NULL, /* recv_wait */ \
-+ THREADED_NULL \
- NULL, /* drain */ \
- \
- netdev_vport_send, /* send */ \
-diff --git lib/netdev.c lib/netdev.c
-index be7cdd2..0c54e1e 100644
---- lib/netdev.c
-+++ lib/netdev.c
-@@ -423,6 +423,28 @@ netdev_recv_wait(struct netdev *netdev)
- }
- }
-
-+#ifdef THREADED
-+/* Attempts to receive and process 'batch' packets from 'netdev'. */
-+int
-+netdev_dispatch(struct netdev *netdev, int batch, pkt_handler h, u_char *user)
-+{
-+ int (*dispatch)(struct netdev*, int, pkt_handler, u_char *);
-+
-+ dispatch = netdev_get_dev(netdev)->netdev_class->dispatch;
-+ return dispatch ? dispatch(netdev, batch, h, user) : 0;
-+}
-+
-+/* Returns the file descriptor */
-+int
-+netdev_get_fd(struct netdev *netdev)
-+{
-+ int (*get_fd)(struct netdev *);
-+
-+ get_fd = netdev_get_dev(netdev)->netdev_class->get_fd;
-+ return get_fd ? get_fd(netdev) : 0;
-+}
-+#endif
-+
- /* Discards all packets waiting to be received from 'netdev'. */
- int
- netdev_drain(struct netdev *netdev)
-diff --git lib/netdev.h lib/netdev.h
-index 4b86c21..4fad796 100644
---- lib/netdev.h
-+++ lib/netdev.h
-@@ -21,6 +21,9 @@
- #include <stddef.h>
- #include <stdint.h>
- #include "openvswitch/types.h"
-+#ifdef THREADED
-+#include "dispatch.h"
-+#endif
-
- #ifdef __cplusplus
- extern "C" {
-@@ -107,6 +110,10 @@ int netdev_get_ifindex(const struct netdev *);
- int netdev_listen(struct netdev *);
- int netdev_recv(struct netdev *, struct ofpbuf *);
- void netdev_recv_wait(struct netdev *);
-+#ifdef THREADED
-+int netdev_dispatch(struct netdev *, int, pkt_handler, u_char *);
-+int netdev_get_fd(struct netdev *);
-+#endif
- int netdev_drain(struct netdev *);
-
- int netdev_send(struct netdev *, const struct ofpbuf *);
-diff --git lib/vlog.c lib/vlog.c
-index 899072e..b6bd4ef 100644
---- lib/vlog.c
-+++ lib/vlog.c
-@@ -34,6 +34,9 @@
- #include "timeval.h"
- #include "unixctl.h"
- #include "util.h"
-+#ifdef THREADED
-+#include <pthread.h>
-+#endif
-
- VLOG_DEFINE_THIS_MODULE(vlog);
-
-@@ -89,6 +92,10 @@ static FILE *log_file;
- /* vlog initialized? */
- static bool vlog_inited;
-
-+#ifdef THREADED
-+static pthread_mutex_t vlog_mutex;
-+#endif
-+
- static void format_log_message(const struct vlog_module *, enum vlog_level,
- enum vlog_facility, unsigned int msg_num,
- const char *message, va_list, struct ds *)
-@@ -484,6 +491,9 @@ vlog_init(void)
- return;
- }
- vlog_inited = true;
-+#ifdef THREADED
-+ pthread_mutex_init(&vlog_mutex, NULL);
-+#endif
-
- /* openlog() is allowed to keep the pointer passed in, without making a
- * copy. The daemonize code sometimes frees and replaces 'program_name',
-@@ -691,6 +701,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level,
-
- ds_init(&s);
- ds_reserve(&s, 1024);
-+#ifdef THREADED
-+ pthread_mutex_lock(&vlog_mutex);
-+#endif
- msg_num++;
-
- if (log_to_console) {
-@@ -721,6 +734,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level,
- fflush(log_file);
- }
-
-+#ifdef THREADED
-+ pthread_mutex_unlock(&vlog_mutex);
-+#endif
- ds_destroy(&s);
- errno = save_errno;
- }
-diff --git m4/openvswitch.m4 m4/openvswitch.m4
-index dca9f5f..084ceff 100644
---- m4/openvswitch.m4
-+++ m4/openvswitch.m4
-@@ -14,6 +14,25 @@
- # See the License for the specific language governing permissions and
- # limitations under the License.
-
-+dnl Check for --enable-threaded and updates CFLAGS.
-+AC_DEFUN([OVS_CHECK_THREADED],
-+ [AC_REQUIRE([AC_PROG_CC])
-+ AC_ARG_ENABLE(
-+ [threaded],
-+ [AC_HELP_STRING([--enable-threaded],
-+ [Enable threaded version of userspace implementation])],
-+ [case "${enableval}" in
-+ (yes) threaded=true ;;
-+ (no) threaded=false ;;
-+ (*) AC_MSG_ERROR([bad value ${enableval} for --enable-threaded]) ;;
-+ esac],
-+ [threaded=false])
-+ if $threaded; then
-+ AC_DEFINE([THREADED], [1],
-+ [Define to 1 if the threaded version of userspace
-+ implementation is enabled.])
-+ fi])
-+
- dnl Checks for --enable-coverage and updates CFLAGS and LDFLAGS appropriately.
- AC_DEFUN([OVS_CHECK_COVERAGE],
- [AC_REQUIRE([AC_PROG_CC])