summaryrefslogtreecommitdiff
path: root/tests/twisted/tubes/accept-muc-stream-tube.py
blob: 670aa7fdf67729782f4606fde438e2b99e7a8945 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
"""Test IBB stream tube support in the context of a MUC."""

import sys

import dbus

from servicetest import call_async, EventPattern, assertEquals, assertSameSets
from gabbletest import acknowledge_iq, make_muc_presence, send_error_reply, disconnect_conn
import constants as cs
import ns
import tubetestutil as t
from bytestream import create_from_si_offer, announce_socks5_proxy, BytestreamS5BRelay, BytestreamS5BRelayBugged

from twisted.words.xish import xpath

# Reference tube parameters: one string, one byte array, one unsigned and one
# signed 32-bit integer, wrapped in a D-Bus dictionary with signature 'sv'.
# test() compares the tube channel's 'Parameters' property against this value.
sample_parameters = dbus.Dictionary(
    dict(
        s='hello',
        ay=dbus.ByteArray('hello'),
        u=dbus.UInt32(123),
        i=dbus.Int32(-123),
    ),
    signature='sv')

def test(q, bus, conn, stream, bytestream_cls,
        address_type, access_control, access_control_param):
    """Accept a stream tube offered by another MUC member and exercise it.

    Scenario, in order:

    * answer the initial vCard and disco#items IQs, then join the room
      chat@conf.localhost as 'test' alongside another member, 'bob';
    * bob advertises a stream tube (id 666, service 'echo') in his MUC
      presence; the resulting stream tube channel is checked and accepted;
    * three connections are made through the address returned by Accept:
        1. the SI offer is accepted, data is exchanged in both directions and
           the peer closes the bytestream (cs.CONNECTION_LOST);
        2. the SI offer is answered with an error (cs.CONNECTION_REFUSED);
        3. the SI offer is accepted and the local socket is then dropped
           (cs.CANCELLED);
    * the connection is disconnected.

    Arguments:
    q -- event queue; q.expect / q.expect_many wait for matching events
    bus -- D-Bus bus used to obtain a proxy for the tube channel
    conn -- connection under test (its Requests interface and bus_name are used)
    stream -- stream object on which the test sends stanzas (stream.send)
    bytestream_cls -- bytestream class handed to create_from_si_offer; the two
        S5B relay classes are skipped
    address_type -- socket address type passed to Accept
    access_control -- socket access control passed to Accept; the
        credentials variant is skipped
    access_control_param -- access control parameter passed to Accept

    Returns None. Failures surface as assertion errors or as errors raised by
    the helpers called here.
    """
    if bytestream_cls in [BytestreamS5BRelay, BytestreamS5BRelayBugged]:
        # disable SOCKS5 relay tests because proxy can't be used with muc
        # contacts atm
        return

    if access_control == cs.SOCKET_ACCESS_CONTROL_CREDENTIALS:
        # Credentials access control is not exercised; the message refers to
        # fdo #45445. Note this is a Python 2 print statement.
        print "Skip Socket_Access_Control_Credentials (fdo #45445)"
        return

    # Two IQs are expected up front: a vCard query and a disco#items query
    # addressed to 'localhost'.
    iq_event, disco_event = q.expect_many(
        EventPattern('stream-iq', to=None, query_ns='vcard-temp',
            query_name='vCard'),
        EventPattern('stream-iq', to='localhost', query_ns=ns.DISCO_ITEMS))

    acknowledge_iq(stream, iq_event.stanza)

    # NOTE(review): presumably replies to the disco#items query advertising a
    # SOCKS5 proxy -- confirm in bytestream.announce_socks5_proxy.
    announce_socks5_proxy(q, stream, disco_event.stanza)

    # join the muc
    call_async(q, conn.Requests, 'CreateChannel', {
            cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
            cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
            cs.TARGET_ID: 'chat@conf.localhost'})

    # Joining must put exactly one handle (ourselves, as
    # chat@conf.localhost/test) in remote pending and send presence to the room.
    q.expect_many(
        EventPattern('dbus-signal', signal='MembersChanged',
            predicate=lambda e:
                e.args[0] == [] and         # added
                e.args[1] == [] and         # removed
                e.args[2] == [] and         # local pending
                len(e.args[3]) == 1 and     # remote pending
                e.args[4].get('actor', 0) == 0 and
                e.args[4].get('change-reason', 0) == 0 and
                e.args[4]['contact-ids'][e.args[3][0]] == 'chat@conf.localhost/test'),
        EventPattern('stream-presence', to='chat@conf.localhost/test'))

    # Send presence for other member of room.
    stream.send(make_muc_presence('owner', 'moderator', 'chat@conf.localhost', 'bob'))

    # Send presence for own membership of room.
    stream.send(make_muc_presence('none', 'participant', 'chat@conf.localhost', 'test'))

    # Both members (test and bob) must now be reported as added in a single
    # MembersChanged signal.
    event = q.expect('dbus-signal', signal='MembersChanged',
            predicate=lambda e:
                len(e.args[0]) == 2 and     # added
                e.args[1] == [] and         # removed
                e.args[2] == [] and         # local pending
                e.args[3] == [] and         # remote pending
                e.args[4].get('actor', 0) == 0 and
                e.args[4].get('change-reason', 0) == 0 and
                set([e.args[4]['contact-ids'][h] for h in e.args[0]]) ==
                set(['chat@conf.localhost/test', 'chat@conf.localhost/bob']))

    # Pick bob's handle out of the added handles; the predicate above
    # guarantees that one of them maps to chat@conf.localhost/bob.
    for h in event.args[0]:
        if event.args[4]['contact-ids'][h] == 'chat@conf.localhost/bob':
            bob_handle = h

    event = q.expect('dbus-return', method='CreateChannel')

    # Bob offers a stream tube
    # The offer is a <tubes/> element inside bob's MUC presence containing one
    # <tube/> whose parameters mirror sample_parameters.
    stream_tube_id = 666
    presence = make_muc_presence('owner', 'moderator', 'chat@conf.localhost', 'bob')
    tubes = presence.addElement((ns.TUBES, 'tubes'))
    tube = tubes.addElement((None, 'tube'))
    tube['type'] = 'stream'
    tube['service'] = 'echo'
    tube['id'] = str(stream_tube_id)
    parameters = tube.addElement((None, 'parameters'))
    parameter = parameters.addElement((None, 'parameter'))
    parameter['name'] = 's'
    parameter['type'] = 'str'
    parameter.addContent('hello')
    parameter = parameters.addElement((None, 'parameter'))
    parameter['name'] = 'ay'
    parameter['type'] = 'bytes'
    # 'aGVsbG8=' is the base64 encoding of 'hello'.
    parameter.addContent('aGVsbG8=')
    parameter = parameters.addElement((None, 'parameter'))
    parameter['name'] = 'u'
    parameter['type'] = 'uint'
    parameter.addContent('123')
    parameter = parameters.addElement((None, 'parameter'))
    parameter['name'] = 'i'
    parameter['type'] = 'int'
    parameter.addContent('-123')

    stream.send(presence)

    # text channel
    new_event = q.expect('dbus-signal', signal='NewChannel')

    path, props = new_event.args
    assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_TEXT

    def new_chan_predicate(e):
        # Match only NewChannel signals that announce a stream tube channel.
        path, props = e.args
        return props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_STREAM_TUBE

    # tube channel is announced
    new_event = q.expect('dbus-signal', signal='NewChannel',
                         predicate=new_chan_predicate)

    # The announced channel must be an unrequested tube initiated by bob,
    # targeting the room, with the offered service and parameters.
    path, props = new_event.args
    assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_STREAM_TUBE
    assert props[cs.INITIATOR_HANDLE] == bob_handle
    assert props[cs.INITIATOR_ID] == 'chat@conf.localhost/bob'
    assertSameSets([cs.CHANNEL_IFACE_GROUP, cs.CHANNEL_IFACE_TUBE],
            props[cs.INTERFACES])
    assert props[cs.REQUESTED] == False
    assert props[cs.TARGET_ID] == 'chat@conf.localhost'
    assert props[cs.STREAM_TUBE_SERVICE] == 'echo'
    assert props[cs.TUBE_PARAMETERS] == {'s': 'hello', 'ay': 'hello', 'u': 123, 'i': -123}
    # The socket type / access control combination under test must be
    # advertised as supported by the channel.
    assert access_control in \
            props[cs.STREAM_TUBE_SUPPORTED_SOCKET_TYPES][address_type]

    # Re-read the Tube interface properties over D-Bus (with byte arrays kept
    # as dbus.ByteArray) and check them against the reference parameters.
    tube_chan = bus.get_object(conn.bus_name, path)
    tube_props = tube_chan.GetAll(cs.CHANNEL_IFACE_TUBE, dbus_interface=cs.PROPERTIES_IFACE,
        byte_arrays=True)
    tube_iface = dbus.Interface(tube_chan, cs.CHANNEL_TYPE_STREAM_TUBE)
    assert tube_props['Parameters'] == sample_parameters
    assert tube_props['State'] == cs.TUBE_CHANNEL_STATE_LOCAL_PENDING

    # Accept the tube
    call_async(q, tube_iface, 'Accept',
        address_type, access_control, access_control_param, byte_arrays=True)

    # NOTE(review): the state value 2 is presumably the "open" tube channel
    # state -- confirm against the cs.TUBE_CHANNEL_STATE_* constants.
    accept_return_event, _ = q.expect_many(
        EventPattern('dbus-return', method='Accept'),
        EventPattern('dbus-signal', signal='TubeChannelStateChanged',
            args=[2]))

    # Accept returns the address of the socket to connect to.
    address = accept_return_event.value[0]

    # --- Connection 1: accepted, data exchanged, closed by the peer ---
    socket_event, si_event, conn_id = t.connect_to_cm_socket(q, 'chat@conf.localhost/bob',
        address_type, address, access_control, access_control_param)

    # Data is written to the local socket before the SI offer is answered; it
    # is expected to come out of the bytestream once that is open (see below).
    protocol = socket_event.protocol
    protocol.sendData("hello initiator")

    def accept_tube_si_connection():
        # Answer the SI offer currently held in si_event (looked up when this
        # helper is called, so it picks up later reassignments), wait for the
        # bytestream to open and return it.
        bytestream, profile = create_from_si_offer(stream, q, bytestream_cls, si_event.stanza,
                'chat@conf.localhost/test')

        assert profile == ns.TUBES

        # The offer must carry a <muc-stream/> element naming the tube id.
        # NOTE(review): the [0] index is applied to the query result before
        # the "is not None" assert, so that assert cannot catch a query that
        # found nothing.
        muc_stream_node = xpath.queryForNodes('/iq/si/muc-stream[@xmlns="%s"]' %
            ns.TUBES, si_event.stanza)[0]
        assert muc_stream_node is not None
        assert muc_stream_node['tube'] == str(stream_tube_id)

        # set the real jid of the target as 'to' because the XMPP server changes
        # it when delivering the IQ
        result, si = bytestream.create_si_reply(si_event.stanza, 'test@localhost/Resource')
        si.addElement((ns.TUBES, 'tube'))
        stream.send(result)

        bytestream.wait_bytestream_open()
        return bytestream

    bytestream = accept_tube_si_connection()

    # The data written to the local socket earlier must arrive via the
    # bytestream.
    binary = bytestream.get_data()
    assert binary == 'hello initiator'

    # reply on the socket
    bytestream.send_data('hi joiner!')

    q.expect('socket-data', protocol=protocol, data="hi joiner!")

    # peer closes the bytestream
    bytestream.close()
    e = q.expect('dbus-signal', signal='ConnectionClosed')
    assertEquals(conn_id, e.args[0])
    assertEquals(cs.CONNECTION_LOST, e.args[1])

    # --- Connection 2: SI offer refused ---
    # establish another tube connection
    socket_event, si_event, conn_id = t.connect_to_cm_socket(q, 'chat@conf.localhost/bob',
        address_type, address, access_control, access_control_param)

    # bytestream is refused
    send_error_reply(stream, si_event.stanza)
    e, _ = q.expect_many(
        EventPattern('dbus-signal', signal='ConnectionClosed'),
        EventPattern('socket-disconnected'))
    assertEquals(conn_id, e.args[0])
    assertEquals(cs.CONNECTION_REFUSED, e.args[1])

    # --- Connection 3: accepted, then the local socket is dropped ---
    # establish another tube connection
    socket_event, si_event, conn_id = t.connect_to_cm_socket(q, 'chat@conf.localhost/bob',
        address_type, address, access_control, access_control_param)

    protocol = socket_event.protocol
    bytestream = accept_tube_si_connection()

    # disconnect local socket
    protocol.transport.loseConnection()
    e, _ = q.expect_many(
        EventPattern('dbus-signal', signal='ConnectionClosed'),
        EventPattern('socket-disconnected'))
    assertEquals(conn_id, e.args[0])
    assertEquals(cs.CANCELLED, e.args[1])

    # OK, we're done
    disconnect_conn(q, conn, stream)

# Script entry point: hand the scenario to tubetestutil's stream tube runner.
# NOTE(review): presumably the runner calls test() once per combination of
# bytestream class, socket address type and access control -- confirm in
# tubetestutil.exec_stream_tube_test.
if __name__ == '__main__':
    t.exec_stream_tube_test(test)