--- a/xmpp.go Mon Dec 26 11:48:35 2011 -0700
+++ b/xmpp.go Mon Dec 26 14:36:41 2011 -0700
@@ -19,16 +19,14 @@
const (
serverSrv = "xmpp-server"
clientSrv = "xmpp-client"
- debug = true
+ debug = false
)
// The client in a client-server XMPP connection.
type Client struct {
In <-chan interface{}
- in chan interface{}
Out chan<- interface{}
- out chan interface{}
- tcp *net.TCPConn
+ TextOut chan<- *string
}
var _ io.Closer = &Client{}
@@ -42,7 +40,7 @@
": " + err.String())
}
- var c *net.TCPConn
+ var tcp *net.TCPConn
for _, srv := range srvs {
addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
addr, err := net.ResolveTCPAddr("tcp", addrStr)
@@ -51,53 +49,111 @@
addrStr, err.String()))
continue
}
- c, err = net.DialTCP("tcp", nil, addr)
+ tcp, err = net.DialTCP("tcp", nil, addr)
if err != nil {
err = os.NewError(fmt.Sprintf("DialTCP(%s): %s",
addr, err.String()))
continue
}
}
- if c == nil {
+ if tcp == nil {
return nil, err
}
- cl := new(Client)
- cl.tcp = c
- cl.in = make(chan interface{})
- cl.In = cl.in
- cl.out = make(chan interface{})
- cl.Out = cl.out
- // TODO Send readXml a reader that we can close when we
- // negotiate TLS.
- go readXml(cl.tcp, cl.in, debug)
- go writeXml(cl.tcp, cl.out, debug)
+ // Start the transport handler, initially unencrypted.
+ tlsr, tlsw := startTransport(tcp)
+
+ // Start the reader and writers that convert to and from XML.
+ xmlIn := startXmlReader(tlsr)
+ xmlOut := startXmlWriter(tlsw)
+ textOut := startTextWriter(tlsw)
+
+ // Start the XMPP stream handler which filters stream-level
+ // events and responds to them.
+ clIn := startStreamReader(xmlIn)
+ clOut := startStreamWriter(xmlOut)
// Initial handshake.
hsOut := &Stream{To: jid.Domain, Version: Version}
- cl.Out <- hsOut
+ xmlOut <- hsOut
+
+ // TODO Wait for initialization to finish.
+
+ // Make the Client and init its fields.
+ cl := new(Client)
+ cl.In = clIn
+ cl.Out = clOut
+ cl.TextOut = textOut
return cl, nil
}
func (c *Client) Close() os.Error {
- close(c.in)
- close(c.out)
- return c.tcp.Close()
+ tryClose(c.In, c.Out, c.TextOut)
+ return nil
+}
+
+func startTransport(tcp io.ReadWriter) (io.Reader, io.Writer) {
+ f := func(r io.Reader, w io.Writer, dir string) {
+ defer tryClose(r, w)
+ p := make([]byte, 1024)
+ for {
+ nr, err := r.Read(p)
+ if nr == 0 {
+ log.Printf("%s: %s", dir, err.String())
+ break
+ }
+ nw, err := w.Write(p[:nr])
+ if nw < nr {
+ log.Println("%s: %s", dir, err.String())
+ break
+ }
+ }
+ }
+ inr, inw := io.Pipe()
+ outr, outw := io.Pipe()
+ go f(tcp, inw, "read")
+ go f(outr, tcp, "write")
+ return inr, outw
}
-// TODO Delete; for use only by interact.go:
-func ReadXml(r io.ReadCloser, ch chan<- interface{}, dbg bool) {
- readXml(r, ch, dbg)
+func startXmlReader(r io.Reader) <-chan interface{} {
+ ch := make(chan interface{})
+ go readXml(r, ch)
+ return ch
+}
+
+func startXmlWriter(w io.Writer) chan<- interface{} {
+ ch := make(chan interface{})
+ go writeXml(w, ch)
+ return ch
}
-func readXml(r io.Reader, ch chan<- interface{}, dbg bool) {
- defer close(ch)
- if dbg {
+func startTextWriter(w io.Writer) chan<- *string {
+ ch := make(chan *string)
+ go writeText(w, ch)
+ return ch
+}
+
+func startStreamReader(xmlIn <-chan interface{}) <-chan interface{} {
+ ch := make(chan interface{})
+ go readStream(xmlIn, ch)
+ return ch
+}
+
+func startStreamWriter(xmlOut chan<- interface{}) chan<- interface{} {
+ ch := make(chan interface{})
+ go writeStream(xmlOut, ch)
+ return ch
+}
+
+func readXml(r io.Reader, ch chan<- interface{}) {
+ if debug {
pr, pw := io.Pipe()
go tee(r, pw, "S: ")
r = pr
}
+ defer tryClose(r, ch)
p := xml.NewParser(r)
for {
@@ -127,7 +183,7 @@
}
ch <- st
continue
- case "stream error":
+ case "stream error", nsStream + " error":
obj = &StreamError{}
case nsStream + " features":
obj = &Features{}
@@ -149,12 +205,13 @@
}
}
-func writeXml(w io.Writer, ch <-chan interface{}, dbg bool) {
- if dbg {
+func writeXml(w io.Writer, ch <-chan interface{}) {
+ if debug {
pr, pw := io.Pipe()
go tee(pr, w, "C: ")
w = pw
}
+ defer tryClose(w, ch)
for obj := range ch {
err := xml.Marshal(w, obj)
@@ -165,14 +222,41 @@
}
}
+func writeText(w io.Writer, ch <-chan *string) {
+ if debug {
+ pr, pw := io.Pipe()
+ go tee(pr, w, "C: ")
+ w = pw
+ }
+ defer tryClose(w, ch)
+
+ for str := range ch {
+ _, err := w.Write([]byte(*str))
+ if err != nil {
+ log.Printf("writeStr: %v", err)
+ break
+ }
+ }
+}
+
+func readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) {
+ defer tryClose(srvIn, cliOut)
+
+ for x := range srvIn {
+ cliOut <- x
+ }
+}
+
+func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) {
+ defer tryClose(srvOut, cliIn)
+
+ for x := range cliIn {
+ srvOut <- x
+ }
+}
+
func tee(r io.Reader, w io.Writer, prefix string) {
- defer func(xs ...interface{}) {
- for _, x := range xs {
- if c, ok := x.(io.Closer) ; ok {
- c.Close()
- }
- }
- }(r, w)
+ defer tryClose(r, w)
buf := bytes.NewBuffer(nil)
for {
@@ -181,13 +265,13 @@
if n == 0 {
break
}
- n, _ = w.Write(c[:])
+ n, _ = w.Write(c[:n])
if n == 0 {
break
}
- buf.Write(c[:])
- if c[0] == '\n' {
- fmt.Printf("%s%s", prefix, buf.String())
+ buf.Write(c[:n])
+ if c[0] == '\n' || c[0] == '>' {
+ fmt.Printf("%s%s\n", prefix, buf.String())
buf.Reset()
}
}
@@ -196,3 +280,28 @@
fmt.Printf("%s%s\n", prefix, leftover)
}
}
+
+func tryClose(xs ...interface{}) {
+ f1 := func(ch chan<- interface{}) {
+ defer func() {
+ recover()
+ }()
+ close(ch)
+ }
+ f2 := func(ch <-chan interface{}) {
+ defer func() {
+ recover()
+ }()
+ close(ch)
+ }
+
+ for _, x := range xs {
+ if c, ok := x.(io.Closer) ; ok {
+ c.Close()
+ } else if ch, ok := x.(chan<- interface{}) ; ok {
+ f1(ch)
+ } else if ch, ok := x.(<-chan interface{}) ; ok {
+ f2(ch)
+ }
+ }
+}
--- a/xmpp_test.go Mon Dec 26 11:48:35 2011 -0700
+++ b/xmpp_test.go Mon Dec 26 14:36:41 2011 -0700
@@ -16,7 +16,7 @@
func TestReadError(t *testing.T) {
r := strings.NewReader(`<stream:error><bad-foo/></stream:error>`)
ch := make(chan interface{})
- go readXml(r, ch, false)
+ go readXml(r, ch)
x := <- ch
se, ok := x.(*StreamError)
if !ok {
@@ -32,7 +32,7 @@
`<text xml:lang="en" xmlns="` + nsStreams +
`">Error text</text></stream:error>`)
ch = make(chan interface{})
- go readXml(r, ch, false)
+ go readXml(r, ch)
x = <- ch
se, ok = x.(*StreamError)
if !ok {
@@ -50,7 +50,7 @@
`xmlns="jabber:client" xmlns:stream="` + nsStream +
`" version="1.0">`)
ch := make(chan interface{})
- go readXml(r, ch, false)
+ go readXml(r, ch)
x := <- ch
ss, ok := x.(*Stream)
if !ok {
@@ -69,7 +69,7 @@
wg.Add(1)
go func() {
defer wg.Done()
- writeXml(w, ch, true)
+ writeXml(w, ch)
}()
ch <- obj
close(ch)