diff options
author | Peter Harris <pharris@opentext.com> | 2011-01-07 13:55:25 -0500 |
---|---|---|
committer | Peter Harris <pharris@opentext.com> | 2011-01-07 13:55:47 -0500 |
commit | dc9a357c9c5d95e87044b3fffff4f98cb74662c5 (patch) | |
tree | b7411202220a963ca413f2cc121becf374b47e8b | |
parent | 8db1d9a12e7ed59eca6a926464f7da9260e8eda5 (diff) |
Add (and use) callback-based reply decoders
This improves performance, as we aren't spinning up and killing
a tonne of goroutines.
-rw-r--r-- | xgob.go | 29 | ||||
-rw-r--r-- | xgob/xproto.go | 18 |
2 files changed, 35 insertions, 12 deletions
@@ -101,6 +101,7 @@ type Connection struct { conn io.ReadWriteCloser out *bufio.Writer reply map[uint64]chan interface {} + callback map[uint64]func(interface {}) replyMutex sync.Mutex writeMutex sync.Mutex lastRequestWritten uint64 @@ -418,6 +419,16 @@ func (c *Connection) registerReply (reply chan interface{}) { c.replyMutex.Unlock() } +func (c *Connection) registerCallback (reply func(interface{})) { + c.replyMutex.Lock() + + c.lastRequestWritten++ + if reply != nil { + c.callback[c.lastRequestWritten] = reply + } + c.replyMutex.Unlock() +} + func (c *Connection) postReply (seq uint64, reply interface{}) { c.replyMutex.Lock() @@ -429,10 +440,16 @@ func (c *Connection) postReply (seq uint64, reply interface{}) { target <- nil c.reply[i] = nil, false } + if target := c.callback[i]; target != nil { + target(nil) + c.callback[i] = nil, false + } } if target := c.reply[seq]; target != nil { target <- reply + } else if target := c.callback[seq]; target != nil { + target(reply) } else { error, ok := reply.(Error) if ok { @@ -497,6 +514,7 @@ func (c *Connection)connect_auth(name string, data string) { c.Event = make (chan Event, events_before_readblock) c.Error = make (chan Error, errors_before_readblock) c.reply = make (map [uint64]chan interface{}) + c.callback = make (map [uint64]func(interface{})) go c.read(in) } @@ -540,6 +558,17 @@ func (c *Connection) writeRequest (req []byte, replier bool, rv chan interface{} c.writeMutex.Unlock() } +func (c *Connection) WriteReplyRequestCallback (req []byte, closure func (interface{})) { + c.writeMutex.Lock() + + c.registerCallback(closure) + c.out.Write(req) + + c.lastReplierSent = c.lastRequestWritten + + c.writeMutex.Unlock() +} + func (c *Connection) WriteMultiRequest (req []byte, expected int) chan interface{} { if c == nil || c.conn == nil { return nil diff --git a/xgob/xproto.go b/xgob/xproto.go index 87ca85b..895ba3b 100644 --- a/xgob/xproto.go +++ b/xgob/xproto.go @@ -56,9 +56,8 @@ func getUint32(buf []byte) uint32 { return rv } -func queryTreeReply(in chan interface{}, out chan QueryTreeReply) { +func queryTreeReply(data interface{}, out chan QueryTreeReply) { var rv QueryTreeReply - data := <-in switch d := data.(type) { case xgob.Error: @@ -91,15 +90,13 @@ func QueryTree(c *xgob.Connection, window Window) chan QueryTreeReply { req = appendUint32(req, uint32(window)) rv := make(chan QueryTreeReply, 1) - reply := c.WriteReplyRequest(req) + c.WriteReplyRequestCallback(req, func (in interface{}) { queryTreeReply(in, rv) }) - go queryTreeReply(reply, rv) return rv } -func internAtomReply(in chan interface{}, out chan InternAtomReply) { +func internAtomReply(data interface{}, out chan InternAtomReply) { var rv InternAtomReply - data := <-in switch d := data.(type) { case xgob.Error: @@ -130,15 +127,13 @@ func InternAtom(c *xgob.Connection, only_if_exists bool, name string) chan Inter copy(req[8:], name) rv := make(chan InternAtomReply, 1) - reply := c.WriteReplyRequest(req) + c.WriteReplyRequestCallback(req, func (in interface{}) { internAtomReply(in, rv) }) - go internAtomReply(reply, rv) return rv } -func getPropertyReply(in chan interface{}, out chan GetPropertyReply) { +func getPropertyReply(data interface{}, out chan GetPropertyReply) { var rv GetPropertyReply - data := <-in switch d := data.(type) { case xgob.Error: @@ -176,8 +171,7 @@ func GetProperty(c *xgob.Connection, delete bool, window Window, property Atom, req = appendUint32(req, long_length) rv := make(chan GetPropertyReply, 1) - reply := c.WriteReplyRequest(req) + c.WriteReplyRequestCallback(req, func (in interface{}) { getPropertyReply(in, rv) }) - go getPropertyReply(reply, rv) return rv } |