xmpp/xmpp.go
changeset 183 b4bd77d58a3e
parent 180 3010996c1928
child 185 ba8a4ae40e13
equal deleted inserted replaced
182:626c390682fc 183:b4bd77d58a3e
     5 // sends structures representing XMPP stanzas. Additional stanza
     5 // sends structures representing XMPP stanzas. Additional stanza
     6 // parsers can be inserted into the stack of layers as extensions.
     6 // parsers can be inserted into the stack of layers as extensions.
     7 package xmpp
     7 package xmpp
     8 
     8 
     9 import (
     9 import (
    10 	"bytes"
       
    11 	"crypto/tls"
    10 	"crypto/tls"
    12 	"encoding/xml"
    11 	"encoding/xml"
    13 	"errors"
       
    14 	"fmt"
    12 	"fmt"
    15 	"io"
    13 	"io"
    16 	"net"
    14 	"net"
    17 	"reflect"
    15 	"reflect"
       
    16 	"sync"
    18 )
    17 )
    19 
    18 
    20 const (
    19 const (
    21 	// Version of RFC 3920 that we implement.
    20 	// Version of RFC 3920 that we implement.
    22 	XMPPVersion = "1.0"
    21 	XMPPVersion = "1.0"
    34 	// DNS SRV names
    33 	// DNS SRV names
    35 	serverSrv = "xmpp-server"
    34 	serverSrv = "xmpp-server"
    36 	clientSrv = "xmpp-client"
    35 	clientSrv = "xmpp-client"
    37 )
    36 )
    38 
    37 
    39 // Status of the connection.
       
    40 type Status int
       
    41 
       
    42 const (
       
    43 	statusUnconnected = iota
       
    44 	statusConnected
       
    45 	statusConnectedTls
       
    46 	statusAuthenticated
       
    47 	statusBound
       
    48 	statusRunning
       
    49 	statusShutdown
       
    50 )
       
    51 
       
    52 var (
       
    53 	// The client has not yet connected, or it has been
       
    54 	// disconnected from the server.
       
    55 	StatusUnconnected Status = statusUnconnected
       
    56 	// Initial connection established.
       
    57 	StatusConnected Status = statusConnected
       
    58 	// Like StatusConnected, but with TLS.
       
    59 	StatusConnectedTls Status = statusConnectedTls
       
    60 	// Authentication succeeded.
       
    61 	StatusAuthenticated Status = statusAuthenticated
       
    62 	// Resource binding complete.
       
    63 	StatusBound Status = statusBound
       
    64 	// Session has started and normal message traffic can be sent
       
    65 	// and received.
       
    66 	StatusRunning Status = statusRunning
       
    67 	// The session has closed, or is in the process of closing.
       
    68 	StatusShutdown Status = statusShutdown
       
    69 )
       
    70 
       
    71 // A filter can modify the XMPP traffic to or from the remote
    38 // A filter can modify the XMPP traffic to or from the remote
    72 // server. It's part of an Extension. The filter function will be
    39 // server. It's part of an Extension. The filter function will be
    73 // called in a new goroutine, so it doesn't need to return. The filter
    40 // called in a new goroutine, so it doesn't need to return. The filter
    74 // should close its output when its input is closed.
    41 // should close its output when its input is closed.
    75 type Filter func(in <-chan Stanza, out chan<- Stanza)
    42 type Filter func(in <-chan Stanza, out chan<- Stanza)
    76 
    43 
    77 // Extensions can add stanza filters and/or new XML element types.
    44 // Extensions can add stanza filters and/or new XML element types.
    78 type Extension struct {
    45 type Extension struct {
    79 	// Maps from an XML namespace to a function which constructs a
    46 	// Maps from an XML name to a structure which holds stanza
    80 	// structure to hold the contents of stanzas in that
    47 	// contents with that name.
    81 	// namespace.
    48 	StanzaTypes map[xml.Name]reflect.Type
    82 	StanzaHandlers map[xml.Name]reflect.Type
       
    83 	// If non-nil, will be called once to start the filter
    49 	// If non-nil, will be called once to start the filter
    84 	// running. RecvFilter intercepts incoming messages on their
    50 	// running. RecvFilter intercepts incoming messages on their
    85 	// way from the remote server to the application; SendFilter
    51 	// way from the remote server to the application; SendFilter
    86 	// intercepts messages going the other direction.
    52 	// intercepts messages going the other direction.
    87 	RecvFilter Filter
    53 	RecvFilter Filter
    99 	// Incoming XMPP stanzas from the remote will be published on
    65 	// Incoming XMPP stanzas from the remote will be published on
   100 	// this channel. Information which is used by this library to
    66 	// this channel. Information which is used by this library to
   101 	// set up the XMPP stream will not appear here.
    67 	// set up the XMPP stream will not appear here.
   102 	Recv <-chan Stanza
    68 	Recv <-chan Stanza
   103 	// Outgoing XMPP stanzas to the server should be sent to this
    69 	// Outgoing XMPP stanzas to the server should be sent to this
   104 	// channel.
    70 	// channel. The application should not close this channel;
       
    71 	// rather, call Close().
   105 	Send    chan<- Stanza
    72 	Send    chan<- Stanza
   106 	sendXml chan<- interface{}
    73 	sendRaw chan<- interface{}
   107 	statmgr *statmgr
    74 	statmgr *statmgr
   108 	// The client's roster is also known as the buddy list. It's
    75 	// The client's roster is also known as the buddy list. It's
   109 	// the set of contacts which are known to this JID, or which
    76 	// the set of contacts which are known to this JID, or which
   110 	// this JID is known to.
    77 	// this JID is known to.
   111 	Roster Roster
    78 	Roster Roster
   112 	// Features advertised by the remote.
    79 	// Features advertised by the remote.
   113 	Features                     *Features
    80 	Features                     *Features
   114 	sendFilterAdd, recvFilterAdd chan Filter
    81 	sendFilterAdd, recvFilterAdd chan Filter
   115 	tlsConfig                    tls.Config
    82 	tlsConfig                    tls.Config
   116 	layer1                       *layer1
    83 	layer1                       *layer1
       
    84 	error                        chan error
       
    85 	shutdownOnce                 sync.Once
   117 }
    86 }
   118 
    87 
   119 // Creates an XMPP client identified by the given JID, authenticating
    88 // Creates an XMPP client identified by the given JID, authenticating
   120 // with the provided password and TLS config. Zero or more extensions
    89 // with the provided password and TLS config. Zero or more extensions
   121 // may be specified. The initial presence will be broadcast. If status
    90 // may be specified. The initial presence will be broadcast. If status
   122 // is non-nil, connection progress information will be sent on it.
    91 // is non-nil, connection progress information will be sent on it.
   123 func NewClient(jid *JID, password string, tlsconf tls.Config, exts []Extension,
    92 func NewClient(jid *JID, password string, tlsconf tls.Config, exts []Extension,
   124 	pr Presence, status chan<- Status) (*Client, error) {
    93 	pr Presence, status chan<- Status) (*Client, error) {
       
    94 
       
    95 	// Resolve the domain in the JID.
       
    96 	_, srvs, err := net.LookupSRV(clientSrv, "tcp", jid.Domain())
       
    97 	if err != nil {
       
    98 		return nil, fmt.Errorf("LookupSrv %s: %v", jid.Domain, err)
       
    99 	}
       
   100 	if len(srvs) == 0 {
       
   101 		return nil, fmt.Errorf("LookupSrv %s: no results", jid.Domain)
       
   102 	}
       
   103 
       
   104 	var tcp *net.TCPConn
       
   105 	for _, srv := range srvs {
       
   106 		addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
       
   107 		var addr *net.TCPAddr
       
   108 		addr, err = net.ResolveTCPAddr("tcp", addrStr)
       
   109 		if err != nil {
       
   110 			err = fmt.Errorf("ResolveTCPAddr(%s): %s",
       
   111 				addrStr, err.Error())
       
   112 			continue
       
   113 		}
       
   114 		tcp, err = net.DialTCP("tcp", nil, addr)
       
   115 		if tcp != nil {
       
   116 			break
       
   117 		}
       
   118 	}
       
   119 	if tcp == nil {
       
   120 		return nil, err
       
   121 	}
       
   122 
       
   123 	return newClient(tcp, jid, password, tlsconf, exts, pr, status)
       
   124 }
       
   125 
       
   126 // Connect to the specified host and port. This is otherwise identical
       
   127 // to NewClient.
       
   128 func NewClientFromHost(jid *JID, password string, tlsconf tls.Config,
       
   129 	exts []Extension, pr Presence, status chan<- Status, host string,
       
   130 	port int) (*Client, error) {
       
   131 
       
   132 	addrStr := fmt.Sprintf("%s:%d", host, port)
       
   133 	addr, err := net.ResolveTCPAddr("tcp", addrStr)
       
   134 	if err != nil {
       
   135 		return nil, err
       
   136 	}
       
   137 	tcp, err := net.DialTCP("tcp", nil, addr)
       
   138 	if err != nil {
       
   139 		return nil, err
       
   140 	}
       
   141 
       
   142 	return newClient(tcp, jid, password, tlsconf, exts, pr, status)
       
   143 }
       
   144 
       
   145 func newClient(tcp *net.TCPConn, jid *JID, password string, tlsconf tls.Config,
       
   146 	exts []Extension, pr Presence, status chan<- Status) (*Client, error) {
   125 
   147 
   126 	// Include the mandatory extensions.
   148 	// Include the mandatory extensions.
   127 	roster := newRosterExt()
   149 	roster := newRosterExt()
   128 	exts = append(exts, roster.Extension)
   150 	exts = append(exts, roster.Extension)
   129 	exts = append(exts, bindExt)
   151 	exts = append(exts, bindExt)
   135 	cl.handlers = make(chan *callback, 100)
   157 	cl.handlers = make(chan *callback, 100)
   136 	cl.tlsConfig = tlsconf
   158 	cl.tlsConfig = tlsconf
   137 	cl.sendFilterAdd = make(chan Filter)
   159 	cl.sendFilterAdd = make(chan Filter)
   138 	cl.recvFilterAdd = make(chan Filter)
   160 	cl.recvFilterAdd = make(chan Filter)
   139 	cl.statmgr = newStatmgr(status)
   161 	cl.statmgr = newStatmgr(status)
       
   162 	cl.error = make(chan error, 1)
   140 
   163 
   141 	extStanza := make(map[xml.Name]reflect.Type)
   164 	extStanza := make(map[xml.Name]reflect.Type)
   142 	for _, ext := range exts {
   165 	for _, ext := range exts {
   143 		for k, v := range ext.StanzaHandlers {
   166 		for k, v := range ext.StanzaTypes {
   144 			if _, ok := extStanza[k]; ok {
   167 			if _, ok := extStanza[k]; ok {
   145 				return nil, fmt.Errorf("duplicate handler %s",
   168 				return nil, fmt.Errorf("duplicate handler %s",
   146 					k)
   169 					k)
   147 			}
   170 			}
   148 			extStanza[k] = v
   171 			extStanza[k] = v
   149 		}
   172 		}
   150 	}
   173 	}
   151 
   174 
   152 	// Resolve the domain in the JID.
   175 	// The thing that called this made a TCP connection, so now we
   153 	_, srvs, err := net.LookupSRV(clientSrv, "tcp", jid.Domain)
   176 	// can signal that it's connected.
   154 	if err != nil {
       
   155 		return nil, fmt.Errorf("LookupSrv %s: %v", jid.Domain, err)
       
   156 	}
       
   157 	if len(srvs) == 0 {
       
   158 		return nil, fmt.Errorf("LookupSrv %s: no results", jid.Domain)
       
   159 	}
       
   160 
       
   161 	var tcp *net.TCPConn
       
   162 	for _, srv := range srvs {
       
   163 		addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
       
   164 		var addr *net.TCPAddr
       
   165 		addr, err = net.ResolveTCPAddr("tcp", addrStr)
       
   166 		if err != nil {
       
   167 			err = fmt.Errorf("ResolveTCPAddr(%s): %s",
       
   168 				addrStr, err.Error())
       
   169 			continue
       
   170 		}
       
   171 		tcp, err = net.DialTCP("tcp", nil, addr)
       
   172 		if tcp != nil {
       
   173 			break
       
   174 		}
       
   175 	}
       
   176 	if tcp == nil {
       
   177 		return nil, err
       
   178 	}
       
   179 	cl.setStatus(StatusConnected)
   177 	cl.setStatus(StatusConnected)
   180 
   178 
   181 	// Start the transport handler, initially unencrypted.
   179 	// Start the transport handler, initially unencrypted.
   182 	recvReader, recvWriter := io.Pipe()
   180 	recvReader, recvWriter := io.Pipe()
   183 	sendReader, sendWriter := io.Pipe()
   181 	sendReader, sendWriter := io.Pipe()
   184 	cl.layer1 = startLayer1(tcp, recvWriter, sendReader,
   182 	cl.layer1 = cl.startLayer1(tcp, recvWriter, sendReader,
   185 		cl.statmgr.newListener())
   183 		cl.statmgr.newListener())
   186 
   184 
   187 	// Start the reader and writer that convert to and from XML.
   185 	// Start the reader and writer that convert to and from XML.
   188 	recvXmlCh := make(chan interface{})
   186 	recvXmlCh := make(chan interface{})
   189 	go recvXml(recvReader, recvXmlCh, extStanza)
   187 	go cl.recvXml(recvReader, recvXmlCh, extStanza)
   190 	sendXmlCh := make(chan interface{})
   188 	sendXmlCh := make(chan interface{})
   191 	cl.sendXml = sendXmlCh
   189 	cl.sendRaw = sendXmlCh
   192 	go sendXml(sendWriter, sendXmlCh)
   190 	go cl.sendXml(sendWriter, sendXmlCh)
   193 
   191 
   194 	// Start the reader and writer that convert between XML and
   192 	// Start the reader and writer that convert between XML and
   195 	// XMPP stanzas.
   193 	// XMPP stanzas.
   196 	recvRawXmpp := make(chan Stanza)
   194 	recvRawXmpp := make(chan Stanza)
   197 	go cl.recvStream(recvXmlCh, recvRawXmpp, cl.statmgr.newListener())
   195 	go cl.recvStream(recvXmlCh, recvRawXmpp, cl.statmgr.newListener())
   211 		cl.AddRecvFilter(ext.RecvFilter)
   209 		cl.AddRecvFilter(ext.RecvFilter)
   212 		cl.AddSendFilter(ext.SendFilter)
   210 		cl.AddSendFilter(ext.SendFilter)
   213 	}
   211 	}
   214 
   212 
   215 	// Initial handshake.
   213 	// Initial handshake.
   216 	hsOut := &stream{To: jid.Domain, Version: XMPPVersion}
   214 	hsOut := &stream{To: jid.Domain(), Version: XMPPVersion}
   217 	cl.sendXml <- hsOut
   215 	cl.sendRaw <- hsOut
   218 
   216 
   219 	// Wait until resource binding is complete.
   217 	// Wait until resource binding is complete.
   220 	if err := cl.statmgr.awaitStatus(StatusBound); err != nil {
   218 	if err := cl.statmgr.awaitStatus(StatusBound); err != nil {
   221 		return nil, err
   219 		return nil, cl.getError(err)
   222 	}
   220 	}
       
   221 
       
   222 	// Forget about the password, for paranoia's sake.
       
   223 	cl.password = ""
   223 
   224 
   224 	// Initialize the session.
   225 	// Initialize the session.
   225 	id := NextId()
   226 	id := NextId()
   226 	iq := &Iq{Header: Header{To: cl.Jid.Domain, Id: id, Type: "set",
   227 	iq := &Iq{Header: Header{To: JID(cl.Jid.Domain()), Id: id, Type: "set",
   227 		Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}}
   228 		Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}}
   228 	ch := make(chan error)
   229 	ch := make(chan error)
   229 	f := func(st Stanza) {
   230 	f := func(st Stanza) {
   230 		iq, ok := st.(*Iq)
   231 		iq, ok := st.(*Iq)
   231 		if !ok {
   232 		if !ok {
   232 			Warn.Log("iq reply not iq; can't start session")
   233 			ch <- fmt.Errorf("bad session start reply: %#v", st)
   233 			ch <- errors.New("bad session start reply")
       
   234 		}
   234 		}
   235 		if iq.Type == "error" {
   235 		if iq.Type == "error" {
   236 			Warn.Logf("Can't start session: %v", iq)
   236 			ch <- fmt.Errorf("Can't start session: %v", iq.Error)
   237 			ch <- iq.Error
       
   238 		}
   237 		}
   239 		ch <- nil
   238 		ch <- nil
   240 	}
   239 	}
   241 	cl.SetCallback(id, f)
   240 	cl.SetCallback(id, f)
   242 	cl.sendXml <- iq
   241 	cl.sendRaw <- iq
   243 	// Now wait until the callback is called.
   242 	// Now wait until the callback is called.
   244 	if err := <-ch; err != nil {
   243 	if err := <-ch; err != nil {
   245 		return nil, err
   244 		return nil, cl.getError(err)
   246 	}
   245 	}
   247 
   246 
   248 	// This allows the client to receive stanzas.
   247 	// This allows the client to receive stanzas.
   249 	cl.setStatus(StatusRunning)
   248 	cl.setStatus(StatusRunning)
   250 
   249 
   252 	cl.Roster.update()
   251 	cl.Roster.update()
   253 
   252 
   254 	// Send the initial presence.
   253 	// Send the initial presence.
   255 	cl.Send <- &pr
   254 	cl.Send <- &pr
   256 
   255 
   257 	return cl, nil
   256 	return cl, cl.getError(nil)
   258 }
   257 }
   259 
   258 
   260 func tee(r io.Reader, w io.Writer, prefix string) {
   259 func (cl *Client) Close() {
   261 	defer func(w io.Writer) {
   260 	// Shuts down the receivers:
   262 		if c, ok := w.(io.Closer); ok {
   261 	cl.setStatus(StatusShutdown)
   263 			c.Close()
   262 
   264 		}
   263 	// Shuts down the senders:
   265 	}(w)
   264 	cl.shutdownOnce.Do(func() { close(cl.Send) })
   266 
   265 }
   267 	buf := bytes.NewBuffer([]uint8(prefix))
   266 
   268 	for {
   267 // If there's a buffered error in the channel, return it. Otherwise,
   269 		var c [1]byte
   268 // return what was passed to us. The idea is that the error in the
   270 		n, _ := r.Read(c[:])
   269 // channel probably preceded (and caused) the one that's passed as an
   271 		if n == 0 {
   270 // argument here.
   272 			break
   271 func (cl *Client) getError(err1 error) error {
   273 		}
   272 	select {
   274 		n, _ = w.Write(c[:n])
   273 	case err0 := <-cl.error:
   275 		if n == 0 {
   274 		return err0
   276 			break
   275 	default:
   277 		}
   276 		return err1
   278 		buf.Write(c[:n])
   277 	}
   279 		if c[0] == '\n' || c[0] == '>' {
   278 }
   280 			Debug.Log(buf)
   279 
   281 			buf = bytes.NewBuffer([]uint8(prefix))
   280 // Register an error that happened in the internals somewhere. If
   282 		}
   281 // there's already an error in the channel, discard the newer one in
   283 	}
   282 // favor of the older.
   284 	leftover := buf.String()
   283 func (cl *Client) setError(err error) {
   285 	if leftover != "" {
   284 	defer cl.Close()
   286 		Debug.Log(buf)
   285 	defer cl.setStatus(StatusError)
   287 	}
   286 
   288 }
   287 	if len(cl.error) > 0 {
       
   288 		return
       
   289 	}
       
   290 	// If we're in a race between two calls to this function,
       
   291 	// trying to set the "first" error, just arbitrarily let one
       
   292 	// of them win.
       
   293 	select {
       
   294 	case cl.error <- err:
       
   295 	default:
       
   296 	}
       
   297 }