8 |
8 |
9 import ( |
9 import ( |
10 "bytes" |
10 "bytes" |
11 "fmt" |
11 "fmt" |
12 "io" |
12 "io" |
13 "log" |
|
14 "net" |
13 "net" |
15 "os" |
14 "os" |
16 "xml" |
15 "sync" |
17 ) |
16 ) |
18 |
17 |
const (
	// Version of RFC 3920 that we implement.
	Version = "1.0"

	// Various XML namespaces.
	nsStreams = "urn:ietf:params:xml:ns:xmpp-streams"
	nsStream  = "http://etherx.jabber.org/streams"
	nsTLS     = "urn:ietf:params:xml:ns:xmpp-tls"

	// DNS SRV names
	serverSrv = "xmpp-server"
	clientSrv = "xmpp-client"

	// debug, when true, makes the XML/text readers and writers route
	// their traffic through tee.
	debug = true
)
24 |
33 |
25 // The client in a client-server XMPP connection. |
34 // The client in a client-server XMPP connection. |
26 type Client struct { |
35 type Client struct { |
|
36 Jid JID |
|
37 socket net.Conn |
|
38 socketSync sync.WaitGroup |
27 In <-chan interface{} |
39 In <-chan interface{} |
28 Out chan<- interface{} |
40 Out chan<- interface{} |
|
41 xmlOut chan<- interface{} |
29 TextOut chan<- *string |
42 TextOut chan<- *string |
30 } |
43 } |
31 var _ io.Closer = &Client{} |
44 var _ io.Closer = &Client{} |
32 |
45 |
33 // Connect to the appropriate server and authenticate as the given JID |
46 // Connect to the appropriate server and authenticate as the given JID |
58 } |
71 } |
59 if tcp == nil { |
72 if tcp == nil { |
60 return nil, err |
73 return nil, err |
61 } |
74 } |
62 |
75 |
|
76 cl := new(Client) |
|
77 cl.Jid = *jid |
|
78 cl.socket = tcp |
|
79 |
63 // Start the transport handler, initially unencrypted. |
80 // Start the transport handler, initially unencrypted. |
64 tlsr, tlsw := startTransport(tcp) |
81 tlsr, tlsw := cl.startTransport() |
65 |
82 |
66 // Start the reader and writers that convert to and from XML. |
83 // Start the reader and writers that convert to and from XML. |
67 xmlIn := startXmlReader(tlsr) |
84 xmlIn := startXmlReader(tlsr) |
68 xmlOut := startXmlWriter(tlsw) |
85 cl.xmlOut = startXmlWriter(tlsw) |
69 textOut := startTextWriter(tlsw) |
86 textOut := startTextWriter(tlsw) |
70 |
87 |
71 // Start the XMPP stream handler which filters stream-level |
88 // Start the XMPP stream handler which filters stream-level |
72 // events and responds to them. |
89 // events and responds to them. |
73 clIn := startStreamReader(xmlIn) |
90 clIn := cl.startStreamReader(xmlIn, cl.xmlOut) |
74 clOut := startStreamWriter(xmlOut) |
91 clOut := startStreamWriter(cl.xmlOut) |
75 |
92 |
76 // Initial handshake. |
93 // Initial handshake. |
77 hsOut := &Stream{To: jid.Domain, Version: Version} |
94 hsOut := &Stream{To: jid.Domain, Version: Version} |
78 xmlOut <- hsOut |
95 cl.xmlOut <- hsOut |
79 |
96 |
80 // TODO Wait for initialization to finish. |
97 // TODO Wait for initialization to finish. |
81 |
98 |
82 // Make the Client and init its fields. |
|
83 cl := new(Client) |
|
84 cl.In = clIn |
99 cl.In = clIn |
85 cl.Out = clOut |
100 cl.Out = clOut |
86 cl.TextOut = textOut |
101 cl.TextOut = textOut |
87 |
102 |
88 return cl, nil |
103 return cl, nil |
91 func (c *Client) Close() os.Error { |
106 func (c *Client) Close() os.Error { |
92 tryClose(c.In, c.Out, c.TextOut) |
107 tryClose(c.In, c.Out, c.TextOut) |
93 return nil |
108 return nil |
94 } |
109 } |
95 |
110 |
96 func startTransport(tcp io.ReadWriter) (io.Reader, io.Writer) { |
111 func (cl *Client) startTransport() (io.Reader, io.Writer) { |
97 f := func(r io.Reader, w io.Writer, dir string) { |
|
98 defer tryClose(r, w) |
|
99 p := make([]byte, 1024) |
|
100 for { |
|
101 nr, err := r.Read(p) |
|
102 if nr == 0 { |
|
103 log.Printf("%s: %s", dir, err.String()) |
|
104 break |
|
105 } |
|
106 nw, err := w.Write(p[:nr]) |
|
107 if nw < nr { |
|
108 log.Println("%s: %s", dir, err.String()) |
|
109 break |
|
110 } |
|
111 } |
|
112 } |
|
113 inr, inw := io.Pipe() |
112 inr, inw := io.Pipe() |
114 outr, outw := io.Pipe() |
113 outr, outw := io.Pipe() |
115 go f(tcp, inw, "read") |
114 go cl.readTransport(inw) |
116 go f(outr, tcp, "write") |
115 go cl.writeTransport(outr) |
117 return inr, outw |
116 return inr, outw |
118 } |
117 } |
119 |
118 |
120 func startXmlReader(r io.Reader) <-chan interface{} { |
119 func startXmlReader(r io.Reader) <-chan interface{} { |
121 ch := make(chan interface{}) |
120 ch := make(chan interface{}) |
133 ch := make(chan *string) |
132 ch := make(chan *string) |
134 go writeText(w, ch) |
133 go writeText(w, ch) |
135 return ch |
134 return ch |
136 } |
135 } |
137 |
136 |
138 func startStreamReader(xmlIn <-chan interface{}) <-chan interface{} { |
137 func (cl *Client) startStreamReader(xmlIn <-chan interface{}, srvOut chan<- interface{}) <-chan interface{} { |
139 ch := make(chan interface{}) |
138 ch := make(chan interface{}) |
140 go readStream(xmlIn, ch) |
139 go cl.readStream(xmlIn, srvOut, ch) |
141 return ch |
140 return ch |
142 } |
141 } |
143 |
142 |
144 func startStreamWriter(xmlOut chan<- interface{}) chan<- interface{} { |
143 func startStreamWriter(xmlOut chan<- interface{}) chan<- interface{} { |
145 ch := make(chan interface{}) |
144 ch := make(chan interface{}) |
146 go writeStream(xmlOut, ch) |
145 go writeStream(xmlOut, ch) |
147 return ch |
146 return ch |
148 } |
|
149 |
|
150 func readXml(r io.Reader, ch chan<- interface{}) { |
|
151 if debug { |
|
152 pr, pw := io.Pipe() |
|
153 go tee(r, pw, "S: ") |
|
154 r = pr |
|
155 } |
|
156 defer tryClose(r, ch) |
|
157 |
|
158 p := xml.NewParser(r) |
|
159 for { |
|
160 // Sniff the next token on the stream. |
|
161 t, err := p.Token() |
|
162 if t == nil { |
|
163 if err != os.EOF { |
|
164 log.Printf("read: %v", err) |
|
165 } |
|
166 break |
|
167 } |
|
168 var se xml.StartElement |
|
169 var ok bool |
|
170 if se, ok = t.(xml.StartElement) ; !ok { |
|
171 continue |
|
172 } |
|
173 |
|
174 // Allocate the appropriate structure for this token. |
|
175 var obj interface{} |
|
176 switch se.Name.Space + " " + se.Name.Local { |
|
177 case nsStream + " stream": |
|
178 st, err := parseStream(se) |
|
179 if err != nil { |
|
180 log.Printf("unmarshal stream: %v", |
|
181 err) |
|
182 break |
|
183 } |
|
184 ch <- st |
|
185 continue |
|
186 case "stream error", nsStream + " error": |
|
187 obj = &StreamError{} |
|
188 case nsStream + " features": |
|
189 obj = &Features{} |
|
190 default: |
|
191 obj = &Unrecognized{} |
|
192 log.Printf("Ignoring unrecognized: %s %s\n", |
|
193 se.Name.Space, se.Name.Local) |
|
194 } |
|
195 |
|
196 // Read the complete XML stanza. |
|
197 err = p.Unmarshal(obj, &se) |
|
198 if err != nil { |
|
199 log.Printf("unmarshal: %v", err) |
|
200 break |
|
201 } |
|
202 |
|
203 // Put it on the channel. |
|
204 ch <- obj |
|
205 } |
|
206 } |
|
207 |
|
208 func writeXml(w io.Writer, ch <-chan interface{}) { |
|
209 if debug { |
|
210 pr, pw := io.Pipe() |
|
211 go tee(pr, w, "C: ") |
|
212 w = pw |
|
213 } |
|
214 defer tryClose(w, ch) |
|
215 |
|
216 for obj := range ch { |
|
217 err := xml.Marshal(w, obj) |
|
218 if err != nil { |
|
219 log.Printf("write: %v", err) |
|
220 break |
|
221 } |
|
222 } |
|
223 } |
|
224 |
|
225 func writeText(w io.Writer, ch <-chan *string) { |
|
226 if debug { |
|
227 pr, pw := io.Pipe() |
|
228 go tee(pr, w, "C: ") |
|
229 w = pw |
|
230 } |
|
231 defer tryClose(w, ch) |
|
232 |
|
233 for str := range ch { |
|
234 _, err := w.Write([]byte(*str)) |
|
235 if err != nil { |
|
236 log.Printf("writeStr: %v", err) |
|
237 break |
|
238 } |
|
239 } |
|
240 } |
|
241 |
|
242 func readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) { |
|
243 defer tryClose(srvIn, cliOut) |
|
244 |
|
245 for x := range srvIn { |
|
246 cliOut <- x |
|
247 } |
|
248 } |
|
249 |
|
250 func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) { |
|
251 defer tryClose(srvOut, cliIn) |
|
252 |
|
253 for x := range cliIn { |
|
254 srvOut <- x |
|
255 } |
|
256 } |
147 } |
257 |
148 |
258 func tee(r io.Reader, w io.Writer, prefix string) { |
149 func tee(r io.Reader, w io.Writer, prefix string) { |
259 defer tryClose(r, w) |
150 defer tryClose(r, w) |
260 |
151 |