1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
"""
Test the server sasl channel with the PLAIN mechanism
"""
import dbus
from servicetest import EventPattern, assertEquals
from gabbletest import exec_test
import constants as cs
from saslutil import SaslComplexAuthenticator, connect_and_get_sasl_channel
JID = "test@example.org"
EXCHANGE = [("", "Hi dear"),
("Hello", "How are you?"),
("Swell, how are you?", "Fine, thank you"),
("Do anything fun today?", "Nah"),
("Server data", "")]
MECHANISMS = ["PLAIN", "DIGEST-MD5", "POLITE-TEST"]
def test_complex_success(q, bus, conn, stream):
chan, auth_info, avail_mechs = connect_and_get_sasl_channel(q, bus, conn)
assertEquals(MECHANISMS, avail_mechs)
try:
chan.SaslAuthentication.StartMechanism("FOO", "")
except dbus.DBusException:
pass
else:
raise AssertionError, \
"Expected DBusException when choosing unavailable mechanism"
if EXCHANGE[0][0] == "":
initial_data = EXCHANGE[0][1]
else:
initial_data = ""
chan.SaslAuthentication.StartMechanism("POLITE-TEST", initial_data)
q.expect('dbus-signal', signal='StateChanged',
interface=cs.CHANNEL_IFACE_SASL_AUTH,
args=[cs.SASL_STATUS_IN_PROGRESS, '', ''])
for i, pair in enumerate(EXCHANGE[1:]):
challenge, response = pair
e = q.expect('dbus-signal', signal='NewChallenge',
interface=cs.CHANNEL_IFACE_SASL_AUTH,
args=[challenge or 'None'])
challenge_prop = ''.join(
map(chr, chan.Properties.Get(
cs.CHANNEL_IFACE_SASL_AUTH, "CurrentChallenge")))
assertEquals(challenge, challenge_prop)
if i == len(EXCHANGE) - 2:
chan.SaslAuthentication.Accept()
q.expect('dbus-signal', signal='StateChanged',
interface=cs.CHANNEL_IFACE_SASL_AUTH,
args=[cs.SASL_STATUS_CLIENT_ACCEPTED, '', ''])
else:
chan.SaslAuthentication.Respond(response)
q.expect('dbus-signal', signal='StateChanged',
interface=cs.CHANNEL_IFACE_SASL_AUTH,
args=[cs.SASL_STATUS_SUCCEEDED, '', ''])
q.expect('dbus-signal', signal='StatusChanged',
args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])
if __name__ == '__main__':
# Data on success
exec_test(
test_complex_success, {'password': None, 'account' : JID},
authenticator=SaslComplexAuthenticator(JID.split('@')[0],
EXCHANGE, MECHANISMS))
# No data on success
exec_test(
test_complex_success, {'password': None, 'account' : JID},
authenticator=SaslComplexAuthenticator(JID.split('@')[0],
EXCHANGE, MECHANISMS, False))
|