1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
import dbus
from servicetest import (
assertEquals, assertNotEquals, assertSameSets,
call_async, EventPattern, wrap_channel
)
from gabbletest import exec_test, acknowledge_iq, make_muc_presence
import constants as cs
from twisted.words.xish import xpath
import ns
from mucutil import join_muc_and_check
import tubetestutil as t
def test(q, bus, conn, stream, access_control):
    """Accept a D-Bus tube offered by a MUC member and verify the outcome.

    The scenario, in order:

    1. Acknowledge the vCard IQ expected on the stream.
    2. Join the MUC ``chat@conf.localhost`` via ``join_muc_and_check``.
    3. Send a MUC presence from ``bob`` advertising a D-Bus tube.
    4. Expect a ``NewChannel`` signal for a D-Bus tube channel and check
       the properties announced with it.
    5. Call ``Accept`` with *access_control*, then check the returned
       address, the two ``DBusNamesChanged`` signals and the presence
       stanza sent to the MUC.
    6. Close the channel and expect ``Closed`` and ``ChannelClosed``.

    Args:
        q: event queue; used through ``expect`` and ``expect_many``.
        bus: D-Bus connection used to look up the tube channel object.
        conn: connection under test; only ``bus_name`` is read here.
        stream: stream on which stanzas are sent.
        access_control: value passed unchanged to ``Accept``.
    """
    # A vCard IQ is expected on the stream first; acknowledge it.
    iq_event = q.expect('stream-iq', to=None, query_ns='vcard-temp',
                        query_name='vCard')
    acknowledge_iq(stream, iq_event.stanza)

    muc = 'chat@conf.localhost'
    bob_jid = muc + '/bob'

    # The values returned here are not needed: Bob's handle is read from
    # the new channel's InitiatorHandle property further down.
    join_muc_and_check(q, bus, conn, stream, muc)

    # Bob offers a D-Bus tube
    bob_bus_name = ':2.Ym9i'
    presence = make_muc_presence('owner', 'moderator', muc, 'bob')
    tubes = presence.addElement((ns.TUBES, 'tubes'))
    tube = tubes.addElement((None, 'tube'))
    tube['type'] = 'dbus'
    tube['initiator'] = bob_jid
    tube['stream-id'] = '10'
    tube['id'] = '1'
    tube['service'] = 'com.example.Test'
    tube['dbus-name'] = bob_bus_name
    parameters = tube.addElement((None, 'parameters'))
    parameter = parameters.addElement((None, 'parameter'))
    parameter['type'] = 'str'
    parameter['name'] = 'foo'
    parameter.addContent('bar')
    stream.send(presence)

    # tube channel is created
    def new_chan_predicate(e):
        # Only match NewChannel signals announcing a D-Bus tube channel.
        _path, props = e.args
        return props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE

    event = q.expect('dbus-signal', signal='NewChannel',
                     predicate=new_chan_predicate)
    path, props = event.args

    assertEquals(cs.CHANNEL_TYPE_DBUS_TUBE, props[cs.CHANNEL_TYPE])
    assertEquals(bob_jid, props[cs.INITIATOR_ID])
    bob_handle = props[cs.INITIATOR_HANDLE]
    assertSameSets([cs.CHANNEL_IFACE_GROUP, cs.CHANNEL_IFACE_TUBE],
                   props[cs.INTERFACES])
    assertEquals(False, props[cs.REQUESTED])
    assertEquals(muc, props[cs.TARGET_ID])
    assertEquals('com.example.Test', props[cs.DBUS_TUBE_SERVICE_NAME])
    assertEquals({'foo': 'bar'}, props[cs.TUBE_PARAMETERS])
    assertEquals([cs.SOCKET_ACCESS_CONTROL_CREDENTIALS,
                  cs.SOCKET_ACCESS_CONTROL_LOCALHOST],
                 props[cs.DBUS_TUBE_SUPPORTED_ACCESS_CONTROLS])

    tube_chan = wrap_channel(bus.get_object(conn.bus_name, path), 'DBusTube1')

    # only Bob is in DBusNames
    dbus_names = tube_chan.Get(cs.CHANNEL_TYPE_DBUS_TUBE, 'DBusNames',
                               dbus_interface=cs.PROPERTIES_IFACE)
    assertEquals({bob_handle: bob_bus_name}, dbus_names)

    call_async(q, tube_chan.DBusTube1, 'Accept', access_control)

    # Two DBusNamesChanged signals are expected: the first is checked
    # against Bob's name and the second against our own (see below).
    return_event, names_changed1, names_changed2, presence_event = \
        q.expect_many(
            EventPattern('dbus-return', method='Accept'),
            EventPattern('dbus-signal', signal='DBusNamesChanged',
                         interface=cs.CHANNEL_TYPE_DBUS_TUBE),
            EventPattern('dbus-signal', signal='DBusNamesChanged',
                         interface=cs.CHANNEL_TYPE_DBUS_TUBE),
            EventPattern('stream-presence', to=muc + '/test',
                         predicate=t.presence_contains_tube))

    # Use the assertion helper rather than a bare ``assert`` so the check
    # is not stripped under ``python -O`` and matches the rest of the test.
    tube_addr = return_event.value[0]
    assertNotEquals(0, len(tube_addr))

    # check presence stanza
    tube_node = xpath.queryForNodes('/presence/tubes/tube',
                                    presence_event.stanza)[0]
    assertEquals(bob_jid, tube_node['initiator'])
    assertEquals('com.example.Test', tube_node['service'])
    assertEquals('10', tube_node['stream-id'])
    assertEquals('dbus', tube_node['type'])
    assertEquals('1', tube_node['id'])

    self_bus_name = tube_node['dbus-name']

    tubes_self_handle = tube_chan.Properties.Get(cs.CHANNEL_IFACE_GROUP,
                                                 'SelfHandle')
    assertNotEquals(0, tubes_self_handle)

    # both of us are in DBusNames now
    dbus_names = tube_chan.Get(cs.CHANNEL_TYPE_DBUS_TUBE, 'DBusNames',
                               dbus_interface=cs.PROPERTIES_IFACE)
    assertEquals({bob_handle: bob_bus_name,
                  tubes_self_handle: self_bus_name}, dbus_names)

    added, removed = names_changed1.args
    assertEquals({bob_handle: bob_bus_name}, added)
    assertEquals([], removed)

    added, removed = names_changed2.args
    assertEquals({tubes_self_handle: self_bus_name}, added)
    assertEquals([], removed)

    tube_chan.Channel.Close()
    q.expect_many(
        EventPattern('dbus-signal', signal='Closed'),
        EventPattern('dbus-signal', signal='ChannelClosed'))
if __name__ == '__main__':
    # t.exec_dbus_tube_test() can't be used here as only the muc bytestream
    # can be used, so run test() once per access-control mode instead.
    def _bind(access_control):
        # Bind the mode at creation time so each run gets its own value.
        return lambda q, bus, conn, stream: test(
            q, bus, conn, stream, access_control)

    for _mode in (cs.SOCKET_ACCESS_CONTROL_CREDENTIALS,
                  cs.SOCKET_ACCESS_CONTROL_LOCALHOST):
        exec_test(_bind(_mode))
|