xmpp/xmpp.go
changeset 163 3f891f7fe817
parent 162 7b5586a5e109
child 164 6b527647086c
equal deleted inserted replaced
162:7b5586a5e109 163:3f891f7fe817
     5 // sends structures representing XMPP stanzas. Additional stanza
     5 // sends structures representing XMPP stanzas. Additional stanza
     6 // parsers can be inserted into the stack of layers as extensions.
     6 // parsers can be inserted into the stack of layers as extensions.
     7 package xmpp
     7 package xmpp
     8 
     8 
     9 import (
     9 import (
    10 	"bytes"
       
    11 	"crypto/tls"
    10 	"crypto/tls"
    12 	"encoding/xml"
    11 	"encoding/xml"
    13 	"errors"
       
    14 	"fmt"
    12 	"fmt"
    15 	"io"
    13 	"io"
    16 	"net"
    14 	"net"
    17 	"reflect"
    15 	"reflect"
    18 )
    16 )
    45 	statusConnectedTls
    43 	statusConnectedTls
    46 	statusAuthenticated
    44 	statusAuthenticated
    47 	statusBound
    45 	statusBound
    48 	statusRunning
    46 	statusRunning
    49 	statusShutdown
    47 	statusShutdown
       
    48 	statusError
    50 )
    49 )
    51 
    50 
    52 var (
    51 var (
    53 	// The client has not yet connected, or it has been
    52 	// The client has not yet connected, or it has been
    54 	// disconnected from the server.
    53 	// disconnected from the server.
    64 	// Session has started and normal message traffic can be sent
    63 	// Session has started and normal message traffic can be sent
    65 	// and received.
    64 	// and received.
    66 	StatusRunning Status = statusRunning
    65 	StatusRunning Status = statusRunning
    67 	// The session has closed, or is in the process of closing.
    66 	// The session has closed, or is in the process of closing.
    68 	StatusShutdown Status = statusShutdown
    67 	StatusShutdown Status = statusShutdown
       
    68 	// The session has encountered an error. Otherwise identical
       
    69 	// to StatusShutdown.
       
    70 	StatusError Status = statusError
    69 )
    71 )
       
    72 
       
    73 func (s Status) fatal() bool {
       
    74 	switch s {
       
    75 	default:
       
    76 		return false
       
    77 	case StatusShutdown, StatusError:
       
    78 		return true
       
    79 	}
       
    80 }
    70 
    81 
    71 // A filter can modify the XMPP traffic to or from the remote
    82 // A filter can modify the XMPP traffic to or from the remote
    72 // server. It's part of an Extension. The filter function will be
    83 // server. It's part of an Extension. The filter function will be
    73 // called in a new goroutine, so it doesn't need to return. The filter
    84 // called in a new goroutine, so it doesn't need to return. The filter
    74 // should close its output when its input is closed.
    85 // should close its output when its input is closed.
   100 	// set up the XMPP stream will not appear here.
   111 	// set up the XMPP stream will not appear here.
   101 	Recv <-chan Stanza
   112 	Recv <-chan Stanza
   102 	// Outgoing XMPP stanzas to the server should be sent to this
   113 	// Outgoing XMPP stanzas to the server should be sent to this
   103 	// channel.
   114 	// channel.
   104 	Send    chan<- Stanza
   115 	Send    chan<- Stanza
   105 	sendXml chan<- interface{}
   116 	sendRaw chan<- interface{}
   106 	statmgr *statmgr
   117 	statmgr *statmgr
   107 	// The client's roster is also known as the buddy list. It's
   118 	// The client's roster is also known as the buddy list. It's
   108 	// the set of contacts which are known to this JID, or which
   119 	// the set of contacts which are known to this JID, or which
   109 	// this JID is known to.
   120 	// this JID is known to.
   110 	Roster Roster
   121 	Roster Roster
   111 	// Features advertised by the remote.
   122 	// Features advertised by the remote.
   112 	Features                     *Features
   123 	Features                     *Features
   113 	sendFilterAdd, recvFilterAdd chan Filter
   124 	sendFilterAdd, recvFilterAdd chan Filter
   114 	tlsConfig                    tls.Config
   125 	tlsConfig                    tls.Config
   115 	layer1                       *layer1
   126 	layer1                       *layer1
       
   127 	error                        chan error
   116 }
   128 }
   117 
   129 
   118 // Creates an XMPP client identified by the given JID, authenticating
   130 // Creates an XMPP client identified by the given JID, authenticating
   119 // with the provided password and TLS config. Zero or more extensions
   131 // with the provided password and TLS config. Zero or more extensions
   120 // may be specified. The initial presence will be broadcast. If status
   132 // may be specified. The initial presence will be broadcast. If status
   134 	cl.handlers = make(chan *callback, 100)
   146 	cl.handlers = make(chan *callback, 100)
   135 	cl.tlsConfig = tlsconf
   147 	cl.tlsConfig = tlsconf
   136 	cl.sendFilterAdd = make(chan Filter)
   148 	cl.sendFilterAdd = make(chan Filter)
   137 	cl.recvFilterAdd = make(chan Filter)
   149 	cl.recvFilterAdd = make(chan Filter)
   138 	cl.statmgr = newStatmgr(status)
   150 	cl.statmgr = newStatmgr(status)
       
   151 	cl.error = make(chan error, 1)
   139 
   152 
   140 	extStanza := make(map[xml.Name]reflect.Type)
   153 	extStanza := make(map[xml.Name]reflect.Type)
   141 	for _, ext := range exts {
   154 	for _, ext := range exts {
   142 		for k, v := range ext.StanzaTypes {
   155 		for k, v := range ext.StanzaTypes {
   143 			if _, ok := extStanza[k]; ok {
   156 			if _, ok := extStanza[k]; ok {
   178 	cl.setStatus(StatusConnected)
   191 	cl.setStatus(StatusConnected)
   179 
   192 
   180 	// Start the transport handler, initially unencrypted.
   193 	// Start the transport handler, initially unencrypted.
   181 	recvReader, recvWriter := io.Pipe()
   194 	recvReader, recvWriter := io.Pipe()
   182 	sendReader, sendWriter := io.Pipe()
   195 	sendReader, sendWriter := io.Pipe()
   183 	cl.layer1 = startLayer1(tcp, recvWriter, sendReader,
   196 	cl.layer1 = cl.startLayer1(tcp, recvWriter, sendReader,
   184 		cl.statmgr.newListener())
   197 		cl.statmgr.newListener())
   185 
   198 
   186 	// Start the reader and writer that convert to and from XML.
   199 	// Start the reader and writer that convert to and from XML.
   187 	recvXmlCh := make(chan interface{})
   200 	recvXmlCh := make(chan interface{})
   188 	go recvXml(recvReader, recvXmlCh, extStanza)
   201 	go cl.recvXml(recvReader, recvXmlCh, extStanza)
   189 	sendXmlCh := make(chan interface{})
   202 	sendXmlCh := make(chan interface{})
   190 	cl.sendXml = sendXmlCh
   203 	cl.sendRaw = sendXmlCh
   191 	go sendXml(sendWriter, sendXmlCh)
   204 	go cl.sendXml(sendWriter, sendXmlCh)
   192 
   205 
   193 	// Start the reader and writer that convert between XML and
   206 	// Start the reader and writer that convert between XML and
   194 	// XMPP stanzas.
   207 	// XMPP stanzas.
   195 	recvRawXmpp := make(chan Stanza)
   208 	recvRawXmpp := make(chan Stanza)
   196 	go cl.recvStream(recvXmlCh, recvRawXmpp, cl.statmgr.newListener())
   209 	go cl.recvStream(recvXmlCh, recvRawXmpp, cl.statmgr.newListener())
   211 		cl.AddSendFilter(ext.SendFilter)
   224 		cl.AddSendFilter(ext.SendFilter)
   212 	}
   225 	}
   213 
   226 
   214 	// Initial handshake.
   227 	// Initial handshake.
   215 	hsOut := &stream{To: jid.Domain, Version: XMPPVersion}
   228 	hsOut := &stream{To: jid.Domain, Version: XMPPVersion}
   216 	cl.sendXml <- hsOut
   229 	cl.sendRaw <- hsOut
   217 
   230 
   218 	// Wait until resource binding is complete.
   231 	// Wait until resource binding is complete.
   219 	if err := cl.statmgr.awaitStatus(StatusBound); err != nil {
   232 	if err := cl.statmgr.awaitStatus(StatusBound); err != nil {
   220 		return nil, err
   233 		return nil, cl.getError(err)
   221 	}
   234 	}
   222 
   235 
   223 	// Forget about the password, for paranoia's sake.
   236 	// Forget about the password, for paranoia's sake.
   224 	cl.password = ""
   237 	cl.password = ""
   225 
   238 
   229 		Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}}
   242 		Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}}
   230 	ch := make(chan error)
   243 	ch := make(chan error)
   231 	f := func(st Stanza) {
   244 	f := func(st Stanza) {
   232 		iq, ok := st.(*Iq)
   245 		iq, ok := st.(*Iq)
   233 		if !ok {
   246 		if !ok {
   234 			Warn.Log("iq reply not iq; can't start session")
   247 			ch <- fmt.Errorf("bad session start reply: %#v", st)
   235 			ch <- errors.New("bad session start reply")
       
   236 		}
   248 		}
   237 		if iq.Type == "error" {
   249 		if iq.Type == "error" {
   238 			Warn.Logf("Can't start session: %v", iq)
   250 			ch <- fmt.Errorf("Can't start session: %v", iq.Error)
   239 			ch <- iq.Error
       
   240 		}
   251 		}
   241 		ch <- nil
   252 		ch <- nil
   242 	}
   253 	}
   243 	cl.SetCallback(id, f)
   254 	cl.SetCallback(id, f)
   244 	cl.sendXml <- iq
   255 	cl.sendRaw <- iq
   245 	// Now wait until the callback is called.
   256 	// Now wait until the callback is called.
   246 	if err := <-ch; err != nil {
   257 	if err := <-ch; err != nil {
   247 		return nil, err
   258 		return nil, cl.getError(err)
   248 	}
   259 	}
   249 
   260 
   250 	// This allows the client to receive stanzas.
   261 	// This allows the client to receive stanzas.
   251 	cl.setStatus(StatusRunning)
   262 	cl.setStatus(StatusRunning)
   252 
   263 
   254 	cl.Roster.update()
   265 	cl.Roster.update()
   255 
   266 
   256 	// Send the initial presence.
   267 	// Send the initial presence.
   257 	cl.Send <- &pr
   268 	cl.Send <- &pr
   258 
   269 
   259 	return cl, nil
   270 	return cl, cl.getError(nil)
   260 }
       
   261 
       
   262 func tee(r io.Reader, w io.Writer, prefix string) {
       
   263 	defer func(w io.Writer) {
       
   264 		if c, ok := w.(io.Closer); ok {
       
   265 			c.Close()
       
   266 		}
       
   267 	}(w)
       
   268 
       
   269 	buf := bytes.NewBuffer([]uint8(prefix))
       
   270 	for {
       
   271 		var c [1]byte
       
   272 		n, _ := r.Read(c[:])
       
   273 		if n == 0 {
       
   274 			break
       
   275 		}
       
   276 		n, _ = w.Write(c[:n])
       
   277 		if n == 0 {
       
   278 			break
       
   279 		}
       
   280 		buf.Write(c[:n])
       
   281 		if c[0] == '\n' || c[0] == '>' {
       
   282 			Debug.Log(buf)
       
   283 			buf = bytes.NewBuffer([]uint8(prefix))
       
   284 		}
       
   285 	}
       
   286 	leftover := buf.String()
       
   287 	if leftover != "" {
       
   288 		Debug.Log(buf)
       
   289 	}
       
   290 }
   271 }
   291 
   272 
   292 func (cl *Client) Close() {
   273 func (cl *Client) Close() {
   293 	// Shuts down the receivers:
   274 	// Shuts down the receivers:
   294 	cl.setStatus(StatusShutdown)
   275 	cl.setStatus(StatusShutdown)
   295 	// Shuts down the senders:
   276 	// Shuts down the senders:
   296 	close(cl.Send)
   277 	close(cl.Send)
   297 }
   278 }
       
   279 
       
   280 // If there's a buffered error in the channel, return it. Otherwise,
       
   281 // return what was passed to us. The idea is that the error in the
       
   282 // channel probably preceded (and caused) the one that's passed as an
       
   283 // argument here.
       
   284 func (cl *Client) getError(err1 error) error {
       
   285 	select {
       
   286 	case err0 := <-cl.error:
       
   287 		return err0
       
   288 	default:
       
   289 		return err1
       
   290 	}
       
   291 }
       
   292 
       
   293 // Register an error that happened in the internals somewhere. If
       
   294 // there's already an error in the channel, discard the newer one in
       
   295 // favor of the older.
       
   296 func (cl *Client) setError(err error) {
       
   297 	cl.Close()
       
   298 	cl.setStatus(StatusError)
       
   299 	if len(cl.error) > 0 {
       
   300 		return
       
   301 	}
       
   302 	// If we're in a race between two calls to this function,
       
   303 	// trying to set the "first" error, just arbitrarily let one
       
   304 	// of them win.
       
   305 	select {
       
   306 	case cl.error <- err:
       
   307 	default:
       
   308 	}
       
   309 }