summaryrefslogtreecommitdiff
path: root/tests/twisted/sasl/complex.py
blob: ac550db7d15fd26af8a4e8a718fbbf7c017723a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
Test the server sasl channel with the PLAIN mechanism
"""
import dbus

from servicetest import EventPattern, assertSameSets, call_async
from gabbletest import exec_test
import constants as cs
from saslutil import SaslEventAuthenticator, connect_and_get_sasl_channel

JID = "test@example.org"
INITIAL_RESPONSE = 'Thunder and lightning. Enter three Witches.'
CR_PAIRS = [
        ('When shall we three meet again?', 'Ere the set of sun.'),
        ('Where the place?', 'Upon the heath.'),
        ]
SUCCESS_DATA = 'Exeunt.'
MECHANISMS = ["PLAIN", "DIGEST-MD5", "SCOTTISH-PLAY"]

def test_complex_success(q, bus, conn, stream, with_extra_data=True,
        accept_early=False):
    chan, props = connect_and_get_sasl_channel(q, bus, conn)

    assertSameSets(MECHANISMS + ['X-TELEPATHY-PASSWORD'],
            props.get(cs.SASL_AVAILABLE_MECHANISMS))

    call_async(q, chan.SASLAuthentication, 'StartMechanismWithData',
            "FOO", "")
    q.expect('dbus-error', method='StartMechanismWithData',
            name=cs.NOT_IMPLEMENTED)

    if with_extra_data:
        chan.SASLAuthentication.StartMechanismWithData("SCOTTISH-PLAY",
                INITIAL_RESPONSE)
        e = q.expect('sasl-auth', initial_response=INITIAL_RESPONSE)
    else:
        chan.SASLAuthentication.StartMechanism("SCOTTISH-PLAY")
        e = q.expect('sasl-auth', has_initial_response=False)

    authenticator = e.authenticator

    q.expect('dbus-signal', signal='SASLStatusChanged',
             interface=cs.CHANNEL_IFACE_SASL_AUTH,
             args=[cs.SASL_STATUS_IN_PROGRESS, '', {}])

    if not with_extra_data:
        # send the stage directions in-band instead
        authenticator.challenge('')
        e = q.expect('dbus-signal', signal='NewChallenge',
                     interface=cs.CHANNEL_IFACE_SASL_AUTH)
        # this ought to be '' but dbus-python has fd.o #28131
        assert e.args in ([''], ['None'])
        chan.SASLAuthentication.Respond(INITIAL_RESPONSE)
        q.expect('sasl-response', response=INITIAL_RESPONSE)

    for challenge, response in CR_PAIRS:
        authenticator.challenge(challenge)
        q.expect('dbus-signal', signal='NewChallenge',
                     interface=cs.CHANNEL_IFACE_SASL_AUTH,
                     args=[challenge])
        chan.SASLAuthentication.Respond(response)
        q.expect('sasl-response', response=response)

    if with_extra_data:
        authenticator.success(SUCCESS_DATA)
    else:
        # The success data is sent in-band as a challenge
        authenticator.challenge(SUCCESS_DATA)

    q.expect('dbus-signal', signal='NewChallenge',
                 interface=cs.CHANNEL_IFACE_SASL_AUTH,
                 args=[SUCCESS_DATA])

    if accept_early:
        # the UI can tell that this challenge isn't actually a challenge,
        # it's a success in disguise
        chan.SASLAuthentication.AcceptSASL()
        q.expect('dbus-signal', signal='SASLStatusChanged',
                interface=cs.CHANNEL_IFACE_SASL_AUTH,
                args=[cs.SASL_STATUS_CLIENT_ACCEPTED, '', {}])
    else:
        chan.SASLAuthentication.Respond(dbus.ByteArray(''))

    if with_extra_data:
        # Wocky removes the distinction between a challenge containing
        # success data followed by a plain success, and a success
        # containing initial data, so we won't get to Server_Succeeded
        # til we "respond" to the "challenge". However, at the XMPP level,
        # we shouldn't get a response to a success.
        q.forbid_events([EventPattern('sasl-response')])
    else:
        q.expect('sasl-response', response='')
        authenticator.success(None)

    if not accept_early:
        # *now* we accept
        q.expect('dbus-signal', signal='SASLStatusChanged',
                interface=cs.CHANNEL_IFACE_SASL_AUTH,
                args=[cs.SASL_STATUS_SERVER_SUCCEEDED, '', {}])
        # We're willing to accept this SASL transaction
        chan.SASLAuthentication.AcceptSASL()

    q.expect('dbus-signal', signal='SASLStatusChanged',
            interface=cs.CHANNEL_IFACE_SASL_AUTH,
            args=[cs.SASL_STATUS_SUCCEEDED, '', {}])

    q.expect('dbus-signal', signal='StatusChanged',
             args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])
    chan.Close()
    # ... and check that the Connection is still OK
    conn.Properties.Get(cs.CONN, "SelfHandle")

def test_complex_success_data(q, bus, conn, stream):
    test_complex_success(q, bus, conn, stream, True)

def test_complex_success_no_data(q, bus, conn, stream):
    test_complex_success(q, bus, conn, stream, False)

def test_complex_success_data_accept(q, bus, conn, stream):
    test_complex_success(q, bus, conn, stream, True, True)

def test_complex_success_no_data_accept(q, bus, conn, stream):
    test_complex_success(q, bus, conn, stream, False, True)

if __name__ == '__main__':
    exec_test(
        test_complex_success_data, {'password': None, 'account' : JID},
        authenticator=SaslEventAuthenticator(JID.split('@')[0], MECHANISMS),
        do_connect=False)
    exec_test(
        test_complex_success_no_data, {'password': None, 'account' : JID},
        authenticator=SaslEventAuthenticator(JID.split('@')[0], MECHANISMS),
        do_connect=False)
    exec_test(
        test_complex_success_data_accept, {'password': None, 'account' : JID},
        authenticator=SaslEventAuthenticator(JID.split('@')[0], MECHANISMS),
        do_connect=False)
    exec_test(
        test_complex_success_no_data_accept, {'password': None, 'account' : JID},
        authenticator=SaslEventAuthenticator(JID.split('@')[0], MECHANISMS),
        do_connect=False)