stream.go
changeset 74 e619e18dcec3
parent 72 53f15893a1a7
child 75 03a923eb5c01
equal deleted inserted replaced
73:acca351fb8f0 74:e619e18dcec3
     9 // those events to the client.
     9 // those events to the client.
    10 
    10 
    11 package xmpp
    11 package xmpp
    12 
    12 
    13 import (
    13 import (
    14 	"big"
       
    15 	"crypto/md5"
    14 	"crypto/md5"
    16 	"crypto/rand"
    15 	"crypto/rand"
    17 	"crypto/tls"
    16 	"crypto/tls"
    18 	"encoding/base64"
    17 	"encoding/base64"
       
    18 	"encoding/xml"
    19 	"fmt"
    19 	"fmt"
    20 	"io"
    20 	"io"
       
    21 	"log/syslog"
       
    22 	"math/big"
    21 	"net"
    23 	"net"
    22 	"os"
       
    23 	"regexp"
    24 	"regexp"
    24 	"strings"
    25 	"strings"
    25 	"syslog"
       
    26 	"time"
    26 	"time"
    27 	"xml"
       
    28 )
    27 )
    29 
    28 
    30 // Callback to handle a stanza with a particular id.
    29 // Callback to handle a stanza with a particular id.
    31 type stanzaHandler struct {
    30 type stanzaHandler struct {
    32 	id string
    31 	id string
    51 				if errno.Timeout() {
    50 				if errno.Timeout() {
    52 					continue
    51 					continue
    53 				}
    52 				}
    54 			}
    53 			}
    55 			if Log != nil {
    54 			if Log != nil {
    56 				Log.Err("read: " + err.String())
    55 				Log.Err("read: " + err.Error())
    57 			}
    56 			}
    58 			break
    57 			break
    59 		}
    58 		}
    60 		nw, err := w.Write(p[:nr])
    59 		nw, err := w.Write(p[:nr])
    61 		if nw < nr {
    60 		if nw < nr {
    62 			if Log != nil {
    61 			if Log != nil {
    63 				Log.Err("read: " + err.String())
    62 				Log.Err("read: " + err.Error())
    64 			}
    63 			}
    65 			break
    64 			break
    66 		}
    65 		}
    67 	}
    66 	}
    68 }
    67 }
    72 	p := make([]byte, 1024)
    71 	p := make([]byte, 1024)
    73 	for {
    72 	for {
    74 		nr, err := r.Read(p)
    73 		nr, err := r.Read(p)
    75 		if nr == 0 {
    74 		if nr == 0 {
    76 			if Log != nil {
    75 			if Log != nil {
    77 				Log.Err("write: " + err.String())
    76 				Log.Err("write: " + err.Error())
    78 			}
    77 			}
    79 			break
    78 			break
    80 		}
    79 		}
    81 		nw, err := cl.socket.Write(p[:nr])
    80 		nw, err := cl.socket.Write(p[:nr])
    82 		if nw < nr {
    81 		if nw < nr {
    83 			if Log != nil {
    82 			if Log != nil {
    84 				Log.Err("write: " + err.String())
    83 				Log.Err("write: " + err.Error())
    85 			}
    84 			}
    86 			break
    85 			break
    87 		}
    86 		}
    88 	}
    87 	}
    89 }
    88 }
    90 
    89 
    91 func readXml(r io.Reader, ch chan<- interface{},
    90 func readXml(r io.Reader, ch chan<- interface{},
    92 extStanza map[string]func(*xml.Name) interface{}) {
    91 	extStanza map[string]func(*xml.Name) interface{}) {
    93 	if Loglevel >= syslog.LOG_DEBUG {
    92 	if Loglevel >= syslog.LOG_DEBUG {
    94 		pr, pw := io.Pipe()
    93 		pr, pw := io.Pipe()
    95 		go tee(r, pw, "S: ")
    94 		go tee(r, pw, "S: ")
    96 		r = pr
    95 		r = pr
    97 	}
    96 	}
   101 Loop:
   100 Loop:
   102 	for {
   101 	for {
   103 		// Sniff the next token on the stream.
   102 		// Sniff the next token on the stream.
   104 		t, err := p.Token()
   103 		t, err := p.Token()
   105 		if t == nil {
   104 		if t == nil {
   106 			if err != os.EOF {
   105 			if err != io.EOF {
   107 				if Log != nil {
   106 				if Log != nil {
   108 					Log.Err("read: " + err.String())
   107 					Log.Err("read: " + err.Error())
   109 				}
   108 				}
   110 			}
   109 			}
   111 			break
   110 			break
   112 		}
   111 		}
   113 		var se xml.StartElement
   112 		var se xml.StartElement
   122 		case NsStream + " stream":
   121 		case NsStream + " stream":
   123 			st, err := parseStream(se)
   122 			st, err := parseStream(se)
   124 			if err != nil {
   123 			if err != nil {
   125 				if Log != nil {
   124 				if Log != nil {
   126 					Log.Err("unmarshal stream: " +
   125 					Log.Err("unmarshal stream: " +
   127 						err.String())
   126 						err.Error())
   128 				}
   127 				}
   129 				break Loop
   128 				break Loop
   130 			}
   129 			}
   131 			ch <- st
   130 			ch <- st
   132 			continue
   131 			continue
   155 
   154 
   156 		// Read the complete XML stanza.
   155 		// Read the complete XML stanza.
   157 		err = p.Unmarshal(obj, &se)
   156 		err = p.Unmarshal(obj, &se)
   158 		if err != nil {
   157 		if err != nil {
   159 			if Log != nil {
   158 			if Log != nil {
   160 				Log.Err("unmarshal: " + err.String())
   159 				Log.Err("unmarshal: " + err.Error())
   161 			}
   160 			}
   162 			break Loop
   161 			break Loop
   163 		}
   162 		}
   164 
   163 
   165 		// If it's a Stanza, we try to unmarshal its innerxml
   164 		// If it's a Stanza, we try to unmarshal its innerxml
   168 		if st, ok := obj.(Stanza); ok {
   167 		if st, ok := obj.(Stanza); ok {
   169 			err = parseExtended(st, extStanza)
   168 			err = parseExtended(st, extStanza)
   170 			if err != nil {
   169 			if err != nil {
   171 				if Log != nil {
   170 				if Log != nil {
   172 					Log.Err("ext unmarshal: " +
   171 					Log.Err("ext unmarshal: " +
   173 						err.String())
   172 						err.Error())
   174 				}
   173 				}
   175 				break Loop
   174 				break Loop
   176 			}
   175 			}
   177 		}
   176 		}
   178 
   177 
   179 		// Put it on the channel.
   178 		// Put it on the channel.
   180 		ch <- obj
   179 		ch <- obj
   181 	}
   180 	}
   182 }
   181 }
   183 
   182 
   184 func parseExtended(st Stanza, extStanza map[string]func(*xml.Name) interface{}) os.Error {
   183 func parseExtended(st Stanza, extStanza map[string]func(*xml.Name) interface{}) error {
   185 	// Now parse the stanza's innerxml to find the string that we
   184 	// Now parse the stanza's innerxml to find the string that we
   186 	// can unmarshal this nested element from.
   185 	// can unmarshal this nested element from.
   187 	reader := strings.NewReader(st.innerxml())
   186 	reader := strings.NewReader(st.innerxml())
   188 	p := xml.NewParser(reader)
   187 	p := xml.NewParser(reader)
   189 	for {
   188 	for {
   190 		t, err := p.Token()
   189 		t, err := p.Token()
   191 		if err == os.EOF {
   190 		if err == io.EOF {
   192 			break
   191 			break
   193 		}
   192 		}
   194 		if err != nil {
   193 		if err != nil {
   195 			return err
   194 			return err
   196 		}
   195 		}
   227 
   226 
   228 	for obj := range ch {
   227 	for obj := range ch {
   229 		err := xml.Marshal(w, obj)
   228 		err := xml.Marshal(w, obj)
   230 		if err != nil {
   229 		if err != nil {
   231 			if Log != nil {
   230 			if Log != nil {
   232 				Log.Err("write: " + err.String())
   231 				Log.Err("write: " + err.Error())
   233 			}
   232 			}
   234 			break
   233 			break
   235 		}
   234 		}
   236 	}
   235 	}
   237 }
   236 }
   290 // This loop is paused until resource binding is complete. Otherwise
   289 // This loop is paused until resource binding is complete. Otherwise
   291 // the app might inject something inappropriate into our negotiations
   290 // the app might inject something inappropriate into our negotiations
   292 // with the server. The control channel controls this loop's
   291 // with the server. The control channel controls this loop's
   293 // activity.
   292 // activity.
   294 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
   293 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
   295 control <-chan int) {
   294 	control <-chan int) {
   296 	defer close(srvOut)
   295 	defer close(srvOut)
   297 
   296 
   298 	var input <-chan Stanza
   297 	var input <-chan Stanza
   299 Loop:
   298 Loop:
   300 	for {
   299 	for {
   325 }
   324 }
   326 
   325 
   327 // Stanzas from the remote go up through a stack of filters to the
   326 // Stanzas from the remote go up through a stack of filters to the
   328 // app. This function manages the filters.
   327 // app. This function manages the filters.
   329 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
   328 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
   330 topFilter <-chan Stanza, app chan<- Stanza) {
   329 	topFilter <-chan Stanza, app chan<- Stanza) {
   331 	defer close(app)
   330 	defer close(app)
   332 Loop:
   331 Loop:
   333 	for {
   332 	for {
   334 		select {
   333 		select {
   335 		case newFilterOut := <-filterOut:
   334 		case newFilterOut := <-filterOut:
   462 		b64 := base64.StdEncoding
   461 		b64 := base64.StdEncoding
   463 		str, err := b64.DecodeString(srv.Chardata)
   462 		str, err := b64.DecodeString(srv.Chardata)
   464 		if err != nil {
   463 		if err != nil {
   465 			if Log != nil {
   464 			if Log != nil {
   466 				Log.Err("SASL challenge decode: " +
   465 				Log.Err("SASL challenge decode: " +
   467 					err.String())
   466 					err.Error())
   468 			}
   467 			}
   469 			return
   468 			return
   470 		}
   469 		}
   471 		srvMap := parseSasl(string(str))
   470 		srvMap := parseSasl(string(str))
   472 
   471 
   529 	randSize := big.NewInt(0)
   528 	randSize := big.NewInt(0)
   530 	randSize.Lsh(big.NewInt(1), 64)
   529 	randSize.Lsh(big.NewInt(1), 64)
   531 	cnonce, err := rand.Int(rand.Reader, randSize)
   530 	cnonce, err := rand.Int(rand.Reader, randSize)
   532 	if err != nil {
   531 	if err != nil {
   533 		if Log != nil {
   532 		if Log != nil {
   534 			Log.Err("SASL rand: " + err.String())
   533 			Log.Err("SASL rand: " + err.Error())
   535 		}
   534 		}
   536 		return
   535 		return
   537 	}
   536 	}
   538 	cnonceStr := fmt.Sprintf("%016x", cnonce)
   537 	cnonceStr := fmt.Sprintf("%016x", cnonce)
   539 
   538 
   603 	return strings.Join(terms, ",")
   602 	return strings.Join(terms, ",")
   604 }
   603 }
   605 
   604 
   606 // Computes the response string for digest authentication.
   605 // Computes the response string for digest authentication.
   607 func saslDigestResponse(username, realm, passwd, nonce, cnonceStr,
   606 func saslDigestResponse(username, realm, passwd, nonce, cnonceStr,
   608 authenticate, digestUri, nonceCountStr string) string {
   607 	authenticate, digestUri, nonceCountStr string) string {
   609 	h := func(text string) []byte {
   608 	h := func(text string) []byte {
   610 		h := md5.New()
   609 		h := md5.New()
   611 		h.Write([]byte(text))
   610 		h.Write([]byte(text))
   612 		return h.Sum()
   611 		return h.Sum(nil)
   613 	}
   612 	}
   614 	hex := func(bytes []byte) string {
   613 	hex := func(bytes []byte) string {
   615 		return fmt.Sprintf("%x", bytes)
   614 		return fmt.Sprintf("%x", bytes)
   616 	}
   615 	}
   617 	kd := func(secret, data string) []byte {
   616 	kd := func(secret, data string) []byte {