summaryrefslogtreecommitdiff
path: root/org/freedesktop
diff options
context:
space:
mode:
authorMatthew Johnson <mjj29@hecate.trinhall.cam.ac.uk>2007-01-01 18:28:20 +0000
committerMatthew Johnson <mjj29@hecate.trinhall.cam.ac.uk>2007-01-01 18:28:20 +0000
commite1af25cc5d03833c83412fc75915980d3c39b179 (patch)
tree824e42550411f3260ab6166dfd72f881bf3a638d /org/freedesktop
parentdd26dd75596de003bd3a4063aedb4f0d75796684 (diff)
add extra sending thread in Abstract Connection, use blocking IO
Diffstat (limited to 'org/freedesktop')
-rw-r--r--org/freedesktop/dbus/AbstractConnection.java134
-rw-r--r--org/freedesktop/dbus/DBusConnection.java8
-rw-r--r--org/freedesktop/dbus/RemoteInvocationHandler.java5
-rw-r--r--org/freedesktop/dbus/Transport.java19
4 files changed, 94 insertions, 72 deletions
diff --git a/org/freedesktop/dbus/AbstractConnection.java b/org/freedesktop/dbus/AbstractConnection.java
index 9bb2e27..571c3b5 100644
--- a/org/freedesktop/dbus/AbstractConnection.java
+++ b/org/freedesktop/dbus/AbstractConnection.java
@@ -85,28 +85,6 @@ public abstract class AbstractConnection
}
}
- // write to the wire
- if (null != outgoing) synchronized (outgoing) {
- if (!outgoing.isEmpty())
- m = outgoing.remove(); }
- while (null != m) {
- sendMessage(m);
- m = null;
- if (null != outgoing) synchronized (outgoing) {
- if (!outgoing.isEmpty())
- m = outgoing.remove(); }
- }
- }
- if (null != outgoing) synchronized (outgoing) {
- if (!outgoing.isEmpty())
- m = outgoing.remove();
- }
- while (null != m) {
- sendMessage(m);
- m = null;
- if (null != outgoing) synchronized (outgoing) {
- if (!outgoing.isEmpty())
- m = outgoing.remove(); }
}
synchronized (this) { notifyAll(); }
} catch (Exception e) {
@@ -158,6 +136,44 @@ public abstract class AbstractConnection
}
}
}
+ private class _sender extends Thread
+ {
+ public _sender()
+ {
+ setName("Sender");
+ }
+ public void run()
+ {
+ Message m = null;
+
+ if (Debug.debug) Debug.print(Debug.INFO, "Monitoring outbound queue");
+ // block on the outbound queue and send from it
+ while (_run) {
+ if (null != outgoing) synchronized (outgoing) {
+ if (Debug.debug) Debug.print(Debug.VERBOSE, "Blocking");
+ while (outgoing.size() == 0 && _run)
+ try { outgoing.wait(); } catch (InterruptedException Ie) {}
+ if (Debug.debug) Debug.print(Debug.VERBOSE, "Notified");
+ if (outgoing.size() > 0)
+ m = outgoing.remove();
+ if (Debug.debug) Debug.print(Debug.DEBUG, "Got message: "+m);
+ }
+ if (null != m)
+ sendMessage(m);
+ }
+
+ if (Debug.debug) Debug.print(Debug.INFO, "Flushing outbound queue and quitting");
+ // flush the outbound queue before disconnect.
+ if (null != outgoing) do {
+ synchronized (outgoing) {
+ if (!outgoing.isEmpty())
+ m = outgoing.remove();
+ else m = null;
+ }
+ sendMessage(m);
+ } while (null != m);
+ }
+ }
/**
* Timeout in ms on checking the BUS for incoming messages and sending outgoing messages
*/
@@ -182,6 +198,7 @@ public abstract class AbstractConnection
LinkedList<Error> pendingErrors;
private static final Map<Thread,DBusCallInfo> infomap = new HashMap<Thread,DBusCallInfo>();
protected _thread thread;
+ protected _sender sender;
protected Transport transport;
protected String addr;
static final Pattern dollar_pattern = Pattern.compile("[$]");
@@ -238,6 +255,8 @@ public abstract class AbstractConnection
// start listening
thread = new _thread();
thread.start();
+ sender = new _sender();
+ sender.start();
}
/**
@@ -352,9 +371,16 @@ public abstract class AbstractConnection
*/
public void sendSignal(DBusSignal signal)
{
+ queueOutgoing(signal);
+ }
+ void queueOutgoing(Message m)
+ {
if (null == outgoing) return;
synchronized (outgoing) {
- outgoing.add(signal); }
+ outgoing.add(m);
+ if (Debug.debug) Debug.print(Debug.DEBUG, "Notifying outgoing thread");
+ outgoing.notifyAll();
+ }
}
/**
* Remove a Signal Handler.
@@ -448,6 +474,11 @@ public abstract class AbstractConnection
// stop the main thread
_run = false;
+ // unblock the sending thread.
+ synchronized (outgoing) {
+ outgoing.notifyAll();
+ }
+
// disconnect from the trasport layer
try {
transport.disconnect();
@@ -542,16 +573,14 @@ public abstract class AbstractConnection
if (null == eo) {
try {
- if (null != outgoing) synchronized (outgoing) {
- outgoing.add(new Error(m, new DBus.Error.UnknownObject(m.getPath()+" is not an object provided by this process."))); }
+ queueOutgoing(new Error(m, new DBus.Error.UnknownObject(m.getPath()+" is not an object provided by this process.")));
} catch (DBusException DBe) {}
return;
}
meth = eo.methods.get(new MethodTuple(m.getName(), m.getSig()));
if (null == meth) {
try {
- if (null != outgoing) synchronized (outgoing) {
- outgoing.add(new Error(m, new DBus.Error.UnknownMethod("The method `"+m.getInterface()+"."+m.getName()+"' does not exist on this object."))); }
+ queueOutgoing(new Error(m, new DBus.Error.UnknownMethod("The method `"+m.getInterface()+"."+m.getName()+"' does not exist on this object.")));
} catch (DBusException DBe) {}
return;
}
@@ -561,14 +590,17 @@ public abstract class AbstractConnection
// now execute it
final Method me = meth;
final Object ob = o;
- final EfficientQueue outqueue = outgoing;
final boolean noreply = (1 == (m.getFlags() & Message.Flags.NO_REPLY_EXPECTED));
final DBusCallInfo info = new DBusCallInfo(m);
final AbstractConnection conn = this;
+ if (Debug.debug) Debug.print(Debug.VERBOSE, "Adding Runnable for method "+meth);
addRunnable(new Runnable()
{
- public void run()
+ private boolean run = false;
+ public synchronized void run()
{
+ if (run) return;
+ run = true;
if (Debug.debug) Debug.print(Debug.DEBUG, "Running method "+me+" for remote call");
try {
Type[] ts = me.getGenericParameterTypes();
@@ -577,9 +609,7 @@ public abstract class AbstractConnection
} catch (Exception e) {
if (EXCEPTION_DEBUG && Debug.debug) Debug.print(Debug.ERR, e);
try {
- synchronized (outqueue) {
- outqueue.add(new Error(m, new DBus.Error.UnknownMethod("Failure in de-serializing message ("+e+")")));
- }
+ conn.queueOutgoing(new Error(m, new DBus.Error.UnknownMethod("Failure in de-serializing message ("+e+")")));
} catch (DBusException DBe) {}
return;
}
@@ -610,24 +640,17 @@ public abstract class AbstractConnection
reply = new MethodReturn(m, sb.toString(),nr);
}
- synchronized (outqueue) {
- if (Debug.debug) Debug.print(Debug.VERBOSE, "Queuing reply");
- outqueue.add(reply);
- }
+ conn.queueOutgoing(reply);
}
} catch (DBusExecutionException DBEe) {
if (EXCEPTION_DEBUG && Debug.debug) Debug.print(Debug.ERR, DBEe);
try {
- synchronized (outqueue) {
- outqueue.add(new Error(m, DBEe));
- }
+ conn.queueOutgoing(new Error(m, DBEe));
} catch (DBusException DBe) {}
} catch (Throwable e) {
if (EXCEPTION_DEBUG && Debug.debug) Debug.print(Debug.ERR, e);
try {
- synchronized (outqueue) {
- outqueue.add(new Error(m, new DBusExecutionException("Error Executing Method "+m.getInterface()+"."+m.getName()+": "+e.getMessage())));
- }
+ conn.queueOutgoing(new Error(m, new DBusExecutionException("Error Executing Method "+m.getInterface()+"."+m.getName()+": "+e.getMessage())));
} catch (DBusException DBe) {}
}
}
@@ -650,11 +673,15 @@ public abstract class AbstractConnection
if (null != t) v.addAll(t);
}
if (0 == v.size()) return;
- final EfficientQueue outqueue = outgoing;
final AbstractConnection conn = this;
- for (final DBusSigHandler h: v)
- addRunnable(new Runnable() { public void run() {
+ for (final DBusSigHandler h: v) {
+ if (Debug.debug) Debug.print(Debug.VERBOSE, "Adding Runnable for signal "+s+" with handler "+h);
+ addRunnable(new Runnable() {
+ private boolean run = false;
+ public synchronized void run()
{
+ if (run) return;
+ run = true;
try {
DBusSignal rs;
if (s instanceof DBusSignal.internalsig || s.getClass().equals(DBusSignal.class))
@@ -665,13 +692,12 @@ public abstract class AbstractConnection
} catch (DBusException DBe) {
if (EXCEPTION_DEBUG && Debug.debug) Debug.print(Debug.ERR, DBe);
try {
- synchronized (outqueue) {
- outqueue.add(new Error(s, new DBusExecutionException("Error handling signal "+s.getInterface()+"."+s.getName()+": "+DBe.getMessage())));
- }
+ conn.queueOutgoing(new Error(s, new DBusExecutionException("Error handling signal "+s.getInterface()+"."+s.getName()+": "+DBe.getMessage())));
} catch (DBusException DBe2) {}
}
}
- } });
+ });
+ }
}
private void handleMessage(final Error err)
{
@@ -702,9 +728,7 @@ public abstract class AbstractConnection
mr.setCall(m);
} else
try {
- if (null != outgoing) synchronized (outgoing) {
- outgoing.add(new Error(mr, new DBusExecutionException("Spurious reply. No message with the given serial id was awaiting a reply.")));
- }
+ queueOutgoing(new Error(mr, new DBusExecutionException("Spurious reply. No message with the given serial id was awaiting a reply.")));
} catch (DBusException DBe) {}
}
protected void sendMessage(Message m)
@@ -713,8 +737,6 @@ public abstract class AbstractConnection
if (m instanceof DBusSignal)
((DBusSignal) m).appendbody(this);
- transport.mout.writeMessage(m);
-
if (m instanceof MethodCall) {
if (0 == (m.getFlags() & Message.Flags.NO_REPLY_EXPECTED))
if (null == pendingCalls)
@@ -723,6 +745,9 @@ public abstract class AbstractConnection
pendingCalls.put(m.getSerial(),(MethodCall) m);
}
}
+
+ transport.mout.writeMessage(m);
+
} catch (Exception e) {
if (EXCEPTION_DEBUG && Debug.debug) Debug.print(Debug.ERR, e);
if (m instanceof MethodCall && e instanceof DBusExecutionException)
@@ -742,6 +767,7 @@ public abstract class AbstractConnection
} catch(DBusException IOe) {
if (EXCEPTION_DEBUG && Debug.debug) Debug.print(Debug.ERR, e);
}
+ if (e instanceof IOException) disconnect();
}
}
private Message readIncoming(int timeoutms, EfficientQueue outgoing) throws DBusException
diff --git a/org/freedesktop/dbus/DBusConnection.java b/org/freedesktop/dbus/DBusConnection.java
index 7f85718..8bf89de 100644
--- a/org/freedesktop/dbus/DBusConnection.java
+++ b/org/freedesktop/dbus/DBusConnection.java
@@ -572,14 +572,6 @@ public class DBusConnection extends AbstractConnection
}
pendingCalls = null;
}
- synchronized (outgoing) {
- for (Message m: outgoing.getKeys())
- if (m instanceof MethodCall)
- ((MethodCall) m).setReply(err);
- else
- sendMessage(m);
- outgoing = null;
- }
synchronized (pendingErrors) {
pendingErrors.add(err);
}
diff --git a/org/freedesktop/dbus/RemoteInvocationHandler.java b/org/freedesktop/dbus/RemoteInvocationHandler.java
index ee70984..d4b8273 100644
--- a/org/freedesktop/dbus/RemoteInvocationHandler.java
+++ b/org/freedesktop/dbus/RemoteInvocationHandler.java
@@ -100,10 +100,7 @@ class RemoteInvocationHandler implements InvocationHandler
throw new DBusExecutionException("Failed to construct outgoing method call: "+DBe.getMessage());
}
if (null == conn.outgoing) throw new NotConnected("Not Connected");
- synchronized (conn.outgoing) {
- if (Debug.debug) Debug.print(Debug.VERBOSE, "Adding method call to outgoing queue");
- conn.outgoing.add(call);
- }
+ conn.queueOutgoing(call);
if (async) return new DBusAsyncReply(call, m, conn);
diff --git a/org/freedesktop/dbus/Transport.java b/org/freedesktop/dbus/Transport.java
index 5d6e3bb..da4574d 100644
--- a/org/freedesktop/dbus/Transport.java
+++ b/org/freedesktop/dbus/Transport.java
@@ -665,10 +665,18 @@ public class Transport
}
public void connect(String address) throws IOException
{
- connect(new BusAddress(address));
+ connect(new BusAddress(address), true);
+ }
+ public void connect(String address, boolean blocking) throws IOException
+ {
+ connect(new BusAddress(address), blocking);
}
public void connect(BusAddress address) throws IOException
{
+ connect(address, true);
+ }
+ public void connect(BusAddress address, boolean blocking) throws IOException
+ {
if (Debug.debug) Debug.print(Debug.INFO, "Connecting to "+address);
this.address = address;
OutputStream out = null;
@@ -710,7 +718,6 @@ public class Transport
s = new Socket();
s.connect(new InetSocketAddress(address.getParameter("host"), Integer.parseInt(address.getParameter("port"))));
}
- s.setSoTimeout(0);
in = s.getInputStream();
out = s.getOutputStream();
} else {
@@ -722,12 +729,12 @@ public class Transport
throw new IOException("Failed to auth");
}
if (null != us) {
- if (Debug.debug) Debug.print(Debug.VERBOSE, "Setting non-blocking on UnixSocket");
- us.setBlocking(false);
+ if (Debug.debug) Debug.print(Debug.VERBOSE, "Setting "+(blocking?"":"non-")+"blocking on UnixSocket");
+ us.setBlocking(blocking);
}
if (null != s) {
- if (Debug.debug) Debug.print(Debug.VERBOSE, "Setting non-blocking on Socket");
- s.setSoTimeout(1);
+ if (Debug.debug) Debug.print(Debug.VERBOSE, "Setting "+(blocking?"":"non-")+"blocking on Socket");
+ s.setSoTimeout(blocking ? 0 : 1);
}
mout = new MessageWriter(out);
min = new MessageReader(in);