xmpp.go
changeset 6 8e425e340ca1
parent 5 faef59c8db05
child 8 30a7752cf8f7
equal deleted inserted replaced
5:faef59c8db05 6:8e425e340ca1
     5 // This package implements a simple XMPP client according to RFCs 3920
     5 // This package implements a simple XMPP client according to RFCs 3920
     6 // and 3921, plus the various XEPs at http://xmpp.org/protocols/.
     6 // and 3921, plus the various XEPs at http://xmpp.org/protocols/.
     7 package xmpp
     7 package xmpp
     8 
     8 
     9 import (
     9 import (
       
    10 	"bytes"
    10 	"fmt"
    11 	"fmt"
    11 	"io"
    12 	"io"
    12 	"log"
    13 	"log"
    13 	"net"
    14 	"net"
    14 	"os"
    15 	"os"
    16 )
    17 )
    17 
    18 
    18 const (
    19 const (
    19 	serverSrv = "xmpp-server"
    20 	serverSrv = "xmpp-server"
    20 	clientSrv = "xmpp-client"
    21 	clientSrv = "xmpp-client"
       
    22 	debug = true
    21 )
    23 )
    22 
    24 
    23 // The client in a client-server XMPP connection.
    25 // The client in a client-server XMPP connection.
    24 type Client struct {
    26 type Client struct {
    25 	In <-chan interface{}
    27 	In <-chan interface{}
    58 	}
    60 	}
    59 	if c == nil {
    61 	if c == nil {
    60 		return nil, err
    62 		return nil, err
    61 	}
    63 	}
    62 
    64 
    63 	cl := Client{}
    65 	cl := new(Client)
    64 	cl.tcp = c
    66 	cl.tcp = c
    65 	cl.in = make(chan interface{})
    67 	cl.in = make(chan interface{})
    66 	cl.In = cl.in
    68 	cl.In = cl.in
       
    69 	cl.out = make(chan interface{})
       
    70 	cl.Out = cl.out
    67 	// TODO Send readXml a reader that we can close when we
    71 	// TODO Send readXml a reader that we can close when we
    68 	// negotiate TLS.
    72 	// negotiate TLS.
    69 	go readXml(cl.tcp, cl.in)
    73 	go readXml(cl.tcp, cl.in, debug)
    70 	// TODO go writeXml(&cl)
    74 	go writeXml(cl.tcp, cl.out, debug)
    71 
    75 
    72 	return &cl, nil
    76 	return cl, nil
    73 }
    77 }
    74 
    78 
    75 func (c *Client) Close() os.Error {
    79 func (c *Client) Close() os.Error {
       
    80 	close(c.in)
       
    81 	close(c.out)
    76 	return c.tcp.Close()
    82 	return c.tcp.Close()
    77 }
    83 }
    78 
    84 
    79 func readXml(r io.Reader, ch chan<- interface{}) {
    85 // TODO Delete; for use only by interact.go:
       
    86 func ReadXml(r io.ReadCloser, ch chan<- interface{}, dbg bool) {
       
    87 	readXml(r, ch, dbg)
       
    88 }
       
    89 
       
    90 func readXml(r io.Reader, ch chan<- interface{}, dbg bool) {
       
    91 	defer close(ch)
       
    92 	if dbg {
       
    93 		pr, pw := io.Pipe()
       
    94 		go tee(r, pw, "S: ")
       
    95 		r = pr
       
    96 	}
       
    97 
    80 	p := xml.NewParser(r)
    98 	p := xml.NewParser(r)
    81 	for {
    99 	for {
    82 		// Sniff the next token on the stream.
   100 		// Sniff the next token on the stream.
    83 		t, err := p.Token()
   101 		t, err := p.Token()
    84 		if t == nil {
   102 		if t == nil {
    93 			continue
   111 			continue
    94 		}
   112 		}
    95 
   113 
    96 		// Allocate the appropriate structure for this token.
   114 		// Allocate the appropriate structure for this token.
    97 		var obj interface{}
   115 		var obj interface{}
    98 		switch se.Name.Space + se.Name.Local {
   116 		switch se.Name.Space + " " + se.Name.Local {
    99 		case "stream stream":
   117 		case nsStream + " stream":
   100 			st, err := parseStream(se)
   118 			st, err := parseStream(se)
   101 			if err != nil {
   119 			if err != nil {
   102 				log.Printf("unmarshal stream: %v",
   120 				log.Printf("unmarshal stream: %v",
   103 					err)
   121 					err)
   104 				break
   122 				break
   105 			}
   123 			}
   106 			ch <- st
   124 			ch <- st
   107 			continue
   125 			continue
   108 		case nsStreams + " stream:error":
   126 		case "stream error":
   109 			obj = &StreamError{}
   127 			obj = &StreamError{}
   110 		default:
   128 		default:
   111 			obj = &Unrecognized{}
   129 			obj = &Unrecognized{}
   112 		}
   130 		}
   113 
   131 
   120 
   138 
   121 		// Put it on the channel.
   139 		// Put it on the channel.
   122 		ch <- obj
   140 		ch <- obj
   123 	}
   141 	}
   124 }
   142 }
       
   143 
       
   144 func writeXml(w io.Writer, ch <-chan interface{}, dbg bool) {
       
   145 	if dbg {
       
   146 		pr, pw := io.Pipe()
       
   147 		go tee(pr, w, "C: ")
       
   148 		w = pw
       
   149 	}
       
   150 
       
   151 	for obj := range ch {
       
   152 		err := xml.Marshal(w, obj)
       
   153 		if err != nil {
       
   154 			log.Printf("write: %v", err)
       
   155 			break
       
   156 		}
       
   157 	}
       
   158 }
       
   159 
       
   160 func tee(r io.Reader, w io.Writer, prefix string) {
       
   161 	defer func(xs ...interface{}) {
       
   162 		for _, x := range xs {
       
   163 			if c, ok := x.(io.Closer) ; ok {
       
   164 				c.Close()
       
   165 			}
       
   166 		}
       
   167 	}(r, w)
       
   168 
       
   169 	buf := bytes.NewBuffer(nil)
       
   170 	for {
       
   171 		var c [1]byte
       
   172 		n, _ := r.Read(c[:])
       
   173 		if n == 0 {
       
   174 			break
       
   175 		}
       
   176 		n, _ = w.Write(c[:])
       
   177 		if n == 0 {
       
   178 			break
       
   179 		}
       
   180 		buf.Write(c[:])
       
   181 		if c[0] == '\n' {
       
   182 			fmt.Printf("%s%s", prefix, buf.String())
       
   183 			buf.Reset()
       
   184 		}
       
   185 	}
       
   186 	leftover := buf.String()
       
   187 	if leftover != "" {
       
   188 		fmt.Printf("%s%s\n", prefix, leftover)
       
   189 	}
       
   190 }