#!/usr/bin/env python
# vim: set fileencoding=utf-8 sts=4 sw=4 et :
"""
The world's worst XMPP console user interface.
Pass it the bus name of a Gabble connection; type some words; get minimalistic
error reporting.
Copyright © 2011 Collabora Ltd.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""
import sys
import re
from xml.dom import minidom
from gi.repository import Gtk
from gi.repository import GLib
from gi.repository import Gio
from gi.repository import GtkSource
# Spacing applied between grid rows/columns on every Page, and used as the
# outer margin on labels (left) and entries (right).
PADDING = 6
def pathify(name):
    """Turn a dotted name into the matching slash-separated path.

    Every '.' in `name` becomes '/', and a single '/' is prepended.
    """
    segments = name.split('.')
    return '/' + '/'.join(segments)
def nameify(path):
    """Turn a slash-separated path back into a dotted name.

    The first character of `path` is dropped and every remaining '/' becomes
    '.'.
    """
    without_first_char = path[1:]
    return '.'.join(without_first_char.split('/'))
# D-Bus interface on the connection object; Window calls EnsureSidecar on it
# to obtain the console sidecar's object path.
CONN_FUTURE_IFACE = "org.freedesktop.Telepathy.Connection.FUTURE"
# D-Bus interface of the console sidecar itself; the pages call SendIQ and
# SendStanza on it and read/write its SpewStanzas property.
CONSOLE_IFACE = "org.freedesktop.Telepathy.Gabble.Plugin.Console"
class StanzaViewer(Gtk.ScrolledWindow):
    """Scrolled, XML-highlighted text view used to display stanzas.

    The text is held in ``self.b`` (a GtkSource.Buffer) and shown by
    ``self.view`` (a GtkSource.View). The view starts out non-editable;
    callers that want an editor can call ``self.view.set_editable(True)``.
    """

    def __init__(self):
        Gtk.ScrolledWindow.__init__(self)

        self.b = GtkSource.Buffer()
        self.view = GtkSource.View.new_with_buffer(self.b)

        # Highlight the buffer contents as XML.
        self.b.set_language(
            GtkSource.LanguageManager.get_default().get_language('xml'))
        self.b.set_highlight_matching_brackets(False)

        self.view.set_editable(False)
        self.view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
        self.view.set_property('expand', True)

        self.add(self.view)

    def clear(self):
        """Remove all text from the buffer."""
        self.b.set_text("")

    def append_stanza(self, xml):
        """Append the XML document `xml` to the buffer, pretty-printed.

        Only the root element is serialised, so no ``<?xml ...?>``
        declaration is shown. If `xml` is not well-formed it is appended
        verbatim instead of raising, so a malformed stanza can still be
        inspected.
        """
        # Imported locally so the block does not depend on a new file-level
        # import. The import does not touch the `xml` parameter.
        from xml.parsers.expat import ExpatError

        try:
            root = minidom.parseString(xml).documentElement
            pretty = root.toprettyxml()
        except ExpatError:
            pretty = xml

        i = self.b.get_end_iter()
        self.b.insert(i, pretty + '\n')

    def append_comment(self, text):
        """Append `text` to the buffer wrapped in an XML comment."""
        i = self.b.get_end_iter()
        # The one-element tuple keeps the formatting correct even if `text`
        # is itself a tuple.
        self.b.insert(i, '<!-- %s -->\n' % (text,))

    def tell_me_everything(self):
        """Return the buffer's entire text."""
        return self.b.get_property('text')
class SpinWrapper(Gtk.Notebook):
    """Notebook without tabs or border that shows a widget or a spinner.

    Page PRIMARY_PAGE holds the wrapped widget; page SPINNER_PAGE holds a
    centred 32x32 Gtk.Spinner.
    """

    PRIMARY_PAGE = 0
    SPINNER_PAGE = 1

    def __init__(self, main_widget):
        Gtk.Notebook.__init__(self)
        self.set_show_tabs(False)
        self.set_show_border(False)

        self.insert_page(main_widget, None, self.PRIMARY_PAGE)

        spinner = Gtk.Spinner()
        spinner_props = (
            ('halign', Gtk.Align.CENTER),
            ('valign', Gtk.Align.CENTER),
            ('width-request', 32),
            ('height-request', 32),
        )
        for prop_name, prop_value in spinner_props:
            spinner.set_property(prop_name, prop_value)
        self.spinner = spinner

        self.insert_page(spinner, None, self.SPINNER_PAGE)

    def start_spinning(self):
        """Switch to the spinner page and start the animation."""
        self.set_current_page(self.SPINNER_PAGE)
        self.spinner.start()

    def stop_spinning(self):
        """Stop the animation and switch back to the wrapped widget."""
        self.spinner.stop()
        self.set_current_page(self.PRIMARY_PAGE)
