# HG changeset patch
# User Chris Jones <chris@cjones.org>
# Date 1326053049 25200
# Node ID a38173c1c8b28529a23b281f8c8c9bb65ba6168e
# Parent  6e2eea62ccca26e4a92ea0451a47a57036244d3b# Parent  d693ecc11f297b3a53c4d48e9bb2272c00d5182e
Correct my misunderstanding of Go's break, and fix how we close channels and Writers so we can shut down our goroutines gracefully.

diff -r 6e2eea62ccca -r a38173c1c8b2 examples/interact.go
--- a/examples/interact.go	Sun Jan 08 09:11:14 2012 -0700
+++ b/examples/interact.go	Sun Jan 08 13:04:09 2012 -0700
@@ -10,7 +10,7 @@
 	"fmt"
 	"log"
 	"os"
-	)
+)
 
 // Demonstrate the API, and allow the user to interact with an XMPP
 // server via the terminal.
@@ -28,7 +28,7 @@
 	if err != nil {
 		log.Fatalf("NewClient(%v): %v", jid, err)
 	}
-	defer c.Close()
+	defer close(c.Out)
 
 	err = c.StartSession(true, &xmpp.Presence{})
 	if err != nil {
diff -r 6e2eea62ccca -r a38173c1c8b2 stream.go
--- a/stream.go	Sun Jan 08 09:11:14 2012 -0700
+++ b/stream.go	Sun Jan 08 13:04:09 2012 -0700
@@ -37,8 +37,8 @@
 // BUG(cjyar) Review all these *Client receiver methods. They should
 // probably either all be receivers, or none.
 
-func (cl *Client) readTransport(w io.Writer) {
-	defer tryClose(cl.socket, w)
+func (cl *Client) readTransport(w io.WriteCloser) {
+	defer w.Close()
 	cl.socket.SetReadTimeout(1e8)
 	p := make([]byte, 1024)
 	for {
@@ -68,7 +68,7 @@
 }
 
 func (cl *Client) writeTransport(r io.Reader) {
-	defer tryClose(r, cl.socket)
+	defer cl.socket.Close()
 	p := make([]byte, 1024)
 	for {
 		nr, err := r.Read(p)
@@ -95,9 +95,10 @@
 		go tee(r, pw, "S: ")
 		r = pr
 	}
-	defer tryClose(r, ch)
+	defer close(ch)
 
 	p := xml.NewParser(r)
+Loop:
 	for {
 		// Sniff the next token on the stream.
 		t, err := p.Token()
@@ -125,7 +126,7 @@
 					Log.Err("unmarshal stream: " +
 						err.String())
 				}
-				break
+				break Loop
 			}
 			ch <- st
 			continue
@@ -158,7 +159,7 @@
 			if Log != nil {
 				Log.Err("unmarshal: " + err.String())
 			}
-			break
+			break Loop
 		}
 
 		// If it's a Stanza, we try to unmarshal its innerxml
@@ -171,7 +172,7 @@
 					Log.Err("ext unmarshal: " +
 						err.String())
 				}
-				break
+				break Loop
 			}
 		}
 
@@ -218,7 +219,11 @@
 		go tee(pr, w, "C: ")
 		w = pw
 	}
-	defer tryClose(w, ch)
+	defer func(w io.Writer) {
+		if c, ok := w.(io.Closer) ; ok {
+			c.Close()
+		}
+	}(w)
 
 	for obj := range ch {
 		err := xml.Marshal(w, obj)
@@ -232,16 +237,17 @@
 }
 
 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
-	defer tryClose(srvIn, cliOut)
+	defer close(cliOut)
 
 	handlers := make(map[string] func(Stanza) bool)
