summaryrefslogtreecommitdiff
path: root/tests/twisted/tubes/test-get-available-tubes.py
blob: af13da4d79be0c4547f63d787702eeb638249382 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""Test GetAvailableStreamTubeTypes and GetAvailableTubeTypes"""

import sys

import dbus

from servicetest import call_async, EventPattern, tp_name_prefix,\
    assertContains, assertEquals, assertLength
from gabbletest import (
    exec_test, make_result_iq, acknowledge_iq, make_muc_presence)
import constants as cs

sample_parameters = dbus.Dictionary({
    's': 'hello',
    'ay': dbus.ByteArray('hello'),
    'u': dbus.UInt32(123),
    'i': dbus.Int32(-123),
    }, signature='sv')

def test(q, bus, conn, stream):
    conn.Connect()

    _, iq_event = q.expect_many(
        EventPattern('dbus-signal', signal='StatusChanged',
            args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED]),
        EventPattern('stream-iq', to=None, query_ns='vcard-temp',
            query_name='vCard'))

    acknowledge_iq(stream, iq_event.stanza)

    call_async(q, conn, 'RequestHandles', cs.HT_ROOM, ['chat@conf.localhost'])

    event = q.expect('dbus-return', method='RequestHandles')
    handles = event.value[0]

    # request tubes channel (old API)
    call_async(q, conn, 'RequestChannel',
        tp_name_prefix + '.Channel.Type.Tubes', cs.HT_ROOM, handles[0], True)

    _, stream_event = q.expect_many(
        EventPattern('dbus-signal', signal='MembersChanged',
            args=[u'', [], [], [], [2], 0, 0]),
        EventPattern('stream-presence', to='chat@conf.localhost/test'))

    # Send presence for other member of room.
    stream.send(make_muc_presence('owner', 'moderator', 'chat@conf.localhost', 'bob'))

    # Send presence for own membership of room.
    stream.send(make_muc_presence('owner', 'moderator', 'chat@conf.localhost', 'test'))

    new_chans, members, event = q.expect_many(
        EventPattern('dbus-signal', signal='NewChannels'),
        EventPattern('dbus-signal', signal='MembersChanged',
            args=[u'', [2, 3], [], [], [], 0, 0]),
        EventPattern('dbus-return', method='RequestChannel'))

    channels = new_chans.args[0]
    assertLength(2, channels)

    assertEquals(conn.InspectHandles(cs.HT_CONTACT, [2]), ['chat@conf.localhost/test'])
    assertEquals(conn.InspectHandles(cs.HT_CONTACT, [3]), ['chat@conf.localhost/bob'])
    bob_handle = 3

    tubes_chan = bus.get_object(conn.bus_name, event.value[0])
    tubes_iface_muc = dbus.Interface(tubes_chan,
            tp_name_prefix + '.Channel.Type.Tubes')

    # FIXME: these using a "1-1" tubes channel too

    # test GetAvailableTubeTypes (old API)
    tube_types = tubes_iface_muc.GetAvailableTubeTypes()

    assertLength(2, tube_types)
    assertContains(cs.TUBE_TYPE_DBUS, tube_types)
    assertContains(cs.TUBE_TYPE_STREAM, tube_types)

    # test GetAvailableStreamTubeTypes (old API)
    stream_tubes_types = tubes_iface_muc.GetAvailableStreamTubeTypes()
    assertLength(3, stream_tubes_types)
    assert cs.SOCKET_ACCESS_CONTROL_LOCALHOST in \
        stream_tubes_types[cs.SOCKET_ADDRESS_TYPE_UNIX], \
        stream_tubes_types[cs.SOCKET_ADDRESS_TYPE_UNIX]
    # so far we only guarantee to support credentials-passing on Linux
    if sys.platform == 'linux2':
        assert cs.SOCKET_ACCESS_CONTROL_CREDENTIALS in \
            stream_tubes_types[cs.SOCKET_ADDRESS_TYPE_UNIX], \
            stream_tubes_types[cs.SOCKET_ADDRESS_TYPE_UNIX]
    assertEquals([cs.SOCKET_ACCESS_CONTROL_LOCALHOST, cs.SOCKET_ACCESS_CONTROL_PORT],
        stream_tubes_types[cs.SOCKET_ADDRESS_TYPE_IPV4])
    assertEquals([cs.SOCKET_ACCESS_CONTROL_LOCALHOST, cs.SOCKET_ACCESS_CONTROL_PORT],
        stream_tubes_types[cs.SOCKET_ADDRESS_TYPE_IPV6])

    # muc stream tube (new API)
    path, props = conn.Requests.CreateChannel({
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
        cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
        cs.TARGET_ID: 'chat@conf.localhost',
        cs.STREAM_TUBE_SERVICE: 'test'})

    tube = bus.get_object(conn.bus_name, path)
    sockets = tube.Get(cs.CHANNEL_TYPE_STREAM_TUBE, 'SupportedSocketTypes',
        dbus_interface=cs.PROPERTIES_IFACE)
    assertEquals(sockets, stream_tubes_types)

if __name__ == '__main__':
    exec_test(test)