Properly close all the channels and writers if Client.Out is close. 20120108-close
authorChris Jones <chris@cjones.org>
Sun, 08 Jan 2012 12:43:50 -0700
branch20120108-close
changeset 64 ac0639692317
parent 63 c7f2edd25f4a
child 65 3d69b31c3839
Properly close all the channels and writers if Client.Out is close.
stream.go
xmpp.go
--- a/stream.go	Sun Jan 08 12:20:21 2012 -0700
+++ b/stream.go	Sun Jan 08 12:43:50 2012 -0700
@@ -37,9 +37,8 @@
 // BUG(cjyar) Review all these *Client receiver methods. They should
 // probably either all be receivers, or none.
 
-func (cl *Client) readTransport(w io.Writer) {
-	defer fmt.Println("readTransport done")
-	defer tryClose(cl.socket, w)
+func (cl *Client) readTransport(w io.WriteCloser) {
+	defer w.Close()
 	cl.socket.SetReadTimeout(1e8)
 	p := make([]byte, 1024)
 	for {
@@ -69,8 +68,7 @@
 }
 
 func (cl *Client) writeTransport(r io.Reader) {
-	defer fmt.Println("writeTransport done")
-	defer tryClose(r, cl.socket)
+	defer cl.socket.Close()
 	p := make([]byte, 1024)
 	for {
 		nr, err := r.Read(p)
@@ -92,13 +90,12 @@
 
 func readXml(r io.Reader, ch chan<- interface{},
 	extStanza map[string] func(*xml.Name) interface{}) {
-	defer fmt.Println("readXml done")
 	if Loglevel >= syslog.LOG_DEBUG {
 		pr, pw := io.Pipe()
 		go tee(r, pw, "S: ")
 		r = pr
 	}
-	defer tryClose(r, ch)
+	defer close(ch)
 
 	p := xml.NewParser(r)
 Loop:
@@ -222,7 +219,11 @@
 		go tee(pr, w, "C: ")
 		w = pw
 	}
-	defer tryClose(w, ch)
+	defer func(w io.Writer) {
+		if c, ok := w.(io.Closer) ; ok {
+			c.Close()
+		}
+	}(w)
 
 	for obj := range ch {
 		err := xml.Marshal(w, obj)
@@ -236,9 +237,7 @@
 }
 
 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
-	defer fmt.Println("readStream done")
 	defer close(cliOut)
-	defer tryClose(srvIn, cliOut)
 
 	handlers := make(map[string] func(Stanza) bool)
 Loop:
@@ -294,7 +293,6 @@
 // activity.
 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
 	control <-chan int) {
-	defer fmt.Println("writeStream done")
 	defer close(srvOut)
 
 	var input <-chan Stanza
@@ -312,7 +310,6 @@
 			}
 		case x, ok := <- input:
 			if !ok {
-				fmt.Println("writeStream input closed")
 				break Loop
 			}
 			if x == nil {
@@ -331,7 +328,6 @@
 // app. This function manages the filters.
 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
 	topFilter <-chan Stanza, app chan<- Stanza) {
-	defer fmt.Println("filterTop done")
 	defer close(app)
 Loop:
 	for {
@@ -357,7 +353,6 @@
 }
 
 func filterBottom(from <-chan Stanza, to chan<- Stanza) {
-	defer fmt.Println("filterBottom done")
 	defer close(to)
 	for data := range(from) {
 		to <- data
--- a/xmpp.go	Sun Jan 08 12:20:21 2012 -0700
+++ b/xmpp.go	Sun Jan 08 12:43:50 2012 -0700
@@ -189,7 +189,7 @@
 	return cl, nil
 }
 
-func (cl *Client) startTransport() (io.Reader, io.Writer) {
+func (cl *Client) startTransport() (io.Reader, io.WriteCloser) {
 	inr, inw := io.Pipe()
 	outr, outw := io.Pipe()
 	go cl.readTransport(inw)
@@ -204,7 +204,7 @@
 	return ch
 }
 
-func startXmlWriter(w io.Writer) chan<- interface{} {
+func startXmlWriter(w io.WriteCloser) chan<- interface{} {
 	ch := make(chan interface{})
 	go writeXml(w, ch)
 	return ch
@@ -223,19 +223,23 @@
 }
 
 func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
-	cliOut := make(chan Stanza)
+	cliIn := make(chan Stanza)
 	filterOut := make(chan (<-chan Stanza))
 	filterIn := make(chan (<-chan Stanza))
 	nullFilter := make(chan Stanza)
 	go filterBottom(srvIn, nullFilter)
-	go filterTop(filterOut, filterIn, nullFilter, cliOut)
+	go filterTop(filterOut, filterIn, nullFilter, cliIn)
 	cl.filterOut = filterOut
 	cl.filterIn = filterIn
-	return cliOut
+	return cliIn
 }
 
 func tee(r io.Reader, w io.Writer, prefix string) {
-	defer tryClose(r, w)
+	defer func(w io.Writer) {
+		if c, ok := w.(io.Closer) ; ok {
+			c.Close()
+		}
+	}(w)
 
 	buf := bytes.NewBuffer([]uint8(prefix))
 	for {
@@ -260,31 +264,6 @@
 	}
 }
 
-func tryClose(xs ...interface{}) {
-	f1 := func(ch chan<- interface{}) {
-		defer func() {
-			recover()
-		}()
-		close(ch)
-	}
-	f2 := func(ch <-chan interface{}) {
-		defer func() {
-			recover()
-		}()
-		close(ch)
-	}
-
-	for _, x := range xs {
-		if c, ok := x.(io.Closer) ; ok {
-			c.Close()
-		} else if ch, ok := x.(chan<- interface{}) ; ok {
-			f1(ch)
-		} else if ch, ok := x.(<-chan interface{}) ; ok {
-			f2(ch)
-		}
-	}
-}
-
 // bindDone is called when we've finished resource binding (and all
 // the negotiations that precede it). Now we can start accepting
 // traffic from the app.