summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrediano Ziglio <fziglio@redhat.com>2016-07-08 08:37:19 +0100
committerFrediano Ziglio <fziglio@redhat.com>2016-07-08 09:24:20 +0100
commit6a32d22853e539ddf8790d30f3f6533861db758c (patch)
tree3d18c07e268a299cdb1eecbc12b19cc0c2290662
parent90d29f161431c8e5dff7a91bd5942d05c2cc1242 (diff)
use two flows (one upstream one downstream)
Do not make receiving downstream if upstream is too high
-rw-r--r--tun.c125
1 files changed, 66 insertions, 59 deletions
diff --git a/tun.c b/tun.c
index 3775824..568e32a 100644
--- a/tun.c
+++ b/tun.c
@@ -79,109 +79,112 @@ int tun_setup(void)
}
#define MIN_PKT_LEN 2000u
-#define MIN_LEN (MIN_PKT_LEN + sizeof(packet_t))
-#define PKT_BUF_LEN (1024u * 1024u * 4u)
-#define ROUND_UP(n, m) (((n) + ((m) - 1)) & -(m))
+#define PKT_BUF_LEN (1024u * 1024u * 32u)
+#define NUM_FLOWS 2
static pthread_mutex_t pkt_buf_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t pkt_cond_write = PTHREAD_COND_INITIALIZER;
static pthread_cond_t pkt_cond_read = PTHREAD_COND_INITIALIZER;
-/* buffer is either
- * 0 head tail end
- * (size = tail - head)
- * or
- * 0 tail head end
- * (size = PKT_BUF_LEN - (head - tail = PKT_BUF_LEN + tail - head)
- */
-static uint8_t pkt_buf[PKT_BUF_LEN];
-/* allocate from tail, read from head */
-static uint32_t pkt_head, pkt_tail;
-typedef struct {
+typedef struct packet {
+ struct packet *next;
uint64_t time_to_send;
uint16_t len;
/* minimum MIN_PKT_LEN */
uint8_t data[];
} packet_t;
+typedef struct flow_info {
+ uint64_t bytes_from_first_read;
+ uint64_t first_received_at;
+ packet_t *first_packet, *last_packet;
+} flow_info;
+
+static flow_info flows[NUM_FLOWS];
+static uint32_t bytes_queued = 0;
+
static inline uint32_t
buf_allocated(void)
{
- return pkt_tail >= pkt_head ? pkt_tail - pkt_head : PKT_BUF_LEN + pkt_tail - pkt_head;
+ return bytes_queued;
}
enum { PKT_DEBUG_ENABLED = 0 };
-#define PKT_DEBUG do { \
- if (PKT_DEBUG_ENABLED) \
- printf("%s:%d: head %u tail %u\n", __func__, __LINE__, pkt_head, pkt_tail); \
-} while(0)
-#define PKT_DEBUG_OUT do { \
- if (PKT_DEBUG_ENABLED) \
- printf("%s:%d: head %u tail %u -> %u\n", __func__, __LINE__, pkt_head, pkt_tail, (unsigned) ((uint8_t *) pkt - pkt_buf)); \
-} while(0)
/* get packet to write to */
static packet_t *
alloc_packet(void)
{
- PKT_DEBUG;
-
- packet_t *pkt;
-
pthread_mutex_lock(&pkt_buf_mtx);
- while (buf_allocated() > PKT_BUF_LEN - 2 * MIN_LEN)
+ while (buf_allocated() > PKT_BUF_LEN - MIN_PKT_LEN)
pthread_cond_wait(&pkt_cond_read, &pkt_buf_mtx);
- if (pkt_tail >= pkt_head && PKT_BUF_LEN - pkt_tail < MIN_LEN)
- pkt_tail = 0;
- pkt = (packet_t *) (pkt_buf + pkt_tail);
pthread_mutex_unlock(&pkt_buf_mtx);
- PKT_DEBUG_OUT;
+
+ packet_t *pkt = malloc(sizeof(packet_t) + MIN_PKT_LEN);
+ memset(pkt, 0, sizeof(*pkt));
return pkt;
}
/* add packet to list */
static void
-add_packet(packet_t *pkt)
+add_packet(flow_info *flow, packet_t *pkt)
{
- PKT_DEBUG;
if (PKT_DEBUG_ENABLED)
printf("added packet len %u\n", pkt->len);
+ pkt = realloc(pkt, sizeof(*pkt) + pkt->len);
pthread_mutex_lock(&pkt_buf_mtx);
- pkt_tail += ROUND_UP(sizeof(packet_t) + pkt->len, 16);
+ bytes_queued += pkt->len;
+ if (flow->last_packet)
+ flow->last_packet->next = pkt;
+ else
+ flow->first_packet = pkt;
+ flow->last_packet = pkt;
pthread_cond_signal(&pkt_cond_write);
pthread_mutex_unlock(&pkt_buf_mtx);
- PKT_DEBUG;
}
static packet_t *
get_packet(void)
{
- PKT_DEBUG;
-
- packet_t *pkt;
+ flow_info *min_flow;
+ unsigned n;
pthread_mutex_lock(&pkt_buf_mtx);
while (buf_allocated() == 0)
pthread_cond_wait(&pkt_cond_write, &pkt_buf_mtx);
- pkt = (packet_t *) (pkt_buf + pkt_head);
+
+ min_flow = NULL;
+ for (n = 0; n < NUM_FLOWS; ++n) {
+ flow_info *flow = &flows[n];
+ if (!flow->first_packet)
+ continue;
+ if (!min_flow) {
+ min_flow = flow;
+ continue;
+ }
+ if (flow->first_packet->time_to_send < min_flow->first_packet->time_to_send)
+ min_flow = flow;
+ }
+
+ packet_t *pkt = min_flow->first_packet;
+ min_flow->first_packet = pkt->next;
+ if (!min_flow->first_packet)
+ min_flow->last_packet = NULL;
+
pthread_mutex_unlock(&pkt_buf_mtx);
- PKT_DEBUG_OUT;
+ pkt->next = NULL;
return pkt;
}
static void
release_packet(packet_t *pkt)
{
- PKT_DEBUG;
-
pthread_mutex_lock(&pkt_buf_mtx);
- pkt_head += ROUND_UP(sizeof(packet_t) + pkt->len, 16);
- if (pkt_head >= pkt_tail && PKT_BUF_LEN - pkt_head < MIN_LEN)
- pkt_head = 0;
+ bytes_queued -= pkt->len;
pthread_cond_signal(&pkt_cond_read);
pthread_mutex_unlock(&pkt_buf_mtx);
- PKT_DEBUG;
+ free(pkt);
}
static void* writer_proc(void *ptr)
@@ -213,8 +216,9 @@ handle_tun(int fd)
{
packet_t *pkt;
pthread_t writer;
- uint64_t bytes_from_first_read = 0;
- uint64_t first_received_at = 0;
+ flow_info *flow;
+
+ memset(&flows, 0, sizeof(flows));
bytes2time_ratio = (double) 1000000.0 / rate_bytes;
@@ -229,6 +233,9 @@ handle_tun(int fd)
if (len < 0)
break;
+ struct iphdr *ip = (struct iphdr *) pkt->data;
+ flow = &flows[ip->daddr == htonl(0xc0a87f00)];
+
uint64_t curr_time = get_time_us();
uint64_t time_to_send;
@@ -243,25 +250,25 @@ handle_tun(int fd)
}
pkt->len = len;
- if (bytes_from_first_read == 0 || curr_time > first_received_at + bytes2time(bytes_from_first_read)) {
+ if (flow->bytes_from_first_read == 0
+ || curr_time > flow->first_received_at + bytes2time(flow->bytes_from_first_read)) {
time_to_send = curr_time;
- first_received_at = curr_time;
- bytes_from_first_read = len;
+ flow->first_received_at = curr_time;
+ flow->bytes_from_first_read = len;
} else {
- time_to_send = first_received_at + bytes2time(bytes_from_first_read);
- bytes_from_first_read += len;
+ time_to_send = flow->first_received_at + bytes2time(flow->bytes_from_first_read);
+ flow->bytes_from_first_read += len;
/* reduce values avoiding possible overflows */
- while (curr_time >= first_received_at + 1000000u && bytes_from_first_read >= rate_bytes) {
- first_received_at += 1000000u;
- bytes_from_first_read -= rate_bytes;
+ while (curr_time >= flow->first_received_at + 1000000u && flow->bytes_from_first_read >= rate_bytes) {
+ flow->first_received_at += 1000000u;
+ flow->bytes_from_first_read -= rate_bytes;
}
}
pkt->time_to_send = time_to_send + latency_us;
- struct iphdr *ip = (struct iphdr *) pkt->data;
u_int32_t addr = ip->saddr;
ip->saddr = ip->daddr;
ip->daddr = addr;
- add_packet(pkt);
+ add_packet(flow, pkt);
}
}