+Loop:
 	for {
 		select {
 		case h := <- cl.handlers:
 			handlers[h.id] = h.f
-		case x := <- srvIn:
-			if x == nil {
-				break
+		case x, ok := <- srvIn:
+			if !ok {
+				break Loop
 			}
 			send := false
 			switch obj := x.(type) {
@@ -287,9 +293,10 @@
 // activity.
 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
 	control <-chan int) {
-	defer tryClose(srvOut, cliIn)
+	defer close(srvOut)
 
 	var input <-chan Stanza
+Loop:
 	for {
 		select {
 		case status := <- control:
@@ -299,9 +306,12 @@
 			case 1:
 				input = cliIn
 			case -1:
-				break
+				break Loop
 			}
-		case x := <- input:
+		case x, ok := <- input:
+			if !ok {
+				break Loop
+			}
 			if x == nil {
 				if Log != nil {
 					Log.Notice("Refusing to send" +
@@ -319,6 +329,7 @@
 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
 	topFilter <-chan Stanza, app chan<- Stanza) {
 	defer close(app)
+Loop:
 	for {
 		select {
 		case newFilterOut := <- filterOut:
@@ -334,7 +345,7 @@
 
 		case data, ok := <-topFilter:
 			if !ok {
-				break
+				break Loop
 			}
 			app <- data
 		}
@@ -355,7 +366,7 @@
 	if Log != nil {
 		Log.Notice(fmt.Sprintf("Received stream error: %v", se))
 	}
-	cl.Close()
+	close(cl.Out)
 }
 
 func (cl *Client) handleFeatures(fe *Features) {
diff -r 6e2eea62ccca -r a38173c1c8b2 xmpp.go
--- a/xmpp.go	Sun Jan 08 09:11:14 2012 -0700
+++ b/xmpp.go	Sun Jan 08 13:04:09 2012 -0700
@@ -6,9 +6,6 @@
 // and 3921, plus the various XEPs at http://xmpp.org/protocols/.
 package xmpp
 
-// BUG(cjyar) Figure out why the library doesn't exit when the server
-// closes its stream to us.
-
 import (
 	"bytes"
 	"fmt"
@@ -102,7 +99,6 @@
 	filterOut chan<- <-chan Stanza
 	filterIn <-chan <-chan Stanza
 }
-var _ io.Closer = &Client{}
 
 // Connect to the appropriate server and authenticate as the given JID
 // with the given password. This function will return as soon as a TCP
@@ -188,12 +184,7 @@
 	return cl, nil
 }
 
-func (c *Client) Close() os.Error {
-	tryClose(c.In, c.Out)
-	return nil
-}
-
-func (cl *Client) startTransport() (io.Reader, io.Writer) {
+func (cl *Client) startTransport() (io.Reader, io.WriteCloser) {
 	inr, inw := io.Pipe()
 	outr, outw := io.Pipe()
 	go cl.readTransport(inw)
@@ -208,7 +199,7 @@
 	return ch
 }
 
-func startXmlWriter(w io.Writer) chan<- interface{} {
+func startXmlWriter(w io.WriteCloser) chan<- interface{} {
 	ch := make(chan interface{})
 	go writeXml(w, ch)
 	return ch
@@ -227,19 +218,23 @@
 }
 
 func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
-	cliOut := make(chan Stanza)
+	cliIn := make(chan Stanza)
 	filterOut := make(chan (<-chan Stanza))
 	filterIn := make(chan (<-chan Stanza))
 	nullFilter := make(chan Stanza)
 	go filterBottom(srvIn, nullFilter)
-	go filterTop(filterOut, filterIn, nullFilter, cliOut)
+	go filterTop(filterOut, filterIn, nullFilter, cliIn)
 	cl.filterOut = filterOut
 	cl.filterIn = filterIn
-	return cliOut
+	return cliIn
 }
 
 func tee(r io.Reader, w io.Writer, prefix string) {
-	defer tryClose(r, w)
+	defer func(w io.Writer) {
+		if c, ok := w.(io.Closer) ; ok {
+			c.Close()
+		}
+	}(w)
 
 	buf := bytes.NewBuffer([]uint8(prefix))
 	for {
@@ -264,31 +259,6 @@
 	}
 }
 
-func tryClose(xs ...interface{}) {
-	f1 := func(ch chan<- interface{}) {
-		defer func() {
-			recover()
-		}()
-		close(ch)
-	}
-	f2 := func(ch <-chan interface{}) {
-		defer func() {
-			recover()
-		}()
-		close(ch)
-	}
-
-	for _, x := range xs {
-		if c, ok := x.(io.Closer) ; ok {
-			c.Close()
-		} else if ch, ok := x.(chan<- interface{}) ; ok {
-			f1(ch)
-		} else if ch, ok := x.(<-chan interface{}) ; ok {
-			f2(ch)
-		}
-	}
-}
-
 // bindDone is called when we've finished resource binding (and all
 // the negotiations that precede it). Now we can start accepting
 // traffic from the app.