Reorganize so we have a layered approach to IO with the server.
authorChris Jones <chris@cjones.org>
Mon, 26 Dec 2011 14:36:41 -0700
changeset 9 4fe926b03827
parent 8 30a7752cf8f7
child 10 f38b0ee7b1c1
Reorganize so we have a layered approach to IO with the server.
examples/interact.go
xmpp.go
xmpp_test.go
--- a/examples/interact.go	Mon Dec 26 11:48:35 2011 -0700
+++ b/examples/interact.go	Mon Dec 26 14:36:41 2011 -0700
@@ -37,11 +37,14 @@
 		fmt.Println("done reading")
 	}(c.In)
 
-	ch := make(chan interface{})
-	go xmpp.ReadXml(os.Stdin, ch, false)
-	for x := range ch {
-		fmt.Printf("c: %v", x)
-		c.Out <- x
+	p := make([]byte, 1024)
+	for {
+		nr, _ := os.Stdin.Read(p)
+		if nr == 0 {
+			break
+		}
+		s := string(p)
+		c.TextOut <- &s
 	}
 	fmt.Println("done sending")
 }
--- a/xmpp.go	Mon Dec 26 11:48:35 2011 -0700
+++ b/xmpp.go	Mon Dec 26 14:36:41 2011 -0700
@@ -19,16 +19,14 @@
 const (
 	serverSrv = "xmpp-server"
 	clientSrv = "xmpp-client"
-	debug = true
+	debug = false
 )
 
 // The client in a client-server XMPP connection.
 type Client struct {
 	In <-chan interface{}
-	in chan interface{}
 	Out chan<- interface{}
-	out chan interface{}
-	tcp *net.TCPConn
+	TextOut chan<- *string
 }
 var _ io.Closer = &Client{}
 
@@ -42,7 +40,7 @@
 			": " + err.String())
 	}
 
-	var c *net.TCPConn
+	var tcp *net.TCPConn
 	for _, srv := range srvs {
 		addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
 		addr, err := net.ResolveTCPAddr("tcp", addrStr)
@@ -51,53 +49,111 @@
 				addrStr, err.String()))
 			continue
 		}
-		c, err = net.DialTCP("tcp", nil, addr)
+		tcp, err = net.DialTCP("tcp", nil, addr)
 		if err != nil {
 			err = os.NewError(fmt.Sprintf("DialTCP(%s): %s",
 				addr, err.String()))
 			continue
 		}
 	}
-	if c == nil {
+	if tcp == nil {
 		return nil, err
 	}
 
-	cl := new(Client)
-	cl.tcp = c
-	cl.in = make(chan interface{})
-	cl.In = cl.in
-	cl.out = make(chan interface{})
-	cl.Out = cl.out
-	// TODO Send readXml a reader that we can close when we
-	// negotiate TLS.
-	go readXml(cl.tcp, cl.in, debug)
-	go writeXml(cl.tcp, cl.out, debug)
+	// Start the transport handler, initially unencrypted.
+	tlsr, tlsw := startTransport(tcp)
+
+	// Start the reader and writers that convert to and from XML.
+	xmlIn := startXmlReader(tlsr)
+	xmlOut := startXmlWriter(tlsw)
+	textOut := startTextWriter(tlsw)
+
+	// Start the XMPP stream handler which filters stream-level
+	// events and responds to them.
+	clIn := startStreamReader(xmlIn)
+	clOut := startStreamWriter(xmlOut)
 
 	// Initial handshake.
 	hsOut := &Stream{To: jid.Domain, Version: Version}
-	cl.Out <- hsOut
+	xmlOut <- hsOut
+
+	// TODO Wait for initialization to finish.
+
+	// Make the Client and init its fields.
+	cl := new(Client)
+	cl.In = clIn
+	cl.Out = clOut
+	cl.TextOut = textOut
 
 	return cl, nil
 }
 
 func (c *Client) Close() os.Error {
-	close(c.in)
-	close(c.out)
-	return c.tcp.Close()
+	tryClose(c.In, c.Out, c.TextOut)
+	return nil
+}
+
+func startTransport(tcp io.ReadWriter) (io.Reader, io.Writer) {
+	f := func(r io.Reader, w io.Writer, dir string) {
+		defer tryClose(r, w)
+		p := make([]byte, 1024)
+		for {
+			nr, err := r.Read(p)
+			if nr == 0 {
+				log.Printf("%s: %s", dir, err.String())
+				break
+			}
+			nw, err := w.Write(p[:nr])
+			if nw < nr {
+				log.Println("%s: %s", dir, err.String())
+				break
+			}
+		}
+	}
+	inr, inw := io.Pipe()
+	outr, outw := io.Pipe()
+	go f(tcp, inw, "read")
+	go f(outr, tcp, "write")
+	return inr, outw
 }
 
