diff options
author | Simon McVittie <simon.mcvittie@collabora.co.uk> | 2007-07-13 14:11:55 +0000 |
---|---|---|
committer | Simon McVittie <simon.mcvittie@collabora.co.uk> | 2007-07-13 14:11:55 +0000 |
commit | fd589714cddc08422f0a6473f85494f5c0da4890 (patch) | |
tree | 5bc52e48aba1a1f27b3b780f734b27cc37ea3af3 | |
parent | ed13f56b2eb40fd6783ab406bead3ab4b2cb56d0 (diff) |
Remove scw-client and pygtkconsole
-rw-r--r-- | pygtkconsole.py | 348 | ||||
-rw-r--r-- | scw-client.py | 679 |
2 files changed, 0 insertions, 1027 deletions
diff --git a/pygtkconsole.py b/pygtkconsole.py deleted file mode 100644 index d363f74..0000000 --- a/pygtkconsole.py +++ /dev/null @@ -1,348 +0,0 @@ -#!/usr/bin/env python - -# GTK Interactive Console -# (C) 2003, Jon Anderson -# See www.python.org/2.2/license.html for -# license details. -# -import gtk -import gtk.gdk -import code -import sys -import pango - -import __builtin__ -import __main__ - -banner = """GTK Interactive Python Console -%s -""" % sys.version - -class Completer: - """ - Taken from rlcompleter, with readline references stripped, and a local dictionary to use. - """ - def __init__(self, locals): - self.locals = locals - - def complete(self, text, state): - """Return the next possible completion for 'text'. - This is called successively with state == 0, 1, 2, ... until it - returns None. The completion should begin with 'text'. - - """ - if state == 0: - if "." in text: - self.matches = self.attr_matches(text) - else: - self.matches = self.global_matches(text) - try: - return self.matches[state] - except IndexError: - return None - - def global_matches(self, text): - """Compute matches when text is a simple name. - - Return a list of all keywords, built-in functions and names - currently defines in __main__ that match. - - """ - import keyword - matches = [] - n = len(text) - for list in [keyword.kwlist,__builtin__.__dict__.keys(),__main__.__dict__.keys(), self.locals.keys()]: - for word in list: - if word[:n] == text and word != "__builtins__": - matches.append(word) - return matches - - def attr_matches(self, text): - """Compute matches when text contains a dot. - - Assuming the text is of the form NAME.NAME....[NAME], and is - evaluatable in the globals of __main__, it will be evaluated - and its attributes (as revealed by dir()) are used as possible - completions. (For class instances, class members are are also - considered.) - - WARNING: this can still invoke arbitrary C code, if an object - with a __getattr__ hook is evaluated. - - """ - import re - m = re.match(r"(\w+(\.\w+)*)\.(\w*)", text) - if not m: - return - expr, attr = m.group(1, 3) - object = eval(expr, __main__.__dict__, self.locals) - words = dir(object) - if hasattr(object,'__class__'): - words.append('__class__') - words = words + get_class_members(object.__class__) - matches = [] - n = len(attr) - for word in words: - if word[:n] == attr and word != "__builtins__": - matches.append("%s.%s" % (expr, word)) - return matches - -def get_class_members(klass): - ret = dir(klass) - if hasattr(klass,'__bases__'): - for base in klass.__bases__: - ret = ret + get_class_members(base) - return ret - - - -class OutputStream: - """ - A Multiplexing output stream. - It can replace another stream, and tee output to the original stream and too - a GTK textview. - """ - def __init__(self,view,old_out,style): - self.view = view - self.buffer = view.get_buffer() - self.mark = self.buffer.create_mark("End",self.buffer.get_end_iter(), False ) - self.out = old_out - self.style = style - self.tee = 1 - - def write(self,text): - if self.tee: - self.out.write(text) - - end = self.buffer.get_end_iter() - - if not self.view == None: - self.view.scroll_to_mark(self.mark, 0, True, 1, 1) - - self.buffer.insert_with_tags(end,text,self.style) - -class GTKInterpreterConsole(gtk.ScrolledWindow): - """ - An InteractiveConsole for GTK. It's an actual widget, - so it can be dropped in just about anywhere. - """ - def __init__(self): - gtk.ScrolledWindow.__init__(self) - self.set_policy (gtk.POLICY_AUTOMATIC,gtk.POLICY_AUTOMATIC) - - self.text = gtk.TextView() - self.text.set_wrap_mode(gtk.WRAP_WORD) - - self.interpreter = code.InteractiveInterpreter() - - self.completer = Completer(self.interpreter.locals) - self.buffer = [] - self.history = [] - self.banner = banner - self.ps1 = ">>> " - self.ps2 = "... " - - self.text.add_events( gtk.gdk.KEY_PRESS_MASK ) - self.text.connect( "key_press_event", self.key_pressed ) - - self.current_history = -1 - - self.mark = self.text.get_buffer().create_mark("End",self.text.get_buffer().get_end_iter(), False ) - - #setup colors - self.style_banner = gtk.TextTag("banner") - self.style_banner.set_property( "foreground", "saddle brown" ) - - self.style_ps1 = gtk.TextTag("ps1") - self.style_ps1.set_property( "foreground", "DarkOrchid4" ) - self.style_ps1.set_property( "editable", False ) - self.style_ps1.set_property("font", "courier" ) - - self.style_ps2 = gtk.TextTag("ps2") - self.style_ps2.set_property( "foreground", "DarkOliveGreen" ) - self.style_ps2.set_property( "editable", False ) - self.style_ps2.set_property("font", "courier" ) - - self.style_out = gtk.TextTag("stdout") - self.style_out.set_property( "foreground", "midnight blue" ) - self.style_err = gtk.TextTag("stderr") - self.style_err.set_property( "style", pango.STYLE_ITALIC ) - self.style_err.set_property( "foreground", "red" ) - - self.text.get_buffer().get_tag_table().add(self.style_banner) - self.text.get_buffer().get_tag_table().add(self.style_ps1) - self.text.get_buffer().get_tag_table().add(self.style_ps2) - self.text.get_buffer().get_tag_table().add(self.style_out) - self.text.get_buffer().get_tag_table().add(self.style_err) - - self.stdout = OutputStream(self.text,sys.stdout,self.style_out) - self.stderr = OutputStream(self.text,sys.stderr,self.style_err) - - sys.stderr = self.stderr - sys.stdout = self.stdout - - self.current_prompt = None - - self.write_line(self.banner, self.style_banner) - self.prompt_ps1() - - self.add(self.text) - self.text.show() - - - - def reset_history(self): - self.history = [] - - def reset_buffer(self): - self.buffer = [] - - def prompt_ps1(self): - self.current_prompt = self.prompt_ps1 - self.write_line(self.ps1,self.style_ps1) - - def prompt_ps2(self): - self.current_prompt = self.prompt_ps2 - self.write_line(self.ps2,self.style_ps2) - - def write_line(self,text,style=None): - start,end = self.text.get_buffer().get_bounds() - if style==None: - self.text.get_buffer().insert(end,text) - else: - self.text.get_buffer().insert_with_tags(end,text,style) - - self.text.scroll_to_mark(self.mark, 0, True, 1, 1) - - def push(self, line): - - self.buffer.append(line) - if len(line) > 0: - self.history.append(line) - - source = "\n".join(self.buffer) - - more = self.interpreter.runsource(source, "<<console>>") - - if not more: - self.reset_buffer() - - return more - - def key_pressed(self,widget,event): - if event.keyval == gtk.gdk.keyval_from_name('Return'): - return self.execute_line() - - if event.keyval == gtk.gdk.keyval_from_name('Up'): - self.current_history = self.current_history - 1 - if self.current_history < - len(self.history): - self.current_history = - len(self.history) - return self.show_history() - elif event.keyval == gtk.gdk.keyval_from_name('Down'): - self.current_history = self.current_history + 1 - if self.current_history > 0: - self.current_history = 0 - return self.show_history() - elif event.keyval == gtk.gdk.keyval_from_name( 'Home'): - l = self.text.get_buffer().get_line_count() - 1 - start = self.text.get_buffer().get_iter_at_line_offset(l,4) - self.text.get_buffer().place_cursor(start) - return True - elif event.keyval == gtk.gdk.keyval_from_name( 'space') and event.state & gtk.gdk.CONTROL_MASK: - return self.complete_line() - return False - - def show_history(self): - if self.current_history == 0: - return True - else: - self.replace_line( self.history[self.current_history] ) - return True - - def current_line(self): - start,end = self.current_line_bounds() - return self.text.get_buffer().get_text(start,end, True) - - def current_line_bounds(self): - txt_buffer = self.text.get_buffer() - l = txt_buffer.get_line_count() - 1 - - start = txt_buffer.get_iter_at_line(l) - if start.get_chars_in_line() >= 4: - start.forward_chars(4) - end = txt_buffer.get_end_iter() - return start,end - - def replace_line(self,txt): - start,end = self.current_line_bounds() - self.text.get_buffer().delete(start,end) - self.write_line(txt) - - def execute_line(self): - line = self.current_line() - - self.write_line("\n") - - more = self.push(line) - - self.text.get_buffer().place_cursor(self.text.get_buffer().get_end_iter()) - - if more: - self.prompt_ps2() - else: - self.prompt_ps1() - - - self.current_history = 0 - - return True - - def complete_line(self): - line = self.current_line() - tokens = line.split() - token = tokens[-1] - - completions = [] - p = self.completer.complete(token,len(completions)) - while p != None: - completions.append(p) - p = self.completer.complete(token, len(completions)) - - if len(completions) != 1: - self.write_line("\n") - self.write_line("\n".join(completions), self.style_ps1) - self.write_line("\n") - self.current_prompt() - self.write_line(line) - else: - i = line.rfind(token) - line = line[0:i] + completions[0] - self.replace_line(line) - - return True - - -def main(): - w = gtk.Window() - console = GTKInterpreterConsole() - console.set_size_request(640,480) - w.add(console) - def destroy(arg=None): - gtk.main_quit() - - def key_event(widget,event): - if gtk.gdk.keyval_name( event.keyval) == 'd' and event.state & gtk.gdk.CONTROL_MASK: - destroy() - return False - - w.connect("destroy", destroy) - - w.add_events( gtk.gdk.KEY_PRESS_MASK ) - w.connect( 'key_press_event', key_event) - w.show_all() - - - gtk.main() - -if __name__ == '__main__': - main() diff --git a/scw-client.py b/scw-client.py deleted file mode 100644 index 7ac0dc7..0000000 --- a/scw-client.py +++ /dev/null @@ -1,679 +0,0 @@ -#!/usr/bin/env python - -# scw-client - Simple GUI client for testing parts of Telepathy connection managers -# -# Copyright (C) 2005, 2006 Collabora Limited -# Copyright (C) 2005, 2006 Nokia Corporation -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Library General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -import dbus -import dbus.glib - -assert(getattr(dbus, 'version', (0,0,0)) >= (0,51,0)) - -import getpass -import gobject -import signal -import sys - -import pygtk -pygtk.require('2.0') -import gtk -import pango -import scw - -from telepathy import * - -import telepathy.client - -_dbus_int_types = { - 'y': dbus.Byte, - 'n': dbus.Int16, - 'q': dbus.UInt16, - 'i': dbus.Int32, - 'u': dbus.UInt32, - 'x': dbus.Int64, - 't': dbus.UInt64, -} -_dbus_stringy_types = { - 's': dbus.String, - 'o': dbus.ObjectPath, - 'g': dbus.Signature, -} - -def _coerce_string_to_dbus(s, sig): - if sig == 'b': - s = s.upper().strip() - if s in ('0', 'FALSE'): - return False - elif s in ('1', 'TRUE'): - return True - raise ValueError('Unable to translate into DBUS bool: %r' % s) - elif sig in _dbus_stringy_types: - return _dbus_stringy_types[sig](s) - elif sig in _dbus_int_types: - return _dbus_int_types[sig](s) - else: - raise TypeError('Unhandled DBUS signature %s') - -class ContactWindow: - def __init__(self, conn): - self._conn = conn - self._conn[CONN_INTERFACE].GetSelfHandle(reply_handler=self.get_self_handle_reply_cb, - error_handler=self.error_cb) - - if CONN_INTERFACE_PRESENCE in self._conn.get_valid_interfaces(): - self._conn[CONN_INTERFACE_PRESENCE].connect_to_signal('PresenceUpdate', self.presence_update_signal_cb) - self._conn[CONN_INTERFACE_PRESENCE].GetStatuses(reply_handler=self.get_statuses_reply_cb, - error_handler=self.error_cb) - - self._subscribe = None - self._publish = None - - self._statuses = None - - self._window = gtk.Window() - self._window.connect("delete-event", self.gtk_delete_event_cb) - self._window.set_size_request(400, 300) - - self._box = gtk.VBox(False, 6) - self._window.add(self._box) - - self._swin = gtk.ScrolledWindow(None, None) - self._swin.set_policy (gtk.POLICY_NEVER, gtk.POLICY_AUTOMATIC) - self._box.add(self._swin) - - self._model = gtk.ListStore(gtk.gdk.Pixbuf, - scw.TYPE_PRESENCE, - gobject.TYPE_STRING, - int) - self._model_rows = {} - - self._view = scw.View() - print "connecting up activate" - self._view.connect("activate", self.gtk_view_activate_cb) - self._view.set_property("model", self._model) - self._view.set_column_foldable(2, True) - self._view.set_column_visible(3, False) - self._swin.add(self._view) - - self._toolbar = gtk.Toolbar() - self._box.pack_end(self._toolbar, False, True, 0) - - self._window.set_title("Contacts") - self._window.show_all() - - self._icon = gtk.gdk.pixbuf_new_from_file_at_size("tabby/data/images/face-surprise.png", 22, 22) - - def set_window_title_cb(self, handle, handle_type, name): - self._window.set_title("Contacts for %s" % name) - - def get_self_handle_reply_cb(self, handle): - self._conn.call_with_handle(CONNECTION_HANDLE_TYPE_CONTACT, handle, self.set_window_title_cb) - - def get_statuses_reply_cb(self, statuses): - self._statuses = statuses - print "GetStatuses replied", statuses - - def presence_update_signal_cb(self, presences): - print "Got PresenceUpdate:", presences - for (handle, presence) in presences.iteritems(): - self.update_contact(handle, presence) - - def get_status_message(self, name, parameters): - if name == 'available': - msg = 'Available' - elif name == 'away': - msg = 'Away' - elif name == 'brb': - msg = 'Be Right Back' - elif name == 'busy': - msg = 'Busy' - elif name == 'dnd': - msg = 'Do Not Disturb' - elif name == 'xa': - msg = 'Extended Away' - elif name == 'hidden': - msg = 'Hidden' - elif name == 'offline': - msg = 'Offline' - else: - msg = 'Unknown' - - if 'message' in parameters: - msg = "%s: %s" % (msg, parameters['message']) - - return msg - - def update_contact(self, handle, presence): - idle, statuses = presence - - # FIXME: horrible inefficiency - iter = self._model.get_iter_first() - while iter: - cur = self._model.get_value(iter, 3) - if cur == handle: - print "found matching contact list entry", self._model.get_value(iter, 1) - break - iter = self._model.iter_next(iter) - - for (name, params) in statuses.iteritems(): - print "update contact with presence", handle, name, params -# row = self._model_rows[handle] -# path = row.get_path() -# iter = self._model.get_iter() - if iter: - self._model.set_value(iter, 2, self.get_status_message(name, params)) - # FIXME: I don't actually know how to display multiple statuses... - # it's something we should leave to galago - break - - def add_contact(self, handle, handle_type, name): - iter = self._model.append() - self._model.set(iter, - 0, self._icon, - 1, "<b><action id='click%s'>%s</action></b>" % (handle, name), - 2, "Offline", - 3, handle) -# path = self._model.get_path(iter) -# row = gtk.TreeRowReference(self._model, path) -# self._model_rows[handle] = row - if CONN_INTERFACE_PRESENCE in self._conn.get_valid_interfaces(): - self._conn[CONN_INTERFACE_PRESENCE].RequestPresence([handle], - reply_handler=(lambda: None), - error_handler=self.error_cb) - - def subscribe_get_members_reply_cb(self, members): - for member in members: - print "subscribe_get_members_reply_cb, adding contact:", member - self._conn.call_with_handle(CONNECTION_HANDLE_TYPE_CONTACT, member, self.add_contact) - - def subscribe_members_changed_signal_cb(self, reason, added, removed, local_pending, remote_pending, actor, reason_code): - for member in added: - print "subscribe_members_changed_signal_cb, adding contact:", member - self._conn.call_with_handle(CONNECTION_HANDLE_TYPE_CONTACT, member, self.add_contact) - - def set_subscribe_list(self, subscribe): - self._subscribe = subscribe - self._subscribe[CHANNEL_INTERFACE_GROUP].GetMembers(reply_handler=self.subscribe_get_members_reply_cb, error_handler=self.error_cb) - self._subscribe[CHANNEL_INTERFACE_GROUP].connect_to_signal('MembersChanged', self.subscribe_members_changed_signal_cb) - - def set_publish_list(self, publish): - self._publish = publish - - def error_cb(self, error): - print "Exception received from asynchronous method call:" - print error - - def gtk_delete_event_cb(self, window, event): - self._conn[CONN_INTERFACE].Disconnect(reply_handler=(lambda: None), - error_handler=self.error_cb) - - def gtk_view_activate_cb(self, view, action_id, action_data): - if action_id[:5] == 'click': - handle = int(action_id[5:]) - print "requesting channel to", handle - self._conn[CONN_INTERFACE].RequestChannel(CHANNEL_TYPE_TEXT, CONNECTION_HANDLE_TYPE_CONTACT, handle, False, - reply_handler=(lambda chan: None), - error_handler=self.error_cb) - -class ContactListChannel(telepathy.client.Channel): - def __init__(self, conn, object_path, handle_type, handle): - telepathy.client.Channel.__init__(self, conn._service_name, object_path, - ready_handler=self.got_interfaces) - self.get_valid_interfaces().add(CHANNEL_TYPE_CONTACT_LIST) - self._conn = conn - self._handle_type = handle_type - self._handle = handle - self._name = handle - - def got_interfaces(self, unused): - self._conn[CONN_INTERFACE].InspectHandles(self._handle_type, [self._handle], - reply_handler=self.inspect_handle_reply_cb, error_handler=self.error_cb) - self[CHANNEL_INTERFACE_GROUP].GetMembers(reply_handler=self.get_members_reply_cb, error_handler=self.error_cb) - self[CHANNEL_INTERFACE_GROUP].GetLocalPendingMembers(reply_handler=self.get_local_pending_members_reply_cb, error_handler=self.error_cb) - self[CHANNEL_INTERFACE_GROUP].GetRemotePendingMembers(reply_handler=self.get_remote_pending_members_reply_cb, error_handler=self.error_cb) - self[CHANNEL_INTERFACE_GROUP].connect_to_signal('MembersChanged', self.members_changed_signal_cb) - - def inspect_handle_reply_cb(self, names): - assert len(names) == 1 - name = names[0] - print "CLC", self._name, "is", name - self._name = name - if name == 'subscribe': - gobject.idle_add(self.subscribe_list_idle_cb) - elif name == 'publish': - gobject.idle_add(self.publish_list_idle_cb) - - def subscribe_list_idle_cb(self): -# handle = self._conn[CONN_INTERFACE].RequestHandles(CONNECTION_HANDLE_TYPE_CONTACT, ['test2@localhost'])[0] -# self[CHANNEL_INTERFACE_GROUP].AddMembers([handle]) - return False - - def publish_list_idle_cb(self): - return False - - def members_changed_signal_cb(self, message, added, removed, local_p, remote_p, actor, reason): - print "MembersChanged on CLC", self._name - print "Message: ", message - print "Added: ", added - print "Removed: ", removed - print "Local Pending: ", local_p - print "Remote Pending: ", remote_p -# if added and self._name == 'subscribe': -# self[CHANNEL_INTERFACE_GROUP].RemoveMembers(added) -# if removed and self._name == 'subscribe': -# self[CHANNEL_INTERFACE_GROUP].AddMembers(added) - -# if added: -# self[CHANNEL_INTERFACE_GROUP].RemoveMembers(added) -# if local_p: -# self[CHANNEL_INTERFACE_GROUP].AddMembers(local_p) - - def get_members_reply_cb(self, members): - print "got members on CLC %s: %s" % (self._name, members) -# if members and self._name == 'subscribe': -# self[CHANNEL_INTERFACE_GROUP].RemoveMembers(members) -# if members: -# self[CHANNEL_INTERFACE_GROUP].RemoveMembers(members) - - def get_local_pending_members_reply_cb(self, members): - print "got local pending members on CLC %s: %s" % (self._name, members) -# if members: -# self[CHANNEL_INTERFACE_GROUP].AddMembers(members) -# print "sending addmembers" - - - def get_remote_pending_members_reply_cb(self, members): - print "got remote pending members on CLC %s: %s" % (self._name, members) - -class TextChannel(telepathy.client.Channel): - def __init__(self, conn, object_path, handle_type, handle): - telepathy.client.Channel.__init__(self, conn._service_name, object_path) - self.get_valid_interfaces().add(CHANNEL_TYPE_TEXT) - self[CHANNEL_TYPE_TEXT].connect_to_signal('Received', self.received_signal_cb) - self[CHANNEL_TYPE_TEXT].connect_to_signal('Sent', self.sent_signal_cb) - self._conn = conn - self._object_path = object_path - self._handle_type = handle_type - self._handle = handle - - self._window = gtk.Window() - self._window.connect("delete-event", self.gtk_delete_event_cb) - self._window.set_size_request(400, 300) - - self._box = gtk.VBox(False, 0) - self._window.add(self._box) - - self._toolbar = gtk.Toolbar() - image = gtk.Image() - image.set_from_file("tabby/data/images/call.png") - - item = gtk.ToolButton(icon_widget=image, label="Call") - item.connect("clicked", self.on_call_button_clicked) - - self._toolbar.insert(item, 0) - self._box.pack_start(self._toolbar, False, True) - - self._swin = gtk.ScrolledWindow(None, None) - self._swin.set_policy (gtk.POLICY_NEVER, gtk.POLICY_AUTOMATIC) - self._box.add(self._swin) - - self._model = gtk.ListStore(scw.TYPE_TIMESTAMP, - scw.TYPE_PRESENCE, -# gtk.gdk.Pixbuf, - gobject.TYPE_STRING, - scw.TYPE_ROW_COLOR) - - self._view = scw.View() - self._view.connect("activate", self.gtk_view_activate_cb) - self._view.connect("context-request", self.gtk_view_context_request_cb) - self._view.connect("key-press-event", self.gtk_view_key_press_event_cb) - self._view.set_property("model", self._model) - self._view.set_property("align-presences", True) - self._view.set_property("presence-alignment", pango.ALIGN_CENTER) - self._view.set_property("scroll-on-append", True) - self._view.set_property("timestamp-format", "%H:%M") - self._view.set_property("action-attributes", "underline='single' weight='bold'") - self._view.set_property("selection-row-separator", "\n\n") - self._view.set_property("selection-column-separator", "\t") - self._swin.add(self._view) - - self._entry = scw.Entry() - self._entry.connect("activate", self.gtk_entry_activate_cb) - self._entry.set_property("history-size", 100) - self._box.pack_end(self._entry, False, True, 2) - - # set the window title according to who you're talking to - self._window.set_title("Conversation") - if handle: - self._conn.call_with_handle(handle_type, handle, self.set_window_title_cb) - - self._window.show_all() - - # asynchronously retrieve messages that have not been acknowledged - self._handled_pending_message = None - self[CHANNEL_TYPE_TEXT].ListPendingMessages(False, reply_handler=self.list_pending_messages_reply_cb, error_handler=self.error_cb) - - # kick off a request for the user's handle number - self._self_handle = None - self._self_handle_cb = [] - self._conn[CONN_INTERFACE].GetSelfHandle(reply_handler=self.get_self_handle_reply_cb, error_handler=self.error_cb) - - def list_pending_messages_reply_cb(self, pending_messages): - print "got pending messages", pending_messages - for msg in pending_messages: - (id, timestamp, sender, message_type, flags, message) = msg - print "Handling pending message", id - self.received_signal_cb(id, timestamp, sender, message_type, flags, message) - self._handled_pending_message = id - - def gtk_delete_event_cb(self, window, event): - self[CHANNEL_INTERFACE].Close(reply_callback=(lambda: None), error_callback=self.error_cb) - print self._conn._channels.keys() - del self._conn._channels[self._object_path] - - def gtk_entry_activate_cb(self, entry): - self[CHANNEL_TYPE_TEXT].Send(CHANNEL_TEXT_MESSAGE_TYPE_NORMAL, entry.get_text()) - entry.set_text("") - - def gtk_view_activate_cb(self, view, action_id, action_data): - print "Activated id %s which has data %s" % (action_id, action_data) - - def gtk_view_key_press_event_cb(self, view, event): - # TODO: cunningness to find out whether we should focus - # the input window - pass - - def gtk_view_context_request_cb(self, view, action_id, action_data, x, y): - if action_id is not None: - print "Context request at (%i,%i) for id %s which has data %s" % \ - (x, y, action_id, action_data) - else: - print "Context request at (%i,%i) for global context menu" % (x,y) - - def set_window_title_cb(self, handle, handle_type, name): - if handle_type == CONNECTION_HANDLE_TYPE_CONTACT: - title = "Conversation with %s" % name - elif handle_type == CONNECTION_HANDLE_TYPE_ROOM: - title = "Conversation in %s" % name - else: - title = "Conversation with a list? What's going on?" - self._window.set_title(title) - - def show_received_cb(self, id, timestamp, message_type, message, sender): - iter = self._model.append() - self._model.set(iter, - 0, timestamp, - 1, "<b>%s</b>" % sender, - 2, message) - self[CHANNEL_TYPE_TEXT].AcknowledgePendingMessages([id], reply_handler=(lambda: None), error_handler=self.error_cb) - - def received_signal_cb(self, id, timestamp, sender, message_type, flags, message): - if self._handled_pending_message != None: - if id > self._handled_pending_message: - print "Now handling messages directly" - self._handled_pending_message = None - else: - print "Skipping already handled message", id - return - - self._conn.call_with_handle(CONNECTION_HANDLE_TYPE_CONTACT, sender, (lambda handle, handle_type, sender: self.show_received_cb(id, timestamp, message_type, message, sender))) - - def show_sent_cb(self, timestamp, message_type, message, sender): - iter = self._model.append() - self._model.set(iter, - 0, timestamp, - 1, "<i>%s</i>" % sender, - 2, message) - - def get_self_handle_reply_cb(self, handle): - print "get_self_handle_reply_cb", self, handle - self._self_handle = handle - for func in self._self_handle_cb: - self._conn.call_with_handle(CONNECTION_HANDLE_TYPE_CONTACT, handle, func) - - def sent_signal_cb(self, timestamp, message_type, message): - func = (lambda handle, handle_type, sender: self.show_sent_cb(timestamp, message_type, message, sender)) - if self._self_handle: - self._conn.call_with_handle(CONNECTION_HANDLE_TYPE_CONTACT, self._self_handle, func) - else: - self._self_handle_cb.append(func) - - def on_call_button_clicked(self, button): - print "on_call_button_clicked" - - self._conn[CONN_INTERFACE].RequestChannel(CHANNEL_TYPE_STREAMED_MEDIA, - self._handle_type, - self._handle, - True, - reply_handler=self.got_media_channel, - error_handler=self.error_cb) - - def got_media_channel(self, object_path): - print "got_media_channel with object path:", object_path - - self._media_channel = MediaChannel(self._conn, - self._object_path, - self._handle_type, - self._handle) - - def error_cb(self, error): - print "Exception received from asynchronous method call:" - print error - -class MediaChannel(telepathy.client.Channel): - def __init__(self, conn, object_path, handle_type, handle): - telepathy.client.Channel.__init__(self, conn._service_name, object_path) - self.get_valid_interfaces().add(CHANNEL_TYPE_TEXT) - #self[CHANNEL_TYPE_STREAMED_MEDIA].connect_to_signal('Received', self.received_signal_cb) - #self[CHANNEL_TYPE_STREAMED_MEDIA].connect_to_signal('Sent', self.sent_signal_cb) - self._conn = conn - self._object_path = object_path - -class TestConnection(telepathy.client.Connection): - def __init__(self, service_name, object_path, mainloop): - telepathy.client.Connection.__init__(self, service_name, object_path) - - self._mainloop = mainloop - self._service_name = service_name - - self._status = CONNECTION_STATUS_CONNECTING - self._channels = {} - - self._handle_cache = {} - self._handle_callbacks = {} - - self._contact_window = None - self._lists = {} - - self[CONN_INTERFACE].connect_to_signal('StatusChanged', self.status_changed_signal_cb) - self[CONN_INTERFACE].connect_to_signal('NewChannel', self.new_channel_signal_cb) - - self[CONN_INTERFACE].Connect(reply_handler=(lambda: None), error_handler=self.error_cb) - - def error_cb(self, exception): - print "Exception received from asynchronous method call:" - print exception - - def list_channels_reply_cb(self, channels): - for (obj_path, channel_type, handle_type, handle) in channels: - self.new_channel_signal_cb(obj_path, channel_type, handle_type, handle, False) - - def give_list_to_contact_window(self, channel, name): - if self._contact_window: - if name == 'subscribe': - self._contact_window.set_subscribe_list(channel) - elif name == 'publish': - self._contact_window.set_publish_list(channel) - else: - self._lists[name] = channel - - def new_channel_signal_cb(self, obj_path, channel_type, handle_type, handle, suppress_handler): - if obj_path in self._channels: - print 'Skipping existing channel', obj_path - return - - print 'NewChannel', obj_path, channel_type, handle_type, handle, suppress_handler - - channel = None - - if channel_type == CHANNEL_TYPE_TEXT: - channel = TextChannel(self, obj_path, handle_type, handle) - elif channel_type == CHANNEL_TYPE_CONTACT_LIST: - channel = ContactListChannel(self, obj_path, handle_type, handle) - self.call_with_handle(handle_type, handle, (lambda id, channel_type, name: self.give_list_to_contact_window(channel, name))) - - if channel != None: - print 'Created', channel - self._channels[obj_path] = channel - else: - print 'Unknown channel type', channel_type - - def connected_cb(self): - print "Connected!" - if CONN_INTERFACE_PRESENCE in self.get_valid_interfaces(): - self[CONN_INTERFACE_PRESENCE].connect_to_signal('PresenceUpdate', self.presence_update_signal_cb) -# handle = self[CONN_INTERFACE].RequestHandles(CONNECTION_HANDLE_TYPE_CONTACT, ['test2@localhost'])[0] -# self[CONN_INTERFACE].RequestChannel(CHANNEL_TYPE_TEXT, handle, True) -# print self[CONN_INTERFACE_PRESENCE].GetStatuses() - self._contact_window = ContactWindow(self) - if 'subscribe' in self._lists: - self._contact_window.set_subscribe_list(self._lists['subscribe']) - del self._lists['subscribe'] - if 'publish' in self._lists: - self._contact_window.set_publish_list(self._lists['publish']) - del self._lists['publish'] - return False - - def presence_update_signal_cb(self, presence): - print "got presence update:", presence - - def get_interfaces_reply_cb(self, interfaces): - self.get_valid_interfaces().update(interfaces) - self[CONN_INTERFACE].ListChannels(reply_handler=self.list_channels_reply_cb, error_handler=self.error_cb) - gobject.idle_add(self.connected_cb) - - def status_changed_signal_cb(self, status, reason): - if self._status == status: - return - else: - self._status = status - - print 'StatusChanged', status, reason - - if status == CONNECTION_STATUS_CONNECTED: - self[CONN_INTERFACE].GetInterfaces(reply_handler=self.get_interfaces_reply_cb, error_handler=self.error_cb) - if status == CONNECTION_STATUS_DISCONNECTED: - self._mainloop.quit() - - def inspect_handle_reply_cb(self, id, handle_type, names): - assert len(names) == 1 - name = names[0] - - self._handle_cache[id] = (handle_type, name) - - for func in self._handle_callbacks[id]: - func(id, handle_type, name) - - del self._handle_callbacks[id] - - def call_with_handle(self, handle_type, handle_id, func): - if handle_id in self._handle_cache: - func(handle_id, *self._handle_cache[handle_id]) - else: - if handle_id not in self._handle_callbacks: - self[CONN_INTERFACE].InspectHandles(handle_type, [handle_id], reply_handler=(lambda args: self.inspect_handle_reply_cb(handle_id, handle_type, args)), error_handler=self.error_cb) - self._handle_callbacks[handle_id] = set() - - self._handle_callbacks[handle_id].add(func) - -if __name__ == '__main__': - reg = telepathy.client.ManagerRegistry() - reg.LoadManagers() - - protos=reg.GetProtos() - - if len(protos) == 0: - print "Sorry, no connection managers found!" - sys.exit(1) - - if len(sys.argv) > 2: - protocol = sys.argv[2] - else: - protocol='' - - while (protocol not in protos): - protocol = raw_input('Protocol (one of: %s) [%s]: ' % (' '.join(protos),protos[0])) - if protocol == '': - protocol = protos[0] - - if len(sys.argv) > 1: - manager = sys.argv[1] - else: - manager = '' - managers = reg.GetManagers(protocol) - - while (manager not in managers): - manager = raw_input('Manager (one of: %s) [%s]: ' % (' '.join(managers),managers[0])) - if manager == '': - manager = managers[0] - - mgr_bus_name = reg.GetBusName(manager) - mgr_object_path = reg.GetObjectPath(manager) - - cmdline_params = dict((p.split('=') for p in sys.argv[3:])) - params = {} - - for (name, (dbus_type, default, flags)) in reg.GetParams(manager, protocol).iteritems(): - if name in cmdline_params: - print "Using cmd-line value: %s=%r (as %s)" % (name, cmdline_params[name], dbus_type) - params[name] = _coerce_string_to_dbus(cmdline_params[name], sig=dbus_type) - elif (flags & (CONN_MGR_PARAM_FLAG_REQUIRED)) == 0: - # it's not mandatory, so let's skip it - print "Not providing %s" % name - elif name == 'password': - print "Providing password as %s" % dbus_type - params[name] = _coerce_string_to_dbus(getpass.getpass(), sig=dbus_type) - else: - print "Please specify %s (will send as %s)" % (name, dbus_type) - params[name] = _coerce_string_to_dbus(raw_input(name+': '), sig=dbus_type) - params = dbus.Dictionary(params, signature="sv") - - mgr = telepathy.client.ConnectionManager(mgr_bus_name, mgr_object_path) - bus_name, object_path = mgr[CONN_MGR_INTERFACE].RequestConnection(protocol, params) - - mainloop = gobject.MainLoop() - connection = TestConnection(bus_name, object_path, mainloop) - - def quit_cb(): - connection[CONN_INTERFACE].Disconnect() - mainloop.quit() - - def sigterm_cb(): - gobject.idle_add(quit_cb) - - signal.signal(signal.SIGTERM, sigterm_cb) - - while 1: - try: - mainloop.run() - except KeyboardInterrupt: - quit_cb() - except e: - print e, e.Message - if not mainloop.is_running(): - break |