1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
"""
Regression test for <https://bugs.freedesktop.org/show_bug.cgi?id=32952>,
wherein chat states in MUCs were misparsed, and MUC chat states in general.
"""
from servicetest import assertEquals
from gabbletest import exec_test, elem
from mucutil import join_muc_and_check
import ns
import constants as cs
MUC = 'ohai@groupchat.google.com'
BOB = MUC + '/bob'
def check_state_notification(elem, name, allow_body=False):
assertEquals('message', elem.name)
assertEquals('groupchat', elem['type'])
children = list(elem.elements())
notification = [x for x in children if x.uri == ns.CHAT_STATES][0]
assert notification.name == name, notification.toXml()
if not allow_body:
assert len(children) == 1, elem.toXml()
def test(q, bus, conn, stream):
(muc_handle, chan, user, bob) = join_muc_and_check(q, bus, conn, stream,
MUC)
states = chan.Properties.Get(cs.CHANNEL_IFACE_CHAT_STATE, 'ChatStates')
assertEquals(cs.CHAT_STATE_INACTIVE,
states.get(user, cs.CHAT_STATE_INACTIVE))
assertEquals(cs.CHAT_STATE_INACTIVE,
states.get(bob, cs.CHAT_STATE_INACTIVE))
stream.send(
elem('message', from_=BOB, to='test@localhost/Resource',
type='groupchat', jid='bob@bob.bob')(
elem(ns.CHAT_STATES, 'composing'),
elem('google:nosave', 'x', value='disabled'),
elem('http://jabber.org/protocol/archive', 'record', otr='false'),
))
e = q.expect('dbus-signal', signal='ChatStateChanged')
contact, state = e.args
assertEquals(bob, contact)
assertEquals(cs.CHAT_STATE_COMPOSING, state)
states = chan.Properties.Get(cs.CHANNEL_IFACE_CHAT_STATE, 'ChatStates')
assertEquals(cs.CHAT_STATE_INACTIVE,
states.get(user, cs.CHAT_STATE_INACTIVE))
assertEquals(cs.CHAT_STATE_COMPOSING,
states.get(bob, cs.CHAT_STATE_INACTIVE))
stream.send(
elem('message', from_=BOB, to='test@localhost/Resource',
type='groupchat', jid='bob@bob.bob')(
elem(ns.CHAT_STATES, 'paused'),
elem('google:nosave', 'x', value='disabled'),
elem('http://jabber.org/protocol/archive', 'record', otr='false'),
))
e = q.expect('dbus-signal', signal='ChatStateChanged')
contact, state = e.args
assertEquals(bob, contact)
assertEquals(cs.CHAT_STATE_PAUSED, state)
states = chan.Properties.Get(cs.CHANNEL_IFACE_CHAT_STATE, 'ChatStates')
assertEquals(cs.CHAT_STATE_INACTIVE,
states.get(user, cs.CHAT_STATE_INACTIVE))
assertEquals(cs.CHAT_STATE_PAUSED,
states.get(bob, cs.CHAT_STATE_INACTIVE))
# # Bob leaves
# stream.send(
# elem('presence', from_=BOB, to='test@localhost/Resource',
# type='unavailable'))
# e = q.expect('dbus-signal', signal='ChatStateChanged')
# contact, state = e.args
# assertEquals(bob, contact)
# assertEquals(cs.CHAT_STATE_GONE, state)
# states = chan.Properties.Get(cs.CHANNEL_IFACE_CHAT_STATE, 'ChatStates')
# assertEquals(cs.CHAT_STATE_INACTIVE,
# states.get(user, cs.CHAT_STATE_INACTIVE))
# # Bob no longer has any chat state at all
# assertEquals(None, states.get(bob, None))
# Sending chat states:
# Composing...
chan.ChatState.SetChatState(cs.CHAT_STATE_COMPOSING)
stream_message = q.expect('stream-message')
check_state_notification(stream_message.stanza, 'composing')
states = chan.Properties.Get(cs.CHANNEL_IFACE_CHAT_STATE, 'ChatStates')
assertEquals(cs.CHAT_STATE_COMPOSING,
states.get(user, cs.CHAT_STATE_INACTIVE))
# XEP 0085:
# every content message SHOULD contain an <active/> notification.
chan.Text.Send(0, 'hi.')
stream_message = q.expect('stream-message')
stanza = stream_message.stanza
check_state_notification(stanza, 'active', allow_body=True)
states = chan.Properties.Get(cs.CHANNEL_IFACE_CHAT_STATE, 'ChatStates')
assertEquals(cs.CHAT_STATE_ACTIVE,
states.get(user, cs.CHAT_STATE_INACTIVE))
def is_body(e):
if e.name == 'body':
assert e.children[0] == u'hi.', e.toXml()
return True
return False
assert len([x for x in stanza.elements() if is_body(x)]) == 1, stanza.toXml()
if __name__ == '__main__':
exec_test(test)
|