xmpp.go
branch20120108-close
changeset 64 ac0639692317
parent 63 c7f2edd25f4a
child 65 3d69b31c3839
equal deleted inserted replaced
63:c7f2edd25f4a 64:ac0639692317
   187 	cl.xmlOut <- hsOut
   187 	cl.xmlOut <- hsOut
   188 
   188 
   189 	return cl, nil
   189 	return cl, nil
   190 }
   190 }
   191 
   191 
   192 func (cl *Client) startTransport() (io.Reader, io.Writer) {
   192 func (cl *Client) startTransport() (io.Reader, io.WriteCloser) {
   193 	inr, inw := io.Pipe()
   193 	inr, inw := io.Pipe()
   194 	outr, outw := io.Pipe()
   194 	outr, outw := io.Pipe()
   195 	go cl.readTransport(inw)
   195 	go cl.readTransport(inw)
   196 	go cl.writeTransport(outr)
   196 	go cl.writeTransport(outr)
   197 	return inr, outw
   197 	return inr, outw
   202 	ch := make(chan interface{})
   202 	ch := make(chan interface{})
   203 	go readXml(r, ch, extStanza)
   203 	go readXml(r, ch, extStanza)
   204 	return ch
   204 	return ch
   205 }
   205 }
   206 
   206 
   207 func startXmlWriter(w io.Writer) chan<- interface{} {
   207 func startXmlWriter(w io.WriteCloser) chan<- interface{} {
   208 	ch := make(chan interface{})
   208 	ch := make(chan interface{})
   209 	go writeXml(w, ch)
   209 	go writeXml(w, ch)
   210 	return ch
   210 	return ch
   211 }
   211 }
   212 
   212 
   221 	go writeStream(xmlOut, ch, cl.inputControl)
   221 	go writeStream(xmlOut, ch, cl.inputControl)
   222 	return ch
   222 	return ch
   223 }
   223 }
   224 
   224 
   225 func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
   225 func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
   226 	cliOut := make(chan Stanza)
   226 	cliIn := make(chan Stanza)
   227 	filterOut := make(chan (<-chan Stanza))
   227 	filterOut := make(chan (<-chan Stanza))
   228 	filterIn := make(chan (<-chan Stanza))
   228 	filterIn := make(chan (<-chan Stanza))
   229 	nullFilter := make(chan Stanza)
   229 	nullFilter := make(chan Stanza)
   230 	go filterBottom(srvIn, nullFilter)
   230 	go filterBottom(srvIn, nullFilter)
   231 	go filterTop(filterOut, filterIn, nullFilter, cliOut)
   231 	go filterTop(filterOut, filterIn, nullFilter, cliIn)
   232 	cl.filterOut = filterOut
   232 	cl.filterOut = filterOut
   233 	cl.filterIn = filterIn
   233 	cl.filterIn = filterIn
   234 	return cliOut
   234 	return cliIn
   235 }
   235 }
   236 
   236 
   237 func tee(r io.Reader, w io.Writer, prefix string) {
   237 func tee(r io.Reader, w io.Writer, prefix string) {
   238 	defer tryClose(r, w)
   238 	defer func(w io.Writer) {
       
   239 		if c, ok := w.(io.Closer) ; ok {
       
   240 			c.Close()
       
   241 		}
       
   242 	}(w)
   239 
   243 
   240 	buf := bytes.NewBuffer([]uint8(prefix))
   244 	buf := bytes.NewBuffer([]uint8(prefix))
   241 	for {
   245 	for {
   242 		var c [1]byte
   246 		var c [1]byte
   243 		n, _ := r.Read(c[:])
   247 		n, _ := r.Read(c[:])
   255 		}
   259 		}
   256 	}
   260 	}
   257 	leftover := buf.String()
   261 	leftover := buf.String()
   258 	if leftover != "" {
   262 	if leftover != "" {
   259 		Log.Debug(buf.String())
   263 		Log.Debug(buf.String())
   260 	}
       
   261 }
       
   262 
       
   263 func tryClose(xs ...interface{}) {
       
   264 	f1 := func(ch chan<- interface{}) {
       
   265 		defer func() {
       
   266 			recover()
       
   267 		}()
       
   268 		close(ch)
       
   269 	}
       
   270 	f2 := func(ch <-chan interface{}) {
       
   271 		defer func() {
       
   272 			recover()
       
   273 		}()
       
   274 		close(ch)
       
   275 	}
       
   276 
       
   277 	for _, x := range xs {
       
   278 		if c, ok := x.(io.Closer) ; ok {
       
   279 			c.Close()
       
   280 		} else if ch, ok := x.(chan<- interface{}) ; ok {
       
   281 			f1(ch)
       
   282 		} else if ch, ok := x.(<-chan interface{}) ; ok {
       
   283 			f2(ch)
       
   284 		}
       
   285 	}
   264 	}
   286 }
   265 }
   287 
   266 
   288 // bindDone is called when we've finished resource binding (and all
   267 // bindDone is called when we've finished resource binding (and all
   289 // the negotiations that precede it). Now we can start accepting
   268 // the negotiations that precede it). Now we can start accepting