xmpp/xmpp.go
changeset 153 bbd4166df95d
parent 148 b1b4900eee5b
child 157 eadf15a06ff5
equal deleted inserted replaced
152:69c5b4382e39 153:bbd4166df95d
    34 	// DNS SRV names
    34 	// DNS SRV names
    35 	serverSrv = "xmpp-server"
    35 	serverSrv = "xmpp-server"
    36 	clientSrv = "xmpp-client"
    36 	clientSrv = "xmpp-client"
    37 )
    37 )
    38 
    38 
    39 // Flow control for preventing sending stanzas until negotiation has
    39 // Status of the connection.
    40 // completed.
    40 type Status int
    41 type sendCmd int
       
    42 
    41 
    43 const (
    42 const (
    44 	sendAllowConst = iota
    43 	statusUnconnected = iota
    45 	sendDenyConst
    44 	statusConnected
    46 	sendAbortConst
    45 	statusConnectedTls
       
    46 	statusAuthenticated
       
    47 	statusBound
       
    48 	statusRunning
       
    49 	statusShutdown
    47 )
    50 )
    48 
    51 
    49 var (
    52 var (
    50 	sendAllow sendCmd = sendAllowConst
    53 	// The client has not yet connected, or it has been
    51 	sendDeny  sendCmd = sendDenyConst
    54 	// disconnected from the server.
    52 	sendAbort sendCmd = sendAbortConst
    55 	StatusUnconnected Status = statusUnconnected
       
    56 	// Initial connection established.
       
    57 	StatusConnected Status = statusConnected
       
    58 	// Like StatusConnected, but with TLS.
       
    59 	StatusConnectedTls Status = statusConnectedTls
       
    60 	// Authentication succeeded.
       
    61 	StatusAuthenticated Status = statusAuthenticated
       
    62 	// Resource binding complete.
       
    63 	StatusBound Status = statusBound
       
    64 	// Session has started and normal message traffic can be sent
       
    65 	// and received.
       
    66 	StatusRunning Status = statusRunning
       
    67 	// The session has closed, or is in the process of closing.
       
    68 	StatusShutdown Status = statusShutdown
    53 )
    69 )
    54 
    70 
    55 // A filter can modify the XMPP traffic to or from the remote
    71 // A filter can modify the XMPP traffic to or from the remote
    56 // server. It's part of an Extension. The filter function will be
    72 // server. It's part of an Extension. The filter function will be
    57 // called in a new goroutine, so it doesn't need to return. The filter
    73 // called in a new goroutine, so it doesn't need to return. The filter
    72 	SendFilter Filter
    88 	SendFilter Filter
    73 }
    89 }
    74 
    90 
    75 // The client in a client-server XMPP connection.
    91 // The client in a client-server XMPP connection.
    76 type Client struct {
    92 type Client struct {
    77 	// This client's JID. This will be updated asynchronously by
    93 	// This client's full JID, including resource
    78 	// the time StartSession() returns.
       
    79 	Jid          JID
    94 	Jid          JID
    80 	password     string
    95 	password     string
    81 	saslExpected string
    96 	saslExpected string
    82 	authDone     bool
    97 	authDone     bool
    83 	handlers     chan *callback
    98 	handlers     chan *callback
    84 	inputControl chan sendCmd
       
    85 	// Incoming XMPP stanzas from the remote will be published on
    99 	// Incoming XMPP stanzas from the remote will be published on
    86 	// this channel. Information which is used by this library to
   100 	// this channel. Information which is used by this library to
    87 	// set up the XMPP stream will not appear here.
   101 	// set up the XMPP stream will not appear here.
    88 	Recv <-chan Stanza
   102 	Recv <-chan Stanza
    89 	// Outgoing XMPP stanzas to the server should be sent to this
   103 	// Outgoing XMPP stanzas to the server should be sent to this
    90 	// channel.
   104 	// channel.
    91 	Send    chan<- Stanza
   105 	Send    chan<- Stanza
    92 	sendXml chan<- interface{}
   106 	sendXml chan<- interface{}
       
   107 	statmgr *statmgr
    93 	// The client's roster is also known as the buddy list. It's
   108 	// The client's roster is also known as the buddy list. It's
    94 	// the set of contacts which are known to this JID, or which
   109 	// the set of contacts which are known to this JID, or which
    95 	// this JID is known to.
   110 	// this JID is known to.
    96 	Roster Roster
   111 	Roster Roster
    97 	// Features advertised by the remote. This will be updated
   112 	// Features advertised by the remote.
    98 	// asynchronously as new features are received throughout the
       
    99 	// connection process. It should not be updated once
       
   100 	// StartSession() returns.
       
   101 	Features                     *Features
   113 	Features                     *Features
   102 	sendFilterAdd, recvFilterAdd chan Filter
   114 	sendFilterAdd, recvFilterAdd chan Filter
   103 	// Allows the user to override the TLS configuration.
   115 	tlsConfig                    tls.Config
   104 	tlsConfig tls.Config
   116 	layer1                       *layer1
   105 	layer1    *layer1
   117 }
   106 }
   118 
   107 
   119 // Creates an XMPP client identified by the given JID, authenticating
   108 // Connect to the appropriate server and authenticate as the given JID
   120 // with the provided password and TLS config. Zero or more extensions
   109 // with the given password. This function will return as soon as a TCP
   121 // may be specified. The initial presence will be broadcast. If status
   110 // connection has been established, but before XMPP stream negotiation
   122 // is non-nil, connection progress information will be sent on it.
   111 // has completed. The negotiation will occur asynchronously, and any
   123 func NewClient(jid *JID, password string, tlsconf tls.Config, exts []Extension,
   112 // send operation to Client.Send will block until negotiation
   124 	pr Presence, status chan<- Status) (*Client, error) {
   113 // (resource binding) is complete. The caller must immediately start
   125 
   114 // reading from Client.Recv.
       
   115 func NewClient(jid *JID, password string, tlsconf tls.Config, exts []Extension) (*Client, error) {
       
   116 	// Include the mandatory extensions.
   126 	// Include the mandatory extensions.
   117 	roster := newRosterExt()
   127 	roster := newRosterExt()
   118 	exts = append(exts, roster.Extension)
   128 	exts = append(exts, roster.Extension)
   119 	exts = append(exts, bindExt)
   129 	exts = append(exts, bindExt)
       
   130 
       
   131 	cl := new(Client)
       
   132 	cl.Roster = *roster
       
   133 	cl.password = password
       
   134 	cl.Jid = *jid
       
   135 	cl.handlers = make(chan *callback, 100)
       
   136 	cl.tlsConfig = tlsconf
       
   137 	cl.sendFilterAdd = make(chan Filter)
       
   138 	cl.recvFilterAdd = make(chan Filter)
       
   139 	cl.statmgr = newStatmgr(status)
       
   140 
       
   141 	extStanza := make(map[xml.Name]reflect.Type)
       
   142 	for _, ext := range exts {
       
   143 		for k, v := range ext.StanzaHandlers {
       
   144 			if _, ok := extStanza[k]; ok {
       
   145 				return nil, fmt.Errorf("duplicate handler %s",
       
   146 					k)
       
   147 			}
       
   148 			extStanza[k] = v
       
   149 		}
       
   150 	}
   120 
   151 
   121 	// Resolve the domain in the JID.
   152 	// Resolve the domain in the JID.
   122 	_, srvs, err := net.LookupSRV(clientSrv, "tcp", jid.Domain)
   153 	_, srvs, err := net.LookupSRV(clientSrv, "tcp", jid.Domain)
   123 	if err != nil {
   154 	if err != nil {
   124 		return nil, fmt.Errorf("LookupSrv %s: %v", jid.Domain, err)
   155 		return nil, fmt.Errorf("LookupSrv %s: %v", jid.Domain, err)
   143 		}
   174 		}
   144 	}
   175 	}
   145 	if tcp == nil {
   176 	if tcp == nil {
   146 		return nil, err
   177 		return nil, err
   147 	}
   178 	}
   148 
   179 	cl.setStatus(StatusConnected)
   149 	cl := new(Client)
       
   150 	cl.Roster = *roster
       
   151 	cl.password = password
       
   152 	cl.Jid = *jid
       
   153 	cl.handlers = make(chan *callback, 100)
       
   154 	cl.inputControl = make(chan sendCmd)
       
   155 	cl.tlsConfig = tlsconf
       
   156 	cl.sendFilterAdd = make(chan Filter)
       
   157 	cl.recvFilterAdd = make(chan Filter)
       
   158 
       
   159 	extStanza := make(map[xml.Name]reflect.Type)
       
   160 	for _, ext := range exts {
       
   161 		for k, v := range ext.StanzaHandlers {
       
   162 			if _, ok := extStanza[k]; ok {
       
   163 				return nil, fmt.Errorf("duplicate handler %s",
       
   164 					k)
       
   165 			}
       
   166 			extStanza[k] = v
       
   167 		}
       
   168 	}
       
   169 
   180 
   170 	// Start the transport handler, initially unencrypted.
   181 	// Start the transport handler, initially unencrypted.
   171 	recvReader, recvWriter := io.Pipe()
   182 	recvReader, recvWriter := io.Pipe()
   172 	sendReader, sendWriter := io.Pipe()
   183 	sendReader, sendWriter := io.Pipe()
   173 	cl.layer1 = startLayer1(tcp, recvWriter, sendReader)
   184 	cl.layer1 = startLayer1(tcp, recvWriter, sendReader,
       
   185 		cl.statmgr.newListener())
   174 
   186 
   175 	// Start the reader and writer that convert to and from XML.
   187 	// Start the reader and writer that convert to and from XML.
   176 	recvXmlCh := make(chan interface{})
   188 	recvXmlCh := make(chan interface{})
   177 	go recvXml(recvReader, recvXmlCh, extStanza)
   189 	go recvXml(recvReader, recvXmlCh, extStanza)
   178 	sendXmlCh := make(chan interface{})
   190 	sendXmlCh := make(chan interface{})
   180 	go sendXml(sendWriter, sendXmlCh)
   192 	go sendXml(sendWriter, sendXmlCh)
   181 
   193 
   182 	// Start the reader and writer that convert between XML and
   194 	// Start the reader and writer that convert between XML and
   183 	// XMPP stanzas.
   195 	// XMPP stanzas.
   184 	recvRawXmpp := make(chan Stanza)
   196 	recvRawXmpp := make(chan Stanza)
   185 	go cl.recvStream(recvXmlCh, recvRawXmpp)
   197 	go cl.recvStream(recvXmlCh, recvRawXmpp, cl.statmgr.newListener())
   186 	sendRawXmpp := make(chan Stanza)
   198 	sendRawXmpp := make(chan Stanza)
   187 	go sendStream(sendXmlCh, sendRawXmpp, cl.inputControl)
   199 	go sendStream(sendXmlCh, sendRawXmpp, cl.statmgr.newListener())
   188 
   200 
   189 	// Start the manager for the filters that can modify what the
   201 	// Start the managers for the filters that can modify what the
   190 	// app sees.
   202 	// app sees or sends.
   191 	recvFiltXmpp := make(chan Stanza)
   203 	recvFiltXmpp := make(chan Stanza)
   192 	cl.Recv = recvFiltXmpp
   204 	cl.Recv = recvFiltXmpp
   193 	go filterMgr(cl.recvFilterAdd, recvRawXmpp, recvFiltXmpp)
   205 	go filterMgr(cl.recvFilterAdd, recvRawXmpp, recvFiltXmpp)
   194 	sendFiltXmpp := make(chan Stanza)
   206 	sendFiltXmpp := make(chan Stanza)
   195 	cl.Send = sendFiltXmpp
   207 	cl.Send = sendFiltXmpp
   202 
   214 
   203 	// Initial handshake.
   215 	// Initial handshake.
   204 	hsOut := &stream{To: jid.Domain, Version: XMPPVersion}
   216 	hsOut := &stream{To: jid.Domain, Version: XMPPVersion}
   205 	cl.sendXml <- hsOut
   217 	cl.sendXml <- hsOut
   206 
   218 
       
   219 	// Wait until resource binding is complete.
       
   220 	if err := cl.statmgr.awaitStatus(StatusBound); err != nil {
       
   221 		return nil, err
       
   222 	}
       
   223 
       
   224 	// Initialize the session.
       
   225 	id := NextId()
       
   226 	iq := &Iq{Header: Header{To: cl.Jid.Domain, Id: id, Type: "set",
       
   227 		Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}}
       
   228 	ch := make(chan error)
       
   229 	f := func(st Stanza) {
       
   230 		iq, ok := st.(*Iq)
       
   231 		if !ok {
       
   232 			Warn.Log("iq reply not iq; can't start session")
       
   233 			ch <- errors.New("bad session start reply")
       
   234 		}
       
   235 		if iq.Type == "error" {
       
   236 			Warn.Logf("Can't start session: %v", iq)
       
   237 			ch <- iq.Error
       
   238 		}
       
   239 		ch <- nil
       
   240 	}
       
   241 	cl.SetCallback(id, f)
       
   242 	cl.sendXml <- iq
       
   243 	// Now wait until the callback is called.
       
   244 	if err := <-ch; err != nil {
       
   245 		return nil, err
       
   246 	}
       
   247 
       
   248 	// This allows the client to receive stanzas.
       
   249 	cl.setStatus(StatusRunning)
       
   250 
       
   251 	// Request the roster.
       
   252 	cl.Roster.update()
       
   253 
       
   254 	// Send the initial presence.
       
   255 	cl.Send <- &pr
       
   256 
   207 	return cl, nil
   257 	return cl, nil
   208 }
   258 }
   209 
   259 
   210 func tee(r io.Reader, w io.Writer, prefix string) {
   260 func tee(r io.Reader, w io.Writer, prefix string) {
   211 	defer func(w io.Writer) {
   261 	defer func(w io.Writer) {
   234 	leftover := buf.String()
   284 	leftover := buf.String()
   235 	if leftover != "" {
   285 	if leftover != "" {
   236 		Debug.Log(buf)
   286 		Debug.Log(buf)
   237 	}
   287 	}
   238 }
   288 }
   239 
       
   240 // bindDone is called when we've finished resource binding (and all
       
   241 // the negotiations that precede it). Now we can start accepting
       
   242 // traffic from the app.
       
   243 func (cl *Client) bindDone() {
       
   244 	cl.inputControl <- sendAllow
       
   245 }
       
   246 
       
   247 // Start an XMPP session. A typical XMPP client should call this
       
   248 // immediately after creating the Client in order to start the session
       
   249 // and broadcast an initial presence. The presence can be as simple as
       
   250 // a newly-initialized Presence struct.  See RFC 3921, Section
       
   251 // 3. After calling this, a normal client should call Roster.Update().
       
   252 func (cl *Client) StartSession(pr *Presence) error {
       
   253 	id := NextId()
       
   254 	iq := &Iq{Header: Header{To: cl.Jid.Domain, Id: id, Type: "set",
       
   255 		Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}}
       
   256 	ch := make(chan error)
       
   257 	f := func(st Stanza) bool {
       
   258 		iq, ok := st.(*Iq)
       
   259 		if !ok {
       
   260 			Warn.Log("iq reply not iq; can't start session")
       
   261 			ch <- errors.New("bad session start reply")
       
   262 			return false
       
   263 		}
       
   264 		if iq.Type == "error" {
       
   265 			Warn.Logf("Can't start session: %v", iq)
       
   266 			ch <- iq.Error
       
   267 			return false
       
   268 		}
       
   269 		ch <- nil
       
   270 		return false
       
   271 	}
       
   272 	cl.SetCallback(id, f)
       
   273 	cl.Send <- iq
       
   274 
       
   275 	// Now wait until the callback is called.
       
   276 	if err := <-ch; err != nil {
       
   277 		return err
       
   278 	}
       
   279 	if pr != nil {
       
   280 		cl.Send <- pr
       
   281 	}
       
   282 	return nil
       
   283 }