Correct my misunderstanding of Go's break, and fix how we close channels and Writers so we can shut down our goroutines gracefully.
authorChris Jones <chris@cjones.org>
Sun, 08 Jan 2012 13:04:09 -0700
changeset 69 a38173c1c8b2
parent 62 6e2eea62ccca (current diff)
parent 68 d693ecc11f29 (diff)
child 71 578c2a83dc18
Correct my misunderstanding of Go's break, and fix how we close channels and Writers so we can shut down our goroutines gracefully.
--- a/examples/interact.go	Sun Jan 08 09:11:14 2012 -0700
+++ b/examples/interact.go	Sun Jan 08 13:04:09 2012 -0700
@@ -10,7 +10,7 @@
 	"fmt"
 	"log"
 	"os"
-	)
+)
 
 // Demonstrate the API, and allow the user to interact with an XMPP
 // server via the terminal.
@@ -28,7 +28,7 @@
 	if err != nil {
 		log.Fatalf("NewClient(%v): %v", jid, err)
 	}
-	defer c.Close()
+	defer close(c.Out)
 
 	err = c.StartSession(true, &xmpp.Presence{})
 	if err != nil {
--- a/stream.go	Sun Jan 08 09:11:14 2012 -0700
+++ b/stream.go	Sun Jan 08 13:04:09 2012 -0700
@@ -37,8 +37,8 @@
 // BUG(cjyar) Review all these *Client receiver methods. They should
 // probably either all be receivers, or none.
 
-func (cl *Client) readTransport(w io.Writer) {
-	defer tryClose(cl.socket, w)
+func (cl *Client) readTransport(w io.WriteCloser) {
+	defer w.Close()
 	cl.socket.SetReadTimeout(1e8)
 	p := make([]byte, 1024)
 	for {
@@ -68,7 +68,7 @@
 }
 
 func (cl *Client) writeTransport(r io.Reader) {
-	defer tryClose(r, cl.socket)
+	defer cl.socket.Close()
 	p := make([]byte, 1024)
 	for {
 		nr, err := r.Read(p)
@@ -95,9 +95,10 @@
 		go tee(r, pw, "S: ")
 		r = pr
 	}
-	defer tryClose(r, ch)
+	defer close(ch)
 
 	p := xml.NewParser(r)
+Loop:
 	for {
 		// Sniff the next token on the stream.
 		t, err := p.Token()
@@ -125,7 +126,7 @@
 					Log.Err("unmarshal stream: " +
 						err.String())
 				}
-				break
+				break Loop
 			}
 			ch <- st
 			continue
@@ -158,7 +159,7 @@
 			if Log != nil {
 				Log.Err("unmarshal: " + err.String())
 			}
-			break
+			break Loop
 		}
 
 		// If it's a Stanza, we try to unmarshal its innerxml
@@ -171,7 +172,7 @@
 					Log.Err("ext unmarshal: " +
 						err.String())
 				}
-				break
+				break Loop
 			}
 		}
 
@@ -218,7 +219,11 @@
 		go tee(pr, w, "C: ")
 		w = pw
 	}
-	defer tryClose(w, ch)
+	defer func(w io.Writer) {
+		if c, ok := w.(io.Closer) ; ok {
+			c.Close()
+		}
+	}(w)
 
 	for obj := range ch {
 		err := xml.Marshal(w, obj)
@@ -232,16 +237,17 @@
 }
 
 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
-	defer tryClose(srvIn, cliOut)
+	defer close(cliOut)
 
 	handlers := make(map[string] func(Stanza) bool)
