stream.go
changeset 100 24231ff0016c
parent 98 c9cc4eda6dce
child 102 872e936f9f3f
equal deleted inserted replaced
98:c9cc4eda6dce 100:24231ff0016c
    16 	"crypto/tls"
    16 	"crypto/tls"
    17 	"encoding/base64"
    17 	"encoding/base64"
    18 	"encoding/xml"
    18 	"encoding/xml"
    19 	"fmt"
    19 	"fmt"
    20 	"io"
    20 	"io"
    21 	"log/syslog"
       
    22 	"math/big"
    21 	"math/big"
    23 	"net"
    22 	"net"
    24 	"regexp"
    23 	"regexp"
    25 	"strings"
    24 	"strings"
    26 	"time"
    25 	"time"
    49 			if errno, ok := err.(*net.OpError); ok {
    48 			if errno, ok := err.(*net.OpError); ok {
    50 				if errno.Timeout() {
    49 				if errno.Timeout() {
    51 					continue
    50 					continue
    52 				}
    51 				}
    53 			}
    52 			}
    54 			if Log != nil {
    53 			Warnf("read: %s", err)
    55 				Log.Err("read: " + err.Error())
       
    56 			}
       
    57 			break
    54 			break
    58 		}
    55 		}
    59 		nw, err := w.Write(p[:nr])
    56 		nw, err := w.Write(p[:nr])
    60 		if nw < nr {
    57 		if nw < nr {
    61 			if Log != nil {
    58 			Warnf("read: %s", err)
    62 				Log.Err("read: " + err.Error())
       
    63 			}
       
    64 			break
    59 			break
    65 		}
    60 		}
    66 	}
    61 	}
    67 }
    62 }
    68 
    63 
    70 	defer cl.socket.Close()
    65 	defer cl.socket.Close()
    71 	p := make([]byte, 1024)
    66 	p := make([]byte, 1024)
    72 	for {
    67 	for {
    73 		nr, err := r.Read(p)
    68 		nr, err := r.Read(p)
    74 		if nr == 0 {
    69 		if nr == 0 {
    75 			if Log != nil {
    70 			Warnf("write: %s", err)
    76 				Log.Err("write: " + err.Error())
       
    77 			}
       
    78 			break
    71 			break
    79 		}
    72 		}
    80 		nw, err := cl.socket.Write(p[:nr])
    73 		nw, err := cl.socket.Write(p[:nr])
    81 		if nw < nr {
    74 		if nw < nr {
    82 			if Log != nil {
    75 			Warnf("write: %s", err)
    83 				Log.Err("write: " + err.Error())
       
    84 			}
       
    85 			break
    76 			break
    86 		}
    77 		}
    87 	}
    78 	}
    88 }
    79 }
    89 
    80 
    90 func readXml(r io.Reader, ch chan<- interface{},
    81 func readXml(r io.Reader, ch chan<- interface{},
    91 extStanza map[string]func(*xml.Name) interface{}) {
    82 extStanza map[string]func(*xml.Name) interface{}) {
    92 	if Loglevel >= syslog.LOG_DEBUG {
    83 	if Debug != nil {
    93 		pr, pw := io.Pipe()
    84 		pr, pw := io.Pipe()
    94 		go tee(r, pw, "S: ")
    85 		go tee(r, pw, "S: ")
    95 		r = pr
    86 		r = pr
    96 	}
    87 	}
    97 	defer close(ch)
    88 	defer close(ch)
   103 	for {
    94 	for {
   104 		// Sniff the next token on the stream.
    95 		// Sniff the next token on the stream.
   105 		t, err := p.Token()
    96 		t, err := p.Token()
   106 		if t == nil {
    97 		if t == nil {
   107 			if err != io.EOF {
    98 			if err != io.EOF {
   108 				if Log != nil {
    99 				Warnf("read: %s", err)
   109 					Log.Err("read: " + err.Error())
       
   110 				}
       
   111 			}
   100 			}
   112 			break
   101 			break
   113 		}
   102 		}
   114 		var se xml.StartElement
   103 		var se xml.StartElement
   115 		var ok bool
   104 		var ok bool
   121 		var obj interface{}
   110 		var obj interface{}
   122 		switch se.Name.Space + " " + se.Name.Local {
   111 		switch se.Name.Space + " " + se.Name.Local {
   123 		case NsStream + " stream":
   112 		case NsStream + " stream":
   124 			st, err := parseStream(se)
   113 			st, err := parseStream(se)
   125 			if err != nil {
   114 			if err != nil {
   126 				if Log != nil {
   115 				Warnf("unmarshal stream: %s", err)
   127 					Log.Err("unmarshal stream: " +
       
   128 						err.Error())
       
   129 				}
       
   130 				break Loop
   116 				break Loop
   131 			}
   117 			}
   132 			ch <- st
   118 			ch <- st
   133 			continue
   119 			continue
   134 		case "stream error", NsStream + " error":
   120 		case "stream error", NsStream + " error":
   146 			obj = &Message{}
   132 			obj = &Message{}
   147 		case NsClient + " presence":
   133 		case NsClient + " presence":
   148 			obj = &Presence{}
   134 			obj = &Presence{}
   149 		default:
   135 		default:
   150 			obj = &Generic{}
   136 			obj = &Generic{}
   151 			if Log != nil {
   137 			Infof("Ignoring unrecognized: %s %s", se.Name.Space,
   152 				Log.Notice("Ignoring unrecognized: " +
   138 				se.Name.Local)
   153 					se.Name.Space + " " + se.Name.Local)
       
   154 			}
       
   155 		}
   139 		}
   156 
   140 
   157 		// Read the complete XML stanza.
   141 		// Read the complete XML stanza.
   158 		err = p.DecodeElement(obj, &se)
   142 		err = p.DecodeElement(obj, &se)
   159 		if err != nil {
   143 		if err != nil {
   160 			if Log != nil {
   144 			Warnf("unmarshal: %s", err)
   161 				Log.Err("unmarshal: " + err.Error())
       
   162 			}
       
   163 			break Loop
   145 			break Loop
   164 		}
   146 		}
   165 
   147 
   166 		// If it's a Stanza, we try to unmarshal its innerxml
   148 		// If it's a Stanza, we try to unmarshal its innerxml
   167 		// into objects of the appropriate respective
   149 		// into objects of the appropriate respective
   168 		// types. This is specified by our extensions.
   150 		// types. This is specified by our extensions.
   169 		if st, ok := obj.(Stanza); ok {
   151 		if st, ok := obj.(Stanza); ok {
   170 			err = parseExtended(st, extStanza)
   152 			err = parseExtended(st, extStanza)
   171 			if err != nil {
   153 			if err != nil {
   172 				if Log != nil {
   154 				Warnf("ext unmarshal: %s", err)
   173 					Log.Err("ext unmarshal: " +
       
   174 						err.Error())
       
   175 				}
       
   176 				fmt.Printf("ext: %v\n", err)
       
   177 				break Loop
   155 				break Loop
   178 			}
   156 			}
   179 		}
   157 		}
   180 
   158 
   181 		// Put it on the channel.
   159 		// Put it on the channel.
   214 
   192 
   215 	return nil
   193 	return nil
   216 }
   194 }
   217 
   195 
   218 func writeXml(w io.Writer, ch <-chan interface{}) {
   196 func writeXml(w io.Writer, ch <-chan interface{}) {
   219 	if Loglevel >= syslog.LOG_DEBUG {
   197 	if Debug != nil {
   220 		pr, pw := io.Pipe()
   198 		pr, pw := io.Pipe()
   221 		go tee(pr, w, "C: ")
   199 		go tee(pr, w, "C: ")
   222 		w = pw
   200 		w = pw
   223 	}
   201 	}
   224 	defer func(w io.Writer) {
   202 	defer func(w io.Writer) {
   233 
   211 
   234 	for obj := range ch {
   212 	for obj := range ch {
   235 		if st, ok := obj.(*stream); ok {
   213 		if st, ok := obj.(*stream); ok {
   236 			_, err := w.Write([]byte(st.String()))
   214 			_, err := w.Write([]byte(st.String()))
   237 			if err != nil {
   215 			if err != nil {
   238 				if Log != nil {
   216 				Warnf("write: %s", err)
   239 					Log.Err("write: " + err.Error())
       
   240 				}
       
   241 			}
   217 			}
   242 		} else {
   218 		} else {
   243 			err := enc.Encode(obj)
   219 			err := enc.Encode(obj)
   244 			if err != nil {
   220 			if err != nil {
   245 				if Log != nil {
   221 				Warnf("marshal: %s", err)
   246 					Log.Err("marshal: " + err.Error())
       
   247 				}
       
   248 				break
   222 				break
   249 			}
   223 			}
   250 		}
   224 		}
   251 	}
   225 	}
   252 }
   226 }
   282 			if !send {
   256 			if !send {
   283 				continue
   257 				continue
   284 			}
   258 			}
   285 			st, ok := x.(Stanza)
   259 			st, ok := x.(Stanza)
   286 			if !ok {
   260 			if !ok {
   287 				if Log != nil {
   261 				Warnf("Unhandled non-stanza: %v", x)
   288 					Log.Warning(fmt.Sprintf(
       
   289 						"Unhandled non-stanza: %v", x))
       
   290 				}
       
   291 				continue
   262 				continue
   292 			}
   263 			}
   293 			if handlers[st.GetId()] != nil {
   264 			if handlers[st.GetId()] != nil {
   294 				f := handlers[st.GetId()]
   265 				f := handlers[st.GetId()]
   295 				delete(handlers, st.GetId())
   266 				delete(handlers, st.GetId())
   326 		case x, ok := <-input:
   297 		case x, ok := <-input:
   327 			if !ok {
   298 			if !ok {
   328 				break Loop
   299 				break Loop
   329 			}
   300 			}
   330 			if x == nil {
   301 			if x == nil {
   331 				if Log != nil {
   302 				Infof("Refusing to send nil stanza")
   332 					Log.Notice("Refusing to send" +
       
   333 						" nil stanza")
       
   334 				}
       
   335 				continue
   303 				continue
   336 			}
   304 			}
   337 			srvOut <- x
   305 			srvOut <- x
   338 		}
   306 		}
   339 	}
   307 	}
   347 Loop:
   315 Loop:
   348 	for {
   316 	for {
   349 		select {
   317 		select {
   350 		case newFilterOut := <-filterOut:
   318 		case newFilterOut := <-filterOut:
   351 			if newFilterOut == nil {
   319 			if newFilterOut == nil {
   352 				if Log != nil {
   320 				Warnf("Received nil filter")
   353 					Log.Warning("Received nil filter")
       
   354 				}
       
   355 				filterIn <- nil
   321 				filterIn <- nil
   356 				continue
   322 				continue
   357 			}
   323 			}
   358 			filterIn <- topFilter
   324 			filterIn <- topFilter
   359 			topFilter = newFilterOut
   325 			topFilter = newFilterOut
   376 
   342 
   377 func handleStream(ss *stream) {
   343 func handleStream(ss *stream) {
   378 }
   344 }
   379 
   345 
   380 func (cl *Client) handleStreamError(se *streamError) {
   346 func (cl *Client) handleStreamError(se *streamError) {
   381 	if Log != nil {
   347 	Infof("Received stream error: %v", se)
   382 		Log.Notice(fmt.Sprintf("Received stream error: %v", se))
       
   383 	}
       
   384 	close(cl.Out)
   348 	close(cl.Out)
   385 }
   349 }
   386 
   350 
   387 func (cl *Client) handleFeatures(fe *Features) {
   351 func (cl *Client) handleFeatures(fe *Features) {
   388 	cl.Features = fe
   352 	cl.Features = fe
   423 	// for it to signal that it's working again.
   387 	// for it to signal that it's working again.
   424 	cl.socketSync.Add(1)
   388 	cl.socketSync.Add(1)
   425 	cl.socket = tls
   389 	cl.socket = tls
   426 	cl.socketSync.Wait()
   390 	cl.socketSync.Wait()
   427 
   391 
   428 	if Log != nil {
   392 	Infof("TLS negotiation succeeded.")
   429 		Log.Info("TLS negotiation succeeded.")
       
   430 	}
       
   431 	cl.Features = nil
   393 	cl.Features = nil
   432 
   394 
   433 	// Now re-send the initial handshake message to start the new
   395 	// Now re-send the initial handshake message to start the new
   434 	// session.
   396 	// session.
   435 	hsOut := &stream{To: cl.Jid.Domain, Version: Version}
   397 	hsOut := &stream{To: cl.Jid.Domain, Version: Version}
   471 	switch strings.ToLower(srv.XMLName.Local) {
   433 	switch strings.ToLower(srv.XMLName.Local) {
   472 	case "challenge":
   434 	case "challenge":
   473 		b64 := base64.StdEncoding
   435 		b64 := base64.StdEncoding
   474 		str, err := b64.DecodeString(srv.Chardata)
   436 		str, err := b64.DecodeString(srv.Chardata)
   475 		if err != nil {
   437 		if err != nil {
   476 			if Log != nil {
   438 			Warnf("SASL challenge decode: %s", err)
   477 				Log.Err("SASL challenge decode: " +
       
   478 					err.Error())
       
   479 			}
       
   480 			return
   439 			return
   481 		}
   440 		}
   482 		srvMap := parseSasl(string(str))
   441 		srvMap := parseSasl(string(str))
   483 
   442 
   484 		if cl.saslExpected == "" {
   443 		if cl.saslExpected == "" {
   485 			cl.saslDigest1(srvMap)
   444 			cl.saslDigest1(srvMap)
   486 		} else {
   445 		} else {
   487 			cl.saslDigest2(srvMap)
   446 			cl.saslDigest2(srvMap)
   488 		}
   447 		}
   489 	case "failure":
   448 	case "failure":
   490 		if Log != nil {
   449 		Infof("SASL authentication failed")
   491 			Log.Notice("SASL authentication failed")
       
   492 		}
       
   493 	case "success":
   450 	case "success":
   494 		if Log != nil {
   451 		Infof("Sasl authentication succeeded")
   495 			Log.Info("Sasl authentication succeeded")
       
   496 		}
       
   497 		cl.Features = nil
   452 		cl.Features = nil
   498 		ss := &stream{To: cl.Jid.Domain, Version: Version}
   453 		ss := &stream{To: cl.Jid.Domain, Version: Version}
   499 		cl.xmlOut <- ss
   454 		cl.xmlOut <- ss
   500 	}
   455 	}
   501 }
   456 }
   507 		if qop == "auth" {
   462 		if qop == "auth" {
   508 			hasAuth = true
   463 			hasAuth = true
   509 		}
   464 		}
   510 	}
   465 	}
   511 	if !hasAuth {
   466 	if !hasAuth {
   512 		if Log != nil {
   467 		Warnf("Server doesn't support SASL auth")
   513 			Log.Err("Server doesn't support SASL auth")
       
   514 		}
       
   515 		return
   468 		return
   516 	}
   469 	}
   517 
   470 
   518 	// Pick a realm.
   471 	// Pick a realm.
   519 	var realm string
   472 	var realm string
   539 	// Generate our own nonce from random data.
   492 	// Generate our own nonce from random data.
   540 	randSize := big.NewInt(0)
   493 	randSize := big.NewInt(0)
   541 	randSize.Lsh(big.NewInt(1), 64)
   494 	randSize.Lsh(big.NewInt(1), 64)
   542 	cnonce, err := rand.Int(rand.Reader, randSize)
   495 	cnonce, err := rand.Int(rand.Reader, randSize)
   543 	if err != nil {
   496 	if err != nil {
   544 		if Log != nil {
   497 		Warnf("SASL rand: %s", err)
   545 			Log.Err("SASL rand: " + err.Error())
       
   546 		}
       
   547 		return
   498 		return
   548 	}
   499 	}
   549 	cnonceStr := fmt.Sprintf("%016x", cnonce)
   500 	cnonceStr := fmt.Sprintf("%016x", cnonce)
   550 
   501 
   551 	/* Now encode the actual password response, as well as the
   502 	/* Now encode the actual password response, as well as the
   647 	}
   598 	}
   648 	msg := &Iq{Type: "set", Id: <-Id, Nested: []interface{}{bindReq}}
   599 	msg := &Iq{Type: "set", Id: <-Id, Nested: []interface{}{bindReq}}
   649 	f := func(st Stanza) bool {
   600 	f := func(st Stanza) bool {
   650 		iq, ok := st.(*Iq)
   601 		iq, ok := st.(*Iq)
   651 		if !ok {
   602 		if !ok {
   652 			if Log != nil {
   603 			Warnf("non-iq response")
   653 				Log.Err("non-iq response")
       
   654 			}
       
   655 		}
   604 		}
   656 		if iq.Type == "error" {
   605 		if iq.Type == "error" {
   657 			if Log != nil {
   606 			Warnf("Resource binding failed")
   658 				Log.Err("Resource binding failed")
       
   659 			}
       
   660 			return false
   607 			return false
   661 		}
   608 		}
   662 		var bindRepl *bindIq
   609 		var bindRepl *bindIq
   663 		for _, ele := range iq.Nested {
   610 		for _, ele := range iq.Nested {
   664 			if b, ok := ele.(*bindIq); ok {
   611 			if b, ok := ele.(*bindIq); ok {
   665 				bindRepl = b
   612 				bindRepl = b
   666 				break
   613 				break
   667 			}
   614 			}
   668 		}
   615 		}
   669 		if bindRepl == nil {
   616 		if bindRepl == nil {
   670 			if Log != nil {
   617 			Warnf("Bad bind reply: %v", iq)
   671 				Log.Err(fmt.Sprintf("Bad bind reply: %v",
       
   672 					iq))
       
   673 			}
       
   674 			return false
   618 			return false
   675 		}
   619 		}
   676 		jidStr := bindRepl.Jid
   620 		jidStr := bindRepl.Jid
   677 		if jidStr == nil || *jidStr == "" {
   621 		if jidStr == nil || *jidStr == "" {
   678 			if Log != nil {
   622 			Warnf("Can't bind empty resource")
   679 				Log.Err("Can't bind empty resource")
       
   680 			}
       
   681 			return false
   623 			return false
   682 		}
   624 		}
   683 		jid := new(JID)
   625 		jid := new(JID)
   684 		if err := jid.Set(*jidStr); err != nil {
   626 		if err := jid.Set(*jidStr); err != nil {
   685 			if Log != nil {
   627 			Warnf("Can't parse JID %s: %s", *jidStr, err)
   686 				Log.Err(fmt.Sprintf("Can't parse JID %s: %s",
       
   687 						*jidStr, err))
       
   688 			}
       
   689 			return false
   628 			return false
   690 		}
   629 		}
   691 		cl.Jid = *jid
   630 		cl.Jid = *jid
   692 		if Log != nil {
   631 		Infof("Bound resource: %s", cl.Jid.String())
   693 			Log.Info("Bound resource: " + cl.Jid.String())
       
   694 		}
       
   695 		cl.bindDone()
   632 		cl.bindDone()
   696 		return false
   633 		return false
   697 	}
   634 	}
   698 	cl.HandleStanza(msg.Id, f)
   635 	cl.HandleStanza(msg.Id, f)
   699 	cl.xmlOut <- msg
   636 	cl.xmlOut <- msg