summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSeif Lotfy <seif@lotfy.com>2012-03-24 02:46:44 +0530
committerManish Sinha <manishsinha@ubuntu.com>2012-03-24 02:46:44 +0530
commit89e99c55c831248b41f637845435739bcb335d14 (patch)
treecd7780511718d4481179238c84cd0d6b001976b1
parentb3fc96b175335c133e8f4ed4b11cd01271ba5f7a (diff)
parent8d73589f4e1d3f07ea337bac42bfad2514a8c020 (diff)
Updated the Telepathy loggers and update chrome extension to create thunbnails
-rw-r--r--chrome/zeitgeist.js48
-rw-r--r--npapi-plugin/np-zeitgeist.cc67
-rwxr-xr-xtelepathy/zeitgeist-telepathy-observer859
3 files changed, 413 insertions, 561 deletions
diff --git a/chrome/zeitgeist.js b/chrome/zeitgeist.js
index 534d3d7..82bcb01 100644
--- a/chrome/zeitgeist.js
+++ b/chrome/zeitgeist.js
@@ -1,6 +1,7 @@
var plugin = document.embeds[0];
var tabInfo = {};
var tabIdTimeouts = {};
+var currentTabs = {};
function onTabCreated (tab) {
chrome.tabs.executeScript(tab.id, {file: "content_script.js"});
@@ -13,9 +14,10 @@ function onTabRemoved (tabid) {
function onTabUpdated (tabid, changeInfo, tab) {
if (!changeInfo.url) return;
window.clearTimeout(tabIdTimeouts[tabid])
- tabIdTimeouts[tabid] = window.setTimeout(function(){
- chrome.tabs.executeScript(tabid, {file: "content_script.js"});},
- 5000);
+ tabIdTimeouts[tabid] = window.setTimeout(function(){
+ console.log("sending event for " + tab.url);
+ chrome.tabs.executeScript(tabid, {file: "content_script.js"});},
+ 5000);
}
function onBookmarkCreated (bookmarkid, bookmark) {
@@ -27,7 +29,6 @@ function onBookmarkCreated (bookmarkid, bookmark) {
}
function sendAccessEvent (documentInfo, tabid) {
- console.log("Sending access event for " + documentInfo.url);
var url = documentInfo.url;
var origin = documentInfo.origin;
var domain = documentInfo.domain;
@@ -39,10 +40,14 @@ function sendAccessEvent (documentInfo, tabid) {
}
var mimetype = documentInfo.mimeType;
var title = documentInfo.title;
- plugin.insertEvent(url,
- domain,
- mimetype ? mimetype : "text/html",
- title);
+ plugin.insertEvent(url,
+ domain,
+ mimetype ? mimetype : "text/html",
+ title);
+ console.log("save thumbnail for "+currentTabs[tabid]+": "+url);
+ chrome.tabs.captureVisibleTab(currentTabs[tabid], {format:"jpeg", quality:5}, function(dataUrl) {
+ plugin.saveSnapshot(url, dataUrl);
+ });
documentInfo.sentAccess = true;
tabInfo[tabid] = documentInfo;
@@ -51,7 +56,6 @@ function sendAccessEvent (documentInfo, tabid) {
function sendLeaveEvent (tabid) {
var documentInfo = tabInfo[tabid];
if (documentInfo == null || documentInfo.sentAccess != true) return;
- console.log("Sending leave event for " + documentInfo.url);
var url = documentInfo.url;
var origin = documentInfo.origin;
@@ -76,13 +80,8 @@ function sendLeaveEvent (tabid) {
// this works in chrome 7,8,9
function onExtensionRequest (request, sender, sendResponse) {
var id = sender.tab.id;
- if (tabInfo[id] == undefined || tabInfo[id].url != request.url) {
- /*if (tabInfo[id] != undefined) {
- console.log("Leaving " + tabInfo[id].url);
- sendLeaveEvent(id);
- }*/
- sendAccessEvent(request, id);
- }
+ sendLeaveEvent(id);
+ sendAccessEvent(request, id);
}
var is_chromium = /chromium/.test( navigator.userAgent.toLowerCase() );
@@ -90,16 +89,27 @@ if (!is_chromium) plugin.setActor("application://google-chrome.desktop");
else plugin.setActor("application://chromium-browser.desktop");
chrome.extension.onRequest.addListener (onExtensionRequest);
-chrome.bookmarks.onCreated.addListener (onBookmarkCreated);
+//chrome.bookmarks.onCreated.addListener (onBookmarkCreated);
chrome.tabs.onUpdated.addListener (onTabUpdated);
+chrome.tabs.onCreated.addListener(
+ function (tab) {
+ currentTabs[tab.id] = tab.windowId;
+ }
+);
+
+chrome.tabs.onRemoved.addListener(
+ function (tabid) {
+ delete currentTabs[tabid];
+ }
+);
//chrome.tabs.onCreated.addListener (onTabCreated);
-chrome.tabs.onRemoved.addListener (onTabRemoved);
+//chrome.tabs.onRemoved.addListener (onTabRemoved);
chrome.windows.getAll({"populate" : true}, function (windows) {
for (var i = 0; i < windows.length; i++) {
var tabs = windows[i].tabs;
for (var j = 0; j < tabs.length; j++) {
- chrome.tabs.executeScript(tabs[j].id, {file: "content_script.js"});
+ chrome.tabs.executeScript(tabs[j].id, {file: "content_script.js"});
}
}
});
diff --git a/npapi-plugin/np-zeitgeist.cc b/npapi-plugin/np-zeitgeist.cc
index 6c2c04a..0066ab0 100644
--- a/npapi-plugin/np-zeitgeist.cc
+++ b/npapi-plugin/np-zeitgeist.cc
@@ -51,6 +51,7 @@ hasMethod(NPObject* obj, NPIdentifier methodName) {
if (!strcmp(name, "insertEvent")) return true;
else if (!strcmp(name, "setActor")) return true;
+ else if (!strcmp(name, "saveSnapshot")) return true;
return false;
}
@@ -59,6 +60,7 @@ static bool
invokeInsertEvent (NPObject *obj, const NPVariant *args, uint32_t argCount, NPVariant *result)
{
/* args should be: url, origin, mimetype, title, [interpretation] */
+ g_debug("Inserting event");
char *url, *origin, *mimetype, *title;
char *interpretation = NULL;
const char *manifestation = NULL;
@@ -66,9 +68,11 @@ invokeInsertEvent (NPObject *obj, const NPVariant *args, uint32_t argCount, NPVa
const NPString *np_s;
ZeitgeistEvent *event;
+ g_debug("arg count: %d", argCount);
if(argCount < 4 || argCount > 6)
{
npnfuncs->setexception(obj, "exception during invocation");
+ g_debug("too many or too few args");
return false;
}
@@ -145,6 +149,64 @@ invokeInsertEvent (NPObject *obj, const NPVariant *args, uint32_t argCount, NPVa
}
static bool
+invokeSaveSnapshot (NPObject *obj, const NPVariant *args, uint32_t argCount, NPVariant *result)
+{
+ char *url;
+ char *screenshotURL = NULL;
+ const NPString *np_s;
+ np_s = &NPVARIANT_TO_STRING (args[0]);
+ url = g_strndup(np_s->UTF8Characters, np_s->UTF8Length);
+ np_s = &NPVARIANT_TO_STRING (args[1]);
+ screenshotURL = g_strndup(np_s->UTF8Characters, np_s->UTF8Length);
+
+ if (!screenshotURL)
+ return false;
+
+ gsize len = strlen(screenshotURL) - 22;
+ char *img = new char[len];
+ memset(img, 0, len);
+ memcpy(img, screenshotURL + 22, len);
+
+ // update thumbnail
+ gchar *thumbnail_path;
+ gchar *thumbnail_filename = NULL;
+ gchar *thumbnail_dir;
+ gchar *csum;
+
+ // create dir if it doesn't exist
+ thumbnail_dir = g_build_filename (g_get_home_dir (),
+ ".thumbnails",
+ "large",
+ NULL);
+ if (!g_file_test(thumbnail_dir, G_FILE_TEST_IS_DIR)) {
+ g_mkdir_with_parents (thumbnail_dir, 0755);
+ }
+ g_free (thumbnail_dir);
+
+ csum = g_compute_checksum_for_string (G_CHECKSUM_MD5, url, -1);
+
+ thumbnail_filename = g_strconcat (csum, ".png", NULL);
+ thumbnail_path = g_build_filename (g_get_home_dir (),
+ ".thumbnails",
+ "large",
+ thumbnail_filename,
+ NULL);
+ g_free (csum);
+
+ guchar *jpg_data = g_base64_decode(img, &len);
+
+ g_debug("Writing thumbnail to %s", thumbnail_path);
+ g_file_set_contents(thumbnail_path, (gchar*)jpg_data, len, NULL);
+
+ g_free (img);
+ g_free (jpg_data);
+ g_free (thumbnail_filename);
+ g_free (thumbnail_path);
+
+ return true;
+}
+
+static bool
invokeSetActor (NPObject *obj, const NPVariant *args, uint32_t argCount, NPVariant *result)
{
const NPString *np_s;
@@ -175,6 +237,7 @@ invoke(NPObject* obj, NPIdentifier methodName, const NPVariant *args, uint32_t a
name = npnfuncs->utf8fromidentifier(methodName);
+ g_debug("Calling %s", name);
if(name)
{
if (!strcmp (name, "insertEvent"))
@@ -185,6 +248,10 @@ invoke(NPObject* obj, NPIdentifier methodName, const NPVariant *args, uint32_t a
{
return invokeSetActor(obj, args, argCount, result);
}
+ else if (!strcmp (name, "saveSnapshot"))
+ {
+ return invokeSaveSnapshot(obj, args, argCount, result);
+ }
}
npnfuncs->setexception(obj, "exception during invocation");
diff --git a/telepathy/zeitgeist-telepathy-observer b/telepathy/zeitgeist-telepathy-observer
index 403f930..7bf8b7c 100755
--- a/telepathy/zeitgeist-telepathy-observer
+++ b/telepathy/zeitgeist-telepathy-observer
@@ -1,547 +1,322 @@
#!/usr/bin/env python
-#
-# This observer tracks text conversation and reports events to the Zeitgeist
-# engine. Eventually, it will report file transfers and media streams
-#
-# ZConnections and ZChannels are tracked by the ZObserver, their invalidation
-# is signalled by GObject signals. ZObserver.ObserveChannels uses asynchronous
-# return functions to only return from the D-Bus method once all of the channels
-# are prepared, this gives the Observer time to investigate the pending message
-# queue before any Handlers can acknowledge it.
-#
-# Authors: Danielle Madeley <danielle.madeley@collabora.co.uk>
-# Morten Mjelva <morten.mjelva@gmail.com>
-#
-import dbus, dbus.glib
-import gobject
-
-import sys
-
-from time import time
-
-import logging
-logging.basicConfig(level=logging.DEBUG)
-
-import telepathy
-from telepathy.constants import CONNECTION_STATUS_DISCONNECTED, \
- HANDLE_TYPE_CONTACT
-
-from telepathy.interfaces import CHANNEL, \
- CHANNEL_TYPE_FILE_TRANSFER, \
- CHANNEL_TYPE_STREAMED_MEDIA, \
- CHANNEL_TYPE_TEXT, \
- CLIENT, \
- CLIENT_OBSERVER, \
- CONNECTION, \
- CONNECTION_INTERFACE_ALIASING, \
- CONNECTION_INTERFACE_CONTACTS
+from gi.repository import TelepathyGLib as Tp
+from gi.repository import GObject, Gio
from zeitgeist.client import ZeitgeistClient
from zeitgeist.datamodel import Event, Subject, Interpretation, Manifestation
-
-def error(*args):
- logging.error("error %s" % args)
-
-# Create a connection to the Zeitgeist engine
-try:
- zclient = ZeitgeistClient()
-except RuntimeError, e:
- logging.error("Unable to connect to Zeitgeist. Won't send events. Reason: \
- %s" % e)
- zclient = None
-
-class ZConnection(gobject.GObject, telepathy.client.Connection):
- """
- Extends telepathy.client.Connection, storing information about proxy obj
-
- Object extends tp.Connection object, letting us store information about the
- connection proxy locally.
- Arguments:
- path: Unique path to this connection
- ready_handler: Callback function run once we're done setting up object
- """
- __gsignals__ = {
- 'disconnected': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()),
- }
-
- def __init__(self, path, ready_handler):
- service_name = path.replace('/', '.')[1:]
-
- # Ready callback
- self.ready_handler = ready_handler
- self.signals = []
-
- gobject.GObject.__init__(self)
- telepathy.client.Connection.__init__(self, service_name, path,
- ready_handler = self._connection_ready)
-
- def __repr__(self):
- return "ZConnection(%s)" % self.object_path
-
- def _status_changed(self, status, reason):
- if status == CONNECTION_STATUS_DISCONNECTED:
- for signal in self.signals:
- signal.remove()
-
- self.emit('disconnected')
-
- def _connection_ready(self, conn):
- def interfaces(interfaces):
- self.contact_attr_interfaces = interfaces
-
- # Connection ready
- self.ready_handler(conn)
-
- # Get contact attribute interfaces
- self[dbus.PROPERTIES_IFACE].Get(CONNECTION_INTERFACE_CONTACTS,
- 'ContactAttributeInterfaces',
- reply_handler=interfaces,
- error_handler=error)
-
- self.signals.append(self[CONNECTION].connect_to_signal(
- 'StatusChanged', self._status_changed))
-
- # This is necessary so our signal isn't sent over D-Bus
- def do_disconnected(self):
- pass
-gobject.type_register(ZConnection)
-
-class ZChannel(gobject.GObject, telepathy.client.Channel):
- """
- Extends telepathy.client.Channel
-
- This class allows us to store information about a telepathy.client.Channel
- object locally.
- Arguments:
- account_path: Path to the account used. Passed to ObserveChannels.
- connection: The connection used for the channel
- properties: Properties of the channel we're proxying, from ObserveChannels
- ready_handler: Callback function run once we're done setting up object
- """
- __gsignals__ = {
- 'closed': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()),
- }
-
- def __init__(self, account_path, connection, path, properties, ready_handler):
- self.account_path = account_path
- self.conn = connection
- self.properties = properties
- self.ready_handler = ready_handler
-
- self.signals = []
-
- gobject.GObject.__init__(self)
- telepathy.client.Channel.__init__(self, connection.service_name, path,
- ready_handler=self._channel_ready)
-
- def __repr__(self):
- return "ZChannel(%s)" % self.object_path
-
- def _send_to_zeitgeist(self, subjects, event_details):
- event_details['subjects'] = subjects
- zevent = Event.new_for_values(**event_details)
- zclient.insert_event(zevent)
-
- def _channel_closed_cb(self):
- for signal in self.signals:
- signal.remove()
-
- self.emit('closed')
-
- def _default_operations_finished(self):
- self._release_channel()
-
- def _get_contact_attributes_cb(self, attributes_map):
- handle = self.properties[CHANNEL + '.TargetHandle']
+from zeitgeist.mimetypes import get_interpretation_for_mimetype
+
+import json
+
+GObject.threads_init()
+
+ZG_ACTOR = "dbus://org.freedesktop.Telepathy.Logger.service"
+TP_ACCOUNT_PATH = "x-telepathy-account-path:%s"
+TP_IDENTIFIER = "x-telepathy-identifier:%s"
+
+dbus = Tp.DBusDaemon.dup()
+zg_client = ZeitgeistClient()
+
+def callback (ids):
+ print ids
+
+def error_handler (error):
+ print error
+
+def create_event(account, channel):
+ target = channel.get_target_contact()
+ event_template = Event.new_for_values(
+ actor = ZG_ACTOR,
+ interpretation = Interpretation.ACCESS_EVENT,
+ manifestation = Manifestation.USER_ACTIVITY \
+ if channel.get_properties("requested") else Manifestation.WORLD_ACTIVITY,
+ origin = TP_ACCOUNT_PATH % account.get_object_path()[len(Tp.ACCOUNT_OBJECT_PATH_BASE):])
+ event_template.subjects.append(
+ Subject.new_for_values(
+ uri = "",
+ interpretation = Interpretation.IMMESSAGE,
+ manifestation = Manifestation.SOFTWARE_SERVICE,
+ mimetype = "plain/text",
+ origin = TP_IDENTIFIER % target.get_identifier(),
+ text = target.get_alias(),
+ storage = "net"))
+ event_template.subjects.append(
+ Subject.new_for_values(
+ uri = TP_IDENTIFIER % target.get_identifier(),
+ interpretation = Interpretation.CONTACT,
+ manifestation = Manifestation.CONTACT_LIST_DATA_OBJECT,
+ origin = TP_IDENTIFIER % target.get_identifier(),
+ text = target.get_alias(),
+ storage = "net"))
+ return event_template
+
+def print_channel(event):
+ print "Event:"
+ print " - timestamp:", event.timestamp
+ print " - actor", event.actor
+ print " - interpretation:", event.interpretation
+ print " - manifestation:", event.manifestation
+ print " - origin:", event.origin
+ print " - subjects:", len(event.subjects)
+ for i, subject in enumerate(event.subjects):
+ print " - subject %i:" % (i+1)
+ print " - uri:", subject.uri
+ print " - interpretation:", subject.interpretation
+ print " - manifestation:", subject.manifestation
+ print " - mimetype:", subject.mimetype
+ print " - origin:", subject.origin
+ print " - text:", subject.text
+ print " - storage:", subject.storage
+ print " -payload:", event.payload
+ zg_client.insert_events([event], callback, error_handler)
+
+"""
+Handling of text based events
+"""
+
+def msg_recv_callback (channel, message, account):
+ if message.is_delivery_report():
+ return
+ print "=== RECEIVED MSG ==="
+ event_template = create_event (account, channel)
+ event_template.interpretation = Interpretation.RECEIVE_EVENT
+ event_template.manifestation = Manifestation.WORLD_ACTIVITY
+ print_channel(event_template)
+
+def msg_sent_callback (channel, message, flags, token, account):
+ print "=== SENT MSG ==="
+ event_template = create_event (account, channel)
+ event_template.interpretation = Interpretation.SEND_EVENT
+ event_template.manifestation = Manifestation.USER_ACTIVITY
+ print_channel(event_template)
+
+def channel_closed_callback (channel, domain, code, message, account):
+ print "=== CLOSED CHANNEL ==="
+ event_template = create_event (account, channel)
+ event_template.interpretation = Interpretation.LEAVE_EVENT
+ print_channel(event_template)
+
+def observe_textchannels(observer, account, connection, channels,
+ dispatch_op, requests, context, user_data):
+ try:
+ for channel in channels:
+ target = channel.get_target_contact()
+ if not target:
+ continue
+ event_template = create_event(account, channel)
+ print "=== CREATED CHANNEL ==="
+ print_channel(event_template)
+ for message in channel.get_pending_messages():
+ msg_recv_callback(channel, message, account)
+ event_template = create_event(account, channel)
+ channel.connect('invalidated', channel_closed_callback, account)
+ channel.connect('message-received', msg_recv_callback, account)
+ channel.connect('message-sent', msg_sent_callback, account)
+ finally:
+ context.accept()
+
+factory = Tp.AutomaticClientFactory.new (dbus)
+
+factory.add_channel_features ([Tp.Channel.get_feature_quark_contacts()])
+factory.add_contact_features ([Tp.ContactFeature.ALIAS])
+text_observer = Tp.SimpleObserver.new_with_factory(factory, True,
+ 'ZeitgeistTextObserver', True, observe_textchannels, None)
+text_observer.add_observer_filter({
+ Tp.PROP_CHANNEL_CHANNEL_TYPE: Tp.IFACE_CHANNEL_TYPE_TEXT,
+ Tp.PROP_CHANNEL_TARGET_HANDLE_TYPE: int(Tp.HandleType.CONTACT),
+
+})
+text_observer.register()
+
+
+"""
+Handling of call based events
+"""
+
+call_timers = {}
+
+def create_call_event(account, channel):
+ targets = channel.get_members()
+ if not targets:
+ return
+ event_template = Event.new_for_values(
+ actor = ZG_ACTOR,
+ interpretation = Interpretation.ACCESS_EVENT,
+ manifestation = Manifestation.USER_ACTIVITY \
+ if channel.get_property("requested") else Manifestation.WORLD_ACTIVITY,
+ origin = TP_ACCOUNT_PATH % account.get_object_path()[len(Tp.ACCOUNT_OBJECT_PATH_BASE):])
+ for i, target in enumerate(targets):
+ if i == 0:
+ event_template.subjects.append (
+ Subject.new_for_values(
+ uri = "",
+ interpretation = Interpretation.MEDIA.AUDIO,
+ manifestation = Manifestation.MEDIA_STREAM,
+ mimetype = "x-telepathy/call",
+ origin = TP_IDENTIFIER % target.get_identifier(),
+ text = target.get_alias(),
+ storage = "net"))
+ event_template.subjects.append (
+ Subject.new_for_values(
+ uri = TP_IDENTIFIER % target.get_identifier(),
+ interpretation = Interpretation.CONTACT,
+ manifestation = Manifestation.CONTACT_LIST_DATA_OBJECT,
+ origin = TP_IDENTIFIER % target.get_identifier(),
+ text = target.get_alias(),
+ storage = "net"))
+ return event_template
+
+
+def call_state_changed (channel, state, flags, reason, details, account):
+ #FIXME: Something breaks this
+ made_by_user = False
+ if reason.actor == channel.get_property("connection").get_self_handle():
+ made_by_user = True
+ #print "User Data:", user_data
+ #print "Event Template: ", event_template
+ if state in (3, 5, 6):
+ event_template = create_call_event (account, channel)
+ event_template.manifestation = Manifestation.USER_ACTIVITY if made_by_user \
+ else Manifestation.WORLD_ACTIVITY
+
+ if state == Tp.CallState.INITIALISED:
+ event_template.interpretation = Interpretation.CREATE_EVENT
+ call_timers[channel] = 0
+ print_channel(event_template)
+
+ elif state == Tp.CallState.ACTIVE:
+ event_template.interpretation = Interpretation.ACCESS_EVENT
+ call_timers[channel] = int(event_template.timestamp)
+ print_channel(event_template)
+
+ elif state == Tp.CallState.ENDED:
+ if call_timers.has_key(channel):
+ event_template.interpretation = Interpretation.LEAVE_EVENT
+ if reason.reason.numerator == Tp.CallStateChangeReason.REJECTED:
+ event_template.interpretation = Interpretation.DENY_EVENT
+ elif reason.reason.numerator == Tp.CallStateChangeReason.NO_ANSWER:
+ event_template.interpretation = Interpretation.EXPIRE_EVENT
+
+ duration = 0 if not call_timers.has_key(channel) \
+ else int(event_template.timestamp) - call_timers[channel]
+ details = {"http://zeitgeist-project.com/1.0/telepathy/call": {
+ "state": channel.get_state()[0].numerator,
+ "reason": channel.get_state()[1].numerator,
+ "requested": channel.get_property("requested"),
+ "host": TP_ACCOUNT_PATH % account.get_object_path()[len(Tp.ACCOUNT_OBJECT_PATH_BASE):] \
+ if channel.get_property("requested") else event_template.subjects[1].uri,
+ "receiver": TP_IDENTIFIER % event_template.subjects[1].uri \
+ if channel.get_property("requested") \
+ else TP_ACCOUNT_PATH % account.get_object_path()[len(Tp.ACCOUNT_OBJECT_PATH_BASE):],
+ "duration": duration
+ }}
+ details = json.dumps(details)
+ event_template.payload = details.encode("utf-8")
+ del call_timers[channel]
+ print_channel(event_template)
+
+def observe_callchannels(observer, account, connection, channels,
+ dispatch_op, requests, context, user_data):
try:
- attributes = attributes_map[handle]
- self._target_alias = attributes[CONNECTION_INTERFACE_ALIASING +
- '/alias']
- except KeyError:
- self._target_alias = self._target_id
-
- self._connect_to_signals()
- self._default_operations_finished()
-
- def _get_requested_cb(self, req):
- handle = self.properties[CHANNEL + '.TargetHandle']
- self._channel_requested = req
-
- # Get contact name
- self.conn[CONNECTION_INTERFACE_CONTACTS].GetContactAttributes(
- [handle],
- # assuming this is available is technically wrong
- [ CONNECTION_INTERFACE_ALIASING ],
- False,
- reply_handler=self._get_contact_attributes_cb,
- error_handler=error)
-
- def _release_channel(self):
- # Finished with critical tasks.
- self.ready_handler(self)
-
- def _connect_to_signals(self):
- # Connect to signals
- self.signals.append(self[CHANNEL].connect_to_signal('Closed',
- self._channel_closed_cb))
-
- # Called when channel proxy becomes ready. Chains method calls so actions
- # which depend on each other are executed in order.
- def _channel_ready(self, channel):
- self._target_id = self.properties[CHANNEL + '.TargetID']
-
- # Get contact attribute interfaces
- self[dbus.PROPERTIES_IFACE].Get(CHANNEL,
- 'Requested',
- reply_handler=self._get_requested_cb,
- error_handler=error)
-
- # This is necessary so our signal isn't sent over D-Bus
- def do_closed(self):
- pass
-gobject.type_register(ZChannel)
-
-class ZTextChannel(ZChannel):
- def __init__(self, account_path, connection, path, properties, ready_handler):
- ZChannel.__init__(self, account_path, connection, path, properties, ready_handler)
-
- self._subject = None
-
- def __repr__(self):
- return "ZTextChannel(%s)" % self.object_path
-
- def _release_channel(self):
- # Finished with critical tasks.
- self.ready_handler(self)
-
- # Report to zeitgeist
- if not self._subject:
- self._subject = [Subject.new_for_values(
- uri = "telepathy://" + self.account_path + "/" + self._target_id,
- interpretation = unicode(Interpretation.IMMESSAGE),
- manifestation = unicode(Manifestation.SOFTWARE_SERVICE),
- origin = self.account_path,
- mimetype = "text/plain",
- text = self._target_alias)]
-
- if self._channel_requested:
- manifestation = unicode(Manifestation.USER_ACTIVITY)
- else:
- manifestation = unicode(Manifestation.WORLD_ACTIVITY)
-
- timestamp = int(time() * 1000)
-
- event = Event.new_for_values(
- timestamp = timestamp,
- interpretation = unicode(Interpretation.ACCESS_EVENT),
- manifestation = manifestation,
- actor = self.account_path,
- subjects = self._subject)
- zclient.insert_event(event)
-
- def _channel_closed_cb(self):
- for signal in self.signals:
- signal.remove()
-
- timestamp = int(time() * 1000)
-
- event = Event.new_for_values(
- timestamp = timestamp,
- interpretation = unicode(Interpretation.LEAVE_EVENT),
- manifestation = unicode(Manifestation.USER_ACTIVITY),
- actor = self.account_path,
- subjects = self._subject)
- zclient.insert_event(event)
-
- self.emit('closed')
-
- def _get_pending_messages_cb(self, messages):
- for message in messages:
- self._message_received_cb(*message)
-
- self._release_channel()
-
- def _default_operations_finished(self):
- self[CHANNEL_TYPE_TEXT].ListPendingMessages(
- False,
- reply_handler=self._get_pending_messages_cb,
- error_handler=error)
-
- def _message_received_cb(self, identification, timestamp, sender, contenttype,
- flags, content):
- logging.debug("Message received")
- if not self._subject:
- self._subject = [Subject.new_for_values(
- uri = "telepathy://" + self.account_path + "/" + self._target_id,
- interpretation = unicode(Interpretation.IMMESSAGE),
- manifestation = unicode(Manifestation.SOFTWARE_SERVICE),
- origin = self.account_path,
- mimetype = "text/plain",
- text = self._target_alias)]
-
- timestamp = timestamp * 1000
-
- event = Event.new_for_values(
- timestamp = timestamp,
- interpretation = unicode(Interpretation.RECEIVE_EVENT),
- manifestation = unicode(Manifestation.WORLD_ACTIVITY),
- actor = self.account_path,
- subjects = self._subject)
- zclient.insert_event(event)
-
- def _message_sent_cb(self, timestamp, contenttype, content):
- logging.debug("Message sent")
-
- timestamp = timestamp * 1000
-
- event = Event.new_for_values(
- timestamp = timestamp,
- interpretation = unicode(Interpretation.SEND_EVENT),
- manifestation = unicode(Manifestation.USER_ACTIVITY),
- actor = self.account_path,
- subjects = self._subject)
- zclient.insert_event(event)
-
- def _connect_to_signals(self):
- # Connect to signals
- self.signals.append(self[CHANNEL_TYPE_TEXT].connect_to_signal('Sent',
- self._message_sent_cb))
- self.signals.append(self[CHANNEL_TYPE_TEXT].connect_to_signal('Received',
- self._message_received_cb))
- self.signals.append(self[CHANNEL].connect_to_signal('Closed',
- self._channel_closed_cb))
-
-class ZStreamedMediaChannel(ZChannel):
- def __init__(self, account_path, connection, path, properties, ready_handler):
- self._audio_subject = None
- self._video_subject = None
-
- self.stream_cache = {}
-
- ZChannel.__init__(self, account_path, connection, path, properties, ready_handler)
-
- def __repr__(self):
- return "ZStreamedMediaChannel(%s)" % self.object_path
-
- def _channel_closed_cb(self):
- for signal in self.signals:
- signal.remove()
-
- self.emit('closed')
-
- def _stream_added_cb(self, streamid, handle, streamtype):
- logging.debug("Stream added. Streamtype: " + str(streamtype))
- self.stream_cache[streamid] = {'handle': handle, 'streamtype': streamtype}
-
- if not self._audio_subject:
- self._audio_subject = [Subject.new_for_values(
- uri = "telepathy://" + self.account_path + "/" + self._target_id,
- interpretation = unicode(Interpretation.AUDIO),
- manifestation = unicode(Manifestation.MEDIA_STREAM),
- origin = self.account_path,
- mimetype = "text/plain",
- text = self._target_alias)]
-
- if not self._video_subject:
- self._video_subject = [Subject.new_for_values(
- uri = "telepathy://" + self.account_path + "/" + self._target_id,
- interpretation = unicode(Interpretation.VIDEO),
- manifestation = unicode(Manifestation.MEDIA_STREAM),
- origin = self.account_path,
- mimetype = "text/plain",
- text = self._target_alias)]
-
- if streamtype == 0:
- subject = self._audio_subject
- elif streamtype == 1:
- subject = self._video_subject
-
- if self._channel_requested:
- manifestation = unicode(Manifestation.USER_ACTIVITY)
- else:
- manifestation = unicode(Manifestation.WORLD_ACTIVITY)
-
- timestamp = int(time() * 1000)
- event = Event.new_for_values(
- timestamp = timestamp,
- interpretation = unicode(Interpretation.ACCESS_EVENT),
- manifestation = manifestation,
- actor = self.account_path,
- subjects = subject)
- zclient.insert_event(event)
-
-
- def _stream_removed_cb(self, streamid):
- streamtype = self.stream_cache[streamid]['streamtype']
- logging.debug("Stream removed. Streamtype: " + str(streamtype))
-
- if streamtype == 0:
- subject = self._audio_subject
- elif streamtype == 1:
- subject = self._video_subject
-
- timestamp = int(time() * 1000)
- event = Event.new_for_values(
- timestamp = timestamp,
- interpretation = unicode(Interpretation.LEAVE_EVENT),
- manifestation = unicode(Manifestation.USER_ACTIVITY),
- actor = self.account_path,
- subject = subject)
- zclient.insert_event(event)
-
- del self.stream_cache[streamid]
-
- def _list_streams_cb(self, streams):
- for stream in streams:
- self._stream_added_cb(stream[0], stream[1], stream[2])
-
- def _default_operations_finished(self):
- self._release_channel()
-
- self[CHANNEL_TYPE_STREAMED_MEDIA].ListStreams(
- reply_handler=self._list_streams_cb,
- error_handler=error)
-
- def _connect_to_signals(self):
- # Connect to signals
- self.signals.append(self[CHANNEL_TYPE_STREAMED_MEDIA].connect_to_signal('StreamAdded',
- self._stream_added_cb))
- self.signals.append(self[CHANNEL_TYPE_STREAMED_MEDIA].connect_to_signal('StreamRemoved',
- self._stream_removed_cb))
- self.signals.append(self[CHANNEL].connect_to_signal('Closed',
- self._channel_closed_cb))
-
-class ZObserver(telepathy.server.Observer,
- telepathy.server.DBusProperties):
- """
- Extends telepathy.server.Observer, listening for events
-
-
- ZObserver listens for a channel matching the filter is opened by telepathy,
- and reacts by opening connection and channel proxies in order to liisten in
- on events in the channel.
- """
-
- def __init__(self, client_name, enable_stream_observer=False,
- enable_text_observer=False):
-
- service_name = '.'.join ([CLIENT, client_name])
- object_path = '/' + service_name.replace('.', '/')
-
- bus_name = dbus.service.BusName(service_name, bus=dbus.SessionBus())
-
- telepathy.server.Observer.__init__(self, bus_name, object_path)
- telepathy.server.DBusProperties.__init__(self)
-
- self._implement_property_get(CLIENT, {
- 'Interfaces': lambda: [ CLIENT_OBSERVER ],
- })
-
-
- filter_array = dbus.Array([], signature='a{sv}')
-
- if enable_stream_observer:
- filter_array.append(dbus.Dictionary({
- CHANNEL + '.ChannelType': CHANNEL_TYPE_STREAMED_MEDIA,
- CHANNEL + '.TargetHandleType': HANDLE_TYPE_CONTACT
- }, signature='sv'))
-
- if enable_text_observer:
- filter_array.append(dbus.Dictionary({
- CHANNEL + '.ChannelType': CHANNEL_TYPE_TEXT,
- CHANNEL + '.TargetHandleType': HANDLE_TYPE_CONTACT
- }, signature='sv'))
-
- self._implement_property_get(CLIENT_OBSERVER, {
- 'ObserverChannelFilter': lambda: filter_array})
-
- self.connection_cache = {}
- self.channel_cache = {}
-
- # Set async callbacks so ObserveChannels won't return until we're finished
- @dbus.service.method(CLIENT_OBSERVER,
- in_signature='ooa(oa{sv})oaoa{sv}', out_signature='',
- async_callbacks=('_success', '_error'))
- def ObserveChannels(self, account, conn, channels, dispatch_operation,
- requests_satisfied, observer_info, _success, _error):
-
- # List of channels we're waiting to finish before we release the bus
- # with _success()
- pending_channels = []
-
- # Create ZChannel objects for the current connection
- def create_channels(conn):
- for path, properties in channels:
- if path not in self.channel_cache:
- chantype = properties[CHANNEL + '.ChannelType']
- if chantype == CHANNEL_TYPE_STREAMED_MEDIA:
- chan = ZStreamedMediaChannel(account, conn, path, \
- properties, channel_ready)
- elif chantype == CHANNEL_TYPE_TEXT:
- chan = ZTextChannel(account, conn, path, \
- properties, channel_ready)
-
- pending_channels.append(chan)
- self.channel_cache[path] = chan
-
-
- # Called when the channel proxy is invalidated
- def channel_closed(chan):
- try:
- del self.channel_cache[chan.object_path]
- logging.debug("Channel closed: %s" % chan)
- except:
- logging.error("Couldn't delete channel: %s" % chan)
-
- # No more channels means we aren't needed.
- # Telepathy will restart us when we are
- if not self.channel_cache:
- logging.warning("No channels left in list. Exiting")
- sys.exit()
-
- # Called when ZChannel object has finished its tasks
- # Removes channel from pending queue, and releases the connection
- # if all channeled have been processed
- def channel_ready(chan):
- logging.debug("Channel ready: %s" % chan)
-
- pending_channels.remove(chan)
- chan.connect('closed', channel_closed)
-
- if not pending_channels:
- _success()
-
- # Called when the connection proxy is invalidated
- def connection_closed(conn):
- logging.debug("Connection closed: %s" % conn)
- del self.connection_cache[conn.object_path]
-
- # Called when ZConnection object has finished its tasks
- # Connects to 'disconnected' signal to listen for connection
- # invalidation. Triggers creation of ZChannel objects for the
- # connection
- def connection_ready(conn):
- logging.debug("Connection ready: %s" % conn)
- conn.connect('disconnected', connection_closed)
- create_channels(conn)
-
- # If the connection exists in cache, call the handler manually to
- # set up channels. Otherwise, create the connection and cache it.
- if conn not in self.connection_cache:
- try:
- logging.debug("Trying to create connection...")
- self.connection_cache[conn] = ZConnection(conn, connection_ready)
- except:
- logging.error("Connection creation failed")
- else:
- logging.debug("Connection exists: %s" % conn)
- create_channels(self.connection_cache[conn])
-
-def main():
- ZObserver("Zeitgeist", enable_stream_observer=True,
- enable_text_observer=True)
-
-if __name__ == '__main__':
- gobject.timeout_add(0, main)
- loop = gobject.MainLoop()
- loop.run()
+ for channel in channels:
+ if channel.get_state() == Tp.CallState.INITIALISED:
+ event_template.interpretation = Interpretation.CREATE_EVENT
+ call_timers[channel] = 0
+ print_channel(event_template)
+ channel.connect("state-changed", call_state_changed, account)
+ finally:
+ context.accept()
+
+factory = Tp.AutomaticClientFactory.new (dbus)
+factory.add_channel_features ([Tp.Channel.get_feature_quark_contacts()])
+factory.add_contact_features ([Tp.ContactFeature.ALIAS])
+
+call_observer = Tp.SimpleObserver.new_with_factory(factory, True,
+ 'ZeitgeistCallObserver', True, observe_callchannels, None)
+call_observer.add_observer_filter({
+ Tp.PROP_CHANNEL_CHANNEL_TYPE: Tp.IFACE_CHANNEL_TYPE_CALL,
+ Tp.PROP_CHANNEL_TARGET_HANDLE_TYPE: int(Tp.HandleType.CONTACT),
+
+})
+call_observer.register()
+
+"""
+Handling of file transfer based events
+"""
+
+def ft_state_changed (channel, state, account):
+ state = channel.get_state()[0]
+ target = channel.get_target_contact()
+ print state, ZG_ACTOR
+ if state in (4,5):
+ # get attributes of the file being sent or received
+ attr = (Gio.FILE_ATTRIBUTE_STANDARD_DISPLAY_NAME,
+ Gio.FILE_ATTRIBUTE_STANDARD_CONTENT_TYPE,
+ Gio.FILE_ATTRIBUTE_STANDARD_SIZE )
+ info = channel.get_property("file").query_info (",".join(attr),
+ Gio.FileQueryInfoFlags.NONE, None)
+ # setup new event to be sent to zeitgeist
+ event = Event.new_for_values (
+ interpretation = Interpretation.SEND_EVENT \
+ if channel.get_property("requested") else Interpretation.RECEIVE_EVENT,
+ manifestation = Manifestation.USER_ACTIVITY \
+ if channel.get_property("requested") else Manifestation.WORLD_ACTIVITY,
+ actor = ZG_ACTOR,
+ origin = TP_ACCOUNT_PATH % account.get_object_path()[35:])
+ subject = Subject.new_for_values (
+ uri = channel.get_property("file").get_uri(),
+ interpretation = get_interpretation_for_mimetype(info.get_content_type()),
+ manifestation = Manifestation.FILE_DATA_OBJECT \
+ if channel.get_property("requested") else Manifestation.FILE_DATA_OBJECT.REMOTE_DATA_OBJECT,
+ text = info.get_display_name(),
+ mimetype = info.get_content_type(),
+ origin = "/".join(channel.get_property("file").get_uri().split("/")[:-1])+"/" \
+ if channel.get_property("requested") else TP_IDENTIFIER % target.get_identifier())
+ event.subjects.append(subject)
+ subject = Subject.new_for_values (
+ uri = TP_IDENTIFIER % target.get_identifier(),
+ interpretation = Interpretation.CONTACT.PERSON_CONTACT,
+ manifestation = Manifestation.CONTACT_LIST_DATA_OBJECT,
+ origin = TP_IDENTIFIER % target.get_identifier(),
+ text = target.get_alias(),
+ storage = "net")
+ event.subjects.append(subject)
+ details = {"http://zeitgeist-project.com/1.0/telepathy/filetransfer": {
+ "state": channel.get_state()[0].numerator,
+ "reason": channel.get_state()[1].numerator,
+ "requested": channel.get_property("requested"),
+ "sender": TP_ACCOUNT_PATH % account.get_object_path()[35:] \
+ if channel.get_property("requested") else TP_IDENTIFIER % target.get_identifier(),
+ "receiver": TP_IDENTIFIER % target.get_identifier() \
+ if channel.get_property("requested") else TP_ACCOUNT_PATH % account.get_object_path()[35:],
+ "mimetype": info.get_content_type(),
+ "date": channel.get_date().to_unix(),
+ "description": channel.get_description(),
+ "size": channel.get_size(),
+ "service": channel.get_service_name(),
+ "uri": channel.get_property("file").get_uri()
+ }}
+ details = json.dumps(details)
+ event.payload = details.encode("utf-8")
+ print_channel(event)
+
+def observe_ftchannels(observer, account, connection, channels,
+ dispatch_op, requests, context, user_data):
+ context.accept()
+ print "FT"
+ for channel in channels:
+ channel.connect("notify::state", ft_state_changed, account)
+
+ft_observer = Tp.SimpleObserver.new(dbus, True, 'ZeitgeistFTObserver', True,
+ observe_ftchannels, None)
+ft_observer.add_observer_filter({
+ Tp.PROP_CHANNEL_CHANNEL_TYPE: Tp.IFACE_CHANNEL_TYPE_FILE_TRANSFER,
+})
+ft_observer.register()
+
+"""
+Start the mainloop
+"""
+
+main_loop = GObject.MainLoop()
+main_loop.run()