class Page(Gtk.Grid):
    """Base class for the console's pages: a grid with label helpers.

    Keeps the console D-Bus proxy in ``self.console_proxy`` for subclasses.
    """

    def __init__(self, console_proxy):
        Gtk.Grid.__init__(self)
        self.console_proxy = console_proxy

        self.set_column_spacing(PADDING)
        self.set_row_spacing(PADDING)

    def __attach(self, widget, below, columns):
        """Put `widget` at the grid origin, or directly under `below`."""
        if below is not None:
            self.attach_next_to(widget, below, Gtk.PositionType.BOTTOM,
                                columns, 1)
        else:
            self.attach(widget, 0, 0, columns, 1)

    def add_title(self, title, below=None):
        """Add a left-aligned, two-column-wide title label and return it.

        The label goes at the grid origin, or under `below` when given.
        """
        heading = Gtk.Label()
        # NOTE(review): the format string carries no markup tags, so the
        # title renders as plain text; confirm whether emphasis (e.g. bold)
        # was intended here.
        heading.set_markup("%s" % title)
        heading.set_property('xalign', 0)
        self.__attach(heading, below, 2)
        return heading

    def add_label(self, title, below=None):
        """Add a left-aligned, one-column label with a left margin.

        The label goes at the grid origin, or under `below` when given, and
        is returned.
        """
        caption = Gtk.Label(title)
        caption.set_property('margin-left', PADDING)
        caption.set_property('xalign', 0)
        self.__attach(caption, below, 1)
        return caption
class IQPage(Page):
    """Page that sends a single IQ request and displays the reply.

    The user fills in a recipient, picks 'get' or 'set', and types the IQ
    body. Activating the body entry (or releasing its icon) sends the
    request via the console proxy's SendIQ method.
    """

    def __init__(self, console_proxy):
        Page.__init__(self, console_proxy)

        # Request section.
        request_title = self.add_title("Request")

        to_label, self.recipient_entry = self.add_label_entry_pair(
            'To:', below=request_title)

        kind_label = self.add_label('IQ Type:', below=to_label)
        self.get_button = Gtk.RadioButton.new_with_label([], "get")
        self.get_button.set_active(True)
        self.set_button = Gtk.RadioButton.new_with_label_from_widget(
            self.get_button, "set")

        radio_box = Gtk.ButtonBox.new(Gtk.Orientation.HORIZONTAL)
        radio_box.set_layout(Gtk.ButtonBoxStyle.START)
        for radio in (self.get_button, self.set_button):
            radio_box.add(radio)
        self.attach_next_to(radio_box, kind_label,
                            Gtk.PositionType.RIGHT, 1, 1)

        body_label, self.body_entry = self.add_label_entry_pair(
            'Body:', below=kind_label)
        self.body_entry.set_text("")
        icon_slot = Gtk.EntryIconPosition.SECONDARY
        self.body_entry.set_icon_from_stock(icon_slot, Gtk.STOCK_GO_FORWARD)
        self.body_entry.set_icon_tooltip_text(icon_slot, "Send this IQ")

        # Reply section.
        reply_title = self.add_title("Reply", below=body_label)

        self.stanza_viewer = StanzaViewer()
        self.stanza_viewer.append_comment(
            "send a request to see the reply here")

        self.result_nb = SpinWrapper(self.stanza_viewer)
        self.attach_next_to(self.result_nb, reply_title,
                            Gtk.PositionType.BOTTOM, 2, 1)

        # Both pressing Enter and releasing the entry's icon send the IQ.
        for signal_name in ('activate', 'icon-release'):
            self.body_entry.connect(signal_name, self.send_iq)

    def add_label_entry_pair(self, title, below):
        """Add a label under `below` with a text entry to its right.

        The entry expands horizontally and gets a right margin.
        Returns the ``(label, entry)`` pair.
        """
        caption = self.add_label(title, below)

        field = Gtk.Entry()
        field.set_property('margin-right', PADDING)
        field.set_property('hexpand', True)
        self.attach_next_to(field, caption, Gtk.PositionType.RIGHT, 1, 1)

        return caption, field

    def send_iq(self, *misc):
        """Send the IQ described by the form and show the spinner.

        Extra positional arguments supplied by the signal are ignored.
        """
        if self.get_button.get_active():
            iq_type = 'get'
        else:
            iq_type = 'set'
        recipient = self.recipient_entry.get_text()
        payload = self.body_entry.get_text()

        self.console_proxy.SendIQ('(sss)', iq_type, recipient, payload,
                                  result_handler=self.send_iq_cb)
        self.result_nb.start_spinning()

    def send_iq_cb(self, proxy, result, user_data):
        """Show the outcome of SendIQ: the reply stanza, or the error."""
        self.stanza_viewer.clear()

        failed = isinstance(result, Exception)
        if failed:
            self.stanza_viewer.append_comment("error:\n%s" % result)
        else:
            # The result is a pair; only its second element is displayed.
            _reply_type, reply_xml = result
            self.stanza_viewer.append_stanza(reply_xml)

        self.result_nb.stop_spinning()
