Intermediate commit. Fixing how we close our channels and sockets and shut down our goroutines.
--- a/examples/interact.go Sun Jan 08 09:11:14 2012 -0700
+++ b/examples/interact.go Sun Jan 08 12:20:21 2012 -0700
@@ -10,7 +10,8 @@
"fmt"
"log"
"os"
- )
+ "time"
+)
// Demonstrate the API, and allow the user to interact with an XMPP
// server via the terminal.
@@ -28,7 +29,7 @@
if err != nil {
log.Fatalf("NewClient(%v): %v", jid, err)
}
- defer c.Close()
+ defer close(c.Out)
err = c.StartSession(true, &xmpp.Presence{})
if err != nil {
@@ -47,20 +48,9 @@
fmt.Println("done reading")
}(c.In)
- p := make([]byte, 1024)
- for {
- nr, _ := os.Stdin.Read(p)
- if nr == 0 {
- break
- }
- s := string(p)
- stan, err := xmpp.ParseStanza(s)
- if err == nil {
- c.Out <- stan
- } else {
- fmt.Printf("Parse error: %v\n", err)
- break
- }
- }
- fmt.Println("done sending")
+ time.Sleep(1e9 * 5)
+ fmt.Println("Shutting down.")
+ close(c.Out)
+ time.Sleep(1e9 * 5)
+ select {}
}
--- a/stream.go Sun Jan 08 09:11:14 2012 -0700
+++ b/stream.go Sun Jan 08 12:20:21 2012 -0700
@@ -38,6 +38,7 @@
// probably either all be receivers, or none.
func (cl *Client) readTransport(w io.Writer) {
+ defer fmt.Println("readTransport done")
defer tryClose(cl.socket, w)
cl.socket.SetReadTimeout(1e8)
p := make([]byte, 1024)
@@ -68,6 +69,7 @@
}
func (cl *Client) writeTransport(r io.Reader) {
+ defer fmt.Println("writeTransport done")
defer tryClose(r, cl.socket)
p := make([]byte, 1024)
for {
@@ -90,6 +92,7 @@
func readXml(r io.Reader, ch chan<- interface{},
extStanza map[string] func(*xml.Name) interface{}) {
+ defer fmt.Println("readXml done")
if Loglevel >= syslog.LOG_DEBUG {
pr, pw := io.Pipe()
go tee(r, pw, "S: ")
@@ -98,6 +101,7 @@
defer tryClose(r, ch)
p := xml.NewParser(r)
+Loop:
for {
// Sniff the next token on the stream.
t, err := p.Token()
@@ -125,7 +129,7 @@
Log.Err("unmarshal stream: " +
err.String())
}
- break
+ break Loop
}
ch <- st
continue
@@ -158,7 +162,7 @@
if Log != nil {
Log.Err("unmarshal: " + err.String())
}
- break
+ break Loop
}
// If it's a Stanza, we try to unmarshal its innerxml
@@ -171,7 +175,7 @@
Log.Err("ext unmarshal: " +
err.String())
}
- break
+ break Loop
}
}
@@ -232,16 +236,19 @@
}
func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
+ defer fmt.Println("readStream done")
+ defer close(cliOut)
defer tryClose(srvIn, cliOut)
handlers := make(map[string] func(Stanza) bool)
+Loop:
for {
select {
case h := <- cl.handlers:
handlers[h.id] = h.f
- case x := <- srvIn:
- if x == nil {
- break
+ case x, ok := <- srvIn:
+ if !ok {
+ break Loop
}
send := false
switch obj := x.(type) {
@@ -287,9 +294,11 @@
// activity.
func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
control <-chan int) {
- defer tryClose(srvOut, cliIn)
+ defer fmt.Println("writeStream done")
+ defer close(srvOut)
var input <-chan Stanza
+Loop:
for {
select {
case status := <- control:
@@ -299,9 +308,13 @@
case 1:
input = cliIn
case -1:
- break
+ break Loop
}
- case x := <- input:
+ case x, ok := <- input:
+ if !ok {
+ fmt.Println("writeStream input closed")
+ break Loop
+ }
if x == nil {
if Log != nil {
Log.Notice("Refusing to send" +
@@ -318,7 +331,9 @@
// app. This function manages the filters.
func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
topFilter <-chan Stanza, app chan<- Stanza) {
+ defer fmt.Println("filterTop done")
defer close(app)
+Loop:
for {
select {
case newFilterOut := <- filterOut:
@@ -334,7 +349,7 @@
case data, ok := <-topFilter:
if !ok {
- break
+ break Loop
}
app <- data
}
@@ -342,6 +357,7 @@
}
func filterBottom(from <-chan Stanza, to chan<- Stanza) {
+ defer fmt.Println("filterBottom done")
defer close(to)
for data := range(from) {
to <- data
@@ -355,7 +371,7 @@
if Log != nil {
Log.Notice(fmt.Sprintf("Received stream error: %v", se))
}
- cl.Close()
+ close(cl.Out)
}
func (cl *Client) handleFeatures(fe *Features) {
@@ -399,10 +415,6 @@
cl.socket = tls
cl.socketSync.Wait()
- // Reset the read timeout on the (underlying) socket so the
- // reader doesn't get woken up unnecessarily.
- tcp.SetReadTimeout(0)
-
if Log != nil {
Log.Info("TLS negotiation succeeded.")
}
--- a/xmpp.go Sun Jan 08 09:11:14 2012 -0700
+++ b/xmpp.go Sun Jan 08 12:20:21 2012 -0700
@@ -80,6 +80,7 @@
// the time StartSession() returns.
Jid JID
password string
+ tcp net.Conn
socket net.Conn
socketSync sync.WaitGroup
saslExpected string
@@ -102,7 +103,6 @@
filterOut chan<- <-chan Stanza
filterIn <-chan <-chan Stanza
}
-var _ io.Closer = &Client{}
// Connect to the appropriate server and authenticate as the given JID
// with the given password. This function will return as soon as a TCP
@@ -147,6 +147,7 @@
cl.Uid = <- Id
cl.password = password
cl.Jid = *jid
+ cl.tcp = tcp
cl.socket = tcp
cl.handlers = make(chan *stanzaHandler, 100)
cl.inputControl = make(chan int)
@@ -188,11 +189,6 @@
return cl, nil
}
-func (c *Client) Close() os.Error {
- tryClose(c.In, c.Out)
- return nil
-}
-
func (cl *Client) startTransport() (io.Reader, io.Writer) {
inr, inw := io.Pipe()
outr, outw := io.Pipe()