# HG changeset patch # User Chris Jones # Date 1326053049 25200 # Node ID a38173c1c8b28529a23b281f8c8c9bb65ba6168e # Parent 6e2eea62ccca26e4a92ea0451a47a57036244d3b# Parent d693ecc11f297b3a53c4d48e9bb2272c00d5182e Correct my misunderstanding of Go's break, and fix how we close channels and Writers so we can shut down our goroutines gracefully. diff -r 6e2eea62ccca -r a38173c1c8b2 examples/interact.go --- a/examples/interact.go Sun Jan 08 09:11:14 2012 -0700 +++ b/examples/interact.go Sun Jan 08 13:04:09 2012 -0700 @@ -10,7 +10,7 @@ "fmt" "log" "os" - ) +) // Demonstrate the API, and allow the user to interact with an XMPP // server via the terminal. @@ -28,7 +28,7 @@ if err != nil { log.Fatalf("NewClient(%v): %v", jid, err) } - defer c.Close() + defer close(c.Out) err = c.StartSession(true, &xmpp.Presence{}) if err != nil { diff -r 6e2eea62ccca -r a38173c1c8b2 stream.go --- a/stream.go Sun Jan 08 09:11:14 2012 -0700 +++ b/stream.go Sun Jan 08 13:04:09 2012 -0700 @@ -37,8 +37,8 @@ // BUG(cjyar) Review all these *Client receiver methods. They should // probably either all be receivers, or none. -func (cl *Client) readTransport(w io.Writer) { - defer tryClose(cl.socket, w) +func (cl *Client) readTransport(w io.WriteCloser) { + defer w.Close() cl.socket.SetReadTimeout(1e8) p := make([]byte, 1024) for { @@ -68,7 +68,7 @@ } func (cl *Client) writeTransport(r io.Reader) { - defer tryClose(r, cl.socket) + defer cl.socket.Close() p := make([]byte, 1024) for { nr, err := r.Read(p) @@ -95,9 +95,10 @@ go tee(r, pw, "S: ") r = pr } - defer tryClose(r, ch) + defer close(ch) p := xml.NewParser(r) +Loop: for { // Sniff the next token on the stream. t, err := p.Token() @@ -125,7 +126,7 @@ Log.Err("unmarshal stream: " + err.String()) } - break + break Loop } ch <- st continue @@ -158,7 +159,7 @@ if Log != nil { Log.Err("unmarshal: " + err.String()) } - break + break Loop } // If it's a Stanza, we try to unmarshal its innerxml @@ -171,7 +172,7 @@ Log.Err("ext unmarshal: " + err.String()) } - break + break Loop } } @@ -218,7 +219,11 @@ go tee(pr, w, "C: ") w = pw } - defer tryClose(w, ch) + defer func(w io.Writer) { + if c, ok := w.(io.Closer) ; ok { + c.Close() + } + }(w) for obj := range ch { err := xml.Marshal(w, obj) @@ -232,16 +237,17 @@ } func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) { - defer tryClose(srvIn, cliOut) + defer close(cliOut) handlers := make(map[string] func(Stanza) bool) +Loop: for { select { case h := <- cl.handlers: handlers[h.id] = h.f - case x := <- srvIn: - if x == nil { - break + case x, ok := <- srvIn: + if !ok { + break Loop } send := false switch obj := x.(type) { @@ -287,9 +293,10 @@ // activity. func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza, control <-chan int) { - defer tryClose(srvOut, cliIn) + defer close(srvOut) var input <-chan Stanza +Loop: for { select { case status := <- control: @@ -299,9 +306,12 @@ case 1: input = cliIn case -1: - break + break Loop } - case x := <- input: + case x, ok := <- input: + if !ok { + break Loop + } if x == nil { if Log != nil { Log.Notice("Refusing to send" + @@ -319,6 +329,7 @@ func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza, topFilter <-chan Stanza, app chan<- Stanza) { defer close(app) +Loop: for { select { case newFilterOut := <- filterOut: @@ -334,7 +345,7 @@ case data, ok := <-topFilter: if !ok { - break + break Loop } app <- data } @@ -355,7 +366,7 @@ if Log != nil { Log.Notice(fmt.Sprintf("Received stream error: %v", se)) } - cl.Close() + close(cl.Out) } func (cl *Client) handleFeatures(fe *Features) { diff -r 6e2eea62ccca -r a38173c1c8b2 xmpp.go --- a/xmpp.go Sun Jan 08 09:11:14 2012 -0700 +++ b/xmpp.go Sun Jan 08 13:04:09 2012 -0700 @@ -6,9 +6,6 @@ // and 3921, plus the various XEPs at http://xmpp.org/protocols/. package xmpp -// BUG(cjyar) Figure out why the library doesn't exit when the server -// closes its stream to us. - import ( "bytes" "fmt" @@ -102,7 +99,6 @@ filterOut chan<- <-chan Stanza filterIn <-chan <-chan Stanza } -var _ io.Closer = &Client{} // Connect to the appropriate server and authenticate as the given JID // with the given password. This function will return as soon as a TCP @@ -188,12 +184,7 @@ return cl, nil } -func (c *Client) Close() os.Error { - tryClose(c.In, c.Out) - return nil -} - -func (cl *Client) startTransport() (io.Reader, io.Writer) { +func (cl *Client) startTransport() (io.Reader, io.WriteCloser) { inr, inw := io.Pipe() outr, outw := io.Pipe() go cl.readTransport(inw) @@ -208,7 +199,7 @@ return ch } -func startXmlWriter(w io.Writer) chan<- interface{} { +func startXmlWriter(w io.WriteCloser) chan<- interface{} { ch := make(chan interface{}) go writeXml(w, ch) return ch @@ -227,19 +218,23 @@ } func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza { - cliOut := make(chan Stanza) + cliIn := make(chan Stanza) filterOut := make(chan (<-chan Stanza)) filterIn := make(chan (<-chan Stanza)) nullFilter := make(chan Stanza) go filterBottom(srvIn, nullFilter) - go filterTop(filterOut, filterIn, nullFilter, cliOut) + go filterTop(filterOut, filterIn, nullFilter, cliIn) cl.filterOut = filterOut cl.filterIn = filterIn - return cliOut + return cliIn } func tee(r io.Reader, w io.Writer, prefix string) { - defer tryClose(r, w) + defer func(w io.Writer) { + if c, ok := w.(io.Closer) ; ok { + c.Close() + } + }(w) buf := bytes.NewBuffer([]uint8(prefix)) for { @@ -264,31 +259,6 @@ } } -func tryClose(xs ...interface{}) { - f1 := func(ch chan<- interface{}) { - defer func() { - recover() - }() - close(ch) - } - f2 := func(ch <-chan interface{}) { - defer func() { - recover() - }() - close(ch) - } - - for _, x := range xs { - if c, ok := x.(io.Closer) ; ok { - c.Close() - } else if ch, ok := x.(chan<- interface{}) ; ok { - f1(ch) - } else if ch, ok := x.(<-chan interface{}) ; ok { - f2(ch) - } - } -} - // bindDone is called when we've finished resource binding (and all // the negotiations that precede it). Now we can start accepting // traffic from the app.