Correct my misunderstanding of Go's break, and fix how we close channels and Writers so we can shut down our goroutines gracefully.
--- a/examples/interact.go Sun Jan 08 09:11:14 2012 -0700
+++ b/examples/interact.go Sun Jan 08 13:04:09 2012 -0700
@@ -10,7 +10,7 @@
"fmt"
"log"
"os"
- )
+)
// Demonstrate the API, and allow the user to interact with an XMPP
// server via the terminal.
@@ -28,7 +28,7 @@
if err != nil {
log.Fatalf("NewClient(%v): %v", jid, err)
}
- defer c.Close()
+ defer close(c.Out)
err = c.StartSession(true, &xmpp.Presence{})
if err != nil {
--- a/stream.go Sun Jan 08 09:11:14 2012 -0700
+++ b/stream.go Sun Jan 08 13:04:09 2012 -0700
@@ -37,8 +37,8 @@
// BUG(cjyar) Review all these *Client receiver methods. They should
// probably either all be receivers, or none.
-func (cl *Client) readTransport(w io.Writer) {
- defer tryClose(cl.socket, w)
+func (cl *Client) readTransport(w io.WriteCloser) {
+ defer w.Close()
cl.socket.SetReadTimeout(1e8)
p := make([]byte, 1024)
for {
@@ -68,7 +68,7 @@
}
func (cl *Client) writeTransport(r io.Reader) {
- defer tryClose(r, cl.socket)
+ defer cl.socket.Close()
p := make([]byte, 1024)
for {
nr, err := r.Read(p)
@@ -95,9 +95,10 @@
go tee(r, pw, "S: ")
r = pr
}
- defer tryClose(r, ch)
+ defer close(ch)
p := xml.NewParser(r)
+Loop:
for {
// Sniff the next token on the stream.
t, err := p.Token()
@@ -125,7 +126,7 @@
Log.Err("unmarshal stream: " +
err.String())
}
- break
+ break Loop
}
ch <- st
continue
@@ -158,7 +159,7 @@
if Log != nil {
Log.Err("unmarshal: " + err.String())
}
- break
+ break Loop
}
// If it's a Stanza, we try to unmarshal its innerxml
@@ -171,7 +172,7 @@
Log.Err("ext unmarshal: " +
err.String())
}
- break
+ break Loop
}
}
@@ -218,7 +219,11 @@
go tee(pr, w, "C: ")
w = pw
}
- defer tryClose(w, ch)
+ defer func(w io.Writer) {
+ if c, ok := w.(io.Closer) ; ok {
+ c.Close()
+ }
+ }(w)
for obj := range ch {
err := xml.Marshal(w, obj)
@@ -232,16 +237,17 @@
}
func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
- defer tryClose(srvIn, cliOut)
+ defer close(cliOut)
handlers := make(map[string] func(Stanza) bool)
+Loop:
for {
select {
case h := <- cl.handlers:
handlers[h.id] = h.f
- case x := <- srvIn:
- if x == nil {
- break
+ case x, ok := <- srvIn:
+ if !ok {
+ break Loop
}
send := false
switch obj := x.(type) {
@@ -287,9 +293,10 @@
// activity.
func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
control <-chan int) {
- defer tryClose(srvOut, cliIn)
+ defer close(srvOut)
var input <-chan Stanza
+Loop:
for {
select {
case status := <- control:
@@ -299,9 +306,12 @@
case 1:
input = cliIn
case -1:
- break
+ break Loop
}
- case x := <- input:
+ case x, ok := <- input:
+ if !ok {
+ break Loop
+ }
if x == nil {
if Log != nil {
Log.Notice("Refusing to send" +
@@ -319,6 +329,7 @@
func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
topFilter <-chan Stanza, app chan<- Stanza) {
defer close(app)
+Loop:
for {
select {
case newFilterOut := <- filterOut:
@@ -334,7 +345,7 @@
case data, ok := <-topFilter:
if !ok {
- break
+ break Loop
}
app <- data
}
@@ -355,7 +366,7 @@
if Log != nil {
Log.Notice(fmt.Sprintf("Received stream error: %v", se))
}
- cl.Close()
+ close(cl.Out)
}
func (cl *Client) handleFeatures(fe *Features) {
--- a/xmpp.go Sun Jan 08 09:11:14 2012 -0700
+++ b/xmpp.go Sun Jan 08 13:04:09 2012 -0700
@@ -6,9 +6,6 @@
// and 3921, plus the various XEPs at http://xmpp.org/protocols/.
package xmpp
-// BUG(cjyar) Figure out why the library doesn't exit when the server
-// closes its stream to us.
-
import (
"bytes"
"fmt"
@@ -102,7 +99,6 @@
filterOut chan<- <-chan Stanza
filterIn <-chan <-chan Stanza
}
-var _ io.Closer = &Client{}
// Connect to the appropriate server and authenticate as the given JID
// with the given password. This function will return as soon as a TCP
@@ -188,12 +184,7 @@
return cl, nil
}
-func (c *Client) Close() os.Error {
- tryClose(c.In, c.Out)
- return nil
-}
-
-func (cl *Client) startTransport() (io.Reader, io.Writer) {
+func (cl *Client) startTransport() (io.Reader, io.WriteCloser) {
inr, inw := io.Pipe()
outr, outw := io.Pipe()
go cl.readTransport(inw)
@@ -208,7 +199,7 @@
return ch
}
-func startXmlWriter(w io.Writer) chan<- interface{} {
+func startXmlWriter(w io.WriteCloser) chan<- interface{} {
ch := make(chan interface{})
go writeXml(w, ch)
return ch
@@ -227,19 +218,23 @@
}
func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
- cliOut := make(chan Stanza)
+ cliIn := make(chan Stanza)
filterOut := make(chan (<-chan Stanza))
filterIn := make(chan (<-chan Stanza))
nullFilter := make(chan Stanza)
go filterBottom(srvIn, nullFilter)
- go filterTop(filterOut, filterIn, nullFilter, cliOut)
+ go filterTop(filterOut, filterIn, nullFilter, cliIn)
cl.filterOut = filterOut
cl.filterIn = filterIn
- return cliOut
+ return cliIn
}
func tee(r io.Reader, w io.Writer, prefix string) {
- defer tryClose(r, w)
+ defer func(w io.Writer) {
+ if c, ok := w.(io.Closer) ; ok {
+ c.Close()
+ }
+ }(w)
buf := bytes.NewBuffer([]uint8(prefix))
for {
@@ -264,31 +259,6 @@
}
}
-func tryClose(xs ...interface{}) {
- f1 := func(ch chan<- interface{}) {
- defer func() {
- recover()
- }()
- close(ch)
- }
- f2 := func(ch <-chan interface{}) {
- defer func() {
- recover()
- }()
- close(ch)
- }
-
- for _, x := range xs {
- if c, ok := x.(io.Closer) ; ok {
- c.Close()
- } else if ch, ok := x.(chan<- interface{}) ; ok {
- f1(ch)
- } else if ch, ok := x.(<-chan interface{}) ; ok {
- f2(ch)
- }
- }
-}
-
// bindDone is called when we've finished resource binding (and all
// the negotiations that precede it). Now we can start accepting
// traffic from the app.