# HG changeset patch # User Chris Jones # Date 1380594685 21600 # Node ID 3f891f7fe817f04fa3c9b8de8304d0a8a0833dfb # Parent 7b5586a5e109dac32bf5ffd46c7c959bc79535cd Removed the weirdo logging facility. There's now a Debug variable which can be set which replaces the former debug log. NewClient() will return an error if something goes wrong setting up the connection. diff -r 7b5586a5e109 -r 3f891f7fe817 TODO.txt --- a/TODO.txt Mon Sep 30 18:59:37 2013 -0600 +++ b/TODO.txt Mon Sep 30 20:31:25 2013 -0600 @@ -4,7 +4,3 @@ Add a Reconnect() function. Eliminate as many uses of Generic as possible. - -Get rid of logging. We're providing status updates. Include an error() -receiver function that lets internals report their errors for return -from NewClient(). diff -r 7b5586a5e109 -r 3f891f7fe817 example/interact.go --- a/example/interact.go Mon Sep 30 18:59:37 2013 -0600 +++ b/example/interact.go Mon Sep 30 20:31:25 2013 -0600 @@ -11,22 +11,8 @@ "strings" ) -type StdLogger struct { -} - -func (s *StdLogger) Log(v ...interface{}) { - log.Println(v...) -} - -func (s *StdLogger) Logf(fmt string, v ...interface{}) { - log.Printf(fmt, v...) -} - func init() { - logger := &StdLogger{} - // xmpp.Debug = logger - xmpp.Info = logger - xmpp.Warn = logger + // xmpp.Debug = true } // Demonstrate the API, and allow the user to interact with an XMPP diff -r 7b5586a5e109 -r 3f891f7fe817 xmpp/layer1.go --- a/xmpp/layer1.go Mon Sep 30 18:59:37 2013 -0600 +++ b/xmpp/layer1.go Mon Sep 30 20:31:25 2013 -0600 @@ -5,11 +5,16 @@ import ( "crypto/tls" + "fmt" "io" + "log" "net" "time" ) +// If enabled, print all sent and received XML. +var Debug = false + var l1interval = time.Second type layer1 struct { @@ -18,15 +23,15 @@ sendSocks chan net.Conn } -func startLayer1(sock net.Conn, recvWriter io.WriteCloser, +func (cl *Client) startLayer1(sock net.Conn, recvWriter io.WriteCloser, sendReader io.ReadCloser, status <-chan Status) *layer1 { l1 := layer1{sock: sock} recvSocks := make(chan net.Conn) l1.recvSocks = recvSocks sendSocks := make(chan net.Conn, 1) l1.sendSocks = sendSocks - go recvTransport(recvSocks, recvWriter, status) - go sendTransport(sendSocks, sendReader) + go cl.recvTransport(recvSocks, recvWriter, status) + go cl.sendTransport(sendSocks, sendReader) recvSocks <- sock sendSocks <- sock return &l1 @@ -50,7 +55,7 @@ l1.recvSocks <- l1.sock } -func recvTransport(socks <-chan net.Conn, w io.WriteCloser, +func (cl *Client) recvTransport(socks <-chan net.Conn, w io.WriteCloser, status <-chan Status) { defer w.Close() @@ -59,7 +64,7 @@ for { select { case stat := <-status: - if stat == StatusShutdown { + if stat.fatal() { return } @@ -78,27 +83,33 @@ continue } } - Warn.Logf("recvTransport: %s", err) + cl.setError(fmt.Errorf("recv: %v", err)) return } + if Debug { + log.Printf("recv: %s", p[:nr]) + } nw, err := w.Write(p[:nr]) if nw < nr { - Warn.Logf("recvTransport: %s", err) + cl.setError(fmt.Errorf("recv: %v", err)) return } } } } -func sendTransport(socks <-chan net.Conn, r io.Reader) { +func (cl *Client) sendTransport(socks <-chan net.Conn, r io.Reader) { var sock net.Conn p := make([]byte, 1024) for { nr, err := r.Read(p) if nr == 0 { - Warn.Logf("sendTransport: %s", err) + cl.setError(fmt.Errorf("send: %v", err)) break } + if nr > 0 && Debug { + log.Printf("send: %s", p[:nr]) + } for nr > 0 { select { case sock = <-socks: @@ -114,7 +125,7 @@ nw, err := sock.Write(p[:nr]) nr -= nw if nr != 0 { - Warn.Logf("write: %s", err) + cl.setError(fmt.Errorf("send: %v", err)) break } } diff -r 7b5586a5e109 -r 3f891f7fe817 xmpp/layer2.go --- a/xmpp/layer2.go Mon Sep 30 18:59:37 2013 -0600 +++ b/xmpp/layer2.go Mon Sep 30 20:31:25 2013 -0600 @@ -7,19 +7,16 @@ "encoding/xml" "fmt" "io" + "log" "reflect" "strings" ) // Read bytes from a reader, unmarshal them as XML into structures of // the appropriate type, and send those structures on a channel. -func recvXml(r io.Reader, ch chan<- interface{}, +func (cl *Client) recvXml(r io.Reader, ch chan<- interface{}, extStanza map[xml.Name]reflect.Type) { - if _, ok := Debug.(*noLog); !ok { - pr, pw := io.Pipe() - go tee(r, pw, "S: ") - r = pr - } + defer close(ch) // This trick loads our namespaces into the parser. @@ -35,7 +32,7 @@ t, err := p.Token() if t == nil { if err != io.EOF { - Warn.Logf("read: %s", err) + cl.setError(fmt.Errorf("recv: %v", err)) } break } @@ -51,7 +48,7 @@ case NsStream + " stream": st, err := parseStream(se) if err != nil { - Warn.Logf("unmarshal stream: %s", err) + cl.setError(fmt.Errorf("recv: %v", err)) break Loop } ch <- st @@ -73,14 +70,16 @@ obj = &Presence{} default: obj = &Generic{} - Info.Logf("Ignoring unrecognized: %s %s", se.Name.Space, - se.Name.Local) + if Debug { + log.Printf("Ignoring unrecognized: %s %s", + se.Name.Space, se.Name.Local) + } } // Read the complete XML stanza. err = p.DecodeElement(obj, &se) if err != nil { - Warn.Logf("unmarshal: %s", err) + cl.setError(fmt.Errorf("recv: %v", err)) break Loop } @@ -90,7 +89,7 @@ if st, ok := obj.(Stanza); ok { err = parseExtended(st.GetHeader(), extStanza) if err != nil { - Warn.Logf("ext unmarshal: %s", err) + cl.setError(fmt.Errorf("recv: %v", err)) break Loop } } @@ -133,12 +132,7 @@ // Receive structures on a channel, marshal them to XML, and send the // bytes on a writer. -func sendXml(w io.Writer, ch <-chan interface{}) { - if _, ok := Debug.(*noLog); !ok { - pr, pw := io.Pipe() - go tee(pr, w, "C: ") - w = pw - } +func (cl *Client) sendXml(w io.Writer, ch <-chan interface{}) { defer func(w io.Writer) { if c, ok := w.(io.Closer); ok { c.Close() @@ -151,12 +145,13 @@ if st, ok := obj.(*stream); ok { _, err := w.Write([]byte(st.String())) if err != nil { - Warn.Logf("write: %s", err) + cl.setError(fmt.Errorf("send: %v", err)) + break } } else { err := enc.Encode(obj) if err != nil { - Warn.Logf("marshal: %s", err) + cl.setError(fmt.Errorf("send: %v", err)) break } } diff -r 7b5586a5e109 -r 3f891f7fe817 xmpp/layer3.go --- a/xmpp/layer3.go Mon Sep 30 18:59:37 2013 -0600 +++ b/xmpp/layer3.go Mon Sep 30 20:31:25 2013 -0600 @@ -5,6 +5,8 @@ import ( "encoding/xml" + "fmt" + "log" ) // Callback to handle a stanza with a particular id. @@ -41,7 +43,9 @@ return } if x == nil { - Info.Log("Refusing to send nil stanza") + if Debug { + log.Println("Won't send nil stanza") + } continue } sendXml <- x @@ -77,7 +81,8 @@ case *stream: // Do nothing. case *streamError: - cl.handleStreamError(obj) + cl.setError(fmt.Errorf("%#v", obj)) + return case *Features: cl.handleFeatures(obj) case *starttls: @@ -95,23 +100,21 @@ sendXmpp <- obj } default: - Warn.Logf("Unhandled non-stanza: %T %#v", x, x) + if Debug { + log.Printf("Unrecognized input: %T %#v", + x, x) + } } } } } -func (cl *Client) handleStreamError(se *streamError) { - Info.Logf("Received stream error: %v", se) - cl.setStatus(StatusShutdown) -} - func (cl *Client) handleFeatures(fe *Features) { cl.Features = fe if fe.Starttls != nil { start := &starttls{XMLName: xml.Name{Space: NsTLS, Local: "starttls"}} - cl.sendXml <- start + cl.sendRaw <- start return } @@ -133,7 +136,7 @@ // Now re-send the initial handshake message to start the new // session. - cl.sendXml <- &stream{To: cl.Jid.Domain, Version: XMPPVersion} + cl.sendRaw <- &stream{To: cl.Jid.Domain, Version: XMPPVersion} } // Send a request to bind a resource. RFC 3920, section 7. @@ -148,10 +151,13 @@ f := func(st Stanza) { iq, ok := st.(*Iq) if !ok { - Warn.Log("non-iq response") + cl.setError(fmt.Errorf("non-iq response to bind %#v", + st)) + return } if iq.Type == "error" { - Warn.Log("Resource binding failed") + cl.setError(fmt.Errorf("Resource binding failed")) + return } var bindRepl *bindIq for _, ele := range iq.Nested { @@ -161,22 +167,26 @@ } } if bindRepl == nil { - Warn.Logf("Bad bind reply: %#v", iq) + cl.setError(fmt.Errorf("Bad bind reply: %#v", iq)) + return } jidStr := bindRepl.Jid if jidStr == nil || *jidStr == "" { - Warn.Log("Can't bind empty resource") + cl.setError(fmt.Errorf("empty resource in bind %#v", + iq)) + return } jid := new(JID) if err := jid.Set(*jidStr); err != nil { - Warn.Logf("Can't parse JID %s: %s", *jidStr, err) + cl.setError(fmt.Errorf("bind: an't parse JID %s: %v", + *jidStr, err)) + return } cl.Jid = *jid - Info.Logf("Bound resource: %s", cl.Jid.String()) cl.setStatus(StatusBound) } cl.SetCallback(msg.Id, f) - cl.sendXml <- msg + cl.sendRaw <- msg } // Register a callback to handle the next XMPP stanza (iq, message, or diff -r 7b5586a5e109 -r 3f891f7fe817 xmpp/log.go --- a/xmpp/log.go Mon Sep 30 18:59:37 2013 -0600 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,33 +0,0 @@ -// Control over logging from the XMPP library. - -package xmpp - -var ( - // If any of these are non-nil when NewClient() is called, - // they will be used to log messages of the indicated - // severity. - Warn Logger = &noLog{} - Info Logger = &noLog{} - Debug Logger = &noLog{} -) - -// Anything implementing Logger can receive log messages from the XMPP -// library. The default implementation doesn't log anything; it -// efficiently discards all messages. -type Logger interface { - Log(v ...interface{}) - Logf(fmt string, v ...interface{}) -} - -type noLog struct { - flags int - prefix string -} - -var _ Logger = &noLog{} - -func (l *noLog) Log(v ...interface{}) { -} - -func (l *noLog) Logf(fmt string, v ...interface{}) { -} diff -r 7b5586a5e109 -r 3f891f7fe817 xmpp/sasl.go --- a/xmpp/sasl.go Mon Sep 30 18:59:37 2013 -0600 +++ b/xmpp/sasl.go Mon Sep 30 20:31:25 2013 -0600 @@ -28,7 +28,7 @@ if digestMd5 { auth := &auth{XMLName: xml.Name{Space: NsSASL, Local: "auth"}, Mechanism: "DIGEST-MD5"} - cl.sendXml <- auth + cl.sendRaw <- auth } } @@ -39,7 +39,7 @@ b64 := base64.StdEncoding str, err := b64.DecodeString(srv.Chardata) if err != nil { - Warn.Logf("SASL challenge decode: %s", err) + cl.setError(fmt.Errorf("SASL: %v", err)) return } srvMap := parseSasl(string(str)) @@ -50,13 +50,12 @@ cl.saslDigest2(srvMap) } case "failure": - Info.Log("SASL authentication failed") + cl.setError(fmt.Errorf("SASL authentication failed")) case "success": cl.setStatus(StatusAuthenticated) - Info.Log("Sasl authentication succeeded") cl.Features = nil ss := &stream{To: cl.Jid.Domain, Version: XMPPVersion} - cl.sendXml <- ss + cl.sendRaw <- ss } } @@ -69,7 +68,7 @@ } } if !hasAuth { - Warn.Log("Server doesn't support SASL auth") + cl.setError(fmt.Errorf("Server doesn't support SASL auth")) return } @@ -99,7 +98,7 @@ randSize.Lsh(big.NewInt(1), 64) cnonce, err := rand.Int(rand.Reader, randSize) if err != nil { - Warn.Logf("SASL rand: %s", err) + cl.setError(fmt.Errorf("SASL rand: %v", err)) return } cnonceStr := fmt.Sprintf("%016x", cnonce) @@ -131,17 +130,17 @@ b64 := base64.StdEncoding clObj := &auth{XMLName: xml.Name{Space: NsSASL, Local: "response"}, Chardata: b64.EncodeToString([]byte(clStr))} - cl.sendXml <- clObj + cl.sendRaw <- clObj } func (cl *Client) saslDigest2(srvMap map[string]string) { if cl.saslExpected == srvMap["rspauth"] { clObj := &auth{XMLName: xml.Name{Space: NsSASL, Local: "response"}} - cl.sendXml <- clObj + cl.sendRaw <- clObj } else { clObj := &auth{XMLName: xml.Name{Space: NsSASL, Local: "failure"}, Any: &Generic{XMLName: xml.Name{Space: NsSASL, Local: "abort"}}} - cl.sendXml <- clObj + cl.sendRaw <- clObj } } diff -r 7b5586a5e109 -r 3f891f7fe817 xmpp/status.go --- a/xmpp/status.go Mon Sep 30 18:59:37 2013 -0600 +++ b/xmpp/status.go Mon Sep 30 20:31:25 2013 -0600 @@ -90,7 +90,7 @@ if current == waitFor { return nil } - if current == StatusShutdown { + if current.fatal() { break } if current > waitFor { diff -r 7b5586a5e109 -r 3f891f7fe817 xmpp/structs.go --- a/xmpp/structs.go Mon Sep 30 18:59:37 2013 -0600 +++ b/xmpp/structs.go Mon Sep 30 20:31:25 2013 -0600 @@ -7,13 +7,15 @@ "encoding/xml" "flag" "fmt" - // BUG(cjyar): Doesn't use stringprep. Could try the implementation at - // "code.google.com/p/go-idn/src/stringprep" + "log" "reflect" "regexp" "strings" ) +// BUG(cjyar): Doesn't use stringprep. Could try the implementation at +// "code.google.com/p/go-idn/src/stringprep" + // JID represents an entity that can communicate with other // entities. It looks like node@domain/resource. Node and resource are // sometimes optional. @@ -260,7 +262,7 @@ func (er *Error) Error() string { buf, err := xml.Marshal(er) if err != nil { - Warn.Log("double bad error: couldn't marshal error") + log.Println("double bad error: couldn't marshal error") return "unreadable error" } return string(buf) diff -r 7b5586a5e109 -r 3f891f7fe817 xmpp/structs_test.go --- a/xmpp/structs_test.go Mon Sep 30 18:59:37 2013 -0600 +++ b/xmpp/structs_test.go Mon Sep 30 20:31:25 2013 -0600 @@ -118,7 +118,8 @@ str := `foo!` r := strings.NewReader(str) ch := make(chan interface{}) - go recvXml(r, ch, make(map[xml.Name]reflect.Type)) + cl := &Client{} + go cl.recvXml(r, ch, make(map[xml.Name]reflect.Type)) obs := <-ch exp := &Message{XMLName: xml.Name{Local: "message", Space: "jabber:client"}, Header: Header{To: "a@b.c", Innerxml: "foo!"}, diff -r 7b5586a5e109 -r 3f891f7fe817 xmpp/xmpp.go --- a/xmpp/xmpp.go Mon Sep 30 18:59:37 2013 -0600 +++ b/xmpp/xmpp.go Mon Sep 30 20:31:25 2013 -0600 @@ -7,10 +7,8 @@ package xmpp import ( - "bytes" "crypto/tls" "encoding/xml" - "errors" "fmt" "io" "net" @@ -47,6 +45,7 @@ statusBound statusRunning statusShutdown + statusError ) var ( @@ -66,8 +65,20 @@ StatusRunning Status = statusRunning // The session has closed, or is in the process of closing. StatusShutdown Status = statusShutdown + // The session has encountered an error. Otherwise identical + // to StatusShutdown. + StatusError Status = statusError ) +func (s Status) fatal() bool { + switch s { + default: + return false + case StatusShutdown, StatusError: + return true + } +} + // A filter can modify the XMPP traffic to or from the remote // server. It's part of an Extension. The filter function will be // called in a new goroutine, so it doesn't need to return. The filter @@ -102,7 +113,7 @@ // Outgoing XMPP stanzas to the server should be sent to this // channel. Send chan<- Stanza - sendXml chan<- interface{} + sendRaw chan<- interface{} statmgr *statmgr // The client's roster is also known as the buddy list. It's // the set of contacts which are known to this JID, or which @@ -113,6 +124,7 @@ sendFilterAdd, recvFilterAdd chan Filter tlsConfig tls.Config layer1 *layer1 + error chan error } // Creates an XMPP client identified by the given JID, authenticating @@ -136,6 +148,7 @@ cl.sendFilterAdd = make(chan Filter) cl.recvFilterAdd = make(chan Filter) cl.statmgr = newStatmgr(status) + cl.error = make(chan error, 1) extStanza := make(map[xml.Name]reflect.Type) for _, ext := range exts { @@ -180,15 +193,15 @@ // Start the transport handler, initially unencrypted. recvReader, recvWriter := io.Pipe() sendReader, sendWriter := io.Pipe() - cl.layer1 = startLayer1(tcp, recvWriter, sendReader, + cl.layer1 = cl.startLayer1(tcp, recvWriter, sendReader, cl.statmgr.newListener()) // Start the reader and writer that convert to and from XML. recvXmlCh := make(chan interface{}) - go recvXml(recvReader, recvXmlCh, extStanza) + go cl.recvXml(recvReader, recvXmlCh, extStanza) sendXmlCh := make(chan interface{}) - cl.sendXml = sendXmlCh - go sendXml(sendWriter, sendXmlCh) + cl.sendRaw = sendXmlCh + go cl.sendXml(sendWriter, sendXmlCh) // Start the reader and writer that convert between XML and // XMPP stanzas. @@ -213,11 +226,11 @@ // Initial handshake. hsOut := &stream{To: jid.Domain, Version: XMPPVersion} - cl.sendXml <- hsOut + cl.sendRaw <- hsOut // Wait until resource binding is complete. if err := cl.statmgr.awaitStatus(StatusBound); err != nil { - return nil, err + return nil, cl.getError(err) } // Forget about the password, for paranoia's sake. @@ -231,20 +244,18 @@ f := func(st Stanza) { iq, ok := st.(*Iq) if !ok { - Warn.Log("iq reply not iq; can't start session") - ch <- errors.New("bad session start reply") + ch <- fmt.Errorf("bad session start reply: %#v", st) } if iq.Type == "error" { - Warn.Logf("Can't start session: %v", iq) - ch <- iq.Error + ch <- fmt.Errorf("Can't start session: %v", iq.Error) } ch <- nil } cl.SetCallback(id, f) - cl.sendXml <- iq + cl.sendRaw <- iq // Now wait until the callback is called. if err := <-ch; err != nil { - return nil, err + return nil, cl.getError(err) } // This allows the client to receive stanzas. @@ -256,37 +267,7 @@ // Send the initial presence. cl.Send <- &pr - return cl, nil -} - -func tee(r io.Reader, w io.Writer, prefix string) { - defer func(w io.Writer) { - if c, ok := w.(io.Closer); ok { - c.Close() - } - }(w) - - buf := bytes.NewBuffer([]uint8(prefix)) - for { - var c [1]byte - n, _ := r.Read(c[:]) - if n == 0 { - break - } - n, _ = w.Write(c[:n]) - if n == 0 { - break - } - buf.Write(c[:n]) - if c[0] == '\n' || c[0] == '>' { - Debug.Log(buf) - buf = bytes.NewBuffer([]uint8(prefix)) - } - } - leftover := buf.String() - if leftover != "" { - Debug.Log(buf) - } + return cl, cl.getError(nil) } func (cl *Client) Close() { @@ -295,3 +276,34 @@ // Shuts down the senders: close(cl.Send) } + +// If there's a buffered error in the channel, return it. Otherwise, +// return what was passed to us. The idea is that the error in the +// channel probably preceded (and caused) the one that's passed as an +// argument here. +func (cl *Client) getError(err1 error) error { + select { + case err0 := <-cl.error: + return err0 + default: + return err1 + } +} + +// Register an error that happened in the internals somewhere. If +// there's already an error in the channel, discard the newer one in +// favor of the older. +func (cl *Client) setError(err error) { + cl.Close() + cl.setStatus(StatusError) + if len(cl.error) > 0 { + return + } + // If we're in a race between two calls to this function, + // trying to set the "first" error, just arbitrarily let one + // of them win. + select { + case cl.error <- err: + default: + } +} diff -r 7b5586a5e109 -r 3f891f7fe817 xmpp/xmpp_test.go --- a/xmpp/xmpp_test.go Mon Sep 30 18:59:37 2013 -0600 +++ b/xmpp/xmpp_test.go Mon Sep 30 20:31:25 2013 -0600 @@ -13,7 +13,8 @@ r := strings.NewReader(`` + ``) ch := make(chan interface{}) - go recvXml(r, ch, make(map[xml.Name]reflect.Type)) + cl := &Client{} + go cl.recvXml(r, ch, make(map[xml.Name]reflect.Type)) x := <-ch se, ok := x.(*streamError) if !ok { @@ -29,7 +30,7 @@ `Error text`) ch = make(chan interface{}) - go recvXml(r, ch, make(map[xml.Name]reflect.Type)) + go cl.recvXml(r, ch, make(map[xml.Name]reflect.Type)) x = <-ch se, ok = x.(*streamError) if !ok { @@ -47,7 +48,8 @@ `xmlns="` + NsClient + `" xmlns:stream="` + NsStream + `" version="1.0">`) ch := make(chan interface{}) - go recvXml(r, ch, make(map[xml.Name]reflect.Type)) + cl := &Client{} + go cl.recvXml(r, ch, make(map[xml.Name]reflect.Type)) x := <-ch ss, ok := x.(*stream) if !ok { @@ -64,9 +66,10 @@ ch := make(chan interface{}) var wg sync.WaitGroup wg.Add(1) + cl := &Client{} go func() { defer wg.Done() - sendXml(w, ch) + cl.sendXml(w, ch) }() ch <- obj close(ch)