+Loop:
 	for {
 		select {
 		case h := <- cl.handlers:
 			handlers[h.id] = h.f
-		case x := <- srvIn:
-			if x == nil {
-				break
+		case x, ok := <- srvIn:
+			if !ok {
+				break Loop
 			}
 			send := false
 			switch obj := x.(type) {
@@ -287,9 +293,10 @@
 // activity.
 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
 	control <-chan int) {
-	defer tryClose(srvOut, cliIn)
+	defer close(srvOut)
 
 	var input <-chan Stanza
+Loop:
 	for {
 		select {
 		case status := <- control:
@@ -299,9 +306,12 @@
 			case 1:
 				input = cliIn
 			case -1:
-				break
+				break Loop
 			}
-		case x := <- input:
+		case x, ok := <- input:
+			if !ok {
+				break Loop
+			}
 			if x == nil {
 				if Log != nil {
 					Log.Notice("Refusing to send" +
@@ -319,6 +329,7 @@
 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
 	topFilter <-chan Stanza, app chan<- Stanza) {
 	defer close(app)
+Loop:
 	for {
 		select {
 		case newFilterOut := <- filterOut:
@@ -334,7 +345,7 @@
 
 		case data, ok := <-topFilter:
 			if !ok {
-				break
+				break Loop
 			}
 			app <- data
 		}
@@ -355,7 +366,7 @@
 	if Log != nil {
 		Log.Notice(fmt.Sprintf("Received stream error: %v", se))
 	}
-	cl.Close()
+	close(cl.Out)
 }
 
 func (cl *Client) handleFeatures(fe *Features) {
--- a/xmpp.go	Sun Jan 08 09:11:14 2012 -0700
+++ b/xmpp.go	Sun Jan 08 13:04:09 2012 -0700
@@ -6,9 +6,6 @@
 // and 3921, plus the various XEPs at http://xmpp.org/protocols/.
 package xmpp
 
-// BUG(cjyar) Figure out why the library doesn't exit when the server
-// closes its stream to us.
-
 import (
 	"bytes"
 	"fmt"
@@ -102,7 +99,6 @@
 	filterOut chan<- <-chan Stanza
 	filterIn <-chan <-chan Stanza
 }
-var _ io.Closer = &Client{}
 
 // Connect to the appropriate server and authenticate as the given JID
 // with the given password. This function will return as soon as a TCP
@@ -188,12 +184,7 @@
 	return cl, nil
 }
 
-func (c *Client) Close() os.Error {
-	tryClose(c.In, c.Out)
-	return nil
-}
-
-func (cl *Client) startTransport() (io.Reader, io.Writer) {
+func (cl *Client) startTransport() (io.Reader, io.WriteCloser) {
 	inr, inw := io.Pipe()
 	outr, outw := io.Pipe()
 	go cl.readTransport(inw)
@@ -208,7 +199,7 @@
 	return ch
 }
 
-func startXmlWriter(w io.Writer) chan<- interface{} {
+func startXmlWriter(w io.WriteCloser) chan<- interface{} {
 	ch := make(chan interface{})
 	go writeXml(w, ch)
 	return ch
@@ -227,19 +218,23 @@
 }
 
 func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
-	cliOut := make(chan Stanza)
+	cliIn := make(chan Stanza)
 	filterOut := make(chan (<-chan Stanza))
 	filterIn := make(chan (<-chan Stanza))
 	nullFilter := make(chan Stanza)
 	go filterBottom(srvIn, nullFilter)
-	go filterTop(filterOut, filterIn, nullFilter, cliOut)
+	go filterTop(filterOut, filterIn, nullFilter, cliIn)
 	cl.filterOut = filterOut
 	cl.filterIn = filterIn
-	return cliOut
+	return cliIn
 }
 
 func tee(r io.Reader, w io.Writer, prefix string) {
-	defer tryClose(r, w)
+	defer func(w io.Writer) {
+		if c, ok := w.(io.Closer) ; ok {
+			c.Close()
+		}
+	}(w)
 
 	buf := bytes.NewBuffer([]uint8(prefix))
 	for {
@@ -264,31 +259,6 @@
 	}
 }
 
-func tryClose(xs ...interface{}) {
-	f1 := func(ch chan<- interface{}) {
-		defer func() {
-			recover()
-		}()
-		close(ch)
-	}
-	f2 := func(ch <-chan interface{}) {
-		defer func() {
-			recover()
-		}()
-		close(ch)
-	}
-
-	for _, x := range xs {
-		if c, ok := x.(io.Closer) ; ok {
-			c.Close()
-		} else if ch, ok := x.(chan<- interface{}) ; ok {
-			f1(ch)
-		} else if ch, ok := x.(<-chan interface{}) ; ok {
-			f2(ch)
-		}
-	}
-}
-
 // bindDone is called when we've finished resource binding (and all
 // the negotiations that precede it). Now we can start accepting
 // traffic from the app.