--- a/xmpp.go Mon Dec 26 14:36:41 2011 -0700
+++ b/xmpp.go Mon Dec 26 18:07:14 2011 -0700
@@ -10,22 +10,35 @@
"bytes"
"fmt"
"io"
- "log"
"net"
"os"
- "xml"
+ "sync"
)
const (
+ // Version of RFC 3920 that we implement.
+ Version = "1.0"
+
+ // Various XML namespaces.
+ nsStreams = "urn:ietf:params:xml:ns:xmpp-streams"
+ nsStream = "http://etherx.jabber.org/streams"
+ nsTLS = "urn:ietf:params:xml:ns:xmpp-tls"
+
+ // DNS SRV names
serverSrv = "xmpp-server"
clientSrv = "xmpp-client"
- debug = false
+
+ debug = true
)
// The client in a client-server XMPP connection.
type Client struct {
+ Jid JID
+ socket net.Conn
+ socketSync sync.WaitGroup
In <-chan interface{}
Out chan<- interface{}
+ xmlOut chan<- interface{}
TextOut chan<- *string
}
var _ io.Closer = &Client{}
@@ -60,27 +73,29 @@
return nil, err
}
+ cl := new(Client)
+ cl.Jid = *jid
+ cl.socket = tcp
+
// Start the transport handler, initially unencrypted.
- tlsr, tlsw := startTransport(tcp)
+ tlsr, tlsw := cl.startTransport()
// Start the reader and writers that convert to and from XML.
xmlIn := startXmlReader(tlsr)
- xmlOut := startXmlWriter(tlsw)
+ cl.xmlOut = startXmlWriter(tlsw)
textOut := startTextWriter(tlsw)
// Start the XMPP stream handler which filters stream-level
// events and responds to them.
- clIn := startStreamReader(xmlIn)
- clOut := startStreamWriter(xmlOut)
+ clIn := cl.startStreamReader(xmlIn, cl.xmlOut)
+ clOut := startStreamWriter(cl.xmlOut)
// Initial handshake.
hsOut := &Stream{To: jid.Domain, Version: Version}
- xmlOut <- hsOut
+ cl.xmlOut <- hsOut
// TODO Wait for initialization to finish.
- // Make the Client and init its fields.
- cl := new(Client)
cl.In = clIn
cl.Out = clOut
cl.TextOut = textOut
@@ -93,27 +108,11 @@
return nil
}
-func startTransport(tcp io.ReadWriter) (io.Reader, io.Writer) {
- f := func(r io.Reader, w io.Writer, dir string) {
- defer tryClose(r, w)
- p := make([]byte, 1024)
- for {
- nr, err := r.Read(p)
- if nr == 0 {
- log.Printf("%s: %s", dir, err.String())
- break
- }
- nw, err := w.Write(p[:nr])
- if nw < nr {
- log.Println("%s: %s", dir, err.String())
- break
- }
- }
- }
+func (cl *Client) startTransport() (io.Reader, io.Writer) {
inr, inw := io.Pipe()
outr, outw := io.Pipe()
- go f(tcp, inw, "read")
- go f(outr, tcp, "write")
+ go cl.readTransport(inw)
+ go cl.writeTransport(outr)
return inr, outw
}
@@ -135,9 +134,9 @@
return ch
}
-func startStreamReader(xmlIn <-chan interface{}) <-chan interface{} {
+func (cl *Client) startStreamReader(xmlIn <-chan interface{}, srvOut chan<- interface{}) <-chan interface{} {
ch := make(chan interface{})
- go readStream(xmlIn, ch)
+ go cl.readStream(xmlIn, srvOut, ch)
return ch
}
@@ -147,114 +146,6 @@
return ch
}
-func readXml(r io.Reader, ch chan<- interface{}) {
- if debug {
- pr, pw := io.Pipe()
- go tee(r, pw, "S: ")
- r = pr
- }
- defer tryClose(r, ch)
-
- p := xml.NewParser(r)
- for {
- // Sniff the next token on the stream.
- t, err := p.Token()
- if t == nil {
- if err != os.EOF {
- log.Printf("read: %v", err)
- }
- break
- }
- var se xml.StartElement
- var ok bool
- if se, ok = t.(xml.StartElement) ; !ok {
- continue
- }
-
- // Allocate the appropriate structure for this token.
- var obj interface{}
- switch se.Name.Space + " " + se.Name.Local {
- case nsStream + " stream":
- st, err := parseStream(se)
- if err != nil {
- log.Printf("unmarshal stream: %v",
- err)
- break
- }
- ch <- st
- continue
- case "stream error", nsStream + " error":
- obj = &StreamError{}
- case nsStream + " features":
- obj = &Features{}
- default:
- obj = &Unrecognized{}
- log.Printf("Ignoring unrecognized: %s %s\n",
- se.Name.Space, se.Name.Local)
- }
-
- // Read the complete XML stanza.
- err = p.Unmarshal(obj, &se)
- if err != nil {
- log.Printf("unmarshal: %v", err)
- break
- }
-
- // Put it on the channel.
- ch <- obj
- }
-}
-
-func writeXml(w io.Writer, ch <-chan interface{}) {
- if debug {
- pr, pw := io.Pipe()
- go tee(pr, w, "C: ")
- w = pw
- }
- defer tryClose(w, ch)
-
- for obj := range ch {
- err := xml.Marshal(w, obj)
- if err != nil {
- log.Printf("write: %v", err)
- break
- }
- }
-}
-
-func writeText(w io.Writer, ch <-chan *string) {
- if debug {
- pr, pw := io.Pipe()
- go tee(pr, w, "C: ")
- w = pw
- }
- defer tryClose(w, ch)
-
- for str := range ch {
- _, err := w.Write([]byte(*str))
- if err != nil {
- log.Printf("writeStr: %v", err)
- break
- }
- }
-}
-
-func readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) {
- defer tryClose(srvIn, cliOut)
-
- for x := range srvIn {
- cliOut <- x
- }
-}
-
-func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) {
- defer tryClose(srvOut, cliIn)
-
- for x := range cliIn {
- srvOut <- x
- }
-}
-
func tee(r io.Reader, w io.Writer, prefix string) {
defer tryClose(r, w)