diff -r 7b5586a5e109 -r 3f891f7fe817 xmpp/xmpp.go --- a/xmpp/xmpp.go Mon Sep 30 18:59:37 2013 -0600 +++ b/xmpp/xmpp.go Mon Sep 30 20:31:25 2013 -0600 @@ -7,10 +7,8 @@ package xmpp import ( - "bytes" "crypto/tls" "encoding/xml" - "errors" "fmt" "io" "net" @@ -47,6 +45,7 @@ statusBound statusRunning statusShutdown + statusError ) var ( @@ -66,8 +65,20 @@ StatusRunning Status = statusRunning // The session has closed, or is in the process of closing. StatusShutdown Status = statusShutdown + // The session has encountered an error. Otherwise identical + // to StatusShutdown. + StatusError Status = statusError ) +func (s Status) fatal() bool { + switch s { + default: + return false + case StatusShutdown, StatusError: + return true + } +} + // A filter can modify the XMPP traffic to or from the remote // server. It's part of an Extension. The filter function will be // called in a new goroutine, so it doesn't need to return. The filter @@ -102,7 +113,7 @@ // Outgoing XMPP stanzas to the server should be sent to this // channel. Send chan<- Stanza - sendXml chan<- interface{} + sendRaw chan<- interface{} statmgr *statmgr // The client's roster is also known as the buddy list. It's // the set of contacts which are known to this JID, or which @@ -113,6 +124,7 @@ sendFilterAdd, recvFilterAdd chan Filter tlsConfig tls.Config layer1 *layer1 + error chan error } // Creates an XMPP client identified by the given JID, authenticating @@ -136,6 +148,7 @@ cl.sendFilterAdd = make(chan Filter) cl.recvFilterAdd = make(chan Filter) cl.statmgr = newStatmgr(status) + cl.error = make(chan error, 1) extStanza := make(map[xml.Name]reflect.Type) for _, ext := range exts { @@ -180,15 +193,15 @@ // Start the transport handler, initially unencrypted. recvReader, recvWriter := io.Pipe() sendReader, sendWriter := io.Pipe() - cl.layer1 = startLayer1(tcp, recvWriter, sendReader, + cl.layer1 = cl.startLayer1(tcp, recvWriter, sendReader, cl.statmgr.newListener()) // Start the reader and writer that convert to and from XML. recvXmlCh := make(chan interface{}) - go recvXml(recvReader, recvXmlCh, extStanza) + go cl.recvXml(recvReader, recvXmlCh, extStanza) sendXmlCh := make(chan interface{}) - cl.sendXml = sendXmlCh - go sendXml(sendWriter, sendXmlCh) + cl.sendRaw = sendXmlCh + go cl.sendXml(sendWriter, sendXmlCh) // Start the reader and writer that convert between XML and // XMPP stanzas. @@ -213,11 +226,11 @@ // Initial handshake. hsOut := &stream{To: jid.Domain, Version: XMPPVersion} - cl.sendXml <- hsOut + cl.sendRaw <- hsOut // Wait until resource binding is complete. if err := cl.statmgr.awaitStatus(StatusBound); err != nil { - return nil, err + return nil, cl.getError(err) } // Forget about the password, for paranoia's sake. @@ -231,20 +244,18 @@ f := func(st Stanza) { iq, ok := st.(*Iq) if !ok { - Warn.Log("iq reply not iq; can't start session") - ch <- errors.New("bad session start reply") + ch <- fmt.Errorf("bad session start reply: %#v", st) } if iq.Type == "error" { - Warn.Logf("Can't start session: %v", iq) - ch <- iq.Error + ch <- fmt.Errorf("Can't start session: %v", iq.Error) } ch <- nil } cl.SetCallback(id, f) - cl.sendXml <- iq + cl.sendRaw <- iq // Now wait until the callback is called. if err := <-ch; err != nil { - return nil, err + return nil, cl.getError(err) } // This allows the client to receive stanzas. @@ -256,37 +267,7 @@ // Send the initial presence. cl.Send <- &pr - return cl, nil -} - -func tee(r io.Reader, w io.Writer, prefix string) { - defer func(w io.Writer) { - if c, ok := w.(io.Closer); ok { - c.Close() - } - }(w) - - buf := bytes.NewBuffer([]uint8(prefix)) - for { - var c [1]byte - n, _ := r.Read(c[:]) - if n == 0 { - break - } - n, _ = w.Write(c[:n]) - if n == 0 { - break - } - buf.Write(c[:n]) - if c[0] == '\n' || c[0] == '>' { - Debug.Log(buf) - buf = bytes.NewBuffer([]uint8(prefix)) - } - } - leftover := buf.String() - if leftover != "" { - Debug.Log(buf) - } + return cl, cl.getError(nil) } func (cl *Client) Close() { @@ -295,3 +276,34 @@ // Shuts down the senders: close(cl.Send) } + +// If there's a buffered error in the channel, return it. Otherwise, +// return what was passed to us. The idea is that the error in the +// channel probably preceded (and caused) the one that's passed as an +// argument here. +func (cl *Client) getError(err1 error) error { + select { + case err0 := <-cl.error: + return err0 + default: + return err1 + } +} + +// Register an error that happened in the internals somewhere. If +// there's already an error in the channel, discard the newer one in +// favor of the older. +func (cl *Client) setError(err error) { + cl.Close() + cl.setStatus(StatusError) + if len(cl.error) > 0 { + return + } + // If we're in a race between two calls to this function, + // trying to set the "first" error, just arbitrarily let one + // of them win. + select { + case cl.error <- err: + default: + } +}