class StanzaPage(Page):
    """Page with an editable stanza buffer and a button that sends it.

    The whole buffer text is passed to the console proxy's SendStanza
    method; the outcome is shown in a label next to the button.
    """

    def __init__(self, console_proxy):
        Page.__init__(self, console_proxy)

        title = self.add_title("Enter a complete stanza:")

        self.sv = StanzaViewer()
        self.sv.view.set_editable(True)
        # append_stanza() parses its argument as XML, so the example text
        # must be a well-formed stanza; plain text would raise here and the
        # page could not be built.
        self.sv.append_stanza(
            "<message to='someone@example.com' type='headline'>"
            "<body>Been on any nice boats recently?</body>"
            "</message>")

        self.spin_wrapper = SpinWrapper(self.sv)
        self.attach_next_to(self.spin_wrapper, title, Gtk.PositionType.BOTTOM,
                            2, 1)

        self.result_label = self.add_label('', self.spin_wrapper)
        self.result_label.set_property('hexpand', True)
        self.result_label.set_line_wrap(True)

        b = Gtk.Button.new_with_mnemonic("_Send")
        b.connect('clicked', self.__send_stanza)
        b.set_property('hexpand', False)
        self.attach_next_to(b, self.result_label, Gtk.PositionType.RIGHT, 1, 1)

    def __send_stanza(self, button):
        """Send the buffer contents as a stanza and show the spinner."""
        self.console_proxy.SendStanza('(s)', self.sv.tell_me_everything(),
                                      result_handler=self.__send_stanza_cb)
        self.spin_wrapper.start_spinning()

    def __send_stanza_cb(self, proxy, result, user_data):
        """Report the outcome of SendStanza in the result label."""
        if isinstance(result, Exception):
            # FIXME: this sucks. You can't just get the free text bit without
            # the D-Bus error bit.
            # Not every exception type has a (non-empty) `message`; fall
            # back to str() so the label always shows something.
            t = getattr(result, 'message', None) or str(result)
        else:
            t = "yes sir, captain tightpants"

        self.result_label.set_text(t)
        self.spin_wrapper.stop_spinning()
