summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuillaume Desmottes <guillaume.desmottes@collabora.co.uk>2013-10-01 13:27:16 +0200
committerGuillaume Desmottes <guillaume.desmottes@collabora.co.uk>2013-10-01 16:52:50 +0200
commitdcd961bb4d780cf88d018f25fa774993136c634e (patch)
treeb5966a0f427ac9372461842dc325110a7675118c
parentb1c0f935de0ada068294fa155ce87caff106cc61 (diff)
sync servicetest.py with Gabble
https://bugs.freedesktop.org/show_bug.cgi?id=69995
-rw-r--r--tests/twisted/servicetest.py287
1 files changed, 260 insertions, 27 deletions
diff --git a/tests/twisted/servicetest.py b/tests/twisted/servicetest.py
index b871e1b..c464d8f 100644
--- a/tests/twisted/servicetest.py
+++ b/tests/twisted/servicetest.py
@@ -1,6 +1,23 @@
+# Copyright (C) 2009 Nokia Corporation
+# Copyright (C) 2009-2013 Collabora Ltd.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+# 02110-1301 USA
"""
-Infrastructure code for testing connection managers.
+Infrastructure code for testing Telepathy services.
"""
from twisted.internet import glib2reactor
@@ -8,29 +25,55 @@ from twisted.internet.protocol import Protocol, Factory, ClientFactory
glib2reactor.install()
import sys
import time
+import os
import pprint
import unittest
-import dbus.glib
+import dbus
+import dbus.lowlevel
+from dbus.mainloop.glib import DBusGMainLoop
+DBusGMainLoop(set_as_default=True)
from twisted.internet import reactor
import constants as cs
-tp_name_prefix = 'org.freedesktop.Telepathy'
-tp_path_prefix = '/org/freedesktop/Telepathy'
+tp_name_prefix = cs.PREFIX
+tp_path_prefix = cs.PATH_PREFIX
-class Event:
+class DictionarySupersetOf (object):
+ """Utility class for expecting "a dictionary with at least these keys"."""
+ def __init__(self, dictionary):
+ self._dictionary = dictionary
+ def __repr__(self):
+ return "DictionarySupersetOf(%s)" % self._dictionary
+ def __eq__(self, other):
+ """would like to just do:
+ return set(other.items()).issuperset(self._dictionary.items())
+ but it turns out that this doesn't work if you have another dict
+ nested in the values of your dicts"""
+ try:
+ for k,v in self._dictionary.items():
+ if k not in other or other[k] != v:
+ return False
+ return True
+ except TypeError: # other is not iterable
+ return False
+
+class Event(object):
def __init__(self, type, **kw):
self.__dict__.update(kw)
self.type = type
(self.subqueue, self.subtype) = type.split ("-", 1)
+ def __str__(self):
+ return '\n'.join([ str(type(self)) ] + format_event(self))
+
def format_event(event):
ret = ['- type %s' % event.type]
- for key in dir(event):
+ for key in sorted(dir(event)):
if key != 'type' and not key.startswith('_'):
ret.append('- %s: %s' % (
key, pprint.pformat(getattr(event, key))))
@@ -79,6 +122,14 @@ class EventPattern:
class TimeoutError(Exception):
pass
+class ForbiddenEventOccurred(Exception):
+ def __init__(self, event):
+ Exception.__init__(self)
+ self.event = event
+
+ def __str__(self):
+ return '\n' + '\n'.join(format_event(self.event))
+
class BaseEventQueue:
"""Abstract event queue base class.
@@ -124,13 +175,16 @@ class BaseEventQueue:
"""
self.forbidden_events.difference_update(set(patterns))
+ def unforbid_all(self):
+ """
+ Remove all patterns from the set of forbidden events.
+ """
+ self.forbidden_events.clear()
+
def _check_forbidden(self, event):
for e in self.forbidden_events:
if e.match(event):
- print "forbidden event occurred:"
- for x in format_event(event):
- print x
- assert False
+ raise ForbiddenEventOccurred(event)
def expect(self, type, **kw):
"""
@@ -255,6 +309,11 @@ class IteratingEventQueue(BaseEventQueue):
def __init__(self, timeout=None):
BaseEventQueue.__init__(self, timeout)
+ self._dbus_method_impls = []
+ self._buses = []
+ # a message filter which will claim we handled everything
+ self._dbus_dev_null = \
+ lambda bus, message: dbus.lowlevel.HANDLER_RESULT_HANDLED
def wait(self, queues=None):
stop = [False]
@@ -279,6 +338,127 @@ class IteratingEventQueue(BaseEventQueue):
else:
raise TimeoutError
+ def add_dbus_method_impl(self, cb, bus=None, **kwargs):
+ if bus is None:
+ bus = self._buses[0]
+
+ self._dbus_method_impls.append(
+ (EventPattern('dbus-method-call', **kwargs), cb))
+
+ def dbus_emit(self, path, iface, name, *a, **k):
+ bus = k.pop('bus', self._buses[0])
+ assert 'signature' in k, k
+ message = dbus.lowlevel.SignalMessage(path, iface, name)
+ message.append(*a, **k)
+ bus.send_message(message)
+
+ def dbus_return(self, in_reply_to, *a, **k):
+ bus = k.pop('bus', self._buses[0])
+ assert 'signature' in k, k
+ reply = dbus.lowlevel.MethodReturnMessage(in_reply_to)
+ reply.append(*a, **k)
+ bus.send_message(reply)
+
+ def dbus_raise(self, in_reply_to, name, message=None, bus=None):
+ if bus is None:
+ bus = self._buses[0]
+
+ reply = dbus.lowlevel.ErrorMessage(in_reply_to, name, message)
+ bus.send_message(reply)
+
+ def attach_to_bus(self, bus):
+ if not self._buses:
+ # first-time setup
+ self._dbus_filter_bound_method = self._dbus_filter
+
+ self._buses.append(bus)
+
+ # Only subscribe to messages on the first bus connection (assumed to
+ # be the shared session bus connection used by the simulated connection
+ # manager and most of the test suite), not on subsequent bus
+ # connections (assumed to represent extra clients).
+ #
+ # When we receive a method call on the other bus connections, ignore
+ # it - the eavesdropping filter installed on the first bus connection
+ # will see it too.
+ #
+ # This is highly counter-intuitive, but it means our messages are in
+ # a guaranteed order (we don't have races between messages arriving on
+ # various connections).
+ if len(self._buses) > 1:
+ bus.add_message_filter(self._dbus_dev_null)
+ return
+
+ try:
+ # for dbus > 1.5
+ bus.add_match_string("eavesdrop=true,type='signal'")
+ except dbus.DBusException:
+ bus.add_match_string("type='signal'")
+ bus.add_match_string("type='method_call'")
+ else:
+ bus.add_match_string("eavesdrop=true,type='method_call'")
+
+ bus.add_message_filter(self._dbus_filter_bound_method)
+
+ bus.add_signal_receiver(
+ lambda *args, **kw:
+ self.append(
+ Event('dbus-signal',
+ path=unwrap(kw['path']),
+ signal=kw['member'],
+ args=map(unwrap, args),
+ interface=kw['interface'])),
+ None,
+ None,
+ None,
+ path_keyword='path',
+ member_keyword='member',
+ interface_keyword='interface',
+ byte_arrays=True,
+ )
+
+ def cleanup(self):
+ if self._buses:
+ self._buses[0].remove_message_filter(self._dbus_filter_bound_method)
+ for bus in self._buses[1:]:
+ bus.remove_message_filter(self._dbus_dev_null)
+
+ self._buses = []
+ self._dbus_method_impls = []
+
+ def _dbus_filter(self, bus, message):
+ if isinstance(message, dbus.lowlevel.MethodCallMessage):
+
+ destination = message.get_destination()
+ sender = message.get_sender()
+
+ if (destination == 'org.freedesktop.DBus' or
+ sender == self._buses[0].get_unique_name()):
+ # suppress reply and don't make an Event
+ return dbus.lowlevel.HANDLER_RESULT_HANDLED
+
+ e = Event('dbus-method-call', message=message,
+ interface=message.get_interface(), path=message.get_path(),
+ raw_args=message.get_args_list(byte_arrays=True),
+ args=map(unwrap, message.get_args_list(byte_arrays=True)),
+ destination=str(destination),
+ method=message.get_member(),
+ sender=message.get_sender(),
+ handled=False)
+
+ for pair in self._dbus_method_impls:
+ pattern, cb = pair
+ if pattern.match(e):
+ cb(e)
+ e.handled = True
+ break
+
+ self.append(e)
+
+ return dbus.lowlevel.HANDLER_RESULT_HANDLED
+
+ return dbus.lowlevel.HANDLER_RESULT_NOT_YET_HANDLED
+
class TestEventQueue(BaseEventQueue):
def __init__(self, events):
BaseEventQueue.__init__(self)
@@ -389,17 +569,18 @@ def call_async(test, proxy, method, *args, **kw):
kw.update({'reply_handler': reply_func, 'error_handler': error_func})
method_proxy(*args, **kw)
-def sync_dbus(bus, q, conn):
- # Dummy D-Bus method call
- # This won't do the right thing unless the proxy has a unique name.
- assert conn.object.bus_name.startswith(':')
- root_object = bus.get_object(conn.object.bus_name, '/')
- call_async(
- q, dbus.Interface(root_object, 'org.freedesktop.DBus.Peer'), 'Ping')
- q.expect('dbus-return', method='Ping')
+def sync_dbus(bus, q, proxy):
+ # Dummy D-Bus method call. We can't use DBus.Peer.Ping() because libdbus
+ # replies to that message immediately, rather than handing it up to
+ # dbus-glib and thence the application, which means that Ping()ing the
+ # application doesn't ensure that it's processed all D-Bus messages prior
+ # to our ping.
+ call_async(q, dbus.Interface(proxy, 'org.freedesktop.Telepathy.Tests'),
+ 'DummySyncDBus')
+ q.expect('dbus-error', method='DummySyncDBus')
class ProxyWrapper:
- def __init__(self, object, default, others):
+ def __init__(self, object, default, others={}):
self.object = object
self.default_interface = dbus.Interface(object, default)
self.Properties = dbus.Interface(object, dbus.PROPERTIES_IFACE)
@@ -418,12 +599,29 @@ class ProxyWrapper:
return getattr(self.default_interface, name)
+class ConnWrapper(ProxyWrapper):
+ def inspect_contact_sync(self, handle):
+ return self.inspect_contacts_sync([handle])[0]
+
+ def inspect_contacts_sync(self, handles):
+ h2asv = self.Contacts.GetContactAttributes(handles, [], True)
+ ret = []
+ for h in handles:
+ ret.append(h2asv[h][cs.ATTR_CONTACT_ID])
+ return ret
+
+ def get_contact_handle_sync(self, identifier):
+ return self.Contacts.GetContactByID(identifier, [])[0]
+
+ def get_contact_handles_sync(self, ids):
+ return [self.get_contact_handle_sync(i) for i in ids]
+
def wrap_connection(conn):
- return ProxyWrapper(conn, tp_name_prefix + '.Connection',
+ return ConnWrapper(conn, tp_name_prefix + '.Connection',
dict([
(name, tp_name_prefix + '.Connection.Interface.' + name)
for name in ['Aliasing', 'Avatars', 'Capabilities', 'Contacts',
- 'Presence', 'SimplePresence', 'Requests']] +
+ 'SimplePresence', 'Requests']] +
[('Peer', 'org.freedesktop.DBus.Peer'),
('ContactCapabilities', cs.CONN_IFACE_CONTACT_CAPS),
('ContactInfo', cs.CONN_IFACE_CONTACT_INFO),
@@ -432,6 +630,8 @@ def wrap_connection(conn):
('MailNotification', cs.CONN_IFACE_MAIL_NOTIFICATION),
('ContactList', cs.CONN_IFACE_CONTACT_LIST),
('ContactGroups', cs.CONN_IFACE_CONTACT_GROUPS),
+ ('PowerSaving', cs.CONN_IFACE_POWER_SAVING),
+ ('Addressing', cs.CONN_IFACE_ADDRESSING),
]))
def wrap_channel(chan, type_, extra=None):
@@ -447,14 +647,26 @@ def wrap_channel(chan, type_, extra=None):
return ProxyWrapper(chan, tp_name_prefix + '.Channel', interfaces)
+
+def wrap_content(chan, extra=None):
+ interfaces = { }
+
+ if extra:
+ interfaces.update(dict([
+ (name, tp_name_prefix + '.Call1.Content.Interface.' + name)
+ for name in extra]))
+
+ return ProxyWrapper(chan, tp_name_prefix + '.Call1.Content', interfaces)
+
def make_connection(bus, event_func, name, proto, params):
cm = bus.get_object(
tp_name_prefix + '.ConnectionManager.%s' % name,
- tp_path_prefix + '/ConnectionManager/%s' % name)
+ tp_path_prefix + '/ConnectionManager/%s' % name,
+ introspect=False)
cm_iface = dbus.Interface(cm, tp_name_prefix + '.ConnectionManager')
connection_name, connection_path = cm_iface.RequestConnection(
- proto, params)
+ proto, dbus.Dictionary(params, signature='sv'))
conn = wrap_connection(bus.get_object(connection_name, connection_path))
return conn
@@ -470,6 +682,7 @@ def make_channel_proxy(conn, path, iface):
class EventProtocol(Protocol):
def __init__(self, queue=None, block_reading=False):
self.queue = queue
+ self.block_reading = block_reading
def dataReceived(self, data):
if self.queue is not None:
@@ -566,6 +779,12 @@ def assertFlagsUnset(flags, value):
"expected none of flags %u, but %u are set in %u" % (
flags, masked, value))
+def assertDBusError(name, error):
+ if error.get_dbus_name() != name:
+ raise AssertionError(
+ "expected DBus error named:\n %s\ngot:\n %s\n(with message: %s)"
+ % (name, error.get_dbus_name(), error.message))
+
def install_colourer():
def red(s):
return '\x1b[31m%s\x1b[0m' % s
@@ -584,12 +803,26 @@ def install_colourer():
self.patterns = patterns
def write(self, s):
- f = self.patterns.get(s, lambda x: x)
- self.fh.write(f(s))
+ for p, f in self.patterns.items():
+ if s.startswith(p):
+ self.fh.write(f(p) + s[len(p):])
+ return
+
+ self.fh.write(s)
sys.stdout = Colourer(sys.stdout, patterns)
return sys.stdout
-if __name__ == '__main__':
- unittest.main()
+# this is just to shut up unittest.
+class DummyStream(object):
+ def write(self, s):
+ if 'CHECK_TWISTED_VERBOSE' in os.environ:
+ print s,
+ def flush(self):
+ pass
+
+if __name__ == '__main__':
+ stream = DummyStream()
+ runner = unittest.TextTestRunner(stream=stream)
+ unittest.main(testRunner=runner)