Intermediate commit. Fixing how we close our channels and sockets and shut down our goroutines. 20120108-close
authorChris Jones <chris@cjones.org>
Sun, 08 Jan 2012 12:20:21 -0700
branch20120108-close
changeset 63 c7f2edd25f4a
parent 62 6e2eea62ccca
child 64 ac0639692317
Intermediate commit. Fixing how we close our channels and sockets and shut down our goroutines.
examples/interact.go
stream.go
xmpp.go
--- a/examples/interact.go	Sun Jan 08 09:11:14 2012 -0700
+++ b/examples/interact.go	Sun Jan 08 12:20:21 2012 -0700
@@ -10,7 +10,8 @@
 	"fmt"
 	"log"
 	"os"
-	)
+	"time"
+)
 
 // Demonstrate the API, and allow the user to interact with an XMPP
 // server via the terminal.
@@ -28,7 +29,7 @@
 	if err != nil {
 		log.Fatalf("NewClient(%v): %v", jid, err)
 	}
-	defer c.Close()
+	defer close(c.Out)
 
 	err = c.StartSession(true, &xmpp.Presence{})
 	if err != nil {
@@ -47,20 +48,9 @@
 		fmt.Println("done reading")
 	}(c.In)
 
-	p := make([]byte, 1024)
-	for {
-		nr, _ := os.Stdin.Read(p)
-		if nr == 0 {
-			break
-		}
-		s := string(p)
-		stan, err := xmpp.ParseStanza(s)
-		if err == nil {
-			c.Out <- stan
-		} else {
-			fmt.Printf("Parse error: %v\n", err)
-			break
-		}
-	}
-	fmt.Println("done sending")
+	time.Sleep(1e9 * 5)
+	fmt.Println("Shutting down.")
+	close(c.Out)
+	time.Sleep(1e9 * 5)
+	select {}
 }
--- a/stream.go	Sun Jan 08 09:11:14 2012 -0700
+++ b/stream.go	Sun Jan 08 12:20:21 2012 -0700
@@ -38,6 +38,7 @@
 // probably either all be receivers, or none.
 
 func (cl *Client) readTransport(w io.Writer) {
+	defer fmt.Println("readTransport done")
 	defer tryClose(cl.socket, w)
 	cl.socket.SetReadTimeout(1e8)
 	p := make([]byte, 1024)
@@ -68,6 +69,7 @@
 }
 
 func (cl *Client) writeTransport(r io.Reader) {
+	defer fmt.Println("writeTransport done")
 	defer tryClose(r, cl.socket)
 	p := make([]byte, 1024)
 	for {
@@ -90,6 +92,7 @@
 
 func readXml(r io.Reader, ch chan<- interface{},
 	extStanza map[string] func(*xml.Name) interface{}) {
+	defer fmt.Println("readXml done")
 	if Loglevel >= syslog.LOG_DEBUG {
 		pr, pw := io.Pipe()
 		go tee(r, pw, "S: ")
@@ -98,6 +101,7 @@
 	defer tryClose(r, ch)
 
 	p := xml.NewParser(r)
+Loop:
 	for {
 		// Sniff the next token on the stream.
 		t, err := p.Token()
@@ -125,7 +129,7 @@
 					Log.Err("unmarshal stream: " +
 						err.String())
 				}
-				break
+				break Loop
 			}
 			ch <- st
 			continue
@@ -158,7 +162,7 @@
 			if Log != nil {
 				Log.Err("unmarshal: " + err.String())
 			}
-			break
+			break Loop
 		}
 
 		// If it's a Stanza, we try to unmarshal its innerxml
@@ -171,7 +175,7 @@
 					Log.Err("ext unmarshal: " +
 						err.String())
 				}
-				break
+				break Loop
 			}
 		}
 
@@ -232,16 +236,19 @@
 }
 
 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
+	defer fmt.Println("readStream done")
+	defer close(cliOut)
 	defer tryClose(srvIn, cliOut)
 
 	handlers := make(map[string] func(Stanza) bool)
+Loop:
 	for {
 		select {
 		case h := <- cl.handlers:
 			handlers[h.id] = h.f
-		case x := <- srvIn:
-			if x == nil {
-				break
+		case x, ok := <- srvIn:
+			if !ok {
+				break Loop
 			}
 			send := false
 			switch obj := x.(type) {
@@ -287,9 +294,11 @@
 // activity.
 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
 	control <-chan int) {
-	defer tryClose(srvOut, cliIn)
+	defer fmt.Println("writeStream done")
+	defer close(srvOut)
 
 	var input <-chan Stanza
+Loop:
 	for {
 		select {
 		case status := <- control:
@@ -299,9 +308,13 @@
 			case 1:
 				input = cliIn
 			case -1:
-				break
+				break Loop
 			}
-		case x := <- input:
+		case x, ok := <- input:
+			if !ok {
+				fmt.Println("writeStream input closed")
+				break Loop
+			}
 			if x == nil {
 				if Log != nil {
 					Log.Notice("Refusing to send" +
@@ -318,7 +331,9 @@
 // app. This function manages the filters.
 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
 	topFilter <-chan Stanza, app chan<- Stanza) {
+	defer fmt.Println("filterTop done")
 	defer close(app)
+Loop:
 	for {
 		select {
 		case newFilterOut := <- filterOut:
@@ -334,7 +349,7 @@
 
 		case data, ok := <-topFilter:
 			if !ok {
-				break
+				break Loop
 			}
 			app <- data
 		}
@@ -342,6 +357,7 @@
 }
 
 func filterBottom(from <-chan Stanza, to chan<- Stanza) {
+	defer fmt.Println("filterBottom done")
 	defer close(to)
 	for data := range(from) {
 		to <- data
@@ -355,7 +371,7 @@
 	if Log != nil {
 		Log.Notice(fmt.Sprintf("Received stream error: %v", se))
 	}
-	cl.Close()
+	close(cl.Out)
 }
 
 func (cl *Client) handleFeatures(fe *Features) {
@@ -399,10 +415,6 @@
 	cl.socket = tls
 	cl.socketSync.Wait()
 
-	// Reset the read timeout on the (underlying) socket so the
-	// reader doesn't get woken up unnecessarily.
-	tcp.SetReadTimeout(0)
-
 	if Log != nil {
 		Log.Info("TLS negotiation succeeded.")
 	}
--- a/xmpp.go	Sun Jan 08 09:11:14 2012 -0700
+++ b/xmpp.go	Sun Jan 08 12:20:21 2012 -0700
@@ -80,6 +80,7 @@
 	// the time StartSession() returns.
 	Jid JID
 	password string
+	tcp net.Conn
 	socket net.Conn
 	socketSync sync.WaitGroup
 	saslExpected string
@@ -102,7 +103,6 @@
 	filterOut chan<- <-chan Stanza
 	filterIn <-chan <-chan Stanza
 }
-var _ io.Closer = &Client{}
 
 // Connect to the appropriate server and authenticate as the given JID
 // with the given password. This function will return as soon as a TCP
@@ -147,6 +147,7 @@
 	cl.Uid = <- Id
 	cl.password = password
 	cl.Jid = *jid
+	cl.tcp = tcp
 	cl.socket = tcp
 	cl.handlers = make(chan *stanzaHandler, 100)
 	cl.inputControl = make(chan int)
@@ -188,11 +189,6 @@
 	return cl, nil
 }
 
-func (c *Client) Close() os.Error {
-	tryClose(c.In, c.Out)
-	return nil
-}
-
 func (cl *Client) startTransport() (io.Reader, io.Writer) {
 	inr, inw := io.Pipe()
 	outr, outw := io.Pipe()