summaryrefslogtreecommitdiff
path: root/salut/tests/twisted/xmppstream.py
diff options
context:
space:
mode:
Diffstat (limited to 'salut/tests/twisted/xmppstream.py')
-rw-r--r--salut/tests/twisted/xmppstream.py257
1 files changed, 257 insertions, 0 deletions
diff --git a/salut/tests/twisted/xmppstream.py b/salut/tests/twisted/xmppstream.py
new file mode 100644
index 000000000..63c55bcb5
--- /dev/null
+++ b/salut/tests/twisted/xmppstream.py
@@ -0,0 +1,257 @@
+
+"""
+Infrastructure code for testing. Implements incoming and outgoing xml/xmpp
+streams
+"""
+
+import servicetest
+from servicetest import Event, EventPattern
+import twisted
+from twisted.words.xish import domish, xpath, xmlstream
+from twisted.internet.protocol import Factory, ClientFactory
+from twisted.internet import reactor
+
+from ipv6 import listenTCP6, connectTCP6
+
+NS_STREAMS = 'http://etherx.jabber.org/streams'
+
+def make_stream_event(type, stanza):
+ event = servicetest.Event(type, stanza=stanza)
+ if stanza.hasAttribute("to"):
+ event.to = stanza.getAttribute("to")
+ else:
+ event.to = None
+
+ if stanza.hasAttribute("from"):
+ event.from_ = stanza.getAttribute("from")
+ else:
+ event.from_ = None
+
+ event.name = event.to
+ event.remote_name = event.from_
+
+ return event
+
+def make_iq_event(iq):
+ event = make_stream_event('stream-iq', iq)
+ event.iq_type = iq.getAttribute("type")
+ event.iq_id = iq.getAttribute("id")
+ query = iq.firstChildElement()
+
+ if query:
+ event.query = query
+ event.query_ns = query.uri
+ event.query_name = query.name
+
+ if query.getAttribute("node"):
+ event.query_node = query.getAttribute("node")
+ else:
+ event.query = None
+
+ return event
+
+def make_presence_event(stanza):
+ event = make_stream_event('stream-presence', stanza)
+ event.presence_type = stanza.getAttribute('type')
+
+ statuses = xpath.queryForNodes('/presence/status', stanza)
+
+ if statuses:
+ event.presence_status = str(statuses[0])
+
+ return event
+
+def make_message_event(stanza):
+ event = make_stream_event('stream-message', stanza)
+ event.message_type = stanza.getAttribute('type')
+ return event
+
+class BaseXmlStream(xmlstream.XmlStream):
+ prefixes = { NS_STREAMS: 'stream' }
+ version = "1.0"
+ namespace = 'jabber:client'
+
+ def __init__(self, event_function, name = None, remote_name = None):
+ xmlstream.XmlStream.__init__(self)
+
+ self.name = name
+ self.remote_name = remote_name
+ self.event_func = event_function
+
+ self.event_function = event_function
+ self.addObserver(xmlstream.STREAM_START_EVENT,
+ lambda *args: self.event(Event('stream-opened')))
+ self.addObserver('//features', lambda x: self.event(
+ make_stream_event('stream-features', x)))
+ self.addObserver('//iq', lambda x: self.event(
+ make_iq_event(x)))
+ self.addObserver('//message', lambda x: self.event(
+ make_message_event(x)))
+ self.addObserver('//presence', lambda x: self.event(
+ make_presence_event(x)))
+
+ def send_header(self):
+ root = domish.Element((NS_STREAMS, 'stream'), 'jabber:client')
+ if self.name is not None:
+ root['from'] = self.name
+ if self.remote_name is not None:
+ root['to'] = self.remote_name
+ root['version'] = self.version
+ self.send(root.toXml(closeElement = 0, prefixes=self.prefixes))
+
+ def event(self, e):
+ e.connection = self
+ self.event_function(e)
+
+ def send(self, obj):
+ if domish.IElement.providedBy(obj):
+ if self.name != None:
+ obj["from"] = self.name
+ if self.remote_name != None:
+ obj["to"] = self.remote_name
+ obj = obj.toXml(prefixes=self.prefixes)
+
+ xmlstream.XmlStream.send(self, obj)
+
+
+class IncomingXmppStream(BaseXmlStream):
+ def __init__(self, event_func, name):
+ BaseXmlStream.__init__(self, event_func, name, None)
+
+ def onDocumentStart(self, rootElement):
+ # Use the fact that it's always salut that connects, so it sends a
+ # proper opening
+ assert rootElement.name == "stream"
+ assert rootElement.uri == NS_STREAMS
+
+ assert rootElement.hasAttribute("from")
+ assert rootElement.hasAttribute("to")
+ if self.name is not None:
+ assert rootElement["to"] == self.name, self.name
+
+ assert rootElement.hasAttribute("version")
+ assert rootElement["version"] == "1.0"
+
+ self.remote_name = rootElement["from"]
+ self.send_header()
+ self.send_features()
+ BaseXmlStream.onDocumentStart(self, rootElement)
+
+ def send_features(self):
+ features = domish.Element((NS_STREAMS, 'features'))
+ self.send(features)
+
+class IncomingXmppFactory(Factory):
+ def buildProtocol(self, addr):
+ p = self.protocol()
+ p.factory = self
+ e = Event('incoming-connection', listener = self)
+ p.event(e)
+ return p
+
+def setup_stream_listener(queue, name, port = 0, protocol = None):
+ if protocol == None:
+ protocol = IncomingXmppStream
+
+ factory = IncomingXmppFactory()
+ factory.protocol = lambda *args: protocol(queue.append, name)
+ port = reactor.listenTCP(port, factory)
+
+ return (factory, port.getHost().port)
+
+def setup_stream_listener6(queue, name, port = 0, protocol = None):
+ if protocol == None:
+ protocol = IncomingXmppStream
+
+ factory = IncomingXmppFactory()
+ factory.protocol = lambda *args: protocol(queue.append, name)
+ port = listenTCP6(port, factory)
+
+ return (factory, port.getHost().port)
+
+class OutgoingXmppStream(BaseXmlStream):
+ def __init__(self, event_function, name, remote_name):
+ BaseXmlStream.__init__(self, event_function, name, remote_name)
+ self.addObserver(xmlstream.STREAM_CONNECTED_EVENT, self.connected)
+
+ def connected (self, stream):
+ e = Event('connection-result', succeeded = True)
+ self.event(e)
+
+ self.send_header()
+
+class OutgoingXmppiChatStream(OutgoingXmppStream):
+ def __init__(self, event_function, name, remote_name):
+ # set name and remote_name as None as iChat doesn't send 'to' and
+ # 'from' attributes.
+ OutgoingXmppStream.__init__(self, event_function, None, None)
+
+class IncomingXmppiChatStream(IncomingXmppStream):
+ def __init__(self, event_func, name):
+ # set name to None as iChat doesn't send 'from' attribute.
+ IncomingXmppStream.__init__(self, event_func, None)
+
+class OutgoingXmppFactory(ClientFactory):
+ def __init__(self, event_function):
+ self.event_func = event_function
+
+ def clientConnectionFailed(self, connector, reason):
+ ClientFactory.clientConnectionFailed(self, connector, reason)
+ e = Event('connection-result', succeeded = False, reason = reason)
+ self.event_func(e)
+
+def connect_to_stream(queue, name, remote_name, host, port, protocol = None):
+ if protocol == None:
+ protocol = OutgoingXmppStream
+
+ p = protocol(queue.append, name, remote_name)
+
+ factory = OutgoingXmppFactory(queue.append)
+ factory.protocol = lambda *args: p
+ reactor.connectTCP(host, port, factory)
+
+ return p
+
+def connect_to_stream6(queue, name, remote_name, host, port, protocol = None):
+ if protocol == None:
+ protocol = OutgoingXmppStream
+
+ p = protocol(queue.append, name, remote_name)
+
+ factory = OutgoingXmppFactory(queue.append)
+ factory.protocol = lambda *args: p
+ connectTCP6(reactor, host, port, factory)
+
+ return p
+
+if __name__ == '__main__':
+ def run_test():
+ q = servicetest.IteratingEventQueue()
+ # Set verboseness if needed for debugging
+ #q.verbose = True
+
+ (listener, port) = setup_stream_listener(q, "incoming")
+ outbound = connect_to_stream(q, "outgoing",
+ "incoming", "localhost", port)
+
+ inbound = q.expect('incoming-connection',
+ listener = listener).connection
+
+ # inbound stream is opened first, then outbounds stream is opened and
+ # receive features
+ q.expect('stream-opened', connection = inbound)
+ q.expect('stream-opened', connection = outbound)
+ q.expect('stream-features', connection = outbound)
+
+
+ message = domish.Element(('','message'))
+ message.addElement('body', content="test123")
+ outbound.send(message)
+
+ e = q.expect('stream-message', connection=inbound)
+
+ # twisting twisted
+ reactor.stop()
+
+ reactor.callLater(0.1, run_test)
+ reactor.run()