diff -r 4fe926b03827 -r f38b0ee7b1c1 xmpp.go --- a/xmpp.go Mon Dec 26 14:36:41 2011 -0700 +++ b/xmpp.go Mon Dec 26 18:07:14 2011 -0700 @@ -10,22 +10,35 @@ "bytes" "fmt" "io" - "log" "net" "os" - "xml" + "sync" ) const ( + // Version of RFC 3920 that we implement. + Version = "1.0" + + // Various XML namespaces. + nsStreams = "urn:ietf:params:xml:ns:xmpp-streams" + nsStream = "http://etherx.jabber.org/streams" + nsTLS = "urn:ietf:params:xml:ns:xmpp-tls" + + // DNS SRV names serverSrv = "xmpp-server" clientSrv = "xmpp-client" - debug = false + + debug = true ) // The client in a client-server XMPP connection. type Client struct { + Jid JID + socket net.Conn + socketSync sync.WaitGroup In <-chan interface{} Out chan<- interface{} + xmlOut chan<- interface{} TextOut chan<- *string } var _ io.Closer = &Client{} @@ -60,27 +73,29 @@ return nil, err } + cl := new(Client) + cl.Jid = *jid + cl.socket = tcp + // Start the transport handler, initially unencrypted. - tlsr, tlsw := startTransport(tcp) + tlsr, tlsw := cl.startTransport() // Start the reader and writers that convert to and from XML. xmlIn := startXmlReader(tlsr) - xmlOut := startXmlWriter(tlsw) + cl.xmlOut = startXmlWriter(tlsw) textOut := startTextWriter(tlsw) // Start the XMPP stream handler which filters stream-level // events and responds to them. - clIn := startStreamReader(xmlIn) - clOut := startStreamWriter(xmlOut) + clIn := cl.startStreamReader(xmlIn, cl.xmlOut) + clOut := startStreamWriter(cl.xmlOut) // Initial handshake. hsOut := &Stream{To: jid.Domain, Version: Version} - xmlOut <- hsOut + cl.xmlOut <- hsOut // TODO Wait for initialization to finish. - // Make the Client and init its fields. - cl := new(Client) cl.In = clIn cl.Out = clOut cl.TextOut = textOut @@ -93,27 +108,11 @@ return nil } -func startTransport(tcp io.ReadWriter) (io.Reader, io.Writer) { - f := func(r io.Reader, w io.Writer, dir string) { - defer tryClose(r, w) - p := make([]byte, 1024) - for { - nr, err := r.Read(p) - if nr == 0 { - log.Printf("%s: %s", dir, err.String()) - break - } - nw, err := w.Write(p[:nr]) - if nw < nr { - log.Println("%s: %s", dir, err.String()) - break - } - } - } +func (cl *Client) startTransport() (io.Reader, io.Writer) { inr, inw := io.Pipe() outr, outw := io.Pipe() - go f(tcp, inw, "read") - go f(outr, tcp, "write") + go cl.readTransport(inw) + go cl.writeTransport(outr) return inr, outw } @@ -135,9 +134,9 @@ return ch } -func startStreamReader(xmlIn <-chan interface{}) <-chan interface{} { +func (cl *Client) startStreamReader(xmlIn <-chan interface{}, srvOut chan<- interface{}) <-chan interface{} { ch := make(chan interface{}) - go readStream(xmlIn, ch) + go cl.readStream(xmlIn, srvOut, ch) return ch } @@ -147,114 +146,6 @@ return ch } -func readXml(r io.Reader, ch chan<- interface{}) { - if debug { - pr, pw := io.Pipe() - go tee(r, pw, "S: ") - r = pr - } - defer tryClose(r, ch) - - p := xml.NewParser(r) - for { - // Sniff the next token on the stream. - t, err := p.Token() - if t == nil { - if err != os.EOF { - log.Printf("read: %v", err) - } - break - } - var se xml.StartElement - var ok bool - if se, ok = t.(xml.StartElement) ; !ok { - continue - } - - // Allocate the appropriate structure for this token. - var obj interface{} - switch se.Name.Space + " " + se.Name.Local { - case nsStream + " stream": - st, err := parseStream(se) - if err != nil { - log.Printf("unmarshal stream: %v", - err) - break - } - ch <- st - continue - case "stream error", nsStream + " error": - obj = &StreamError{} - case nsStream + " features": - obj = &Features{} - default: - obj = &Unrecognized{} - log.Printf("Ignoring unrecognized: %s %s\n", - se.Name.Space, se.Name.Local) - } - - // Read the complete XML stanza. - err = p.Unmarshal(obj, &se) - if err != nil { - log.Printf("unmarshal: %v", err) - break - } - - // Put it on the channel. - ch <- obj - } -} - -func writeXml(w io.Writer, ch <-chan interface{}) { - if debug { - pr, pw := io.Pipe() - go tee(pr, w, "C: ") - w = pw - } - defer tryClose(w, ch) - - for obj := range ch { - err := xml.Marshal(w, obj) - if err != nil { - log.Printf("write: %v", err) - break - } - } -} - -func writeText(w io.Writer, ch <-chan *string) { - if debug { - pr, pw := io.Pipe() - go tee(pr, w, "C: ") - w = pw - } - defer tryClose(w, ch) - - for str := range ch { - _, err := w.Write([]byte(*str)) - if err != nil { - log.Printf("writeStr: %v", err) - break - } - } -} - -func readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) { - defer tryClose(srvIn, cliOut) - - for x := range srvIn { - cliOut <- x - } -} - -func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) { - defer tryClose(srvOut, cliIn) - - for x := range cliIn { - srvOut <- x - } -} - func tee(r io.Reader, w io.Writer, prefix string) { defer tryClose(r, w)