xmpp.go
changeset 9 4fe926b03827
parent 8 30a7752cf8f7
child 10 f38b0ee7b1c1
equal deleted inserted replaced
8:30a7752cf8f7 9:4fe926b03827
    17 )
    17 )
    18 
    18 
    19 const (
    19 const (
    20 	serverSrv = "xmpp-server"
    20 	serverSrv = "xmpp-server"
    21 	clientSrv = "xmpp-client"
    21 	clientSrv = "xmpp-client"
    22 	debug = true
    22 	debug = false
    23 )
    23 )
    24 
    24 
    25 // The client in a client-server XMPP connection.
    25 // The client in a client-server XMPP connection.
    26 type Client struct {
    26 type Client struct {
    27 	In <-chan interface{}
    27 	In <-chan interface{}
    28 	in chan interface{}
       
    29 	Out chan<- interface{}
    28 	Out chan<- interface{}
    30 	out chan interface{}
    29 	TextOut chan<- *string
    31 	tcp *net.TCPConn
       
    32 }
    30 }
    33 var _ io.Closer = &Client{}
    31 var _ io.Closer = &Client{}
    34 
    32 
    35 // Connect to the appropriate server and authenticate as the given JID
    33 // Connect to the appropriate server and authenticate as the given JID
    36 // with the given password.
    34 // with the given password.
    40 	if err != nil {
    38 	if err != nil {
    41 		return nil, os.NewError("LookupSrv " + jid.Domain +
    39 		return nil, os.NewError("LookupSrv " + jid.Domain +
    42 			": " + err.String())
    40 			": " + err.String())
    43 	}
    41 	}
    44 
    42 
    45 	var c *net.TCPConn
    43 	var tcp *net.TCPConn
    46 	for _, srv := range srvs {
    44 	for _, srv := range srvs {
    47 		addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
    45 		addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
    48 		addr, err := net.ResolveTCPAddr("tcp", addrStr)
    46 		addr, err := net.ResolveTCPAddr("tcp", addrStr)
    49 		if err != nil {
    47 		if err != nil {
    50 			err = os.NewError(fmt.Sprintf("ResolveTCPAddr(%s): %s",
    48 			err = os.NewError(fmt.Sprintf("ResolveTCPAddr(%s): %s",
    51 				addrStr, err.String()))
    49 				addrStr, err.String()))
    52 			continue
    50 			continue
    53 		}
    51 		}
    54 		c, err = net.DialTCP("tcp", nil, addr)
    52 		tcp, err = net.DialTCP("tcp", nil, addr)
    55 		if err != nil {
    53 		if err != nil {
    56 			err = os.NewError(fmt.Sprintf("DialTCP(%s): %s",
    54 			err = os.NewError(fmt.Sprintf("DialTCP(%s): %s",
    57 				addr, err.String()))
    55 				addr, err.String()))
    58 			continue
    56 			continue
    59 		}
    57 		}
    60 	}
    58 	}
    61 	if c == nil {
    59 	if tcp == nil {
    62 		return nil, err
    60 		return nil, err
    63 	}
    61 	}
    64 
    62 
    65 	cl := new(Client)
    63 	// Start the transport handler, initially unencrypted.
    66 	cl.tcp = c
    64 	tlsr, tlsw := startTransport(tcp)
    67 	cl.in = make(chan interface{})
    65 
    68 	cl.In = cl.in
    66 	// Start the reader and writers that convert to and from XML.
    69 	cl.out = make(chan interface{})
    67 	xmlIn := startXmlReader(tlsr)
    70 	cl.Out = cl.out
    68 	xmlOut := startXmlWriter(tlsw)
    71 	// TODO Send readXml a reader that we can close when we
    69 	textOut := startTextWriter(tlsw)
    72 	// negotiate TLS.
    70 
    73 	go readXml(cl.tcp, cl.in, debug)
    71 	// Start the XMPP stream handler which filters stream-level
    74 	go writeXml(cl.tcp, cl.out, debug)
    72 	// events and responds to them.
       
    73 	clIn := startStreamReader(xmlIn)
       
    74 	clOut := startStreamWriter(xmlOut)
    75 
    75 
    76 	// Initial handshake.
    76 	// Initial handshake.
    77 	hsOut := &Stream{To: jid.Domain, Version: Version}
    77 	hsOut := &Stream{To: jid.Domain, Version: Version}
    78 	cl.Out <- hsOut
    78 	xmlOut <- hsOut
       
    79 
       
    80 	// TODO Wait for initialization to finish.
       
    81 
       
    82 	// Make the Client and init its fields.
       
    83 	cl := new(Client)
       
    84 	cl.In = clIn
       
    85 	cl.Out = clOut
       
    86 	cl.TextOut = textOut
    79 
    87 
    80 	return cl, nil
    88 	return cl, nil
    81 }
    89 }
    82 
    90 
    83 func (c *Client) Close() os.Error {
    91 func (c *Client) Close() os.Error {
    84 	close(c.in)
    92 	tryClose(c.In, c.Out, c.TextOut)
    85 	close(c.out)
    93 	return nil
    86 	return c.tcp.Close()
    94 }
    87 }
    95 
    88 
    96 func startTransport(tcp io.ReadWriter) (io.Reader, io.Writer) {
    89 // TODO Delete; for use only by interact.go:
    97 	f := func(r io.Reader, w io.Writer, dir string) {
    90 func ReadXml(r io.ReadCloser, ch chan<- interface{}, dbg bool) {
    98 		defer tryClose(r, w)
    91 	readXml(r, ch, dbg)
    99 		p := make([]byte, 1024)
    92 }
   100 		for {
    93 
   101 			nr, err := r.Read(p)
    94 func readXml(r io.Reader, ch chan<- interface{}, dbg bool) {
   102 			if nr == 0 {
    95 	defer close(ch)
   103 				log.Printf("%s: %s", dir, err.String())
    96 	if dbg {
   104 				break
       
   105 			}
       
   106 			nw, err := w.Write(p[:nr])
       
   107 			if nw < nr {
       
   108 				log.Println("%s: %s", dir, err.String())
       
   109 				break
       
   110 			}
       
   111 		}
       
   112 	}
       
   113 	inr, inw := io.Pipe()
       
   114 	outr, outw := io.Pipe()
       
   115 	go f(tcp, inw, "read")
       
   116 	go f(outr, tcp, "write")
       
   117 	return inr, outw
       
   118 }
       
   119 
       
   120 func startXmlReader(r io.Reader) <-chan interface{} {
       
   121 	ch := make(chan interface{})
       
   122 	go readXml(r, ch)
       
   123 	return ch
       
   124 }
       
   125 
       
   126 func startXmlWriter(w io.Writer) chan<- interface{} {
       
   127 	ch := make(chan interface{})
       
   128 	go writeXml(w, ch)
       
   129 	return ch
       
   130 }
       
   131 
       
   132 func startTextWriter(w io.Writer) chan<- *string {
       
   133 	ch := make(chan *string)
       
   134 	go writeText(w, ch)
       
   135 	return ch
       
   136 }
       
   137 
       
   138 func startStreamReader(xmlIn <-chan interface{}) <-chan interface{} {
       
   139 	ch := make(chan interface{})
       
   140 	go readStream(xmlIn, ch)
       
   141 	return ch
       
   142 }
       
   143 
       
   144 func startStreamWriter(xmlOut chan<- interface{}) chan<- interface{} {
       
   145 	ch := make(chan interface{})
       
   146 	go writeStream(xmlOut, ch)
       
   147 	return ch
       
   148 }
       
   149 
       
   150 func readXml(r io.Reader, ch chan<- interface{}) {
       
   151 	if debug {
    97 		pr, pw := io.Pipe()
   152 		pr, pw := io.Pipe()
    98 		go tee(r, pw, "S: ")
   153 		go tee(r, pw, "S: ")
    99 		r = pr
   154 		r = pr
   100 	}
   155 	}
       
   156 	defer tryClose(r, ch)
   101 
   157 
   102 	p := xml.NewParser(r)
   158 	p := xml.NewParser(r)
   103 	for {
   159 	for {
   104 		// Sniff the next token on the stream.
   160 		// Sniff the next token on the stream.
   105 		t, err := p.Token()
   161 		t, err := p.Token()
   125 					err)
   181 					err)
   126 				break
   182 				break
   127 			}
   183 			}
   128 			ch <- st
   184 			ch <- st
   129 			continue
   185 			continue
   130 		case "stream error":
   186 		case "stream error", nsStream + " error":
   131 			obj = &StreamError{}
   187 			obj = &StreamError{}
   132 		case nsStream + " features":
   188 		case nsStream + " features":
   133 			obj = &Features{}
   189 			obj = &Features{}
   134 		default:
   190 		default:
   135 			obj = &Unrecognized{}
   191 			obj = &Unrecognized{}
   147 		// Put it on the channel.
   203 		// Put it on the channel.
   148 		ch <- obj
   204 		ch <- obj
   149 	}
   205 	}
   150 }
   206 }
   151 
   207 
   152 func writeXml(w io.Writer, ch <-chan interface{}, dbg bool) {
   208 func writeXml(w io.Writer, ch <-chan interface{}) {
   153 	if dbg {
   209 	if debug {
   154 		pr, pw := io.Pipe()
   210 		pr, pw := io.Pipe()
   155 		go tee(pr, w, "C: ")
   211 		go tee(pr, w, "C: ")
   156 		w = pw
   212 		w = pw
   157 	}
   213 	}
       
   214 	defer tryClose(w, ch)
   158 
   215 
   159 	for obj := range ch {
   216 	for obj := range ch {
   160 		err := xml.Marshal(w, obj)
   217 		err := xml.Marshal(w, obj)
   161 		if err != nil {
   218 		if err != nil {
   162 			log.Printf("write: %v", err)
   219 			log.Printf("write: %v", err)
   163 			break
   220 			break
   164 		}
   221 		}
   165 	}
   222 	}
   166 }
   223 }
   167 
   224 
       
   225 func writeText(w io.Writer, ch <-chan *string) {
       
   226 	if debug {
       
   227 		pr, pw := io.Pipe()
       
   228 		go tee(pr, w, "C: ")
       
   229 		w = pw
       
   230 	}
       
   231 	defer tryClose(w, ch)
       
   232 
       
   233 	for str := range ch {
       
   234 		_, err := w.Write([]byte(*str))
       
   235 		if err != nil {
       
   236 			log.Printf("writeStr: %v", err)
       
   237 			break
       
   238 		}
       
   239 	}
       
   240 }
       
   241 
       
   242 func readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) {
       
   243 	defer tryClose(srvIn, cliOut)
       
   244 
       
   245 	for x := range srvIn {
       
   246 		cliOut <- x
       
   247 	}
       
   248 }
       
   249 
       
   250 func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) {
       
   251 	defer tryClose(srvOut, cliIn)
       
   252 
       
   253 	for x := range cliIn {
       
   254 		srvOut <- x
       
   255 	}
       
   256 }
       
   257 
   168 func tee(r io.Reader, w io.Writer, prefix string) {
   258 func tee(r io.Reader, w io.Writer, prefix string) {
   169 	defer func(xs ...interface{}) {
   259 	defer tryClose(r, w)
   170 		for _, x := range xs {
       
   171 			if c, ok := x.(io.Closer) ; ok {
       
   172 				c.Close()
       
   173 			}
       
   174 		}
       
   175 	}(r, w)
       
   176 
   260 
   177 	buf := bytes.NewBuffer(nil)
   261 	buf := bytes.NewBuffer(nil)
   178 	for {
   262 	for {
   179 		var c [1]byte
   263 		var c [1]byte
   180 		n, _ := r.Read(c[:])
   264 		n, _ := r.Read(c[:])
   181 		if n == 0 {
   265 		if n == 0 {
   182 			break
   266 			break
   183 		}
   267 		}
   184 		n, _ = w.Write(c[:])
   268 		n, _ = w.Write(c[:n])
   185 		if n == 0 {
   269 		if n == 0 {
   186 			break
   270 			break
   187 		}
   271 		}
   188 		buf.Write(c[:])
   272 		buf.Write(c[:n])
   189 		if c[0] == '\n' {
   273 		if c[0] == '\n' || c[0] == '>' {
   190 			fmt.Printf("%s%s", prefix, buf.String())
   274 			fmt.Printf("%s%s\n", prefix, buf.String())
   191 			buf.Reset()
   275 			buf.Reset()
   192 		}
   276 		}
   193 	}
   277 	}
   194 	leftover := buf.String()
   278 	leftover := buf.String()
   195 	if leftover != "" {
   279 	if leftover != "" {
   196 		fmt.Printf("%s%s\n", prefix, leftover)
   280 		fmt.Printf("%s%s\n", prefix, leftover)
   197 	}
   281 	}
   198 }
   282 }
       
   283 
       
   284 func tryClose(xs ...interface{}) {
       
   285 	f1 := func(ch chan<- interface{}) {
       
   286 		defer func() {
       
   287 			recover()
       
   288 		}()
       
   289 		close(ch)
       
   290 	}
       
   291 	f2 := func(ch <-chan interface{}) {
       
   292 		defer func() {
       
   293 			recover()
       
   294 		}()
       
   295 		close(ch)
       
   296 	}
       
   297 
       
   298 	for _, x := range xs {
       
   299 		if c, ok := x.(io.Closer) ; ok {
       
   300 			c.Close()
       
   301 		} else if ch, ok := x.(chan<- interface{}) ; ok {
       
   302 			f1(ch)
       
   303 		} else if ch, ok := x.(<-chan interface{}) ; ok {
       
   304 			f2(ch)
       
   305 		}
       
   306 	}
       
   307 }