summaryrefslogtreecommitdiff
path: root/tests/twisted/caps/trust-thyself.py
blob: fda08ec3e566071aee7c7d5a6ca0671bcee9ea7d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""
Test that we cache our own capabilities, so that we don't disco other people
with the same caps hash or ext='' bundles.
"""
from twisted.words.xish import xpath
from twisted.words.protocols.jabber.client import IQ

from gabbletest import exec_test, make_presence, sync_stream
from servicetest import EventPattern, assertEquals, assertNotEquals
import ns
import constants as cs

from config import VOIP_ENABLED

if not VOIP_ENABLED:
    print "NOTE: built with --disable-voip"
    raise SystemExit(77)

def test(q, bus, conn, stream):
    self_presence = q.expect('stream-presence')

    c = xpath.queryForNodes('/presence/c', self_presence.stanza)[0]

    jid = 'lol@great.big/omg'

    # Gabble shouldn't send any disco requests to our contact during this test.
    q.forbid_events([
        EventPattern('stream-iq', to=jid, iq_type='get',
            query_ns=ns.DISCO_INFO),
    ])

    # Check that Gabble doesn't disco other clients with the same caps hash.
    p = make_presence(jid,
        caps={'node': c['node'],
              'hash': c['hash'],
              'ver':  c['ver'],
             })
    stream.send(p)
    sync_stream(q, stream)

    # Check that Gabble doesn't disco its own ext='' bundles (well, its own
    # bundles as advertised by Gabbles that don't do hashed caps)
    p = make_presence(jid,
        caps={'node': c['node'],
              'ver':  c['ver'],
              # omitting hash='' so Gabble doesn't ignore ext=''
              'ext':  'voice-v1 video-v1',
            })
    stream.send(p)
    sync_stream(q, stream)

    conn.ContactCapabilities.UpdateCapabilities([
        (cs.CLIENT + '.AbiWord', [
        { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
            cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
            cs.STREAM_TUBE_SERVICE: 'x-abiword' },
        { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
            cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
            cs.STREAM_TUBE_SERVICE: 'x-abiword' },
        ], []),
        (cs.CLIENT + '.KCall', [
        { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_CALL },
        { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_CALL, cs.CALL_INITIAL_AUDIO: True},
        { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_CALL, cs.CALL_INITIAL_VIDEO: True},
        ], [
            cs.CHANNEL_TYPE_CALL + '/gtalk-p2p',
            cs.CHANNEL_TYPE_CALL + '/ice-udp',
            cs.CHANNEL_TYPE_CALL + '/video/h264',
            ]),
        ])

    self_presence = q.expect('stream-presence')
    c_ = xpath.queryForNodes('/presence/c', self_presence.stanza)[0]
    assertNotEquals(c['ver'], c_['ver'])

    for suffix in [c['ver'], 'voice-v1', 'video-v1', 'camera-v1', 'share-v1',
            'pmuc-v1'] + list(c_['ext'].split()):
        # But then someone asks us for our old caps
        iq = IQ(stream, 'get')
        iq['from'] = jid
        query = iq.addElement((ns.DISCO_INFO, 'query'))
        query['node'] = c['node'] + '#' + suffix
        stream.send(iq)

        # Gabble should still know what they are, and reply. This is
        # actually quite important: there's a bug in iChat where if you
        # return an error to a disco query, it just asks again, and again,
        # and again...
        reply = q.expect('stream-iq', to=jid)
        assertEquals('result', reply.iq_type)

if __name__ == '__main__':
    exec_test(test)