xmpp.go
changeset 9 4fe926b03827
parent 8 30a7752cf8f7
child 10 f38b0ee7b1c1
--- a/xmpp.go	Mon Dec 26 11:48:35 2011 -0700
+++ b/xmpp.go	Mon Dec 26 14:36:41 2011 -0700
@@ -19,16 +19,14 @@
 const (
 	serverSrv = "xmpp-server"
 	clientSrv = "xmpp-client"
-	debug = true
+	debug = false
 )
 
 // The client in a client-server XMPP connection.
 type Client struct {
 	In <-chan interface{}
-	in chan interface{}
 	Out chan<- interface{}
-	out chan interface{}
-	tcp *net.TCPConn
+	TextOut chan<- *string
 }
 var _ io.Closer = &Client{}
 
@@ -42,7 +40,7 @@
 			": " + err.String())
 	}
 
-	var c *net.TCPConn
+	var tcp *net.TCPConn
 	for _, srv := range srvs {
 		addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
 		addr, err := net.ResolveTCPAddr("tcp", addrStr)
@@ -51,53 +49,111 @@
 				addrStr, err.String()))
 			continue
 		}
-		c, err = net.DialTCP("tcp", nil, addr)
+		tcp, err = net.DialTCP("tcp", nil, addr)
 		if err != nil {
 			err = os.NewError(fmt.Sprintf("DialTCP(%s): %s",
 				addr, err.String()))
 			continue
 		}
 	}
-	if c == nil {
+	if tcp == nil {
 		return nil, err
 	}
 
-	cl := new(Client)
-	cl.tcp = c
-	cl.in = make(chan interface{})
-	cl.In = cl.in
-	cl.out = make(chan interface{})
-	cl.Out = cl.out
-	// TODO Send readXml a reader that we can close when we
-	// negotiate TLS.
-	go readXml(cl.tcp, cl.in, debug)
-	go writeXml(cl.tcp, cl.out, debug)
+	// Start the transport handler, initially unencrypted.
+	tlsr, tlsw := startTransport(tcp)
+
+	// Start the reader and writers that convert to and from XML.
+	xmlIn := startXmlReader(tlsr)
+	xmlOut := startXmlWriter(tlsw)
+	textOut := startTextWriter(tlsw)
+
+	// Start the XMPP stream handler which filters stream-level
+	// events and responds to them.
+	clIn := startStreamReader(xmlIn)
+	clOut := startStreamWriter(xmlOut)
 
 	// Initial handshake.
 	hsOut := &Stream{To: jid.Domain, Version: Version}
-	cl.Out <- hsOut
+	xmlOut <- hsOut
+
+	// TODO Wait for initialization to finish.
+
+	// Make the Client and init its fields.
+	cl := new(Client)
+	cl.In = clIn
+	cl.Out = clOut
+	cl.TextOut = textOut
 
 	return cl, nil
 }
 
 func (c *Client) Close() os.Error {
-	close(c.in)
-	close(c.out)
-	return c.tcp.Close()
+	tryClose(c.In, c.Out, c.TextOut)
+	return nil
+}
+
+func startTransport(tcp io.ReadWriter) (io.Reader, io.Writer) {
+	f := func(r io.Reader, w io.Writer, dir string) {
+		defer tryClose(r, w)
+		p := make([]byte, 1024)
+		for {
+			nr, err := r.Read(p)
+			if nr == 0 {
+				log.Printf("%s: %s", dir, err.String())
+				break
+			}
+			nw, err := w.Write(p[:nr])
+			if nw < nr {
+				log.Println("%s: %s", dir, err.String())
+				break
+			}
+		}
+	}
+	inr, inw := io.Pipe()
+	outr, outw := io.Pipe()
+	go f(tcp, inw, "read")
+	go f(outr, tcp, "write")
+	return inr, outw
 }
 
