stream.go
branchgo.weekly.2012-01-15
changeset 93 fbd51fa6b7ea
parent 84 25c4296a3524
equal deleted inserted replaced
87:d19b556d4ea6 93:fbd51fa6b7ea
    17 	"crypto/tls"
    17 	"crypto/tls"
    18 	"encoding/base64"
    18 	"encoding/base64"
    19 	"encoding/xml"
    19 	"encoding/xml"
    20 	"fmt"
    20 	"fmt"
    21 	"io"
    21 	"io"
    22 	"log/syslog"
       
    23 	"math/big"
    22 	"math/big"
    24 	"net"
    23 	"net"
    25 	"regexp"
    24 	"regexp"
    26 	"strings"
    25 	"strings"
    27 	"time"
    26 	"time"
    50 // BUG(cjyar) Review all these *Client receiver methods. They should
    49 // BUG(cjyar) Review all these *Client receiver methods. They should
    51 // probably either all be receivers, or none.
    50 // probably either all be receivers, or none.
    52 
    51 
    53 func (cl *Client) readTransport(w io.WriteCloser) {
    52 func (cl *Client) readTransport(w io.WriteCloser) {
    54 	defer w.Close()
    53 	defer w.Close()
    55 	cl.socket.SetReadTimeout(1e8)
    54 	readDelay, _ := time.ParseDuration("1s")
    56 	p := make([]byte, 1024)
    55 	p := make([]byte, 1024)
    57 	for {
    56 	for {
    58 		if cl.socket == nil {
    57 		if cl.socket == nil {
    59 			cl.waitForSocket()
    58 			cl.waitForSocket()
       
    59 			readDelay = 0
       
    60 		}
       
    61 		if readDelay != 0 {
       
    62 			readTimeout := time.Now()
       
    63 			readTimeout.Add(readDelay)
       
    64 			cl.socket.SetReadDeadline(readTimeout)
    60 		}
    65 		}
    61 		nr, err := cl.socket.Read(p)
    66 		nr, err := cl.socket.Read(p)
    62 		if nr == 0 {
    67 		if nr == 0 {
    63 			if errno, ok := err.(*net.OpError); ok {
    68 			if errno, ok := err.(*net.OpError); ok {
    64 				if errno.Timeout() {
    69 				if errno.Timeout() {
   101 	}
   106 	}
   102 }
   107 }
   103 
   108 
   104 func readXml(r io.Reader, ch chan<- interface{},
   109 func readXml(r io.Reader, ch chan<- interface{},
   105 	extStanza map[string]func(*xml.Name) interface{}) {
   110 	extStanza map[string]func(*xml.Name) interface{}) {
   106 	if Loglevel >= syslog.LOG_DEBUG {
   111 	if Loglevel >= LOG_DEBUG {
   107 		pr, pw := io.Pipe()
   112 		pr, pw := io.Pipe()
   108 		go tee(r, pw, "S: ")
   113 		go tee(r, pw, "S: ")
   109 		r = pr
   114 		r = pr
   110 	}
   115 	}
   111 	defer close(ch)
   116 	defer close(ch)
   112 
   117 
   113 	p := xml.NewParser(r)
   118 	p := xml.NewDecoder(r)
   114 Loop:
   119 Loop:
   115 	for {
   120 	for {
   116 		// Sniff the next token on the stream.
   121 		// Sniff the next token on the stream.
   117 		t, err := p.Token()
   122 		t, err := p.Token()
   118 		if t == nil {
   123 		if t == nil {
   160 			obj = &Message{}
   165 			obj = &Message{}
   161 		case "jabber:client presence":
   166 		case "jabber:client presence":
   162 			obj = &Presence{}
   167 			obj = &Presence{}
   163 		default:
   168 		default:
   164 			obj = &Generic{}
   169 			obj = &Generic{}
   165 			if Log != nil && Loglevel >= syslog.LOG_NOTICE {
   170 			if Log != nil && Loglevel >= LOG_NOTICE {
   166 				Log.Printf("Ignoring unrecognized: %s %s",
   171 				Log.Printf("Ignoring unrecognized: %s %s",
   167 					se.Name.Space, se.Name.Local)
   172 					se.Name.Space, se.Name.Local)
   168 			}
   173 			}
   169 		}
   174 		}
   170 
   175 
   171 		// Read the complete XML stanza.
   176 		// Read the complete XML stanza.
   172 		err = p.Unmarshal(obj, &se)
   177 		err = p.DecodeElement(obj, &se)
   173 		if err != nil {
   178 		if err != nil {
   174 			if Log != nil {
   179 			if Log != nil {
   175 				Log.Println("unmarshal: " + err.Error())
   180 				Log.Println("unmarshal: " + err.Error())
   176 			}
   181 			}
   177 			break Loop
   182 			break Loop
   198 
   203 
   199 func parseExtended(st Stanza, extStanza map[string]func(*xml.Name) interface{}) error {
   204 func parseExtended(st Stanza, extStanza map[string]func(*xml.Name) interface{}) error {
   200 	// Now parse the stanza's innerxml to find the string that we
   205 	// Now parse the stanza's innerxml to find the string that we
   201 	// can unmarshal this nested element from.
   206 	// can unmarshal this nested element from.
   202 	reader := strings.NewReader(st.innerxml())
   207 	reader := strings.NewReader(st.innerxml())
   203 	p := xml.NewParser(reader)
   208 	p := xml.NewDecoder(reader)
   204 	for {
   209 	for {
   205 		t, err := p.Token()
   210 		t, err := p.Token()
   206 		if err == io.EOF {
   211 		if err == io.EOF {
   207 			break
   212 			break
   208 		}
   213 		}
   214 				// Call the indicated constructor.
   219 				// Call the indicated constructor.
   215 				nested := con(&se.Name)
   220 				nested := con(&se.Name)
   216 
   221 
   217 				// Unmarshal the nested element and
   222 				// Unmarshal the nested element and
   218 				// stuff it back into the stanza.
   223 				// stuff it back into the stanza.
   219 				err := p.Unmarshal(nested, &se)
   224 				err := p.DecodeElement(nested, &se)
   220 				if err != nil {
   225 				if err != nil {
   221 					return err
   226 					return err
   222 				}
   227 				}
   223 				st.addNested(nested)
   228 				st.addNested(nested)
   224 			}
   229 			}
   227 
   232 
   228 	return nil
   233 	return nil
   229 }
   234 }
   230 
   235 
   231 func writeXml(w io.Writer, ch <-chan interface{}) {
   236 func writeXml(w io.Writer, ch <-chan interface{}) {
   232 	if Loglevel >= syslog.LOG_DEBUG {
   237 	if Loglevel >= LOG_DEBUG {
   233 		pr, pw := io.Pipe()
   238 		pr, pw := io.Pipe()
   234 		go tee(pr, w, "C: ")
   239 		go tee(pr, w, "C: ")
   235 		w = pw
   240 		w = pw
   236 	}
   241 	}
   237 	defer func(w io.Writer) {
   242 	defer func(w io.Writer) {
   239 			c.Close()
   244 			c.Close()
   240 		}
   245 		}
   241 	}(w)
   246 	}(w)
   242 
   247 
   243 	for obj := range ch {
   248 	for obj := range ch {
   244 		err := xml.Marshal(w, obj)
   249 		err := xml.NewEncoder(w).Encode(obj)
   245 		if err != nil {
   250 		if err != nil {
   246 			if Log != nil {
   251 			if Log != nil {
   247 				Log.Println("write: " + err.Error())
   252 				Log.Println("write: " + err.Error())
   248 			}
   253 			}
   249 			break
   254 			break
   282 			if !send {
   287 			if !send {
   283 				continue
   288 				continue
   284 			}
   289 			}
   285 			st, ok := x.(Stanza)
   290 			st, ok := x.(Stanza)
   286 			if !ok {
   291 			if !ok {
   287 				if Log != nil && Loglevel >= syslog.LOG_WARNING {
   292 				if Log != nil && Loglevel >= LOG_WARNING {
   288 					Log.Printf(
   293 					Log.Printf(
   289 						"Unhandled non-stanza: %v", x)
   294 						"Unhandled non-stanza: %v", x)
   290 				}
   295 				}
   291 				continue
   296 				continue
   292 			}
   297 			}
   326 		case x, ok := <-input:
   331 		case x, ok := <-input:
   327 			if !ok {
   332 			if !ok {
   328 				break Loop
   333 				break Loop
   329 			}
   334 			}
   330 			if x == nil {
   335 			if x == nil {
   331 				if Log != nil && Loglevel >= syslog.LOG_NOTICE {
   336 				if Log != nil && Loglevel >= LOG_NOTICE {
   332 					Log.Println("Refusing to send" +
   337 					Log.Println("Refusing to send" +
   333 						" nil stanza")
   338 						" nil stanza")
   334 				}
   339 				}
   335 				continue
   340 				continue
   336 			}
   341 			}
   347 Loop:
   352 Loop:
   348 	for {
   353 	for {
   349 		select {
   354 		select {
   350 		case newFilterOut := <-filterOut:
   355 		case newFilterOut := <-filterOut:
   351 			if newFilterOut == nil {
   356 			if newFilterOut == nil {
   352 				if Log != nil && Loglevel >= syslog.LOG_WARNING {
   357 				if Log != nil && Loglevel >= LOG_WARNING {
   353 					Log.Println("Received nil filter")
   358 					Log.Println("Received nil filter")
   354 				}
   359 				}
   355 				filterIn <- nil
   360 				filterIn <- nil
   356 				continue
   361 				continue
   357 			}
   362 			}
   376 
   381 
   377 func handleStream(ss *stream) {
   382 func handleStream(ss *stream) {
   378 }
   383 }
   379 
   384 
   380 func (cl *Client) handleStreamError(se *streamError) {
   385 func (cl *Client) handleStreamError(se *streamError) {
   381 	if Log != nil && Loglevel >= syslog.LOG_NOTICE {
   386 	if Log != nil && Loglevel >= LOG_NOTICE {
   382 		Log.Printf("Received stream error: %v", se)
   387 		Log.Printf("Received stream error: %v", se)
   383 	}
   388 	}
   384 	close(cl.Out)
   389 	close(cl.Out)
   385 }
   390 }
   386 
   391 
   426 	// for it to signal that it's working again.
   431 	// for it to signal that it's working again.
   427 	cl.socketSync.Add(1)
   432 	cl.socketSync.Add(1)
   428 	cl.socket = tls
   433 	cl.socket = tls
   429 	cl.socketSync.Wait()
   434 	cl.socketSync.Wait()
   430 
   435 
   431 	// Reset the read timeout on the (underlying) socket so the
   436 	if Log != nil && Loglevel >= LOG_INFO {
   432 	// reader doesn't get woken up unnecessarily.
       
   433 	tcp.SetReadTimeout(0)
       
   434 
       
   435 	if Log != nil && Loglevel >= syslog.LOG_INFO {
       
   436 		Log.Println("TLS negotiation succeeded.")
   437 		Log.Println("TLS negotiation succeeded.")
   437 	}
   438 	}
   438 	cl.Features = nil
   439 	cl.Features = nil
   439 
   440 
   440 	// Now re-send the initial handshake message to start the new
   441 	// Now re-send the initial handshake message to start the new
   468 			external = true
   469 			external = true
   469 		}
   470 		}
   470 	}
   471 	}
   471 
   472 
   472 	if external {
   473 	if external {
   473 		auth := &auth{XMLName: xml.Name{Space: NsSASL, Local:
   474 		auth := &auth{XMLName: xml.Name{Space: NsSASL, Local: "auth"}, Mechanism: "EXTERNAL"}
   474 				"auth"}, Mechanism: "EXTERNAL"}
       
   475 		cl.xmlOut <- auth
   475 		cl.xmlOut <- auth
   476 	} else if digestMd5 {
   476 	} else if digestMd5 {
   477 		auth := &auth{XMLName: xml.Name{Space: NsSASL, Local: "auth"}, Mechanism: "DIGEST-MD5"}
   477 		auth := &auth{XMLName: xml.Name{Space: NsSASL, Local: "auth"}, Mechanism: "DIGEST-MD5"}
   478 		cl.xmlOut <- auth
   478 		cl.xmlOut <- auth
   479 	} else {
   479 	} else {
   480 		if Log != nil {
   480 		if Log != nil {
   481 			buf := bytes.NewBuffer(nil)
   481 			buf := bytes.NewBuffer(nil)
   482 			xml.Marshal(buf, fe)
   482 			xml.NewEncoder(buf).Encode(fe)
   483 			Log.Printf("No supported mechanisms: %s",
   483 			Log.Printf("No supported mechanisms: %s",
   484 				buf.String())
   484 				buf.String())
   485 		}
   485 		}
   486 		abort := Generic{XMLName: xml.Name{Local: "abort",
   486 		abort := Generic{XMLName: xml.Name{Local: "abort",
   487 			Space: NsSASL}}
   487 			Space: NsSASL}}
   488 		cl.xmlOut <- abort
   488 		cl.xmlOut <- abort
   489 		se := streamError{Any: Generic{XMLName:
   489 		se := streamError{Any: Generic{XMLName: xml.Name{Local: "undefined-condition",
   490 				xml.Name{Local: "undefined-condition",
   490 			Space: NsStreams}}, Text: &errText{Lang: "en", Text: "No supported mechs"}}
   491 				Space: NsStreams}}, Text:
       
   492 			&errText{Lang: "en", Text: "No supported mechs"}}
       
   493 		cl.xmlOut <- se
   491 		cl.xmlOut <- se
   494 		close(cl.xmlOut)
   492 		close(cl.xmlOut)
   495 	}
   493 	}
   496 }
   494 }
   497 
   495 
   513 			cl.saslDigest1(srvMap)
   511 			cl.saslDigest1(srvMap)
   514 		} else {
   512 		} else {
   515 			cl.saslDigest2(srvMap)
   513 			cl.saslDigest2(srvMap)
   516 		}
   514 		}
   517 	case "failure":
   515 	case "failure":
   518 		if Log != nil && Loglevel >= syslog.LOG_NOTICE {
   516 		if Log != nil && Loglevel >= LOG_NOTICE {
   519 			Log.Println("SASL authentication failed")
   517 			Log.Println("SASL authentication failed")
   520 		}
   518 		}
   521 	case "success":
   519 	case "success":
   522 		if Log != nil && Loglevel >= syslog.LOG_INFO {
   520 		if Log != nil && Loglevel >= LOG_INFO {
   523 			Log.Println("Sasl authentication succeeded")
   521 			Log.Println("Sasl authentication succeeded")
   524 		}
   522 		}
   525 		cl.Features = nil
   523 		cl.Features = nil
   526 		cl.xmlOut <- openStream(&cl.Jid)
   524 		cl.xmlOut <- openStream(&cl.Jid)
   527 	}
   525 	}
   706 				Log.Println(err.Error())
   704 				Log.Println(err.Error())
   707 			}
   705 			}
   708 			return false
   706 			return false
   709 		}
   707 		}
   710 		cl.Jid = *jid
   708 		cl.Jid = *jid
   711 		if Log != nil && Loglevel >= syslog.LOG_INFO {
   709 		if Log != nil && Loglevel >= LOG_INFO {
   712 			Log.Println("Bound resource: " + cl.Jid.String())
   710 			Log.Println("Bound resource: " + cl.Jid.String())
   713 		}
   711 		}
   714 		cl.bindDone()
   712 		cl.bindDone()
   715 		return false
   713 		return false
   716 	}
   714 	}