-// TODO Delete; for use only by interact.go:
-func ReadXml(r io.ReadCloser, ch chan<- interface{}, dbg bool) {
-	readXml(r, ch, dbg)
+func startXmlReader(r io.Reader) <-chan interface{} {
+	ch := make(chan interface{})
+	go readXml(r, ch)
+	return ch
+}
+
+func startXmlWriter(w io.Writer) chan<- interface{} {
+	ch := make(chan interface{})
+	go writeXml(w, ch)
+	return ch
 }
 
-func readXml(r io.Reader, ch chan<- interface{}, dbg bool) {
-	defer close(ch)
-	if dbg {
+func startTextWriter(w io.Writer) chan<- *string {
+	ch := make(chan *string)
+	go writeText(w, ch)
+	return ch
+}
+
+func startStreamReader(xmlIn <-chan interface{}) <-chan interface{} {
+	ch := make(chan interface{})
+	go readStream(xmlIn, ch)
+	return ch
+}
+
+func startStreamWriter(xmlOut chan<- interface{}) chan<- interface{} {
+	ch := make(chan interface{})
+	go writeStream(xmlOut, ch)
+	return ch
+}
+
+func readXml(r io.Reader, ch chan<- interface{}) {
+	if debug {
 		pr, pw := io.Pipe()
 		go tee(r, pw, "S: ")
 		r = pr
 	}
+	defer tryClose(r, ch)
 
 	p := xml.NewParser(r)
 	for {
@@ -127,7 +183,7 @@
 			}
 			ch <- st
 			continue
-		case "stream error":
+		case "stream error", nsStream + " error":
 			obj = &StreamError{}
 		case nsStream + " features":
 			obj = &Features{}
@@ -149,12 +205,13 @@
 	}
 }
 
-func writeXml(w io.Writer, ch <-chan interface{}, dbg bool) {
-	if dbg {
+func writeXml(w io.Writer, ch <-chan interface{}) {
+	if debug {
 		pr, pw := io.Pipe()
 		go tee(pr, w, "C: ")
 		w = pw
 	}
+	defer tryClose(w, ch)
 
 	for obj := range ch {
 		err := xml.Marshal(w, obj)
@@ -165,14 +222,41 @@
 	}
 }
 
+func writeText(w io.Writer, ch <-chan *string) {
+	if debug {
+		pr, pw := io.Pipe()
+		go tee(pr, w, "C: ")
+		w = pw
+	}
+	defer tryClose(w, ch)
+
+	for str := range ch {
+		_, err := w.Write([]byte(*str))
+		if err != nil {
+			log.Printf("writeStr: %v", err)
+			break
+		}
+	}
+}
+
+func readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) {
+	defer tryClose(srvIn, cliOut)
+
+	for x := range srvIn {
+		cliOut <- x
+	}
+}
+
+func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) {
+	defer tryClose(srvOut, cliIn)
+
+	for x := range cliIn {
+		srvOut <- x
+	}
+}
+
 func tee(r io.Reader, w io.Writer, prefix string) {
-	defer func(xs ...interface{}) {
-		for _, x := range xs {
-			if c, ok := x.(io.Closer) ; ok {
-				c.Close()
-			}
-		}
-	}(r, w)
+	defer tryClose(r, w)
 
 	buf := bytes.NewBuffer(nil)
 	for {
@@ -181,13 +265,13 @@
 		if n == 0 {
 			break
 		}
-		n, _ = w.Write(c[:])
+		n, _ = w.Write(c[:n])
 		if n == 0 {
 			break
 		}
-		buf.Write(c[:])
-		if c[0] == '\n' {
-			fmt.Printf("%s%s", prefix, buf.String())
+		buf.Write(c[:n])
+		if c[0] == '\n' || c[0] == '>' {
+			fmt.Printf("%s%s\n", prefix, buf.String())
 			buf.Reset()
 		}
 	}
@@ -196,3 +280,28 @@
 		fmt.Printf("%s%s\n", prefix, leftover)
 	}
 }
+
+func tryClose(xs ...interface{}) {
+	f1 := func(ch chan<- interface{}) {
+		defer func() {
+			recover()
+		}()
+		close(ch)
+	}
+	f2 := func(ch <-chan interface{}) {
+		defer func() {
+			recover()
+		}()
+		close(ch)
+	}
+
+	for _, x := range xs {
+		if c, ok := x.(io.Closer) ; ok {
+			c.Close()
+		} else if ch, ok := x.(chan<- interface{}) ; ok {
+			f1(ch)
+		} else if ch, ok := x.(<-chan interface{}) ; ok {
+			f2(ch)
+		}
+	}
+}
--- a/xmpp_test.go	Mon Dec 26 11:48:35 2011 -0700
+++ b/xmpp_test.go	Mon Dec 26 14:36:41 2011 -0700
@@ -16,7 +16,7 @@
 func TestReadError(t *testing.T) {
 	r := strings.NewReader(`<stream:error><bad-foo/></stream:error>`)
 	ch := make(chan interface{})
-	go readXml(r, ch, false)
+	go readXml(r, ch)
 	x := <- ch
 	se, ok := x.(*StreamError)
 	if !ok {
@@ -32,7 +32,7 @@
 		`<text xml:lang="en" xmlns="` + nsStreams +
 		`">Error text</text></stream:error>`)
 	ch = make(chan interface{})
-	go readXml(r, ch, false)
+	go readXml(r, ch)
 	x = <- ch
 	se, ok = x.(*StreamError)
 	if !ok {
@@ -50,7 +50,7 @@
 		`xmlns="jabber:client" xmlns:stream="` + nsStream +
 		`" version="1.0">`)
 	ch := make(chan interface{})
-	go readXml(r, ch, false)
+	go readXml(r, ch)
 	x := <- ch
 	ss, ok := x.(*Stream)
 	if !ok {
@@ -69,7 +69,7 @@
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		writeXml(w, ch, true)
+		writeXml(w, ch)
 	}()
 	ch <- obj
 	close(ch)