From f064c627efa2fa22c883e633c1e4373a20f71517 Mon Sep 17 00:00:00 2001 From: Alon Levy Date: Mon, 23 Aug 2010 19:43:39 +0300 Subject: clipboard tester --- tests/clipboard.py | 308 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 308 insertions(+) create mode 100755 tests/clipboard.py (limited to 'tests') diff --git a/tests/clipboard.py b/tests/clipboard.py new file mode 100755 index 0000000..f70aece --- /dev/null +++ b/tests/clipboard.py @@ -0,0 +1,308 @@ +""" +Usage: + run with --help + +Example invocations: + +Test client paste: + guest side, listen for connection, compare paste to socketed data. + python clipboard.py --server + host side, connect to other side, paste messages of 20000 bytes (defaults to 1000, settable with -N). + python clipboard.py --client --dest --paste -n 20000 + +To test guest paste, put --paste on guest invocation and remove from client. +""" + +import sys +import hashlib +import socket +import random +import os +import cPickle +import subprocess +import time + +first_port = 9876 +ports_tried = 10 + +N = 1000 + +verbose = False +opts = None # options set from command line + +global xsel_process +xsel_process = None + +def win_set_pastebuffer(msg, aType=None): + import win32clipboard as w + import win32con + if aType is None: + aType = win32con.CF_TEXT + w.OpenClipboard() + w.EmptyClipboard() + w.SetClipboardData(aType, msg) + w.CloseClipboard() + +def win_get_pastebuffer(): + import win32clipboard as w + import win32con + w.OpenClipboard() + d = w.GetClipboardData(win32con.CF_TEXT) + w.CloseClipboard() + return d + +def x11_get_pastebuffer(): + s=subprocess.Popen('xsel', stdout=subprocess.PIPE) + sel = s.stdout.read() + s.wait() + return sel + +def x11_set_pastebuffer(msg): + global spicec_window + global console_window + global xsel_process + if xsel_process: + if verbose: + print "killing %s" % xsel_process.pid + xsel_process.terminate() + xsel_process.wait() + xsel_process = subprocess.Popen('xsel', stdin=subprocess.PIPE) + xsel_process.stdin.write(msg) + xsel_process.stdin.close() + time.sleep(opts.target_pre_activate) + # now activate spice window so the text is actually sent to the guest + spicec_window.activate(0) # needs a timestamp, can't figure out the real one, zero works with a warning + time.sleep(opts.target_post_activate) + console_window.activate(0) + time.sleep(opts.console_post_activate) + # we kill xsel on the next iteration (thereby killing the pastebuffer) + +def get_both_windows(): + import wnck + import gtk + import gobject + def getonce(): + s = wnck.screen_get_default() + ws = s.get_windows() + return ws + w = [w for w in getonce() if w.get_name().startswith('SPICEc:0')] + # run gtk event loop long enough to register all window names + gobject.idle_add(gtk.main_quit) + gtk.main() + w = [w for w in getonce() if w.get_name().startswith('SPICEc:0')] + if len(w) != 1: + if len(w) > 1: + print "more then one spice window:", w + import pdb; pdb.set_trace() + else: + print "no spice windows" + import pdb; pdb.set_trace() + sys.exit(-1) + return w[0], wnck.screen_get_default().get_active_window() + +if sys.platform == 'win32': + set_pastebuffer = win_set_pastebuffer + get_pastebuffer = win_get_pastebuffer +else: + global spicec_window + global console_window + spicec_window, console_window = get_both_windows() + set_pastebuffer = x11_set_pastebuffer + get_pastebuffer = x11_get_pastebuffer + +def compute_md5(msg): + return hashlib.md5(msg).hexdigest() + +def set_and_md5(msg): + h = compute_md5(msg) + set_pastebuffer(msg) + return h + +def paste_and_md5_random_string(n): + s = random.randint(0,1000) + msg = ''.join([str(x+s) for x in xrange(n)])[:n] + return set_and_md5(msg), msg + +def getsock(HOST, PORT): + s = None + for res in socket.getaddrinfo(HOST, PORT, socket.AF_UNSPEC, socket.SOCK_STREAM): + af, socktype, proto, canonname, sa = res + try: + s = socket.socket(af, socktype, proto) + except socket.error, msg: + s = None + continue + try: + s.connect(sa) + except socket.error, msg: + s.close() + s = None + continue + break + return s + +def client(opts): + print "starting client - %s:%s-%s" % (opts.dest, first_port, first_port+ports_tried-1) + for p in xrange(first_port, first_port + ports_tried): + s=getsock(opts.dest, p) + if s: + break + print "port = %s" % p + if not s: + print "could not open socket" + exit(-1) + return s + +def paste_many(s, n, N): + print ">>> start pasting <<<" + for i in xrange(N): + if i % 10 == 0: + print "%s/%s" % (i, N) + md5, msg = paste_and_md5_random_string(n) + s.send(cPickle.dumps((md5,msg)) + '\n') + answer = cPickle.loads(s.recv(1024)) + if answer != md5: + print "%s: failed: sent %s, got %s" % (i, md5, answer) + break + print "%s passed" % N + +def sign_pasted(s, paste=False): + print ">>> start signing <<<" + i = 0 + last_paste = '' + while True: + i += 1 + pasted_md5, pasted_from_file = pickle_from_socket(s) + pastebuffer = get_pastebuffer() + waits = 0 + # sometimes it takes a while for the new paste buffer to register + # - needs further investigation + while ((last_paste == pastebuffer + or not set(pastebuffer) <= numbers) + and waits < 30): + print "!",pastebuffer[:20] + waits += 1 + time.sleep(0.2) + pastebuffer = get_pastebuffer() + last_paste = pastebuffer + + if pastebuffer != pasted_from_file: + import pdb; pdb.set_trace() + computed_md5 = compute_md5(pastebuffer) + s.send(cPickle.dumps(computed_md5) + '\n') + if computed_md5 != pasted_md5: + print "%s: error on text of length %s" % (i, len(pastebuffer)) + import pdb; pdb.set_trace() + if verbose: + print i + +def getopts(): + global verbose + import optparse + import sys + parser = optparse.OptionParser() + parser.add_option('--paste', action='store_true', help="do paste to other side (if not present sign other side and send back)") + parser.add_option('--server', action='store_true', help="listen to socket") + parser.add_option('--client', action='store_true', help="connect to socket") + parser.add_option('--paste1', action='store_true', help="do a single send") + parser.add_option('--sleep1', type='float', help="sleep after paste1") + parser.add_option('--get1', action='store_true', help="do a single read") + parser.add_option('--dest', help="destination address for client") + parser.add_option('-N', default=N, type='int', help="set number of iterations") + parser.add_option('-n', default=100, type='int', help="set length of test strings") + parser.add_option('-v', action='count', default=False, help="be more verbose") + parser.add_option('--target_pre_activate', type='float', default=0.1, help='time to sleep before target activation') + parser.add_option('--target_post_activate', type='float', default=0.1, help='time to sleep after target activation') + parser.add_option('--console_post_activate', type='float', default=0.1, help='time to sleep before console activation') + opts, rest = parser.parse_args(sys.argv[1:]) + if opts.v is not None: + verbose = opts.v + is_server = opts.server + if opts.client and opts.dest is None: + parser.print_help() + raise SystemExit + return is_server, opts + +def accept_one(port): + HOST = None + PORT = port + s = None + for res in socket.getaddrinfo(HOST, PORT, socket.AF_UNSPEC, + socket.SOCK_STREAM, 0, socket.AI_PASSIVE): + af, socktype, proto, canonname, sa = res + try: + s = socket.socket(af, socktype, proto) + except socket.error, msg: + print msg + s = None + continue + try: + s.bind(sa) + s.listen(1) + except socket.error, msg: + print msg + s.close() + s = None + continue + break + if s is None: + print 'could not open socket' + return None + conn, addr = s.accept() + print 'Connected by', addr + return conn + +numbers = set('0123456789') + +def pickle_from_socket(s): + """ for buffers larger then a certain size you get multiple + intermediate results from recv - concat them together """ + getmore = True + r = [] + while getmore: + r.append(s.recv(1024*1024)) + if r[-1] == '': + print "socket closed, exiting" + sys.exit(0) + try: + res = cPickle.loads(''.join(r)) + except: + pass + else: + break + return res + +def run_socket(s, opts): + if opts.paste: + paste_many(s, n=opts.n, N=opts.N) + else: + sign_pasted(s) + +def run_client(opts): + run_socket(client(opts), opts=opts) + +def run_server(opts): + import sys + for p in xrange(first_port, first_port+ports_tried): + s = accept_one(p) + if s: + break + assert(s is not None) + print "port = %s" % p + run_socket(s, opts) + +if __name__ == '__main__': + is_server, opts = getopts() + if opts.paste1: + paste_and_md5_random_string(opts.n) + if opts.sleep1: + time.sleep(opts.sleep1) + elif opts.get1: + pastebuffer = get_pastebuffer() + computed_md5 = compute_md5(pastebuffer) + print "got %d bytes, md5 = %s" % (len(pastebuffer), computed_md5) + elif is_server: + run_server(opts) + else: + run_client(opts) + -- cgit v1.2.3