summaryrefslogtreecommitdiff
path: root/tests/twisted/muc/subject.py
blob: c323331ec41c18039601cde01a923b0144c9e7d0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""
Test Channel.Interface.Subject on MUC channels
"""

import dbus

from twisted.words.xish import domish

from gabbletest import exec_test, make_result_iq, make_muc_presence
from servicetest import (EventPattern, assertEquals, call_async)
import constants as cs
import ns

from mucutil import join_muc

def test(q, bus, conn, stream):
    # 3x2x2 possible combinations of change_subject, send_first, moderator:
    # unrolling the loop here so we'll get better Python tracebacks on failure
    test_subject(q, bus, conn, stream, None, False, False)
    test_subject(q, bus, conn, stream, None, False, True)
    test_subject(q, bus, conn, stream, None, True, False)
    test_subject(q, bus, conn, stream, None, False, True)
    test_subject(q, bus, conn, stream, True, False, False)
    test_subject(q, bus, conn, stream, True, False, True)
    test_subject(q, bus, conn, stream, True, True, False)
    test_subject(q, bus, conn, stream, True, False, True)
    test_subject(q, bus, conn, stream, False, False, False)
    test_subject(q, bus, conn, stream, False, False, True)
    test_subject(q, bus, conn, stream, False, True, False)
    test_subject(q, bus, conn, stream, False, False, True)

def check_subject_props(chan, subject_str, actor, flags, signal=None):
    if signal is not None:
        assertEquals(subject_str, signal.args[0])
        assertEquals(actor, signal.args[1])
        assertEquals(flags, signal.args[3])

    props = chan.GetAll(cs.CHANNEL_IFACE_SUBJECT,
                        dbus_interface=dbus.PROPERTIES_IFACE)
    subject = props['Subject']
    subject_actor = props['Actor']
    subject_can_set = props['CanSet']

def test_subject(q, bus, conn, stream, change_subject, send_first,
        moderator):
    room = 'test@conf.localhost'

    chan, path, props, disco = join_muc(q, bus, conn, stream,
            room,
            also_capture=[EventPattern('stream-iq', iq_type='get',
                query_name='query', query_ns=ns.DISCO_INFO, to=room)],
            role=(moderator and 'moderator' or 'participant'))

    assert chan.Properties.Get(cs.CHANNEL_IFACE_SUBJECT, "CanSet")

    if send_first:
        # Someone sets a subject.
        message = domish.Element((None, 'message'))
        message['from'] = room + '/bob'
        message['type'] = 'groupchat'
        message.addElement('subject', content='Testing')
        stream.send(message)

        q.expect('dbus-signal', interface=cs.PROPERTIES_IFACE,
                 signal='PropertiesChanged',
                 predicate=lambda e: e.args[0] == cs.CHANNEL_IFACE_SUBJECT)
        check_subject_props(chan, 'Testing', room + '/bob', True)

    # Reply to the disco
    iq = make_result_iq(stream, disco.stanza)
    query = iq.firstChildElement()
    feat = query.addElement('feature')
    feat['var'] = 'muc_public'

    x = query.addElement((ns.X_DATA, 'x'))
    x['type'] = 'result'

    if change_subject is not None:
        # When fd.o #13157 has been fixed, this will actually do something.
        field = x.addElement('field')
        field['var'] = 'muc#roomconfig_changesubject'
        field.addElement('value',
                content=(change_subject and 'true' or 'false'))

    stream.send(iq)

    # Someone sets a subject.
    message = domish.Element((None, 'message'))
    message['from'] = room + '/bob'
    message['type'] = 'groupchat'
    message.addElement('subject', content='lalala')
    stream.send(message)

    q.expect('dbus-signal', interface=cs.PROPERTIES_IFACE,
             signal='PropertiesChanged',
             predicate=lambda e: e.args[0] == cs.CHANNEL_IFACE_SUBJECT)
    check_subject_props(chan, 'lalala', room + '/bob', True)

    # test changing the subject
    call_async(q, chan, 'SetSubject', 'le lolz', dbus_interface=cs.CHANNEL_IFACE_SUBJECT)

    e = q.expect('stream-message', to=room)
    elem = e.stanza
    assertEquals('groupchat', elem['type'])
    assertEquals(1, len(elem.children))
    assertEquals(elem.children[0].name, 'subject')
    assertEquals(str(elem.children[0]), 'le lolz')

    elem['from'] = room + '/test'
    stream.send(elem)

    q.expect_many(EventPattern('dbus-signal', interface=cs.PROPERTIES_IFACE,
                               signal='PropertiesChanged',
                               predicate=lambda e: e.args[0] == cs.CHANNEL_IFACE_SUBJECT),
                  EventPattern('dbus-return', method='SetSubject'),
                 )

    check_subject_props(chan, 'le lolz', room + '/test', True)

    # Test changing the subject and getting an error back.
    call_async(q, chan, 'SetSubject', 'CHICKEN MAN', dbus_interface=cs.CHANNEL_IFACE_SUBJECT)

    e = q.expect('stream-message', to=room)
    elem = e.stanza
    elem['from'] = room
    elem['type'] = 'error'
    error = elem.addElement((None, 'error'))
    error['type'] = 'auth'
    error.addElement((ns.STANZA, 'forbidden'))
    stream.send(elem)
    q.expect('dbus-error', method='SetSubject', name=cs.PERMISSION_DENIED)

    # Test changing the subject and getting an error back which doesn't echo
    # the <subject> element.
    call_async(q, chan, 'SetSubject', 'CHICKEN MAN', dbus_interface=cs.CHANNEL_IFACE_SUBJECT)

    e = q.expect('stream-message', to=room)
    message = domish.Element((None, 'message'))
    message['from'] = room
    message['id'] = e.stanza['id']
    message['type'] = 'error'
    error = message.addElement((None, 'error'))
    error.addElement((ns.STANZA, 'forbidden'))
    stream.send(message)

    q.expect('dbus-error', method='SetSubject', name=cs.PERMISSION_DENIED)

    # Test changing the subject just before we leave the room (and hence not
    # getting a reply). While we're here, check that you can't have more than
    # one call in flight at a time.
    call_async(q, chan, 'SetSubject', 'le lolz', dbus_interface=cs.CHANNEL_IFACE_SUBJECT)
    e = q.expect('stream-message', to=room)

    call_async(q, chan, 'SetSubject', 'le lolz', dbus_interface=cs.CHANNEL_IFACE_SUBJECT)
    q.expect('dbus-error', method='SetSubject', name=cs.NOT_AVAILABLE)

    chan.Close()

    event = q.expect('stream-presence', to=room + '/test')
    elem = event.stanza
    assertEquals('unavailable', elem['type'])

    q.expect('dbus-error', method='SetSubject', name=cs.CANCELLED)

    call_async(q, chan, 'SetSubject', 'how about now?',
        dbus_interface=cs.CHANNEL_IFACE_SUBJECT)
    q.expect('dbus-error', method='SetSubject', name=cs.NOT_AVAILABLE)

    # The MUC confirms that we've left the room.
    echo = make_muc_presence('member', 'none', room, 'test')
    echo['type'] = 'unavailable'
    stream.send(echo)
    q.expect('dbus-signal', signal='ChannelClosed')

if __name__ == '__main__':
    exec_test(test)