summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrediano Ziglio <fziglio@redhat.com>2016-10-11 15:30:31 +0100
committerFrediano Ziglio <fziglio@redhat.com>2016-10-11 15:30:31 +0100
commit6e60eb8d59d0b75e56aa1486e988402e164d01d3 (patch)
tree36e696c67f9e45ef0bff9c8055490aa3b24ba32a
parent5fe7fcb3d758b8cd1e73de153ef3dacdf25379aa (diff)
Use ppoll and different threading
Instead of using a thread for reading from tun(s)/remote use a thread for every direction. This remove a lot of synchronisation from the code. Some test I did prove that this implementation although using a single thread for a direction is faster allowing to use higher speed. Also this make easier to reduce the internal queue at a minimum emulating better a real card and allowing to do some tests with QoS settings. Signed-off-by: Frediano Ziglio <fziglio@redhat.com>
-rw-r--r--tun.c294
1 files changed, 120 insertions, 174 deletions
diff --git a/tun.c b/tun.c
index 7a697b7..456d691 100644
--- a/tun.c
+++ b/tun.c
@@ -207,23 +207,12 @@ tun_set_server(int port)
}
#define MIN_PKT_LEN 2000u
-/* This buffer is quite big to account 2 flows in a single tun device
- * This to avoid that packet sent delay packet received (or viceversa)
- * We should use 2 tun devices to have 2 queues and reduce the
- * queue to a minimum.
- */
-#define PKT_BUF_LEN (1024u * 1024u * 32u)
-#define NUM_FLOWS 2
-static pthread_mutex_t pkt_buf_mtx = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t pkt_cond_write = PTHREAD_COND_INITIALIZER;
-static pthread_cond_t pkt_cond_read = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t log_mtx = PTHREAD_MUTEX_INITIALIZER;
typedef struct packet {
struct packet *next;
uint64_t time_to_send;
- int dest_tun;
uint16_t len;
/* minimum MIN_PKT_LEN */
uint8_t data[];
@@ -232,29 +221,16 @@ typedef struct packet {
typedef struct flow_info {
uint64_t bytes_from_first_read;
uint64_t first_received_at;
+ int dest_fd;
packet_t *first_packet, *last_packet;
} flow_info;
-static flow_info flows[NUM_FLOWS];
-static uint32_t bytes_queued = 0;
-
-static inline uint32_t
-buf_allocated(void)
-{
- return bytes_queued;
-}
-
enum { PKT_DEBUG_ENABLED = 0 };
/* get packet to write to */
static packet_t *
alloc_packet(void)
{
- pthread_mutex_lock(&pkt_buf_mtx);
- while (buf_allocated() > PKT_BUF_LEN - MIN_PKT_LEN)
- pthread_cond_wait(&pkt_cond_read, &pkt_buf_mtx);
- pthread_mutex_unlock(&pkt_buf_mtx);
-
packet_t *pkt = malloc(sizeof(packet_t) + MIN_PKT_LEN);
memset(pkt, 0, sizeof(*pkt));
return pkt;
@@ -268,64 +244,16 @@ add_packet(flow_info *flow, packet_t *pkt)
printf("added packet len %u\n", pkt->len);
pkt = realloc(pkt, sizeof(*pkt) + pkt->len);
- pthread_mutex_lock(&pkt_buf_mtx);
- bytes_queued += pkt->len;
if (flow->last_packet)
flow->last_packet->next = pkt;
else
flow->first_packet = pkt;
flow->last_packet = pkt;
- pthread_cond_signal(&pkt_cond_write);
- pthread_mutex_unlock(&pkt_buf_mtx);
-}
-
-static packet_t *
-get_packet(void)
-{
- flow_info *min_flow;
- unsigned n;
-
- pthread_mutex_lock(&pkt_buf_mtx);
- while (!term && buf_allocated() == 0) {
- pthread_cond_wait(&pkt_cond_write, &pkt_buf_mtx);
- }
- if (term) {
- pthread_mutex_unlock(&pkt_buf_mtx);
- return NULL;
- }
-
-
- /* get the flow with the packet with minimum time to send */
- min_flow = NULL;
- for (n = 0; n < NUM_FLOWS; ++n) {
- flow_info *flow = &flows[n];
- if (!flow->first_packet)
- continue;
- if (!min_flow) {
- min_flow = flow;
- continue;
- }
- if (flow->first_packet->time_to_send < min_flow->first_packet->time_to_send)
- min_flow = flow;
- }
-
- packet_t *pkt = min_flow->first_packet;
- min_flow->first_packet = pkt->next;
- if (!min_flow->first_packet)
- min_flow->last_packet = NULL;
-
- pthread_mutex_unlock(&pkt_buf_mtx);
- pkt->next = NULL;
- return pkt;
}
static void
release_packet(packet_t *pkt)
{
- pthread_mutex_lock(&pkt_buf_mtx);
- bytes_queued -= pkt->len;
- pthread_cond_signal(&pkt_cond_read);
- pthread_mutex_unlock(&pkt_buf_mtx);
free(pkt);
}
@@ -334,34 +262,14 @@ log_write_ip(const void *raw_ip, size_t len)
{
if (!pcap)
return;
+
pthread_mutex_lock(&log_mtx);
- pcap_write_ip(pcap, raw_ip, len);
+ /* check again to avoid races */
+ if (pcap)
+ pcap_write_ip(pcap, raw_ip, len);
pthread_mutex_unlock(&log_mtx);
}
-static void*
-writer_proc(void *ptr)
-{
- packet_t *pkt;
-
- while ((pkt = get_packet())) {
- uint64_t curr_time = get_time_us();
- if (pkt->time_to_send > curr_time)
- usleep(pkt->time_to_send - curr_time);
- if (remote_sock >= 0) {
- if (remote_connected)
- sendto(remote_sock, pkt->data, pkt->len, MSG_NOSIGNAL,
- &remote_addr, sizeof(remote_addr));
- } else {
- if (pkt->dest_tun == tun_fd)
- log_write_ip(pkt->data, pkt->len);
- write(pkt->dest_tun, pkt->data, pkt->len);
- }
- release_packet(pkt);
- }
- return NULL;
-}
-
/**/
static double bytes2time_ratio;
@@ -396,6 +304,34 @@ handle_fake_packet(const uint8_t *data, size_t len)
return true;
}
+static void*
+handle_remote_flow(void *param)
+{
+ packet_t *pkt = alloc_packet();
+ while (!term) {
+ int len;
+ if (!is_server) {
+ len = recv(remote_sock, pkt->data, MIN_PKT_LEN, 0);
+ } else {
+ /* for server we record the source address to
+ * be able to send packet back */
+ socklen_t sock_len = sizeof(remote_addr);
+ len = recvfrom(remote_sock, pkt->data, MIN_PKT_LEN, 0,
+ &remote_addr, &sock_len);
+ }
+ if (len <= 0)
+ break;
+ remote_connected = true;
+ if (len < sizeof(struct iphdr))
+ continue;
+ if (!handle_fake_packet(pkt->data, len)) {
+ log_write_ip(pkt->data, pkt->len);
+ write(tun_fd, pkt->data, len);
+ }
+ }
+ return NULL;
+}
+
static inline unsigned
reduce_cksum(unsigned sum)
{
@@ -460,86 +396,90 @@ nat_addresses(struct iphdr *ip)
}
}
-void
-handle_tun(void)
+static struct timespec*
+compute_polling_timeout(flow_info *flow, struct pollfd *poll_fd, struct timespec *ts)
{
- packet_t *pkt;
- pthread_t writer;
- flow_info *flow;
-
- memset(&flows, 0, sizeof(flows));
-
- bytes2time_ratio = (double) 1000000.0 / rate_bytes;
+ packet_t *pkt = flow->first_packet;
+ if (!pkt)
+ return NULL;
- if (tun_log_filename) {
- pcap = pcap_open(tun_log_filename);
- if (!pcap) {
- perror("error opening log file");
- exit(EXIT_FAILURE);
+ /* send expired packets */
+ uint64_t curr_time = get_time_us();
+ while (pkt && pkt->time_to_send <= curr_time) {
+ packet_t *next = pkt->next;
+ if (remote_sock >= 0) {
+ if (remote_connected)
+ sendto(remote_sock, pkt->data, pkt->len, MSG_NOSIGNAL,
+ &remote_addr, sizeof(remote_addr));
+ } else {
+ if (flow->dest_fd == tun_fd)
+ log_write_ip(pkt->data, pkt->len);
+ write(flow->dest_fd, pkt->data, pkt->len);
}
+ release_packet(pkt);
+ pkt = next;
+ }
+ flow->first_packet = pkt;
+ if (!pkt) {
+ flow->last_packet = NULL;
+ return NULL;
}
- pthread_create(&writer, NULL, writer_proc, NULL);
+ /* we must wakeup when we need to send a packet */
+ uint64_t timeout_us = pkt->time_to_send;
+
+ /* Do not dequeue too much packets.
+ * On a real card packets are dequeued when cable is free (actually
+ * there is a buffer in the middle). Our virtual cable is free when
+ * the last packet have to be send in a time before latency.
+ * Add an extra 5ms to account for scheduler or other delays. */
+ uint64_t timeout_read_us = flow->last_packet->time_to_send - latency_us - 5 * 1000;
+ if (timeout_read_us > curr_time) {
+ poll_fd->fd = -1;
+ if (timeout_us > timeout_read_us)
+ timeout_us = timeout_read_us;
+ }
- uint64_t old_time = get_time_us();
- uint64_t tot_bytes = 0;
- uint32_t num_packets = 0;
+ timeout_us -= curr_time;
+ ts->tv_sec = timeout_us / 1000000u;
+ ts->tv_nsec = (timeout_us % 1000000u) * 1000;
+ return ts;
+}
- struct pollfd fds[2];
- fds[0].fd = tun_fd;
+static void*
+handle_tun_flow(void *param)
+{
+ int from_fd = param ? tun_fd : tun_fd_back;
+ int dest_fd = param ? tun_fd_back : tun_fd;
+
+ struct pollfd fds[1];
fds[0].events = POLLIN;
- fds[1].fd = remote_sock >= 0 ? remote_sock : tun_fd_back;
- fds[1].events = POLLIN;
- fds[1].revents = 0;
- pkt = alloc_packet();
+ packet_t *pkt = alloc_packet();
+ flow_info flow[1];
+ memset(flow, 0, sizeof(flow));
+ flow->dest_fd = dest_fd;
while (!term) {
- if (poll(fds, 2, -1) < 0) {
+ struct timespec ts, *pts;
+ fds[0].fd = from_fd;
+ pts = compute_polling_timeout(flow, &fds[0], &ts);
+ if (ppoll(fds, 1, pts, NULL) < 0) {
if (errno != EINTR)
break;
continue;
}
- int len;
- if ((fds[1].revents & POLLIN) != 0 && remote_sock >= 0) {
- if (!is_server) {
- len = recv(remote_sock, pkt->data, MIN_PKT_LEN, 0);
- } else {
- /* for server we record the source address to
- * be able to send packet back */
- socklen_t sock_len = sizeof(remote_addr);
- len = recvfrom(remote_sock, pkt->data, MIN_PKT_LEN, 0,
- &remote_addr, &sock_len);
- }
- if (len <= 0)
- break;
- remote_connected = true;
- if (len < sizeof(struct iphdr))
- continue;
- if (!handle_fake_packet(pkt->data, len)) {
- log_write_ip(pkt->data, pkt->len);
- write(tun_fd, pkt->data, len);
- }
- }
-
- if ((fds[1].revents & POLLIN) != 0 && tun_fd_back >= 0) {
- len = read(tun_fd_back, pkt->data, MIN_PKT_LEN);
- pkt->dest_tun = tun_fd;
- flow = &flows[1];
- } else if ((fds[0].revents & POLLIN) != 0) {
- len = read(tun_fd, pkt->data, MIN_PKT_LEN);
- pkt->dest_tun = tun_fd_back;
- flow = &flows[0];
- } else {
+ if ((fds[0].revents & POLLIN) == 0)
continue;
- }
+
+ int len = read(from_fd, pkt->data, MIN_PKT_LEN);
if (len < 0)
break;
pkt->len = len;
len += framing_bytes;
- if (pkt->dest_tun == tun_fd_back)
+ if (from_fd == tun_fd)
log_write_ip(pkt->data, pkt->len);
struct iphdr *ip = (struct iphdr *) pkt->data;
@@ -548,18 +488,6 @@ handle_tun(void)
uint64_t curr_time = get_time_us();
- /* just some debugging on speed */
- /* TODO improve */
- tot_bytes += len;
- num_packets++;
- if (old_time + 1000000u <= curr_time) {
- printf("bytes/s %g\n", (double) tot_bytes * 1000000.0 / (curr_time - old_time));
- printf("%u packets (avg %u)\n", num_packets, num_packets ? (unsigned) (tot_bytes / num_packets) : 0u);
- old_time = curr_time;
- tot_bytes = 0;
- num_packets = 0;
- }
-
/* Compute time to send the packet.
* Adjusting for latency is easy, just current time + latency.
* For bandwidth is a bit more complicated as we must take
@@ -592,15 +520,33 @@ handle_tun(void)
add_packet(flow, pkt);
pkt = alloc_packet();
}
- term = 1;
+ return NULL;
+}
- /* wake up write if blocked to pick up the termination */
- pthread_mutex_lock(&pkt_buf_mtx);
- pthread_cond_signal(&pkt_cond_write);
- pthread_mutex_unlock(&pkt_buf_mtx);
+void
+handle_tun(void)
+{
+ pthread_t back_flow_thread;
- /* wait writer termination */
- pthread_join(writer, NULL);
+ bytes2time_ratio = (double) 1000000.0 / rate_bytes;
+ if (tun_log_filename) {
+ pcap = pcap_open(tun_log_filename);
+ if (!pcap) {
+ perror("error opening log file");
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ if (remote_sock >= 0)
+ pthread_create(&back_flow_thread, NULL, handle_remote_flow, NULL);
+ else
+ pthread_create(&back_flow_thread, NULL, handle_tun_flow, NULL);
+ handle_tun_flow((void*) (uintptr_t) 1);
+
+ /* close capture file */
+ pthread_mutex_lock(&log_mtx);
pcap_close(pcap);
+ pcap = NULL;
+ pthread_mutex_unlock(&log_mtx);
}