class SnoopyPage(Page):
    """Page that logs stanzas reported by the console proxy.

    A switch mirrors and controls the proxy's SpewStanzas property. The
    StanzaSent and StanzaReceived signals are appended to a StanzaViewer.
    """

    def __init__(self, console_proxy):
        Page.__init__(self, console_proxy)

        heading = self.add_label("Stanza monitor:")
        heading.set_property('hexpand', True)

        toggle = Gtk.Switch()
        self.attach_next_to(toggle, heading, Gtk.PositionType.RIGHT, 1, 1)

        self.stanza_viewer = StanzaViewer()
        self.attach_next_to(self.stanza_viewer, heading,
                            Gtk.PositionType.BOTTOM, 2, 1)

        # Start the switch in whatever state the remote side reports, and
        # only then start listening for changes to it.
        toggle.set_active(self.get_remote_active())
        toggle.connect('notify::active', self.__switch_switched_cb)

        self.console_proxy.connect('g-signal', self.__g_signal_cb)

    def teardown(self):
        """Turn off the monitor when we quit."""
        self.__set_spew(False)

    def __set_spew(self, spew):
        """Synchronously set the remote SpewStanzas property to `spew`."""
        boxed_value = GLib.Variant.new_boolean(spew)
        set_args = GLib.Variant(
            "(ssv)", (CONSOLE_IFACE, "SpewStanzas", boxed_value))
        self.console_proxy.call_sync(
            "org.freedesktop.DBus.Properties.Set",
            set_args,
            0, -1, None)

    def get_remote_active(self):
        """Return the proxy's cached SpewStanzas value as a boolean."""
        cached = self.console_proxy.get_cached_property('SpewStanzas')
        return cached.get_boolean()

    def __switch_switched_cb(self, switch, pspec):
        """Push a changed switch state to the remote side and log it."""
        remote_state = self.get_remote_active()
        local_state = switch.get_active()

        # Nothing to do when the switch already agrees with the remote side.
        if local_state == remote_state:
            return

        self.__set_spew(local_state)
        if local_state:
            note = 'started monitoring'
        else:
            note = 'stopped monitoring'
        self.stanza_viewer.append_comment(note)

    def __g_signal_cb(self, console_proxy, sender_name, signal_name, parameters):
        """Log the stanza carried by a StanzaSent/StanzaReceived signal."""
        if signal_name not in ('StanzaSent', 'StanzaReceived'):
            return

        if signal_name == 'StanzaSent':
            direction = 'sent'
        else:
            direction = 'received'

        # The signal carries exactly one value: the stanza text.
        (stanza_xml,) = parameters

        self.stanza_viewer.append_comment(direction)
        self.stanza_viewer.append_stanza(stanza_xml)
class Window(Gtk.Window):
    """Top-level window holding the IQ, stanza and monitor pages.

    Construction asks the connection for the console sidecar via
    EnsureSidecar. If that call fails, an explanation is printed and
    SystemExit(2) is raised.
    """

    IQ_PAGE = 0
    STANZA_PAGE = 1
    SNOOPY_PAGE = 2

    def __init__(self, bus, connection_bus_name):
        """Build the window for the connection named `connection_bus_name`.

        `bus` is the D-Bus connection used to create the proxies. The
        connection's object path is derived from its bus name with
        pathify().

        Raises:
            SystemExit: with code 2 if EnsureSidecar raises.
        """
        Gtk.Window.__init__(self)

        self.set_title('XMPP Console')
        self.set_default_size(600, 371)

        conn_future_proxy = Gio.DBusProxy.new_sync(bus, 0, None,
            connection_bus_name, pathify(connection_bus_name),
            CONN_FUTURE_IFACE, None)
        try:
            sidecar_path, _ = conn_future_proxy.EnsureSidecar('(s)', CONSOLE_IFACE)
        except Exception as e:
            # `except ... as` and print-with-parentheses are valid on both
            # Python 2.6+ and Python 3. The explicit mapping replaces
            # `% locals()` so the message only sees the names it uses.
            print("""
Couldn't connect to the XMPP console interface on '%(connection_bus_name)s':
%(e)s
Check that it's a running Jabber connection, and that you have the console
plugin installed.""" % {
                'connection_bus_name': connection_bus_name,
                'e': e,
            })

            raise SystemExit(2)

        self.console_proxy = Gio.DBusProxy.new_sync(bus, 0, None,
            connection_bus_name, sidecar_path, CONSOLE_IFACE, None)

        # Build up the UI
        self.nb = Gtk.Notebook()
        self.add(self.nb)

        self.iq = IQPage(self.console_proxy)
        self.nb.insert_page(self.iq,
            Gtk.Label.new_with_mnemonic("_IQ console"),
            self.IQ_PAGE)

        self.stanza = StanzaPage(self.console_proxy)
        self.nb.insert_page(self.stanza,
            Gtk.Label.new_with_mnemonic("Send a s_tanza"),
            self.STANZA_PAGE)

        self.snoopy = SnoopyPage(self.console_proxy)
        self.nb.insert_page(self.snoopy,
            Gtk.Label.new_with_mnemonic("_Monitor network traffic"),
            self.SNOOPY_PAGE)

        # The handler is connected through the class, not the instance: the
        # 'destroy' signal passes the emitting widget (this window) as the
        # first argument, which then serves as `self`.
        self.connect('destroy', Window.__destroy_cb)

    def __destroy_cb(self):
        """Switch stanza monitoring off and leave the GTK main loop."""
        self.snoopy.teardown()
        Gtk.main_quit()
