summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonny Lamb <jonny.lamb@collabora.co.uk>2010-07-09 18:04:24 +0100
committerJonny Lamb <jonny.lamb@collabora.co.uk>2010-07-09 18:04:24 +0100
commit1db384e1d6a809721c046fe8aa782fa0bbf502cc (patch)
tree529624d568948b2c8c40038dc08b0e071e3761da
parentbd71b7b0208e39b1d882d404e0da719239148388 (diff)
parentd29b9888d93212d82f8d1b8732eff838f32f48ac (diff)
Merge remote branch 'lfrb/file-transfer'
-rw-r--r--butterfly/capabilities.py76
-rw-r--r--butterfly/channel/Makefile.am1
-rw-r--r--butterfly/channel/file_transfer.py345
-rw-r--r--butterfly/channel_manager.py36
-rw-r--r--butterfly/connection.py11
-rwxr-xr-xtelepathy-butterfly1
6 files changed, 446 insertions, 24 deletions
diff --git a/butterfly/capabilities.py b/butterfly/capabilities.py
index 67025a2..1c876e5 100644
--- a/butterfly/capabilities.py
+++ b/butterfly/capabilities.py
@@ -65,6 +65,18 @@ class ButterflyCapabilities(
telepathy.CHANNEL_TYPE_STREAMED_MEDIA + '.InitialAudio',
telepathy.CHANNEL_TYPE_STREAMED_MEDIA + '.InitialVideo'])
+ file_transfer_class = \
+ ({telepathy.CHANNEL_INTERFACE + '.ChannelType':
+ telepathy.CHANNEL_TYPE_FILE_TRANSFER,
+ telepathy.CHANNEL_INTERFACE + '.TargetHandleType':
+ dbus.UInt32(telepathy.HANDLE_TYPE_CONTACT)},
+ [telepathy.CHANNEL_INTERFACE + '.TargetHandle',
+ telepathy.CHANNEL_INTERFACE + '.TargetID',
+ telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Requested',
+ telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Filename',
+ telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Size',
+ telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.ContentType'])
+
def __init__(self):
telepathy.server.ConnectionInterfaceCapabilities.__init__(self)
@@ -154,31 +166,55 @@ class ButterflyCapabilities(
# papyon.event.AddressBookEventInterface
def on_addressbook_contact_added(self, contact):
"""When we add a contact in our contact list, add the
- capabilities to create text channel to the contact"""
+ default capabilities to the contact"""
if contact.is_member(papyon.Membership.FORWARD):
handle = ButterflyHandleFactory(self, 'contact',
contact.account, contact.network_id)
- self.add_text_capabilities([handle])
+ self.add_default_capabilities([handle])
- def add_text_capabilities(self, contacts_handles):
- """Add the create capability for text channel to these contacts."""
+ def _diff_capabilities(self, handle, ctype, new_gen=None,
+ new_spec=None, added_gen=None, added_spec=None):
+
+ if handle in self._caps and ctype in self._caps[handle]:
+ old_gen, old_spec = self._caps[handle][ctype]
+ else:
+ old_gen = 0
+ old_spec = 0
+
+ if new_gen is None:
+ new_gen = old_gen
+ if new_spec is None:
+ new_spec = old_spec
+ if added_gen:
+ new_gen |= added_gen
+ if added_spec:
+ new_spec |= new_spec
+
+ if old_gen != new_gen or old_spec != new_spec:
+ diff = (int(handle), ctype, old_gen, new_gen, old_spec, new_spec)
+ return diff
+
+ return None
+
+ def add_default_capabilities(self, contacts_handles):
+ """Add the default capabilities to these contacts."""
ret = []
cc_ret = dbus.Dictionary({}, signature='ua(a{sv}as)')
for handle in contacts_handles:
+ new_flag = telepathy.CONNECTION_CAPABILITY_FLAG_CREATE
+
ctype = telepathy.CHANNEL_TYPE_TEXT
- if handle in self._caps and ctype in self._caps[handle]:
- old_gen, old_spec = self._caps[handle][ctype]
- else:
- old_gen = 0
- old_spec = 0
- new_gen = old_gen
- new_gen |= telepathy.CONNECTION_CAPABILITY_FLAG_CREATE
+ diff = self._diff_capabilities(handle, ctype, added_gen=new_flag)
+ ret.append(diff)
- diff = (int(handle), ctype, old_gen, new_gen, old_spec, old_spec)
+ ctype = telepathy.CHANNEL_TYPE_FILE_TRANSFER
+ diff = self._diff_capabilities(handle, ctype, added_gen=new_flag)
ret.append(diff)
# ContactCapabilities
- self._contact_caps.setdefault(handle, []).append(self.text_chat_class)
+ caps = self._contact_caps.setdefault(handle, [])
+ caps.append(self.text_chat_class)
+ caps.append(self.file_transfer_class)
cc_ret[handle] = self._contact_caps[handle]
self.CapabilitiesChanged(ret)
@@ -190,14 +226,8 @@ class ButterflyCapabilities(
ctype = telepathy.CHANNEL_TYPE_STREAMED_MEDIA
new_gen, new_spec, rcc = self._get_capabilities(contact)
- if handle in self._caps:
- old_gen, old_spec = self._caps[handle][ctype]
- else:
- old_gen = 0
- old_spec = 0
-
- if old_gen != new_gen or old_spec != new_spec:
- diff = (int(handle), ctype, old_gen, new_gen, old_spec, new_spec)
+ diff = self._diff_capabilities(handle, ctype, new_gen, new_spec)
+ if diff is not None:
self.CapabilitiesChanged([diff])
if rcc is None:
@@ -243,7 +273,7 @@ class ButterflyCapabilities(
@async
def _populate_capabilities(self):
- """ Add the capability to create text channels to all contacts in our
+ """ Add the default capabilities to all contacts in our
contacts list."""
handles = set([self._self_handle])
for contact in self.msn_client.address_book.contacts:
@@ -251,7 +281,7 @@ class ButterflyCapabilities(
handle = ButterflyHandleFactory(self, 'contact',
contact.account, contact.network_id)
handles.add(handle)
- self.add_text_capabilities(handles)
+ self.add_default_capabilities(handles)
# These caps were updated before we were online.
for caps in self._update_capabilities_calls:
diff --git a/butterfly/channel/Makefile.am b/butterfly/channel/Makefile.am
index 91d02bb..6ac1774 100644
--- a/butterfly/channel/Makefile.am
+++ b/butterfly/channel/Makefile.am
@@ -2,6 +2,7 @@ channeldir = $(pythondir)/butterfly/channel
channel_PYTHON = \
conference.py \
contact_list.py \
+ file_transfer.py \
group.py \
im.py \
__init__.py \
diff --git a/butterfly/channel/file_transfer.py b/butterfly/channel/file_transfer.py
new file mode 100644
index 0000000..bd35817
--- /dev/null
+++ b/butterfly/channel/file_transfer.py
@@ -0,0 +1,345 @@
+# telepathy-butterfly - an MSN connection manager for Telepathy
+#
+# Copyright (C) 2010 Collabora Ltd.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+import logging
+import weakref
+import time
+import tempfile
+import os
+import shutil
+
+import dbus
+import gobject
+import telepathy
+import papyon
+import papyon.event
+import socket
+
+from butterfly.util.decorator import async
+from butterfly.handle import ButterflyHandleFactory
+
+from telepathy.interfaces import CHANNEL_TYPE_FILE_TRANSFER
+
+__all__ = ['ButterflyFileTransferChannel']
+
+logger = logging.getLogger('Butterfly.FileTransferChannel')
+
+
+class ButterflyFileTransferChannel(telepathy.server.ChannelTypeFileTransfer):
+
+ def __init__(self, conn, manager, session, handle, props, object_path=None):
+ telepathy.server.ChannelTypeFileTransfer.__init__(self, conn, manager, props,
+ object_path=object_path)
+
+ self._handle = handle
+ self._conn_ref = weakref.ref(conn)
+ self._state = 0
+ self._transferred = 0
+
+ self._receiving = not props[telepathy.CHANNEL + '.Requested']
+ self.socket = None
+ self._tmpdir = None
+
+ self._last_ltb_emitted = 0
+ self._progress_timer = 0
+
+ # Incoming.
+ if session is None:
+ type = telepathy.CHANNEL_TYPE_FILE_TRANSFER
+ filename = props.get(type + ".Filename", None)
+ size = props.get(type + ".Size", None)
+
+ if filename is None or size is None:
+ raise telepathy.InvalidArgument(
+ "New file transfer channel requires Filename and Size properties")
+
+ client = conn.msn_client
+ session = client.ft_manager.send(handle.contact, filename, size)
+
+ self._session = session
+ self._filename = session.filename
+ self._size = session.size
+
+ session.connect("accepted", self._transfer_accepted)
+ session.connect("progressed", self._transfer_progressed)
+ session.connect("completed", self._transfer_completed)
+
+ dbus_interface = telepathy.CHANNEL_TYPE_FILE_TRANSFER
+ self._implement_property_get(dbus_interface, {
+ 'State' : lambda: dbus.UInt32(self.state),
+ 'ContentType': lambda: self.content_type,
+ 'Filename': lambda: self.filename,
+ 'Size': lambda: dbus.UInt64(self.size),
+ 'Description': lambda: self.description,
+ 'AvailableSocketTypes': lambda: self.socket_types,
+ 'TransferredBytes': lambda: self.transferred,
+ 'InitialOffset': lambda: self.offset
+ })
+
+ self._add_immutables({
+ 'Filename': CHANNEL_TYPE_FILE_TRANSFER,
+ 'Size': CHANNEL_TYPE_FILE_TRANSFER,
+ })
+
+ self.set_state(telepathy.FILE_TRANSFER_STATE_PENDING,
+ telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_REQUESTED)
+
+ @property
+ def state(self):
+ return self._state
+
+ @property
+ def content_type(self):
+ return "application/octet-stream"
+
+ @property
+ def filename(self):
+ return self._filename
+
+ @property
+ def size(self):
+ return self._size
+
+ @property
+ def description(self):
+ return ""
+
+ @property
+ def socket_types(self):
+ return {telepathy.SOCKET_ADDRESS_TYPE_UNIX:
+ [telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST,
+ telepathy.SOCKET_ACCESS_CONTROL_CREDENTIALS]}
+
+ @property
+ def transferred(self):
+ return self._transferred
+
+ @property
+ def offset(self):
+ return 0
+
+ def set_state(self, state, reason):
+ if self._state == state:
+ return
+ logger.debug("State change: %u -> %u (reason: %u)" % (self._state, state, reason))
+ self._state = state
+ self.FileTransferStateChanged(state, reason)
+
+ def AcceptFile(self, address_type, access_control, param, offset):
+ logger.debug("Accept file")
+
+ if address_type not in self.socket_types.keys():
+ raise telepathy.NotImplemented("Socket type %u is unsupported" % address_type)
+
+ self.socket = self.add_listener()
+ self.channel = self.add_io_channel(self.socket)
+ self.set_state(telepathy.FILE_TRANSFER_STATE_PENDING,
+ telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_REQUESTED)
+ self.InitialOffsetDefined(0)
+ self.set_state(telepathy.FILE_TRANSFER_STATE_OPEN,
+ telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_NONE)
+ return self.socket.getsockname()
+
+ def ProvideFile(self, address_type, access_control, param):
+ logger.debug("Provide file")
+
+ if address_type not in self.socket_types.keys():
+ raise telepathy.NotImplemented("Socket type %u is unsupported" % address_type)
+
+ self.socket = self.add_listener()
+ self.channel = self.add_io_channel(self.socket)
+ return self.socket.getsockname()
+
+ def Close(self):
+ logger.debug("Close")
+ self.cleanup()
+ if self.state not in (telepathy.FILE_TRANSFER_STATE_CANCELLED,
+ telepathy.FILE_TRANSFER_STATE_COMPLETED):
+ self.set_state(telepathy.FILE_TRANSFER_STATE_CANCELLED,
+ telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_LOCAL_CANCELLED)
+ telepathy.server.ChannelTypeFileTransfer.Close(self)
+ self.remove_from_connection()
+
+ def cleanup(self):
+ if self._receiving and self.state == telepathy.FILE_TRANSFER_STATE_PENDING:
+ self._session.reject()
+
+ if self.state not in (telepathy.FILE_TRANSFER_STATE_CANCELLED,
+ telepathy.FILE_TRANSFER_STATE_COMPLETED):
+ self._session.cancel()
+
+ if self.socket:
+ self.socket.close()
+ self.socket = None
+
+ if self._tmpdir:
+ shutil.rmtree(self._tmpdir)
+ self._tmpdir = None
+
+ def GetSelfHandle(self):
+ return self._conn.GetSelfHandle()
+
+ def add_listener(self):
+ """Create a listener socket"""
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
+ self._tmpdir = tempfile.mkdtemp(prefix="butterfly")
+ sock.bind(os.path.join(self._tmpdir, "ft-socket"))
+ sock.listen(1)
+ return sock
+
+ def add_io_channel(self, sock):
+ """Set up notification on the socket via a giochannel"""
+ sock.setblocking(False)
+ channel = gobject.IOChannel(sock.fileno())
+ channel.set_flags(channel.get_flags() | gobject.IO_FLAG_NONBLOCK)
+ channel.add_watch(gobject.IO_IN, self._socket_connected)
+ channel.add_watch(gobject.IO_HUP | gobject.IO_ERR,
+ self._socket_disconnected)
+ return channel
+
+ def _socket_connected(self, channel, condition):
+ logger.debug("Client socket connected")
+ sock = self.socket.accept()[0]
+ if self._receiving:
+ buffer = DataBuffer(sock)
+ self._session.set_receive_data_buffer(buffer, self.size)
+ # Notify the other end we accepted the FT
+ self._session.accept()
+ else:
+ buffer = DataBuffer(sock, self.size)
+ self._session.send(buffer)
+ self.socket = sock
+
+ def _socket_disconnected(self, channel, condition):
+ logger.debug("Client socket disconnected")
+ #self.cleanup()
+ #TODO only cancel if the socket is disconnected while listening
+ #self._session.cancel()
+ #self.set_state(telepathy.FILE_TRANSFER_STATE_CANCELLED,
+ # telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_LOCAL_ERROR)
+
+ def _transfer_accepted(self, session):
+ logger.debug("Transfer has been accepted")
+ self.set_state(telepathy.FILE_TRANSFER_STATE_ACCEPTED,
+ telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_REQUESTED)
+ self.set_state(telepathy.FILE_TRANSFER_STATE_OPEN,
+ telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_NONE)
+
+ def _transfer_progressed(self, session, size):
+ self._transferred += size
+
+ def emit_signal():
+ self.TransferredBytesChanged(self.transferred)
+ self._last_ltb_emitted = time.time()
+ self._progress_timer = 0
+ return False
+
+ # If the transfer has finished send an update right away.
+ if self.transferred >= self.size:
+ emit_signal()
+ return
+
+ # A progress update signal is already scheduled.
+ if self._progress_timer != 0:
+ return
+
+ # Only emit the TransferredBytes signal if it has been one
+ # second since its last emission.
+ interval = time.time() - self._last_ltb_emitted
+ if interval >= 1:
+ emit_signal()
+ return
+
+ # Get it in microseconds.
+ interval /= 1000
+
+ # Protect against clock skew, if the interval is negative the
+ # worst thing that can happen is that we wait an extra second
+ # before emitting the signal.
+ interval = abs(interval)
+
+ if interval > 1000:
+ emit_signal()
+ else:
+ self._progress_timer = gobject.timeout_add(1000 - interval,
+ emit_signal)
+
+ def _transfer_completed(self, session, data):
+ logger.debug("Transfer completed")
+ self.set_state(telepathy.FILE_TRANSFER_STATE_COMPLETED,
+ telepathy.FILE_TRANSFER_STATE_CHANGE_REASON_NONE)
+ self.cleanup()
+
+class DataBuffer(object):
+
+ def __init__(self, socket, size=0):
+ self._socket = socket
+ self._size = size
+ self._offset = 0
+ self._buffer = ""
+ #self.add_channel()
+
+ def seek(self, offset, position):
+ if position == 0:
+ self._offset = offset
+ elif position == 2:
+ self._offset = self._size
+
+ def tell(self):
+ return self._offset
+
+ def read(self, max_size=None):
+ if max_size is None:
+ # we can't read all the data;
+ # let's just return the last chunk
+ return self._buffer
+ max_size = min(max_size, self._size - self._offset)
+ data = self._socket.recv(max_size)
+ self._buffer = data
+ self._offset += len(data)
+ return data
+
+ def write(self, data):
+ self._buffer = data
+ self._size += len(data)
+ self._offset += len(data)
+ self._socket.send(data)
+
+ def add_channel(self):
+ sock = self._socket
+ sock.setblocking(False)
+ channel = gobject.IOChannel(sock.fileno())
+ channel.set_encoding(None)
+ channel.set_buffered(False)
+ channel.set_flags(channel.get_flags() | gobject.IO_FLAG_NONBLOCK)
+ channel.add_watch(gobject.IO_HUP | gobject.IO_ERR, self.on_error)
+ channel.add_watch(gobject.IO_IN | gobject.IO_PRI, self.on_stream_received)
+ self.channel = channel
+
+ def on_error(self, channel, condition):
+ logger.error("DataBuffer %s" % condition)
+
+ def on_stream_disconnected(self, channel, condition):
+ pass
+
+ def on_stream_received(self, channel, condition):
+ logger.info("Received data to send")
+ data = channel.read(1024)
+ print data
+ #self._session.send_chunk(data)
diff --git a/butterfly/channel_manager.py b/butterfly/channel_manager.py
index a17fb30..396a1fb 100644
--- a/butterfly/channel_manager.py
+++ b/butterfly/channel_manager.py
@@ -29,6 +29,7 @@ from butterfly.channel.group import ButterflyGroupChannel
from butterfly.channel.im import ButterflyImChannel
from butterfly.channel.muc import ButterflyMucChannel
from butterfly.channel.conference import ButterflyConferenceChannel
+from butterfly.channel.file_transfer import ButterflyFileTransferChannel
from butterfly.channel.media import ButterflyMediaChannel
from butterfly.handle import ButterflyHandleFactory
@@ -81,6 +82,7 @@ def escape_as_identifier(identifier):
class ButterflyChannelManager(telepathy.server.ChannelManager):
__text_channel_id = 1
__media_channel_id = 1
+ __ft_channel_id = 1
def __init__(self, connection):
telepathy.server.ChannelManager.__init__(self, connection)
@@ -124,6 +126,20 @@ class ButterflyChannelManager(telepathy.server.ChannelManager):
# ]
# self.implement_channel_classes(telepathy.CHANNEL_TYPE_STREAMED_MEDIA, self._get_media_channel, classes)
+ classes = [
+ ({telepathy.CHANNEL_INTERFACE + '.ChannelType': telepathy.CHANNEL_TYPE_FILE_TRANSFER,
+ telepathy.CHANNEL_INTERFACE + '.TargetHandleType': dbus.UInt32(telepathy.HANDLE_TYPE_CONTACT)},
+ [telepathy.CHANNEL_INTERFACE + '.TargetHandle',
+ telepathy.CHANNEL_INTERFACE + '.TargetID',
+ telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.ContentType',
+ telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Filename',
+ telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Size',
+ telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.ContentHash',
+ telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Description',
+ telepathy.CHANNEL_TYPE_FILE_TRANSFER + '.Date'])
+ ]
+ self.implement_channel_classes(telepathy.CHANNEL_TYPE_FILE_TRANSFER, self._get_ft_channel, classes)
+
def _get_list_channel(self, props):
_, surpress_handler, handle = self._get_type_requested_handle(props)
@@ -183,9 +199,27 @@ class ButterflyChannelManager(telepathy.server.ChannelManager):
client = self._conn.msn_client
call = client.call_manager.create_call(contact)
-
path = "MediaChannel/%d" % self.__media_channel_id
self.__media_channel_id += 1
return ButterflyMediaChannel(self._conn, self, call, handle, props,
object_path=path)
+
+ def _get_ft_channel(self, props, session=None):
+ _, surpress_handler, handle = self._get_type_requested_handle(props)
+
+ if handle.get_type() != telepathy.HANDLE_TYPE_CONTACT:
+ raise telepathy.NotImplemented('Only contacts are allowed')
+
+ contact = handle.contact
+
+ if contact.presence == papyon.Presence.OFFLINE:
+ raise telepathy.NotAvailable('Contact not available')
+
+ logger.debug('New file transfer channel')
+
+ path = "FileTransferChannel%d" % self.__ft_channel_id
+ self.__ft_channel_id += 1
+
+ return ButterflyFileTransferChannel(self._conn, self, session, handle,
+ props, object_path=path)
diff --git a/butterfly/connection.py b/butterfly/connection.py
index 5a2632a..044790f 100644
--- a/butterfly/connection.py
+++ b/butterfly/connection.py
@@ -467,6 +467,17 @@ class ButterflyConnection(telepathy.server.Connection,
# Notify it of the message
channel.offline_message_received(message)
+ # papyon.event.InviteEventInterface
+ def on_invite_file_transfer(self, session):
+ logger.debug("File transfer invite")
+ handle = ButterflyHandleFactory(self, 'contact', session.peer.account,
+ session.peer.network_id)
+
+ props = self._generate_props(telepathy.CHANNEL_TYPE_FILE_TRANSFER,
+ handle, False)
+ channel = self._channel_manager.create_channel_for_props(props,
+ signal=True, session=session)
+
def _advertise_disconnected(self):
self._manager.disconnected(self)
diff --git a/telepathy-butterfly b/telepathy-butterfly
index e4ecbed..246f9d4 100755
--- a/telepathy-butterfly
+++ b/telepathy-butterfly
@@ -88,6 +88,7 @@ if __name__ == '__main__':
sys.exit(1)
mainloop = gobject.MainLoop(is_running=True)
+ gobject.threads_init()
while mainloop.is_running():
try: