stream.go
changeset 17 d269d9c0fc8e
parent 16 b839e37b3f29
child 19 e923f28d65aa
equal deleted inserted replaced
16:b839e37b3f29 17:d269d9c0fc8e
     3 // license that can be found in the LICENSE file.
     3 // license that can be found in the LICENSE file.
     4 
     4 
     5 // This file contains the three layers of processing for the
     5 // This file contains the three layers of processing for the
     6 // communication with the server: transport (where TLS happens), XML
     6 // communication with the server: transport (where TLS happens), XML
     7 // (where strings are converted to go structures), and Stream (where
     7 // (where strings are converted to go structures), and Stream (where
     8 // we respond to XMPP events on behalf of the library client).
     8 // we respond to XMPP events on behalf of the library client), or send
       
     9 // those events to the client.
     9 
    10 
    10 package xmpp
    11 package xmpp
    11 
    12 
    12 import (
    13 import (
    13 	"big"
    14 	"big"
    24 	"strings"
    25 	"strings"
    25 	"time"
    26 	"time"
    26 	"xml"
    27 	"xml"
    27 )
    28 )
    28 
    29 
       
    30 // Callback to handle a stanza with a particular id.
    29 type stanzaHandler struct {
    31 type stanzaHandler struct {
    30 	id string
    32 	id string
       
    33 	// Return true means pass this to the application
    31 	f func(Stanza) bool
    34 	f func(Stanza) bool
    32 }
    35 }
       
    36 
       
    37 // TODO Review all these *Client receiver methods. They should
       
    38 // probably either all be receivers, or none.
    33 
    39 
    34 func (cl *Client) readTransport(w io.Writer) {
    40 func (cl *Client) readTransport(w io.Writer) {
    35 	defer tryClose(cl.socket, w)
    41 	defer tryClose(cl.socket, w)
    36 	cl.socket.SetReadTimeout(1e8)
    42 	cl.socket.SetReadTimeout(1e8)
    37 	p := make([]byte, 1024)
    43 	p := make([]byte, 1024)
   136 		if err != nil {
   142 		if err != nil {
   137 			log.Printf("unmarshal: %v", err)
   143 			log.Printf("unmarshal: %v", err)
   138 			break
   144 			break
   139 		}
   145 		}
   140 
   146 
       
   147 		// TODO If it's a Stanza, use reflection to search for
       
   148 		// any Unrecognized elements and fill in their
       
   149 		// attributes.
       
   150 
   141 		// Put it on the channel.
   151 		// Put it on the channel.
   142 		ch <- obj
   152 		ch <- obj
   143 	}
   153 	}
   144 }
   154 }
   145 
   155 
   158 			break
   168 			break
   159 		}
   169 		}
   160 	}
   170 	}
   161 }
   171 }
   162 
   172 
       
   173 // TODO This should go away. We shouldn't allow writing of
       
   174 // unstructured data.
   163 func writeText(w io.Writer, ch <-chan *string) {
   175 func writeText(w io.Writer, ch <-chan *string) {
   164 	if debug {
   176 	if debug {
   165 		pr, pw := io.Pipe()
   177 		pr, pw := io.Pipe()
   166 		go tee(pr, w, "C: ")
   178 		go tee(pr, w, "C: ")
   167 		w = pw
   179 		w = pw
   212 			}
   224 			}
   213 		}
   225 		}
   214 	}
   226 	}
   215 }
   227 }
   216 
   228 
       
   229 // TODO Disable this loop until resource binding is
       
   230 // complete. Otherwise the app might inject something weird into our
       
   231 // negotiation stream.
   217 func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) {
   232 func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) {
   218 	defer tryClose(srvOut, cliIn)
   233 	defer tryClose(srvOut, cliIn)
   219 
   234 
   220 	for x := range cliIn {
   235 	for x := range cliIn {
   221 		srvOut <- x
   236 		srvOut <- x
   238 		return
   253 		return
   239 	}
   254 	}
   240 
   255 
   241 	if fe.Bind != nil {
   256 	if fe.Bind != nil {
   242 		cl.bind(fe.Bind)
   257 		cl.bind(fe.Bind)
       
   258 		return
   243 	}
   259 	}
   244 }
   260 }
   245 
   261 
   246 // readTransport() is running concurrently. We need to stop it,
   262 // readTransport() is running concurrently. We need to stop it,
   247 // negotiate TLS, then start it again. It calls waitForSocket() in
   263 // negotiate TLS, then start it again. It calls waitForSocket() in
   289 
   305 
   290 	// Signal that we're going back to the read loop.
   306 	// Signal that we're going back to the read loop.
   291 	cl.socketSync.Done()
   307 	cl.socketSync.Done()
   292 }
   308 }
   293 
   309 
       
   310 // TODO Implement TLS/SASL EXTERNAL.
   294 func (cl *Client) chooseSasl(fe *Features) {
   311 func (cl *Client) chooseSasl(fe *Features) {
   295 	var digestMd5 bool
   312 	var digestMd5 bool
   296 	for _, m := range(fe.Mechanisms.Mechanism) {
   313 	for _, m := range(fe.Mechanisms.Mechanism) {
   297 		switch strings.ToLower(m) {
   314 		switch strings.ToLower(m) {
   298 		case "digest-md5":
   315 		case "digest-md5":
   434 		m[key] = value
   451 		m[key] = value
   435 	}
   452 	}
   436 	return m
   453 	return m
   437 }
   454 }
   438 
   455 
       
   456 // Inverse of parseSasl().
   439 func packSasl(m map[string]string) string {
   457 func packSasl(m map[string]string) string {
   440 	var terms []string
   458 	var terms []string
   441 	for key, value := range(m) {
   459 	for key, value := range(m) {
   442 		if key == "" || value == "" || value == `""` {
   460 		if key == "" || value == "" || value == `""` {
   443 			continue
   461 			continue
   445 		terms = append(terms, key + "=" + value)
   463 		terms = append(terms, key + "=" + value)
   446 	}
   464 	}
   447 	return strings.Join(terms, ",")
   465 	return strings.Join(terms, ",")
   448 }
   466 }
   449 
   467 
       
   468 // Computes the response string for digest authentication.
   450 func saslDigestResponse(username, realm, passwd, nonce, cnonceStr,
   469 func saslDigestResponse(username, realm, passwd, nonce, cnonceStr,
   451 	authenticate, digestUri, nonceCountStr string) string {
   470 	authenticate, digestUri, nonceCountStr string) string {
   452 	h := func(text string) []byte {
   471 	h := func(text string) []byte {
   453 		h := md5.New()
   472 		h := md5.New()
   454 		h.Write([]byte(text))
   473 		h.Write([]byte(text))
   468 		nonceCountStr + ":" + cnonceStr + ":auth:" +
   487 		nonceCountStr + ":" + cnonceStr + ":auth:" +
   469 		hex(h(a2))))
   488 		hex(h(a2))))
   470 	return response
   489 	return response
   471 }
   490 }
   472 
   491 
       
   492 // Send a request to bind a resource. RFC 3920, section 7.
   473 func (cl *Client) bind(bind *Unrecognized) {
   493 func (cl *Client) bind(bind *Unrecognized) {
   474 	res := cl.Jid.Resource
   494 	res := cl.Jid.Resource
   475 	msg := &Iq{Type: "set", Id: cl.NextId(), Any:
   495 	msg := &Iq{Type: "set", Id: cl.NextId(), Any:
   476 		&Unrecognized{XMLName: xml.Name{Space: nsBind, Local:
   496 		&Unrecognized{XMLName: xml.Name{Space: nsBind, Local:
   477 					"bind"}}}
   497 					"bind"}}}
   510 	}
   530 	}
   511 	cl.HandleStanza(msg.Id, f)
   531 	cl.HandleStanza(msg.Id, f)
   512 	cl.xmlOut <- msg
   532 	cl.xmlOut <- msg
   513 }
   533 }
   514 
   534 
       
   535 // Register a callback to handle the next XMPP stanza (iq, message, or
       
   536 // presence) with a given id. The provided function will not be called
       
   537 // more than once. If it returns false, the stanza will not be made
       
   538 // available on the normal Client.In channel.
   515 func (cl *Client) HandleStanza(id string, f func(Stanza) bool) {
   539 func (cl *Client) HandleStanza(id string, f func(Stanza) bool) {
   516 	h := &stanzaHandler{id: id, f: f}
   540 	h := &stanzaHandler{id: id, f: f}
   517 	cl.handlers <- h
   541 	cl.handlers <- h
   518 }
   542 }