summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoão Paulo Rechi Vita <jprvita@openbossa.org>2013-02-21 18:06:38 -0300
committerJoão Paulo Rechi Vita <jprvita@openbossa.org>2013-04-17 11:16:01 -0300
commit16bdbe2b8fc0686483fe59818627ebf546779ded (patch)
tree9759aa682f4da6265f318d8e66b878d9a4e45e64
parenta2d0f434fa1fb91bd7138ad2dd09b4b9dc5aa23b (diff)
test: Create simple-handsfree-audio-agentsimple-hf-audio-agent
This python tool implements the org.ofono.HandsfreeAudioAgent interface, dealing with the SCO file descriptor passed by the Handsfree Audio Manager.
-rwxr-xr-xtest/simple-handsfree-audio-agent100
1 files changed, 100 insertions, 0 deletions
diff --git a/test/simple-handsfree-audio-agent b/test/simple-handsfree-audio-agent
new file mode 100755
index 00000000..7b117f3c
--- /dev/null
+++ b/test/simple-handsfree-audio-agent
@@ -0,0 +1,100 @@
+#!/usr/bin/python
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import sys
+import dbus
+import dbus.service
+import dbus.mainloop.glib
+import gobject
+import os
+import glib
+
+OFONO_SERVICE = "org.ofono"
+HF_AGENT_IFACE = "org.ofono.HandsfreeAudioAgent"
+HF_AGENT_PATH = "/test/hf"
+HF_MANAGER_IFACE = "org.ofono.HandsfreeAudioManager"
+HF_MANAGER_PATH = "/"
+
+HFP_HF_UUID = "0000111E-0000-1000-8000-00805F9B34FB"
+CVSD_CODEC = dbus.Byte(0x01)
+MSBC_CODEC = dbus.Byte(0x02)
+
+
+def terminate():
+ print("Terminating")
+ in_file.close()
+ mainloop.quit()
+
+
+def sco_cb(fd, condition):
+ print("sco_cb(%s, %s)" % (fd, condition))
+
+ if (condition & glib.IO_ERR):
+ print("SCO error")
+ return False
+
+ if (condition & glib.IO_HUP):
+ print("SCO closed")
+ return False
+
+ if (condition & glib.IO_IN):
+ buf = fd.read()
+ print("Read %d bytes from SCO channel" % len(buf))
+ in_file.write(buf)
+
+ return True
+
+
+class HFAgent(dbus.service.Object):
+
+ def __init__(self, path, uuid, codecs):
+ self.path = path
+ self.uuid = uuid
+ self.codecs = codecs
+ dbus.service.Object.__init__(self, dbus.SystemBus(), self.path)
+ print("Agent created on %s" % self.path)
+ print(self.Introspect(self.path, self.connection), end="")
+
+ def register(self):
+ obj = self.connection.get_object(OFONO_SERVICE, HF_MANAGER_PATH)
+ manager = dbus.Interface(obj, HF_MANAGER_IFACE)
+ manager.Register(self.path, self.codecs)
+ print("Agent registered with " + OFONO_SERVICE)
+
+ # D-Bus methods
+ @dbus.service.method(HF_AGENT_IFACE)
+ def Release(self):
+ print("Release()")
+ terminate()
+
+ @dbus.service.method(HF_AGENT_IFACE, in_signature="ohy")
+ def NewConnection(self, card_path, sco_fd, codec):
+ self.fd = sco_fd.take()
+ print("NewConnection(%s, %d, %d)" % (card_path, self.fd, codec))
+ self.sco = os.fdopen(self.fd, "rb")
+ buf = self.sco.read()
+ print("Read %d bytes from SCO channel" % len(buf))
+ glib.io_add_watch(self.sco, glib.IO_IN | glib.IO_ERR | glib.IO_HUP, sco_cb)
+
+
+if __name__ == '__main__':
+
+ if (len(sys.argv) < 2):
+ print("Usage: %s <file to dump audio to>" %
+ (sys.argv[0]))
+ sys.exit(1)
+
+ in_file = open(sys.argv[1], "wb")
+
+ dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+ agent = HFAgent(HF_AGENT_PATH, HFP_HF_UUID, [CVSD_CODEC, MSBC_CODEC])
+ agent.register()
+
+ try:
+ mainloop = gobject.MainLoop()
+ mainloop.run()
+ except KeyboardInterrupt:
+ print("Keyboard interrupted")
+ terminate()