--- a/xmpp/layer3.go Sun Sep 22 17:43:34 2013 -0500
+++ b/xmpp/layer3.go Sat Sep 28 13:02:17 2013 -0600
@@ -5,14 +5,12 @@
import (
"encoding/xml"
- "fmt"
)
// Callback to handle a stanza with a particular id.
type callback struct {
id string
- // Return true means pass this to the application
- f func(Stanza) bool
+ f func(Stanza)
}
// Receive XMPP stanzas from the client and send them on to the
@@ -22,22 +20,18 @@
// inappropriate into our negotiations with the server. The control
// channel controls this loop's activity.
func sendStream(sendXml chan<- interface{}, recvXmpp <-chan Stanza,
- control <-chan sendCmd) {
+ status <-chan Status) {
defer close(sendXml)
var input <-chan Stanza
for {
select {
- case cmd := <-control:
- switch cmd {
- case sendDeny:
+ case stat := <-status:
+ switch stat {
+ default:
input = nil
- case sendAllow:
+ case StatusRunning:
input = recvXmpp
- case sendAbort:
- return
- default:
- panic(fmt.Sprintf("unknown cmd %d", cmd))
}
case x, ok := <-input:
if !ok {
@@ -53,13 +47,23 @@
}
// Receive XMLish structures, handle all the stream-related ones, and
-// send XMPP stanzas on to the client.
-func (cl *Client) recvStream(recvXml <-chan interface{}, sendXmpp chan<- Stanza) {
+// send XMPP stanzas on to the client once the connection is running.
+func (cl *Client) recvStream(recvXml <-chan interface{}, sendXmpp chan<- Stanza,
+ status <-chan Status) {
defer close(sendXmpp)
+ defer cl.statmgr.close()
- handlers := make(map[string]func(Stanza) bool)
+ handlers := make(map[string]func(Stanza))
+ doSend := false
for {
select {
+ case stat := <-status:
+ switch stat {
+ default:
+ doSend = false
+ case StatusRunning:
+ doSend = true
+ }
case h := <-cl.handlers:
handlers[h.id] = h.f
case x, ok := <-recvXml:
@@ -78,14 +82,13 @@
case *auth:
cl.handleSasl(obj)
case Stanza:
- send := true
id := obj.GetHeader().Id
if handlers[id] != nil {
f := handlers[id]
delete(handlers, id)
- send = f(obj)
+ f(obj)
}
- if send {
+ if doSend {
sendXmpp <- obj
}
default:
@@ -97,7 +100,7 @@
func (cl *Client) handleStreamError(se *streamError) {
Info.Logf("Received stream error: %v", se)
- cl.inputControl <- sendAbort
+ cl.setStatus(StatusShutdown)
}
func (cl *Client) handleFeatures(fe *Features) {
@@ -123,6 +126,8 @@
func (cl *Client) handleTls(t *starttls) {
cl.layer1.startTls(&cl.tlsConfig)
+ cl.setStatus(StatusConnectedTls)
+
// Now re-send the initial handshake message to start the new
// session.
cl.sendXml <- &stream{To: cl.Jid.Domain, Version: XMPPVersion}
@@ -137,14 +142,13 @@
}
msg := &Iq{Header: Header{Type: "set", Id: NextId(),
Nested: []interface{}{bindReq}}}
- f := func(st Stanza) bool {
+ f := func(st Stanza) {
iq, ok := st.(*Iq)
if !ok {
Warn.Log("non-iq response")
}
if iq.Type == "error" {
Warn.Log("Resource binding failed")
- return false
}
var bindRepl *bindIq
for _, ele := range iq.Nested {
@@ -155,22 +159,18 @@
}
if bindRepl == nil {
Warn.Logf("Bad bind reply: %#v", iq)
- return false
}
jidStr := bindRepl.Jid
if jidStr == nil || *jidStr == "" {
Warn.Log("Can't bind empty resource")
- return false
}
jid := new(JID)
if err := jid.Set(*jidStr); err != nil {
Warn.Logf("Can't parse JID %s: %s", *jidStr, err)
- return false
}
cl.Jid = *jid
Info.Logf("Bound resource: %s", cl.Jid.String())
- cl.bindDone()
- return false
+ cl.setStatus(StatusBound)
}
cl.SetCallback(msg.Id, f)
cl.sendXml <- msg
@@ -182,7 +182,7 @@
// available on the normal Client.Recv channel. The callback must not
// read from that channel, as deliveries on it cannot proceed until
// the handler returns true or false.
-func (cl *Client) SetCallback(id string, f func(Stanza) bool) {
+func (cl *Client) SetCallback(id string, f func(Stanza)) {
h := &callback{id: id, f: f}
cl.handlers <- h
}