summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon McVittie <simon.mcvittie@collabora.co.uk>2010-11-18 18:14:25 +0000
committerSimon McVittie <simon.mcvittie@collabora.co.uk>2010-11-18 18:14:25 +0000
commit4af95b75fc17a251b167208131a51626ae15dec1 (patch)
tree31181184c01ed8c7f7d426dcf4ee4cb74ac983f9
parent6babff19f5dafab264a6d3cd5f3af602f7437464 (diff)
tests: import servicetest.py from Salut and modernize tests to pass again
-rw-r--r--tests/twisted/servicetest.py560
-rw-r--r--tests/twisted/sofiatest.py3
-rw-r--r--tests/twisted/test-handle-normalisation.py39
-rw-r--r--tests/twisted/test-message.py6
4 files changed, 371 insertions, 237 deletions
diff --git a/tests/twisted/servicetest.py b/tests/twisted/servicetest.py
index ecb44f6..5685eec 100644
--- a/tests/twisted/servicetest.py
+++ b/tests/twisted/servicetest.py
@@ -6,54 +6,26 @@ Infrastructure code for testing connection managers.
from twisted.internet import glib2reactor
from twisted.internet.protocol import Protocol, Factory, ClientFactory
glib2reactor.install()
+import sys
+import time
import pprint
-import traceback
import unittest
import dbus.glib
from twisted.internet import reactor
+import constants as cs
+
tp_name_prefix = 'org.freedesktop.Telepathy'
tp_path_prefix = '/org/freedesktop/Telepathy'
-class TryNextHandler(Exception):
- pass
-
-def lazy(func):
- def handler(event, data):
- if func(event, data):
- return True
- else:
- raise TryNextHandler()
- handler.__name__ = func.__name__
- return handler
-
-def match(type, **kw):
- def decorate(func):
- def handler(event, data, *extra, **extra_kw):
- if event.type != type:
- return False
-
- for key, value in kw.iteritems():
- if not hasattr(event, key):
- return False
-
- if getattr(event, key) != value:
- return False
-
- return func(event, data, *extra, **extra_kw)
-
- handler.__name__ = func.__name__
- return handler
-
- return decorate
-
class Event:
def __init__(self, type, **kw):
self.__dict__.update(kw)
self.type = type
+ (self.subqueue, self.subtype) = type.split ("-", 1)
def format_event(event):
ret = ['- type %s' % event.type]
@@ -68,113 +40,24 @@ def format_event(event):
return ret
-class EventTest:
- """Somewhat odd event dispatcher for asynchronous tests.
-
- Callbacks are kept in a queue. Incoming events are passed to the first
- callback. If the callback returns True, the callback is removed. If the
- callback raises AssertionError, the test fails. If there are no more
- callbacks, the test passes. The reactor is stopped when the test passes.
- """
-
- def __init__(self):
- self.queue = []
- self.data = {'test': self}
- self.timeout_delayed_call = reactor.callLater(5, self.timeout_cb)
- #self.verbose = True
- self.verbose = False
- # ugh
- self.stopping = False
-
- def timeout_cb(self):
- print 'timed out waiting for events'
- print self.queue[0]
- self.fail()
-
- def fail(self):
- # ugh; better way to stop the reactor and exit(1)?
- import os
- os._exit(1)
-
- def expect(self, f):
- self.queue.append(f)
-
- def log(self, s):
- if self.verbose:
- print s
-
- def try_stop(self):
- if self.stopping:
- return True
-
- if not self.queue:
- self.log('no handlers left; stopping')
- self.stopping = True
- reactor.stop()
- return True
-
- return False
-
- def call_handlers(self, event):
- self.log('trying %r' % self.queue[0])
- handler = self.queue.pop(0)
-
- try:
- ret = handler(event, self.data)
- if not ret:
- self.queue.insert(0, handler)
- except TryNextHandler, e:
- if self.queue:
- ret = self.call_handlers(event)
- else:
- ret = False
- self.queue.insert(0, handler)
-
- return ret
-
- def handle_event(self, event):
- if self.try_stop():
- return
-
- self.log('got event:')
- self.log('- type: %s' % event.type)
- map(self.log, format_event(event))
-
- try:
- ret = self.call_handlers(event)
- except SystemExit, e:
- if e.code:
- print "Unsuccessful exit:", e
- self.fail()
- else:
- self.queue[:] = []
- ret = True
- except AssertionError, e:
- print 'test failed:'
- traceback.print_exc()
- self.fail()
- except (Exception, KeyboardInterrupt), e:
- print 'error in handler:'
- traceback.print_exc()
- self.fail()
-
- if ret not in (True, False):
- print ("warning: %s() returned something other than True or False"
- % self.queue[0].__name__)
-
- if ret:
- self.timeout_delayed_call.reset(5)
- self.log('event handled')
- else:
- self.log('event not handled')
-
- self.log('')
- self.try_stop()
-
class EventPattern:
def __init__(self, type, **properties):
self.type = type
+ self.predicate = None
+ if 'predicate' in properties:
+ self.predicate = properties['predicate']
+ del properties['predicate']
self.properties = properties
+ (self.subqueue, self.subtype) = type.split ("-", 1)
+
+ def __repr__(self):
+ properties = dict(self.properties)
+
+ if self.predicate is not None:
+ properties['predicate'] = self.predicate
+
+ return '%s(%r, **%r)' % (
+ self.__class__.__name__, self.type, properties)
def match(self, event):
if event.type != self.type:
@@ -187,7 +70,11 @@ class EventPattern:
except AttributeError:
return False
- return True
+ if self.predicate is None or self.predicate(event):
+ return True
+
+ return False
+
class TimeoutError(Exception):
pass
@@ -200,6 +87,8 @@ class BaseEventQueue:
def __init__(self, timeout=None):
self.verbose = False
+ self.forbidden_events = set()
+ self.event_queues = {}
if timeout is None:
self.timeout = 5
@@ -210,16 +99,56 @@ class BaseEventQueue:
if self.verbose:
print s
+ def log_queues(self, queues):
+ self.log ("Waiting for event on: %s" % ", ".join(queues))
+
+ def log_event(self, event):
+ self.log('got event:')
+
+ if self.verbose:
+ map(self.log, format_event(event))
+
+ def forbid_events(self, patterns):
+ """
+ Add patterns (an iterable of EventPattern) to the set of forbidden
+ events. If a forbidden event occurs during an expect or expect_many,
+ the test will fail.
+ """
+ self.forbidden_events.update(set(patterns))
+
+ def unforbid_events(self, patterns):
+ """
+ Remove 'patterns' (an iterable of EventPattern) from the set of
+ forbidden events. These must be the same EventPattern pointers that
+ were passed to forbid_events.
+ """
+ self.forbidden_events.difference_update(set(patterns))
+
+ def _check_forbidden(self, event):
+ for e in self.forbidden_events:
+ if e.match(event):
+ print "forbidden event occurred:"
+ for x in format_event(event):
+ print x
+ assert False
+
def expect(self, type, **kw):
+ """
+ Waits for an event matching the supplied pattern to occur, and returns
+ it. For example, to await a D-Bus signal with particular arguments:
+
+ e = q.expect('dbus-signal', signal='Badgers', args=["foo", 42])
+ """
pattern = EventPattern(type, **kw)
+ t = time.time()
while True:
- event = self.wait()
- self.log('got event:')
- map(self.log, format_event(event))
+ event = self.wait([pattern.subqueue])
+ self._check_forbidden(event)
if pattern.match(event):
- self.log('handled')
+ self.log('handled, took %0.3f ms'
+ % ((time.time() - t) * 1000.0) )
self.log('')
return event
@@ -227,16 +156,57 @@ class BaseEventQueue:
self.log('')
def expect_many(self, *patterns):
+ """
+ Waits for events matching all of the supplied EventPattern instances to
+ return, and returns a list of events in the same order as the patterns
+ they matched. After a pattern is successfully matched, it is not
+ considered for future events; if more than one unsatisfied pattern
+ matches an event, the first "wins".
+
+ Note that the expected events may occur in any order. If you're
+ expecting a series of events in a particular order, use repeated calls
+ to expect() instead.
+
+ This method is useful when you're awaiting a number of events which may
+ happen in any order. For instance, in telepathy-gabble, calling a D-Bus
+ method often causes a value to be returned immediately, as well as a
+ query to be sent to the server. Since these events may reach the test
+ in either order, the following is incorrect and will fail if the IQ
+ happens to reach the test first:
+
+ ret = q.expect('dbus-return', method='Foo')
+ query = q.expect('stream-iq', query_ns=ns.FOO)
+
+ The following would be correct:
+
+ ret, query = q.expect_many(
+ EventPattern('dbus-return', method='Foo'),
+ EventPattern('stream-iq', query_ns=ns.FOO),
+ )
+ """
ret = [None] * len(patterns)
+ t = time.time()
while None in ret:
- event = self.wait()
- self.log('got event:')
- map(self.log, format_event(event))
+ try:
+ queues = set()
+ for i, pattern in enumerate(patterns):
+ if ret[i] is None:
+ queues.add(pattern.subqueue)
+ event = self.wait(queues)
+ except TimeoutError:
+ self.log('timeout')
+ self.log('still expecting:')
+ for i, pattern in enumerate(patterns):
+ if ret[i] is None:
+ self.log(' - %r' % pattern)
+ raise
+ self._check_forbidden(event)
for i, pattern in enumerate(patterns):
- if pattern.match(event):
- self.log('handled')
+ if ret[i] is None and pattern.match(event):
+ self.log('handled, took %0.3f ms'
+ % ((time.time() - t) * 1000.0) )
self.log('')
ret[i] = event
break
@@ -249,9 +219,7 @@ class BaseEventQueue:
def demand(self, type, **kw):
pattern = EventPattern(type, **kw)
- event = self.wait()
- self.log('got event:')
- map(self.log, format_event(event))
+ event = self.wait([pattern.subqueue])
if pattern.match(event):
self.log('handled')
@@ -261,14 +229,34 @@ class BaseEventQueue:
self.log('not handled')
raise RuntimeError('expected %r, got %r' % (pattern, event))
+ def queues_available(self, queues):
+ if queues == None:
+ return self.event_queues.keys()
+ else:
+ available = self.event_queues.keys()
+ return filter(lambda x: x in available, queues)
+
+
+ def pop_next(self, queue):
+ events = self.event_queues[queue]
+ e = events.pop(0)
+ if not events:
+ self.event_queues.pop (queue)
+ return e
+
+ def append(self, event):
+ self.log ("Adding to queue")
+ self.log_event (event)
+ self.event_queues[event.subqueue] = \
+ self.event_queues.get(event.subqueue, []) + [event]
+
class IteratingEventQueue(BaseEventQueue):
"""Event queue that works by iterating the Twisted reactor."""
def __init__(self, timeout=None):
BaseEventQueue.__init__(self, timeout)
- self.events = []
- def wait(self):
+ def wait(self, queues=None):
stop = [False]
def later():
@@ -276,58 +264,92 @@ class IteratingEventQueue(BaseEventQueue):
delayed_call = reactor.callLater(self.timeout, later)
- while (not self.events) and (not stop[0]):
- reactor.iterate(0.1)
+ self.log_queues(queues)
+
+ qa = self.queues_available(queues)
+ while not qa and (not stop[0]):
+ reactor.iterate(0.01)
+ qa = self.queues_available(queues)
- if self.events:
+ if qa:
delayed_call.cancel()
- return self.events.pop(0)
+ e = self.pop_next (qa[0])
+ self.log_event (e)
+ return e
else:
raise TimeoutError
- def append(self, event):
- self.events.append(event)
-
- # compatibility
- handle_event = append
-
class TestEventQueue(BaseEventQueue):
def __init__(self, events):
BaseEventQueue.__init__(self)
- self.events = events
+ for e in events:
+ self.append (e)
- def wait(self):
- if self.events:
- return self.events.pop(0)
+ def wait(self, queues = None):
+ qa = self.queues_available(queues)
+
+ if qa:
+ return self.pop_next (qa[0])
else:
raise TimeoutError
class EventQueueTest(unittest.TestCase):
def test_expect(self):
- queue = TestEventQueue([Event('foo'), Event('bar')])
- assert queue.expect('foo').type == 'foo'
- assert queue.expect('bar').type == 'bar'
+ queue = TestEventQueue([Event('test-foo'), Event('test-bar')])
+ assert queue.expect('test-foo').type == 'test-foo'
+ assert queue.expect('test-bar').type == 'test-bar'
def test_expect_many(self):
- queue = TestEventQueue([Event('foo'), Event('bar')])
+ queue = TestEventQueue([Event('test-foo'),
+ Event('test-bar')])
bar, foo = queue.expect_many(
- EventPattern('bar'),
- EventPattern('foo'))
- assert bar.type == 'bar'
- assert foo.type == 'foo'
+ EventPattern('test-bar'),
+ EventPattern('test-foo'))
+ assert bar.type == 'test-bar'
+ assert foo.type == 'test-foo'
+
+ def test_expect_many2(self):
+ # Test that events are only matched against patterns that haven't yet
+ # been matched. This tests a regression.
+ queue = TestEventQueue([Event('test-foo', x=1), Event('test-foo', x=2)])
+ foo1, foo2 = queue.expect_many(
+ EventPattern('test-foo'),
+ EventPattern('test-foo'))
+ assert foo1.type == 'test-foo' and foo1.x == 1
+ assert foo2.type == 'test-foo' and foo2.x == 2
+
+ def test_expect_queueing(self):
+ queue = TestEventQueue([Event('foo-test', x=1),
+ Event('foo-test', x=2)])
+
+ queue.append(Event('bar-test', x=1))
+ queue.append(Event('bar-test', x=2))
+
+ queue.append(Event('baz-test', x=1))
+ queue.append(Event('baz-test', x=2))
+
+ for x in xrange(1,2):
+ e = queue.expect ('baz-test')
+ assertEquals (x, e.x)
+
+ e = queue.expect ('bar-test')
+ assertEquals (x, e.x)
+
+ e = queue.expect ('foo-test')
+ assertEquals (x, e.x)
def test_timeout(self):
queue = TestEventQueue([])
- self.assertRaises(TimeoutError, queue.expect, 'foo')
+ self.assertRaises(TimeoutError, queue.expect, 'test-foo')
def test_demand(self):
- queue = TestEventQueue([Event('foo'), Event('bar')])
- foo = queue.demand('foo')
- assert foo.type == 'foo'
+ queue = TestEventQueue([Event('test-foo'), Event('test-bar')])
+ foo = queue.demand('test-foo')
+ assert foo.type == 'test-foo'
def test_demand_fail(self):
- queue = TestEventQueue([Event('foo'), Event('bar')])
- self.assertRaises(RuntimeError, queue.demand, 'bar')
+ queue = TestEventQueue([Event('test-foo'), Event('test-bar')])
+ self.assertRaises(RuntimeError, queue.demand, 'test-bar')
def unwrap(x):
"""Hack to unwrap D-Bus values, so that they're easier to read when
@@ -342,7 +364,10 @@ def unwrap(x):
if isinstance(x, dict):
return dict([(unwrap(k), unwrap(v)) for k, v in x.iteritems()])
- for t in [unicode, str, long, int, float, bool]:
+ if isinstance(x, dbus.Boolean):
+ return bool(x)
+
+ for t in [unicode, str, long, int, float]:
if isinstance(x, t):
return t(x)
@@ -353,21 +378,33 @@ def call_async(test, proxy, method, *args, **kw):
resulting method return/error."""
def reply_func(*ret):
- test.handle_event(Event('dbus-return', method=method,
+ test.append(Event('dbus-return', method=method,
value=unwrap(ret)))
def error_func(err):
- test.handle_event(Event('dbus-error', method=method, error=err))
+ test.append(Event('dbus-error', method=method, error=err,
+ name=err.get_dbus_name(), message=str(err)))
method_proxy = getattr(proxy, method)
kw.update({'reply_handler': reply_func, 'error_handler': error_func})
method_proxy(*args, **kw)
+def sync_dbus(bus, q, conn):
+ # Dummy D-Bus method call
+ # This won't do the right thing unless the proxy has a unique name.
+ assert conn.object.bus_name.startswith(':')
+ root_object = bus.get_object(conn.object.bus_name, '/')
+ call_async(
+ q, dbus.Interface(root_object, 'org.freedesktop.DBus.Peer'), 'Ping')
+ q.expect('dbus-return', method='Ping')
class ProxyWrapper:
def __init__(self, object, default, others):
self.object = object
self.default_interface = dbus.Interface(object, default)
+ self.Properties = dbus.Interface(object, dbus.PROPERTIES_IFACE)
+ self.TpProperties = \
+ dbus.Interface(object, tp_name_prefix + '.Properties')
self.interfaces = dict([
(name, dbus.Interface(object, iface))
for name, iface in others.iteritems()])
@@ -381,8 +418,36 @@ class ProxyWrapper:
return getattr(self.default_interface, name)
-def prepare_test(event_func, name, proto, params):
- bus = dbus.SessionBus()
+def wrap_connection(conn):
+ return ProxyWrapper(conn, tp_name_prefix + '.Connection',
+ dict([
+ (name, tp_name_prefix + '.Connection.Interface.' + name)
+ for name in ['Aliasing', 'Avatars', 'Capabilities', 'Contacts',
+ 'Presence', 'SimplePresence', 'Requests']] +
+ [('Peer', 'org.freedesktop.DBus.Peer'),
+ ('ContactCapabilities', cs.CONN_IFACE_CONTACT_CAPS),
+ ('ContactInfo', cs.CONN_IFACE_CONTACT_INFO),
+ ('Location', cs.CONN_IFACE_LOCATION),
+ ('Future', tp_name_prefix + '.Connection.FUTURE'),
+ ('MailNotification', cs.CONN_IFACE_MAIL_NOTIFICATION),
+ ('ContactList', cs.CONN_IFACE_CONTACT_LIST),
+ ('ContactGroups', cs.CONN_IFACE_CONTACT_GROUPS),
+ ]))
+
+def wrap_channel(chan, type_, extra=None):
+ interfaces = {
+ type_: tp_name_prefix + '.Channel.Type.' + type_,
+ 'Group': tp_name_prefix + '.Channel.Interface.Group',
+ }
+
+ if extra:
+ interfaces.update(dict([
+ (name, tp_name_prefix + '.Channel.Interface.' + name)
+ for name in extra]))
+
+ return ProxyWrapper(chan, tp_name_prefix + '.Channel', interfaces)
+
+def make_connection(bus, event_func, name, proto, params):
cm = bus.get_object(
tp_name_prefix + '.ConnectionManager.%s' % name,
tp_path_prefix + '/ConnectionManager/%s' % name)
@@ -390,28 +455,25 @@ def prepare_test(event_func, name, proto, params):
connection_name, connection_path = cm_iface.RequestConnection(
proto, params)
- conn = bus.get_object(connection_name, connection_path)
- conn = ProxyWrapper(conn, tp_name_prefix + '.Connection',
- dict([
- (name, tp_name_prefix + '.Connection.Interface.' + name)
- for name in ['Aliasing', 'Avatars', 'Capabilities', 'Presence']] +
- [('Peer', 'org.freedesktop.DBus.Peer')]))
+ conn = wrap_connection(bus.get_object(connection_name, connection_path))
bus.add_signal_receiver(
lambda *args, **kw:
event_func(
Event('dbus-signal',
- path=unwrap(kw['path'])[len(tp_path_prefix):],
- signal=kw['member'], args=map(unwrap, args))),
+ path=unwrap(kw['path']),
+ signal=kw['member'], args=map(unwrap, args),
+ interface=kw['interface'])),
None, # signal name
None, # interface
cm._named_service,
path_keyword='path',
member_keyword='member',
+ interface_keyword='interface',
byte_arrays=True
)
- return bus, conn
+ return conn
def make_channel_proxy(conn, path, iface):
bus = dbus.SessionBus()
@@ -419,36 +481,39 @@ def make_channel_proxy(conn, path, iface):
chan = dbus.Interface(chan, tp_name_prefix + '.' + iface)
return chan
-def load_event_handlers():
- path, _, _, _ = traceback.extract_stack()[0]
- import compiler
- import __main__
- ast = compiler.parseFile(path)
- return [
- getattr(__main__, node.name)
- for node in ast.node.asList()
- if node.__class__ == compiler.ast.Function and
- node.name.startswith('expect_')]
-
+# block_reading can be used if the test want to choose when we start to read
+# data from the socket.
class EventProtocol(Protocol):
- def __init__(self, queue=None):
+ def __init__(self, queue=None, block_reading=False):
self.queue = queue
def dataReceived(self, data):
if self.queue is not None:
- self.queue.handle_event(Event('socket-data', protocol=self,
+ self.queue.append(Event('socket-data', protocol=self,
data=data))
def sendData(self, data):
self.transport.write(data)
+ def connectionMade(self):
+ if self.block_reading:
+ self.transport.stopReading()
+
+ def connectionLost(self, reason=None):
+ if self.queue is not None:
+ self.queue.append(Event('socket-disconnected', protocol=self))
+
class EventProtocolFactory(Factory):
- def __init__(self, queue):
+ def __init__(self, queue, block_reading=False):
self.queue = queue
+ self.block_reading = block_reading
+
+ def _create_protocol(self):
+ return EventProtocol(self.queue, self.block_reading)
def buildProtocol(self, addr):
- proto = EventProtocol(self.queue)
- self.queue.handle_event(Event('socket-connected', protocol=proto))
+ proto = self._create_protocol()
+ self.queue.append(Event('socket-connected', protocol=proto))
return proto
class EventProtocolClientFactory(EventProtocolFactory, ClientFactory):
@@ -456,7 +521,7 @@ class EventProtocolClientFactory(EventProtocolFactory, ClientFactory):
def watch_tube_signals(q, tube):
def got_signal_cb(*args, **kwargs):
- q.handle_event(Event('tube-signal',
+ q.append(Event('tube-signal',
path=kwargs['path'],
signal=kwargs['member'],
args=map(unwrap, args),
@@ -466,6 +531,81 @@ def watch_tube_signals(q, tube):
path_keyword='path', member_keyword='member',
byte_arrays=True)
+def pretty(x):
+ return pprint.pformat(unwrap(x))
+
+def assertEquals(expected, value):
+ if expected != value:
+ raise AssertionError(
+ "expected:\n%s\ngot:\n%s" % (pretty(expected), pretty(value)))
+
+def assertSameSets(expected, value):
+ exp_set = set(expected)
+ val_set = set(value)
+
+ if exp_set != val_set:
+ raise AssertionError(
+ "expected contents:\n%s\ngot:\n%s" % (
+ pretty(exp_set), pretty(val_set)))
+
+def assertNotEquals(expected, value):
+ if expected == value:
+ raise AssertionError(
+ "expected something other than:\n%s" % pretty(value))
+
+def assertContains(element, value):
+ if element not in value:
+ raise AssertionError(
+ "expected:\n%s\nin:\n%s" % (pretty(element), pretty(value)))
+
+def assertDoesNotContain(element, value):
+ if element in value:
+ raise AssertionError(
+ "expected:\n%s\nnot in:\n%s" % (pretty(element), pretty(value)))
+
+def assertLength(length, value):
+ if len(value) != length:
+ raise AssertionError("expected: length %d, got length %d:\n%s" % (
+ length, len(value), pretty(value)))
+
+def assertFlagsSet(flags, value):
+ masked = value & flags
+ if masked != flags:
+ raise AssertionError(
+ "expected flags %u, of which only %u are set in %u" % (
+ flags, masked, value))
+
+def assertFlagsUnset(flags, value):
+ masked = value & flags
+ if masked != 0:
+ raise AssertionError(
+ "expected none of flags %u, but %u are set in %u" % (
+ flags, masked, value))
+
+def install_colourer():
+ def red(s):
+ return '\x1b[31m%s\x1b[0m' % s
+
+ def green(s):
+ return '\x1b[32m%s\x1b[0m' % s
+
+ patterns = {
+ 'handled': green,
+ 'not handled': red,
+ }
+
+ class Colourer:
+ def __init__(self, fh, patterns):
+ self.fh = fh
+ self.patterns = patterns
+
+ def write(self, s):
+ f = self.patterns.get(s, lambda x: x)
+ self.fh.write(f(s))
+
+ sys.stdout = Colourer(sys.stdout, patterns)
+ return sys.stdout
+
if __name__ == '__main__':
unittest.main()
diff --git a/tests/twisted/sofiatest.py b/tests/twisted/sofiatest.py
index 149cacb..2ec5b75 100644
--- a/tests/twisted/sofiatest.py
+++ b/tests/twisted/sofiatest.py
@@ -50,7 +50,8 @@ def prepare_test(event_func, register_cb, params=None):
if params is not None:
actual_params.update(params)
- bus, conn = servicetest.prepare_test(event_func,
+ bus = dbus.SessionBus()
+ conn = servicetest.make_connection(bus, event_func,
'sofiasip', 'sip', actual_params)
port = int(actual_params['port'])
diff --git a/tests/twisted/test-handle-normalisation.py b/tests/twisted/test-handle-normalisation.py
index cf741eb..710ed72 100644
--- a/tests/twisted/test-handle-normalisation.py
+++ b/tests/twisted/test-handle-normalisation.py
@@ -1,14 +1,16 @@
-from servicetest import unwrap, match
-from sofiatest import go
+from servicetest import assertEquals
+from sofiatest import exec_test
import dbus
+import constants as cs
-@match('dbus-signal', signal='StatusChanged', args=[1, 1])
-def expect_connecting(event, data):
- return True
+def test(q, bus, conn, sip):
+ conn.Connect()
+ q.expect('dbus-signal', signal='StatusChanged',
+ args=[cs.CONN_STATUS_CONNECTING, cs.CSR_REQUESTED])
+ q.expect('dbus-signal', signal='StatusChanged',
+ args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])
-@match('dbus-signal', signal='StatusChanged', args=[0, 1])
-def expect_connected(event, data):
tests = [ ('sip:test@localhost', 'sip:test@localhost'),
('test@localhost', 'sip:test@localhost'),
('test', 'sip:test@127.0.0.1'),
@@ -29,22 +31,15 @@ def expect_connected(event, data):
orig = [ x[0] for x in tests ]
expect = [ x[1] for x in tests ]
- handles = data['conn_iface'].RequestHandles(1, orig)
- names = data['conn_iface'].InspectHandles(1, handles)
+ handles = conn.RequestHandles(1, orig)
+ names = conn.InspectHandles(1, handles)
- for a,b in zip(expect, map(lambda x: str(unwrap(x)), names)):
- if a != b:
- print a, b
- raise Exception("test failed")
+ for a,b in zip(expect, names):
+ assertEquals(a, b)
- data['conn_iface'].Disconnect()
- return True
-
-@match('dbus-signal', signal='StatusChanged', args=[2, 1])
-def expect_disconnected(event, data):
- return True
+ conn.Disconnect()
+ q.expect('dbus-signal', signal='StatusChanged',
+ args=[cs.CONN_STATUS_DISCONNECTED, cs.CSR_REQUESTED])
if __name__ == '__main__':
- go()
-
-
+ exec_test(test)
diff --git a/tests/twisted/test-message.py b/tests/twisted/test-message.py
index 0f0b61c..95ad159 100644
--- a/tests/twisted/test-message.py
+++ b/tests/twisted/test-message.py
@@ -135,8 +135,7 @@ def test(q, bus, conn, sip):
send_message(sip, ua_via, 'How are you doing now, old pal?',
sender=contact)
- event = q.expect('dbus-signal', signal='Received')
- assert tp_path_prefix + event.path == chan, (event.path, chan)
+ event = q.expect('dbus-signal', signal='Received', path=chan)
assert event.args[5] == 'How are you doing now, old pal?'
pending_msgs.append(tuple(event.args))
@@ -150,8 +149,7 @@ def test(q, bus, conn, sip):
dbus.Interface(requested_obj, CHANNEL).Close()
del requested_obj
- event = q.expect('dbus-signal', signal='Closed')
- assert tp_path_prefix + event.path == chan, (event.path, chan)
+ event = q.expect('dbus-signal', signal='Closed', path=chan)
requested_obj, handle = test_new_channel (q, bus, conn,
target_uri=contact,