summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xsrc/phoenix-test.py324
1 files changed, 166 insertions, 158 deletions
diff --git a/src/phoenix-test.py b/src/phoenix-test.py
index c8d072d..d803fbe 100755
--- a/src/phoenix-test.py
+++ b/src/phoenix-test.py
@@ -1,4 +1,5 @@
#!/usr/bin/env python
+# -*- coding: utf-8 -*-
import os
import sys
@@ -13,75 +14,77 @@ from util import setup_run_dir, create_account, scrub_env
from gi.repository import GObject, Gio, GLib
from gi.repository import TelepathyGLib as Tp
-MESSAGE = "Test from TpRing"
+MESSAGE = 'Test from TpRing'
+
class TestCase:
- def __init__ (self, loop, test_contact, quiet = False):
+ def __init__(self, loop, test_contact, quiet=False):
self.loop = loop
self.quiet = quiet
self.test_contact = test_contact
- def assertNotEqual (self, expected, value):
+ def assertNotEqual(self, expected, value):
if expected == value:
self.done(False)
raise AssertionError("didn't expect: %s, got: %s",
str(expected), str(value))
- def assertEqual (self, expected, value):
+ def assertEqual(self, expected, value):
if expected != value:
self.done(False)
- raise AssertionError("expected: %s, got: %s",
+ raise AssertionError('expected: %s, got: %s',
str(expected), str(value))
- def assertNotNone (self, value):
+ def assertNotNone(self, value):
if value == None:
self.done(False)
- raise AssertionError("value was None")
+ raise AssertionError('value was None')
- def done (self, success = True):
+ def done(self, success=True):
self.loop.quit()
if not success:
- print "VOIP Test: FAILED"
+ print 'VOIP Test: FAILED'
else:
- print "VOIP Test: PASS"
+ print 'VOIP Test: PASS'
- def bite (self):
+ def bite(self):
# Bitten by the watchdog
- print "Timeout hit"
- self.done (False)
+ print 'Timeout hit'
+ self.done(False)
- def set_timeout (self, timeout = 10):
- GLib.timeout_add_seconds (timeout, self.bite)
+ def set_timeout(self, timeout=10):
+ GLib.timeout_add_seconds(timeout, self.bite)
- def write (self, string):
+ def write(self, string):
if not self.quiet:
print string
- def set_account (self, cm, protocol, *settings):
+ def set_account(self, cm, protocol, *settings):
self.cm = cm
self.protocol = protocol
self.settings = {}
self.password = None
for s in settings:
- (k, v) = s.split("=", 2)
- (t, v) = v.split(":", 2)
+ (k, v) = s.split('=', 2)
+ (t, v) = v.split(':', 2)
- if k == "password":
+ if k == 'password':
self.password = v
continue
- if t == "b":
- variant = GLib.Variant (t,
- v in [ "1", "True", "true", "t"])
+ if t == 'b':
+ variant = GLib.Variant(t,
+ v in [ '1', 'True', 'true', 't'])
else:
- variant = GLib.Variant (t, v)
+ variant = GLib.Variant(t, v)
self.settings[k] = variant
+
# Watch one Telepathy connection
class TestConnection:
- def __init__ (self, t, connection):
+ def __init__(self, t, connection):
self.t = t
self.connection = connection
self.contact = None
@@ -92,231 +95,236 @@ class TestConnection:
Tp.IFACE_CONNECTION_INTERFACE_CONTACT_LIST)
if self.contact_list:
- self.connection.connect ('notify::contact-list-state',
- self.contact_list_state_cb, None)
+ self.connection.connect('notify::contact-list-state',
+ self.contact_list_state_cb, None)
- if connection.get_contact_list_state() == Tp.ContactListState.SUCCESS:
+ if connection.get_contact_list_state() \
+ == Tp.ContactListState.SUCCESS:
self.start_test()
-
else:
self.start_test()
- def contact_list_state_cb (self, connection, spec, data):
- if connection.get_contact_list_state() == Tp.ContactListState.SUCCESS:
+ def contact_list_state_cb(self, connection, spec, data):
+ if connection.get_contact_list_state() \
+ == Tp.ContactListState.SUCCESS:
self.start_test()
- def teardown_cb (self, contact, result, u):
- contact.remove_finish (result)
+ def teardown_cb(self, contact, result, u):
+ contact.remove_finish(result)
def teardown(self):
- self.contact.remove_async (self.teardown_cb, None)
+ self.contact.remove_async(self.teardown_cb, None)
def start_test(self):
- self.connection.dup_contact_by_id_async (
- self.t.test_contact,
- [ Tp.ContactFeature.SUBSCRIPTION_STATES,
- Tp.ContactFeature.CAPABILITIES],
+ self.connection.dup_contact_by_id_async(self.t.test_contact,
+ [Tp.ContactFeature.SUBSCRIPTION_STATES,
+ Tp.ContactFeature.CAPABILITIES],
self.got_test_contact, None)
- def hangup (self):
- self.channel.hangup_async (Tp.CallStateChangeReason.USER_REQUESTED,
- "", "", None, None)
+ def hangup(self):
+ self.channel.hangup_async(Tp.CallStateChangeReason.USER_REQUESTED,
+ '', '', None, None)
- def check_call_status (self, *args):
+ def check_call_status(self, *args):
(state, flags, details, reason) = self.channel.get_state()
- got_audio = self.proxy.get_cached_property ("ReceivingAudio")
- got_video = self.proxy.get_cached_property ("ReceivingVideo")
+ got_audio = self.proxy.get_cached_property('ReceivingAudio')
+ got_video = self.proxy.get_cached_property('ReceivingVideo')
if state == Tp.CallState.ACTIVE and got_audio and got_video:
- self.t.write("Successful call, letting it run for 5 seconds")
+ self.t.write('Successful call, letting it run for 5 seconds')
self.call_success = True
- GLib.timeout_add_seconds (5, self.hangup)
+ GLib.timeout_add_seconds(5, self.hangup)
if state == Tp.CallState.ENDED:
self.t.assertEqual(True, self.call_success)
self.t.assertEqual(Tp.CallStateChangeReason.USER_REQUESTED,
- reason.reason)
- self.t.assertEqual (self.connection.get_self_handle (),
+ reason.reason)
+ self.t.assertEqual(self.connection.get_self_handle(),
reason.actor)
self.t.done()
def message_sent(self, channel, message, flags, token, data):
- message, flags = message.to_text()
+ (message, flags) = message.to_text()
self.t.assertEqual(MESSAGE, message)
self.channel.close_async(None, None)
self.t.done()
- def create_channel_finished (self, req, r, u):
+ def create_channel_finished(self, req, r, u):
try:
- self.channel = channel = req.create_and_observe_channel_finish (r)
+ self.channel = channel = \
+ req.create_and_observe_channel_finish(r)
except Exception, e:
print e
self.t.done(False)
if isinstance(self.channel, Tp.CallChannel):
- d = Gio.bus_get_sync (Gio.BusType.SESSION, None)
- self.proxy = proxy = Gio.DBusProxy.new_sync (d, 0, None,
- "org.freedesktop.Telepathy.Phoenix.Calls",
- "/org/freedesktop/Telepathy/Phoenix/Calls" +
- channel.get_object_path(),
- "org.freedesktop.Telepathy.Phoenix.CallInfo",
- None)
- proxy.connect ("g-properties-changed",
- self.check_call_status, None)
- channel.connect ("notify::state",
- self.check_call_status, None)
+ d = Gio.bus_get_sync(Gio.BusType.SESSION, None)
+ self.proxy = proxy = Gio.DBusProxy.new_sync(d, 0, None,
+ 'org.freedesktop.Telepathy.Phoenix.Calls',
+ '/org/freedesktop/Telepathy/Phoenix/Calls' +
+ channel.get_object_path(),
+ 'org.freedesktop.Telepathy.Phoenix.CallInfo',
+ None)
+ proxy.connect('g-properties-changed',
+ self.check_call_status, None)
+ channel.connect('notify::state',
+ self.check_call_status, None)
elif isinstance(self.channel, Tp.TextChannel):
- channel.connect ("message-sent",
- self.message_sent, None)
+ channel.connect('message-sent',
+ self.message_sent, None)
- m = Tp.ClientMessage.new_text (Tp.ChannelTextMessageType.NORMAL, MESSAGE)
- self.channel.send_message_async (m, 0, None, None)
+ m = Tp.ClientMessage.new_text(Tp.ChannelTextMessageType.NORMAL, MESSAGE)
+ self.channel.send_message_async(m, 0, None, None)
else:
- print "unknown channel type created:", self.channel
+ print 'unknown channel type created:', self.channel
self.t.done(Fail)
- def handle_capabilities (self):
+ def handle_capabilities(self):
caps = self.contact.get_capabilities()
account = self.contact.get_account()
- if caps.supports_audio_video_call (Tp.HandleType.CONTACT):
- req = Tp.AccountChannelRequest.new_audio_video_call (account, 0)
- req.set_hint ("call-mode", GLib.Variant('s', "test-inputs"))
- elif caps.supports_text_chats ():
- req = Tp.AccountChannelRequest.new_text (account, 0)
+ if caps.supports_audio_video_call(Tp.HandleType.CONTACT):
+ req = Tp.AccountChannelRequest.new_audio_video_call(account, 0)
+ req.set_hint('call-mode', GLib.Variant('s', 'test-inputs'))
+ elif caps.supports_text_chats():
+ req = Tp.AccountChannelRequest.new_text(account, 0)
else:
- print "no caps for call or text chat"
+ print 'no caps for call or text chat'
self.t.done(False)
return
- req.set_target_contact (self.contact)
- req.create_and_observe_channel_async ("", None,
+ req.set_target_contact(self.contact)
+ req.create_and_observe_channel_async('', None,
self.create_channel_finished, None)
- def handle_test_contact_states (self):
- p = self.contact.get_publish_state ()
- s = self.contact.get_subscribe_state ()
+ def handle_test_contact_states(self):
+ p = self.contact.get_publish_state()
+ s = self.contact.get_subscribe_state()
if p == Tp.SubscriptionState.YES and s == Tp.SubscriptionState.YES:
if not self.fully_subscribed:
self.fully_subscribed = True
- self.t.write ("Subscription complete")
+ self.t.write('Subscription complete')
elif p == Tp.SubscriptionState.ASK:
- self.contact.authorize_publication_async (
- lambda c, r, u: c.authorize_publication_finish (r), None)
+ self.contact.authorize_publication_async(
+ lambda c, r, u: c.authorize_publication_finish(r), None)
- def contact_states_cb (self, c, spec, d):
- self.handle_test_contact_states ()
+ def contact_states_cb(self, c, spec, d):
+ self.handle_test_contact_states()
- def got_test_contact (self, c, r, u):
- self.contact = c.dup_contact_by_id_finish (r)
+ def got_test_contact(self, c, r, u):
+ self.contact = c.dup_contact_by_id_finish(r)
if self.contact_list:
- s = self.contact.get_subscribe_state ()
+ s = self.contact.get_subscribe_state()
+
# If we're not subscribed, subscribe, needed to make calls for example
if s != Tp.SubscriptionState.YES:
- print "Asking for subscription"
- self.contact.request_subscription_async ("Test subscription",
- lambda c, r, u: c.request_subscription_finish (r) ,
+ print 'Asking for subscription'
+ self.contact.request_subscription_async('Test subscription',
+ lambda c, r, u: c.request_subscription_finish(r) ,
None)
- self.contact.connect ("notify::publish-state", self.contact_states_cb,
- None)
- self.contact.connect ("notify::subscribe-state", self.contact_states_cb,
- None)
+ self.contact.connect('notify::publish-state',
+ self.contact_states_cb, None)
+ self.contact.connect('notify::subscribe-state',
+ self.contact_states_cb, None)
- self.contact.connect ("notify::capabilities",
+ self.contact.connect('notify::capabilities',
lambda *x: self.handle_capabilities(), None)
- self.handle_capabilities ()
- self.handle_test_contact_states ()
+ self.handle_capabilities()
+ self.handle_test_contactf_states()
+
class TestAccount:
- def __init__ (self, t, account):
- self.t = t
- self.account = account
+ def __init__(self, t, account):
+ self.t = t
+ self.account = account
- self.t.assertEqual (False, account.is_enabled())
+ self.t.assertEqual(False, account.is_enabled())
- self.connection_notify_id = self.account.connect("notify::connection",
- self.connection_cb, None)
+ self.connection_notify_id = \
+ self.account.connect('notify::connection',
+ self.connection_cb, None)
- # Enable & Request availability, causing our connection to be setup
- self.account.set_enabled_async (True, None, None)
- self.account.request_presence_async (
- Tp.ConnectionPresenceType.AVAILABLE,
- "",
- "",
- None, None)
+ # Enable & Request availability, causing our connection to be setup
+ self.account.set_enabled_async(True, None, None)
+ self.account.request_presence_async(
+ Tp.ConnectionPresenceType.AVAILABLE,
+ '', '',
+ None, None)
- def connection_test_if_possible (self):
- s = self.account.get_property ("connection-status")
- self.t.assertNotEqual (Tp.ConnectionStatus.DISCONNECTED, s)
+ def connection_test_if_possible(self):
+ s = self.account.get_property('connection-status')
+ self.t.assertNotEqual(Tp.ConnectionStatus.DISCONNECTED, s)
- self.account.disconnect(self.connection_notify_id)
+ self.account.disconnect(self.connection_notify_id)
- c = self.account.get_property ("connection")
- if s == Tp.ConnectionStatus.CONNECTED and c != None:
- TestConnection (self.t, c)
+ c = self.account.get_property('connection')
+ if s == Tp.ConnectionStatus.CONNECTED and c != None:
+ TestConnection(self.t, c)
+
+ def connection_cb(self, account, spec, data):
+ self.connection_test_if_possible()
- def connection_cb (self, account, spec, data):
- self.connection_test_if_possible ()
# Watch the account manager
class TestManager:
- def __init__ (self, t):
- self.t = t
- self.am = am = Tp.AccountManager.dup()
- factory = am.get_factory()
- factory.add_account_features ([Tp.Account.get_feature_quark_connection ()])
- factory.add_connection_features ([
- Tp.Connection.get_feature_quark_contact_list ()])
- self.accounts = {}
-
- am.prepare_async(None, self.prepared, None)
-
- def got_account (self, account):
- self.t.assertNotNone(account)
- TestAccount (self.t, account)
-
- def prepared (self, am, result, data):
- # We start in a temporary session without existing accounts
- self.t.assertEqual([], am.get_valid_accounts())
- # Create fresh acocunt
- create_account (self.am,
- self.t.cm, self.t.protocol,
- "Phoenix Test Account",
- self.t.settings,
- self.t.password,
- self.got_account)
+ def __init__(self, t):
+ self.t = t
+ self.am = am = Tp.AccountManager.dup()
+ factory = am.get_factory()
+ factory.add_account_features([Tp.Account.get_feature_quark_connection()])
+ factory.add_connection_features(
+ [Tp.Connection.get_feature_quark_contact_list()])
+ self.accounts = {}
+
+ am.prepare_async(None, self.prepared, None)
+
+ def got_account(self, account):
+ self.t.assertNotNone(account)
+ TestAccount(self.t, account)
+
+ def prepared(self, am, result, data):
+ # We start in a temporary session without existing accounts
+ self.t.assertEqual([], am.get_valid_accounts())
+
+ # Create a fresh acocunt
+ create_account(self.am,
+ self.t.cm, self.t.protocol,
+ 'Phoenix Test Account',
+ self.t.settings,
+ self.t.password,
+ self.got_account)
if __name__ == '__main__':
quiet = False
datadir = None
testcontact = None
try:
- opts, args = getopt.getopt(sys.argv[1:],
- "q", ["datadir=", "quiet", "testcontact="])
- for o, a in opts:
- if o == "--datadir":
+ (opts, args) = getopt.getopt(sys.argv[1:],
+ 'q', ['datadir=', 'quiet', 'testcontact='])
+ for (o, a) in opts:
+ if o == '--datadir':
datadir = a
- elif o in [ "-q", "--quiet"]:
+ elif o in [ '-q', '--quiet']:
quiet = True
- elif o == "--testcontact":
+ elif o == '--testcontact':
testcontact = a
if testcontact == None:
- print "Testcontact option is required"
+ print 'Testcontact option is required'
sys.exit(2)
if len(args) < 2:
- print "usage: %s connection-manager protocol [settings]" % sys.argv[0]
+ print 'usage: %s connection-manager protocol [settings]' % sys.argv[0]
sys.exit(2)
except getopt.GetoptError, err:
print str(err)
sys.exit(2)
# Use a temporary directory for the testrun
- tempdir = tempfile.mkdtemp(prefix="phoenix-test")
+ tempdir = tempfile.mkdtemp(prefix='phoenix-test')
setup_run_dir(tempdir, quiet=quiet)
if datadir != None:
setup_data_dir(datadir, quiet)
@@ -324,9 +332,9 @@ if __name__ == '__main__':
scrub_env()
p = spawnbus(quiet)
loop = GObject.MainLoop()
- t = TestCase (loop, testcontact, quiet)
- t.set_timeout (30)
- t.set_account (*args)
+ t = TestCase(loop, testcontact, quiet)
+ t.set_timeout(30)
+ t.set_account(*args)
Tp.debug_set_flags(os.getenv('PHOENIX_TEST_DEBUG', ''))
@@ -334,7 +342,7 @@ if __name__ == '__main__':
try:
loop.run()
finally:
- os.kill (p.pid, signal.SIGKILL)
+ os.kill(p.pid, signal.SIGKILL)
# Sleep 2 seconds so everything can die a nice death
- time.sleep (2)
+ time.sleep(2)
shutil.rmtree(tempdir)