diff options
author | Frediano Ziglio <fziglio@redhat.com> | 2016-10-11 15:30:31 +0100 |
---|---|---|
committer | Frediano Ziglio <fziglio@redhat.com> | 2016-10-11 15:30:31 +0100 |
commit | 6e60eb8d59d0b75e56aa1486e988402e164d01d3 (patch) | |
tree | 36e696c67f9e45ef0bff9c8055490aa3b24ba32a | |
parent | 5fe7fcb3d758b8cd1e73de153ef3dacdf25379aa (diff) |
Use ppoll and different threading
Instead of using one thread for reading from tun(s)/remote, use a
thread for every direction.
This removes a lot of synchronisation from the code.
Some tests I did show that this implementation, although it uses
a single thread per direction, is faster, allowing higher speeds
to be used.
This also makes it easier to reduce the internal queue to a minimum,
emulating a real card better and allowing some tests with QoS
settings.
Signed-off-by: Frediano Ziglio <fziglio@redhat.com>
-rw-r--r-- | tun.c | 294 |
1 files changed, 120 insertions, 174 deletions
@@ -207,23 +207,12 @@ tun_set_server(int port) } #define MIN_PKT_LEN 2000u -/* This buffer is quite big to account 2 flows in a single tun device - * This to avoid that packet sent delay packet received (or viceversa) - * We should use 2 tun devices to have 2 queues and reduce the - * queue to a minimum. - */ -#define PKT_BUF_LEN (1024u * 1024u * 32u) -#define NUM_FLOWS 2 -static pthread_mutex_t pkt_buf_mtx = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t pkt_cond_write = PTHREAD_COND_INITIALIZER; -static pthread_cond_t pkt_cond_read = PTHREAD_COND_INITIALIZER; static pthread_mutex_t log_mtx = PTHREAD_MUTEX_INITIALIZER; typedef struct packet { struct packet *next; uint64_t time_to_send; - int dest_tun; uint16_t len; /* minimum MIN_PKT_LEN */ uint8_t data[]; @@ -232,29 +221,16 @@ typedef struct packet { typedef struct flow_info { uint64_t bytes_from_first_read; uint64_t first_received_at; + int dest_fd; packet_t *first_packet, *last_packet; } flow_info; -static flow_info flows[NUM_FLOWS]; -static uint32_t bytes_queued = 0; - -static inline uint32_t -buf_allocated(void) -{ - return bytes_queued; -} - enum { PKT_DEBUG_ENABLED = 0 }; /* get packet to write to */ static packet_t * alloc_packet(void) { - pthread_mutex_lock(&pkt_buf_mtx); - while (buf_allocated() > PKT_BUF_LEN - MIN_PKT_LEN) - pthread_cond_wait(&pkt_cond_read, &pkt_buf_mtx); - pthread_mutex_unlock(&pkt_buf_mtx); - packet_t *pkt = malloc(sizeof(packet_t) + MIN_PKT_LEN); memset(pkt, 0, sizeof(*pkt)); return pkt; @@ -268,64 +244,16 @@ add_packet(flow_info *flow, packet_t *pkt) printf("added packet len %u\n", pkt->len); pkt = realloc(pkt, sizeof(*pkt) + pkt->len); - pthread_mutex_lock(&pkt_buf_mtx); - bytes_queued += pkt->len; if (flow->last_packet) flow->last_packet->next = pkt; else flow->first_packet = pkt; flow->last_packet = pkt; - pthread_cond_signal(&pkt_cond_write); - pthread_mutex_unlock(&pkt_buf_mtx); -} - -static packet_t * -get_packet(void) -{ - flow_info *min_flow; - unsigned n; - - 
pthread_mutex_lock(&pkt_buf_mtx); - while (!term && buf_allocated() == 0) { - pthread_cond_wait(&pkt_cond_write, &pkt_buf_mtx); - } - if (term) { - pthread_mutex_unlock(&pkt_buf_mtx); - return NULL; - } - - - /* get the flow with the packet with minimum time to send */ - min_flow = NULL; - for (n = 0; n < NUM_FLOWS; ++n) { - flow_info *flow = &flows[n]; - if (!flow->first_packet) - continue; - if (!min_flow) { - min_flow = flow; - continue; - } - if (flow->first_packet->time_to_send < min_flow->first_packet->time_to_send) - min_flow = flow; - } - - packet_t *pkt = min_flow->first_packet; - min_flow->first_packet = pkt->next; - if (!min_flow->first_packet) - min_flow->last_packet = NULL; - - pthread_mutex_unlock(&pkt_buf_mtx); - pkt->next = NULL; - return pkt; } static void release_packet(packet_t *pkt) { - pthread_mutex_lock(&pkt_buf_mtx); - bytes_queued -= pkt->len; - pthread_cond_signal(&pkt_cond_read); - pthread_mutex_unlock(&pkt_buf_mtx); free(pkt); } @@ -334,34 +262,14 @@ log_write_ip(const void *raw_ip, size_t len) { if (!pcap) return; + pthread_mutex_lock(&log_mtx); - pcap_write_ip(pcap, raw_ip, len); + /* check again to avoid races */ + if (pcap) + pcap_write_ip(pcap, raw_ip, len); pthread_mutex_unlock(&log_mtx); } -static void* -writer_proc(void *ptr) -{ - packet_t *pkt; - - while ((pkt = get_packet())) { - uint64_t curr_time = get_time_us(); - if (pkt->time_to_send > curr_time) - usleep(pkt->time_to_send - curr_time); - if (remote_sock >= 0) { - if (remote_connected) - sendto(remote_sock, pkt->data, pkt->len, MSG_NOSIGNAL, - &remote_addr, sizeof(remote_addr)); - } else { - if (pkt->dest_tun == tun_fd) - log_write_ip(pkt->data, pkt->len); - write(pkt->dest_tun, pkt->data, pkt->len); - } - release_packet(pkt); - } - return NULL; -} - /**/ static double bytes2time_ratio; @@ -396,6 +304,34 @@ handle_fake_packet(const uint8_t *data, size_t len) return true; } +static void* +handle_remote_flow(void *param) +{ + packet_t *pkt = alloc_packet(); + while (!term) { 
+ int len; + if (!is_server) { + len = recv(remote_sock, pkt->data, MIN_PKT_LEN, 0); + } else { + /* for server we record the source address to + * be able to send packet back */ + socklen_t sock_len = sizeof(remote_addr); + len = recvfrom(remote_sock, pkt->data, MIN_PKT_LEN, 0, + &remote_addr, &sock_len); + } + if (len <= 0) + break; + remote_connected = true; + if (len < sizeof(struct iphdr)) + continue; + if (!handle_fake_packet(pkt->data, len)) { + log_write_ip(pkt->data, pkt->len); + write(tun_fd, pkt->data, len); + } + } + return NULL; +} + static inline unsigned reduce_cksum(unsigned sum) { @@ -460,86 +396,90 @@ nat_addresses(struct iphdr *ip) } } -void -handle_tun(void) +static struct timespec* +compute_polling_timeout(flow_info *flow, struct pollfd *poll_fd, struct timespec *ts) { - packet_t *pkt; - pthread_t writer; - flow_info *flow; - - memset(&flows, 0, sizeof(flows)); - - bytes2time_ratio = (double) 1000000.0 / rate_bytes; + packet_t *pkt = flow->first_packet; + if (!pkt) + return NULL; - if (tun_log_filename) { - pcap = pcap_open(tun_log_filename); - if (!pcap) { - perror("error opening log file"); - exit(EXIT_FAILURE); + /* send expired packets */ + uint64_t curr_time = get_time_us(); + while (pkt && pkt->time_to_send <= curr_time) { + packet_t *next = pkt->next; + if (remote_sock >= 0) { + if (remote_connected) + sendto(remote_sock, pkt->data, pkt->len, MSG_NOSIGNAL, + &remote_addr, sizeof(remote_addr)); + } else { + if (flow->dest_fd == tun_fd) + log_write_ip(pkt->data, pkt->len); + write(flow->dest_fd, pkt->data, pkt->len); } + release_packet(pkt); + pkt = next; + } + flow->first_packet = pkt; + if (!pkt) { + flow->last_packet = NULL; + return NULL; } - pthread_create(&writer, NULL, writer_proc, NULL); + /* we must wakeup when we need to send a packet */ + uint64_t timeout_us = pkt->time_to_send; + + /* Do not dequeue too much packets. + * On a real card packets are dequeued when cable is free (actually + * there is a buffer in the middle). 
Our virtual cable is free when + * the last packet have to be send in a time before latency. + * Add an extra 5ms to account for scheduler or other delays. */ + uint64_t timeout_read_us = flow->last_packet->time_to_send - latency_us - 5 * 1000; + if (timeout_read_us > curr_time) { + poll_fd->fd = -1; + if (timeout_us > timeout_read_us) + timeout_us = timeout_read_us; + } - uint64_t old_time = get_time_us(); - uint64_t tot_bytes = 0; - uint32_t num_packets = 0; + timeout_us -= curr_time; + ts->tv_sec = timeout_us / 1000000u; + ts->tv_nsec = (timeout_us % 1000000u) * 1000; + return ts; +} - struct pollfd fds[2]; - fds[0].fd = tun_fd; +static void* +handle_tun_flow(void *param) +{ + int from_fd = param ? tun_fd : tun_fd_back; + int dest_fd = param ? tun_fd_back : tun_fd; + + struct pollfd fds[1]; fds[0].events = POLLIN; - fds[1].fd = remote_sock >= 0 ? remote_sock : tun_fd_back; - fds[1].events = POLLIN; - fds[1].revents = 0; - pkt = alloc_packet(); + packet_t *pkt = alloc_packet(); + flow_info flow[1]; + memset(flow, 0, sizeof(flow)); + flow->dest_fd = dest_fd; while (!term) { - if (poll(fds, 2, -1) < 0) { + struct timespec ts, *pts; + fds[0].fd = from_fd; + pts = compute_polling_timeout(flow, &fds[0], &ts); + if (ppoll(fds, 1, pts, NULL) < 0) { if (errno != EINTR) break; continue; } - int len; - if ((fds[1].revents & POLLIN) != 0 && remote_sock >= 0) { - if (!is_server) { - len = recv(remote_sock, pkt->data, MIN_PKT_LEN, 0); - } else { - /* for server we record the source address to - * be able to send packet back */ - socklen_t sock_len = sizeof(remote_addr); - len = recvfrom(remote_sock, pkt->data, MIN_PKT_LEN, 0, - &remote_addr, &sock_len); - } - if (len <= 0) - break; - remote_connected = true; - if (len < sizeof(struct iphdr)) - continue; - if (!handle_fake_packet(pkt->data, len)) { - log_write_ip(pkt->data, pkt->len); - write(tun_fd, pkt->data, len); - } - } - - if ((fds[1].revents & POLLIN) != 0 && tun_fd_back >= 0) { - len = read(tun_fd_back, pkt->data, 
MIN_PKT_LEN); - pkt->dest_tun = tun_fd; - flow = &flows[1]; - } else if ((fds[0].revents & POLLIN) != 0) { - len = read(tun_fd, pkt->data, MIN_PKT_LEN); - pkt->dest_tun = tun_fd_back; - flow = &flows[0]; - } else { + if ((fds[0].revents & POLLIN) == 0) continue; - } + + int len = read(from_fd, pkt->data, MIN_PKT_LEN); if (len < 0) break; pkt->len = len; len += framing_bytes; - if (pkt->dest_tun == tun_fd_back) + if (from_fd == tun_fd) log_write_ip(pkt->data, pkt->len); struct iphdr *ip = (struct iphdr *) pkt->data; @@ -548,18 +488,6 @@ handle_tun(void) uint64_t curr_time = get_time_us(); - /* just some debugging on speed */ - /* TODO improve */ - tot_bytes += len; - num_packets++; - if (old_time + 1000000u <= curr_time) { - printf("bytes/s %g\n", (double) tot_bytes * 1000000.0 / (curr_time - old_time)); - printf("%u packets (avg %u)\n", num_packets, num_packets ? (unsigned) (tot_bytes / num_packets) : 0u); - old_time = curr_time; - tot_bytes = 0; - num_packets = 0; - } - /* Compute time to send the packet. * Adjusting for latency is easy, just current time + latency. 
* For bandwidth is a bit more complicated as we must take @@ -592,15 +520,33 @@ handle_tun(void) add_packet(flow, pkt); pkt = alloc_packet(); } - term = 1; + return NULL; +} - /* wake up write if blocked to pick up the termination */ - pthread_mutex_lock(&pkt_buf_mtx); - pthread_cond_signal(&pkt_cond_write); - pthread_mutex_unlock(&pkt_buf_mtx); +void +handle_tun(void) +{ + pthread_t back_flow_thread; - /* wait writer termination */ - pthread_join(writer, NULL); + bytes2time_ratio = (double) 1000000.0 / rate_bytes; + if (tun_log_filename) { + pcap = pcap_open(tun_log_filename); + if (!pcap) { + perror("error opening log file"); + exit(EXIT_FAILURE); + } + } + + if (remote_sock >= 0) + pthread_create(&back_flow_thread, NULL, handle_remote_flow, NULL); + else + pthread_create(&back_flow_thread, NULL, handle_tun_flow, NULL); + handle_tun_flow((void*) (uintptr_t) 1); + + /* close capture file */ + pthread_mutex_lock(&log_mtx); pcap_close(pcap); + pcap = NULL; + pthread_mutex_unlock(&log_mtx); } |