# HG changeset patch # User Chris Jones # Date 1326051830 25200 # Node ID ac0639692317e71b33f1a046688e3f79afd4e7f2 # Parent c7f2edd25f4acb6a3be83cd65f0f2c5763e7cdfb Properly close all the channels and writers if Client.Out is close. diff -r c7f2edd25f4a -r ac0639692317 stream.go --- a/stream.go Sun Jan 08 12:20:21 2012 -0700 +++ b/stream.go Sun Jan 08 12:43:50 2012 -0700 @@ -37,9 +37,8 @@ // BUG(cjyar) Review all these *Client receiver methods. They should // probably either all be receivers, or none. -func (cl *Client) readTransport(w io.Writer) { - defer fmt.Println("readTransport done") - defer tryClose(cl.socket, w) +func (cl *Client) readTransport(w io.WriteCloser) { + defer w.Close() cl.socket.SetReadTimeout(1e8) p := make([]byte, 1024) for { @@ -69,8 +68,7 @@ } func (cl *Client) writeTransport(r io.Reader) { - defer fmt.Println("writeTransport done") - defer tryClose(r, cl.socket) + defer cl.socket.Close() p := make([]byte, 1024) for { nr, err := r.Read(p) @@ -92,13 +90,12 @@ func readXml(r io.Reader, ch chan<- interface{}, extStanza map[string] func(*xml.Name) interface{}) { - defer fmt.Println("readXml done") if Loglevel >= syslog.LOG_DEBUG { pr, pw := io.Pipe() go tee(r, pw, "S: ") r = pr } - defer tryClose(r, ch) + defer close(ch) p := xml.NewParser(r) Loop: @@ -222,7 +219,11 @@ go tee(pr, w, "C: ") w = pw } - defer tryClose(w, ch) + defer func(w io.Writer) { + if c, ok := w.(io.Closer) ; ok { + c.Close() + } + }(w) for obj := range ch { err := xml.Marshal(w, obj) @@ -236,9 +237,7 @@ } func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) { - defer fmt.Println("readStream done") defer close(cliOut) - defer tryClose(srvIn, cliOut) handlers := make(map[string] func(Stanza) bool) Loop: @@ -294,7 +293,6 @@ // activity. func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza, control <-chan int) { - defer fmt.Println("writeStream done") defer close(srvOut) var input <-chan Stanza @@ -312,7 +310,6 @@ } case x, ok := <- input: if !ok { - fmt.Println("writeStream input closed") break Loop } if x == nil { @@ -331,7 +328,6 @@ // app. This function manages the filters. func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza, topFilter <-chan Stanza, app chan<- Stanza) { - defer fmt.Println("filterTop done") defer close(app) Loop: for { @@ -357,7 +353,6 @@ } func filterBottom(from <-chan Stanza, to chan<- Stanza) { - defer fmt.Println("filterBottom done") defer close(to) for data := range(from) { to <- data diff -r c7f2edd25f4a -r ac0639692317 xmpp.go --- a/xmpp.go Sun Jan 08 12:20:21 2012 -0700 +++ b/xmpp.go Sun Jan 08 12:43:50 2012 -0700 @@ -189,7 +189,7 @@ return cl, nil } -func (cl *Client) startTransport() (io.Reader, io.Writer) { +func (cl *Client) startTransport() (io.Reader, io.WriteCloser) { inr, inw := io.Pipe() outr, outw := io.Pipe() go cl.readTransport(inw) @@ -204,7 +204,7 @@ return ch } -func startXmlWriter(w io.Writer) chan<- interface{} { +func startXmlWriter(w io.WriteCloser) chan<- interface{} { ch := make(chan interface{}) go writeXml(w, ch) return ch @@ -223,19 +223,23 @@ } func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza { - cliOut := make(chan Stanza) + cliIn := make(chan Stanza) filterOut := make(chan (<-chan Stanza)) filterIn := make(chan (<-chan Stanza)) nullFilter := make(chan Stanza) go filterBottom(srvIn, nullFilter) - go filterTop(filterOut, filterIn, nullFilter, cliOut) + go filterTop(filterOut, filterIn, nullFilter, cliIn) cl.filterOut = filterOut cl.filterIn = filterIn - return cliOut + return cliIn } func tee(r io.Reader, w io.Writer, prefix string) { - defer tryClose(r, w) + defer func(w io.Writer) { + if c, ok := w.(io.Closer) ; ok { + c.Close() + } + }(w) buf := bytes.NewBuffer([]uint8(prefix)) for { @@ -260,31 +264,6 @@ } } -func tryClose(xs ...interface{}) { - f1 := func(ch chan<- interface{}) { - defer func() { - recover() - }() - close(ch) - } - f2 := func(ch <-chan interface{}) { - defer func() { - recover() - }() - close(ch) - } - - for _, x := range xs { - if c, ok := x.(io.Closer) ; ok { - c.Close() - } else if ch, ok := x.(chan<- interface{}) ; ok { - f1(ch) - } else if ch, ok := x.(<-chan interface{}) ; ok { - f2(ch) - } - } -} - // bindDone is called when we've finished resource binding (and all // the negotiations that precede it). Now we can start accepting // traffic from the app.