/** * This file is part of TelepathyQt * * @copyright Copyright (C) 2011 Collabora Ltd. * @copyright Copyright (C) 2011 Nokia Corporation * @license LGPL 2.1 * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include "TelepathyQt/simple-observer-internal.h" #include "TelepathyQt/_gen/simple-observer.moc.hpp" #include "TelepathyQt/_gen/simple-observer-internal.moc.hpp" #include "TelepathyQt/debug-internal.h" #include #include #include #include #include #include #include #include #include namespace Tp { QHash >, WeakPtr > SimpleObserver::Private::observers; uint SimpleObserver::Private::numObservers = 0; SimpleObserver::Private::Private(SimpleObserver *parent, const AccountPtr &account, const ChannelClassSpecList &channelFilter, const QString &contactIdentifier, bool requiresNormalization, const QList &extraChannelFeatures) : parent(parent), account(account), channelFilter(channelFilter), contactIdentifier(contactIdentifier), extraChannelFeatures(extraChannelFeatures) { QSet normalizedChannelFilter = channelFilter.toSet(); QPair > observerUniqueId( account->dbusConnection().baseService(), normalizedChannelFilter); observer = SharedPtr(observers.value(observerUniqueId)); if (!observer) { SharedPtr fakeAccountFactory = FakeAccountFactory::create( account->dbusConnection()); cr = ClientRegistrar::create( AccountFactoryPtr::qObjectCast(fakeAccountFactory), account->connectionFactory(), account->channelFactory(), account->contactFactory()); QString observerName = QString(QLatin1String("TpQtSO_%1_%2")) .arg(account->dbusConnection().baseService() .replace(QLatin1String(":"), QLatin1String("_")) .replace(QLatin1String("."), QLatin1String("_"))) .arg(numObservers++); observer = SharedPtr(new Observer(cr, fakeAccountFactory, normalizedChannelFilter.toList(), observerName)); if (!cr->registerClient(observer, observerName, false)) { warning() << "Unable to register observer" << observerName; observer.reset(); cr.reset(); return; } debug() << "Observer" << observerName << "registered"; observers.insert(observerUniqueId, observer); } else { debug() << "Observer" << observer->observerName() << "already registered and matches filter, using it"; cr = ClientRegistrarPtr(observer->clientRegistrar()); } observer->registerExtraChannelFeatures(extraChannelFeatures); observer->registerAccount(account); if (contactIdentifier.isEmpty() || !requiresNormalization) { normalizedContactIdentifier = contactIdentifier; } else { parent->connect(account.data(), SIGNAL(connectionChanged(Tp::ConnectionPtr)), SLOT(onAccountConnectionChanged(Tp::ConnectionPtr))); } parent->connect(observer.data(), SIGNAL(newChannels(Tp::AccountPtr,QList)), SLOT(onNewChannels(Tp::AccountPtr,QList))); parent->connect(observer.data(), SIGNAL(channelInvalidated(Tp::AccountPtr,Tp::ChannelPtr,QString,QString)), SLOT(onChannelInvalidated(Tp::AccountPtr,Tp::ChannelPtr,QString,QString))); } bool SimpleObserver::Private::filterChannel(const AccountPtr &channelAccount, const ChannelPtr &channel) { if (channelAccount != account) { return false; } if (contactIdentifier.isEmpty()) { return true; } QString targetId = channel->immutableProperties().value( TP_QT_IFACE_CHANNEL + QLatin1String(".TargetID")).toString(); if (targetId != normalizedContactIdentifier) { // we didn't filter per contact, let's filter here return false; } return true; } void SimpleObserver::Private::insertChannels(const AccountPtr &channelsAccount, const QList &newChannels) { QSet match; foreach (const ChannelPtr &channel, newChannels) { if (!channels.contains(channel) && filterChannel(channelsAccount, channel)) { match.insert(channel); } } if (match.isEmpty()) { return; } channels.unite(match); emit parent->newChannels(match.toList()); } void SimpleObserver::Private::removeChannel(const AccountPtr &channelAccount, const ChannelPtr &channel, const QString &errorName, const QString &errorMessage) { if (!channels.contains(channel) || !filterChannel(channelAccount, channel)) { return; } channels.remove(channel); emit parent->channelInvalidated(channel, errorName, errorMessage); } void SimpleObserver::Private::processChannelsQueue() { if (channelsQueue.isEmpty()) { return; } while (!channelsQueue.isEmpty()) { (this->*(channelsQueue.dequeue()))(); } } void SimpleObserver::Private::processNewChannelsQueue() { NewChannelsInfo info = newChannelsQueue.dequeue(); insertChannels(info.channelsAccount, info.channels); } void SimpleObserver::Private::processChannelsInvalidationQueue() { ChannelInvalidationInfo info = channelsInvalidationQueue.dequeue(); removeChannel(info.channelAccount, info.channel, info.errorName, info.errorMessage); } SimpleObserver::Private::Observer::Observer(const WeakPtr &cr, const SharedPtr &fakeAccountFactory, const ChannelClassSpecList &channelFilter, const QString &observerName) : AbstractClient(), QObject(), AbstractClientObserver(channelFilter, true), mCr(cr), mFakeAccountFactory(fakeAccountFactory), mObserverName(observerName) { } SimpleObserver::Private::Observer::~Observer() { // no need to delete channel wrappers here as they have 'this' as parent // no need to delete context infos here as this observer will never be deleted before all // PendingComposites finish (hold reference to this), which will properly delete them // no need to unregister this observer here as the client registrar destructor will // unregister it } void SimpleObserver::Private::Observer::observeChannels( const MethodInvocationContextPtr<> &context, const AccountPtr &account, const ConnectionPtr &connection, const QList &channels, const ChannelDispatchOperationPtr &dispatchOperation, const QList &requestsSatisfied, const ObserverInfo &observerInfo) { if (!mAccounts.contains(account)) { context->setFinished(); return; } QList readyOps; QList newChannels; foreach (const ChannelPtr &channel, channels) { if (mIncompleteChannels.contains(channel) || mChannels.contains(channel)) { // we are already observing this channel continue; } // this shouldn't happen, but in any case if (!channel->isValid()) { warning() << "Channel received to observe is invalid. " "Ignoring channel"; continue; } SimpleObserver::Private::ChannelWrapper *wrapper = new SimpleObserver::Private::ChannelWrapper(account, channel, featuresFor(ChannelClassSpec(channel->immutableProperties())), this); mIncompleteChannels.insert(channel, wrapper); connect(wrapper, SIGNAL(channelInvalidated(Tp::AccountPtr,Tp::ChannelPtr,QString,QString)), SLOT(onChannelInvalidated(Tp::AccountPtr,Tp::ChannelPtr,QString,QString))); newChannels.append(channel); readyOps.append(wrapper->becomeReady()); } if (readyOps.isEmpty()) { context->setFinished(); return; } PendingComposite *pc = new PendingComposite(readyOps, false /* failOnFirstError */, SharedPtr(this)); mObserveChannelsInfo.insert(pc, new ContextInfo(context, account, newChannels)); connect(pc, SIGNAL(finished(Tp::PendingOperation*)), SLOT(onChannelsReady(Tp::PendingOperation*))); } void SimpleObserver::Private::Observer::onChannelInvalidated(const AccountPtr &channelAccount, const ChannelPtr &channel, const QString &errorName, const QString &errorMessage) { if (mIncompleteChannels.contains(channel)) { // we are still handling the channel, wait for onChannelsReady that will properly remove // it from mChannels return; } emit channelInvalidated(channelAccount, channel, errorName, errorMessage); Q_ASSERT(mChannels.contains(channel)); delete mChannels.take(channel); } void SimpleObserver::Private::Observer::onChannelsReady(PendingOperation *op) { ContextInfo *info = mObserveChannelsInfo.value(op); foreach (const ChannelPtr &channel, info->channels) { Q_ASSERT(mIncompleteChannels.contains(channel)); ChannelWrapper *wrapper = mIncompleteChannels.take(channel); mChannels.insert(channel, wrapper); } emit newChannels(info->account, info->channels); foreach (const ChannelPtr &channel, info->channels) { ChannelWrapper *wrapper = mChannels.value(channel); if (!channel->isValid()) { mChannels.remove(channel); emit channelInvalidated(info->account, channel, channel->invalidationReason(), channel->invalidationMessage()); delete wrapper; } } mObserveChannelsInfo.remove(op); info->context->setFinished(); delete info; } Features SimpleObserver::Private::Observer::featuresFor( const ChannelClassSpec &channelClass) const { Features features; foreach (const ChannelClassFeatures &spec, mExtraChannelFeatures) { if (spec.first.isSubsetOf(channelClass)) { features.unite(spec.second); } } return features; } SimpleObserver::Private::ChannelWrapper::ChannelWrapper(const AccountPtr &channelAccount, const ChannelPtr &channel, const Features &extraChannelFeatures, QObject *parent) : QObject(parent), mChannelAccount(channelAccount), mChannel(channel), mExtraChannelFeatures(extraChannelFeatures) { connect(channel.data(), SIGNAL(invalidated(Tp::DBusProxy*,QString,QString)), SLOT(onChannelInvalidated(Tp::DBusProxy*,QString,QString))); } PendingOperation *SimpleObserver::Private::ChannelWrapper::becomeReady() { PendingOperation *op; if (!mChannel->isReady(mExtraChannelFeatures)) { // The channel factory passed to the Account used by SimpleObserver does // not contain the extra features, request them op = mChannel->becomeReady(mExtraChannelFeatures); } else { op = new PendingSuccess(mChannel); } return op; } void SimpleObserver::Private::ChannelWrapper::onChannelInvalidated(DBusProxy *proxy, const QString &errorName, const QString &errorMessage) { Q_ASSERT(proxy == mChannel.data()); emit channelInvalidated(mChannelAccount, mChannel, errorName, errorMessage); } /** * \class SimpleObserver * \ingroup utils * \headerfile TelepathyQt/simple-observer.h * * \brief The SimpleObserver class provides an easy way to track channels * in an account and can be optionally filtered by a contact. */ /** * Create a new SimpleObserver object. * * Events will be signalled for all channels in \a account that match * \a channelFilter for all contacts. * * \param channelFilter A specification of the channels in which this observer * is interested. * \param account The account used to listen to events. * \param extraChannelFeatures Extra channel features to be enabled. All channels emitted in * newChannels() will have the extra features that match their * immutable properties enabled. * \return An SimpleObserverPtr object pointing to the newly created SimpleObserver object. */ SimpleObserverPtr SimpleObserver::create( const AccountPtr &account, const ChannelClassSpecList &channelFilter, const QList &extraChannelFeatures) { return create(account, channelFilter, QString(), false, extraChannelFeatures); } /** * Create a new SimpleObserver object. * * Events will be signalled for all channels in \a account established with * \a contact, if not null, and that match \a channelFilter. * * \param channelFilter A specification of the channels in which this observer * is interested. * \param account The account used to listen to events. * \param contact The contact used to filter events. * \param extraChannelFeatures Extra channel features to be enabled. All channels emitted in * newChannels() will have the extra features that match their * immutable properties enabled. * \return An SimpleObserverPtr object pointing to the newly created SimpleObserver object. */ SimpleObserverPtr SimpleObserver::create( const AccountPtr &account, const ChannelClassSpecList &channelFilter, const ContactPtr &contact, const QList &extraChannelFeatures) { if (contact) { return create(account, channelFilter, contact->id(), false, extraChannelFeatures); } return create(account, channelFilter, QString(), false, extraChannelFeatures); } /** * Create a new SimpleObserver object. * * Events will be signalled for all channels in \a account established * with a contact identified by \a contactIdentifier, if non-empty, and that match * \a channelFilter. * * \param channelFilter A specification of the channels in which this observer * is interested. * \param account The account used to listen to events. * \param contactIdentifier The identifier of the contact used to filter events. * \param extraChannelFeatures Extra channel features to be enabled. All channels emitted in * newChannels() will have the extra features that match their * immutable properties enabled. * \return An SimpleObserverPtr object pointing to the newly created SimpleObserver object. */ SimpleObserverPtr SimpleObserver::create( const AccountPtr &account, const ChannelClassSpecList &channelFilter, const QString &contactIdentifier, const QList &extraChannelFeatures) { return create(account, channelFilter, contactIdentifier, true, extraChannelFeatures); } SimpleObserverPtr SimpleObserver::create( const AccountPtr &account, const ChannelClassSpecList &channelFilter, const QString &contactIdentifier, bool requiresNormalization, const QList &extraChannelFeatures) { return SimpleObserverPtr(new SimpleObserver(account, channelFilter, contactIdentifier, requiresNormalization, extraChannelFeatures)); } /** * Construct a new SimpleObserver object. * * \param cr The ClientRegistrar used to register this observer. * \param channelFilter The channel filter used by this observer. * \param account The account used to listen to events. * \param extraChannelFeatures Extra channel features to be enabled. All channels emitted in * newChannels() will have the extra features that match their * immutable properties enabled. * \return An SimpleObserverPtr object pointing to the newly created SimpleObserver object. */ SimpleObserver::SimpleObserver(const AccountPtr &account, const ChannelClassSpecList &channelFilter, const QString &contactIdentifier, bool requiresNormalization, const QList &extraChannelFeatures) : mPriv(new Private(this, account, channelFilter, contactIdentifier, requiresNormalization, extraChannelFeatures)) { if (mPriv->observer) { // populate our channels list with current observer channels QHash > channels; foreach (const Private::ChannelWrapper *wrapper, mPriv->observer->channels()) { channels[wrapper->channelAccount()].append(wrapper->channel()); } QHash >::const_iterator it = channels.constBegin(); QHash >::const_iterator end = channels.constEnd(); for (; it != end; ++it) { onNewChannels(it.key(), it.value()); } if (requiresNormalization) { debug() << "Contact id requires normalization. " "Queueing events until it is normalized"; onAccountConnectionChanged(account->connection()); } } } /** * Class destructor. */ SimpleObserver::~SimpleObserver() { delete mPriv; } /** * Return the account used to listen to events. * * \return A pointer to the Account object. */ AccountPtr SimpleObserver::account() const { return mPriv->account; } /** * Return a specification of the channels that this observer is interested. * * \return The specification of the channels as a list of ChannelClassSpec objects. */ ChannelClassSpecList SimpleObserver::channelFilter() const { return mPriv->channelFilter; } /** * Return the extra channel features to be enabled based on the channels immutable properties. * * \return The features as a list of ChannelClassFeatures objects. */ QList SimpleObserver::extraChannelFeatures() const { return mPriv->extraChannelFeatures; } /** * Return the channels being observed. * * \return A list of pointers to Channel objects. */ QList SimpleObserver::channels() const { return mPriv->channels.toList(); } /** * Return the identifier of the contact used to filter events, or an empty string if none was * provided at construction. * * \return The identifier of the contact. */ QString SimpleObserver::contactIdentifier() const { return mPriv->contactIdentifier; } void SimpleObserver::onAccountConnectionChanged(const Tp::ConnectionPtr &connection) { if (connection) { connect(connection->becomeReady(Connection::FeatureConnected), SIGNAL(finished(Tp::PendingOperation*)), SLOT(onAccountConnectionConnected())); } } void SimpleObserver::onAccountConnectionConnected() { ConnectionPtr conn = mPriv->account->connection(); // check here again as the account connection may have changed and the op failed if (!conn || conn->status() != ConnectionStatusConnected) { return; } debug() << "Normalizing contact id" << mPriv->contactIdentifier; ContactManagerPtr contactManager = conn->contactManager(); connect(contactManager->contactsForIdentifiers(QStringList() << mPriv->contactIdentifier), SIGNAL(finished(Tp::PendingOperation*)), SLOT(onContactConstructed(Tp::PendingOperation*))); } void SimpleObserver::onContactConstructed(Tp::PendingOperation *op) { if (op->isError()) { // what should we do here? retry? wait for a new connection? warning() << "Normalizing contact id failed with" << op->errorName() << " : " << op->errorMessage(); return; } PendingContacts *pc = qobject_cast(op); Q_ASSERT((pc->contacts().size() + pc->invalidIdentifiers().size()) == 1); if (!pc->invalidIdentifiers().isEmpty()) { warning() << "Normalizing contact id failed with invalid id" << mPriv->contactIdentifier; return; } ContactPtr contact = pc->contacts().first(); debug() << "Contact id" << mPriv->contactIdentifier << "normalized to" << contact->id(); mPriv->normalizedContactIdentifier = contact->id(); mPriv->processChannelsQueue(); // disconnect all account signals we are handling disconnect(mPriv->account.data(), 0, this, 0); } void SimpleObserver::onNewChannels(const AccountPtr &channelsAccount, const QList &channels) { if (!mPriv->contactIdentifier.isEmpty() && mPriv->normalizedContactIdentifier.isEmpty()) { mPriv->newChannelsQueue.append(Private::NewChannelsInfo(channelsAccount, channels)); mPriv->channelsQueue.append(&SimpleObserver::Private::processNewChannelsQueue); return; } mPriv->insertChannels(channelsAccount, channels); } void SimpleObserver::onChannelInvalidated(const AccountPtr &channelAccount, const ChannelPtr &channel, const QString &errorName, const QString &errorMessage) { if (!mPriv->contactIdentifier.isEmpty() && mPriv->normalizedContactIdentifier.isEmpty()) { mPriv->channelsInvalidationQueue.append(Private::ChannelInvalidationInfo(channelAccount, channel, errorName, errorMessage)); mPriv->channelsQueue.append(&SimpleObserver::Private::processChannelsInvalidationQueue); return; } mPriv->removeChannel(channelAccount, channel, errorName, errorMessage); } /** * \fn void SimpleObserver::newChannels(const QList &channels) * * Emitted whenever new channels that match this observer's criteria are created. * * \param channels The new channels. */ /** * \fn void SimpleObserver::channelInvalidated(const Tp::ChannelPtr &channel, * const QString &errorName, const QString &errorMessage) * * Emitted whenever a channel that is being observed is invalidated. * * \param channel The channel that was invalidated. * \param errorName A D-Bus error name (a string in a subset * of ASCII, prefixed with a reversed domain name). * \param errorMessage A debugging message associated with the error. */ } // Tp