""" Infrastructure code for testing Gabble by pretending to be a Jabber server. """ import base64 import os import sha import sys import time import servicetest import twisted from twisted.words.xish import domish, xpath from twisted.words.protocols.jabber.client import IQ from twisted.words.protocols.jabber import xmlstream from twisted.internet import reactor import dbus NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl' NS_XMPP_BIND = 'urn:ietf:params:xml:ns:xmpp-bind' def make_result_iq(stream, iq): result = IQ(stream, "result") result["id"] = iq["id"] query = iq.firstChildElement() if query: result.addElement((query.uri, query.name)) return result def acknowledge_iq(stream, iq): stream.send(make_result_iq(stream, iq)) def sync_stream(q, stream): """Used to ensure that Gabble has processed all stanzas sent to it.""" iq = IQ(stream, "get") iq.addElement(('http://jabber.org/protocol/disco#info', 'query')) stream.send(iq) q.expect('stream-iq', query_ns='http://jabber.org/protocol/disco#info') class JabberAuthenticator(xmlstream.Authenticator): "Trivial XML stream authenticator that accepts one username/digest pair." def __init__(self, username, password): self.username = username self.password = password xmlstream.Authenticator.__init__(self) def streamStarted(self, root=None): if root: self.xmlstream.sid = root.getAttribute('id') self.xmlstream.sendHeader() self.xmlstream.addOnetimeObserver( "/iq/query[@xmlns='jabber:iq:auth']", self.initialIq) def initialIq(self, iq): result = IQ(self.xmlstream, "result") result["id"] = iq["id"] query = result.addElement('query') query["xmlns"] = "jabber:iq:auth" query.addElement('username', content='test') query.addElement('password') query.addElement('digest') query.addElement('resource') self.xmlstream.addOnetimeObserver('/iq/query/username', self.secondIq) self.xmlstream.send(result) def secondIq(self, iq): username = xpath.queryForNodes('/iq/query/username', iq) assert map(str, username) == [self.username] digest = xpath.queryForNodes('/iq/query/digest', iq) expect = sha.sha(self.xmlstream.sid + self.password).hexdigest() assert map(str, digest) == [expect] resource = xpath.queryForNodes('/iq/query/resource', iq) assert map(str, resource) == ['Resource'] result = IQ(self.xmlstream, "result") result["id"] = iq["id"] self.xmlstream.send(result) self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT) class XmppAuthenticator(xmlstream.Authenticator): def __init__(self, username, password): xmlstream.Authenticator.__init__(self) self.username = username self.password = password self.authenticated = False def streamStarted(self, root=None): if root: self.xmlstream.sid = root.getAttribute('id') self.xmlstream.sendHeader() if self.authenticated: # Initiator authenticated itself, and has started a new stream. features = domish.Element((xmlstream.NS_STREAMS, 'features')) bind = features.addElement((NS_XMPP_BIND, 'bind')) self.xmlstream.send(features) self.xmlstream.addOnetimeObserver( "/iq/bind[@xmlns='%s']" % NS_XMPP_BIND, self.bindIq) else: features = domish.Element((xmlstream.NS_STREAMS, 'features')) mechanisms = features.addElement((NS_XMPP_SASL, 'mechanisms')) mechanism = mechanisms.addElement('mechanism', content='PLAIN') self.xmlstream.send(features) self.xmlstream.addOnetimeObserver("/auth", self.auth) def auth(self, auth): assert (base64.b64decode(str(auth)) == '\x00%s\x00%s' % (self.username, self.password)) success = domish.Element((NS_XMPP_SASL, 'success')) self.xmlstream.send(success) self.xmlstream.reset() self.authenticated = True def bindIq(self, iq): assert xpath.queryForString('/iq/bind/resource', iq) == 'Resource' result = IQ(self.xmlstream, "result") result["id"] = iq["id"] bind = result.addElement((NS_XMPP_BIND, 'bind')) jid = bind.addElement('jid', content='test@localhost/Resource') self.xmlstream.send(result) self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT) def make_stream_event(type, stanza): event = servicetest.Event(type, stanza=stanza) event.to = stanza.getAttribute("to") return event def make_iq_event(iq): event = make_stream_event('stream-iq', iq) event.iq_type = iq.getAttribute("type") query = iq.firstChildElement() if query: event.query = query event.query_ns = query.uri event.query_name = query.name if query.getAttribute("node"): event.query_node = query.getAttribute("node") return event def make_presence_event(stanza): event = make_stream_event('stream-presence', stanza) event.presence_type = stanza.getAttribute('type') return event def make_message_event(stanza): event = make_stream_event('stream-message', stanza) event.message_type = stanza.getAttribute('type') return event class BaseXmlStream(xmlstream.XmlStream): initiating = False namespace = 'jabber:client' def __init__(self, event_func, authenticator): xmlstream.XmlStream.__init__(self, authenticator) self.event_func = event_func self.addObserver('//iq', lambda x: event_func( make_iq_event(x))) self.addObserver('//message', lambda x: event_func( make_message_event(x))) self.addObserver('//presence', lambda x: event_func( make_presence_event(x))) self.addObserver('//event/stream/authd', self._cb_authd) def _cb_authd(self, _): # called when stream is authenticated self.addObserver( "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']", self._cb_disco_iq) self.event_func(servicetest.Event('stream-authenticated')) def _cb_disco_iq(self, iq): if iq.getAttribute('to') == 'localhost': # add PEP support nodes = xpath.queryForNodes( "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']", iq) query = nodes[0] identity = query.addElement('identity') identity['category'] = 'pubsub' identity['type'] = 'pep' iq['type'] = 'result' self.send(iq) class JabberXmlStream(BaseXmlStream): version = (0, 9) class XmppXmlStream(BaseXmlStream): version = (1, 0) def make_connection(bus, event_func, params=None): default_params = { 'account': 'test@localhost/Resource', 'password': 'pass', 'resource': 'Resource', 'server': 'localhost', 'port': dbus.UInt32(4242), } if params: default_params.update(params) return servicetest.make_connection(bus, event_func, 'gabble', 'jabber', default_params) def make_stream(event_func, authenticator=None, protocol=None, port=4242): # set up Jabber server if authenticator is None: authenticator = JabberAuthenticator('test', 'pass') if protocol is None: protocol = JabberXmlStream stream = protocol(event_func, authenticator) factory = twisted.internet.protocol.Factory() factory.protocol = lambda *args: stream port = reactor.listenTCP(port, factory) return (stream, port) def go(params=None, authenticator=None, protocol=None, start=None): # hack to ease debugging domish.Element.__repr__ = domish.Element.toXml bus = dbus.SessionBus() handler = servicetest.EventTest() conn = make_connection(bus, handler.handle_event, params) (stream, _) = make_stream(handler.handle_event, authenticator, protocol) handler.data = { 'bus': bus, 'conn': conn, 'conn_iface': dbus.Interface(conn, 'org.freedesktop.Telepathy.Connection'), 'stream': stream} handler.data['test'] = handler handler.verbose = (os.environ.get('CHECK_TWISTED_VERBOSE', '') != '') map(handler.expect, servicetest.load_event_handlers()) if '-v' in sys.argv: handler.verbose = True if start is None: handler.data['conn'].Connect() else: start(handler.data) reactor.run() def install_colourer(): def red(s): return '\x1b[31m%s\x1b[0m' % s def green(s): return '\x1b[32m%s\x1b[0m' % s patterns = { 'handled': green, 'not handled': red, } class Colourer: def __init__(self, fh, patterns): self.fh = fh self.patterns = patterns def write(self, s): f = self.patterns.get(s, lambda x: x) self.fh.write(f(s)) sys.stdout = Colourer(sys.stdout, patterns) return sys.stdout def exec_test_deferred (fun, params, protocol=None, timeout=None): # hack to ease debugging domish.Element.__repr__ = domish.Element.toXml colourer = None if sys.stdout.isatty(): colourer = install_colourer() queue = servicetest.IteratingEventQueue(timeout) queue.verbose = ( os.environ.get('CHECK_TWISTED_VERBOSE', '') != '' or '-v' in sys.argv) bus = dbus.SessionBus() conn = make_connection(bus, queue.append, params) (stream, port) = make_stream(queue.append, protocol=protocol) error = None try: fun(queue, bus, conn, stream) except Exception, e: import traceback traceback.print_exc() error = e try: if colourer: sys.stdout = colourer.fh d = port.stopListening() if error is None: d.addBoth((lambda *args: reactor.crash())) else: # please ignore the POSIX behind the curtain d.addBoth((lambda *args: os._exit(1))) conn.Disconnect() if 'GABBLE_TEST_REFDBG' in os.environ: # we have to wait that Gabble timeouts so the process is properly # exited and refdbg can generates its report time.sleep(5.5) except dbus.DBusException, e: pass def exec_test(fun, params=None, protocol=None, timeout=None): reactor.callWhenRunning (exec_test_deferred, fun, params, protocol, timeout) reactor.run() # Useful routines for server-side vCard handling current_vcard = domish.Element(('vcard-temp', 'vCard')) def handle_get_vcard(event, data): iq = event.stanza if iq['type'] != 'get': return False if iq.uri != 'jabber:client': return False vcard = list(iq.elements())[0] if vcard.name != 'vCard': return False # Send back current vCard new_iq = IQ(data['stream'], 'result') new_iq['id'] = iq['id'] new_iq.addChild(current_vcard) data['stream'].send(new_iq) return True def handle_set_vcard(event, data): global current_vcard iq = event.stanza if iq['type'] != 'set': return False if iq.uri != 'jabber:client': return False vcard = list(iq.elements())[0] if vcard.name != 'vCard': return False current_vcard = iq.firstChildElement() new_iq = IQ(data['stream'], 'result') new_iq['id'] = iq['id'] data['stream'].send(new_iq) return True def _elem_add(elem, *children): for child in children: if isinstance(child, domish.Element): elem.addChild(child) elif isinstance(child, unicode): elem.addContent(child) else: raise ValueError('invalid child object %r', child) def elem(a, b=None, **kw): r""" >>> elem('foo')().toXml() u'' >>> elem('foo', x='1')().toXml() u"" >>> elem('foo', x='1')(u'hello').toXml() u"hello" >>> elem('foo', x='1')(u'hello', ... elem('http://foo.org', 'bar', y='2')(u'bye')).toXml() u"hellobye" """ class _elem(domish.Element): def __call__(self, *children): _elem_add(self, *children) return self if b is not None: elem = _elem((a, b)) else: elem = _elem((None, a)) for k, v in kw.iteritems(): if k == 'from_': elem['from'] = v else: elem[k] = v return elem def elem_iq(server, type, **kw): class _iq(IQ): def __call__(self, *children): _elem_add(self, *children) return self iq = _iq(server, type) for k, v in kw.iteritems(): if k == 'from_': iq['from'] = v else: iq[k] = v return iq