# Prefix a connection bus name must start with to be accepted on the command
# line; also shown in the usage text.
GABBLE_PREFIX = 'org.freedesktop.Telepathy.Connection.gabble.jabber.'

# Bus name on which account objects are looked up.
AM_BUS_NAME = 'org.freedesktop.Telepathy.AccountManager'
# Object-path prefix for accounts; the account identifier given on the
# command line is appended after a '/'.
ACCOUNT_PREFIX = '/org/freedesktop/Telepathy/Account'
# D-Bus interface of an account object; its 'Connection' property is read to
# find the account's current connection.
ACCOUNT_IFACE = 'org.freedesktop.Telepathy.Account'
def usage():
    """Print command-line help to stdout and exit.

    Raises:
        SystemExit: always, with exit code 1.
    """
    # Parenthesised single-argument print works the same on Python 2 and 3.
    print("""
Usage:
%(arg0)s gabble/jabber/blahblah
%(arg0)s %(prefix)sblahblah
List account identifiers using `mc-tool list | grep gabble`.
List connection bus names using `qdbus | grep gabble`.
""" % {
        'arg0': sys.argv[0],
        'prefix': GABBLE_PREFIX,
    })
    raise SystemExit(1)
if __name__ == '__main__':
    # Check the arguments before touching D-Bus, so the usage text can be
    # shown even when no session bus is reachable.
    if len(sys.argv) != 2:
        usage()

    bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)

    thing = sys.argv[1]

    if re.match(r'^gabble/jabber/[a-zA-Z0-9_]+$', thing):
        # Looks like an account path to me.
        account_proxy = Gio.DBusProxy.new_sync(bus, 0, None,
            AM_BUS_NAME, '%s/%s' % (ACCOUNT_PREFIX, thing),
            ACCOUNT_IFACE, None)
        # get_cached_property() returns None when nothing is cached for the
        # property; that case is reported the same way as an account whose
        # connection path is '/'.
        connection = account_proxy.get_cached_property('Connection')
        path = connection.get_string() if connection is not None else '/'
        if path == '/':
            print("%s is not online" % thing)
            raise SystemExit(1)
        else:
            thing = nameify(path)

    # Escape the prefix so its dots match literal dots only.
    if not re.match(r'^%s[a-zA-Z0-9_]+$' % re.escape(GABBLE_PREFIX), thing):
        usage()

    win = Window(bus, thing)
    win.show_all()

    Gtk.main()
"""
.,,.
,;;*;;;;,
.-'``;-');;.
/' .-. /*;;
.' \d \;; .;;;,
/ o ` \; ,__. ,;*;;;*;,
\__, _.__,' \_.-') __)--.;;;;;*;;;;,
`""`;;;\ /-')_) __) `\' ';;;;;;
;*;;; -') `)_) |\ | ;;;;*;
;;;;| `---` O | | ;;*;;;
*;*;\| O / ;;;;;*
;;;;;/| .-------\ / ;*;;;;;
;;;*;/ \ | '. (`. ;;;*;;;
;;;;;'. ; | ) \ | ;;;;;;
,;*;;;;\/ |. / /` | ';;;*;
;;;;;;/ |/ / /__/ ';;;
'*jgs/ | / | ;*;
`""""` `""""` ;'
"""