summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorCosimo Alfarano <cosimo.alfarano@collabora.co.uk>2010-02-10 20:04:29 +0000
committerDanielle Madeley <danielle.madeley@collabora.co.uk>2010-02-12 21:51:01 +1100
commitfd9c87f22fd9d23edb5227e810e2441e03e9ef23 (patch)
tree13bc9da40a0cce5e812990706f5011cc2f31a0d2 /tests
parent48f17f5d3950f130434eea9ade014a0fe396cdae (diff)
Modified twisted testcases
Diffstat (limited to 'tests')
-rw-r--r--tests/Makefile.am1
-rw-r--r--tests/README2
-rw-r--r--tests/test-tpl-log-manager.c21
-rw-r--r--tests/test-tpl-observer.c5
-rw-r--r--tests/twisted/Makefile.am2
-rw-r--r--tests/twisted/channel_text/init.py84
-rw-r--r--tests/twisted/main-debug.c2
-rw-r--r--tests/twisted/observer/init.py82
-rw-r--r--tests/twisted/tools/Makefile.am2
-rw-r--r--tests/twisted/tpltest.py684
10 files changed, 362 insertions, 523 deletions
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 1ddabb1..9666416 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -12,6 +12,7 @@ noinst_PROGRAMS = \
test-tpl-channel \
test-tpl-conf \
test-tpl-log-entry \
+ test-tpl-log-manager \
test-tpl-observer \
test-searches
diff --git a/tests/README b/tests/README
index c9da6a5..b9aab0d 100644
--- a/tests/README
+++ b/tests/README
@@ -55,7 +55,7 @@ or:
To debug an individual test you can set one of the following env variable:
* GABBLE_TEST_VALGRIND : to run Gabble inside valgrind. The report is
- added to tools/gabble-testing.log.
+ added to tools/tpl-testing.log.
export GABBLE_TEST_VALGRIND=1
* GABBLE_TEST_REFDBG : to run Gabble inside refdbg. The report is written
diff --git a/tests/test-tpl-log-manager.c b/tests/test-tpl-log-manager.c
new file mode 100644
index 0000000..c7ebaf4
--- /dev/null
+++ b/tests/test-tpl-log-manager.c
@@ -0,0 +1,21 @@
+#include <telepathy-logger/log-entry-text.h>
+
+#define gconf_client_get_bool(obj,key,err) g_print ("%s", key)
+
+#define LOG_ID 0
+#define CHAT_ID "echo@test.collabora.co.uk"
+#define DIRECTION TPL_LOG_ENTRY_DIRECTION_IN
+
+int main (int argc, char **argv)
+{
+// TplLogManager *manager;
+
+ g_type_init ();
+
+ // manager = tpl_log_manager_dup_singleton ();
+
+
+
+ return 0;
+}
+
diff --git a/tests/test-tpl-observer.c b/tests/test-tpl-observer.c
index b57576b..e62fff8 100644
--- a/tests/test-tpl-observer.c
+++ b/tests/test-tpl-observer.c
@@ -20,7 +20,6 @@ int
main (int argc, char **argv)
{
TplObserver *obs, *obs2;
- TplChannel *chan;
g_type_init ();
@@ -43,10 +42,6 @@ main (int argc, char **argv)
tpl_observer_set_channel_factory (obs, mock_factory);
- /* register a channel */
- chan = TPL_CHANNEL (tpl_channel_test_new (NULL, NULL, NULL, NULL, NULL));
- tpl_observer_register_channel (obs, chan);
-
/* proper disposal for the singleton when no references are present */
g_object_unref (obs);
g_assert (TPL_IS_OBSERVER (obs) == FALSE);
diff --git a/tests/twisted/Makefile.am b/tests/twisted/Makefile.am
index ffc2d4b..f653758 100644
--- a/tests/twisted/Makefile.am
+++ b/tests/twisted/Makefile.am
@@ -18,7 +18,7 @@ check-twisted:
$(MAKE) -C tools
rm -f tools/core
rm -f tools/vgcore.*
- rm -f tools/gabble-testing.log
+ rm -f tools/tpl-testing.log
rm -f tools/strace.log
if test -n "$$GABBLE_TEST_REFDBG"; then \
sleep=6; \
diff --git a/tests/twisted/channel_text/init.py b/tests/twisted/channel_text/init.py
index dae48e5..7489922 100644
--- a/tests/twisted/channel_text/init.py
+++ b/tests/twisted/channel_text/init.py
@@ -4,90 +4,14 @@ Test text channel initiated by me.
import dbus
-from twisted.words.xish import domish
-
-from gabbletest import exec_test
+from tpltest import exec_test
from servicetest import call_async, EventPattern
import constants as cs
-def test(q, bus, conn, stream):
- conn.Connect()
- q.expect('dbus-signal', signal='StatusChanged',
- args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])
-
- self_handle = conn.GetSelfHandle()
-
- jid = 'foo@bar.com'
- call_async(q, conn, 'RequestHandles', cs.HT_CONTACT, [jid])
-
- event = q.expect('dbus-return', method='RequestHandles')
- foo_handle = event.value[0][0]
-
- call_async(q, conn, 'RequestChannel',
- cs.CHANNEL_TYPE_TEXT, cs.HT_CONTACT, foo_handle, True)
-
- ret, sig = q.expect_many(
- EventPattern('dbus-return', method='RequestChannel'),
- EventPattern('dbus-signal', signal='NewChannel'),
- )
-
- text_chan = bus.get_object(conn.bus_name, ret.value[0])
-
- assert sig.args[0] == ret.value[0], \
- (sig.args[0], ret.value[0])
- assert sig.args[1] == cs.CHANNEL_TYPE_TEXT, sig.args[1]
- # check that handle type == contact handle
- assert sig.args[2] == 1, sig.args[1]
- assert sig.args[3] == foo_handle, (sig.args[3], foo_handle)
- assert sig.args[4] == True # suppress handler
-
- # Exercise basic Channel Properties from spec 0.17.7
- channel_props = text_chan.GetAll(
- cs.CHANNEL, dbus_interface=dbus.PROPERTIES_IFACE)
- assert channel_props.get('TargetHandle') == foo_handle,\
- (channel_props.get('TargetHandle'), foo_handle)
- assert channel_props.get('TargetHandleType') == 1,\
- channel_props.get('TargetHandleType')
- assert channel_props.get('ChannelType') == \
- cs.CHANNEL_TYPE_TEXT,\
- channel_props.get('ChannelType')
- assert cs.CHANNEL_IFACE_CHAT_STATE in \
- channel_props.get('Interfaces', ()), \
- channel_props.get('Interfaces')
- assert channel_props['TargetID'] == jid,\
- (channel_props['TargetID'], jid)
- assert channel_props['Requested'] == True
- assert channel_props['InitiatorHandle'] == self_handle,\
- (channel_props['InitiatorHandle'], self_handle)
- assert channel_props['InitiatorID'] == 'test@localhost',\
- channel_props['InitiatorID']
-
- dbus.Interface(text_chan, cs.CHANNEL_TYPE_TEXT).Send(0, 'hey')
-
- event = q.expect('stream-message')
-
- elem = event.stanza
- assert elem.name == 'message'
- assert elem['type'] == 'chat'
- body = list(event.stanza.elements())[0]
- assert body.name == 'body'
- assert body.children[0] == u'hey'
-
- # <message type="chat"><body>hello</body</message>
- m = domish.Element((None, 'message'))
- m['from'] = 'foo@bar.com/Pidgin'
- m['type'] = 'chat'
- m.addElement('body', content='hello')
- stream.send(m)
-
- event = q.expect('dbus-signal', signal='Received')
+def test(q, bus):
+ #pect('dbus-signal', signal='NewChannels')
- # message type: normal
- assert event.args[3] == 0
- # flags: none
- assert event.args[4] == 0
- # body
- assert event.args[5] == 'hello'
+ print "GOT NewChannels TEXT"
if __name__ == '__main__':
exec_test(test)
diff --git a/tests/twisted/main-debug.c b/tests/twisted/main-debug.c
index 4a44c73..870fb66 100644
--- a/tests/twisted/main-debug.c
+++ b/tests/twisted/main-debug.c
@@ -1,5 +1,5 @@
/*
- * main.c - entry point for telepathy-gabble-debug used by tests
+ * main.c - entry point for telepathy-tpl-debug used by tests
* Copyright (C) 2008 Collabora Ltd.
*
* This library is free software; you can redistribute it and/or
diff --git a/tests/twisted/observer/init.py b/tests/twisted/observer/init.py
index 6b06184..e5de829 100644
--- a/tests/twisted/observer/init.py
+++ b/tests/twisted/observer/init.py
@@ -4,88 +4,14 @@ Test text channel initiated by me.
import dbus
-from gabbletest import exec_test
+from tpltest import exec_test
from servicetest import call_async, EventPattern
import constants as cs
-def test(q, bus, conn, stream):
- conn.Connect()
- q.expect('dbus-signal', signal='StatusChanged',
- args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])
+def test(q, bus):
+ #pect('dbus-signal', signal='NewChannels')
- self_handle = conn.GetSelfHandle()
-
- jid = 'foo@bar.com'
- call_async(q, conn, 'RequestHandles', cs.HT_CONTACT, [jid])
-
- event = q.expect('dbus-return', method='RequestHandles')
- foo_handle = event.value[0][0]
-
- call_async(q, conn, 'RequestChannel',
- cs.CHANNEL_TYPE_TEXT, cs.HT_CONTACT, foo_handle, True)
-
- ret, sig = q.expect_many(
- EventPattern('dbus-return', method='RequestChannel'),
- EventPattern('dbus-signal', signal='NewChannel'),
- )
-
- text_chan = bus.get_object(conn.bus_name, ret.value[0])
-
- assert sig.args[0] == ret.value[0], \
- (sig.args[0], ret.value[0])
- assert sig.args[1] == cs.CHANNEL_TYPE_TEXT, sig.args[1]
- # check that handle type == contact handle
- assert sig.args[2] == 1, sig.args[1]
- assert sig.args[3] == foo_handle, (sig.args[3], foo_handle)
- assert sig.args[4] == True # suppress handler
-
- # Exercise basic Channel Properties from spec 0.17.7
- channel_props = text_chan.GetAll(
- cs.CHANNEL, dbus_interface=dbus.PROPERTIES_IFACE)
- assert channel_props.get('TargetHandle') == foo_handle,\
- (channel_props.get('TargetHandle'), foo_handle)
- assert channel_props.get('TargetHandleType') == 1,\
- channel_props.get('TargetHandleType')
- assert channel_props.get('ChannelType') == \
- cs.CHANNEL_TYPE_TEXT,\
- channel_props.get('ChannelType')
- assert cs.CHANNEL_IFACE_CHAT_STATE in \
- channel_props.get('Interfaces', ()), \
- channel_props.get('Interfaces')
- assert channel_props['TargetID'] == jid,\
- (channel_props['TargetID'], jid)
- assert channel_props['Requested'] == True
- assert channel_props['InitiatorHandle'] == self_handle,\
- (channel_props['InitiatorHandle'], self_handle)
- assert channel_props['InitiatorID'] == 'test@localhost',\
- channel_props['InitiatorID']
-
- dbus.Interface(text_chan, cs.CHANNEL_TYPE_TEXT).Send(0, 'hey')
-
- event = q.expect('stream-message')
-
- elem = event.stanza
- assert elem.name == 'message'
- assert elem['type'] == 'chat'
- body = list(event.stanza.elements())[0]
- assert body.name == 'body'
- assert body.children[0] == u'hey'
-
- # <message type="chat"><body>hello</body</message>
- m = domish.Element((None, 'message'))
- m['from'] = 'foo@bar.com/Pidgin'
- m['type'] = 'chat'
- m.addElement('body', content='hello')
- stream.send(m)
-
- event = q.expect('dbus-signal', signal='Received')
-
- # message type: normal
- assert event.args[3] == 0
- # flags: none
- assert event.args[4] == 0
- # body
- assert event.args[5] == 'hello'
+ print "GOT NewChannels Obs"
if __name__ == '__main__':
exec_test(test)
diff --git a/tests/twisted/tools/Makefile.am b/tests/twisted/tools/Makefile.am
index f908c2b..adbe42d 100644
--- a/tests/twisted/tools/Makefile.am
+++ b/tests/twisted/tools/Makefile.am
@@ -30,4 +30,4 @@ EXTRA_DIST = \
CLEANFILES = \
$(BUILT_SOURCES) \
- gabble-testing.log
+ tpl-testing.log
diff --git a/tests/twisted/tpltest.py b/tests/twisted/tpltest.py
index 7b8cdbf..845bdfd 100644
--- a/tests/twisted/tpltest.py
+++ b/tests/twisted/tpltest.py
@@ -11,7 +11,6 @@ import random
import re
import traceback
-import ns
import constants as cs
import servicetest
from servicetest import (
@@ -28,333 +27,333 @@ import dbus
NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl'
NS_XMPP_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
-def make_result_iq(stream, iq):
- result = IQ(stream, "result")
- result["id"] = iq["id"]
- to = iq.getAttribute('to')
- if to is not None:
- result["from"] = to
- query = iq.firstChildElement()
-
- if query:
- result.addElement((query.uri, query.name))
-
- return result
-
-def acknowledge_iq(stream, iq):
- stream.send(make_result_iq(stream, iq))
-
-def send_error_reply(stream, iq, error_stanza=None):
- result = IQ(stream, "error")
- result["id"] = iq["id"]
- query = iq.firstChildElement()
- to = iq.getAttribute('to')
- if to is not None:
- result["from"] = to
-
- if query:
- result.addElement((query.uri, query.name))
-
- if error_stanza:
- result.addChild(error_stanza)
-
- stream.send(result)
-
-def request_muc_handle(q, conn, stream, muc_jid):
- servicetest.call_async(q, conn, 'RequestHandles', 2, [muc_jid])
- event = q.expect('dbus-return', method='RequestHandles')
- return event.value[0][0]
-
-def make_muc_presence(affiliation, role, muc_jid, alias, jid=None):
- presence = domish.Element((None, 'presence'))
- presence['from'] = '%s/%s' % (muc_jid, alias)
- x = presence.addElement((ns.MUC_USER, 'x'))
- item = x.addElement('item')
- item['affiliation'] = affiliation
- item['role'] = role
- if jid is not None:
- item['jid'] = jid
- return presence
-
-def sync_stream(q, stream):
- """Used to ensure that Gabble has processed all stanzas sent to it."""
-
- iq = IQ(stream, "get")
- id = iq['id']
- iq.addElement(('http://jabber.org/protocol/disco#info', 'query'))
- stream.send(iq)
- q.expect('stream-iq', query_ns='http://jabber.org/protocol/disco#info',
- predicate=(lambda event:
- event.stanza['id'] == id and event.iq_type == 'result'))
-
-class JabberAuthenticator(xmlstream.Authenticator):
- "Trivial XML stream authenticator that accepts one username/digest pair."
-
- def __init__(self, username, password, resource=None):
- self.username = username
- self.password = password
- self.resource = resource
- xmlstream.Authenticator.__init__(self)
-
- # Patch in fix from http://twistedmatrix.com/trac/changeset/23418.
- # This monkeypatch taken from Gadget source code
- from twisted.words.xish.utility import EventDispatcher
-
- def _addObserver(self, onetime, event, observerfn, priority, *args,
- **kwargs):
- if self._dispatchDepth > 0:
- self._updateQueue.append(lambda: self._addObserver(onetime, event,
- observerfn, priority, *args, **kwargs))
-
- return self._oldAddObserver(onetime, event, observerfn, priority,
- *args, **kwargs)
-
- EventDispatcher._oldAddObserver = EventDispatcher._addObserver
- EventDispatcher._addObserver = _addObserver
-
- def streamStarted(self, root=None):
- if root:
- self.xmlstream.sid = '%x' % random.randint(1, sys.maxint)
-
- self.xmlstream.sendHeader()
- self.xmlstream.addOnetimeObserver(
- "/iq/query[@xmlns='jabber:iq:auth']", self.initialIq)
-
- def initialIq(self, iq):
- result = IQ(self.xmlstream, "result")
- result["id"] = iq["id"]
- query = result.addElement('query')
- query["xmlns"] = "jabber:iq:auth"
- query.addElement('username', content='test')
- query.addElement('password')
- query.addElement('digest')
- query.addElement('resource')
- self.xmlstream.addOnetimeObserver('/iq/query/username', self.secondIq)
- self.xmlstream.send(result)
-
- def secondIq(self, iq):
- username = xpath.queryForNodes('/iq/query/username', iq)
- assert map(str, username) == [self.username]
-
- digest = xpath.queryForNodes('/iq/query/digest', iq)
- expect = hashlib.sha1(self.xmlstream.sid + self.password).hexdigest()
- assert map(str, digest) == [expect]
-
- resource = xpath.queryForNodes('/iq/query/resource', iq)
- assertLength(1, resource)
- if self.resource is not None:
- assertEquals(self.resource, str(resource[0]))
-
- result = IQ(self.xmlstream, "result")
- result["id"] = iq["id"]
- self.xmlstream.send(result)
- self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
-
-
-class XmppAuthenticator(xmlstream.Authenticator):
- def __init__(self, username, password, resource=None):
- xmlstream.Authenticator.__init__(self)
- self.username = username
- self.password = password
- self.resource = resource
- self.authenticated = False
-
- def streamStarted(self, root=None):
- if root:
- self.xmlstream.sid = root.getAttribute('id')
-
- self.xmlstream.sendHeader()
-
- if self.authenticated:
- # Initiator authenticated itself, and has started a new stream.
-
- features = domish.Element((xmlstream.NS_STREAMS, 'features'))
- bind = features.addElement((NS_XMPP_BIND, 'bind'))
- self.xmlstream.send(features)
-
- self.xmlstream.addOnetimeObserver(
- "/iq/bind[@xmlns='%s']" % NS_XMPP_BIND, self.bindIq)
- else:
- features = domish.Element((xmlstream.NS_STREAMS, 'features'))
- mechanisms = features.addElement((NS_XMPP_SASL, 'mechanisms'))
- mechanism = mechanisms.addElement('mechanism', content='PLAIN')
- self.xmlstream.send(features)
-
- self.xmlstream.addOnetimeObserver("/auth", self.auth)
-
- def auth(self, auth):
- assert (base64.b64decode(str(auth)) ==
- '\x00%s\x00%s' % (self.username, self.password))
-
- success = domish.Element((NS_XMPP_SASL, 'success'))
- self.xmlstream.send(success)
- self.xmlstream.reset()
- self.authenticated = True
-
- def bindIq(self, iq):
- resource = xpath.queryForString('/iq/bind/resource', iq)
- if self.resource is not None:
- assertEquals(self.resource, resource)
- else:
- assert resource is not None
-
- result = IQ(self.xmlstream, "result")
- result["id"] = iq["id"]
- bind = result.addElement((NS_XMPP_BIND, 'bind'))
- jid = bind.addElement('jid', content=('test@localhost/%s' % resource))
- self.xmlstream.send(result)
-
- self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
-
-def make_stream_event(type, stanza):
- event = servicetest.Event(type, stanza=stanza)
- event.to = stanza.getAttribute("to")
- return event
-
-def make_iq_event(iq):
- event = make_stream_event('stream-iq', iq)
- event.iq_type = iq.getAttribute("type")
- event.iq_id = iq.getAttribute("id")
- query = iq.firstChildElement()
-
- if query:
- event.query = query
- event.query_ns = query.uri
- event.query_name = query.name
-
- if query.getAttribute("node"):
- event.query_node = query.getAttribute("node")
- else:
- event.query = None
-
- return event
-
-def make_presence_event(stanza):
- event = make_stream_event('stream-presence', stanza)
- event.presence_type = stanza.getAttribute('type')
- return event
-
-def make_message_event(stanza):
- event = make_stream_event('stream-message', stanza)
- event.message_type = stanza.getAttribute('type')
- return event
-
-class BaseXmlStream(xmlstream.XmlStream):
- initiating = False
- namespace = 'jabber:client'
-
- def __init__(self, event_func, authenticator):
- xmlstream.XmlStream.__init__(self, authenticator)
- self.event_func = event_func
- self.addObserver('//iq', lambda x: event_func(
- make_iq_event(x)))
- self.addObserver('//message', lambda x: event_func(
- make_message_event(x)))
- self.addObserver('//presence', lambda x: event_func(
- make_presence_event(x)))
- self.addObserver('//event/stream/authd', self._cb_authd)
-
- def _cb_authd(self, _):
- # called when stream is authenticated
- self.addObserver(
- "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
- self._cb_disco_iq)
- self.event_func(servicetest.Event('stream-authenticated'))
-
- def _cb_disco_iq(self, iq):
- if iq.getAttribute('to') == 'localhost':
- # add PEP support
- nodes = xpath.queryForNodes(
- "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
- iq)
- query = nodes[0]
- identity = query.addElement('identity')
- identity['category'] = 'pubsub'
- identity['type'] = 'pep'
-
- iq['type'] = 'result'
- iq['from'] = iq['to']
- self.send(iq)
-
- def onDocumentEnd(self):
- self.event_func(servicetest.Event('stream-closed'))
- # We don't chain up XmlStream.onDocumentEnd() because it will
- # disconnect the TCP connection making tests as
- # connect/disconnect-timeout.py not working
-
-class JabberXmlStream(BaseXmlStream):
- version = (0, 9)
-
-class XmppXmlStream(BaseXmlStream):
- version = (1, 0)
-
-class GoogleXmlStream(BaseXmlStream):
- version = (1, 0)
-
- def _cb_disco_iq(self, iq):
- if iq.getAttribute('to') == 'localhost':
- nodes = xpath.queryForNodes(
- "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
- iq)
- query = nodes[0]
- feature = query.addElement('feature')
- feature['var'] = ns.GOOGLE_ROSTER
- feature = query.addElement('feature')
- feature['var'] = ns.GOOGLE_JINGLE_INFO
-
- iq['type'] = 'result'
- iq['from'] = 'localhost'
- self.send(iq)
-
-def make_connection(bus, event_func, params=None):
- # Gabble accepts a resource in 'account', but the value of 'resource'
- # overrides it if there is one.
- account = 'test@localhost/%s' % re.sub(r'.*tests/twisted/', '', sys.argv[0])
- default_params = {
- 'account': account,
- 'password': 'pass',
- 'resource': 'Resource',
- 'server': 'localhost',
- 'port': dbus.UInt32(4242),
- 'fallback-socks5-proxies': dbus.Array([], signature='s'),
- }
-
- if params:
- default_params.update(params)
-
- return servicetest.make_connection(bus, event_func, 'gabble', 'jabber',
- default_params)
-
-def make_stream(event_func, authenticator=None, protocol=None, port=4242, resource=None):
- # set up Jabber server
-
- if authenticator is None:
- authenticator = XmppAuthenticator('test', 'pass', resource=resource)
-
- if protocol is None:
- protocol = XmppXmlStream
-
- stream = protocol(event_func, authenticator)
- factory = twisted.internet.protocol.Factory()
- factory.protocol = lambda *args: stream
- port = reactor.listenTCP(port, factory)
- return (stream, port)
-
-def disconnect_conn(q, conn, stream, expected_before=[], expected_after=[]):
- call_async(q, conn, 'Disconnect')
-
- tmp = expected_before + [
- EventPattern('dbus-signal', signal='StatusChanged', args=[cs.CONN_STATUS_DISCONNECTED, cs.CSR_REQUESTED]),
- EventPattern('stream-closed')]
-
- before_events = q.expect_many(*tmp)
-
- stream.sendFooter()
-
- tmp = expected_after + [EventPattern('dbus-return', method='Disconnect')]
- after_events = q.expect_many(*tmp)
-
- return before_events[:-2], after_events[:-1]
+#def make_result_iq(stream, iq):
+# result = IQ(stream, "result")
+# result["id"] = iq["id"]
+# to = iq.getAttribute('to')
+# if to is not None:
+# result["from"] = to
+# query = iq.firstChildElement()
+#
+# if query:
+# result.addElement((query.uri, query.name))
+#
+# return result
+#
+#def acknowledge_iq(stream, iq):
+# stream.send(make_result_iq(stream, iq))
+#
+#def send_error_reply(stream, iq, error_stanza=None):
+# result = IQ(stream, "error")
+# result["id"] = iq["id"]
+# query = iq.firstChildElement()
+# to = iq.getAttribute('to')
+# if to is not None:
+# result["from"] = to
+#
+# if query:
+# result.addElement((query.uri, query.name))
+#
+# if error_stanza:
+# result.addChild(error_stanza)
+#
+# stream.send(result)
+#
+#def request_muc_handle(q, conn, stream, muc_jid):
+# servicetest.call_async(q, conn, 'RequestHandles', 2, [muc_jid])
+# event = q.expect('dbus-return', method='RequestHandles')
+# return event.value[0][0]
+#
+#def make_muc_presence(affiliation, role, muc_jid, alias, jid=None):
+# presence = domish.Element((None, 'presence'))
+# presence['from'] = '%s/%s' % (muc_jid, alias)
+# x = presence.addElement((ns.MUC_USER, 'x'))
+# item = x.addElement('item')
+# item['affiliation'] = affiliation
+# item['role'] = role
+# if jid is not None:
+# item['jid'] = jid
+# return presence
+#
+#def sync_stream(q, stream):
+# """Used to ensure that Gabble has processed all stanzas sent to it."""
+#
+# iq = IQ(stream, "get")
+# id = iq['id']
+# iq.addElement(('http://jabber.org/protocol/disco#info', 'query'))
+# stream.send(iq)
+# q.expect('stream-iq', query_ns='http://jabber.org/protocol/disco#info',
+# predicate=(lambda event:
+# event.stanza['id'] == id and event.iq_type == 'result'))
+#
+#class JabberAuthenticator(xmlstream.Authenticator):
+# "Trivial XML stream authenticator that accepts one username/digest pair."
+#
+# def __init__(self, username, password, resource=None):
+# self.username = username
+# self.password = password
+# self.resource = resource
+# xmlstream.Authenticator.__init__(self)
+#
+# # Patch in fix from http://twistedmatrix.com/trac/changeset/23418.
+# # This monkeypatch taken from Gadget source code
+# from twisted.words.xish.utility import EventDispatcher
+#
+# def _addObserver(self, onetime, event, observerfn, priority, *args,
+# **kwargs):
+# if self._dispatchDepth > 0:
+# self._updateQueue.append(lambda: self._addObserver(onetime, event,
+# observerfn, priority, *args, **kwargs))
+#
+# return self._oldAddObserver(onetime, event, observerfn, priority,
+# *args, **kwargs)
+#
+# EventDispatcher._oldAddObserver = EventDispatcher._addObserver
+# EventDispatcher._addObserver = _addObserver
+#
+# def streamStarted(self, root=None):
+# if root:
+# self.xmlstream.sid = '%x' % random.randint(1, sys.maxint)
+#
+# self.xmlstream.sendHeader()
+# self.xmlstream.addOnetimeObserver(
+# "/iq/query[@xmlns='jabber:iq:auth']", self.initialIq)
+#
+# def initialIq(self, iq):
+# result = IQ(self.xmlstream, "result")
+# result["id"] = iq["id"]
+# query = result.addElement('query')
+# query["xmlns"] = "jabber:iq:auth"
+# query.addElement('username', content='test')
+# query.addElement('password')
+# query.addElement('digest')
+# query.addElement('resource')
+# self.xmlstream.addOnetimeObserver('/iq/query/username', self.secondIq)
+# self.xmlstream.send(result)
+#
+# def secondIq(self, iq):
+# username = xpath.queryForNodes('/iq/query/username', iq)
+# assert map(str, username) == [self.username]
+#
+# digest = xpath.queryForNodes('/iq/query/digest', iq)
+# expect = hashlib.sha1(self.xmlstream.sid + self.password).hexdigest()
+# assert map(str, digest) == [expect]
+#
+# resource = xpath.queryForNodes('/iq/query/resource', iq)
+# assertLength(1, resource)
+# if self.resource is not None:
+# assertEquals(self.resource, str(resource[0]))
+#
+# result = IQ(self.xmlstream, "result")
+# result["id"] = iq["id"]
+# self.xmlstream.send(result)
+# self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
+
+
+#class XmppAuthenticator(xmlstream.Authenticator):
+# def __init__(self, username, password, resource=None):
+# xmlstream.Authenticator.__init__(self)
+# self.username = username
+# self.password = password
+# self.resource = resource
+# self.authenticated = False
+#
+# def streamStarted(self, root=None):
+# if root:
+# self.xmlstream.sid = root.getAttribute('id')
+#
+# self.xmlstream.sendHeader()
+#
+# if self.authenticated:
+# # Initiator authenticated itself, and has started a new stream.
+#
+# features = domish.Element((xmlstream.NS_STREAMS, 'features'))
+# bind = features.addElement((NS_XMPP_BIND, 'bind'))
+# self.xmlstream.send(features)
+#
+# self.xmlstream.addOnetimeObserver(
+# "/iq/bind[@xmlns='%s']" % NS_XMPP_BIND, self.bindIq)
+# else:
+# features = domish.Element((xmlstream.NS_STREAMS, 'features'))
+# mechanisms = features.addElement((NS_XMPP_SASL, 'mechanisms'))
+# mechanism = mechanisms.addElement('mechanism', content='PLAIN')
+# self.xmlstream.send(features)
+#
+# self.xmlstream.addOnetimeObserver("/auth", self.auth)
+#
+# def auth(self, auth):
+# assert (base64.b64decode(str(auth)) ==
+# '\x00%s\x00%s' % (self.username, self.password))
+#
+# success = domish.Element((NS_XMPP_SASL, 'success'))
+# self.xmlstream.send(success)
+# self.xmlstream.reset()
+# self.authenticated = True
+#
+# def bindIq(self, iq):
+# resource = xpath.queryForString('/iq/bind/resource', iq)
+# if self.resource is not None:
+# assertEquals(self.resource, resource)
+# else:
+# assert resource is not None
+#
+# result = IQ(self.xmlstream, "result")
+# result["id"] = iq["id"]
+# bind = result.addElement((NS_XMPP_BIND, 'bind'))
+# jid = bind.addElement('jid', content=('test@localhost/%s' % resource))
+# self.xmlstream.send(result)
+#
+# self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
+
+#def make_stream_event(type, stanza):
+# event = servicetest.Event(type, stanza=stanza)
+# event.to = stanza.getAttribute("to")
+# return event
+#
+#def make_iq_event(iq):
+# event = make_stream_event('stream-iq', iq)
+# event.iq_type = iq.getAttribute("type")
+# event.iq_id = iq.getAttribute("id")
+# query = iq.firstChildElement()
+#
+# if query:
+# event.query = query
+# event.query_ns = query.uri
+# event.query_name = query.name
+#
+# if query.getAttribute("node"):
+# event.query_node = query.getAttribute("node")
+# else:
+# event.query = None
+#
+# return event
+#
+#def make_presence_event(stanza):
+# event = make_stream_event('stream-presence', stanza)
+# event.presence_type = stanza.getAttribute('type')
+# return event
+#
+#def make_message_event(stanza):
+# event = make_stream_event('stream-message', stanza)
+# event.message_type = stanza.getAttribute('type')
+# return event
+
+#class BaseXmlStream(xmlstream.XmlStream):
+# initiating = False
+# namespace = 'jabber:client'
+#
+# def __init__(self, event_func, authenticator):
+# xmlstream.XmlStream.__init__(self, authenticator)
+# self.event_func = event_func
+# self.addObserver('//iq', lambda x: event_func(
+# make_iq_event(x)))
+# self.addObserver('//message', lambda x: event_func(
+# make_message_event(x)))
+# self.addObserver('//presence', lambda x: event_func(
+# make_presence_event(x)))
+# self.addObserver('//event/stream/authd', self._cb_authd)
+#
+# def _cb_authd(self, _):
+# # called when stream is authenticated
+# self.addObserver(
+# "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
+# self._cb_disco_iq)
+# self.event_func(servicetest.Event('stream-authenticated'))
+#
+# def _cb_disco_iq(self, iq):
+# if iq.getAttribute('to') == 'localhost':
+# # add PEP support
+# nodes = xpath.queryForNodes(
+# "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
+# iq)
+# query = nodes[0]
+# identity = query.addElement('identity')
+# identity['category'] = 'pubsub'
+# identity['type'] = 'pep'
+#
+# iq['type'] = 'result'
+# iq['from'] = iq['to']
+# self.send(iq)
+#
+# def onDocumentEnd(self):
+# self.event_func(servicetest.Event('stream-closed'))
+# # We don't chain up XmlStream.onDocumentEnd() because it will
+# # disconnect the TCP connection making tests as
+# # connect/disconnect-timeout.py not working
+#
+#class JabberXmlStream(BaseXmlStream):
+# version = (0, 9)
+#
+#class XmppXmlStream(BaseXmlStream):
+# version = (1, 0)
+#
+#class GoogleXmlStream(BaseXmlStream):
+# version = (1, 0)
+#
+# def _cb_disco_iq(self, iq):
+# if iq.getAttribute('to') == 'localhost':
+# nodes = xpath.queryForNodes(
+# "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
+# iq)
+# query = nodes[0]
+# feature = query.addElement('feature')
+# feature['var'] = ns.GOOGLE_ROSTER
+# feature = query.addElement('feature')
+# feature['var'] = ns.GOOGLE_JINGLE_INFO
+#
+# iq['type'] = 'result'
+# iq['from'] = 'localhost'
+# self.send(iq)
+
+#def make_connection(bus, event_func, params=None):
+# # Gabble accepts a resource in 'account', but the value of 'resource'
+# # overrides it if there is one.
+# account = 'test@localhost/%s' % re.sub(r'.*tests/twisted/', '', sys.argv[0])
+# default_params = {
+# 'account': account,
+# 'password': 'pass',
+# 'resource': 'Resource',
+# 'server': 'localhost',
+# 'port': dbus.UInt32(4242),
+# 'fallback-socks5-proxies': dbus.Array([], signature='s'),
+# }
+#
+# if params:
+# default_params.update(params)
+#
+# return servicetest.make_connection(bus, event_func, 'gabble', 'jabber',
+# default_params)
+#
+#def make_stream(event_func, authenticator=None, protocol=None, port=4242, resource=None):
+# # set up Jabber server
+#
+# if authenticator is None:
+# authenticator = XmppAuthenticator('test', 'pass', resource=resource)
+#
+# if protocol is None:
+# protocol = XmppXmlStream
+#
+# stream = protocol(event_func, authenticator)
+# factory = twisted.internet.protocol.Factory()
+# factory.protocol = lambda *args: stream
+# port = reactor.listenTCP(port, factory)
+# return (stream, port)
+#
+#def disconnect_conn(q, conn, stream, expected_before=[], expected_after=[]):
+# call_async(q, conn, 'Disconnect')
+#
+# tmp = expected_before + [
+# EventPattern('dbus-signal', signal='StatusChanged', args=[cs.CONN_STATUS_DISCONNECTED, cs.CSR_REQUESTED]),
+# EventPattern('stream-closed')]
+#
+# before_events = q.expect_many(*tmp)
+#
+# stream.sendFooter()
+#
+# tmp = expected_after + [EventPattern('dbus-return', method='Disconnect')]
+# after_events = q.expect_many(*tmp)
+#
+# return before_events[:-2], after_events[:-1]
def exec_test_deferred(fun, params, protocol=None, timeout=None,
authenticator=None):
@@ -372,16 +371,11 @@ def exec_test_deferred(fun, params, protocol=None, timeout=None,
or '-v' in sys.argv)
bus = dbus.SessionBus()
- conn = make_connection(bus, queue.append, params)
- resource = params.get('resource') if params is not None else None
- print "resource!"
- (stream, port) = make_stream(queue.append, protocol=protocol,
- authenticator=authenticator, resource=resource)
error = None
try:
- fun(queue, bus, conn, stream)
+ fun(queue, bus)
except Exception, e:
traceback.print_exc()
error = e
@@ -389,28 +383,6 @@ def exec_test_deferred(fun, params, protocol=None, timeout=None,
if colourer:
sys.stdout = colourer.fh
- d = port.stopListening()
-
- if error is None:
- d.addBoth((lambda *args: reactor.crash()))
- else:
- # please ignore the POSIX behind the curtain
- d.addBoth((lambda *args: os._exit(1)))
-
- # Does the Connection object still exist?
- if not bus.name_has_owner(conn.object.bus_name):
- # Connection has already been disconnected and destroyed
- return
-
- try:
- if conn.GetStatus() == cs.CONN_STATUS_CONNECTED:
- # Connection is connected, properly disconnect it
- disconnect_conn(queue, conn, stream)
- else:
- # Connection is not connected, call Disconnect() to destroy it
- conn.Disconnect()
- except dbus.DBusException, e:
- pass
def exec_test(fun, params=None, protocol=None, timeout=None,
authenticator=None):