diff options
author | Frediano Ziglio <fziglio@redhat.com> | 2016-07-08 08:37:19 +0100 |
---|---|---|
committer | Frediano Ziglio <fziglio@redhat.com> | 2016-07-08 09:24:20 +0100 |
commit | 6a32d22853e539ddf8790d30f3f6533861db758c (patch) | |
tree | 3d18c07e268a299cdb1eecbc12b19cc0c2290662 | |
parent | 90d29f161431c8e5dff7a91bd5942d05c2cc1242 (diff) |
use two flows (one upstream one downstream)
Do not make receiving downstream if upstream is too high
-rw-r--r-- | tun.c | 125 |
1 files changed, 66 insertions, 59 deletions
@@ -79,109 +79,112 @@ int tun_setup(void) } #define MIN_PKT_LEN 2000u -#define MIN_LEN (MIN_PKT_LEN + sizeof(packet_t)) -#define PKT_BUF_LEN (1024u * 1024u * 4u) -#define ROUND_UP(n, m) (((n) + ((m) - 1)) & -(m)) +#define PKT_BUF_LEN (1024u * 1024u * 32u) +#define NUM_FLOWS 2 static pthread_mutex_t pkt_buf_mtx = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t pkt_cond_write = PTHREAD_COND_INITIALIZER; static pthread_cond_t pkt_cond_read = PTHREAD_COND_INITIALIZER; -/* buffer is either - * 0 head tail end - * (size = tail - head) - * or - * 0 tail head end - * (size = PKT_BUF_LEN - (head - tail = PKT_BUF_LEN + tail - head) - */ -static uint8_t pkt_buf[PKT_BUF_LEN]; -/* allocate from tail, read from head */ -static uint32_t pkt_head, pkt_tail; -typedef struct { +typedef struct packet { + struct packet *next; uint64_t time_to_send; uint16_t len; /* minimum MIN_PKT_LEN */ uint8_t data[]; } packet_t; +typedef struct flow_info { + uint64_t bytes_from_first_read; + uint64_t first_received_at; + packet_t *first_packet, *last_packet; +} flow_info; + +static flow_info flows[NUM_FLOWS]; +static uint32_t bytes_queued = 0; + static inline uint32_t buf_allocated(void) { - return pkt_tail >= pkt_head ? pkt_tail - pkt_head : PKT_BUF_LEN + pkt_tail - pkt_head; + return bytes_queued; } enum { PKT_DEBUG_ENABLED = 0 }; -#define PKT_DEBUG do { \ - if (PKT_DEBUG_ENABLED) \ - printf("%s:%d: head %u tail %u\n", __func__, __LINE__, pkt_head, pkt_tail); \ -} while(0) -#define PKT_DEBUG_OUT do { \ - if (PKT_DEBUG_ENABLED) \ - printf("%s:%d: head %u tail %u -> %u\n", __func__, __LINE__, pkt_head, pkt_tail, (unsigned) ((uint8_t *) pkt - pkt_buf)); \ -} while(0) /* get packet to write to */ static packet_t * alloc_packet(void) { - PKT_DEBUG; - - packet_t *pkt; - pthread_mutex_lock(&pkt_buf_mtx); - while (buf_allocated() > PKT_BUF_LEN - 2 * MIN_LEN) + while (buf_allocated() > PKT_BUF_LEN - MIN_PKT_LEN) pthread_cond_wait(&pkt_cond_read, &pkt_buf_mtx); - if (pkt_tail >= pkt_head && PKT_BUF_LEN - pkt_tail < MIN_LEN) - pkt_tail = 0; - pkt = (packet_t *) (pkt_buf + pkt_tail); pthread_mutex_unlock(&pkt_buf_mtx); - PKT_DEBUG_OUT; + + packet_t *pkt = malloc(sizeof(packet_t) + MIN_PKT_LEN); + memset(pkt, 0, sizeof(*pkt)); return pkt; } /* add packet to list */ static void -add_packet(packet_t *pkt) +add_packet(flow_info *flow, packet_t *pkt) { - PKT_DEBUG; if (PKT_DEBUG_ENABLED) printf("added packet len %u\n", pkt->len); + pkt = realloc(pkt, sizeof(*pkt) + pkt->len); pthread_mutex_lock(&pkt_buf_mtx); - pkt_tail += ROUND_UP(sizeof(packet_t) + pkt->len, 16); + bytes_queued += pkt->len; + if (flow->last_packet) + flow->last_packet->next = pkt; + else + flow->first_packet = pkt; + flow->last_packet = pkt; pthread_cond_signal(&pkt_cond_write); pthread_mutex_unlock(&pkt_buf_mtx); - PKT_DEBUG; } static packet_t * get_packet(void) { - PKT_DEBUG; - - packet_t *pkt; + flow_info *min_flow; + unsigned n; pthread_mutex_lock(&pkt_buf_mtx); while (buf_allocated() == 0) pthread_cond_wait(&pkt_cond_write, &pkt_buf_mtx); - pkt = (packet_t *) (pkt_buf + pkt_head); + + min_flow = NULL; + for (n = 0; n < NUM_FLOWS; ++n) { + flow_info *flow = &flows[n]; + if (!flow->first_packet) + continue; + if (!min_flow) { + min_flow = flow; + continue; + } + if (flow->first_packet->time_to_send < min_flow->first_packet->time_to_send) + min_flow = flow; + } + + packet_t *pkt = min_flow->first_packet; + min_flow->first_packet = pkt->next; + if (!min_flow->first_packet) + min_flow->last_packet = NULL; + pthread_mutex_unlock(&pkt_buf_mtx); - PKT_DEBUG_OUT; + pkt->next = NULL; return pkt; } static void release_packet(packet_t *pkt) { - PKT_DEBUG; - pthread_mutex_lock(&pkt_buf_mtx); - pkt_head += ROUND_UP(sizeof(packet_t) + pkt->len, 16); - if (pkt_head >= pkt_tail && PKT_BUF_LEN - pkt_head < MIN_LEN) - pkt_head = 0; + bytes_queued -= pkt->len; pthread_cond_signal(&pkt_cond_read); pthread_mutex_unlock(&pkt_buf_mtx); - PKT_DEBUG; + free(pkt); } static void* writer_proc(void *ptr) @@ -213,8 +216,9 @@ handle_tun(int fd) { packet_t *pkt; pthread_t writer; - uint64_t bytes_from_first_read = 0; - uint64_t first_received_at = 0; + flow_info *flow; + + memset(&flows, 0, sizeof(flows)); bytes2time_ratio = (double) 1000000.0 / rate_bytes; @@ -229,6 +233,9 @@ handle_tun(int fd) if (len < 0) break; + struct iphdr *ip = (struct iphdr *) pkt->data; + flow = &flows[ip->daddr == htonl(0xc0a87f00)]; + uint64_t curr_time = get_time_us(); uint64_t time_to_send; @@ -243,25 +250,25 @@ handle_tun(int fd) } pkt->len = len; - if (bytes_from_first_read == 0 || curr_time > first_received_at + bytes2time(bytes_from_first_read)) { + if (flow->bytes_from_first_read == 0 + || curr_time > flow->first_received_at + bytes2time(flow->bytes_from_first_read)) { time_to_send = curr_time; - first_received_at = curr_time; - bytes_from_first_read = len; + flow->first_received_at = curr_time; + flow->bytes_from_first_read = len; } else { - time_to_send = first_received_at + bytes2time(bytes_from_first_read); - bytes_from_first_read += len; + time_to_send = flow->first_received_at + bytes2time(flow->bytes_from_first_read); + flow->bytes_from_first_read += len; /* reduce values avoiding possible overflows */ - while (curr_time >= first_received_at + 1000000u && bytes_from_first_read >= rate_bytes) { - first_received_at += 1000000u; - bytes_from_first_read -= rate_bytes; + while (curr_time >= flow->first_received_at + 1000000u && flow->bytes_from_first_read >= rate_bytes) { + flow->first_received_at += 1000000u; + flow->bytes_from_first_read -= rate_bytes; } } pkt->time_to_send = time_to_send + latency_us; - struct iphdr *ip = (struct iphdr *) pkt->data; u_int32_t addr = ip->saddr; ip->saddr = ip->daddr; ip->daddr = addr; - add_packet(pkt); + add_packet(flow, pkt); } } |