summaryrefslogtreecommitdiff
path: root/tests/twisted/avahi/tubes/tube-close.py
blob: ae2c2af0b59ef74a77c50e4f3be1e3e2c948151a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""
Offer a 1-1 stream tube and close the connection. Salut must just send a
stanza to close the tube and disconnect.
"""

from saluttest import exec_test, wait_for_contact_in_publish
from avahitest import AvahiAnnouncer, AvahiListener
from avahitest import get_host_name
import avahi

from xmppstream import setup_stream_listener, connect_to_stream
from servicetest import make_channel_proxy

from twisted.words.xish import xpath, domish

import dbus

PUBLISHED_NAME="test-tube"

CHANNEL_TYPE_TUBES = "org.freedesktop.Telepathy.Channel.Type.Tubes"
HT_CONTACT = 1
HT_CONTACT_LIST = 3
SOCKET_ADDRESS_TYPE_IPV4 = dbus.UInt32(2)
SOCKET_ACCESS_CONTROL_LOCALHOST = dbus.UInt32(0)

#print "FIXME: test-tube-close.py disabled because sending a close stanza on "
#print "disconnection is not yet implemented in telepathy-salut. It requires "
#print "to ensure the XmppConnection and reestablish it"
print "FIXME: disabled because 1-1 tubes are disabled for now"
# exiting 77 causes automake to consider the test to have been skipped
raise SystemExit(77)

def test(q, bus, conn):
    # Salut will not connect to this socket, the test finishs before
    socket_address = ('0.0.0.0', dbus.UInt16(0))

    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged', args=[0L, 0L])
    basic_txt = { "txtvers": "1", "status": "avail" }

    contact_name = PUBLISHED_NAME + "@" + get_host_name()
    listener, port = setup_stream_listener(q, contact_name)

    announcer = AvahiAnnouncer(contact_name, "_presence._tcp", port, basic_txt)

    handle = wait_for_contact_in_publish(q, bus, conn, contact_name)

    t = conn.RequestChannel(CHANNEL_TYPE_TUBES, HT_CONTACT, handle,
        True)
    tubes_channel = make_channel_proxy(conn, t, "Channel.Type.Tubes")

    tubes_channel.OfferStreamTube("http", dbus.Dictionary({}),
            SOCKET_ADDRESS_TYPE_IPV4, socket_address,
            SOCKET_ACCESS_CONTROL_LOCALHOST, "")
    
    e = q.expect('stream-iq')

    # Close the connection just after the tube has been offered.
    conn.Disconnect()

    # receive the close stanza for the tube
    event = q.expect('stream-message')
    message = event.stanza
    close_node = xpath.queryForNodes('/message/close[@xmlns="%s"]' % NS_TUBES,
        message)
    assert close_node is not None

    q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])

if __name__ == '__main__':
    exec_test(test)