# HG changeset patch # User Chris Jones # Date 1326050421 25200 # Node ID c7f2edd25f4acb6a3be83cd65f0f2c5763e7cdfb # Parent 6e2eea62ccca26e4a92ea0451a47a57036244d3b Intermediate commit. Fixing how we close our channels and sockets and shut down our goroutines. diff -r 6e2eea62ccca -r c7f2edd25f4a examples/interact.go --- a/examples/interact.go Sun Jan 08 09:11:14 2012 -0700 +++ b/examples/interact.go Sun Jan 08 12:20:21 2012 -0700 @@ -10,7 +10,8 @@ "fmt" "log" "os" - ) + "time" +) // Demonstrate the API, and allow the user to interact with an XMPP // server via the terminal. @@ -28,7 +29,7 @@ if err != nil { log.Fatalf("NewClient(%v): %v", jid, err) } - defer c.Close() + defer close(c.Out) err = c.StartSession(true, &xmpp.Presence{}) if err != nil { @@ -47,20 +48,9 @@ fmt.Println("done reading") }(c.In) - p := make([]byte, 1024) - for { - nr, _ := os.Stdin.Read(p) - if nr == 0 { - break - } - s := string(p) - stan, err := xmpp.ParseStanza(s) - if err == nil { - c.Out <- stan - } else { - fmt.Printf("Parse error: %v\n", err) - break - } - } - fmt.Println("done sending") + time.Sleep(1e9 * 5) + fmt.Println("Shutting down.") + close(c.Out) + time.Sleep(1e9 * 5) + select {} } diff -r 6e2eea62ccca -r c7f2edd25f4a stream.go --- a/stream.go Sun Jan 08 09:11:14 2012 -0700 +++ b/stream.go Sun Jan 08 12:20:21 2012 -0700 @@ -38,6 +38,7 @@ // probably either all be receivers, or none. func (cl *Client) readTransport(w io.Writer) { + defer fmt.Println("readTransport done") defer tryClose(cl.socket, w) cl.socket.SetReadTimeout(1e8) p := make([]byte, 1024) @@ -68,6 +69,7 @@ } func (cl *Client) writeTransport(r io.Reader) { + defer fmt.Println("writeTransport done") defer tryClose(r, cl.socket) p := make([]byte, 1024) for { @@ -90,6 +92,7 @@ func readXml(r io.Reader, ch chan<- interface{}, extStanza map[string] func(*xml.Name) interface{}) { + defer fmt.Println("readXml done") if Loglevel >= syslog.LOG_DEBUG { pr, pw := io.Pipe() go tee(r, pw, "S: ") @@ -98,6 +101,7 @@ defer tryClose(r, ch) p := xml.NewParser(r) +Loop: for { // Sniff the next token on the stream. t, err := p.Token() @@ -125,7 +129,7 @@ Log.Err("unmarshal stream: " + err.String()) } - break + break Loop } ch <- st continue @@ -158,7 +162,7 @@ if Log != nil { Log.Err("unmarshal: " + err.String()) } - break + break Loop } // If it's a Stanza, we try to unmarshal its innerxml @@ -171,7 +175,7 @@ Log.Err("ext unmarshal: " + err.String()) } - break + break Loop } } @@ -232,16 +236,19 @@ } func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) { + defer fmt.Println("readStream done") + defer close(cliOut) defer tryClose(srvIn, cliOut) handlers := make(map[string] func(Stanza) bool) +Loop: for { select { case h := <- cl.handlers: handlers[h.id] = h.f - case x := <- srvIn: - if x == nil { - break + case x, ok := <- srvIn: + if !ok { + break Loop } send := false switch obj := x.(type) { @@ -287,9 +294,11 @@ // activity. func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza, control <-chan int) { - defer tryClose(srvOut, cliIn) + defer fmt.Println("writeStream done") + defer close(srvOut) var input <-chan Stanza +Loop: for { select { case status := <- control: @@ -299,9 +308,13 @@ case 1: input = cliIn case -1: - break + break Loop } - case x := <- input: + case x, ok := <- input: + if !ok { + fmt.Println("writeStream input closed") + break Loop + } if x == nil { if Log != nil { Log.Notice("Refusing to send" + @@ -318,7 +331,9 @@ // app. This function manages the filters. func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza, topFilter <-chan Stanza, app chan<- Stanza) { + defer fmt.Println("filterTop done") defer close(app) +Loop: for { select { case newFilterOut := <- filterOut: @@ -334,7 +349,7 @@ case data, ok := <-topFilter: if !ok { - break + break Loop } app <- data } @@ -342,6 +357,7 @@ } func filterBottom(from <-chan Stanza, to chan<- Stanza) { + defer fmt.Println("filterBottom done") defer close(to) for data := range(from) { to <- data @@ -355,7 +371,7 @@ if Log != nil { Log.Notice(fmt.Sprintf("Received stream error: %v", se)) } - cl.Close() + close(cl.Out) } func (cl *Client) handleFeatures(fe *Features) { @@ -399,10 +415,6 @@ cl.socket = tls cl.socketSync.Wait() - // Reset the read timeout on the (underlying) socket so the - // reader doesn't get woken up unnecessarily. - tcp.SetReadTimeout(0) - if Log != nil { Log.Info("TLS negotiation succeeded.") } diff -r 6e2eea62ccca -r c7f2edd25f4a xmpp.go --- a/xmpp.go Sun Jan 08 09:11:14 2012 -0700 +++ b/xmpp.go Sun Jan 08 12:20:21 2012 -0700 @@ -80,6 +80,7 @@ // the time StartSession() returns. Jid JID password string + tcp net.Conn socket net.Conn socketSync sync.WaitGroup saslExpected string @@ -102,7 +103,6 @@ filterOut chan<- <-chan Stanza filterIn <-chan <-chan Stanza } -var _ io.Closer = &Client{} // Connect to the appropriate server and authenticate as the given JID // with the given password. This function will return as soon as a TCP @@ -147,6 +147,7 @@ cl.Uid = <- Id cl.password = password cl.Jid = *jid + cl.tcp = tcp cl.socket = tcp cl.handlers = make(chan *stanzaHandler, 100) cl.inputControl = make(chan int) @@ -188,11 +189,6 @@ return cl, nil } -func (c *Client) Close() os.Error { - tryClose(c.In, c.Out) - return nil -} - func (cl *Client) startTransport() (io.Reader, io.Writer) { inr, inw := io.Pipe() outr, outw := io.Pipe()