xmpp.go
changeset 10 f38b0ee7b1c1
parent 9 4fe926b03827
child 11 48be1ae93fd4
equal deleted inserted replaced
9:4fe926b03827 10:f38b0ee7b1c1
     8 
     8 
     9 import (
     9 import (
    10 	"bytes"
    10 	"bytes"
    11 	"fmt"
    11 	"fmt"
    12 	"io"
    12 	"io"
    13 	"log"
       
    14 	"net"
    13 	"net"
    15 	"os"
    14 	"os"
    16 	"xml"
    15 	"sync"
    17 )
    16 )
    18 
    17 
    19 const (
    18 const (
       
    19 	// Version of RFC 3920 that we implement.
       
    20 	Version = "1.0"
       
    21 
       
    22 	// Various XML namespaces.
       
    23 	nsStreams = "urn:ietf:params:xml:ns:xmpp-streams"
       
    24 	nsStream = "http://etherx.jabber.org/streams"
       
    25 	nsTLS = "urn:ietf:params:xml:ns:xmpp-tls"
       
    26 
       
    27 	// DNS SRV names
    20 	serverSrv = "xmpp-server"
    28 	serverSrv = "xmpp-server"
    21 	clientSrv = "xmpp-client"
    29 	clientSrv = "xmpp-client"
    22 	debug = false
    30 
       
    31 	debug = true
    23 )
    32 )
    24 
    33 
    25 // The client in a client-server XMPP connection.
    34 // The client in a client-server XMPP connection.
    26 type Client struct {
    35 type Client struct {
       
    36 	Jid JID
       
    37 	socket net.Conn
       
    38 	socketSync sync.WaitGroup
    27 	In <-chan interface{}
    39 	In <-chan interface{}
    28 	Out chan<- interface{}
    40 	Out chan<- interface{}
       
    41 	xmlOut chan<- interface{}
    29 	TextOut chan<- *string
    42 	TextOut chan<- *string
    30 }
    43 }
    31 var _ io.Closer = &Client{}
    44 var _ io.Closer = &Client{}
    32 
    45 
    33 // Connect to the appropriate server and authenticate as the given JID
    46 // Connect to the appropriate server and authenticate as the given JID
    58 	}
    71 	}
    59 	if tcp == nil {
    72 	if tcp == nil {
    60 		return nil, err
    73 		return nil, err
    61 	}
    74 	}
    62 
    75 
       
    76 	cl := new(Client)
       
    77 	cl.Jid = *jid
       
    78 	cl.socket = tcp
       
    79 
    63 	// Start the transport handler, initially unencrypted.
    80 	// Start the transport handler, initially unencrypted.
    64 	tlsr, tlsw := startTransport(tcp)
    81 	tlsr, tlsw := cl.startTransport()
    65 
    82 
    66 	// Start the reader and writers that convert to and from XML.
    83 	// Start the reader and writers that convert to and from XML.
    67 	xmlIn := startXmlReader(tlsr)
    84 	xmlIn := startXmlReader(tlsr)
    68 	xmlOut := startXmlWriter(tlsw)
    85 	cl.xmlOut = startXmlWriter(tlsw)
    69 	textOut := startTextWriter(tlsw)
    86 	textOut := startTextWriter(tlsw)
    70 
    87 
    71 	// Start the XMPP stream handler which filters stream-level
    88 	// Start the XMPP stream handler which filters stream-level
    72 	// events and responds to them.
    89 	// events and responds to them.
    73 	clIn := startStreamReader(xmlIn)
    90 	clIn := cl.startStreamReader(xmlIn, cl.xmlOut)
    74 	clOut := startStreamWriter(xmlOut)
    91 	clOut := startStreamWriter(cl.xmlOut)
    75 
    92 
    76 	// Initial handshake.
    93 	// Initial handshake.
    77 	hsOut := &Stream{To: jid.Domain, Version: Version}
    94 	hsOut := &Stream{To: jid.Domain, Version: Version}
    78 	xmlOut <- hsOut
    95 	cl.xmlOut <- hsOut
    79 
    96 
    80 	// TODO Wait for initialization to finish.
    97 	// TODO Wait for initialization to finish.
    81 
    98 
    82 	// Make the Client and init its fields.
       
    83 	cl := new(Client)
       
    84 	cl.In = clIn
    99 	cl.In = clIn
    85 	cl.Out = clOut
   100 	cl.Out = clOut
    86 	cl.TextOut = textOut
   101 	cl.TextOut = textOut
    87 
   102 
    88 	return cl, nil
   103 	return cl, nil
    91 func (c *Client) Close() os.Error {
   106 func (c *Client) Close() os.Error {
    92 	tryClose(c.In, c.Out, c.TextOut)
   107 	tryClose(c.In, c.Out, c.TextOut)
    93 	return nil
   108 	return nil
    94 }
   109 }
    95 
   110 
    96 func startTransport(tcp io.ReadWriter) (io.Reader, io.Writer) {
   111 func (cl *Client) startTransport() (io.Reader, io.Writer) {
    97 	f := func(r io.Reader, w io.Writer, dir string) {
       
    98 		defer tryClose(r, w)
       
    99 		p := make([]byte, 1024)
       
   100 		for {
       
   101 			nr, err := r.Read(p)
       
   102 			if nr == 0 {
       
   103 				log.Printf("%s: %s", dir, err.String())
       
   104 				break
       
   105 			}
       
   106 			nw, err := w.Write(p[:nr])
       
   107 			if nw < nr {
       
   108 				log.Println("%s: %s", dir, err.String())
       
   109 				break
       
   110 			}
       
   111 		}
       
   112 	}
       
   113 	inr, inw := io.Pipe()
   112 	inr, inw := io.Pipe()
   114 	outr, outw := io.Pipe()
   113 	outr, outw := io.Pipe()
   115 	go f(tcp, inw, "read")
   114 	go cl.readTransport(inw)
   116 	go f(outr, tcp, "write")
   115 	go cl.writeTransport(outr)
   117 	return inr, outw
   116 	return inr, outw
   118 }
   117 }
   119 
   118 
   120 func startXmlReader(r io.Reader) <-chan interface{} {
   119 func startXmlReader(r io.Reader) <-chan interface{} {
   121 	ch := make(chan interface{})
   120 	ch := make(chan interface{})
   133 	ch := make(chan *string)
   132 	ch := make(chan *string)
   134 	go writeText(w, ch)
   133 	go writeText(w, ch)
   135 	return ch
   134 	return ch
   136 }
   135 }
   137 
   136 
   138 func startStreamReader(xmlIn <-chan interface{}) <-chan interface{} {
   137 func (cl *Client) startStreamReader(xmlIn <-chan interface{}, srvOut chan<- interface{}) <-chan interface{} {
   139 	ch := make(chan interface{})
   138 	ch := make(chan interface{})
   140 	go readStream(xmlIn, ch)
   139 	go cl.readStream(xmlIn, srvOut, ch)
   141 	return ch
   140 	return ch
   142 }
   141 }
   143 
   142 
   144 func startStreamWriter(xmlOut chan<- interface{}) chan<- interface{} {
   143 func startStreamWriter(xmlOut chan<- interface{}) chan<- interface{} {
   145 	ch := make(chan interface{})
   144 	ch := make(chan interface{})
   146 	go writeStream(xmlOut, ch)
   145 	go writeStream(xmlOut, ch)
   147 	return ch
   146 	return ch
   148 }
       
   149 
       
   150 func readXml(r io.Reader, ch chan<- interface{}) {
       
   151 	if debug {
       
   152 		pr, pw := io.Pipe()
       
   153 		go tee(r, pw, "S: ")
       
   154 		r = pr
       
   155 	}
       
   156 	defer tryClose(r, ch)
       
   157 
       
   158 	p := xml.NewParser(r)
       
   159 	for {
       
   160 		// Sniff the next token on the stream.
       
   161 		t, err := p.Token()
       
   162 		if t == nil {
       
   163 			if err != os.EOF {
       
   164 				log.Printf("read: %v", err)
       
   165 			}
       
   166 			break
       
   167 		}
       
   168 		var se xml.StartElement
       
   169 		var ok bool
       
   170 		if se, ok = t.(xml.StartElement) ; !ok {
       
   171 			continue
       
   172 		}
       
   173 
       
   174 		// Allocate the appropriate structure for this token.
       
   175 		var obj interface{}
       
   176 		switch se.Name.Space + " " + se.Name.Local {
       
   177 		case nsStream + " stream":
       
   178 			st, err := parseStream(se)
       
   179 			if err != nil {
       
   180 				log.Printf("unmarshal stream: %v",
       
   181 					err)
       
   182 				break
       
   183 			}
       
   184 			ch <- st
       
   185 			continue
       
   186 		case "stream error", nsStream + " error":
       
   187 			obj = &StreamError{}
       
   188 		case nsStream + " features":
       
   189 			obj = &Features{}
       
   190 		default:
       
   191 			obj = &Unrecognized{}
       
   192 			log.Printf("Ignoring unrecognized: %s %s\n",
       
   193 				se.Name.Space, se.Name.Local)
       
   194 		}
       
   195 
       
   196 		// Read the complete XML stanza.
       
   197 		err = p.Unmarshal(obj, &se)
       
   198 		if err != nil {
       
   199 			log.Printf("unmarshal: %v", err)
       
   200 			break
       
   201 		}
       
   202 
       
   203 		// Put it on the channel.
       
   204 		ch <- obj
       
   205 	}
       
   206 }
       
   207 
       
   208 func writeXml(w io.Writer, ch <-chan interface{}) {
       
   209 	if debug {
       
   210 		pr, pw := io.Pipe()
       
   211 		go tee(pr, w, "C: ")
       
   212 		w = pw
       
   213 	}
       
   214 	defer tryClose(w, ch)
       
   215 
       
   216 	for obj := range ch {
       
   217 		err := xml.Marshal(w, obj)
       
   218 		if err != nil {
       
   219 			log.Printf("write: %v", err)
       
   220 			break
       
   221 		}
       
   222 	}
       
   223 }
       
   224 
       
   225 func writeText(w io.Writer, ch <-chan *string) {
       
   226 	if debug {
       
   227 		pr, pw := io.Pipe()
       
   228 		go tee(pr, w, "C: ")
       
   229 		w = pw
       
   230 	}
       
   231 	defer tryClose(w, ch)
       
   232 
       
   233 	for str := range ch {
       
   234 		_, err := w.Write([]byte(*str))
       
   235 		if err != nil {
       
   236 			log.Printf("writeStr: %v", err)
       
   237 			break
       
   238 		}
       
   239 	}
       
   240 }
       
   241 
       
   242 func readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) {
       
   243 	defer tryClose(srvIn, cliOut)
       
   244 
       
   245 	for x := range srvIn {
       
   246 		cliOut <- x
       
   247 	}
       
   248 }
       
   249 
       
   250 func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) {
       
   251 	defer tryClose(srvOut, cliIn)
       
   252 
       
   253 	for x := range cliIn {
       
   254 		srvOut <- x
       
   255 	}
       
   256 }
   147 }
   257 
   148 
   258 func tee(r io.Reader, w io.Writer, prefix string) {
   149 func tee(r io.Reader, w io.Writer, prefix string) {
   259 	defer tryClose(r, w)
   150 	defer tryClose(r, w)
   260 
   151