diff options
author | João Paulo Rechi Vita <jprvita@openbossa.org> | 2013-02-21 18:06:38 -0300 |
---|---|---|
committer | João Paulo Rechi Vita <jprvita@openbossa.org> | 2013-04-17 11:16:01 -0300 |
commit | 16bdbe2b8fc0686483fe59818627ebf546779ded (patch) | |
tree | 9759aa682f4da6265f318d8e66b878d9a4e45e64 | |
parent | a2d0f434fa1fb91bd7138ad2dd09b4b9dc5aa23b (diff) |
test: Create simple-handsfree-audio-agentsimple-hf-audio-agent
This python tool implements the org.ofono.HandsfreeAudioAgent interface,
dealing with the SCO file descriptor passed by the Handsfree Audio
Manager.
-rwxr-xr-x | test/simple-handsfree-audio-agent | 100 |
1 files changed, 100 insertions, 0 deletions
diff --git a/test/simple-handsfree-audio-agent b/test/simple-handsfree-audio-agent new file mode 100755 index 00000000..7b117f3c --- /dev/null +++ b/test/simple-handsfree-audio-agent @@ -0,0 +1,100 @@ +#!/usr/bin/python + +from __future__ import absolute_import, print_function, unicode_literals + +import sys +import dbus +import dbus.service +import dbus.mainloop.glib +import gobject +import os +import glib + +OFONO_SERVICE = "org.ofono" +HF_AGENT_IFACE = "org.ofono.HandsfreeAudioAgent" +HF_AGENT_PATH = "/test/hf" +HF_MANAGER_IFACE = "org.ofono.HandsfreeAudioManager" +HF_MANAGER_PATH = "/" + +HFP_HF_UUID = "0000111E-0000-1000-8000-00805F9B34FB" +CVSD_CODEC = dbus.Byte(0x01) +MSBC_CODEC = dbus.Byte(0x02) + + +def terminate(): + print("Terminating") + in_file.close() + mainloop.quit() + + +def sco_cb(fd, condition): + print("sco_cb(%s, %s)" % (fd, condition)) + + if (condition & glib.IO_ERR): + print("SCO error") + return False + + if (condition & glib.IO_HUP): + print("SCO closed") + return False + + if (condition & glib.IO_IN): + buf = fd.read() + print("Read %d bytes from SCO channel" % len(buf)) + in_file.write(buf) + + return True + + +class HFAgent(dbus.service.Object): + + def __init__(self, path, uuid, codecs): + self.path = path + self.uuid = uuid + self.codecs = codecs + dbus.service.Object.__init__(self, dbus.SystemBus(), self.path) + print("Agent created on %s" % self.path) + print(self.Introspect(self.path, self.connection), end="") + + def register(self): + obj = self.connection.get_object(OFONO_SERVICE, HF_MANAGER_PATH) + manager = dbus.Interface(obj, HF_MANAGER_IFACE) + manager.Register(self.path, self.codecs) + print("Agent registered with " + OFONO_SERVICE) + + # D-Bus methods + @dbus.service.method(HF_AGENT_IFACE) + def Release(self): + print("Release()") + terminate() + + @dbus.service.method(HF_AGENT_IFACE, in_signature="ohy") + def NewConnection(self, card_path, sco_fd, codec): + self.fd = sco_fd.take() + print("NewConnection(%s, %d, %d)" % (card_path, self.fd, codec)) + self.sco = os.fdopen(self.fd, "rb") + buf = self.sco.read() + print("Read %d bytes from SCO channel" % len(buf)) + glib.io_add_watch(self.sco, glib.IO_IN | glib.IO_ERR | glib.IO_HUP, sco_cb) + + +if __name__ == '__main__': + + if (len(sys.argv) < 2): + print("Usage: %s <file to dump audio to>" % + (sys.argv[0])) + sys.exit(1) + + in_file = open(sys.argv[1], "wb") + + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + agent = HFAgent(HF_AGENT_PATH, HFP_HF_UUID, [CVSD_CODEC, MSBC_CODEC]) + agent.register() + + try: + mainloop = gobject.MainLoop() + mainloop.run() + except KeyboardInterrupt: + print("Keyboard interrupted") + terminate() |