diff options
author | Jonny Lamb <jonny.lamb@collabora.co.uk> | 2010-07-09 18:04:24 +0100 |
---|---|---|
committer | Jonny Lamb <jonny.lamb@collabora.co.uk> | 2010-07-09 18:04:24 +0100 |
commit | 1db384e1d6a809721c046fe8aa782fa0bbf502cc (patch) | |
tree | 529624d568948b2c8c40038dc08b0e071e3761da | |
parent | bd71b7b0208e39b1d882d404e0da719239148388 (diff) | |
parent | d29b9888d93212d82f8d1b8732eff838f32f48ac (diff) |
Merge remote branch 'lfrb/file-transfer'
-rw-r--r-- | butterfly/capabilities.py | 76 | ||||
-rw-r--r-- | butterfly/channel/Makefile.am | 1 | ||||
-rw-r--r-- | butterfly/channel/file_transfer.py | 345 | ||||
-rw-r--r-- | butterfly/channel_manager.py | 36 | ||||
-rw-r--r-- | butterfly/connection.py | 11 | ||||
-rwxr-xr-x | telepathy-butterfly | 1 |
6 files changed, 446 insertions, 24 deletions
diff --git a/butterfly/capabilities.py b/butterfly/capabilities.py index 67025a2..1c876e5 100644 --- a/butterfly/capabilities.py +++ b/butterfly/capabilities.py @@ -65,6 +65,18 @@ class ButterflyCapabilities( telepathy.CHANNEL_TYPE_STREAMED_MEDIA + '.InitialAudio', telepathy.CHANNEL_TYPE_STREAMED_MEDIA + '.InitialVideo']) + file_transfer_class = \ + ({telepathy.CHANNEL_INTERFACE + '.ChannelType': + telepathy.CHANNEL_TYPE_FILE_TRANSFER, + telepathy.CHANNEL_INTERFACE + '.TargetHandleType': + dbus.UInt32(telepathy.HANDLE_TYPE_CONTACT)}, + [telepathy.CHANNEL_INTERFACE + '.TargetHandle', + telepathy.CHANNEL_INTERFACE + '.TargetID', + telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Requested', + telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Filename', + telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Size', + telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.ContentType']) + def __init__(self): telepathy.server.ConnectionInterfaceCapabilities.__init__(self) @@ -154,31 +166,55 @@ class ButterflyCapabilities( # papyon.event.AddressBookEventInterface def on_addressbook_contact_added(self, contact): """When we add a contact in our contact list, add the - capabilities to create text channel to the contact""" + default capabilities to the contact""" if contact.is_member(papyon.Membership.FORWARD): handle = ButterflyHandleFactory(self, 'contact', contact.account, contact.network_id) - self.add_text_capabilities([handle]) + self.add_default_capabilities([handle]) - def add_text_capabilities(self, contacts_handles): - """Add the create capability for text channel to these contacts.""" + def _diff_capabilities(self, handle, ctype, new_gen=None, + new_spec=None, added_gen=None, added_spec=None): + + if handle in self._caps and ctype in self._caps[handle]: + old_gen, old_spec = self._caps[handle][ctype] + else: + old_gen = 0 + old_spec = 0 + + if new_gen is None: + new_gen = old_gen + if new_spec is None: + new_spec = old_spec + if added_gen: + new_gen |= added_gen + if added_spec: + new_spec |= new_spec + + if old_gen != new_gen or old_spec != new_spec: + diff = (int(handle), ctype, old_gen, new_gen, old_spec, new_spec) + return diff + + return None + + def add_default_capabilities(self, contacts_handles): + """Add the default capabilities to these contacts.""" ret = [] cc_ret = dbus.Dictionary({}, signature='ua(a{sv}as)') for handle in contacts_handles: + new_flag = telepathy.CONNECTION_CAPABILITY_FLAG_CREATE + ctype = telepathy.CHANNEL_TYPE_TEXT - if handle in self._caps and ctype in self._caps[handle]: - old_gen, old_spec = self._caps[handle][ctype] - else: - old_gen = 0 - old_spec = 0 - new_gen = old_gen - new_gen |= telepathy.CONNECTION_CAPABILITY_FLAG_CREATE + diff = self._diff_capabilities(handle, ctype, added_gen=new_flag) + ret.append(diff) - diff = (int(handle), ctype, old_gen, new_gen, old_spec, old_spec) + ctype = telepathy.CHANNEL_TYPE_FILE_TRANSFER + diff = self._diff_capabilities(handle, ctype, added_gen=new_flag) ret.append(diff) # ContactCapabilities - self._contact_caps.setdefault(handle, []).append(self.text_chat_class) + caps = self._contact_caps.setdefault(handle, []) + caps.append(self.text_chat_class) + caps.append(self.file_transfer_class) cc_ret[handle] = self._contact_caps[handle] self.CapabilitiesChanged(ret) @@ -190,14 +226,8 @@ class ButterflyCapabilities( ctype = telepathy.CHANNEL_TYPE_STREAMED_MEDIA new_gen, new_spec, rcc = self._get_capabilities(contact) - if handle in self._caps: - old_gen, old_spec = self._caps[handle][ctype] - else: - old_gen = 0 - old_spec = 0 - - if old_gen != new_gen or old_spec != new_spec: - diff = (int(handle), ctype, old_gen, new_gen, old_spec, new_spec) + diff = self._diff_capabilities(handle, ctype, new_gen, new_spec) + if diff is not None: self.CapabilitiesChanged([diff]) if rcc is None: @@ -243,7 +273,7 @@ class ButterflyCapabilities( @async def _populate_capabilities(self): - """ Add the capability to create text channels to all contacts in our + """ Add the default capabilities to all contacts in our contacts list.""" handles = set([self._self_handle]) for contact in self.msn_client.address_book.contacts: @@ -251,7 +281,7 @@ class ButterflyCapabilities( handle = ButterflyHandleFactory(self, 'contact', contact.account, contact.network_id) handles.add(handle) - self.add_text_capabilities(handles) + self.add_default_capabilities(handles) # These caps were updated before we were online. for caps in self._update_capabilities_calls: diff --git a/butterfly/channel/Makefile.am b/butterfly/channel/Makefile.am index 91d02bb..6ac1774 100644 --- a/butterfly/channel/Makefile.am +++ b/butterfly/channel/Makefile.am @@ -2,6 +2,7 @@ channeldir = $(pythondir)/butterfly/channel channel_PYTHON = \ conference.py \ contact_list.py \ + file_transfer.py \ group.py \ im.py \ __init__.py \ diff --git a/butterfly/channel/file_transfer.py b/butterfly/channel/file_transfer.py new file mode 100644 index 0000000..bd35817 --- /dev/null +++ b/butterfly/channel/file_transfer.py @@ -0,0 +1,345 @@ +# telepathy-butterfly - an MSN connection manager for Telepathy +# +# Copyright (C) 2010 Collabora Ltd. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +import logging +import weakref +import time +import tempfile +import os +import shutil + +import dbus +import gobject +import telepathy +import papyon +import papyon.event +import socket + +from butterfly.util.decorator import async +from butterfly.handle import ButterflyHandleFactory + +from telepathy.interfaces import CHANNEL_TYPE_FILE_TRANSFER + +__all__ = ['ButterflyFileTransferChannel'] + +logger = logging.getLogger('Butterfly.FileTransferChannel') + + +class ButterflyFileTransferChannel(telepathy.server.ChannelTypeFileTransfer): + + def __init__(self, conn, manager, session, handle, props, object_path=None): + telepathy.server.ChannelTypeFileTransfer.__init__(self, conn, manager, props, + object_path=object_path) + + self._handle = handle + self._conn_ref = weakref.ref(conn) + self._state = 0 + self._transferred = 0 + + self._receiving = not props[telepathy.CHANNEL + '.Requested'] + self.socket = None + self._tmpdir = None + + self._last_ltb_emitted = 0 + self._progress_timer = 0 + + # Incoming. + if session is None: + type = telepathy.CHANNEL_TYPE_FILE_TRANSFER + filename = props.get(type + ".Filename", None) + size = props.get(type + ".Size", None) + + if filename is None or size is None: + raise telepathy.InvalidArgument( + "New file transfer channel requires Filename and Size properties") + + client = conn.msn_client + session = client.ft_manager.send(handle.contact, filename, size) + + self._session = session + self._filename = session.filename + self._size = session.size + + session.connect("accepted", self._transfer_accepted) + session.connect("progressed", self._transfer_progressed) + session.connect("completed", self._transfer_completed) + + dbus_interface = telepathy.CHANNEL_TYPE_FILE_TRANSFER + self._implement_property_get(dbus_interface, { + 'State' : lambda: dbus.UInt32(self.state), + 'ContentType': lambda: self.content_type, + 'Filename': lambda: self.filename, + 'Size': lambda: dbus.UInt64(self.size), + 'Description': lambda: self.description, + 'AvailableSocketTypes': lambda: self.socket_types, + 'TransferredBytes': lambda: self.transferred, + 'InitialOffset': lambda: self.offset + }) + + self._add_immutables({ + 'Filename': CHANNEL_TYPE_FILE_TRANSFER, + 'Size': CHANNEL_TYPE_FILE_TRANSFER, + }) + + self.set_state(telepathy.FILE_TRANSFER_STATE_PENDING, + telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_REQUESTED) + + @property + def state(self): + return self._state + + @property + def content_type(self): + return "application/octet-stream" + + @property + def filename(self): + return self._filename + + @property + def size(self): + return self._size + + @property + def description(self): + return "" + + @property + def socket_types(self): + return {telepathy.SOCKET_ADDRESS_TYPE_UNIX: + [telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, + telepathy.SOCKET_ACCESS_CONTROL_CREDENTIALS]} + + @property + def transferred(self): + return self._transferred + + @property + def offset(self): + return 0 + + def set_state(self, state, reason): + if self._state == state: + return + logger.debug("State change: %u -> %u (reason: %u)" % (self._state, state, reason)) + self._state = state + self.FileTransferStateChanged(state, reason) + + def AcceptFile(self, address_type, access_control, param, offset): + logger.debug("Accept file") + + if address_type not in self.socket_types.keys(): + raise telepathy.NotImplemented("Socket type %u is unsupported" % address_type) + + self.socket = self.add_listener() + self.channel = self.add_io_channel(self.socket) + self.set_state(telepathy.FILE_TRANSFER_STATE_PENDING, + telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_REQUESTED) + self.InitialOffsetDefined(0) + self.set_state(telepathy.FILE_TRANSFER_STATE_OPEN, + telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_NONE) + return self.socket.getsockname() + + def ProvideFile(self, address_type, access_control, param): + logger.debug("Provide file") + + if address_type not in self.socket_types.keys(): + raise telepathy.NotImplemented("Socket type %u is unsupported" % address_type) + + self.socket = self.add_listener() + self.channel = self.add_io_channel(self.socket) + return self.socket.getsockname() + + def Close(self): + logger.debug("Close") + self.cleanup() + if self.state not in (telepathy.FILE_TRANSFER_STATE_CANCELLED, + telepathy.FILE_TRANSFER_STATE_COMPLETED): + self.set_state(telepathy.FILE_TRANSFER_STATE_CANCELLED, + telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_LOCAL_CANCELLED) + telepathy.server.ChannelTypeFileTransfer.Close(self) + self.remove_from_connection() + + def cleanup(self): + if self._receiving and self.state == telepathy.FILE_TRANSFER_STATE_PENDING: + self._session.reject() + + if self.state not in (telepathy.FILE_TRANSFER_STATE_CANCELLED, + telepathy.FILE_TRANSFER_STATE_COMPLETED): + self._session.cancel() + + if self.socket: + self.socket.close() + self.socket = None + + if self._tmpdir: + shutil.rmtree(self._tmpdir) + self._tmpdir = None + + def GetSelfHandle(self): + return self._conn.GetSelfHandle() + + def add_listener(self): + """Create a listener socket""" + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) + self._tmpdir = tempfile.mkdtemp(prefix="butterfly") + sock.bind(os.path.join(self._tmpdir, "ft-socket")) + sock.listen(1) + return sock + + def add_io_channel(self, sock): + """Set up notification on the socket via a giochannel""" + sock.setblocking(False) + channel = gobject.IOChannel(sock.fileno()) + channel.set_flags(channel.get_flags() | gobject.IO_FLAG_NONBLOCK) + channel.add_watch(gobject.IO_IN, self._socket_connected) + channel.add_watch(gobject.IO_HUP | gobject.IO_ERR, + self._socket_disconnected) + return channel + + def _socket_connected(self, channel, condition): + logger.debug("Client socket connected") + sock = self.socket.accept()[0] + if self._receiving: + buffer = DataBuffer(sock) + self._session.set_receive_data_buffer(buffer, self.size) + # Notify the other end we accepted the FT + self._session.accept() + else: + buffer = DataBuffer(sock, self.size) + self._session.send(buffer) + self.socket = sock + + def _socket_disconnected(self, channel, condition): + logger.debug("Client socket disconnected") + #self.cleanup() + #TODO only cancel if the socket is disconnected while listening + #self._session.cancel() + #self.set_state(telepathy.FILE_TRANSFER_STATE_CANCELLED, + # telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_LOCAL_ERROR) + + def _transfer_accepted(self, session): + logger.debug("Transfer has been accepted") + self.set_state(telepathy.FILE_TRANSFER_STATE_ACCEPTED, + telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_REQUESTED) + self.set_state(telepathy.FILE_TRANSFER_STATE_OPEN, + telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_NONE) + + def _transfer_progressed(self, session, size): + self._transferred += size + + def emit_signal(): + self.TransferredBytesChanged(self.transferred) + self._last_ltb_emitted = time.time() + self._progress_timer = 0 + return False + + # If the transfer has finished send an update right away. + if self.transferred >= self.size: + emit_signal() + return + + # A progress update signal is already scheduled. + if self._progress_timer != 0: + return + + # Only emit the TransferredBytes signal if it has been one + # second since its last emission. + interval = time.time() - self._last_ltb_emitted + if interval >= 1: + emit_signal() + return + + # Get it in microseconds. + interval /= 1000 + + # Protect against clock skew, if the interval is negative the + # worst thing that can happen is that we wait an extra second + # before emitting the signal. + interval = abs(interval) + + if interval > 1000: + emit_signal() + else: + self._progress_timer = gobject.timeout_add(1000 - interval, + emit_signal) + + def _transfer_completed(self, session, data): + logger.debug("Transfer completed") + self.set_state(telepathy.FILE_TRANSFER_STATE_COMPLETED, + telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_NONE) + self.cleanup() + +class DataBuffer(object): + + def __init__(self, socket, size=0): + self._socket = socket + self._size = size + self._offset = 0 + self._buffer = "" + #self.add_channel() + + def seek(self, offset, position): + if position == 0: + self._offset = offset + elif position == 2: + self._offset = self._size + + def tell(self): + return self._offset + + def read(self, max_size=None): + if max_size is None: + # we can't read all the data; + # let's just return the last chunk + return self._buffer + max_size = min(max_size, self._size - self._offset) + data = self._socket.recv(max_size) + self._buffer = data + self._offset += len(data) + return data + + def write(self, data): + self._buffer = data + self._size += len(data) + self._offset += len(data) + self._socket.send(data) + + def add_channel(self): + sock = self._socket + sock.setblocking(False) + channel = gobject.IOChannel(sock.fileno()) + channel.set_encoding(None) + channel.set_buffered(False) + channel.set_flags(channel.get_flags() | gobject.IO_FLAG_NONBLOCK) + channel.add_watch(gobject.IO_HUP | gobject.IO_ERR, self.on_error) + channel.add_watch(gobject.IO_IN | gobject.IO_PRI, self.on_stream_received) + self.channel = channel + + def on_error(self, channel, condition): + logger.error("DataBuffer %s" % condition) + + def on_stream_disconnected(self, channel, condition): + pass + + def on_stream_received(self, channel, condition): + logger.info("Received data to send") + data = channel.read(1024) + print data + #self._session.send_chunk(data) diff --git a/butterfly/channel_manager.py b/butterfly/channel_manager.py index a17fb30..396a1fb 100644 --- a/butterfly/channel_manager.py +++ b/butterfly/channel_manager.py @@ -29,6 +29,7 @@ from butterfly.channel.group import ButterflyGroupChannel from butterfly.channel.im import ButterflyImChannel from butterfly.channel.muc import ButterflyMucChannel from butterfly.channel.conference import ButterflyConferenceChannel +from butterfly.channel.file_transfer import ButterflyFileTransferChannel from butterfly.channel.media import ButterflyMediaChannel from butterfly.handle import ButterflyHandleFactory @@ -81,6 +82,7 @@ def escape_as_identifier(identifier): class ButterflyChannelManager(telepathy.server.ChannelManager): __text_channel_id = 1 __media_channel_id = 1 + __ft_channel_id = 1 def __init__(self, connection): telepathy.server.ChannelManager.__init__(self, connection) @@ -124,6 +126,20 @@ class ButterflyChannelManager(telepathy.server.ChannelManager): # ] # self.implement_channel_classes(telepathy.CHANNEL_TYPE_STREAMED_MEDIA, self._get_media_channel, classes) + classes = [ + ({telepathy.CHANNEL_INTERFACE + '.ChannelType': telepathy.CHANNEL_TYPE_FILE_TRANSFER, + telepathy.CHANNEL_INTERFACE + '.TargetHandleType': dbus.UInt32(telepathy.HANDLE_TYPE_CONTACT)}, + [telepathy.CHANNEL_INTERFACE + '.TargetHandle', + telepathy.CHANNEL_INTERFACE + '.TargetID', + telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.ContentType', + telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Filename', + telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Size', + telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.ContentHash', + telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Description', + telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Date']) + ] + self.implement_channel_classes(telepathy.CHANNEL_TYPE_FILE_TRANSFER, self._get_ft_channel, classes) + def _get_list_channel(self, props): _, surpress_handler, handle = self._get_type_requested_handle(props) @@ -183,9 +199,27 @@ class ButterflyChannelManager(telepathy.server.ChannelManager): client = self._conn.msn_client call = client.call_manager.create_call(contact) - path = "MediaChannel/%d" % self.__media_channel_id self.__media_channel_id += 1 return ButterflyMediaChannel(self._conn, self, call, handle, props, object_path=path) + + def _get_ft_channel(self, props, session=None): + _, surpress_handler, handle = self._get_type_requested_handle(props) + + if handle.get_type() != telepathy.HANDLE_TYPE_CONTACT: + raise telepathy.NotImplemented('Only contacts are allowed') + + contact = handle.contact + + if contact.presence == papyon.Presence.OFFLINE: + raise telepathy.NotAvailable('Contact not available') + + logger.debug('New file transfer channel') + + path = "FileTransferChannel%d" % self.__ft_channel_id + self.__ft_channel_id += 1 + + return ButterflyFileTransferChannel(self._conn, self, session, handle, + props, object_path=path) diff --git a/butterfly/connection.py b/butterfly/connection.py index 5a2632a..044790f 100644 --- a/butterfly/connection.py +++ b/butterfly/connection.py @@ -467,6 +467,17 @@ class ButterflyConnection(telepathy.server.Connection, # Notify it of the message channel.offline_message_received(message) + # papyon.event.InviteEventInterface + def on_invite_file_transfer(self, session): + logger.debug("File transfer invite") + handle = ButterflyHandleFactory(self, 'contact', session.peer.account, + session.peer.network_id) + + props = self._generate_props(telepathy.CHANNEL_TYPE_FILE_TRANSFER, + handle, False) + channel = self._channel_manager.create_channel_for_props(props, + signal=True, session=session) + def _advertise_disconnected(self): self._manager.disconnected(self) diff --git a/telepathy-butterfly b/telepathy-butterfly index e4ecbed..246f9d4 100755 --- a/telepathy-butterfly +++ b/telepathy-butterfly @@ -88,6 +88,7 @@ if __name__ == '__main__': sys.exit(1) mainloop = gobject.MainLoop(is_running=True) + gobject.threads_init() while mainloop.is_running(): try: |