xmpp/xmpp.go
changeset 163 3f891f7fe817
parent 162 7b5586a5e109
child 164 6b527647086c
--- a/xmpp/xmpp.go	Mon Sep 30 18:59:37 2013 -0600
+++ b/xmpp/xmpp.go	Mon Sep 30 20:31:25 2013 -0600
@@ -7,10 +7,8 @@
 package xmpp
 
 import (
-	"bytes"
 	"crypto/tls"
 	"encoding/xml"
-	"errors"
 	"fmt"
 	"io"
 	"net"
@@ -47,6 +45,7 @@
 	statusBound
 	statusRunning
 	statusShutdown
+	statusError
 )
 
 var (
@@ -66,8 +65,20 @@
 	StatusRunning Status = statusRunning
 	// The session has closed, or is in the process of closing.
 	StatusShutdown Status = statusShutdown
+	// The session has encountered an error. Otherwise identical
+	// to StatusShutdown.
+	StatusError Status = statusError
 )
 
+func (s Status) fatal() bool {
+	switch s {
+	default:
+		return false
+	case StatusShutdown, StatusError:
+		return true
+	}
+}
+
 // A filter can modify the XMPP traffic to or from the remote
 // server. It's part of an Extension. The filter function will be
 // called in a new goroutine, so it doesn't need to return. The filter
@@ -102,7 +113,7 @@
 	// Outgoing XMPP stanzas to the server should be sent to this
 	// channel.
 	Send    chan<- Stanza
-	sendXml chan<- interface{}
+	sendRaw chan<- interface{}
 	statmgr *statmgr
 	// The client's roster is also known as the buddy list. It's
 	// the set of contacts which are known to this JID, or which
@@ -113,6 +124,7 @@
 	sendFilterAdd, recvFilterAdd chan Filter
 	tlsConfig                    tls.Config
 	layer1                       *layer1
+	error                        chan error
 }
 
 // Creates an XMPP client identified by the given JID, authenticating
@@ -136,6 +148,7 @@
 	cl.sendFilterAdd = make(chan Filter)
 	cl.recvFilterAdd = make(chan Filter)
 	cl.statmgr = newStatmgr(status)
+	cl.error = make(chan error, 1)
 
 	extStanza := make(map[xml.Name]reflect.Type)
 	for _, ext := range exts {
@@ -180,15 +193,15 @@
 	// Start the transport handler, initially unencrypted.
 	recvReader, recvWriter := io.Pipe()
 	sendReader, sendWriter := io.Pipe()
-	cl.layer1 = startLayer1(tcp, recvWriter, sendReader,
+	cl.layer1 = cl.startLayer1(tcp, recvWriter, sendReader,
 		cl.statmgr.newListener())
 
 	// Start the reader and writer that convert to and from XML.
 	recvXmlCh := make(chan interface{})
-	go recvXml(recvReader, recvXmlCh, extStanza)
+	go cl.recvXml(recvReader, recvXmlCh, extStanza)
 	sendXmlCh := make(chan interface{})
-	cl.sendXml = sendXmlCh
-	go sendXml(sendWriter, sendXmlCh)
+	cl.sendRaw = sendXmlCh
+	go cl.sendXml(sendWriter, sendXmlCh)
 
 	// Start the reader and writer that convert between XML and
 	// XMPP stanzas.
@@ -213,11 +226,11 @@
 
 	// Initial handshake.
 	hsOut := &stream{To: jid.Domain, Version: XMPPVersion}
-	cl.sendXml <- hsOut
+	cl.sendRaw <- hsOut
 
 	// Wait until resource binding is complete.
 	if err := cl.statmgr.awaitStatus(StatusBound); err != nil {
-		return nil, err
+		return nil, cl.getError(err)
 	}
 
 	// Forget about the password, for paranoia's sake.
@@ -231,20 +244,18 @@
 	f := func(st Stanza) {
 		iq, ok := st.(*Iq)
 		if !ok {
-			Warn.Log("iq reply not iq; can't start session")
-			ch <- errors.New("bad session start reply")
+			ch <- fmt.Errorf("bad session start reply: %#v", st)
 		}
 		if iq.Type == "error" {
-			Warn.Logf("Can't start session: %v", iq)
-			ch <- iq.Error
+			ch <- fmt.Errorf("Can't start session: %v", iq.Error)
 		}
 		ch <- nil
 	}
 	cl.SetCallback(id, f)
-	cl.sendXml <- iq
+	cl.sendRaw <- iq
 	// Now wait until the callback is called.
 	if err := <-ch; err != nil {
-		return nil, err
+		return nil, cl.getError(err)
 	}
 
 	// This allows the client to receive stanzas.
@@ -256,37 +267,7 @@
 	// Send the initial presence.
 	cl.Send <- &pr
 
-	return cl, nil
-}
-
-func tee(r io.Reader, w io.Writer, prefix string) {
-	defer func(w io.Writer) {
-		if c, ok := w.(io.Closer); ok {
-			c.Close()
-		}
-	}(w)
-
-	buf := bytes.NewBuffer([]uint8(prefix))
-	for {
-		var c [1]byte
-		n, _ := r.Read(c[:])
-		if n == 0 {
-			break
-		}
-		n, _ = w.Write(c[:n])
-		if n == 0 {
-			break
-		}
-		buf.Write(c[:n])
-		if c[0] == '\n' || c[0] == '>' {
-			Debug.Log(buf)
-			buf = bytes.NewBuffer([]uint8(prefix))
-		}
-	}
-	leftover := buf.String()
-	if leftover != "" {
-		Debug.Log(buf)
-	}
+	return cl, cl.getError(nil)
 }
 
 func (cl *Client) Close() {
@@ -295,3 +276,34 @@
 	// Shuts down the senders:
 	close(cl.Send)
 }
+
+// If there's a buffered error in the channel, return it. Otherwise,
+// return what was passed to us. The idea is that the error in the
+// channel probably preceded (and caused) the one that's passed as an
+// argument here.
+func (cl *Client) getError(err1 error) error {
+	select {
+	case err0 := <-cl.error:
+		return err0
+	default:
+		return err1
+	}
+}
+
+// Register an error that happened in the internals somewhere. If
+// there's already an error in the channel, discard the newer one in
+// favor of the older.
+func (cl *Client) setError(err error) {
+	cl.Close()
+	cl.setStatus(StatusError)
+	if len(cl.error) > 0 {
+		return
+	}
+	// If we're in a race between two calls to this function,
+	// trying to set the "first" error, just arbitrarily let one
+	// of them win.
+	select {
+	case cl.error <- err:
+	default:
+	}
+}