Implemented writing to the remote. Now we have bidirectional communication.
authorChris Jones <christian.jones@sri.com>
Sun, 25 Dec 2011 18:46:13 -0700 (2011-12-26)
changeset 6 8e425e340ca1
parent 5 faef59c8db05
child 7 4f0f66f9a441
Implemented writing to the remote. Now we have bidirectional communication.
examples/interact.go
structs.go
structs_test.go
xmpp.go
xmpp_test.go
--- a/examples/interact.go	Sat Dec 24 13:11:36 2011 -0700
+++ b/examples/interact.go	Sun Dec 25 18:46:13 2011 -0700
@@ -7,6 +7,7 @@
 import (
 	"cjyar/xmpp"
 	"flag"
+	"fmt"
 	"log"
 	"os"
 	)
@@ -28,4 +29,19 @@
 		log.Fatalf("NewClient(%v): %v", jid, err)
 	}
 	defer c.Close()
+
+	go func(ch <-chan interface{}) {
+		for obj := range ch {
+			fmt.Printf("s: %v\n", obj)
+		}
+		fmt.Println("done reading")
+	}(c.In)
+
+	ch := make(chan interface{})
+	go xmpp.ReadXml(os.Stdin, ch, false)
+	for x := range ch {
+		fmt.Printf("c: %v", x)
+		c.Out <- x
+	}
+	fmt.Println("done sending")
 }
--- a/structs.go	Sat Dec 24 13:11:36 2011 -0700
+++ b/structs.go	Sun Dec 25 18:46:13 2011 -0700
@@ -38,17 +38,19 @@
 
 // XMPP's <stream:stream> XML element
 type Stream struct {
-	to string `xml:"attr"`
-	from string `xml:"attr"`
-	id string `xml:"attr"`
-	lang string `xml:"attr"`
-	version string `xml:"attr"`
+	To string `xml:"attr"`
+	From string `xml:"attr"`
+	Id string `xml:"attr"`
+	Lang string `xml:"attr"`
+	Version string `xml:"attr"`
 }
 var _ xml.Marshaler = &Stream{}
+var _ fmt.Stringer = &Stream{}
 
+// <stream:error>
 type StreamError struct {
-	cond definedCondition
-	text *errText
+	Any definedCondition
+	Text *errText
 }
 var _ xml.Marshaler = &StreamError{}
 
@@ -58,8 +60,8 @@
 }
 
 type errText struct {
-	Lang string
-	text string `xml:"chardata"`
+	Lang string `xml:"attr"`
+	Text string `xml:"chardata"`
 }
 var _ xml.Marshaler = &errText{}
 
@@ -101,31 +103,37 @@
 func (s *Stream) MarshalXML() ([]byte, os.Error) {
 	buf := bytes.NewBuffer(nil)
 	buf.WriteString("<stream:stream")
-	writeField(buf, "to", s.to)
-	writeField(buf, "from", s.from)
-	writeField(buf, "id", s.id)
-	writeField(buf, "xml:lang", s.lang)
-	writeField(buf, "version", s.version)
+	writeField(buf, "xmlns", "jabber:client")
+	writeField(buf, "xmlns:stream", nsStream)
+	writeField(buf, "to", s.To)
+	writeField(buf, "from", s.From)
+	writeField(buf, "id", s.Id)
+	writeField(buf, "xml:lang", s.Lang)
+	writeField(buf, "version", s.Version)
 	buf.WriteString(">")
 	// We never write </stream:stream>
 	return buf.Bytes(), nil
 }
 
