diff options
Diffstat (limited to 'salut/tests/twisted/avahitest.py')
-rw-r--r-- | salut/tests/twisted/avahitest.py | 269 |
1 files changed, 269 insertions, 0 deletions
diff --git a/salut/tests/twisted/avahitest.py b/salut/tests/twisted/avahitest.py new file mode 100644 index 000000000..eb68504cb --- /dev/null +++ b/salut/tests/twisted/avahitest.py @@ -0,0 +1,269 @@ +""" +Infrastructure for testing avahi +""" + +import servicetest +from servicetest import Event, TimeoutError +import dbus +import dbus.glib +import avahi +import os +import socket + +def get_server(): + bus = dbus.SystemBus() + server = dbus.Interface(bus.get_object(avahi.DBUS_NAME, + avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER) + return server + +def get_host_name(): + return get_server().GetHostName() + +def get_host_name_fqdn(): + return get_server().GetHostNameFqdn() + +def get_domain_name(): + return get_server().GetDomainName() + +def txt_get_key(txt, key): + for x in txt: + if dbus.Byte('=') in x: + (rkey, value) = avahi.byte_array_to_string(x).split('=', 1) + if rkey == key: + return value + + return None + +class AvahiService: + def __init__(self, event_queue, bus, server, interface, protocol, name, + type, domain, aprotocol, flags): + + self.event_queue = event_queue + self.server = server + self.bus = bus + self.interface = interface + self.protocol = protocol + self.name = name + self.type = type + self.domain = domain + self.aprotocol = aprotocol + self.flags = flags + + def resolve(self): + resolver_path = self.server.ServiceResolverNew(self.interface, + self.protocol, self.name, self.type, self.domain, + self.aprotocol, self.flags) + + resolver_obj = self.bus.get_object(avahi.DBUS_NAME, resolver_path) + resolver = dbus.Interface(resolver_obj, + avahi.DBUS_INTERFACE_SERVICE_RESOLVER) + + resolver.connect_to_signal('Found', self._service_found) + resolver.connect_to_signal('Failed', self._service_failed) + + def _service_found(self, interface, protocol, name, stype, domain, + host_name, aprotocol, pt, port, txt, flags): + + e = Event('service-resolved', service = self, + interface = interface, protocol = protocol, name = name, + stype = stype, host_name = host_name, aprotocol = aprotocol, + pt = pt, port = port, txt = txt, flags = flags) + + self.event_queue.append(e) + + def _service_failed(self, error): + e = Event('service-failed', service = self, error = error) + self.event_queue.append(e) + +class AvahiListener: + def __init__(self, event_queue): + self.event_queue = event_queue + + self.bus = dbus.SystemBus() + self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, + avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER) + self.browsers = [] + + def _service_added_cb(self, interface, protocol, name, stype, domain, + flags): + + service = AvahiService(self.event_queue, self.bus, self.server, + interface, protocol, name, stype, + domain, protocol, 0) + + e = Event ('service-added', service = service, + interface=interface, protocol=protocol, name=name, stype=stype, + domain=domain, flags=flags) + + self.event_queue.append(e) + + def _service_removed_cb(self, interface, protocol, name, stype, domain, + flags): + + e = Event ('service-removed', + interface=interface, protocol=protocol, name=name, stype=stype, + domain=domain, flags=flags) + self.event_queue.append(e) + + def _all_for_now_cb(self): + self.event_queue.append(Event('service-all-for-now')) + + def listen_for_service(self, sname): + browser_path = self.server.ServiceBrowserNew(avahi.IF_UNSPEC, + avahi.PROTO_UNSPEC, sname, "local", dbus.UInt32(0)); + browser_obj = self.bus.get_object(avahi.DBUS_NAME, browser_path) + browser = dbus.Interface(browser_obj, + avahi.DBUS_INTERFACE_SERVICE_BROWSER) + + browser.connect_to_signal('ItemNew', self._service_added_cb) + browser.connect_to_signal('ItemRemove', self._service_removed_cb) + browser.connect_to_signal('AllForNow', self._all_for_now_cb) + + self.browsers.append(browser) + return self + +class AvahiRecordAnnouncer: + def __init__(self, name, clazz, type, data): + self.name = name + self.clazz = clazz + self.type = type + self.data = data + + self.bus = dbus.SystemBus() + self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, + avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER) + + entry_path = self.server.EntryGroupNew() + entry_obj = self.bus.get_object(avahi.DBUS_NAME, entry_path) + entry = dbus.Interface(entry_obj, + avahi.DBUS_INTERFACE_ENTRY_GROUP) + + entry.AddRecord(avahi.IF_UNSPEC, avahi.PROTO_INET, + dbus.UInt32(0), name, clazz, type, 120, data) + + entry.Commit() + + self.entry = entry + +class AvahiAnnouncer: + def __init__(self, name, type, port, txt, hostname=None, + proto=avahi.PROTO_INET): + self.name = name + self.type = type + self.port = port + self.txt = txt + self.proto = proto + + self.bus = dbus.SystemBus() + self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, + avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER) + + entry_path = self.server.EntryGroupNew() + entry_obj = self.bus.get_object(avahi.DBUS_NAME, entry_path) + entry = dbus.Interface(entry_obj, + avahi.DBUS_INTERFACE_ENTRY_GROUP) + + if hostname is None: + hostname = get_host_name_fqdn() + + entry.AddService(avahi.IF_UNSPEC, self.proto, + dbus.UInt32(0), name, type, get_domain_name(), hostname, + port, avahi.dict_to_txt_array(txt)) + entry.Commit() + + self.entry = entry + + def update(self, txt): + self.txt.update(txt) + + self.entry.UpdateServiceTxt(avahi.IF_UNSPEC, self.proto, + dbus.UInt32(0), self.name, self.type, get_domain_name(), + avahi.dict_to_txt_array(self.txt)) + + def set(self, txt): + self.txt = txt + self.entry.UpdateServiceTxt(avahi.IF_UNSPEC, self.proto, + dbus.UInt32(0), self.name, self.type, get_domain_name(), + avahi.dict_to_txt_array(self.txt)) + + def stop(self): + self.entry.Free() + +def check_ipv6_enabled(): + s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + + try: + s.bind(('::1', 0)) + except socket.error: + return False + + return True + +def has_another_llxmpp(): + if 'SALUT_TEST_REAL_AVAHI' not in os.environ: + return False + + q = servicetest.IteratingEventQueue() + + l = AvahiListener(q) + l.listen_for_service('_presence._tcp') + + has = False + + while True: + e = q.wait(['service']) + + if (e.type == 'service-added' and + e.flags & (avahi.LOOKUP_RESULT_LOCAL|avahi.LOOKUP_RESULT_OUR_OWN)): + has = True + elif e.type == 'service-all-for-now': + break + + return has + +def skip_if_another_llxmpp(): + if has_another_llxmpp(): + print ("SKIP: This test fails if there is another LL XMPP instance " + "running on the machine.") + # exiting 77 causes automake to consider the test to have been skipped + raise SystemExit(77) + +if __name__ == '__main__': + from twisted.internet import reactor + + txtdict = { "test0": "0", "test1": "1" } + + a = AvahiAnnouncer("test", "_test._tcp", 1234, txtdict) + + q = servicetest.IteratingEventQueue() + # Set verboseness if needed for debugging + # q.verbose = True + + l = AvahiListener(q) + l.listen_for_service("_test._tcp") + + while True: + e = q.expect ('service-added', stype='_test._tcp') + # Only care about services we announced ourselves + if e.flags & (avahi.LOOKUP_RESULT_LOCAL|avahi.LOOKUP_RESULT_OUR_OWN): + break + + assert "test" == e.name[0:len("test")] + + s = e.service + s.resolve() + + e = q.expect('service-resolved', service = s) + for (key, val ) in txtdict.iteritems(): + v = txt_get_key(e.txt, key) + assert v == val, (key, val, v) + + txtdict["test1"] = "2" + txtdict["test2"] = "2" + + a.update(txtdict) + + e = q.expect('service-resolved', service = s) + for (key, val ) in txtdict.iteritems(): + v = txt_get_key(e.txt, key) + assert v == val, (key, val, v) |