diff -r 122ab6208c3c -r c9527bbe99a6 stream.go --- a/stream.go Tue Dec 27 20:42:44 2011 -0700 +++ b/stream.go Wed Dec 28 11:30:10 2011 -0700 @@ -26,6 +26,11 @@ "xml" ) +type stanzaHandler struct { + id string + f func(Stanza) bool +} + func (cl *Client) readTransport(w io.Writer) { defer tryClose(cl.socket, w) cl.socket.SetReadTimeout(1e8) @@ -171,18 +176,35 @@ func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) { defer tryClose(srvIn, cliOut) - for x := range srvIn { - switch obj := x.(type) { - case *Stream: - handleStream(obj) - case *Features: - cl.handleFeatures(obj) - case *starttls: - cl.handleTls(obj) - case *auth: - cl.handleSasl(obj) - default: - cliOut <- x + handlers := make(map[string] func(Stanza) bool) + // TODO This for loop will never terminate, even when the + // channels are closed. + for { + select { + case h := <- cl.handlers: + handlers[h.id] = h.f + case x := <- srvIn: + send := false + switch obj := x.(type) { + case *Stream: + handleStream(obj) + case *Features: + cl.handleFeatures(obj) + case *starttls: + cl.handleTls(obj) + case *auth: + cl.handleSasl(obj) + default: + send = true + } + if st, ok := x.(Stanza) ; ok && + handlers[st.XId()] != nil { + f := handlers[st.XId()] + send = f(st) + } + if send { + cliOut <- x + } } } } @@ -455,3 +477,8 @@ cl.xmlOut <- msg // TODO Grab the iq result from the server and update cl.Jid. } + +func (cl *Client) HandleStanza(id string, f func(Stanza) bool) { + h := &stanzaHandler{id: id, f: f} + cl.handlers <- h +}