# HG changeset patch # User Chris Jones # Date 1324863973 25200 # Node ID 8e425e340ca11e24537053b60934525d6f7e24ab # Parent faef59c8db058582086020cc5cf5756fb675eb94 Implemented writing to the remote. Now we have bidirectional communication. diff -r faef59c8db05 -r 8e425e340ca1 examples/interact.go --- a/examples/interact.go Sat Dec 24 13:11:36 2011 -0700 +++ b/examples/interact.go Sun Dec 25 18:46:13 2011 -0700 @@ -7,6 +7,7 @@ import ( "cjyar/xmpp" "flag" + "fmt" "log" "os" ) @@ -28,4 +29,19 @@ log.Fatalf("NewClient(%v): %v", jid, err) } defer c.Close() + + go func(ch <-chan interface{}) { + for obj := range ch { + fmt.Printf("s: %v\n", obj) + } + fmt.Println("done reading") + }(c.In) + + ch := make(chan interface{}) + go xmpp.ReadXml(os.Stdin, ch, false) + for x := range ch { + fmt.Printf("c: %v", x) + c.Out <- x + } + fmt.Println("done sending") } diff -r faef59c8db05 -r 8e425e340ca1 structs.go --- a/structs.go Sat Dec 24 13:11:36 2011 -0700 +++ b/structs.go Sun Dec 25 18:46:13 2011 -0700 @@ -38,17 +38,19 @@ // XMPP's XML element type Stream struct { - to string `xml:"attr"` - from string `xml:"attr"` - id string `xml:"attr"` - lang string `xml:"attr"` - version string `xml:"attr"` + To string `xml:"attr"` + From string `xml:"attr"` + Id string `xml:"attr"` + Lang string `xml:"attr"` + Version string `xml:"attr"` } var _ xml.Marshaler = &Stream{} +var _ fmt.Stringer = &Stream{} +// type StreamError struct { - cond definedCondition - text *errText + Any definedCondition + Text *errText } var _ xml.Marshaler = &StreamError{} @@ -58,8 +60,8 @@ } type errText struct { - Lang string - text string `xml:"chardata"` + Lang string `xml:"attr"` + Text string `xml:"chardata"` } var _ xml.Marshaler = &errText{} @@ -101,31 +103,37 @@ func (s *Stream) MarshalXML() ([]byte, os.Error) { buf := bytes.NewBuffer(nil) buf.WriteString("") // We never write return buf.Bytes(), nil } +func (s *Stream) String() string { + result, _ := s.MarshalXML() + return string(result) +} + func parseStream(se xml.StartElement) (*Stream, os.Error) { s := &Stream{} - se = se.Copy() for _, attr := range se.Attr { switch strings.ToLower(attr.Name.Local) { case "to": - s.to = attr.Value + s.To = attr.Value case "from": - s.from = attr.Value + s.From = attr.Value case "id": - s.id = attr.Value + s.Id = attr.Value case "lang": - s.lang = attr.Value + s.Lang = attr.Value case "version": - s.version = attr.Value + s.Version = attr.Value } } return s, nil @@ -134,9 +142,9 @@ func (s *StreamError) MarshalXML() ([]byte, os.Error) { buf := bytes.NewBuffer(nil) buf.WriteString("") - xml.Marshal(buf, s.cond) - if s.text != nil { - xml.Marshal(buf, s.text) + xml.Marshal(buf, s.Any) + if s.Text != nil { + xml.Marshal(buf, s.Text) } buf.WriteString("") return buf.Bytes(), nil @@ -148,7 +156,7 @@ writeField(buf, "xmlns", nsStreams) writeField(buf, "xml:lang", e.Lang) buf.WriteString(">") - xml.Escape(buf, []byte(e.text)) + xml.Escape(buf, []byte(e.Text)) buf.WriteString("") return buf.Bytes(), nil } diff -r faef59c8db05 -r 8e425e340ca1 structs_test.go --- a/structs_test.go Sat Dec 24 13:11:36 2011 -0700 +++ b/structs_test.go Sun Dec 25 18:46:13 2011 -0700 @@ -50,28 +50,32 @@ } func TestStreamMarshal(t *testing.T) { - s := &Stream{to: "bob"} - exp := `` + s := &Stream{To: "bob"} + exp := `` assertMarshal(t, exp, s) - s = &Stream{to: "bob", from: "alice", id: "#3", version: "5.3"} - exp = `` + s = &Stream{To: "bob", From: "alice", Id: "#3", Version: "5.3"} + exp = `` assertMarshal(t, exp, s) - s = &Stream{lang: "en_US"} - exp = `` + s = &Stream{Lang: "en_US"} + exp = `` assertMarshal(t, exp, s) } func TestStreamErrorMarshal(t *testing.T) { name := xml.Name{Space: nsStreams, Local: "ack"} - e := &StreamError{cond: definedCondition{name}} + e := &StreamError{Any: definedCondition{name}} exp := ``; assertMarshal(t, exp, e) - txt := errText{Lang: "pt", text: "things happen"} - e = &StreamError{cond: definedCondition{name}, text: &txt} + txt := errText{Lang: "pt", Text: "things happen"} + e = &StreamError{Any: definedCondition{name}, Text: &txt} exp = `things happen` diff -r faef59c8db05 -r 8e425e340ca1 xmpp.go --- a/xmpp.go Sat Dec 24 13:11:36 2011 -0700 +++ b/xmpp.go Sun Dec 25 18:46:13 2011 -0700 @@ -7,6 +7,7 @@ package xmpp import ( + "bytes" "fmt" "io" "log" @@ -18,6 +19,7 @@ const ( serverSrv = "xmpp-server" clientSrv = "xmpp-client" + debug = true ) // The client in a client-server XMPP connection. @@ -60,23 +62,39 @@ return nil, err } - cl := Client{} + cl := new(Client) cl.tcp = c cl.in = make(chan interface{}) cl.In = cl.in + cl.out = make(chan interface{}) + cl.Out = cl.out // TODO Send readXml a reader that we can close when we // negotiate TLS. - go readXml(cl.tcp, cl.in) - // TODO go writeXml(&cl) + go readXml(cl.tcp, cl.in, debug) + go writeXml(cl.tcp, cl.out, debug) - return &cl, nil + return cl, nil } func (c *Client) Close() os.Error { + close(c.in) + close(c.out) return c.tcp.Close() } -func readXml(r io.Reader, ch chan<- interface{}) { +// TODO Delete; for use only by interact.go: +func ReadXml(r io.ReadCloser, ch chan<- interface{}, dbg bool) { + readXml(r, ch, dbg) +} + +func readXml(r io.Reader, ch chan<- interface{}, dbg bool) { + defer close(ch) + if dbg { + pr, pw := io.Pipe() + go tee(r, pw, "S: ") + r = pr + } + p := xml.NewParser(r) for { // Sniff the next token on the stream. @@ -95,8 +113,8 @@ // Allocate the appropriate structure for this token. var obj interface{} - switch se.Name.Space + se.Name.Local { - case "stream stream": + switch se.Name.Space + " " + se.Name.Local { + case nsStream + " stream": st, err := parseStream(se) if err != nil { log.Printf("unmarshal stream: %v", @@ -105,7 +123,7 @@ } ch <- st continue - case nsStreams + " stream:error": + case "stream error": obj = &StreamError{} default: obj = &Unrecognized{} @@ -122,3 +140,51 @@ ch <- obj } } + +func writeXml(w io.Writer, ch <-chan interface{}, dbg bool) { + if dbg { + pr, pw := io.Pipe() + go tee(pr, w, "C: ") + w = pw + } + + for obj := range ch { + err := xml.Marshal(w, obj) + if err != nil { + log.Printf("write: %v", err) + break + } + } +} + +func tee(r io.Reader, w io.Writer, prefix string) { + defer func(xs ...interface{}) { + for _, x := range xs { + if c, ok := x.(io.Closer) ; ok { + c.Close() + } + } + }(r, w) + + buf := bytes.NewBuffer(nil) + for { + var c [1]byte + n, _ := r.Read(c[:]) + if n == 0 { + break + } + n, _ = w.Write(c[:]) + if n == 0 { + break + } + buf.Write(c[:]) + if c[0] == '\n' { + fmt.Printf("%s%s", prefix, buf.String()) + buf.Reset() + } + } + leftover := buf.String() + if leftover != "" { + fmt.Printf("%s%s\n", prefix, leftover) + } +} diff -r faef59c8db05 -r 8e425e340ca1 xmpp_test.go --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/xmpp_test.go Sun Dec 25 18:46:13 2011 -0700 @@ -0,0 +1,105 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package xmpp + +import ( + "bytes" + "reflect" + "strings" + "sync" + "testing" + "xml" +) + +func TestReadError(t *testing.T) { + r := strings.NewReader(``) + ch := make(chan interface{}) + go readXml(r, ch, false) + x := <- ch + se, ok := x.(*StreamError) + if !ok { + t.Fatalf("not StreamError: %v", reflect.TypeOf(x)) + } + assertEquals(t, "bad-foo", se.Any.XMLName.Local) + assertEquals(t, "", se.Any.XMLName.Space) + if se.Text != nil { + t.Errorf("text not nil: %v", se.Text) + } + + r = strings.NewReader(`` + + `Error text`) + ch = make(chan interface{}) + go readXml(r, ch, false) + x = <- ch + se, ok = x.(*StreamError) + if !ok { + t.Fatalf("not StreamError: %v", reflect.TypeOf(x)) + } + assertEquals(t, "bad-foo", se.Any.XMLName.Local) + assertEquals(t, "", se.Any.XMLName.Space) + assertEquals(t, "Error text", se.Text.Text) + assertEquals(t, "en", se.Text.Lang) +} + +func TestReadStream(t *testing.T) { + r := strings.NewReader(``) + ch := make(chan interface{}) + go readXml(r, ch, false) + x := <- ch + ss, ok := x.(*Stream) + if !ok { + t.Fatalf("not Stream: %v", reflect.TypeOf(x)) + } + assertEquals(t, "foo.com", ss.To) + assertEquals(t, "bar.org", ss.From) + assertEquals(t, "42", ss.Id) + assertEquals(t, "1.0", ss.Version) +} + +func testWrite(obj interface{}) string { + w := bytes.NewBuffer(nil) + ch := make(chan interface{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + writeXml(w, ch, true) + }() + ch <- obj + close(ch) + wg.Wait() + return w.String() +} + +func TestWriteError(t *testing.T) { + se := &StreamError{Any: definedCondition{XMLName: + xml.Name{Local: "blah"}}} + str := testWrite(se) + exp := `` + assertEquals(t, exp, str) + + se = &StreamError{Any: definedCondition{XMLName: + xml.Name{Space: nsStreams, Local: "foo"}}, + Text: &errText{Lang: "ru", Text: "Пошёл ты"}} + str = testWrite(se) + exp = `Пошёл ты` + assertEquals(t, exp, str) +} + +func TestWriteStream(t *testing.T) { + ss := &Stream{To: "foo.org", From: "bar.com", Id: "42", Lang: + "en", Version: "1.0"} + str := testWrite(ss) + exp := `` + assertEquals(t, exp, str) +}