--- a/xmpp/xmpp.go Mon Sep 30 18:59:37 2013 -0600
+++ b/xmpp/xmpp.go Mon Sep 30 20:31:25 2013 -0600
@@ -7,10 +7,8 @@
package xmpp
import (
- "bytes"
"crypto/tls"
"encoding/xml"
- "errors"
"fmt"
"io"
"net"
@@ -47,6 +45,7 @@
statusBound
statusRunning
statusShutdown
+ statusError
)
var (
@@ -66,8 +65,20 @@
StatusRunning Status = statusRunning
// The session has closed, or is in the process of closing.
StatusShutdown Status = statusShutdown
+ // The session has encountered an error. Otherwise identical
+ // to StatusShutdown.
+ StatusError Status = statusError
)
+func (s Status) fatal() bool {
+ switch s {
+ default:
+ return false
+ case StatusShutdown, StatusError:
+ return true
+ }
+}
+
// A filter can modify the XMPP traffic to or from the remote
// server. It's part of an Extension. The filter function will be
// called in a new goroutine, so it doesn't need to return. The filter
@@ -102,7 +113,7 @@
// Outgoing XMPP stanzas to the server should be sent to this
// channel.
Send chan<- Stanza
- sendXml chan<- interface{}
+ sendRaw chan<- interface{}
statmgr *statmgr
// The client's roster is also known as the buddy list. It's
// the set of contacts which are known to this JID, or which
@@ -113,6 +124,7 @@
sendFilterAdd, recvFilterAdd chan Filter
tlsConfig tls.Config
layer1 *layer1
+ error chan error
}
// Creates an XMPP client identified by the given JID, authenticating
@@ -136,6 +148,7 @@
cl.sendFilterAdd = make(chan Filter)
cl.recvFilterAdd = make(chan Filter)
cl.statmgr = newStatmgr(status)
+ cl.error = make(chan error, 1)
extStanza := make(map[xml.Name]reflect.Type)
for _, ext := range exts {
@@ -180,15 +193,15 @@
// Start the transport handler, initially unencrypted.
recvReader, recvWriter := io.Pipe()
sendReader, sendWriter := io.Pipe()
- cl.layer1 = startLayer1(tcp, recvWriter, sendReader,
+ cl.layer1 = cl.startLayer1(tcp, recvWriter, sendReader,
cl.statmgr.newListener())
// Start the reader and writer that convert to and from XML.
recvXmlCh := make(chan interface{})
- go recvXml(recvReader, recvXmlCh, extStanza)
+ go cl.recvXml(recvReader, recvXmlCh, extStanza)
sendXmlCh := make(chan interface{})
- cl.sendXml = sendXmlCh
- go sendXml(sendWriter, sendXmlCh)
+ cl.sendRaw = sendXmlCh
+ go cl.sendXml(sendWriter, sendXmlCh)
// Start the reader and writer that convert between XML and
// XMPP stanzas.
@@ -213,11 +226,11 @@
// Initial handshake.
hsOut := &stream{To: jid.Domain, Version: XMPPVersion}
- cl.sendXml <- hsOut
+ cl.sendRaw <- hsOut
// Wait until resource binding is complete.
if err := cl.statmgr.awaitStatus(StatusBound); err != nil {
- return nil, err
+ return nil, cl.getError(err)
}
// Forget about the password, for paranoia's sake.
@@ -231,20 +244,18 @@
f := func(st Stanza) {
iq, ok := st.(*Iq)
if !ok {
- Warn.Log("iq reply not iq; can't start session")
- ch <- errors.New("bad session start reply")
+ ch <- fmt.Errorf("bad session start reply: %#v", st)
}
if iq.Type == "error" {
- Warn.Logf("Can't start session: %v", iq)
- ch <- iq.Error
+ ch <- fmt.Errorf("Can't start session: %v", iq.Error)
}
ch <- nil
}
cl.SetCallback(id, f)
- cl.sendXml <- iq
+ cl.sendRaw <- iq
// Now wait until the callback is called.
if err := <-ch; err != nil {
- return nil, err
+ return nil, cl.getError(err)
}
// This allows the client to receive stanzas.
@@ -256,37 +267,7 @@
// Send the initial presence.
cl.Send <- &pr
- return cl, nil
-}
-
-func tee(r io.Reader, w io.Writer, prefix string) {
- defer func(w io.Writer) {
- if c, ok := w.(io.Closer); ok {
- c.Close()
- }
- }(w)
-
- buf := bytes.NewBuffer([]uint8(prefix))
- for {
- var c [1]byte
- n, _ := r.Read(c[:])
- if n == 0 {
- break
- }
- n, _ = w.Write(c[:n])
- if n == 0 {
- break
- }
- buf.Write(c[:n])
- if c[0] == '\n' || c[0] == '>' {
- Debug.Log(buf)
- buf = bytes.NewBuffer([]uint8(prefix))
- }
- }
- leftover := buf.String()
- if leftover != "" {
- Debug.Log(buf)
- }
+ return cl, cl.getError(nil)
}
func (cl *Client) Close() {
@@ -295,3 +276,34 @@
// Shuts down the senders:
close(cl.Send)
}
+
+// If there's a buffered error in the channel, return it. Otherwise,
+// return what was passed to us. The idea is that the error in the
+// channel probably preceded (and caused) the one that's passed as an
+// argument here.
+func (cl *Client) getError(err1 error) error {
+ select {
+ case err0 := <-cl.error:
+ return err0
+ default:
+ return err1
+ }
+}
+
+// Register an error that happened in the internals somewhere. If
+// there's already an error in the channel, discard the newer one in
+// favor of the older.
+func (cl *Client) setError(err error) {
+ cl.Close()
+ cl.setStatus(StatusError)
+ if len(cl.error) > 0 {
+ return
+ }
+ // If we're in a race between two calls to this function,
+ // trying to set the "first" error, just arbitrarily let one
+ // of them win.
+ select {
+ case cl.error <- err:
+ default:
+ }
+}