+func (s *Stream) String() string {
+	result, _ := s.MarshalXML()
+	return string(result)
+}
+
 func parseStream(se xml.StartElement) (*Stream, os.Error) {
 	s := &Stream{}
-	se = se.Copy()
 	for _, attr := range se.Attr {
 		switch strings.ToLower(attr.Name.Local) {
 		case "to":
-			s.to = attr.Value
+			s.To = attr.Value
 		case "from":
-			s.from = attr.Value
+			s.From = attr.Value
 		case "id":
-			s.id = attr.Value
+			s.Id = attr.Value
 		case "lang":
-			s.lang = attr.Value
+			s.Lang = attr.Value
 		case "version":
-			s.version = attr.Value
+			s.Version = attr.Value
 		}
 	}
 	return s, nil
@@ -134,9 +142,9 @@
 func (s *StreamError) MarshalXML() ([]byte, os.Error) {
 	buf := bytes.NewBuffer(nil)
 	buf.WriteString("<stream:error>")
-	xml.Marshal(buf, s.cond)
-	if s.text != nil {
-		xml.Marshal(buf, s.text)
+	xml.Marshal(buf, s.Any)
+	if s.Text != nil {
+		xml.Marshal(buf, s.Text)
 	}
 	buf.WriteString("</stream:error>")
 	return buf.Bytes(), nil
@@ -148,7 +156,7 @@
 	writeField(buf, "xmlns", nsStreams)
 	writeField(buf, "xml:lang", e.Lang)
 	buf.WriteString(">")
-	xml.Escape(buf, []byte(e.text))
+	xml.Escape(buf, []byte(e.Text))
 	buf.WriteString("</text>")
 	return buf.Bytes(), nil
 }
--- a/structs_test.go	Sat Dec 24 13:11:36 2011 -0700
+++ b/structs_test.go	Sun Dec 25 18:46:13 2011 -0700
@@ -50,28 +50,32 @@
 }
 
 func TestStreamMarshal(t *testing.T) {
-	s := &Stream{to: "bob"}
-	exp := `<stream:stream to="bob">`
+	s := &Stream{To: "bob"}
+	exp := `<stream:stream xmlns="jabber:client"` +
+		` xmlns:stream="` + nsStream + `" to="bob">`
 	assertMarshal(t, exp, s)
 
-	s = &Stream{to: "bob", from: "alice", id: "#3", version: "5.3"}
-	exp = `<stream:stream to="bob" from="alice" id="#3" version="5.3">`
+	s = &Stream{To: "bob", From: "alice", Id: "#3", Version: "5.3"}
+	exp = `<stream:stream xmlns="jabber:client"` +
+		` xmlns:stream="` + nsStream + `" to="bob" from="alice"` +
+		` id="#3" version="5.3">`
 	assertMarshal(t, exp, s)
 
-	s = &Stream{lang: "en_US"}
-	exp = `<stream:stream xml:lang="en_US">`
+	s = &Stream{Lang: "en_US"}
+	exp = `<stream:stream xmlns="jabber:client"` +
+		` xmlns:stream="` + nsStream + `" xml:lang="en_US">`
 	assertMarshal(t, exp, s)
 }
 
 func TestStreamErrorMarshal(t *testing.T) {
 	name := xml.Name{Space: nsStreams, Local: "ack"}
-	e := &StreamError{cond: definedCondition{name}}
+	e := &StreamError{Any: definedCondition{name}}
 	exp := `<stream:error><ack xmlns="` + nsStreams +
 		`"></ack></stream:error>`;
 	assertMarshal(t, exp, e)
 
-	txt := errText{Lang: "pt", text: "things happen"}
-	e = &StreamError{cond: definedCondition{name}, text: &txt}
+	txt := errText{Lang: "pt", Text: "things happen"}
+	e = &StreamError{Any: definedCondition{name}, Text: &txt}
 	exp = `<stream:error><ack xmlns="` + nsStreams +
 		`"></ack><text xmlns="` + nsStreams +
 		`" xml:lang="pt">things happen</text></stream:error>`
--- a/xmpp.go	Sat Dec 24 13:11:36 2011 -0700
+++ b/xmpp.go	Sun Dec 25 18:46:13 2011 -0700
@@ -7,6 +7,7 @@
 package xmpp
 
 import (
+	"bytes"
 	"fmt"
 	"io"
 	"log"
@@ -18,6 +19,7 @@
 const (
 	serverSrv = "xmpp-server"
 	clientSrv = "xmpp-client"
+	debug = true
 )
 
 // The client in a client-server XMPP connection.
@@ -60,23 +62,39 @@
 		return nil, err
 	}
 
-	cl := Client{}
+	cl := new(Client)
 	cl.tcp = c
 	cl.in = make(chan interface{})
 	cl.In = cl.in
+	cl.out = make(chan interface{})
+	cl.Out = cl.out
 	// TODO Send readXml a reader that we can close when we
 	// negotiate TLS.
-	go readXml(cl.tcp, cl.in)
-	// TODO go writeXml(&cl)
+	go readXml(cl.tcp, cl.in, debug)
+	go writeXml(cl.tcp, cl.out, debug)
 
-	return &cl, nil
+	return cl, nil
 }
 
 func (c *Client) Close() os.Error {
+	close(c.in)
+	close(c.out)
 	return c.tcp.Close()
 }
 
-func readXml(r io.Reader, ch chan<- interface{}) {
+// TODO Delete; for use only by interact.go:
+func ReadXml(r io.ReadCloser, ch chan<- interface{}, dbg bool) {
+	readXml(r, ch, dbg)
+}
+
+func readXml(r io.Reader, ch chan<- interface{}, dbg bool) {
+	defer close(ch)
+	if dbg {
+		pr, pw := io.Pipe()
+		go tee(r, pw, "S: ")
+		r = pr
+	}
+
 	p := xml.NewParser(r)
 	for {
 		// Sniff the next token on the stream.
@@ -95,8 +113,8 @@
 
 		// Allocate the appropriate structure for this token.
 		var obj interface{}
-		switch se.Name.Space + se.Name.Local {
-		case "stream stream":
+		switch se.Name.Space + " " + se.Name.Local {
+		case nsStream + " stream":
 			st, err := parseStream(se)
 			if err != nil {
 				log.Printf("unmarshal stream: %v",
@@ -105,7 +123,7 @@
 			}
 			ch <- st
 			continue
-		case nsStreams + " stream:error":
+		case "stream error":
 			obj = &StreamError{}
 		default:
 			obj = &Unrecognized{}
@@ -122,3 +140,51 @@
 		ch <- obj
 	}
 }
+
+func writeXml(w io.Writer, ch <-chan interface{}, dbg bool) {
+	if dbg {
+		pr, pw := io.Pipe()
+		go tee(pr, w, "C: ")
+		w = pw
+	}
+
+	for obj := range ch {
+		err := xml.Marshal(w, obj)
+		if err != nil {
+			log.Printf("write: %v", err)
+			break
+		}
+	}
+}
+
+func tee(r io.Reader, w io.Writer, prefix string) {
+	defer func(xs ...interface{}) {
+		for _, x := range xs {
+			if c, ok := x.(io.Closer) ; ok {
+				c.Close()
+			}
+		}
+	}(r, w)
+
+	buf := bytes.NewBuffer(nil)
+	for {
+		var c [1]byte
+		n, _ := r.Read(c[:])
+		if n == 0 {
+			break
+		}
+		n, _ = w.Write(c[:])
+		if n == 0 {
+			break
+		}
+		buf.Write(c[:])
+		if c[0] == '\n' {
+			fmt.Printf("%s%s", prefix, buf.String())
+			buf.Reset()
+		}
+	}
+	leftover := buf.String()
+	if leftover != "" {
+		fmt.Printf("%s%s\n", prefix, leftover)
+	}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/xmpp_test.go	Sun Dec 25 18:46:13 2011 -0700
@@ -0,0 +1,105 @@
+// Copyright 2011 The Go Authors.  All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package xmpp
+
+import (
+	"bytes"
+	"reflect"
+	"strings"
+	"sync"
+	"testing"
+	"xml"
+)
+
+func TestReadError(t *testing.T) {
+	r := strings.NewReader(`<stream:error><bad-foo/></stream:error>`)
+	ch := make(chan interface{})
+	go readXml(r, ch, false)
+	x := <- ch
+	se, ok := x.(*StreamError)
+	if !ok {
+		t.Fatalf("not StreamError: %v", reflect.TypeOf(x))
+	}
+	assertEquals(t, "bad-foo", se.Any.XMLName.Local)
+	assertEquals(t, "", se.Any.XMLName.Space)
+	if se.Text != nil {
+		t.Errorf("text not nil: %v", se.Text)
+	}
+
+	r = strings.NewReader(`<stream:error><bad-foo/>` +
+		`<text xml:lang="en" xmlns="` + nsStreams +
+		`">Error text</text></stream:error>`)
+	ch = make(chan interface{})
+	go readXml(r, ch, false)
+	x = <- ch
+	se, ok = x.(*StreamError)
+	if !ok {
+		t.Fatalf("not StreamError: %v", reflect.TypeOf(x))
+	}
+	assertEquals(t, "bad-foo", se.Any.XMLName.Local)
+	assertEquals(t, "", se.Any.XMLName.Space)
+	assertEquals(t, "Error text", se.Text.Text)
+	assertEquals(t, "en", se.Text.Lang)
+}
+
+func TestReadStream(t *testing.T) {
+	r := strings.NewReader(`<stream:stream to="foo.com" ` +
+		`from="bar.org" id="42"` +
+		`xmlns="jabber:client" xmlns:stream="` + nsStream +
+		`" version="1.0">`)
+	ch := make(chan interface{})
+	go readXml(r, ch, false)
+	x := <- ch
+	ss, ok := x.(*Stream)
+	if !ok {
+		t.Fatalf("not Stream: %v", reflect.TypeOf(x))
+	}
+	assertEquals(t, "foo.com", ss.To)
+	assertEquals(t, "bar.org", ss.From)
+	assertEquals(t, "42", ss.Id)
+	assertEquals(t, "1.0", ss.Version)
+}
+
+func testWrite(obj interface{}) string {
+	w := bytes.NewBuffer(nil)
+	ch := make(chan interface{})
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		writeXml(w, ch, true)
+	}()
+	ch <- obj
+	close(ch)
+	wg.Wait()
+	return w.String()
+}
+
+func TestWriteError(t *testing.T) {
+	se := &StreamError{Any: definedCondition{XMLName:
+			xml.Name{Local: "blah"}}}
+	str := testWrite(se)
+	exp := `<stream:error><blah></blah></stream:error>`
+	assertEquals(t, exp, str)
+
+	se = &StreamError{Any: definedCondition{XMLName:
+			xml.Name{Space: nsStreams, Local: "foo"}},
+		Text: &errText{Lang: "ru", Text: "Пошёл ты"}}
+	str = testWrite(se)
+	exp = `<stream:error><foo xmlns="` + nsStreams +
+		`"></foo><text xmlns="` + nsStreams +
+		`" xml:lang="ru">Пошёл ты</text></stream:error>`
+	assertEquals(t, exp, str)
+}
+
+func TestWriteStream(t *testing.T) {
+	ss := &Stream{To: "foo.org", From: "bar.com", Id: "42", Lang:
+		"en", Version: "1.0"}
+	str := testWrite(ss)
+	exp := `<stream:stream xmlns="jabber:client"` +
+		` xmlns:stream="` + nsStream + `" to="foo.org"` +
+		` from="bar.com" id="42" xml:lang="en" version="1.0">`
+	assertEquals(t, exp, str)
+}