Made layers 1 and 3 more modular, shrinking the surface area of the coupling between them.
--- a/xmpp/layer1.go Sun Sep 15 13:09:26 2013 -0600
+++ b/xmpp/layer1.go Sun Sep 15 16:18:20 2013 -0600
@@ -4,50 +4,113 @@
package xmpp
import (
+ "crypto/tls"
"io"
"net"
"time"
)
-func (cl *Client) recvTransport(w io.WriteCloser) {
+var l1interval = time.Second
+
+type layer1 struct {
+ sock net.Conn
+ recvSocks chan<- net.Conn
+ sendSocks chan net.Conn
+}
+
+func startLayer1(sock net.Conn, recvWriter io.WriteCloser,
+ sendReader io.ReadCloser) *layer1 {
+ l1 := layer1{sock: sock}
+ recvSocks := make(chan net.Conn)
+ l1.recvSocks = recvSocks
+ sendSocks := make(chan net.Conn, 1)
+ l1.sendSocks = sendSocks
+ go recvTransport(recvSocks, recvWriter)
+ go sendTransport(sendSocks, sendReader)
+ recvSocks <- sock
+ sendSocks <- sock
+ return &l1
+}
+
+func (l1 *layer1) startTls(conf *tls.Config) {
+ sendSockToSender := func(sock net.Conn) {
+ for {
+ select {
+ case <-l1.sendSocks:
+ case l1.sendSocks <- sock:
+ return
+ }
+ }
+ }
+
+ sendSockToSender(nil)
+ l1.recvSocks <- nil
+ l1.sock = tls.Client(l1.sock, conf)
+ sendSockToSender(l1.sock)
+ l1.recvSocks <- l1.sock
+}
+
+func recvTransport(socks <-chan net.Conn, w io.WriteCloser) {
defer w.Close()
+ var sock net.Conn
p := make([]byte, 1024)
for {
- if cl.socket == nil {
- cl.waitForSocket()
+ select {
+ case sock = <-socks:
+ default:
}
- cl.socket.SetReadDeadline(time.Now().Add(time.Second))
- nr, err := cl.socket.Read(p)
- if nr == 0 {
- if errno, ok := err.(*net.OpError); ok {
- if errno.Timeout() {
- continue
+
+ if sock == nil {
+ time.Sleep(l1interval)
+ } else {
+ sock.SetReadDeadline(time.Now().Add(l1interval))
+ nr, err := sock.Read(p)
+ if nr == 0 {
+ if errno, ok := err.(*net.OpError); ok {
+ if errno.Timeout() {
+ continue
+ }
}
+ Warn.Logf("recvTransport: %s", err)
+ break
}
- Warn.Logf("read: %s", err)
- break
- }
- nw, err := w.Write(p[:nr])
- if nw < nr {
- Warn.Logf("read: %s", err)
- break
+ nw, err := w.Write(p[:nr])
+ if nw < nr {
+ Warn.Logf("recvTransport: %s", err)
+ break
+ }
}
}
}
-func (cl *Client) sendTransport(r io.Reader) {
- defer cl.socket.Close()
+func sendTransport(socks <-chan net.Conn, r io.Reader) {
+ var sock net.Conn
p := make([]byte, 1024)
for {
nr, err := r.Read(p)
if nr == 0 {
- Warn.Logf("write: %s", err)
+ Warn.Logf("sendTransport: %s", err)
break
}
- nw, err := cl.socket.Write(p[:nr])
- if nw < nr {
- Warn.Logf("write: %s", err)
- break
+ for nr > 0 {
+ select {
+ case sock = <-socks:
+ if sock != nil {
+ defer sock.Close()
+ }
+ default:
+ }
+
+ if sock == nil {
+ time.Sleep(l1interval)
+ } else {
+ nw, err := sock.Write(p[:nr])
+ nr -= nw
+ if nr != 0 {
+ Warn.Logf("write: %s", err)
+ break
+ }
+ }
}
}
}
--- a/xmpp/layer3.go Sun Sep 15 13:09:26 2013 -0600
+++ b/xmpp/layer3.go Sun Sep 15 16:18:20 2013 -0600
@@ -4,9 +4,8 @@
package xmpp
import (
- "crypto/tls"
"encoding/xml"
- "time"
+ "fmt"
)
// Callback to handle a stanza with a particular id.
@@ -71,7 +70,6 @@
defer close(sendXml)
var input <-chan Stanza
-Loop:
for {
select {
case cmd := <-control:
@@ -80,10 +78,14 @@
input = nil
case sendAllow:
input = recvXmpp
+ case sendAbort:
+ return
+ default:
+ panic(fmt.Sprintf("unknown cmd %d", cmd))
}
case x, ok := <-input:
if !ok {
- break Loop
+ return
}
if x == nil {
Info.Log("Refusing to send nil stanza")
@@ -99,7 +101,7 @@
func (cl *Client) handleStreamError(se *streamError) {
Info.Logf("Received stream error: %v", se)
- cl.socket.Close()
+ cl.inputControl <- sendAbort
}
func (cl *Client) handleFeatures(fe *Features) {
@@ -122,49 +124,12 @@
}
}
-// readTransport() is running concurrently. We need to stop it,
-// negotiate TLS, then start it again. It calls waitForSocket() in
-// its inner loop; see below.
func (cl *Client) handleTls(t *starttls) {
- tcp := cl.socket
-
- // Set the socket to nil, and wait for the reader routine to
- // signal that it's paused.
- cl.socket = nil
- cl.socketSync.Add(1)
- cl.socketSync.Wait()
-
- // Negotiate TLS with the server.
- tls := tls.Client(tcp, &cl.tlsConfig)
-
- // Make the TLS connection available to the reader, and wait
- // for it to signal that it's working again.
- cl.socketSync.Add(1)
- cl.socket = tls
- cl.socketSync.Wait()
-
- Info.Log("TLS negotiation succeeded.")
- cl.Features = nil
+ cl.layer1.startTls(&cl.tlsConfig)
// Now re-send the initial handshake message to start the new
// session.
- hsOut := &stream{To: cl.Jid.Domain, Version: XMPPVersion}
- cl.sendXml <- hsOut
-}
-
-// Synchronize with handleTls(). Called from readTransport() when
-// cl.socket is nil.
-func (cl *Client) waitForSocket() {
- // Signal that we've stopped reading from the socket.
- cl.socketSync.Done()
-
- // Wait until the socket is available again.
- for cl.socket == nil {
- time.Sleep(1e8)
- }
-
- // Signal that we're going back to the read loop.
- cl.socketSync.Done()
+ cl.sendXml <- &stream{To: cl.Jid.Domain, Version: XMPPVersion}
}
// Register a callback to handle the next XMPP stanza (iq, message, or
--- a/xmpp/xmpp.go Sun Sep 15 13:09:26 2013 -0600
+++ b/xmpp/xmpp.go Sun Sep 15 16:18:20 2013 -0600
@@ -15,7 +15,6 @@
"io"
"net"
"reflect"
- "sync"
)
const (
@@ -39,11 +38,18 @@
// Flow control for preventing sending stanzas until negotiation has
// completed.
-type sendCmd bool
+type sendCmd int
+
+const (
+ sendAllowConst = iota
+ sendDenyConst
+ sendAbortConst
+)
var (
- sendAllow sendCmd = true
- sendDeny sendCmd = false
+ sendAllow sendCmd = sendAllowConst
+ sendDeny sendCmd = sendDenyConst
+ sendAbort sendCmd = sendAbortConst
)
// A filter can modify the XMPP traffic to or from the remote
@@ -72,8 +78,6 @@
// the time StartSession() returns.
Jid JID
password string
- socket net.Conn
- socketSync sync.WaitGroup
saslExpected string
authDone bool
handlers chan *callback
@@ -98,6 +102,7 @@
sendFilterAdd, recvFilterAdd chan Filter
// Allows the user to override the TLS configuration.
tlsConfig tls.Config
+ layer1 *layer1
}
// Connect to the appropriate server and authenticate as the given JID
@@ -145,7 +150,6 @@
cl.Roster = *roster
cl.password = password
cl.Jid = *jid
- cl.socket = tcp
cl.handlers = make(chan *callback, 100)
cl.inputControl = make(chan sendCmd)
cl.tlsConfig = tlsconf
@@ -166,8 +170,7 @@
// Start the transport handler, initially unencrypted.
recvReader, recvWriter := io.Pipe()
sendReader, sendWriter := io.Pipe()
- go cl.recvTransport(recvWriter)
- go cl.sendTransport(sendReader)
+ cl.layer1 = startLayer1(tcp, recvWriter, sendReader)
// Start the reader and writer that convert to and from XML.
recvXmlCh := make(chan interface{})