diff options
author | Alon Levy <alevy@redhat.com> | 2011-09-04 18:44:50 +0300 |
---|---|---|
committer | Alon Levy <alevy@redhat.com> | 2011-09-04 18:44:50 +0300 |
commit | 99c9549a96bdb1e4c876cc0b9b52fe1aa57757c0 (patch) | |
tree | ca6efaac0c224eeeef5a94c96ce8e5cc73bf7a86 | |
parent | 1e60f86302026184e9301ae6171654342458abc2 (diff) |
-rwxr-xr-x | bandwidthmon | 127 | ||||
-rwxr-xr-x | proxy.py | 7 | ||||
-rwxr-xr-x | test | 7 | ||||
-rwxr-xr-x | test_bandwidth_limit | 39 | ||||
-rwxr-xr-x | test_free_writing | 7 |
5 files changed, 158 insertions, 29 deletions
diff --git a/bandwidthmon b/bandwidthmon index c5d9102..4f35ce9 100755 --- a/bandwidthmon +++ b/bandwidthmon @@ -7,7 +7,7 @@ import gtk import time import socket -debug = False +debug = True def dprint(x): global debug if not debug: @@ -18,16 +18,19 @@ class Bandwidth(object): """ Compute average bandwidth over last X seconds """ - def __init__(self, window_size, average_callback = None): + def __init__(self, window_size, average_callback=None, limit=None): self.window_size = window_size + self.limit = limit self.average_callback = average_callback self.reset() - def add_packet(self, data): + def __str__(self): + return "<Bandwidth window:%d sec>" % (self.window_size) + + def add_packet(self, now, data): num_bytes = len(data) # TODO - other stuff then just length self.total += num_bytes - now = time.time() # add new datapoint self.window.append((now, num_bytes)) self.remove_old(now) @@ -55,15 +58,30 @@ class Bandwidth(object): if self.average_callback: self.average_callback(self) + def bytes_in_window(self): + # suboptimal - can update per packet + return sum(x for t, x in self.window) + def average(self): # suboptimal - return float(sum([x for t,x in self.window])) / self.window_size + return float(self.bytes_in_window()) / self.window_size def global_average(self): if self.start_time == self.last_time: return 0 return float(self.total) / (self.last_time - self.start_time) + def bytes_sendable_in_limit(self, now, count): + """ Rate limiting model: + proxy wakes up and wants to send some bytes. It asks bandwidth how + many bytes it can send. Calculation assumes we want to send as many + bytes as we can and still hold to a Average_over_last_T_seconds max. + """ + if not self.limit: + return count + self.remove_old(now) + return min(count, self.limit * self.window_size - self.bytes_in_window()) + def rate_to_string(rate): if rate == 0: return '0' @@ -74,13 +92,14 @@ def rate_to_string(rate): return '%3.2f MiB' % (float(rate) / 1024 / 1024) class UI(object): - def __init__(self, pairs): + def __init__(self, pairs, bandwidth_limit): """ | hbox vbox | in_label out_label ; reset_button """ self.pairs = pairs + self.bandwidth_limit = bandwidth_limit self.window = gtk.Window() self.vbox = gtk.VBox() self.window.add(self.vbox) @@ -109,28 +128,35 @@ class UI(object): hbox1.add(label_avg_out) hbox2.add(label_avg_in_from_reset) hbox2.add(label_avg_out_from_reset) - in_bw = Bandwidth(1, self.on_bw_average) - out_bw = Bandwidth(1, self.on_bw_average) + in_bw = Bandwidth(1, self.on_bw_average, limit=self.bandwidth_limit) + out_bw = Bandwidth(1, self.on_bw_average, limit=self.bandwidth_limit) self.port_in_bw[src] = in_bw self.port_out_bw[src] = out_bw self.port_in_bw[dst] = in_bw self.port_out_bw[dst] = out_bw # TODO - use a class self.bw_to_ui[in_bw] = dict(label_avg=label_avg_in, - label_avg_from_reset=label_avg_in_from_reset) + label_avg_from_reset=label_avg_in_from_reset, + hbox1=hbox1, hbox2=hbox2) self.bw_to_ui[out_bw] = dict(label_avg=label_avg_out, - label_avg_from_reset=label_avg_out_from_reset) + label_avg_from_reset=label_avg_out_from_reset, + hbox1=hbox1, hbox2=hbox2) def reset(self, widget, data=None): for bw in self.bw_to_ui.keys(): bw.reset() def on_bw_average(self, bw): - labels = self.bw_to_ui[bw] - labels['label_avg'].set_label(rate_to_string(bw.average())) - labels['label_avg_from_reset'].set_label(rate_to_string(bw.global_average())) + ui = self.bw_to_ui[bw] + label_avg = ui['label_avg'] + label_avg.set_label(rate_to_string(bw.average())) + label_avg_from_reset = ui['label_avg_from_reset'] + label_avg_from_reset.set_label(rate_to_string(bw.global_average())) + #for u in [label_avg, label_avg_from_reset, ui['hbox1'], ui['hbox2'], self.vbox]: + # u.queue_draw() + self.vbox.queue_draw() - def add_packet(self, src, dst, data): + def add_packet(self, now, src, dst, data): if src in self.port_in_bw: bw = self.port_in_bw[src] elif dst in self.port_out_bw: @@ -139,14 +165,29 @@ class UI(object): print "got packet for unmonitored src port %d (%d->%d %d#)" % ( src, src, dst, len(data)) return - bw.add_packet(data) + bw.add_packet(now, data) -def setup_proxy(ui, listen_port, listen_host, remote_addr, debug_proxy): + def bandwidth(self, src, dst): + if src in self.port_in_bw: + bw = self.port_in_bw[src] + elif dst in self.port_out_bw: + bw = self.port_out_bw[dst] + else: + raise Exception("%s: non existant src and dst" % (self, src, dst)) + return bw + +# How much time to wait before retrying the send of data that +# was over the limit +LIMIT_RETRY_TIMEOUT_MS = 10 +def setup_proxy(ui, listen_port, listen_host, remote_addr, debug_proxy, + bandwidth_limit): iterate_packets, handle_input, select_based_iterator, get_fds = proxy.make_proxy( listen_port, remote_addr, listen_host, debug=debug_proxy) assert(len(get_fds()) == 1) # only the accepting socket accepter = get_fds()[0] added_fds = set() + # queued data due to bandwidth limiting. We live in a single threaded world. + queued = {} def on_new_fd(fd): dprint("on_new_fd %s" % fd.fileno()) # Don't add glib.IO_OUT unless you mean it. @@ -156,6 +197,25 @@ def setup_proxy(ui, listen_port, listen_host, remote_addr, debug_proxy): glib.io_add_watch(fd.fileno(), glib.IO_IN | glib.IO_HUP | glib.IO_ERR, on_read, fd) + def resend((target, src, dst, now)): + key = (src, dst) + if key not in queued: + return + data = queued[key] + del queued[key] + if not now: + now = time.time() + sendable = ui.bandwidth(src, dst + ).bytes_sendable_in_limit(now, len(data)) + if sendable == len(data): + target.send(data) + return + target.send(data[:sendable]) + data = data[sendable:] + queued[key] = data + glib.timeout_add(LIMIT_RETRY_TIMEOUT_MS, resend, + (target, src, dst, None)) + def on_read(glib_fd, condition, fd): # lame check to find out the fd is closed try: @@ -177,9 +237,31 @@ def setup_proxy(ui, listen_port, listen_host, remote_addr, debug_proxy): dprint("%s: result %s" % (fd.fileno(), repr(result))) return fd == accepter (src, dst, data, other, completer) = result - completer() - dprint("%s: result %d->%d #%d" % (fd.fileno(), src, dst, len(data))) - ui.add_packet(src, dst, data) + now = time.time() + if bandwidth_limit: + # ugly - why access ui to get bw object? + sendable = ui.bandwidth(src, dst + ).bytes_sendable_in_limit(now, len(data)) + else: + sendable = len(data) + if sendable == len(data): + completer() + dprint("%s: result %d->%d #%d" % (fd.fileno(), src, dst, len(data))) + ui.add_packet(now, src, dst, data) + else: + # policy time: do we split packets or not. Let's cut. + dprint("%s: %s->%s over bandwidth limit, sending %d/%d" % ( + fd.fileno(), src, dst, sendable, len(data))) + key = (src, dst) + if key in queued: + dprint("%s: %s data increased from %d to %d" % + (fd.fileno(), key, len(queued[key]), + len(queued[key]) + len(data))) + # NB If this ever becomes a problem use an array (scatter/gather) + queued[key] = queued[key] + data + else: + queued[key] = data + resend((other, src, dst, now)) return True def update_fds(): fds = get_fds() @@ -199,6 +281,7 @@ def main(): # only allow same host for all ports - easier to implement. p.add_argument('--remote-host', default='127.0.0.1') p.add_argument('--listen-host', default='127.0.0.1') + p.add_argument('--bandwidth-limit', default=None, type=int) p.add_argument('--debug') args = p.parse_args(sys.argv[1:]) if len(args.listen_port) != len(args.remote_port): @@ -206,11 +289,13 @@ def main(): sys.exit(1) global debug debug = not not args.debug - ui = UI(zip(args.listen_port, args.remote_port)) + ui = UI(pairs=zip(args.listen_port, args.remote_port), + bandwidth_limit=args.bandwidth_limit) for listen_port, remote_port in zip(args.listen_port, args.remote_port): setup_proxy(ui=ui, listen_port=listen_port, listen_host=args.listen_host, remote_addr=(args.remote_host, remote_port), - debug_proxy=debug_proxy) + debug_proxy=debug_proxy, + bandwidth_limit=args.bandwidth_limit) gtk.main() if __name__ == '__main__': @@ -151,6 +151,10 @@ def make_proxy(local_port, remote_addr, host = '127.0.0.1', del open_socks[s] return def completer(): + """ + default completion of proxy action. + sends data to destination port. + """ if check_drop_next(): return try: @@ -163,7 +167,8 @@ def make_proxy(local_port, remote_addr, host = '127.0.0.1', del open_socks[s] assert(len(open_socks) == n - 1) else: - import pdb; pdb.set_trace() + print "Caught exception writing: %s" % repr(e) + #import pdb; pdb.set_trace() return src, dst, data, other, completer def select_based_iterator(): packet_iter = iterate_packets() @@ -1,7 +0,0 @@ -#!/bin/bash - -# Doesn't actually work but gives a usage example - -nc -l 9000 > /dev/null < /dev/zero & -./bandwidthmon --listen-port 10000 --remote-port 9000 & -nc localhost 10000 < /dev/zero > /dev/null diff --git a/test_bandwidth_limit b/test_bandwidth_limit new file mode 100755 index 0000000..8669768 --- /dev/null +++ b/test_bandwidth_limit @@ -0,0 +1,39 @@ +#!/bin/bash + +# Doesn't actually work but gives a usage example + +if [ "x$1" == "xpv" ]; then + nc -l 15000 | pv > /dev/null + exit 0 +fi + +if [ "x$1" == "xcat" ]; then + cat /dev/zero | nc localhost 13300 + exit 0 +fi + +gnome-terminal --geometry 80x5 -e "./test_bandwidth_limit pv" +PID1=$! +echo PID1 = $PID1 +# --bandwidth-limit 1024 +./bandwidthmon --listen-port 13300 --remote-port 15000 --bandwidth-limit 1024 & +PID2=$! +sleep 1 +gnome-terminal --geometry 80x3 -e "./test_bandwidth_limit cat" +PID3=$! +echo PID3 = $PID3 + +function sigint { + echo "SIGINT caught" + if [ "x$PID1" != "x" ]; then + kill $PID1 + fi + kill $PID2 + kill $PID3 +} + +# SIGINT +trap sigint SIGINT + +wait +echo "all processes exited" diff --git a/test_free_writing b/test_free_writing new file mode 100755 index 0000000..3299562 --- /dev/null +++ b/test_free_writing @@ -0,0 +1,7 @@ +#!/bin/bash + +# Doesn't actually work but gives a usage example + +gnome-terminal -e "nc -l 15000" +./bandwidthmon --listen-port 13300 --remote-port 15000 & +gnome-terminal -e "nc localhost 13300" |