diff -r 30a7752cf8f7 -r 4fe926b03827 xmpp.go --- a/xmpp.go Mon Dec 26 11:48:35 2011 -0700 +++ b/xmpp.go Mon Dec 26 14:36:41 2011 -0700 @@ -19,16 +19,14 @@ const ( serverSrv = "xmpp-server" clientSrv = "xmpp-client" - debug = true + debug = false ) // The client in a client-server XMPP connection. type Client struct { In <-chan interface{} - in chan interface{} Out chan<- interface{} - out chan interface{} - tcp *net.TCPConn + TextOut chan<- *string } var _ io.Closer = &Client{} @@ -42,7 +40,7 @@ ": " + err.String()) } - var c *net.TCPConn + var tcp *net.TCPConn for _, srv := range srvs { addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port) addr, err := net.ResolveTCPAddr("tcp", addrStr) @@ -51,53 +49,111 @@ addrStr, err.String())) continue } - c, err = net.DialTCP("tcp", nil, addr) + tcp, err = net.DialTCP("tcp", nil, addr) if err != nil { err = os.NewError(fmt.Sprintf("DialTCP(%s): %s", addr, err.String())) continue } } - if c == nil { + if tcp == nil { return nil, err } - cl := new(Client) - cl.tcp = c - cl.in = make(chan interface{}) - cl.In = cl.in - cl.out = make(chan interface{}) - cl.Out = cl.out - // TODO Send readXml a reader that we can close when we - // negotiate TLS. - go readXml(cl.tcp, cl.in, debug) - go writeXml(cl.tcp, cl.out, debug) + // Start the transport handler, initially unencrypted. + tlsr, tlsw := startTransport(tcp) + + // Start the reader and writers that convert to and from XML. + xmlIn := startXmlReader(tlsr) + xmlOut := startXmlWriter(tlsw) + textOut := startTextWriter(tlsw) + + // Start the XMPP stream handler which filters stream-level + // events and responds to them. + clIn := startStreamReader(xmlIn) + clOut := startStreamWriter(xmlOut) // Initial handshake. hsOut := &Stream{To: jid.Domain, Version: Version} - cl.Out <- hsOut + xmlOut <- hsOut + + // TODO Wait for initialization to finish. + + // Make the Client and init its fields. + cl := new(Client) + cl.In = clIn + cl.Out = clOut + cl.TextOut = textOut return cl, nil } func (c *Client) Close() os.Error { - close(c.in) - close(c.out) - return c.tcp.Close() + tryClose(c.In, c.Out, c.TextOut) + return nil +} + +func startTransport(tcp io.ReadWriter) (io.Reader, io.Writer) { + f := func(r io.Reader, w io.Writer, dir string) { + defer tryClose(r, w) + p := make([]byte, 1024) + for { + nr, err := r.Read(p) + if nr == 0 { + log.Printf("%s: %s", dir, err.String()) + break + } + nw, err := w.Write(p[:nr]) + if nw < nr { + log.Println("%s: %s", dir, err.String()) + break + } + } + } + inr, inw := io.Pipe() + outr, outw := io.Pipe() + go f(tcp, inw, "read") + go f(outr, tcp, "write") + return inr, outw } -// TODO Delete; for use only by interact.go: -func ReadXml(r io.ReadCloser, ch chan<- interface{}, dbg bool) { - readXml(r, ch, dbg) +func startXmlReader(r io.Reader) <-chan interface{} { + ch := make(chan interface{}) + go readXml(r, ch) + return ch +} + +func startXmlWriter(w io.Writer) chan<- interface{} { + ch := make(chan interface{}) + go writeXml(w, ch) + return ch } -func readXml(r io.Reader, ch chan<- interface{}, dbg bool) { - defer close(ch) - if dbg { +func startTextWriter(w io.Writer) chan<- *string { + ch := make(chan *string) + go writeText(w, ch) + return ch +} + +func startStreamReader(xmlIn <-chan interface{}) <-chan interface{} { + ch := make(chan interface{}) + go readStream(xmlIn, ch) + return ch +} + +func startStreamWriter(xmlOut chan<- interface{}) chan<- interface{} { + ch := make(chan interface{}) + go writeStream(xmlOut, ch) + return ch +} + +func readXml(r io.Reader, ch chan<- interface{}) { + if debug { pr, pw := io.Pipe() go tee(r, pw, "S: ") r = pr } + defer tryClose(r, ch) p := xml.NewParser(r) for { @@ -127,7 +183,7 @@ } ch <- st continue - case "stream error": + case "stream error", nsStream + " error": obj = &StreamError{} case nsStream + " features": obj = &Features{} @@ -149,12 +205,13 @@ } } -func writeXml(w io.Writer, ch <-chan interface{}, dbg bool) { - if dbg { +func writeXml(w io.Writer, ch <-chan interface{}) { + if debug { pr, pw := io.Pipe() go tee(pr, w, "C: ") w = pw } + defer tryClose(w, ch) for obj := range ch { err := xml.Marshal(w, obj) @@ -165,14 +222,41 @@ } } +func writeText(w io.Writer, ch <-chan *string) { + if debug { + pr, pw := io.Pipe() + go tee(pr, w, "C: ") + w = pw + } + defer tryClose(w, ch) + + for str := range ch { + _, err := w.Write([]byte(*str)) + if err != nil { + log.Printf("writeStr: %v", err) + break + } + } +} + +func readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) { + defer tryClose(srvIn, cliOut) + + for x := range srvIn { + cliOut <- x + } +} + +func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) { + defer tryClose(srvOut, cliIn) + + for x := range cliIn { + srvOut <- x + } +} + func tee(r io.Reader, w io.Writer, prefix string) { - defer func(xs ...interface{}) { - for _, x := range xs { - if c, ok := x.(io.Closer) ; ok { - c.Close() - } - } - }(r, w) + defer tryClose(r, w) buf := bytes.NewBuffer(nil) for { @@ -181,13 +265,13 @@ if n == 0 { break } - n, _ = w.Write(c[:]) + n, _ = w.Write(c[:n]) if n == 0 { break } - buf.Write(c[:]) - if c[0] == '\n' { - fmt.Printf("%s%s", prefix, buf.String()) + buf.Write(c[:n]) + if c[0] == '\n' || c[0] == '>' { + fmt.Printf("%s%s\n", prefix, buf.String()) buf.Reset() } } @@ -196,3 +280,28 @@ fmt.Printf("%s%s\n", prefix, leftover) } } + +func tryClose(xs ...interface{}) { + f1 := func(ch chan<- interface{}) { + defer func() { + recover() + }() + close(ch) + } + f2 := func(ch <-chan interface{}) { + defer func() { + recover() + }() + close(ch) + } + + for _, x := range xs { + if c, ok := x.(io.Closer) ; ok { + c.Close() + } else if ch, ok := x.(chan<- interface{}) ; ok { + f1(ch) + } else if ch, ok := x.(<-chan interface{}) ; ok { + f2(ch) + } + } +}