xmpp.go
changeset 69 a38173c1c8b2
parent 67 e8ad85bb6608
child 72 53f15893a1a7
equal deleted inserted replaced
62:6e2eea62ccca 69:a38173c1c8b2
     3 // license that can be found in the LICENSE file.
     3 // license that can be found in the LICENSE file.
     4 
     4 
     5 // This package implements a simple XMPP client according to RFCs 3920
     5 // This package implements a simple XMPP client according to RFCs 3920
     6 // and 3921, plus the various XEPs at http://xmpp.org/protocols/.
     6 // and 3921, plus the various XEPs at http://xmpp.org/protocols/.
     7 package xmpp
     7 package xmpp
     8 
       
     9 // BUG(cjyar) Figure out why the library doesn't exit when the server
       
    10 // closes its stream to us.
       
    11 
     8 
    12 import (
     9 import (
    13 	"bytes"
    10 	"bytes"
    14 	"fmt"
    11 	"fmt"
    15 	"io"
    12 	"io"
   100 	// StartSession() returns.
    97 	// StartSession() returns.
   101 	Features *Features
    98 	Features *Features
   102 	filterOut chan<- <-chan Stanza
    99 	filterOut chan<- <-chan Stanza
   103 	filterIn <-chan <-chan Stanza
   100 	filterIn <-chan <-chan Stanza
   104 }
   101 }
   105 var _ io.Closer = &Client{}
       
   106 
   102 
   107 // Connect to the appropriate server and authenticate as the given JID
   103 // Connect to the appropriate server and authenticate as the given JID
   108 // with the given password. This function will return as soon as a TCP
   104 // with the given password. This function will return as soon as a TCP
   109 // connection has been established, but before XMPP stream negotiation
   105 // connection has been established, but before XMPP stream negotiation
   110 // has completed. The negotiation will occur asynchronously, and any
   106 // has completed. The negotiation will occur asynchronously, and any
   186 	cl.xmlOut <- hsOut
   182 	cl.xmlOut <- hsOut
   187 
   183 
   188 	return cl, nil
   184 	return cl, nil
   189 }
   185 }
   190 
   186 
   191 func (c *Client) Close() os.Error {
   187 func (cl *Client) startTransport() (io.Reader, io.WriteCloser) {
   192 	tryClose(c.In, c.Out)
       
   193 	return nil
       
   194 }
       
   195 
       
   196 func (cl *Client) startTransport() (io.Reader, io.Writer) {
       
   197 	inr, inw := io.Pipe()
   188 	inr, inw := io.Pipe()
   198 	outr, outw := io.Pipe()
   189 	outr, outw := io.Pipe()
   199 	go cl.readTransport(inw)
   190 	go cl.readTransport(inw)
   200 	go cl.writeTransport(outr)
   191 	go cl.writeTransport(outr)
   201 	return inr, outw
   192 	return inr, outw
   206 	ch := make(chan interface{})
   197 	ch := make(chan interface{})
   207 	go readXml(r, ch, extStanza)
   198 	go readXml(r, ch, extStanza)
   208 	return ch
   199 	return ch
   209 }
   200 }
   210 
   201 
   211 func startXmlWriter(w io.Writer) chan<- interface{} {
   202 func startXmlWriter(w io.WriteCloser) chan<- interface{} {
   212 	ch := make(chan interface{})
   203 	ch := make(chan interface{})
   213 	go writeXml(w, ch)
   204 	go writeXml(w, ch)
   214 	return ch
   205 	return ch
   215 }
   206 }
   216 
   207 
   225 	go writeStream(xmlOut, ch, cl.inputControl)
   216 	go writeStream(xmlOut, ch, cl.inputControl)
   226 	return ch
   217 	return ch
   227 }
   218 }
   228 
   219 
   229 func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
   220 func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
   230 	cliOut := make(chan Stanza)
   221 	cliIn := make(chan Stanza)
   231 	filterOut := make(chan (<-chan Stanza))
   222 	filterOut := make(chan (<-chan Stanza))
   232 	filterIn := make(chan (<-chan Stanza))
   223 	filterIn := make(chan (<-chan Stanza))
   233 	nullFilter := make(chan Stanza)
   224 	nullFilter := make(chan Stanza)
   234 	go filterBottom(srvIn, nullFilter)
   225 	go filterBottom(srvIn, nullFilter)
   235 	go filterTop(filterOut, filterIn, nullFilter, cliOut)
   226 	go filterTop(filterOut, filterIn, nullFilter, cliIn)
   236 	cl.filterOut = filterOut
   227 	cl.filterOut = filterOut
   237 	cl.filterIn = filterIn
   228 	cl.filterIn = filterIn
   238 	return cliOut
   229 	return cliIn
   239 }
   230 }
   240 
   231 
   241 func tee(r io.Reader, w io.Writer, prefix string) {
   232 func tee(r io.Reader, w io.Writer, prefix string) {
   242 	defer tryClose(r, w)
   233 	defer func(w io.Writer) {
       
   234 		if c, ok := w.(io.Closer) ; ok {
       
   235 			c.Close()
       
   236 		}
       
   237 	}(w)
   243 
   238 
   244 	buf := bytes.NewBuffer([]uint8(prefix))
   239 	buf := bytes.NewBuffer([]uint8(prefix))
   245 	for {
   240 	for {
   246 		var c [1]byte
   241 		var c [1]byte
   247 		n, _ := r.Read(c[:])
   242 		n, _ := r.Read(c[:])
   259 		}
   254 		}
   260 	}
   255 	}
   261 	leftover := buf.String()
   256 	leftover := buf.String()
   262 	if leftover != "" {
   257 	if leftover != "" {
   263 		Log.Debug(buf.String())
   258 		Log.Debug(buf.String())
   264 	}
       
   265 }
       
   266 
       
   267 func tryClose(xs ...interface{}) {
       
   268 	f1 := func(ch chan<- interface{}) {
       
   269 		defer func() {
       
   270 			recover()
       
   271 		}()
       
   272 		close(ch)
       
   273 	}
       
   274 	f2 := func(ch <-chan interface{}) {
       
   275 		defer func() {
       
   276 			recover()
       
   277 		}()
       
   278 		close(ch)
       
   279 	}
       
   280 
       
   281 	for _, x := range xs {
       
   282 		if c, ok := x.(io.Closer) ; ok {
       
   283 			c.Close()
       
   284 		} else if ch, ok := x.(chan<- interface{}) ; ok {
       
   285 			f1(ch)
       
   286 		} else if ch, ok := x.(<-chan interface{}) ; ok {
       
   287 			f2(ch)
       
   288 		}
       
   289 	}
   259 	}
   290 }
   260 }
   291 
   261 
   292 // bindDone is called when we've finished resource binding (and all
   262 // bindDone is called when we've finished resource binding (and all
   293 // the negotiations that precede it). Now we can start accepting
   263 // the negotiations that precede it). Now we can start accepting