summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlon Levy <alevy@redhat.com>2011-09-04 18:44:50 +0300
committerAlon Levy <alevy@redhat.com>2011-09-04 18:44:50 +0300
commit99c9549a96bdb1e4c876cc0b9b52fe1aa57757c0 (patch)
treeca6efaac0c224eeeef5a94c96ce8e5cc73bf7a86
parent1e60f86302026184e9301ae6171654342458abc2 (diff)
adding --bandwidth-limit, but not working right. tester freezes uiHEADmaster
-rwxr-xr-xbandwidthmon127
-rwxr-xr-xproxy.py7
-rwxr-xr-xtest7
-rwxr-xr-xtest_bandwidth_limit39
-rwxr-xr-xtest_free_writing7
5 files changed, 158 insertions, 29 deletions
diff --git a/bandwidthmon b/bandwidthmon
index c5d9102..4f35ce9 100755
--- a/bandwidthmon
+++ b/bandwidthmon
@@ -7,7 +7,7 @@ import gtk
import time
import socket
-debug = False
+debug = True
def dprint(x):
global debug
if not debug:
@@ -18,16 +18,19 @@ class Bandwidth(object):
""" Compute average bandwidth over last X seconds
"""
- def __init__(self, window_size, average_callback = None):
+ def __init__(self, window_size, average_callback=None, limit=None):
self.window_size = window_size
+ self.limit = limit
self.average_callback = average_callback
self.reset()
- def add_packet(self, data):
+ def __str__(self):
+ return "<Bandwidth window:%d sec>" % (self.window_size)
+
+ def add_packet(self, now, data):
num_bytes = len(data)
# TODO - other stuff then just length
self.total += num_bytes
- now = time.time()
# add new datapoint
self.window.append((now, num_bytes))
self.remove_old(now)
@@ -55,15 +58,30 @@ class Bandwidth(object):
if self.average_callback:
self.average_callback(self)
+ def bytes_in_window(self):
+ # suboptimal - can update per packet
+ return sum(x for t, x in self.window)
+
def average(self):
# suboptimal
- return float(sum([x for t,x in self.window])) / self.window_size
+ return float(self.bytes_in_window()) / self.window_size
def global_average(self):
if self.start_time == self.last_time:
return 0
return float(self.total) / (self.last_time - self.start_time)
+ def bytes_sendable_in_limit(self, now, count):
+ """ Rate limiting model:
+ proxy wakes up and wants to send some bytes. It asks bandwidth how
+ many bytes it can send. Calculation assumes we want to send as many
+ bytes as we can and still hold to a Average_over_last_T_seconds max.
+ """
+ if not self.limit:
+ return count
+ self.remove_old(now)
+ return min(count, self.limit * self.window_size - self.bytes_in_window())
+
def rate_to_string(rate):
if rate == 0:
return '0'
@@ -74,13 +92,14 @@ def rate_to_string(rate):
return '%3.2f MiB' % (float(rate) / 1024 / 1024)
class UI(object):
- def __init__(self, pairs):
+ def __init__(self, pairs, bandwidth_limit):
"""
| hbox
vbox | in_label out_label
; reset_button
"""
self.pairs = pairs
+ self.bandwidth_limit = bandwidth_limit
self.window = gtk.Window()
self.vbox = gtk.VBox()
self.window.add(self.vbox)
@@ -109,28 +128,35 @@ class UI(object):
hbox1.add(label_avg_out)
hbox2.add(label_avg_in_from_reset)
hbox2.add(label_avg_out_from_reset)
- in_bw = Bandwidth(1, self.on_bw_average)
- out_bw = Bandwidth(1, self.on_bw_average)
+ in_bw = Bandwidth(1, self.on_bw_average, limit=self.bandwidth_limit)
+ out_bw = Bandwidth(1, self.on_bw_average, limit=self.bandwidth_limit)
self.port_in_bw[src] = in_bw
self.port_out_bw[src] = out_bw
self.port_in_bw[dst] = in_bw
self.port_out_bw[dst] = out_bw
# TODO - use a class
self.bw_to_ui[in_bw] = dict(label_avg=label_avg_in,
- label_avg_from_reset=label_avg_in_from_reset)
+ label_avg_from_reset=label_avg_in_from_reset,
+ hbox1=hbox1, hbox2=hbox2)
self.bw_to_ui[out_bw] = dict(label_avg=label_avg_out,
- label_avg_from_reset=label_avg_out_from_reset)
+ label_avg_from_reset=label_avg_out_from_reset,
+ hbox1=hbox1, hbox2=hbox2)
def reset(self, widget, data=None):
for bw in self.bw_to_ui.keys():
bw.reset()
def on_bw_average(self, bw):
- labels = self.bw_to_ui[bw]
- labels['label_avg'].set_label(rate_to_string(bw.average()))
- labels['label_avg_from_reset'].set_label(rate_to_string(bw.global_average()))
+ ui = self.bw_to_ui[bw]
+ label_avg = ui['label_avg']
+ label_avg.set_label(rate_to_string(bw.average()))
+ label_avg_from_reset = ui['label_avg_from_reset']
+ label_avg_from_reset.set_label(rate_to_string(bw.global_average()))
+ #for u in [label_avg, label_avg_from_reset, ui['hbox1'], ui['hbox2'], self.vbox]:
+ # u.queue_draw()
+ self.vbox.queue_draw()
- def add_packet(self, src, dst, data):
+ def add_packet(self, now, src, dst, data):
if src in self.port_in_bw:
bw = self.port_in_bw[src]
elif dst in self.port_out_bw:
@@ -139,14 +165,29 @@ class UI(object):
print "got packet for unmonitored src port %d (%d->%d %d#)" % (
src, src, dst, len(data))
return
- bw.add_packet(data)
+ bw.add_packet(now, data)
-def setup_proxy(ui, listen_port, listen_host, remote_addr, debug_proxy):
+ def bandwidth(self, src, dst):
+ if src in self.port_in_bw:
+ bw = self.port_in_bw[src]
+ elif dst in self.port_out_bw:
+ bw = self.port_out_bw[dst]
+ else:
+ raise Exception("%s: non existant src and dst" % (self, src, dst))
+ return bw
+
+# How much time to wait before retrying the send of data that
+# was over the limit
+LIMIT_RETRY_TIMEOUT_MS = 10
+def setup_proxy(ui, listen_port, listen_host, remote_addr, debug_proxy,
+ bandwidth_limit):
iterate_packets, handle_input, select_based_iterator, get_fds = proxy.make_proxy(
listen_port, remote_addr, listen_host, debug=debug_proxy)
assert(len(get_fds()) == 1) # only the accepting socket
accepter = get_fds()[0]
added_fds = set()
+ # queued data due to bandwidth limiting. We live in a single threaded world.
+ queued = {}
def on_new_fd(fd):
dprint("on_new_fd %s" % fd.fileno())
# Don't add glib.IO_OUT unless you mean it.
@@ -156,6 +197,25 @@ def setup_proxy(ui, listen_port, listen_host, remote_addr, debug_proxy):
glib.io_add_watch(fd.fileno(),
glib.IO_IN | glib.IO_HUP | glib.IO_ERR,
on_read, fd)
+ def resend((target, src, dst, now)):
+ key = (src, dst)
+ if key not in queued:
+ return
+ data = queued[key]
+ del queued[key]
+ if not now:
+ now = time.time()
+ sendable = ui.bandwidth(src, dst
+ ).bytes_sendable_in_limit(now, len(data))
+ if sendable == len(data):
+ target.send(data)
+ return
+ target.send(data[:sendable])
+ data = data[sendable:]
+ queued[key] = data
+ glib.timeout_add(LIMIT_RETRY_TIMEOUT_MS, resend,
+ (target, src, dst, None))
+
def on_read(glib_fd, condition, fd):
# lame check to find out the fd is closed
try:
@@ -177,9 +237,31 @@ def setup_proxy(ui, listen_port, listen_host, remote_addr, debug_proxy):
dprint("%s: result %s" % (fd.fileno(), repr(result)))
return fd == accepter
(src, dst, data, other, completer) = result
- completer()
- dprint("%s: result %d->%d #%d" % (fd.fileno(), src, dst, len(data)))
- ui.add_packet(src, dst, data)
+ now = time.time()
+ if bandwidth_limit:
+ # ugly - why access ui to get bw object?
+ sendable = ui.bandwidth(src, dst
+ ).bytes_sendable_in_limit(now, len(data))
+ else:
+ sendable = len(data)
+ if sendable == len(data):
+ completer()
+ dprint("%s: result %d->%d #%d" % (fd.fileno(), src, dst, len(data)))
+ ui.add_packet(now, src, dst, data)
+ else:
+ # policy time: do we split packets or not. Let's cut.
+ dprint("%s: %s->%s over bandwidth limit, sending %d/%d" % (
+ fd.fileno(), src, dst, sendable, len(data)))
+ key = (src, dst)
+ if key in queued:
+ dprint("%s: %s data increased from %d to %d" %
+ (fd.fileno(), key, len(queued[key]),
+ len(queued[key]) + len(data)))
+ # NB If this ever becomes a problem use an array (scatter/gather)
+ queued[key] = queued[key] + data
+ else:
+ queued[key] = data
+ resend((other, src, dst, now))
return True
def update_fds():
fds = get_fds()
@@ -199,6 +281,7 @@ def main():
# only allow same host for all ports - easier to implement.
p.add_argument('--remote-host', default='127.0.0.1')
p.add_argument('--listen-host', default='127.0.0.1')
+ p.add_argument('--bandwidth-limit', default=None, type=int)
p.add_argument('--debug')
args = p.parse_args(sys.argv[1:])
if len(args.listen_port) != len(args.remote_port):
@@ -206,11 +289,13 @@ def main():
sys.exit(1)
global debug
debug = not not args.debug
- ui = UI(zip(args.listen_port, args.remote_port))
+ ui = UI(pairs=zip(args.listen_port, args.remote_port),
+ bandwidth_limit=args.bandwidth_limit)
for listen_port, remote_port in zip(args.listen_port, args.remote_port):
setup_proxy(ui=ui, listen_port=listen_port, listen_host=args.listen_host,
remote_addr=(args.remote_host, remote_port),
- debug_proxy=debug_proxy)
+ debug_proxy=debug_proxy,
+ bandwidth_limit=args.bandwidth_limit)
gtk.main()
if __name__ == '__main__':
diff --git a/proxy.py b/proxy.py
index 320ae9f..312b056 100755
--- a/proxy.py
+++ b/proxy.py
@@ -151,6 +151,10 @@ def make_proxy(local_port, remote_addr, host = '127.0.0.1',
del open_socks[s]
return
def completer():
+ """
+ default completion of proxy action.
+ sends data to destination port.
+ """
if check_drop_next():
return
try:
@@ -163,7 +167,8 @@ def make_proxy(local_port, remote_addr, host = '127.0.0.1',
del open_socks[s]
assert(len(open_socks) == n - 1)
else:
- import pdb; pdb.set_trace()
+ print "Caught exception writing: %s" % repr(e)
+ #import pdb; pdb.set_trace()
return src, dst, data, other, completer
def select_based_iterator():
packet_iter = iterate_packets()
diff --git a/test b/test
deleted file mode 100755
index 6445a07..0000000
--- a/test
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/bin/bash
-
-# Doesn't actually work but gives a usage example
-
-nc -l 9000 > /dev/null < /dev/zero &
-./bandwidthmon --listen-port 10000 --remote-port 9000 &
-nc localhost 10000 < /dev/zero > /dev/null
diff --git a/test_bandwidth_limit b/test_bandwidth_limit
new file mode 100755
index 0000000..8669768
--- /dev/null
+++ b/test_bandwidth_limit
@@ -0,0 +1,39 @@
+#!/bin/bash
+
+# Doesn't actually work but gives a usage example
+
+if [ "x$1" == "xpv" ]; then
+ nc -l 15000 | pv > /dev/null
+ exit 0
+fi
+
+if [ "x$1" == "xcat" ]; then
+ cat /dev/zero | nc localhost 13300
+ exit 0
+fi
+
+gnome-terminal --geometry 80x5 -e "./test_bandwidth_limit pv"
+PID1=$!
+echo PID1 = $PID1
+# --bandwidth-limit 1024
+./bandwidthmon --listen-port 13300 --remote-port 15000 --bandwidth-limit 1024 &
+PID2=$!
+sleep 1
+gnome-terminal --geometry 80x3 -e "./test_bandwidth_limit cat"
+PID3=$!
+echo PID3 = $PID3
+
+function sigint {
+ echo "SIGINT caught"
+ if [ "x$PID1" != "x" ]; then
+ kill $PID1
+ fi
+ kill $PID2
+ kill $PID3
+}
+
+# SIGINT
+trap sigint SIGINT
+
+wait
+echo "all processes exited"
diff --git a/test_free_writing b/test_free_writing
new file mode 100755
index 0000000..3299562
--- /dev/null
+++ b/test_free_writing
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+# Doesn't actually work but gives a usage example
+
+gnome-terminal -e "nc -l 15000"
+./bandwidthmon --listen-port 13300 --remote-port 15000 &
+gnome-terminal -e "nc localhost 13300"