# HG changeset patch # User Chris Jones # Date 1379283500 21600 # Node ID b1b4900eee5b0aae6e55f0bdc6c774d534ac2d40 # Parent d7679d991b1739ee3bfe9ee629ca7e1ecad5150a Made layers 1 and 3 more modular, shrinking the surface area of the coupling between them. diff -r d7679d991b17 -r b1b4900eee5b xmpp/layer1.go --- a/xmpp/layer1.go Sun Sep 15 13:09:26 2013 -0600 +++ b/xmpp/layer1.go Sun Sep 15 16:18:20 2013 -0600 @@ -4,50 +4,113 @@ package xmpp import ( + "crypto/tls" "io" "net" "time" ) -func (cl *Client) recvTransport(w io.WriteCloser) { +var l1interval = time.Second + +type layer1 struct { + sock net.Conn + recvSocks chan<- net.Conn + sendSocks chan net.Conn +} + +func startLayer1(sock net.Conn, recvWriter io.WriteCloser, + sendReader io.ReadCloser) *layer1 { + l1 := layer1{sock: sock} + recvSocks := make(chan net.Conn) + l1.recvSocks = recvSocks + sendSocks := make(chan net.Conn, 1) + l1.sendSocks = sendSocks + go recvTransport(recvSocks, recvWriter) + go sendTransport(sendSocks, sendReader) + recvSocks <- sock + sendSocks <- sock + return &l1 +} + +func (l1 *layer1) startTls(conf *tls.Config) { + sendSockToSender := func(sock net.Conn) { + for { + select { + case <-l1.sendSocks: + case l1.sendSocks <- sock: + return + } + } + } + + sendSockToSender(nil) + l1.recvSocks <- nil + l1.sock = tls.Client(l1.sock, conf) + sendSockToSender(l1.sock) + l1.recvSocks <- l1.sock +} + +func recvTransport(socks <-chan net.Conn, w io.WriteCloser) { defer w.Close() + var sock net.Conn p := make([]byte, 1024) for { - if cl.socket == nil { - cl.waitForSocket() + select { + case sock = <-socks: + default: } - cl.socket.SetReadDeadline(time.Now().Add(time.Second)) - nr, err := cl.socket.Read(p) - if nr == 0 { - if errno, ok := err.(*net.OpError); ok { - if errno.Timeout() { - continue + + if sock == nil { + time.Sleep(l1interval) + } else { + sock.SetReadDeadline(time.Now().Add(l1interval)) + nr, err := sock.Read(p) + if nr == 0 { + if errno, ok := err.(*net.OpError); ok { + if errno.Timeout() { + continue + } } + Warn.Logf("recvTransport: %s", err) + break } - Warn.Logf("read: %s", err) - break - } - nw, err := w.Write(p[:nr]) - if nw < nr { - Warn.Logf("read: %s", err) - break + nw, err := w.Write(p[:nr]) + if nw < nr { + Warn.Logf("recvTransport: %s", err) + break + } } } } -func (cl *Client) sendTransport(r io.Reader) { - defer cl.socket.Close() +func sendTransport(socks <-chan net.Conn, r io.Reader) { + var sock net.Conn p := make([]byte, 1024) for { nr, err := r.Read(p) if nr == 0 { - Warn.Logf("write: %s", err) + Warn.Logf("sendTransport: %s", err) break } - nw, err := cl.socket.Write(p[:nr]) - if nw < nr { - Warn.Logf("write: %s", err) - break + for nr > 0 { + select { + case sock = <-socks: + if sock != nil { + defer sock.Close() + } + default: + } + + if sock == nil { + time.Sleep(l1interval) + } else { + nw, err := sock.Write(p[:nr]) + nr -= nw + if nr != 0 { + Warn.Logf("write: %s", err) + break + } + } } } } diff -r d7679d991b17 -r b1b4900eee5b xmpp/layer3.go --- a/xmpp/layer3.go Sun Sep 15 13:09:26 2013 -0600 +++ b/xmpp/layer3.go Sun Sep 15 16:18:20 2013 -0600 @@ -4,9 +4,8 @@ package xmpp import ( - "crypto/tls" "encoding/xml" - "time" + "fmt" ) // Callback to handle a stanza with a particular id. @@ -71,7 +70,6 @@ defer close(sendXml) var input <-chan Stanza -Loop: for { select { case cmd := <-control: @@ -80,10 +78,14 @@ input = nil case sendAllow: input = recvXmpp + case sendAbort: + return + default: + panic(fmt.Sprintf("unknown cmd %d", cmd)) } case x, ok := <-input: if !ok { - break Loop + return } if x == nil { Info.Log("Refusing to send nil stanza") @@ -99,7 +101,7 @@ func (cl *Client) handleStreamError(se *streamError) { Info.Logf("Received stream error: %v", se) - cl.socket.Close() + cl.inputControl <- sendAbort } func (cl *Client) handleFeatures(fe *Features) { @@ -122,49 +124,12 @@ } } -// readTransport() is running concurrently. We need to stop it, -// negotiate TLS, then start it again. It calls waitForSocket() in -// its inner loop; see below. func (cl *Client) handleTls(t *starttls) { - tcp := cl.socket - - // Set the socket to nil, and wait for the reader routine to - // signal that it's paused. - cl.socket = nil - cl.socketSync.Add(1) - cl.socketSync.Wait() - - // Negotiate TLS with the server. - tls := tls.Client(tcp, &cl.tlsConfig) - - // Make the TLS connection available to the reader, and wait - // for it to signal that it's working again. - cl.socketSync.Add(1) - cl.socket = tls - cl.socketSync.Wait() - - Info.Log("TLS negotiation succeeded.") - cl.Features = nil + cl.layer1.startTls(&cl.tlsConfig) // Now re-send the initial handshake message to start the new // session. - hsOut := &stream{To: cl.Jid.Domain, Version: XMPPVersion} - cl.sendXml <- hsOut -} - -// Synchronize with handleTls(). Called from readTransport() when -// cl.socket is nil. -func (cl *Client) waitForSocket() { - // Signal that we've stopped reading from the socket. - cl.socketSync.Done() - - // Wait until the socket is available again. - for cl.socket == nil { - time.Sleep(1e8) - } - - // Signal that we're going back to the read loop. - cl.socketSync.Done() + cl.sendXml <- &stream{To: cl.Jid.Domain, Version: XMPPVersion} } // Register a callback to handle the next XMPP stanza (iq, message, or diff -r d7679d991b17 -r b1b4900eee5b xmpp/xmpp.go --- a/xmpp/xmpp.go Sun Sep 15 13:09:26 2013 -0600 +++ b/xmpp/xmpp.go Sun Sep 15 16:18:20 2013 -0600 @@ -15,7 +15,6 @@ "io" "net" "reflect" - "sync" ) const ( @@ -39,11 +38,18 @@ // Flow control for preventing sending stanzas until negotiation has // completed. -type sendCmd bool +type sendCmd int + +const ( + sendAllowConst = iota + sendDenyConst + sendAbortConst +) var ( - sendAllow sendCmd = true - sendDeny sendCmd = false + sendAllow sendCmd = sendAllowConst + sendDeny sendCmd = sendDenyConst + sendAbort sendCmd = sendAbortConst ) // A filter can modify the XMPP traffic to or from the remote @@ -72,8 +78,6 @@ // the time StartSession() returns. Jid JID password string - socket net.Conn - socketSync sync.WaitGroup saslExpected string authDone bool handlers chan *callback @@ -98,6 +102,7 @@ sendFilterAdd, recvFilterAdd chan Filter // Allows the user to override the TLS configuration. tlsConfig tls.Config + layer1 *layer1 } // Connect to the appropriate server and authenticate as the given JID @@ -145,7 +150,6 @@ cl.Roster = *roster cl.password = password cl.Jid = *jid - cl.socket = tcp cl.handlers = make(chan *callback, 100) cl.inputControl = make(chan sendCmd) cl.tlsConfig = tlsconf @@ -166,8 +170,7 @@ // Start the transport handler, initially unencrypted. recvReader, recvWriter := io.Pipe() sendReader, sendWriter := io.Pipe() - go cl.recvTransport(recvWriter) - go cl.sendTransport(sendReader) + cl.layer1 = startLayer1(tcp, recvWriter, sendReader) // Start the reader and writer that convert to and from XML. recvXmlCh := make(chan interface{})