diff options
author | Matthew Johnson <mjj29@hecate.trinhall.cam.ac.uk> | 2007-01-01 18:28:20 +0000 |
---|---|---|
committer | Matthew Johnson <mjj29@hecate.trinhall.cam.ac.uk> | 2007-01-01 18:28:20 +0000 |
commit | e1af25cc5d03833c83412fc75915980d3c39b179 (patch) | |
tree | 824e42550411f3260ab6166dfd72f881bf3a638d /org/freedesktop | |
parent | dd26dd75596de003bd3a4063aedb4f0d75796684 (diff) |
add extra sending thread in Abstract Connection, use blocking IO
Diffstat (limited to 'org/freedesktop')
-rw-r--r-- | org/freedesktop/dbus/AbstractConnection.java | 134 | ||||
-rw-r--r-- | org/freedesktop/dbus/DBusConnection.java | 8 | ||||
-rw-r--r-- | org/freedesktop/dbus/RemoteInvocationHandler.java | 5 | ||||
-rw-r--r-- | org/freedesktop/dbus/Transport.java | 19 |
4 files changed, 94 insertions, 72 deletions
diff --git a/org/freedesktop/dbus/AbstractConnection.java b/org/freedesktop/dbus/AbstractConnection.java index 9bb2e27..571c3b5 100644 --- a/org/freedesktop/dbus/AbstractConnection.java +++ b/org/freedesktop/dbus/AbstractConnection.java @@ -85,28 +85,6 @@ public abstract class AbstractConnection } } - // write to the wire - if (null != outgoing) synchronized (outgoing) { - if (!outgoing.isEmpty()) - m = outgoing.remove(); } - while (null != m) { - sendMessage(m); - m = null; - if (null != outgoing) synchronized (outgoing) { - if (!outgoing.isEmpty()) - m = outgoing.remove(); } - } - } - if (null != outgoing) synchronized (outgoing) { - if (!outgoing.isEmpty()) - m = outgoing.remove(); - } - while (null != m) { - sendMessage(m); - m = null; - if (null != outgoing) synchronized (outgoing) { - if (!outgoing.isEmpty()) - m = outgoing.remove(); } } synchronized (this) { notifyAll(); } } catch (Exception e) { @@ -158,6 +136,44 @@ public abstract class AbstractConnection } } } + private class _sender extends Thread + { + public _sender() + { + setName("Sender"); + } + public void run() + { + Message m = null; + + if (Debug.debug) Debug.print(Debug.INFO, "Monitoring outbound queue"); + // block on the outbound queue and send from it + while (_run) { + if (null != outgoing) synchronized (outgoing) { + if (Debug.debug) Debug.print(Debug.VERBOSE, "Blocking"); + while (outgoing.size() == 0 && _run) + try { outgoing.wait(); } catch (InterruptedException Ie) {} + if (Debug.debug) Debug.print(Debug.VERBOSE, "Notified"); + if (outgoing.size() > 0) + m = outgoing.remove(); + if (Debug.debug) Debug.print(Debug.DEBUG, "Got message: "+m); + } + if (null != m) + sendMessage(m); + } + + if (Debug.debug) Debug.print(Debug.INFO, "Flushing outbound queue and quitting"); + // flush the outbound queue before disconnect. + if (null != outgoing) do { + synchronized (outgoing) { + if (!outgoing.isEmpty()) + m = outgoing.remove(); + else m = null; + } + sendMessage(m); + } while (null != m); + } + } /** * Timeout in ms on checking the BUS for incoming messages and sending outgoing messages */ @@ -182,6 +198,7 @@ public abstract class AbstractConnection LinkedList<Error> pendingErrors; private static final Map<Thread,DBusCallInfo> infomap = new HashMap<Thread,DBusCallInfo>(); protected _thread thread; + protected _sender sender; protected Transport transport; protected String addr; static final Pattern dollar_pattern = Pattern.compile("[$]"); @@ -238,6 +255,8 @@ public abstract class AbstractConnection // start listening thread = new _thread(); thread.start(); + sender = new _sender(); + sender.start(); } /** @@ -352,9 +371,16 @@ public abstract class AbstractConnection */ public void sendSignal(DBusSignal signal) { + queueOutgoing(signal); + } + void queueOutgoing(Message m) + { if (null == outgoing) return; synchronized (outgoing) { - outgoing.add(signal); } + outgoing.add(m); + if (Debug.debug) Debug.print(Debug.DEBUG, "Notifying outgoing thread"); + outgoing.notifyAll(); + } } /** * Remove a Signal Handler. @@ -448,6 +474,11 @@ public abstract class AbstractConnection // stop the main thread _run = false; + // unblock the sending thread. + synchronized (outgoing) { + outgoing.notifyAll(); + } + // disconnect from the trasport layer try { transport.disconnect(); @@ -542,16 +573,14 @@ public abstract class AbstractConnection if (null == eo) { try { - if (null != outgoing) synchronized (outgoing) { - outgoing.add(new Error(m, new DBus.Error.UnknownObject(m.getPath()+" is not an object provided by this process."))); } + queueOutgoing(new Error(m, new DBus.Error.UnknownObject(m.getPath()+" is not an object provided by this process."))); } catch (DBusException DBe) {} return; } meth = eo.methods.get(new MethodTuple(m.getName(), m.getSig())); if (null == meth) { try { - if (null != outgoing) synchronized (outgoing) { - outgoing.add(new Error(m, new DBus.Error.UnknownMethod("The method `"+m.getInterface()+"."+m.getName()+"' does not exist on this object."))); } + queueOutgoing(new Error(m, new DBus.Error.UnknownMethod("The method `"+m.getInterface()+"."+m.getName()+"' does not exist on this object."))); } catch (DBusException DBe) {} return; } @@ -561,14 +590,17 @@ public abstract class AbstractConnection // now execute it final Method me = meth; final Object ob = o; - final EfficientQueue outqueue = outgoing; final boolean noreply = (1 == (m.getFlags() & Message.Flags.NO_REPLY_EXPECTED)); final DBusCallInfo info = new DBusCallInfo(m); final AbstractConnection conn = this; + if (Debug.debug) Debug.print(Debug.VERBOSE, "Adding Runnable for method "+meth); addRunnable(new Runnable() { - public void run() + private boolean run = false; + public synchronized void run() { + if (run) return; + run = true; if (Debug.debug) Debug.print(Debug.DEBUG, "Running method "+me+" for remote call"); try { Type[] ts = me.getGenericParameterTypes(); @@ -577,9 +609,7 @@ public abstract class AbstractConnection } catch (Exception e) { if (EXCEPTION_DEBUG && Debug.debug) Debug.print(Debug.ERR, e); try { - synchronized (outqueue) { - outqueue.add(new Error(m, new DBus.Error.UnknownMethod("Failure in de-serializing message ("+e+")"))); - } + conn.queueOutgoing(new Error(m, new DBus.Error.UnknownMethod("Failure in de-serializing message ("+e+")"))); } catch (DBusException DBe) {} return; } @@ -610,24 +640,17 @@ public abstract class AbstractConnection reply = new MethodReturn(m, sb.toString(),nr); } - synchronized (outqueue) { - if (Debug.debug) Debug.print(Debug.VERBOSE, "Queuing reply"); - outqueue.add(reply); - } + conn.queueOutgoing(reply); } } catch (DBusExecutionException DBEe) { if (EXCEPTION_DEBUG && Debug.debug) Debug.print(Debug.ERR, DBEe); try { - synchronized (outqueue) { - outqueue.add(new Error(m, DBEe)); - } + conn.queueOutgoing(new Error(m, DBEe)); } catch (DBusException DBe) {} } catch (Throwable e) { if (EXCEPTION_DEBUG && Debug.debug) Debug.print(Debug.ERR, e); try { - synchronized (outqueue) { - outqueue.add(new Error(m, new DBusExecutionException("Error Executing Method "+m.getInterface()+"."+m.getName()+": "+e.getMessage()))); - } + conn.queueOutgoing(new Error(m, new DBusExecutionException("Error Executing Method "+m.getInterface()+"."+m.getName()+": "+e.getMessage()))); } catch (DBusException DBe) {} } } @@ -650,11 +673,15 @@ public abstract class AbstractConnection if (null != t) v.addAll(t); } if (0 == v.size()) return; - final EfficientQueue outqueue = outgoing; final AbstractConnection conn = this; - for (final DBusSigHandler h: v) - addRunnable(new Runnable() { public void run() { + for (final DBusSigHandler h: v) { + if (Debug.debug) Debug.print(Debug.VERBOSE, "Adding Runnable for signal "+s+" with handler "+h); + addRunnable(new Runnable() { + private boolean run = false; + public synchronized void run() { + if (run) return; + run = true; try { DBusSignal rs; if (s instanceof DBusSignal.internalsig || s.getClass().equals(DBusSignal.class)) @@ -665,13 +692,12 @@ public abstract class AbstractConnection } catch (DBusException DBe) { if (EXCEPTION_DEBUG && Debug.debug) Debug.print(Debug.ERR, DBe); try { - synchronized (outqueue) { - outqueue.add(new Error(s, new DBusExecutionException("Error handling signal "+s.getInterface()+"."+s.getName()+": "+DBe.getMessage()))); - } + conn.queueOutgoing(new Error(s, new DBusExecutionException("Error handling signal "+s.getInterface()+"."+s.getName()+": "+DBe.getMessage()))); } catch (DBusException DBe2) {} } } - } }); + }); + } } private void handleMessage(final Error err) { @@ -702,9 +728,7 @@ public abstract class AbstractConnection mr.setCall(m); } else try { - if (null != outgoing) synchronized (outgoing) { - outgoing.add(new Error(mr, new DBusExecutionException("Spurious reply. No message with the given serial id was awaiting a reply."))); - } + queueOutgoing(new Error(mr, new DBusExecutionException("Spurious reply. No message with the given serial id was awaiting a reply."))); } catch (DBusException DBe) {} } protected void sendMessage(Message m) @@ -713,8 +737,6 @@ public abstract class AbstractConnection if (m instanceof DBusSignal) ((DBusSignal) m).appendbody(this); - transport.mout.writeMessage(m); - if (m instanceof MethodCall) { if (0 == (m.getFlags() & Message.Flags.NO_REPLY_EXPECTED)) if (null == pendingCalls) @@ -723,6 +745,9 @@ public abstract class AbstractConnection pendingCalls.put(m.getSerial(),(MethodCall) m); } } + + transport.mout.writeMessage(m); + } catch (Exception e) { if (EXCEPTION_DEBUG && Debug.debug) Debug.print(Debug.ERR, e); if (m instanceof MethodCall && e instanceof DBusExecutionException) @@ -742,6 +767,7 @@ public abstract class AbstractConnection } catch(DBusException IOe) { if (EXCEPTION_DEBUG && Debug.debug) Debug.print(Debug.ERR, e); } + if (e instanceof IOException) disconnect(); } } private Message readIncoming(int timeoutms, EfficientQueue outgoing) throws DBusException diff --git a/org/freedesktop/dbus/DBusConnection.java b/org/freedesktop/dbus/DBusConnection.java index 7f85718..8bf89de 100644 --- a/org/freedesktop/dbus/DBusConnection.java +++ b/org/freedesktop/dbus/DBusConnection.java @@ -572,14 +572,6 @@ public class DBusConnection extends AbstractConnection } pendingCalls = null; } - synchronized (outgoing) { - for (Message m: outgoing.getKeys()) - if (m instanceof MethodCall) - ((MethodCall) m).setReply(err); - else - sendMessage(m); - outgoing = null; - } synchronized (pendingErrors) { pendingErrors.add(err); } diff --git a/org/freedesktop/dbus/RemoteInvocationHandler.java b/org/freedesktop/dbus/RemoteInvocationHandler.java index ee70984..d4b8273 100644 --- a/org/freedesktop/dbus/RemoteInvocationHandler.java +++ b/org/freedesktop/dbus/RemoteInvocationHandler.java @@ -100,10 +100,7 @@ class RemoteInvocationHandler implements InvocationHandler throw new DBusExecutionException("Failed to construct outgoing method call: "+DBe.getMessage()); } if (null == conn.outgoing) throw new NotConnected("Not Connected"); - synchronized (conn.outgoing) { - if (Debug.debug) Debug.print(Debug.VERBOSE, "Adding method call to outgoing queue"); - conn.outgoing.add(call); - } + conn.queueOutgoing(call); if (async) return new DBusAsyncReply(call, m, conn); diff --git a/org/freedesktop/dbus/Transport.java b/org/freedesktop/dbus/Transport.java index 5d6e3bb..da4574d 100644 --- a/org/freedesktop/dbus/Transport.java +++ b/org/freedesktop/dbus/Transport.java @@ -665,10 +665,18 @@ public class Transport } public void connect(String address) throws IOException { - connect(new BusAddress(address)); + connect(new BusAddress(address), true); + } + public void connect(String address, boolean blocking) throws IOException + { + connect(new BusAddress(address), blocking); } public void connect(BusAddress address) throws IOException { + connect(address, true); + } + public void connect(BusAddress address, boolean blocking) throws IOException + { if (Debug.debug) Debug.print(Debug.INFO, "Connecting to "+address); this.address = address; OutputStream out = null; @@ -710,7 +718,6 @@ public class Transport s = new Socket(); s.connect(new InetSocketAddress(address.getParameter("host"), Integer.parseInt(address.getParameter("port")))); } - s.setSoTimeout(0); in = s.getInputStream(); out = s.getOutputStream(); } else { @@ -722,12 +729,12 @@ public class Transport throw new IOException("Failed to auth"); } if (null != us) { - if (Debug.debug) Debug.print(Debug.VERBOSE, "Setting non-blocking on UnixSocket"); - us.setBlocking(false); + if (Debug.debug) Debug.print(Debug.VERBOSE, "Setting "+(blocking?"":"non-")+"blocking on UnixSocket"); + us.setBlocking(blocking); } if (null != s) { - if (Debug.debug) Debug.print(Debug.VERBOSE, "Setting non-blocking on Socket"); - s.setSoTimeout(1); + if (Debug.debug) Debug.print(Debug.VERBOSE, "Setting "+(blocking?"":"non-")+"blocking on Socket"); + s.setSoTimeout(blocking ? 0 : 1); } mout = new MessageWriter(out); min = new MessageReader(in); |