Made layers 1 and 3 more modular, shrinking the surface area of the coupling between them.
authorChris Jones <christian.jones@sri.com>
Sun, 15 Sep 2013 16:18:20 -0600
changeset 148 b1b4900eee5b
parent 147 d7679d991b17
child 149 22c96a9ab289
Made layers 1 and 3 more modular, shrinking the surface area of the coupling between them.
xmpp/layer1.go
xmpp/layer3.go
xmpp/xmpp.go
--- a/xmpp/layer1.go	Sun Sep 15 13:09:26 2013 -0600
+++ b/xmpp/layer1.go	Sun Sep 15 16:18:20 2013 -0600
@@ -4,50 +4,113 @@
 package xmpp
 
 import (
+	"crypto/tls"
 	"io"
 	"net"
 	"time"
 )
 
-func (cl *Client) recvTransport(w io.WriteCloser) {
+var l1interval = time.Second
+
+type layer1 struct {
+	sock      net.Conn
+	recvSocks chan<- net.Conn
+	sendSocks chan net.Conn
+}
+
+func startLayer1(sock net.Conn, recvWriter io.WriteCloser,
+	sendReader io.ReadCloser) *layer1 {
+	l1 := layer1{sock: sock}
+	recvSocks := make(chan net.Conn)
+	l1.recvSocks = recvSocks
+	sendSocks := make(chan net.Conn, 1)
+	l1.sendSocks = sendSocks
+	go recvTransport(recvSocks, recvWriter)
+	go sendTransport(sendSocks, sendReader)
+	recvSocks <- sock
+	sendSocks <- sock
+	return &l1
+}
+
+func (l1 *layer1) startTls(conf *tls.Config) {
+	sendSockToSender := func(sock net.Conn) {
+		for {
+			select {
+			case <-l1.sendSocks:
+			case l1.sendSocks <- sock:
+				return
+			}
+		}
+	}
+
+	sendSockToSender(nil)
+	l1.recvSocks <- nil
+	l1.sock = tls.Client(l1.sock, conf)
+	sendSockToSender(l1.sock)
+	l1.recvSocks <- l1.sock
+}
+
+func recvTransport(socks <-chan net.Conn, w io.WriteCloser) {
 	defer w.Close()
+	var sock net.Conn
 	p := make([]byte, 1024)
 	for {
-		if cl.socket == nil {
-			cl.waitForSocket()
+		select {
+		case sock = <-socks:
+		default:
 		}
-		cl.socket.SetReadDeadline(time.Now().Add(time.Second))
-		nr, err := cl.socket.Read(p)
-		if nr == 0 {
-			if errno, ok := err.(*net.OpError); ok {
-				if errno.Timeout() {
-					continue
+
+		if sock == nil {
+			time.Sleep(l1interval)
+		} else {
+			sock.SetReadDeadline(time.Now().Add(l1interval))
+			nr, err := sock.Read(p)
+			if nr == 0 {
+				if errno, ok := err.(*net.OpError); ok {
+					if errno.Timeout() {
+						continue
+					}
 				}
+				Warn.Logf("recvTransport: %s", err)
+				break
 			}
-			Warn.Logf("read: %s", err)
-			break
-		}
-		nw, err := w.Write(p[:nr])
-		if nw < nr {
-			Warn.Logf("read: %s", err)
-			break
+			nw, err := w.Write(p[:nr])
+			if nw < nr {
+				Warn.Logf("recvTransport: %s", err)
+				break
+			}
 		}
 	}
 }
 
-func (cl *Client) sendTransport(r io.Reader) {
-	defer cl.socket.Close()
+func sendTransport(socks <-chan net.Conn, r io.Reader) {
+	var sock net.Conn
 	p := make([]byte, 1024)
 	for {
 		nr, err := r.Read(p)
 		if nr == 0 {
-			Warn.Logf("write: %s", err)
+			Warn.Logf("sendTransport: %s", err)
 			break
 		}
-		nw, err := cl.socket.Write(p[:nr])
-		if nw < nr {
-			Warn.Logf("write: %s", err)
-			break
+		for nr > 0 {
+			select {
+			case sock = <-socks:
+				if sock != nil {
+					defer sock.Close()
+				}
+			default:
+			}
+
+			if sock == nil {
+				time.Sleep(l1interval)
+			} else {
+				nw, err := sock.Write(p[:nr])
+				nr -= nw
+				if nr != 0 {
+					Warn.Logf("write: %s", err)
+					break
+				}
+			}
 		}
 	}
 }
--- a/xmpp/layer3.go	Sun Sep 15 13:09:26 2013 -0600
+++ b/xmpp/layer3.go	Sun Sep 15 16:18:20 2013 -0600
@@ -4,9 +4,8 @@
 package xmpp
 
 import (
-	"crypto/tls"
 	"encoding/xml"
-	"time"
+	"fmt"
 )
 
 // Callback to handle a stanza with a particular id.
@@ -71,7 +70,6 @@
 	defer close(sendXml)
 
 	var input <-chan Stanza
-Loop:
 	for {
 		select {
 		case cmd := <-control:
@@ -80,10 +78,14 @@
 				input = nil
 			case sendAllow:
 				input = recvXmpp
+			case sendAbort:
+				return
+			default:
+				panic(fmt.Sprintf("unknown cmd %d", cmd))
 			}
 		case x, ok := <-input:
 			if !ok {
-				break Loop
+				return
 			}
 			if x == nil {
 				Info.Log("Refusing to send nil stanza")
@@ -99,7 +101,7 @@
 
 func (cl *Client) handleStreamError(se *streamError) {
 	Info.Logf("Received stream error: %v", se)
-	cl.socket.Close()
+	cl.inputControl <- sendAbort
 }
 
 func (cl *Client) handleFeatures(fe *Features) {
@@ -122,49 +124,12 @@
 	}
 }
 
-// readTransport() is running concurrently. We need to stop it,
-// negotiate TLS, then start it again. It calls waitForSocket() in
-// its inner loop; see below.
 func (cl *Client) handleTls(t *starttls) {
-	tcp := cl.socket
-
-	// Set the socket to nil, and wait for the reader routine to
-	// signal that it's paused.
-	cl.socket = nil
-	cl.socketSync.Add(1)
-	cl.socketSync.Wait()
-
-	// Negotiate TLS with the server.
-	tls := tls.Client(tcp, &cl.tlsConfig)
-
-	// Make the TLS connection available to the reader, and wait
-	// for it to signal that it's working again.
-	cl.socketSync.Add(1)
-	cl.socket = tls
-	cl.socketSync.Wait()
-
-	Info.Log("TLS negotiation succeeded.")
-	cl.Features = nil
+	cl.layer1.startTls(&cl.tlsConfig)
 
 	// Now re-send the initial handshake message to start the new
 	// session.
-	hsOut := &stream{To: cl.Jid.Domain, Version: XMPPVersion}
-	cl.sendXml <- hsOut
-}
-
-// Synchronize with handleTls(). Called from readTransport() when
-// cl.socket is nil.
-func (cl *Client) waitForSocket() {
-	// Signal that we've stopped reading from the socket.
-	cl.socketSync.Done()
-
-	// Wait until the socket is available again.
-	for cl.socket == nil {
-		time.Sleep(1e8)
-	}
-
-	// Signal that we're going back to the read loop.
-	cl.socketSync.Done()
+	cl.sendXml <- &stream{To: cl.Jid.Domain, Version: XMPPVersion}
 }
 
 // Register a callback to handle the next XMPP stanza (iq, message, or
--- a/xmpp/xmpp.go	Sun Sep 15 13:09:26 2013 -0600
+++ b/xmpp/xmpp.go	Sun Sep 15 16:18:20 2013 -0600
@@ -15,7 +15,6 @@
 	"io"
 	"net"
 	"reflect"
-	"sync"
 )
 
 const (
@@ -39,11 +38,18 @@
 
 // Flow control for preventing sending stanzas until negotiation has
 // completed.
-type sendCmd bool
+type sendCmd int
+
+const (
+	sendAllowConst = iota
+	sendDenyConst
+	sendAbortConst
+)
 
 var (
-	sendAllow sendCmd = true
-	sendDeny  sendCmd = false
+	sendAllow sendCmd = sendAllowConst
+	sendDeny  sendCmd = sendDenyConst
+	sendAbort sendCmd = sendAbortConst
 )
 
 // A filter can modify the XMPP traffic to or from the remote
@@ -72,8 +78,6 @@
 	// the time StartSession() returns.
 	Jid          JID
 	password     string
-	socket       net.Conn
-	socketSync   sync.WaitGroup
 	saslExpected string
 	authDone     bool
 	handlers     chan *callback
@@ -98,6 +102,7 @@
 	sendFilterAdd, recvFilterAdd chan Filter
 	// Allows the user to override the TLS configuration.
 	tlsConfig tls.Config
+	layer1    *layer1
 }
 
 // Connect to the appropriate server and authenticate as the given JID
@@ -145,7 +150,6 @@
 	cl.Roster = *roster
 	cl.password = password
 	cl.Jid = *jid
-	cl.socket = tcp
 	cl.handlers = make(chan *callback, 100)
 	cl.inputControl = make(chan sendCmd)
 	cl.tlsConfig = tlsconf
@@ -166,8 +170,7 @@
 	// Start the transport handler, initially unencrypted.
 	recvReader, recvWriter := io.Pipe()
 	sendReader, sendWriter := io.Pipe()
-	go cl.recvTransport(recvWriter)
-	go cl.sendTransport(sendReader)
+	cl.layer1 = startLayer1(tcp, recvWriter, sendReader)
 
 	// Start the reader and writer that convert to and from XML.
 	recvXmlCh := make(chan interface{})