-// TODO Delete; for use only by interact.go:
-func ReadXml(r io.ReadCloser, ch chan<- interface{}, dbg bool) {
-	readXml(r, ch, dbg)
+func startXmlReader(r io.Reader) <-chan interface{} {
+	ch := make(chan interface{})
+	go readXml(r, ch)
+	return ch
+}
+
+func startXmlWriter(w io.Writer) chan<- interface{} {
+	ch := make(chan interface{})
+	go writeXml(w, ch)
+	return ch
 }
 
-func readXml(r io.Reader, ch chan<- interface{}, dbg bool) {
-	defer close(ch)
-	if dbg {
+func startTextWriter(w io.Writer) chan<- *string {
+	ch := make(chan *string)
+	go writeText(w, ch)
+	return ch
+}
+
+func startStreamReader(xmlIn <-chan interface{}) <-chan interface{} {
+	ch := make(chan interface{})
+	go readStream(xmlIn, ch)
+	return ch
+}
+
+func startStreamWriter(xmlOut chan<- interface{}) chan<- interface{} {
+	ch := make(chan interface{})
+	go writeStream(xmlOut, ch)
+	return ch
+}
+
+func readXml(r io.Reader, ch chan<- interface{}) {
+	if debug {
 		pr, pw := io.Pipe()
 		go tee(r, pw, "S: ")
 		r = pr
 	}
+	defer tryClose(r, ch)
 
 	p := xml.NewParser(r)
 	for {
@@ -127,7 +183,7 @@
 			}
 			ch <- st
 			continue
-		case "stream error":
+		case "stream error", nsStream + " error":
 			obj = &StreamError{}
 		case nsStream + " features":
 			obj = &Features{}
@@ -149,12 +205,13 @@
 	}
 }
 
-func writeXml(w io.Writer, ch <-chan interface{}, dbg bool) {
-	if dbg {
+func writeXml(w io.Writer, ch <-chan interface{}) {
+	if debug {
 		pr, pw := io.Pipe()
 		go tee(pr, w, "C: ")
 		w = pw
 	}
+	defer tryClose(w, ch)
 
 	for obj := range ch {
 		err := xml.Marshal(w, obj)
@@ -165,14 +222,41 @@
 	}
 }
 
+func writeText(w io.Writer, ch <-chan *string) {
+	if debug {
+		pr, pw := io.Pipe()
+		go tee(pr, w, "C: ")
+		w = pw
+	}
+	defer tryClose(w, ch)
+
+	for str := range ch {
+		_, err := w.Write([]byte(*str))
+		if err != nil {
+			log.Printf("writeStr: %v", err)
+			break
+		}
+	}
+}
+
+func readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) {
+	defer tryClose(srvIn, cliOut)
+
+	for x := range srvIn {
+		cliOut <- x
+	}
+}
+
+func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) {
+	defer tryClose(srvOut, cliIn)
+
+	for x := range cliIn {
+		srvOut <- x
+	}
+}
+
 func tee(r io.Reader, w io.Writer, prefix string) {
-	defer func(xs ...interface{}) {
-		for _, x := range xs {
-			if c, ok := x.(io.Closer) ; ok {
-				c.Close()
-			}
-		}
-	}(r, w)
+	defer tryClose(r, w)
 
 	buf := bytes.NewBuffer(nil)
 	for {
@@ -181,13 +265,13 @@
 		if n == 0 {
 			break
 		}
-		n, _ = w.Write(c[:])
+		n, _ = w.Write(c[:n])
 		if n == 0 {
 			break
 		}
-		buf.Write(c[:])
-		if c[0] == '\n' {
-			fmt.Printf("%s%s", prefix, buf.String())
+		buf.Write(c[:n])
+		if c[0] == '\n' || c[0] == '>' {
+			fmt.Printf("%s%s\n", prefix, buf.String())
 			buf.Reset()
 		}
 	}
@@ -196,3 +280,28 @@
 		fmt.Printf("%s%s\n", prefix, leftover)
 	}
 }
+
+func tryClose(xs ...interface{}) {
+	f1 := func(ch chan<- interface{}) {
+		defer func() {
+			recover()
+		}()
+		close(ch)
+	}
+	f2 := func(ch <-chan interface{}) {
+		defer func() {
+			recover()
+		}()
+		close(ch)
+	}
+
+	for _, x := range xs {
+		if c, ok := x.(io.Closer) ; ok {
+			c.Close()
+		} else if ch, ok := x.(chan<- interface{}) ; ok {
+			f1(ch)
+		} else if ch, ok := x.(<-chan interface{}) ; ok {
+			f2(ch)
+		}
+	}
+}