summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Harris <pharris@opentext.com>2011-01-07 13:55:25 -0500
committerPeter Harris <pharris@opentext.com>2011-01-07 13:55:47 -0500
commitdc9a357c9c5d95e87044b3fffff4f98cb74662c5 (patch)
treeb7411202220a963ca413f2cc121becf374b47e8b
parent8db1d9a12e7ed59eca6a926464f7da9260e8eda5 (diff)
Add (and use) callback-based reply decoders
This improves performance, as we aren't spinning up and killing a tonne of goroutines.
-rw-r--r--xgob.go29
-rw-r--r--xgob/xproto.go18
2 files changed, 35 insertions, 12 deletions
diff --git a/xgob.go b/xgob.go
index e231592..298a269 100644
--- a/xgob.go
+++ b/xgob.go
@@ -101,6 +101,7 @@ type Connection struct {
conn io.ReadWriteCloser
out *bufio.Writer
reply map[uint64]chan interface {}
+ callback map[uint64]func(interface {})
replyMutex sync.Mutex
writeMutex sync.Mutex
lastRequestWritten uint64
@@ -418,6 +419,16 @@ func (c *Connection) registerReply (reply chan interface{}) {
c.replyMutex.Unlock()
}
+func (c *Connection) registerCallback (reply func(interface{})) {
+ c.replyMutex.Lock()
+
+ c.lastRequestWritten++
+ if reply != nil {
+ c.callback[c.lastRequestWritten] = reply
+ }
+ c.replyMutex.Unlock()
+}
+
func (c *Connection) postReply (seq uint64, reply interface{}) {
c.replyMutex.Lock()
@@ -429,10 +440,16 @@ func (c *Connection) postReply (seq uint64, reply interface{}) {
target <- nil
c.reply[i] = nil, false
}
+ if target := c.callback[i]; target != nil {
+ target(nil)
+ c.callback[i] = nil, false
+ }
}
if target := c.reply[seq]; target != nil {
target <- reply
+ } else if target := c.callback[seq]; target != nil {
+ target(reply)
} else {
error, ok := reply.(Error)
if ok {
@@ -497,6 +514,7 @@ func (c *Connection)connect_auth(name string, data string) {
c.Event = make (chan Event, events_before_readblock)
c.Error = make (chan Error, errors_before_readblock)
c.reply = make (map [uint64]chan interface{})
+ c.callback = make (map [uint64]func(interface{}))
go c.read(in)
}
@@ -540,6 +558,17 @@ func (c *Connection) writeRequest (req []byte, replier bool, rv chan interface{}
c.writeMutex.Unlock()
}
+func (c *Connection) WriteReplyRequestCallback (req []byte, closure func (interface{})) {
+ c.writeMutex.Lock()
+
+ c.registerCallback(closure)
+ c.out.Write(req)
+
+ c.lastReplierSent = c.lastRequestWritten
+
+ c.writeMutex.Unlock()
+}
+
func (c *Connection) WriteMultiRequest (req []byte, expected int) chan interface{} {
if c == nil || c.conn == nil {
return nil
diff --git a/xgob/xproto.go b/xgob/xproto.go
index 87ca85b..895ba3b 100644
--- a/xgob/xproto.go
+++ b/xgob/xproto.go
@@ -56,9 +56,8 @@ func getUint32(buf []byte) uint32 {
return rv
}
-func queryTreeReply(in chan interface{}, out chan QueryTreeReply) {
+func queryTreeReply(data interface{}, out chan QueryTreeReply) {
var rv QueryTreeReply
- data := <-in
switch d := data.(type) {
case xgob.Error:
@@ -91,15 +90,13 @@ func QueryTree(c *xgob.Connection, window Window) chan QueryTreeReply {
req = appendUint32(req, uint32(window))
rv := make(chan QueryTreeReply, 1)
- reply := c.WriteReplyRequest(req)
+ c.WriteReplyRequestCallback(req, func (in interface{}) { queryTreeReply(in, rv) })
- go queryTreeReply(reply, rv)
return rv
}
-func internAtomReply(in chan interface{}, out chan InternAtomReply) {
+func internAtomReply(data interface{}, out chan InternAtomReply) {
var rv InternAtomReply
- data := <-in
switch d := data.(type) {
case xgob.Error:
@@ -130,15 +127,13 @@ func InternAtom(c *xgob.Connection, only_if_exists bool, name string) chan Inter
copy(req[8:], name)
rv := make(chan InternAtomReply, 1)
- reply := c.WriteReplyRequest(req)
+ c.WriteReplyRequestCallback(req, func (in interface{}) { internAtomReply(in, rv) })
- go internAtomReply(reply, rv)
return rv
}
-func getPropertyReply(in chan interface{}, out chan GetPropertyReply) {
+func getPropertyReply(data interface{}, out chan GetPropertyReply) {
var rv GetPropertyReply
- data := <-in
switch d := data.(type) {
case xgob.Error:
@@ -176,8 +171,7 @@ func GetProperty(c *xgob.Connection, delete bool, window Window, property Atom,
req = appendUint32(req, long_length)
rv := make(chan GetPropertyReply, 1)
- reply := c.WriteReplyRequest(req)
+ c.WriteReplyRequestCallback(req, func (in interface{}) { getPropertyReply(in, rv) })
- go getPropertyReply(reply, rv)
return rv
}