diff options
author | Seif Lotfy <seif@lotfy.com> | 2012-03-24 02:46:44 +0530 |
---|---|---|
committer | Manish Sinha <manishsinha@ubuntu.com> | 2012-03-24 02:46:44 +0530 |
commit | 89e99c55c831248b41f637845435739bcb335d14 (patch) | |
tree | cd7780511718d4481179238c84cd0d6b001976b1 | |
parent | b3fc96b175335c133e8f4ed4b11cd01271ba5f7a (diff) | |
parent | 8d73589f4e1d3f07ea337bac42bfad2514a8c020 (diff) |
Updated the Telepathy loggers and update chrome extension to create thunbnails
-rw-r--r-- | chrome/zeitgeist.js | 48 | ||||
-rw-r--r-- | npapi-plugin/np-zeitgeist.cc | 67 | ||||
-rwxr-xr-x | telepathy/zeitgeist-telepathy-observer | 859 |
3 files changed, 413 insertions, 561 deletions
diff --git a/chrome/zeitgeist.js b/chrome/zeitgeist.js index 534d3d7..82bcb01 100644 --- a/chrome/zeitgeist.js +++ b/chrome/zeitgeist.js @@ -1,6 +1,7 @@ var plugin = document.embeds[0]; var tabInfo = {}; var tabIdTimeouts = {}; +var currentTabs = {}; function onTabCreated (tab) { chrome.tabs.executeScript(tab.id, {file: "content_script.js"}); @@ -13,9 +14,10 @@ function onTabRemoved (tabid) { function onTabUpdated (tabid, changeInfo, tab) { if (!changeInfo.url) return; window.clearTimeout(tabIdTimeouts[tabid]) - tabIdTimeouts[tabid] = window.setTimeout(function(){ - chrome.tabs.executeScript(tabid, {file: "content_script.js"});}, - 5000); + tabIdTimeouts[tabid] = window.setTimeout(function(){ + console.log("sending event for " + tab.url); + chrome.tabs.executeScript(tabid, {file: "content_script.js"});}, + 5000); } function onBookmarkCreated (bookmarkid, bookmark) { @@ -27,7 +29,6 @@ function onBookmarkCreated (bookmarkid, bookmark) { } function sendAccessEvent (documentInfo, tabid) { - console.log("Sending access event for " + documentInfo.url); var url = documentInfo.url; var origin = documentInfo.origin; var domain = documentInfo.domain; @@ -39,10 +40,14 @@ function sendAccessEvent (documentInfo, tabid) { } var mimetype = documentInfo.mimeType; var title = documentInfo.title; - plugin.insertEvent(url, - domain, - mimetype ? mimetype : "text/html", - title); + plugin.insertEvent(url, + domain, + mimetype ? mimetype : "text/html", + title); + console.log("save thumbnail for "+currentTabs[tabid]+": "+url); + chrome.tabs.captureVisibleTab(currentTabs[tabid], {format:"jpeg", quality:5}, function(dataUrl) { + plugin.saveSnapshot(url, dataUrl); + }); documentInfo.sentAccess = true; tabInfo[tabid] = documentInfo; @@ -51,7 +56,6 @@ function sendAccessEvent (documentInfo, tabid) { function sendLeaveEvent (tabid) { var documentInfo = tabInfo[tabid]; if (documentInfo == null || documentInfo.sentAccess != true) return; - console.log("Sending leave event for " + documentInfo.url); var url = documentInfo.url; var origin = documentInfo.origin; @@ -76,13 +80,8 @@ function sendLeaveEvent (tabid) { // this works in chrome 7,8,9 function onExtensionRequest (request, sender, sendResponse) { var id = sender.tab.id; - if (tabInfo[id] == undefined || tabInfo[id].url != request.url) { - /*if (tabInfo[id] != undefined) { - console.log("Leaving " + tabInfo[id].url); - sendLeaveEvent(id); - }*/ - sendAccessEvent(request, id); - } + sendLeaveEvent(id); + sendAccessEvent(request, id); } var is_chromium = /chromium/.test( navigator.userAgent.toLowerCase() ); @@ -90,16 +89,27 @@ if (!is_chromium) plugin.setActor("application://google-chrome.desktop"); else plugin.setActor("application://chromium-browser.desktop"); chrome.extension.onRequest.addListener (onExtensionRequest); -chrome.bookmarks.onCreated.addListener (onBookmarkCreated); +//chrome.bookmarks.onCreated.addListener (onBookmarkCreated); chrome.tabs.onUpdated.addListener (onTabUpdated); +chrome.tabs.onCreated.addListener( + function (tab) { + currentTabs[tab.id] = tab.windowId; + } +); + +chrome.tabs.onRemoved.addListener( + function (tabid) { + delete currentTabs[tabid]; + } +); //chrome.tabs.onCreated.addListener (onTabCreated); -chrome.tabs.onRemoved.addListener (onTabRemoved); +//chrome.tabs.onRemoved.addListener (onTabRemoved); chrome.windows.getAll({"populate" : true}, function (windows) { for (var i = 0; i < windows.length; i++) { var tabs = windows[i].tabs; for (var j = 0; j < tabs.length; j++) { - chrome.tabs.executeScript(tabs[j].id, {file: "content_script.js"}); + chrome.tabs.executeScript(tabs[j].id, {file: "content_script.js"}); } } }); diff --git a/npapi-plugin/np-zeitgeist.cc b/npapi-plugin/np-zeitgeist.cc index 6c2c04a..0066ab0 100644 --- a/npapi-plugin/np-zeitgeist.cc +++ b/npapi-plugin/np-zeitgeist.cc @@ -51,6 +51,7 @@ hasMethod(NPObject* obj, NPIdentifier methodName) { if (!strcmp(name, "insertEvent")) return true; else if (!strcmp(name, "setActor")) return true; + else if (!strcmp(name, "saveSnapshot")) return true; return false; } @@ -59,6 +60,7 @@ static bool invokeInsertEvent (NPObject *obj, const NPVariant *args, uint32_t argCount, NPVariant *result) { /* args should be: url, origin, mimetype, title, [interpretation] */ + g_debug("Inserting event"); char *url, *origin, *mimetype, *title; char *interpretation = NULL; const char *manifestation = NULL; @@ -66,9 +68,11 @@ invokeInsertEvent (NPObject *obj, const NPVariant *args, uint32_t argCount, NPVa const NPString *np_s; ZeitgeistEvent *event; + g_debug("arg count: %d", argCount); if(argCount < 4 || argCount > 6) { npnfuncs->setexception(obj, "exception during invocation"); + g_debug("too many or too few args"); return false; } @@ -145,6 +149,64 @@ invokeInsertEvent (NPObject *obj, const NPVariant *args, uint32_t argCount, NPVa } static bool +invokeSaveSnapshot (NPObject *obj, const NPVariant *args, uint32_t argCount, NPVariant *result) +{ + char *url; + char *screenshotURL = NULL; + const NPString *np_s; + np_s = &NPVARIANT_TO_STRING (args[0]); + url = g_strndup(np_s->UTF8Characters, np_s->UTF8Length); + np_s = &NPVARIANT_TO_STRING (args[1]); + screenshotURL = g_strndup(np_s->UTF8Characters, np_s->UTF8Length); + + if (!screenshotURL) + return false; + + gsize len = strlen(screenshotURL) - 22; + char *img = new char[len]; + memset(img, 0, len); + memcpy(img, screenshotURL + 22, len); + + // update thumbnail + gchar *thumbnail_path; + gchar *thumbnail_filename = NULL; + gchar *thumbnail_dir; + gchar *csum; + + // create dir if it doesn't exist + thumbnail_dir = g_build_filename (g_get_home_dir (), + ".thumbnails", + "large", + NULL); + if (!g_file_test(thumbnail_dir, G_FILE_TEST_IS_DIR)) { + g_mkdir_with_parents (thumbnail_dir, 0755); + } + g_free (thumbnail_dir); + + csum = g_compute_checksum_for_string (G_CHECKSUM_MD5, url, -1); + + thumbnail_filename = g_strconcat (csum, ".png", NULL); + thumbnail_path = g_build_filename (g_get_home_dir (), + ".thumbnails", + "large", + thumbnail_filename, + NULL); + g_free (csum); + + guchar *jpg_data = g_base64_decode(img, &len); + + g_debug("Writing thumbnail to %s", thumbnail_path); + g_file_set_contents(thumbnail_path, (gchar*)jpg_data, len, NULL); + + g_free (img); + g_free (jpg_data); + g_free (thumbnail_filename); + g_free (thumbnail_path); + + return true; +} + +static bool invokeSetActor (NPObject *obj, const NPVariant *args, uint32_t argCount, NPVariant *result) { const NPString *np_s; @@ -175,6 +237,7 @@ invoke(NPObject* obj, NPIdentifier methodName, const NPVariant *args, uint32_t a name = npnfuncs->utf8fromidentifier(methodName); + g_debug("Calling %s", name); if(name) { if (!strcmp (name, "insertEvent")) @@ -185,6 +248,10 @@ invoke(NPObject* obj, NPIdentifier methodName, const NPVariant *args, uint32_t a { return invokeSetActor(obj, args, argCount, result); } + else if (!strcmp (name, "saveSnapshot")) + { + return invokeSaveSnapshot(obj, args, argCount, result); + } } npnfuncs->setexception(obj, "exception during invocation"); diff --git a/telepathy/zeitgeist-telepathy-observer b/telepathy/zeitgeist-telepathy-observer index 403f930..7bf8b7c 100755 --- a/telepathy/zeitgeist-telepathy-observer +++ b/telepathy/zeitgeist-telepathy-observer @@ -1,547 +1,322 @@ #!/usr/bin/env python -# -# This observer tracks text conversation and reports events to the Zeitgeist -# engine. Eventually, it will report file transfers and media streams -# -# ZConnections and ZChannels are tracked by the ZObserver, their invalidation -# is signalled by GObject signals. ZObserver.ObserveChannels uses asynchronous -# return functions to only return from the D-Bus method once all of the channels -# are prepared, this gives the Observer time to investigate the pending message -# queue before any Handlers can acknowledge it. -# -# Authors: Danielle Madeley <danielle.madeley@collabora.co.uk> -# Morten Mjelva <morten.mjelva@gmail.com> -# -import dbus, dbus.glib -import gobject - -import sys - -from time import time - -import logging -logging.basicConfig(level=logging.DEBUG) - -import telepathy -from telepathy.constants import CONNECTION_STATUS_DISCONNECTED, \ - HANDLE_TYPE_CONTACT - -from telepathy.interfaces import CHANNEL, \ - CHANNEL_TYPE_FILE_TRANSFER, \ - CHANNEL_TYPE_STREAMED_MEDIA, \ - CHANNEL_TYPE_TEXT, \ - CLIENT, \ - CLIENT_OBSERVER, \ - CONNECTION, \ - CONNECTION_INTERFACE_ALIASING, \ - CONNECTION_INTERFACE_CONTACTS +from gi.repository import TelepathyGLib as Tp +from gi.repository import GObject, Gio from zeitgeist.client import ZeitgeistClient from zeitgeist.datamodel import Event, Subject, Interpretation, Manifestation - -def error(*args): - logging.error("error %s" % args) - -# Create a connection to the Zeitgeist engine -try: - zclient = ZeitgeistClient() -except RuntimeError, e: - logging.error("Unable to connect to Zeitgeist. Won't send events. Reason: \ - %s" % e) - zclient = None - -class ZConnection(gobject.GObject, telepathy.client.Connection): - """ - Extends telepathy.client.Connection, storing information about proxy obj - - Object extends tp.Connection object, letting us store information about the - connection proxy locally. - Arguments: - path: Unique path to this connection - ready_handler: Callback function run once we're done setting up object - """ - __gsignals__ = { - 'disconnected': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()), - } - - def __init__(self, path, ready_handler): - service_name = path.replace('/', '.')[1:] - - # Ready callback - self.ready_handler = ready_handler - self.signals = [] - - gobject.GObject.__init__(self) - telepathy.client.Connection.__init__(self, service_name, path, - ready_handler = self._connection_ready) - - def __repr__(self): - return "ZConnection(%s)" % self.object_path - - def _status_changed(self, status, reason): - if status == CONNECTION_STATUS_DISCONNECTED: - for signal in self.signals: - signal.remove() - - self.emit('disconnected') - - def _connection_ready(self, conn): - def interfaces(interfaces): - self.contact_attr_interfaces = interfaces - - # Connection ready - self.ready_handler(conn) - - # Get contact attribute interfaces - self[dbus.PROPERTIES_IFACE].Get(CONNECTION_INTERFACE_CONTACTS, - 'ContactAttributeInterfaces', - reply_handler=interfaces, - error_handler=error) - - self.signals.append(self[CONNECTION].connect_to_signal( - 'StatusChanged', self._status_changed)) - - # This is necessary so our signal isn't sent over D-Bus - def do_disconnected(self): - pass -gobject.type_register(ZConnection) - -class ZChannel(gobject.GObject, telepathy.client.Channel): - """ - Extends telepathy.client.Channel - - This class allows us to store information about a telepathy.client.Channel - object locally. - Arguments: - account_path: Path to the account used. Passed to ObserveChannels. - connection: The connection used for the channel - properties: Properties of the channel we're proxying, from ObserveChannels - ready_handler: Callback function run once we're done setting up object - """ - __gsignals__ = { - 'closed': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()), - } - - def __init__(self, account_path, connection, path, properties, ready_handler): - self.account_path = account_path - self.conn = connection - self.properties = properties - self.ready_handler = ready_handler - - self.signals = [] - - gobject.GObject.__init__(self) - telepathy.client.Channel.__init__(self, connection.service_name, path, - ready_handler=self._channel_ready) - - def __repr__(self): - return "ZChannel(%s)" % self.object_path - - def _send_to_zeitgeist(self, subjects, event_details): - event_details['subjects'] = subjects - zevent = Event.new_for_values(**event_details) - zclient.insert_event(zevent) - - def _channel_closed_cb(self): - for signal in self.signals: - signal.remove() - - self.emit('closed') - - def _default_operations_finished(self): - self._release_channel() - - def _get_contact_attributes_cb(self, attributes_map): - handle = self.properties[CHANNEL + '.TargetHandle'] +from zeitgeist.mimetypes import get_interpretation_for_mimetype + +import json + +GObject.threads_init() + +ZG_ACTOR = "dbus://org.freedesktop.Telepathy.Logger.service" +TP_ACCOUNT_PATH = "x-telepathy-account-path:%s" +TP_IDENTIFIER = "x-telepathy-identifier:%s" + +dbus = Tp.DBusDaemon.dup() +zg_client = ZeitgeistClient() + +def callback (ids): + print ids + +def error_handler (error): + print error + +def create_event(account, channel): + target = channel.get_target_contact() + event_template = Event.new_for_values( + actor = ZG_ACTOR, + interpretation = Interpretation.ACCESS_EVENT, + manifestation = Manifestation.USER_ACTIVITY \ + if channel.get_properties("requested") else Manifestation.WORLD_ACTIVITY, + origin = TP_ACCOUNT_PATH % account.get_object_path()[len(Tp.ACCOUNT_OBJECT_PATH_BASE):]) + event_template.subjects.append( + Subject.new_for_values( + uri = "", + interpretation = Interpretation.IMMESSAGE, + manifestation = Manifestation.SOFTWARE_SERVICE, + mimetype = "plain/text", + origin = TP_IDENTIFIER % target.get_identifier(), + text = target.get_alias(), + storage = "net")) + event_template.subjects.append( + Subject.new_for_values( + uri = TP_IDENTIFIER % target.get_identifier(), + interpretation = Interpretation.CONTACT, + manifestation = Manifestation.CONTACT_LIST_DATA_OBJECT, + origin = TP_IDENTIFIER % target.get_identifier(), + text = target.get_alias(), + storage = "net")) + return event_template + +def print_channel(event): + print "Event:" + print " - timestamp:", event.timestamp + print " - actor", event.actor + print " - interpretation:", event.interpretation + print " - manifestation:", event.manifestation + print " - origin:", event.origin + print " - subjects:", len(event.subjects) + for i, subject in enumerate(event.subjects): + print " - subject %i:" % (i+1) + print " - uri:", subject.uri + print " - interpretation:", subject.interpretation + print " - manifestation:", subject.manifestation + print " - mimetype:", subject.mimetype + print " - origin:", subject.origin + print " - text:", subject.text + print " - storage:", subject.storage + print " -payload:", event.payload + zg_client.insert_events([event], callback, error_handler) + +""" +Handling of text based events +""" + +def msg_recv_callback (channel, message, account): + if message.is_delivery_report(): + return + print "=== RECEIVED MSG ===" + event_template = create_event (account, channel) + event_template.interpretation = Interpretation.RECEIVE_EVENT + event_template.manifestation = Manifestation.WORLD_ACTIVITY + print_channel(event_template) + +def msg_sent_callback (channel, message, flags, token, account): + print "=== SENT MSG ===" + event_template = create_event (account, channel) + event_template.interpretation = Interpretation.SEND_EVENT + event_template.manifestation = Manifestation.USER_ACTIVITY + print_channel(event_template) + +def channel_closed_callback (channel, domain, code, message, account): + print "=== CLOSED CHANNEL ===" + event_template = create_event (account, channel) + event_template.interpretation = Interpretation.LEAVE_EVENT + print_channel(event_template) + +def observe_textchannels(observer, account, connection, channels, + dispatch_op, requests, context, user_data): + try: + for channel in channels: + target = channel.get_target_contact() + if not target: + continue + event_template = create_event(account, channel) + print "=== CREATED CHANNEL ===" + print_channel(event_template) + for message in channel.get_pending_messages(): + msg_recv_callback(channel, message, account) + event_template = create_event(account, channel) + channel.connect('invalidated', channel_closed_callback, account) + channel.connect('message-received', msg_recv_callback, account) + channel.connect('message-sent', msg_sent_callback, account) + finally: + context.accept() + +factory = Tp.AutomaticClientFactory.new (dbus) + +factory.add_channel_features ([Tp.Channel.get_feature_quark_contacts()]) +factory.add_contact_features ([Tp.ContactFeature.ALIAS]) +text_observer = Tp.SimpleObserver.new_with_factory(factory, True, + 'ZeitgeistTextObserver', True, observe_textchannels, None) +text_observer.add_observer_filter({ + Tp.PROP_CHANNEL_CHANNEL_TYPE: Tp.IFACE_CHANNEL_TYPE_TEXT, + Tp.PROP_CHANNEL_TARGET_HANDLE_TYPE: int(Tp.HandleType.CONTACT), + +}) +text_observer.register() + + +""" +Handling of call based events +""" + +call_timers = {} + +def create_call_event(account, channel): + targets = channel.get_members() + if not targets: + return + event_template = Event.new_for_values( + actor = ZG_ACTOR, + interpretation = Interpretation.ACCESS_EVENT, + manifestation = Manifestation.USER_ACTIVITY \ + if channel.get_property("requested") else Manifestation.WORLD_ACTIVITY, + origin = TP_ACCOUNT_PATH % account.get_object_path()[len(Tp.ACCOUNT_OBJECT_PATH_BASE):]) + for i, target in enumerate(targets): + if i == 0: + event_template.subjects.append ( + Subject.new_for_values( + uri = "", + interpretation = Interpretation.MEDIA.AUDIO, + manifestation = Manifestation.MEDIA_STREAM, + mimetype = "x-telepathy/call", + origin = TP_IDENTIFIER % target.get_identifier(), + text = target.get_alias(), + storage = "net")) + event_template.subjects.append ( + Subject.new_for_values( + uri = TP_IDENTIFIER % target.get_identifier(), + interpretation = Interpretation.CONTACT, + manifestation = Manifestation.CONTACT_LIST_DATA_OBJECT, + origin = TP_IDENTIFIER % target.get_identifier(), + text = target.get_alias(), + storage = "net")) + return event_template + + +def call_state_changed (channel, state, flags, reason, details, account): + #FIXME: Something breaks this + made_by_user = False + if reason.actor == channel.get_property("connection").get_self_handle(): + made_by_user = True + #print "User Data:", user_data + #print "Event Template: ", event_template + if state in (3, 5, 6): + event_template = create_call_event (account, channel) + event_template.manifestation = Manifestation.USER_ACTIVITY if made_by_user \ + else Manifestation.WORLD_ACTIVITY + + if state == Tp.CallState.INITIALISED: + event_template.interpretation = Interpretation.CREATE_EVENT + call_timers[channel] = 0 + print_channel(event_template) + + elif state == Tp.CallState.ACTIVE: + event_template.interpretation = Interpretation.ACCESS_EVENT + call_timers[channel] = int(event_template.timestamp) + print_channel(event_template) + + elif state == Tp.CallState.ENDED: + if call_timers.has_key(channel): + event_template.interpretation = Interpretation.LEAVE_EVENT + if reason.reason.numerator == Tp.CallStateChangeReason.REJECTED: + event_template.interpretation = Interpretation.DENY_EVENT + elif reason.reason.numerator == Tp.CallStateChangeReason.NO_ANSWER: + event_template.interpretation = Interpretation.EXPIRE_EVENT + + duration = 0 if not call_timers.has_key(channel) \ + else int(event_template.timestamp) - call_timers[channel] + details = {"http://zeitgeist-project.com/1.0/telepathy/call": { + "state": channel.get_state()[0].numerator, + "reason": channel.get_state()[1].numerator, + "requested": channel.get_property("requested"), + "host": TP_ACCOUNT_PATH % account.get_object_path()[len(Tp.ACCOUNT_OBJECT_PATH_BASE):] \ + if channel.get_property("requested") else event_template.subjects[1].uri, + "receiver": TP_IDENTIFIER % event_template.subjects[1].uri \ + if channel.get_property("requested") \ + else TP_ACCOUNT_PATH % account.get_object_path()[len(Tp.ACCOUNT_OBJECT_PATH_BASE):], + "duration": duration + }} + details = json.dumps(details) + event_template.payload = details.encode("utf-8") + del call_timers[channel] + print_channel(event_template) + +def observe_callchannels(observer, account, connection, channels, + dispatch_op, requests, context, user_data): try: - attributes = attributes_map[handle] - self._target_alias = attributes[CONNECTION_INTERFACE_ALIASING + - '/alias'] - except KeyError: - self._target_alias = self._target_id - - self._connect_to_signals() - self._default_operations_finished() - - def _get_requested_cb(self, req): - handle = self.properties[CHANNEL + '.TargetHandle'] - self._channel_requested = req - - # Get contact name - self.conn[CONNECTION_INTERFACE_CONTACTS].GetContactAttributes( - [handle], - # assuming this is available is technically wrong - [ CONNECTION_INTERFACE_ALIASING ], - False, - reply_handler=self._get_contact_attributes_cb, - error_handler=error) - - def _release_channel(self): - # Finished with critical tasks. - self.ready_handler(self) - - def _connect_to_signals(self): - # Connect to signals - self.signals.append(self[CHANNEL].connect_to_signal('Closed', - self._channel_closed_cb)) - - # Called when channel proxy becomes ready. Chains method calls so actions - # which depend on each other are executed in order. - def _channel_ready(self, channel): - self._target_id = self.properties[CHANNEL + '.TargetID'] - - # Get contact attribute interfaces - self[dbus.PROPERTIES_IFACE].Get(CHANNEL, - 'Requested', - reply_handler=self._get_requested_cb, - error_handler=error) - - # This is necessary so our signal isn't sent over D-Bus - def do_closed(self): - pass -gobject.type_register(ZChannel) - -class ZTextChannel(ZChannel): - def __init__(self, account_path, connection, path, properties, ready_handler): - ZChannel.__init__(self, account_path, connection, path, properties, ready_handler) - - self._subject = None - - def __repr__(self): - return "ZTextChannel(%s)" % self.object_path - - def _release_channel(self): - # Finished with critical tasks. - self.ready_handler(self) - - # Report to zeitgeist - if not self._subject: - self._subject = [Subject.new_for_values( - uri = "telepathy://" + self.account_path + "/" + self._target_id, - interpretation = unicode(Interpretation.IMMESSAGE), - manifestation = unicode(Manifestation.SOFTWARE_SERVICE), - origin = self.account_path, - mimetype = "text/plain", - text = self._target_alias)] - - if self._channel_requested: - manifestation = unicode(Manifestation.USER_ACTIVITY) - else: - manifestation = unicode(Manifestation.WORLD_ACTIVITY) - - timestamp = int(time() * 1000) - - event = Event.new_for_values( - timestamp = timestamp, - interpretation = unicode(Interpretation.ACCESS_EVENT), - manifestation = manifestation, - actor = self.account_path, - subjects = self._subject) - zclient.insert_event(event) - - def _channel_closed_cb(self): - for signal in self.signals: - signal.remove() - - timestamp = int(time() * 1000) - - event = Event.new_for_values( - timestamp = timestamp, - interpretation = unicode(Interpretation.LEAVE_EVENT), - manifestation = unicode(Manifestation.USER_ACTIVITY), - actor = self.account_path, - subjects = self._subject) - zclient.insert_event(event) - - self.emit('closed') - - def _get_pending_messages_cb(self, messages): - for message in messages: - self._message_received_cb(*message) - - self._release_channel() - - def _default_operations_finished(self): - self[CHANNEL_TYPE_TEXT].ListPendingMessages( - False, - reply_handler=self._get_pending_messages_cb, - error_handler=error) - - def _message_received_cb(self, identification, timestamp, sender, contenttype, - flags, content): - logging.debug("Message received") - if not self._subject: - self._subject = [Subject.new_for_values( - uri = "telepathy://" + self.account_path + "/" + self._target_id, - interpretation = unicode(Interpretation.IMMESSAGE), - manifestation = unicode(Manifestation.SOFTWARE_SERVICE), - origin = self.account_path, - mimetype = "text/plain", - text = self._target_alias)] - - timestamp = timestamp * 1000 - - event = Event.new_for_values( - timestamp = timestamp, - interpretation = unicode(Interpretation.RECEIVE_EVENT), - manifestation = unicode(Manifestation.WORLD_ACTIVITY), - actor = self.account_path, - subjects = self._subject) - zclient.insert_event(event) - - def _message_sent_cb(self, timestamp, contenttype, content): - logging.debug("Message sent") - - timestamp = timestamp * 1000 - - event = Event.new_for_values( - timestamp = timestamp, - interpretation = unicode(Interpretation.SEND_EVENT), - manifestation = unicode(Manifestation.USER_ACTIVITY), - actor = self.account_path, - subjects = self._subject) - zclient.insert_event(event) - - def _connect_to_signals(self): - # Connect to signals - self.signals.append(self[CHANNEL_TYPE_TEXT].connect_to_signal('Sent', - self._message_sent_cb)) - self.signals.append(self[CHANNEL_TYPE_TEXT].connect_to_signal('Received', - self._message_received_cb)) - self.signals.append(self[CHANNEL].connect_to_signal('Closed', - self._channel_closed_cb)) - -class ZStreamedMediaChannel(ZChannel): - def __init__(self, account_path, connection, path, properties, ready_handler): - self._audio_subject = None - self._video_subject = None - - self.stream_cache = {} - - ZChannel.__init__(self, account_path, connection, path, properties, ready_handler) - - def __repr__(self): - return "ZStreamedMediaChannel(%s)" % self.object_path - - def _channel_closed_cb(self): - for signal in self.signals: - signal.remove() - - self.emit('closed') - - def _stream_added_cb(self, streamid, handle, streamtype): - logging.debug("Stream added. Streamtype: " + str(streamtype)) - self.stream_cache[streamid] = {'handle': handle, 'streamtype': streamtype} - - if not self._audio_subject: - self._audio_subject = [Subject.new_for_values( - uri = "telepathy://" + self.account_path + "/" + self._target_id, - interpretation = unicode(Interpretation.AUDIO), - manifestation = unicode(Manifestation.MEDIA_STREAM), - origin = self.account_path, - mimetype = "text/plain", - text = self._target_alias)] - - if not self._video_subject: - self._video_subject = [Subject.new_for_values( - uri = "telepathy://" + self.account_path + "/" + self._target_id, - interpretation = unicode(Interpretation.VIDEO), - manifestation = unicode(Manifestation.MEDIA_STREAM), - origin = self.account_path, - mimetype = "text/plain", - text = self._target_alias)] - - if streamtype == 0: - subject = self._audio_subject - elif streamtype == 1: - subject = self._video_subject - - if self._channel_requested: - manifestation = unicode(Manifestation.USER_ACTIVITY) - else: - manifestation = unicode(Manifestation.WORLD_ACTIVITY) - - timestamp = int(time() * 1000) - event = Event.new_for_values( - timestamp = timestamp, - interpretation = unicode(Interpretation.ACCESS_EVENT), - manifestation = manifestation, - actor = self.account_path, - subjects = subject) - zclient.insert_event(event) - - - def _stream_removed_cb(self, streamid): - streamtype = self.stream_cache[streamid]['streamtype'] - logging.debug("Stream removed. Streamtype: " + str(streamtype)) - - if streamtype == 0: - subject = self._audio_subject - elif streamtype == 1: - subject = self._video_subject - - timestamp = int(time() * 1000) - event = Event.new_for_values( - timestamp = timestamp, - interpretation = unicode(Interpretation.LEAVE_EVENT), - manifestation = unicode(Manifestation.USER_ACTIVITY), - actor = self.account_path, - subject = subject) - zclient.insert_event(event) - - del self.stream_cache[streamid] - - def _list_streams_cb(self, streams): - for stream in streams: - self._stream_added_cb(stream[0], stream[1], stream[2]) - - def _default_operations_finished(self): - self._release_channel() - - self[CHANNEL_TYPE_STREAMED_MEDIA].ListStreams( - reply_handler=self._list_streams_cb, - error_handler=error) - - def _connect_to_signals(self): - # Connect to signals - self.signals.append(self[CHANNEL_TYPE_STREAMED_MEDIA].connect_to_signal('StreamAdded', - self._stream_added_cb)) - self.signals.append(self[CHANNEL_TYPE_STREAMED_MEDIA].connect_to_signal('StreamRemoved', - self._stream_removed_cb)) - self.signals.append(self[CHANNEL].connect_to_signal('Closed', - self._channel_closed_cb)) - -class ZObserver(telepathy.server.Observer, - telepathy.server.DBusProperties): - """ - Extends telepathy.server.Observer, listening for events - - - ZObserver listens for a channel matching the filter is opened by telepathy, - and reacts by opening connection and channel proxies in order to liisten in - on events in the channel. - """ - - def __init__(self, client_name, enable_stream_observer=False, - enable_text_observer=False): - - service_name = '.'.join ([CLIENT, client_name]) - object_path = '/' + service_name.replace('.', '/') - - bus_name = dbus.service.BusName(service_name, bus=dbus.SessionBus()) - - telepathy.server.Observer.__init__(self, bus_name, object_path) - telepathy.server.DBusProperties.__init__(self) - - self._implement_property_get(CLIENT, { - 'Interfaces': lambda: [ CLIENT_OBSERVER ], - }) - - - filter_array = dbus.Array([], signature='a{sv}') - - if enable_stream_observer: - filter_array.append(dbus.Dictionary({ - CHANNEL + '.ChannelType': CHANNEL_TYPE_STREAMED_MEDIA, - CHANNEL + '.TargetHandleType': HANDLE_TYPE_CONTACT - }, signature='sv')) - - if enable_text_observer: - filter_array.append(dbus.Dictionary({ - CHANNEL + '.ChannelType': CHANNEL_TYPE_TEXT, - CHANNEL + '.TargetHandleType': HANDLE_TYPE_CONTACT - }, signature='sv')) - - self._implement_property_get(CLIENT_OBSERVER, { - 'ObserverChannelFilter': lambda: filter_array}) - - self.connection_cache = {} - self.channel_cache = {} - - # Set async callbacks so ObserveChannels won't return until we're finished - @dbus.service.method(CLIENT_OBSERVER, - in_signature='ooa(oa{sv})oaoa{sv}', out_signature='', - async_callbacks=('_success', '_error')) - def ObserveChannels(self, account, conn, channels, dispatch_operation, - requests_satisfied, observer_info, _success, _error): - - # List of channels we're waiting to finish before we release the bus - # with _success() - pending_channels = [] - - # Create ZChannel objects for the current connection - def create_channels(conn): - for path, properties in channels: - if path not in self.channel_cache: - chantype = properties[CHANNEL + '.ChannelType'] - if chantype == CHANNEL_TYPE_STREAMED_MEDIA: - chan = ZStreamedMediaChannel(account, conn, path, \ - properties, channel_ready) - elif chantype == CHANNEL_TYPE_TEXT: - chan = ZTextChannel(account, conn, path, \ - properties, channel_ready) - - pending_channels.append(chan) - self.channel_cache[path] = chan - - - # Called when the channel proxy is invalidated - def channel_closed(chan): - try: - del self.channel_cache[chan.object_path] - logging.debug("Channel closed: %s" % chan) - except: - logging.error("Couldn't delete channel: %s" % chan) - - # No more channels means we aren't needed. - # Telepathy will restart us when we are - if not self.channel_cache: - logging.warning("No channels left in list. Exiting") - sys.exit() - - # Called when ZChannel object has finished its tasks - # Removes channel from pending queue, and releases the connection - # if all channeled have been processed - def channel_ready(chan): - logging.debug("Channel ready: %s" % chan) - - pending_channels.remove(chan) - chan.connect('closed', channel_closed) - - if not pending_channels: - _success() - - # Called when the connection proxy is invalidated - def connection_closed(conn): - logging.debug("Connection closed: %s" % conn) - del self.connection_cache[conn.object_path] - - # Called when ZConnection object has finished its tasks - # Connects to 'disconnected' signal to listen for connection - # invalidation. Triggers creation of ZChannel objects for the - # connection - def connection_ready(conn): - logging.debug("Connection ready: %s" % conn) - conn.connect('disconnected', connection_closed) - create_channels(conn) - - # If the connection exists in cache, call the handler manually to - # set up channels. Otherwise, create the connection and cache it. - if conn not in self.connection_cache: - try: - logging.debug("Trying to create connection...") - self.connection_cache[conn] = ZConnection(conn, connection_ready) - except: - logging.error("Connection creation failed") - else: - logging.debug("Connection exists: %s" % conn) - create_channels(self.connection_cache[conn]) - -def main(): - ZObserver("Zeitgeist", enable_stream_observer=True, - enable_text_observer=True) - -if __name__ == '__main__': - gobject.timeout_add(0, main) - loop = gobject.MainLoop() - loop.run() + for channel in channels: + if channel.get_state() == Tp.CallState.INITIALISED: + event_template.interpretation = Interpretation.CREATE_EVENT + call_timers[channel] = 0 + print_channel(event_template) + channel.connect("state-changed", call_state_changed, account) + finally: + context.accept() + +factory = Tp.AutomaticClientFactory.new (dbus) +factory.add_channel_features ([Tp.Channel.get_feature_quark_contacts()]) +factory.add_contact_features ([Tp.ContactFeature.ALIAS]) + +call_observer = Tp.SimpleObserver.new_with_factory(factory, True, + 'ZeitgeistCallObserver', True, observe_callchannels, None) +call_observer.add_observer_filter({ + Tp.PROP_CHANNEL_CHANNEL_TYPE: Tp.IFACE_CHANNEL_TYPE_CALL, + Tp.PROP_CHANNEL_TARGET_HANDLE_TYPE: int(Tp.HandleType.CONTACT), + +}) +call_observer.register() + +""" +Handling of file transfer based events +""" + +def ft_state_changed (channel, state, account): + state = channel.get_state()[0] + target = channel.get_target_contact() + print state, ZG_ACTOR + if state in (4,5): + # get attributes of the file being sent or received + attr = (Gio.FILE_ATTRIBUTE_STANDARD_DISPLAY_NAME, + Gio.FILE_ATTRIBUTE_STANDARD_CONTENT_TYPE, + Gio.FILE_ATTRIBUTE_STANDARD_SIZE ) + info = channel.get_property("file").query_info (",".join(attr), + Gio.FileQueryInfoFlags.NONE, None) + # setup new event to be sent to zeitgeist + event = Event.new_for_values ( + interpretation = Interpretation.SEND_EVENT \ + if channel.get_property("requested") else Interpretation.RECEIVE_EVENT, + manifestation = Manifestation.USER_ACTIVITY \ + if channel.get_property("requested") else Manifestation.WORLD_ACTIVITY, + actor = ZG_ACTOR, + origin = TP_ACCOUNT_PATH % account.get_object_path()[35:]) + subject = Subject.new_for_values ( + uri = channel.get_property("file").get_uri(), + interpretation = get_interpretation_for_mimetype(info.get_content_type()), + manifestation = Manifestation.FILE_DATA_OBJECT \ + if channel.get_property("requested") else Manifestation.FILE_DATA_OBJECT.REMOTE_DATA_OBJECT, + text = info.get_display_name(), + mimetype = info.get_content_type(), + origin = "/".join(channel.get_property("file").get_uri().split("/")[:-1])+"/" \ + if channel.get_property("requested") else TP_IDENTIFIER % target.get_identifier()) + event.subjects.append(subject) + subject = Subject.new_for_values ( + uri = TP_IDENTIFIER % target.get_identifier(), + interpretation = Interpretation.CONTACT.PERSON_CONTACT, + manifestation = Manifestation.CONTACT_LIST_DATA_OBJECT, + origin = TP_IDENTIFIER % target.get_identifier(), + text = target.get_alias(), + storage = "net") + event.subjects.append(subject) + details = {"http://zeitgeist-project.com/1.0/telepathy/filetransfer": { + "state": channel.get_state()[0].numerator, + "reason": channel.get_state()[1].numerator, + "requested": channel.get_property("requested"), + "sender": TP_ACCOUNT_PATH % account.get_object_path()[35:] \ + if channel.get_property("requested") else TP_IDENTIFIER % target.get_identifier(), + "receiver": TP_IDENTIFIER % target.get_identifier() \ + if channel.get_property("requested") else TP_ACCOUNT_PATH % account.get_object_path()[35:], + "mimetype": info.get_content_type(), + "date": channel.get_date().to_unix(), + "description": channel.get_description(), + "size": channel.get_size(), + "service": channel.get_service_name(), + "uri": channel.get_property("file").get_uri() + }} + details = json.dumps(details) + event.payload = details.encode("utf-8") + print_channel(event) + +def observe_ftchannels(observer, account, connection, channels, + dispatch_op, requests, context, user_data): + context.accept() + print "FT" + for channel in channels: + channel.connect("notify::state", ft_state_changed, account) + +ft_observer = Tp.SimpleObserver.new(dbus, True, 'ZeitgeistFTObserver', True, + observe_ftchannels, None) +ft_observer.add_observer_filter({ + Tp.PROP_CHANNEL_CHANNEL_TYPE: Tp.IFACE_CHANNEL_TYPE_FILE_TRANSFER, +}) +ft_observer.register() + +""" +Start the mainloop +""" + +main_loop = GObject.MainLoop() +main_loop.run() |