summaryrefslogtreecommitdiff
path: root/salut/tests/twisted/avahi/file-transfer/file_transfer_helper.py
diff options
context:
space:
mode:
Diffstat (limited to 'salut/tests/twisted/avahi/file-transfer/file_transfer_helper.py')
-rw-r--r--salut/tests/twisted/avahi/file-transfer/file_transfer_helper.py507
1 files changed, 507 insertions, 0 deletions
diff --git a/salut/tests/twisted/avahi/file-transfer/file_transfer_helper.py b/salut/tests/twisted/avahi/file-transfer/file_transfer_helper.py
new file mode 100644
index 000000000..d25db76b9
--- /dev/null
+++ b/salut/tests/twisted/avahi/file-transfer/file_transfer_helper.py
@@ -0,0 +1,507 @@
+import dbus
+import socket
+import hashlib
+import avahi
+import BaseHTTPServer
+import urllib
+import httplib
+import urlparse
+import sys
+import os
+
+from avahitest import AvahiAnnouncer, AvahiListener, get_host_name
+from saluttest import wait_for_contact_in_publish
+
+from caps_helper import extract_data_forms, add_dataforms, compute_caps_hash, \
+ send_disco_reply
+
+from xmppstream import setup_stream_listener, connect_to_stream
+from servicetest import make_channel_proxy, EventPattern, assertEquals, call_async, sync_dbus
+import constants as cs
+import ns
+
+from twisted.words.xish import domish, xpath
+
+from dbus import PROPERTIES_IFACE
+
+class File(object):
+ DEFAULT_DATA = "What a nice file"
+ DEFAULT_NAME = "The foo.txt"
+ DEFAULT_CONTENT_TYPE = 'text/plain'
+ DEFAULT_DESCRIPTION = "A nice file to test"
+
+ def __init__(self, data=DEFAULT_DATA, name=DEFAULT_NAME,
+ content_type=DEFAULT_CONTENT_TYPE, description=DEFAULT_DESCRIPTION,
+ hash_type=cs.FILE_HASH_TYPE_MD5):
+ self.data = data
+ self.size = len(self.data)
+ self.name = name
+
+ self.content_type = content_type
+ self.description = description
+ self.date = 0
+
+ self.compute_hash(hash_type)
+
+ self.uri = 'file:///tmp/%s' % self.name
+
+ def compute_hash(self, hash_type):
+ assert hash_type == cs.FILE_HASH_TYPE_MD5
+
+ self.hash_type = hash_type
+ self.hash = hashlib.md5(self.data).hexdigest()
+
+class FileTransferTest(object):
+ CONTACT_NAME = 'test-ft'
+
+ service_name = 'wacky.service.name'
+ metadata = {'loads': ['of', 'blahblah', 'stuff'],
+ 'mental': ['data', 'sidf']}
+
+ def __init__(self):
+ self.file = File()
+ self.contact_service = None
+
+ def connect(self):
+ self.conn.Connect()
+ self.q.expect('dbus-signal', signal='StatusChanged', args=[0L, 0L])
+
+ self.self_handle = self.conn.GetSelfHandle()
+ self.self_handle_name = self.conn.InspectHandles(cs.HT_CONTACT, [self.self_handle])[0]
+
+ def announce_contact(self, name=CONTACT_NAME, metadata=True):
+ client = 'http://telepathy.freedesktop.org/fake-client'
+ features = [ns.IQ_OOB]
+
+ if metadata:
+ features += [ns.TP_FT_METADATA]
+
+ ver = compute_caps_hash([], features, {})
+ txt_record = { "txtvers": "1", "status": "avail",
+ "node": client, "ver": ver, "hash": "sha-1"}
+
+ suffix = '@%s' % get_host_name()
+ name += ('-' + os.path.splitext(os.path.basename(sys.argv[0]))[0])
+
+ self.contact_name = name + suffix
+ if len(self.contact_name) > 63:
+ allowed = 63 - len(suffix)
+ self.contact_name = name[:allowed] + suffix
+
+ self.listener, port = setup_stream_listener(self.q, self.contact_name)
+
+ self.contact_service = AvahiAnnouncer(self.contact_name, "_presence._tcp",
+ port, txt_record)
+
+ self.handle = wait_for_contact_in_publish(self.q, self.bus, self.conn,
+ self.contact_name)
+
+ # expect salut to disco our caps
+ e = self.q.expect('incoming-connection', listener=self.listener)
+ stream = e.connection
+
+ e = self.q.expect('stream-iq', to=self.contact_name, query_ns=ns.DISCO_INFO,
+ connection=stream)
+ assertEquals(client + '#' + ver, e.query['node'])
+ send_disco_reply(stream, e.stanza, [], features)
+
+ # lose the connection here to ensure connections are created
+ # where necessary; I just wanted salut to know my caps.
+ stream.send('</stream:stream>')
+ # spend a bit of time in the main loop to ensure the last two
+ # stanzas are actually received by salut before closing the
+ # connection.
+ sync_dbus(self.bus, self.q, self.conn)
+ stream.transport.loseConnection()
+
+ def wait_for_contact(self):
+ if not hasattr(self, 'handle'):
+ self.handle = wait_for_contact_in_publish(self.q, self.bus, self.conn,
+ self.contact_name)
+
+ def create_ft_channel(self):
+ self.channel = make_channel_proxy(self.conn, self.ft_path, 'Channel')
+ self.ft_channel = make_channel_proxy(self.conn, self.ft_path, 'Channel.Type.FileTransfer')
+ self.ft_props = dbus.Interface(self.bus.get_object(
+ self.conn.object.bus_name, self.ft_path), PROPERTIES_IFACE)
+
+ def close_channel(self):
+ self.channel.Close()
+ self.q.expect('dbus-signal', signal='Closed')
+
+ def test(self, q, bus, conn):
+ self.q = q
+ self.bus = bus
+ self.conn = conn
+
+ for fct in self._actions:
+ # stop if a function returns True
+ if fct():
+ break
+
+ # if we announced the service, let's be sure to get rid of it
+ if self.contact_service:
+ self.contact_service.stop()
+
+class ReceiveFileTest(FileTransferTest):
+ def __init__(self):
+ FileTransferTest.__init__(self)
+
+ self._actions = [self.connect, self.announce_contact, self.wait_for_contact,
+ self.connect_to_salut, self.setup_http_server, self.send_ft_offer_iq,
+ self.check_new_channel, self.create_ft_channel, self.set_uri,
+ self.accept_file, self.receive_file, self.close_channel]
+
+ def _resolve_salut_presence(self):
+ AvahiListener(self.q).listen_for_service("_presence._tcp")
+ e = self.q.expect('service-added', name = self.self_handle_name,
+ protocol = avahi.PROTO_INET)
+ service = e.service
+ service.resolve()
+
+ e = self.q.expect('service-resolved', service = service)
+ return str(e.pt), e.port
+
+ def connect_to_salut(self):
+ host, port = self._resolve_salut_presence()
+
+ self.outbound = connect_to_stream(self.q, self.contact_name,
+ self.self_handle_name, host, port)
+
+ e = self.q.expect('connection-result')
+ assert e.succeeded, e.reason
+ self.q.expect('stream-opened', connection = self.outbound)
+
+ def setup_http_server(self):
+ class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ def do_GET(self_):
+ # is that the right file ?
+ filename = self_.path.rsplit('/', 2)[-1]
+ assert filename == urllib.quote(self.file.name)
+
+ self_.send_response(200)
+ self_.send_header('Content-type', self.file.content_type)
+ self_.end_headers()
+ self_.wfile.write(self.file.data)
+
+ def log_message(self, format, *args):
+ if 'CHECK_TWISTED_VERBOSE' in os.environ:
+ BaseHTTPServer.BaseHTTPRequestHandler.log_message(self, format, *args)
+
+ self.httpd = self._get_http_server_class()(('', 0), HTTPHandler)
+
+ def _get_http_server_class(self):
+ return BaseHTTPServer.HTTPServer
+
+ def send_ft_offer_iq(self):
+ iq = domish.Element((None, 'iq'))
+ iq['to'] = self.self_handle_name
+ iq['from'] = self.contact_name
+ iq['type'] = 'set'
+ iq['id'] = 'gibber-file-transfer-0'
+ query = iq.addElement(('jabber:iq:oob', 'query'))
+ url = 'http://127.0.0.1:%u/gibber-file-transfer-0/%s' % \
+ (self.httpd.server_port, urllib.quote(self.file.name))
+ url_node = query.addElement('url', content=url)
+ url_node['type'] = 'file'
+ url_node['size'] = str(self.file.size)
+ url_node['mimeType'] = self.file.content_type
+ query.addElement('desc', content=self.file.description)
+
+ # Metadata
+ if self.service_name:
+ service_form = {ns.TP_FT_METADATA_SERVICE: {'ServiceName': [self.service_name]}}
+ add_dataforms(query, service_form)
+
+ if self.metadata:
+ metadata_form = {ns.TP_FT_METADATA: self.metadata}
+ add_dataforms(query, metadata_form)
+
+ self.outbound.send(iq)
+
+ def check_new_channel(self):
+ e = self.q.expect('dbus-signal', signal='NewChannels',
+ predicate=lambda e:
+ e.args[0][0][1][cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_FILE_TRANSFER)
+
+ channels = e.args[0]
+ assert len(channels) == 1
+ path, props = channels[0]
+
+ # check channel properties
+ # org.freedesktop.Telepathy.Channel D-Bus properties
+ assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_FILE_TRANSFER
+ assert props[cs.INTERFACES] == []
+ assert props[cs.TARGET_HANDLE] == self.handle
+ assert props[cs.TARGET_ID] == self.contact_name
+ assert props[cs.TARGET_HANDLE_TYPE] == cs.HT_CONTACT
+ assert props[cs.REQUESTED] == False
+ assert props[cs.INITIATOR_HANDLE] == self.handle
+ assert props[cs.INITIATOR_ID] == self.contact_name
+
+ # org.freedesktop.Telepathy.Channel.Type.FileTransfer D-Bus properties
+ assert props[cs.FT_STATE] == cs.FT_STATE_PENDING
+ assert props[cs.FT_CONTENT_TYPE] == self.file.content_type
+ assert props[cs.FT_FILENAME] == self.file.name
+ assert props[cs.FT_SIZE] == self.file.size
+ # FT's protocol doesn't allow us the send the hash info
+ assert props[cs.FT_CONTENT_HASH_TYPE] == cs.FILE_HASH_TYPE_NONE
+ assert props[cs.FT_CONTENT_HASH] == ''
+ assert props[cs.FT_DESCRIPTION] == self.file.description
+ # FT's protocol doesn't allow us the send the date info
+ assert props[cs.FT_DATE] == 0
+ assert props[cs.FT_AVAILABLE_SOCKET_TYPES] == \
+ {cs.SOCKET_ADDRESS_TYPE_UNIX: [cs.SOCKET_ACCESS_CONTROL_LOCALHOST]}
+ assert props[cs.FT_TRANSFERRED_BYTES] == 0
+ assert props[cs.FT_INITIAL_OFFSET] == 0
+
+ assertEquals(self.service_name, props[cs.FT_SERVICE_NAME])
+ assertEquals(self.metadata, props[cs.FT_METADATA])
+
+ self.ft_path = path
+
+ def set_uri(self):
+ ft_props = dbus.Interface(self.ft_channel, cs.PROPERTIES_IFACE)
+
+ # URI is not set yet
+ uri = ft_props.Get(cs.CHANNEL_TYPE_FILE_TRANSFER, 'URI')
+ assertEquals('', uri)
+
+ # Setting URI
+ call_async(self.q, ft_props, 'Set',
+ cs.CHANNEL_TYPE_FILE_TRANSFER, 'URI', self.file.uri)
+
+ self.q.expect('dbus-signal', signal='URIDefined', args=[self.file.uri])
+
+ self.q.expect('dbus-return', method='Set')
+
+ # Check it has the right value now
+ uri = ft_props.Get(cs.CHANNEL_TYPE_FILE_TRANSFER, 'URI')
+ assertEquals(self.file.uri, uri)
+
+ # We can't change it once it has been set
+ call_async(self.q, ft_props, 'Set',
+ cs.CHANNEL_TYPE_FILE_TRANSFER, 'URI', 'badger://snake')
+ self.q.expect('dbus-error', method='Set', name=cs.INVALID_ARGUMENT)
+
+ def accept_file(self):
+ self.address = self.ft_channel.AcceptFile(cs.SOCKET_ADDRESS_TYPE_UNIX,
+ cs.SOCKET_ACCESS_CONTROL_LOCALHOST, "", 5, byte_arrays=True)
+
+ e = self.q.expect('dbus-signal', signal='FileTransferStateChanged')
+ state, reason = e.args
+ assert state == cs.FT_STATE_ACCEPTED
+ assert reason == cs.FT_STATE_CHANGE_REASON_REQUESTED
+
+ e = self.q.expect('dbus-signal', signal='InitialOffsetDefined')
+ offset = e.args[0]
+ # We don't support resume
+ assert offset == 0
+
+ e = self.q.expect('dbus-signal', signal='FileTransferStateChanged')
+ state, reason = e.args
+ assert state == cs.FT_STATE_OPEN
+ assert reason == cs.FT_STATE_CHANGE_REASON_NONE
+
+ def _read_file_from_socket(self, s):
+ # Read the file from Salut's socket
+ data = ''
+ read = 0
+ while read < self.file.size:
+ data += s.recv(self.file.size - read)
+ read = len(data)
+ assert data == self.file.data
+
+ e = self.q.expect('dbus-signal', signal='TransferredBytesChanged')
+ count = e.args[0]
+ while count < self.file.size:
+ # Catch TransferredBytesChanged until we transfered all the data
+ e = self.q.expect('dbus-signal', signal='TransferredBytesChanged')
+ count = e.args[0]
+
+ e = self.q.expect('dbus-signal', signal='FileTransferStateChanged')
+ state, reason = e.args
+ assert state == cs.FT_STATE_COMPLETED
+ assert reason == cs.FT_STATE_CHANGE_REASON_NONE
+
+ def receive_file(self):
+ # Connect to Salut's socket
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ s.connect(self.address)
+
+ self.httpd.handle_request()
+
+ # Receiver inform us he finished to download the file
+ self.q.expect('stream-iq', iq_type='result')
+
+ self._read_file_from_socket(s)
+
+class SendFileTest(FileTransferTest):
+ def __init__(self):
+ FileTransferTest.__init__(self)
+
+ self._actions = [self.connect, self.announce_contact, self.wait_for_contact,
+ self.check_ft_available, self.request_ft_channel, self.create_ft_channel,
+ self.got_send_iq, self.provide_file, self.client_request_file, self.send_file,
+ self.close_channel]
+
+ def check_ft_available(self):
+ properties = self.conn.GetAll(cs.CONN_IFACE_REQUESTS,
+ dbus_interface=PROPERTIES_IFACE)
+
+ assert ({cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_FILE_TRANSFER,
+ cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT},
+ [cs.FT_CONTENT_HASH_TYPE,
+ cs.TARGET_HANDLE,
+ cs.TARGET_ID,
+ cs.FT_CONTENT_TYPE,
+ cs.FT_FILENAME,
+ cs.FT_SIZE,
+ cs.FT_CONTENT_HASH,
+ cs.FT_DESCRIPTION,
+ cs.FT_DATE,
+ cs.FT_INITIAL_OFFSET,
+ cs.FT_URI,
+ cs.FT_SERVICE_NAME,
+ cs.FT_METADATA],
+ ) in properties.get('RequestableChannelClasses', []),\
+ properties.get('RequestableChannelClasses')
+
+ def request_ft_channel(self, uri=True):
+ request = { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_FILE_TRANSFER,
+ cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
+ cs.TARGET_HANDLE: self.handle,
+
+ cs.FT_CONTENT_TYPE: self.file.content_type,
+ cs.FT_FILENAME: self.file.name,
+ cs.FT_SIZE: self.file.size,
+ cs.FT_CONTENT_HASH_TYPE: self.file.hash_type,
+ cs.FT_CONTENT_HASH:self.file.hash,
+ cs.FT_DESCRIPTION: self.file.description,
+ cs.FT_DATE: self.file.date,
+ cs.FT_INITIAL_OFFSET: 0 }
+
+ if self.service_name:
+ request[cs.FT_SERVICE_NAME] = self.service_name
+ if self.metadata:
+ request[cs.FT_METADATA] = dbus.Dictionary(self.metadata, signature='sas')
+
+ if uri:
+ request[cs.FT_URI] = self.file.uri
+
+ self.ft_path, props = self.conn.Requests.CreateChannel(request)
+
+ # org.freedesktop.Telepathy.Channel D-Bus properties
+ assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_FILE_TRANSFER
+ assert props[cs.INTERFACES] == []
+ assert props[cs.TARGET_HANDLE] == self.handle
+ assert props[cs.TARGET_ID] == self.contact_name
+ assert props[cs.TARGET_HANDLE_TYPE] == cs.HT_CONTACT
+ assert props[cs.REQUESTED] == True
+ assert props[cs.INITIATOR_HANDLE] == self.self_handle
+ assert props[cs.INITIATOR_ID] == self.self_handle_name
+
+ # org.freedesktop.Telepathy.Channel.Type.FileTransfer D-Bus properties
+ assert props[cs.FT_STATE] == cs.FT_STATE_PENDING
+ assert props[cs.FT_CONTENT_TYPE] == self.file.content_type
+ assert props[cs.FT_FILENAME] == self.file.name
+ assert props[cs.FT_SIZE] == self.file.size
+ assert props[cs.FT_CONTENT_HASH_TYPE] == self.file.hash_type
+ assert props[cs.FT_CONTENT_HASH] == self.file.hash
+ assert props[cs.FT_DESCRIPTION] == self.file.description
+ assert props[cs.FT_DATE] == self.file.date
+ assert props[cs.FT_AVAILABLE_SOCKET_TYPES] == \
+ {cs.SOCKET_ADDRESS_TYPE_UNIX: [cs.SOCKET_ACCESS_CONTROL_LOCALHOST]}
+ assert props[cs.FT_TRANSFERRED_BYTES] == 0
+ assert props[cs.FT_INITIAL_OFFSET] == 0
+ if uri:
+ assertEquals(self.file.uri, props[cs.FT_URI])
+ else:
+ assertEquals('', props[cs.FT_URI])
+ assertEquals(self.service_name, props[cs.FT_SERVICE_NAME])
+ assertEquals(self.metadata, props[cs.FT_METADATA])
+
+ def got_send_iq(self):
+ conn_event, iq_event = self.q.expect_many(
+ EventPattern('incoming-connection', listener = self.listener),
+ EventPattern('stream-iq'))
+
+ self.incoming = conn_event.connection
+
+ self._check_oob_iq(iq_event)
+
+ def _check_oob_iq(self, iq_event):
+ assert iq_event.iq_type == 'set'
+ assert iq_event.connection == self.incoming
+ self.iq = iq_event.stanza
+ assert self.iq['to'] == self.contact_name
+ query = self.iq.firstChildElement()
+ assert query.uri == 'jabber:iq:oob'
+ url_node = xpath.queryForNodes("/iq/query/url", self.iq)[0]
+ assert url_node['type'] == 'file'
+ assert url_node['size'] == str(self.file.size)
+ assert url_node['mimeType'] == self.file.content_type
+ self.url = url_node.children[0]
+ _, self.host, self.filename, _, _, _ = urlparse.urlparse(self.url)
+ urllib.unquote(self.filename) == self.file.name
+ desc_node = xpath.queryForNodes("/iq/query/desc", self.iq)[0]
+ self.desc = desc_node.children[0]
+ assert self.desc == self.file.description
+
+ # Metadata forms
+ forms = extract_data_forms(xpath.queryForNodes('/iq/query/x', self.iq))
+
+ if self.service_name:
+ assertEquals({'ServiceName': [self.service_name]},
+ forms[ns.TP_FT_METADATA_SERVICE])
+ else:
+ assert ns.TP_FT_METADATA_SERVICE not in forms
+
+ if self.metadata:
+ assertEquals(self.metadata, forms[ns.TP_FT_METADATA])
+ else:
+ assert ns.TP_FT_METADATA not in forms
+
+ def provide_file(self):
+ self.address = self.ft_channel.ProvideFile(cs.SOCKET_ADDRESS_TYPE_UNIX,
+ cs.SOCKET_ACCESS_CONTROL_LOCALHOST, "", byte_arrays=True)
+
+ def client_request_file(self):
+ # Connect HTTP client to the CM and request the file
+ self.http = httplib.HTTPConnection(self.host)
+ self.http.request('GET', self.filename)
+
+ def _get_http_response(self):
+ response = self.http.getresponse()
+ assert (response.status, response.reason) == (200, 'OK')
+ data = response.read(self.file.size)
+ # Did we received the right file?
+ assert data == self.file.data
+
+ def send_file(self):
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ s.connect(self.address)
+ s.send(self.file.data)
+
+ e = self.q.expect('dbus-signal', signal='TransferredBytesChanged')
+
+ count = e.args[0]
+ while count < self.file.size:
+ # Catch TransferredBytesChanged until we transfered all the data
+ e = self.q.expect('dbus-signal', signal='TransferredBytesChanged')
+ count = e.args[0]
+
+ self._get_http_response()
+
+ # Inform sender that we received all the file from the OOB transfer
+ reply = domish.Element(('', 'iq'))
+ reply['to'] = self.iq['from']
+ reply['from'] = self.iq['to']
+ reply['type'] = 'result'
+ reply['id'] = self.iq['id']
+ self.incoming.send(reply)
+
+ e = self.q.expect('dbus-signal', signal='FileTransferStateChanged')
+ state, reason = e.args
+ assert state == cs.FT_STATE_COMPLETED
+ assert reason == cs.FT_STATE_CHANGE_REASON_NONE