40 if err != nil { |
38 if err != nil { |
41 return nil, os.NewError("LookupSrv " + jid.Domain + |
39 return nil, os.NewError("LookupSrv " + jid.Domain + |
42 ": " + err.String()) |
40 ": " + err.String()) |
43 } |
41 } |
44 |
42 |
45 var c *net.TCPConn |
43 var tcp *net.TCPConn |
46 for _, srv := range srvs { |
44 for _, srv := range srvs { |
47 addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port) |
45 addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port) |
48 addr, err := net.ResolveTCPAddr("tcp", addrStr) |
46 addr, err := net.ResolveTCPAddr("tcp", addrStr) |
49 if err != nil { |
47 if err != nil { |
50 err = os.NewError(fmt.Sprintf("ResolveTCPAddr(%s): %s", |
48 err = os.NewError(fmt.Sprintf("ResolveTCPAddr(%s): %s", |
51 addrStr, err.String())) |
49 addrStr, err.String())) |
52 continue |
50 continue |
53 } |
51 } |
54 c, err = net.DialTCP("tcp", nil, addr) |
52 tcp, err = net.DialTCP("tcp", nil, addr) |
55 if err != nil { |
53 if err != nil { |
56 err = os.NewError(fmt.Sprintf("DialTCP(%s): %s", |
54 err = os.NewError(fmt.Sprintf("DialTCP(%s): %s", |
57 addr, err.String())) |
55 addr, err.String())) |
58 continue |
56 continue |
59 } |
57 } |
60 } |
58 } |
61 if c == nil { |
59 if tcp == nil { |
62 return nil, err |
60 return nil, err |
63 } |
61 } |
64 |
62 |
65 cl := new(Client) |
63 // Start the transport handler, initially unencrypted. |
66 cl.tcp = c |
64 tlsr, tlsw := startTransport(tcp) |
67 cl.in = make(chan interface{}) |
65 |
68 cl.In = cl.in |
66 // Start the reader and writers that convert to and from XML. |
69 cl.out = make(chan interface{}) |
67 xmlIn := startXmlReader(tlsr) |
70 cl.Out = cl.out |
68 xmlOut := startXmlWriter(tlsw) |
71 // TODO Send readXml a reader that we can close when we |
69 textOut := startTextWriter(tlsw) |
72 // negotiate TLS. |
70 |
73 go readXml(cl.tcp, cl.in, debug) |
71 // Start the XMPP stream handler which filters stream-level |
74 go writeXml(cl.tcp, cl.out, debug) |
72 // events and responds to them. |
|
73 clIn := startStreamReader(xmlIn) |
|
74 clOut := startStreamWriter(xmlOut) |
75 |
75 |
76 // Initial handshake. |
76 // Initial handshake. |
77 hsOut := &Stream{To: jid.Domain, Version: Version} |
77 hsOut := &Stream{To: jid.Domain, Version: Version} |
78 cl.Out <- hsOut |
78 xmlOut <- hsOut |
|
79 |
|
80 // TODO Wait for initialization to finish. |
|
81 |
|
82 // Make the Client and init its fields. |
|
83 cl := new(Client) |
|
84 cl.In = clIn |
|
85 cl.Out = clOut |
|
86 cl.TextOut = textOut |
79 |
87 |
80 return cl, nil |
88 return cl, nil |
81 } |
89 } |
82 |
90 |
83 func (c *Client) Close() os.Error { |
91 func (c *Client) Close() os.Error { |
84 close(c.in) |
92 tryClose(c.In, c.Out, c.TextOut) |
85 close(c.out) |
93 return nil |
86 return c.tcp.Close() |
94 } |
87 } |
95 |
88 |
96 func startTransport(tcp io.ReadWriter) (io.Reader, io.Writer) { |
89 // TODO Delete; for use only by interact.go: |
97 f := func(r io.Reader, w io.Writer, dir string) { |
90 func ReadXml(r io.ReadCloser, ch chan<- interface{}, dbg bool) { |
98 defer tryClose(r, w) |
91 readXml(r, ch, dbg) |
99 p := make([]byte, 1024) |
92 } |
100 for { |
93 |
101 nr, err := r.Read(p) |
94 func readXml(r io.Reader, ch chan<- interface{}, dbg bool) { |
102 if nr == 0 { |
95 defer close(ch) |
103 log.Printf("%s: %s", dir, err.String()) |
96 if dbg { |
104 break |
|
105 } |
|
106 nw, err := w.Write(p[:nr]) |
|
107 if nw < nr { |
|
108 log.Println("%s: %s", dir, err.String()) |
|
109 break |
|
110 } |
|
111 } |
|
112 } |
|
113 inr, inw := io.Pipe() |
|
114 outr, outw := io.Pipe() |
|
115 go f(tcp, inw, "read") |
|
116 go f(outr, tcp, "write") |
|
117 return inr, outw |
|
118 } |
|
119 |
|
120 func startXmlReader(r io.Reader) <-chan interface{} { |
|
121 ch := make(chan interface{}) |
|
122 go readXml(r, ch) |
|
123 return ch |
|
124 } |
|
125 |
|
126 func startXmlWriter(w io.Writer) chan<- interface{} { |
|
127 ch := make(chan interface{}) |
|
128 go writeXml(w, ch) |
|
129 return ch |
|
130 } |
|
131 |
|
132 func startTextWriter(w io.Writer) chan<- *string { |
|
133 ch := make(chan *string) |
|
134 go writeText(w, ch) |
|
135 return ch |
|
136 } |
|
137 |
|
138 func startStreamReader(xmlIn <-chan interface{}) <-chan interface{} { |
|
139 ch := make(chan interface{}) |
|
140 go readStream(xmlIn, ch) |
|
141 return ch |
|
142 } |
|
143 |
|
144 func startStreamWriter(xmlOut chan<- interface{}) chan<- interface{} { |
|
145 ch := make(chan interface{}) |
|
146 go writeStream(xmlOut, ch) |
|
147 return ch |
|
148 } |
|
149 |
|
150 func readXml(r io.Reader, ch chan<- interface{}) { |
|
151 if debug { |
97 pr, pw := io.Pipe() |
152 pr, pw := io.Pipe() |
98 go tee(r, pw, "S: ") |
153 go tee(r, pw, "S: ") |
99 r = pr |
154 r = pr |
100 } |
155 } |
|
156 defer tryClose(r, ch) |
101 |
157 |
102 p := xml.NewParser(r) |
158 p := xml.NewParser(r) |
103 for { |
159 for { |
104 // Sniff the next token on the stream. |
160 // Sniff the next token on the stream. |
105 t, err := p.Token() |
161 t, err := p.Token() |
147 // Put it on the channel. |
203 // Put it on the channel. |
148 ch <- obj |
204 ch <- obj |
149 } |
205 } |
150 } |
206 } |
151 |
207 |
152 func writeXml(w io.Writer, ch <-chan interface{}, dbg bool) { |
208 func writeXml(w io.Writer, ch <-chan interface{}) { |
153 if dbg { |
209 if debug { |
154 pr, pw := io.Pipe() |
210 pr, pw := io.Pipe() |
155 go tee(pr, w, "C: ") |
211 go tee(pr, w, "C: ") |
156 w = pw |
212 w = pw |
157 } |
213 } |
|
214 defer tryClose(w, ch) |
158 |
215 |
159 for obj := range ch { |
216 for obj := range ch { |
160 err := xml.Marshal(w, obj) |
217 err := xml.Marshal(w, obj) |
161 if err != nil { |
218 if err != nil { |
162 log.Printf("write: %v", err) |
219 log.Printf("write: %v", err) |
163 break |
220 break |
164 } |
221 } |
165 } |
222 } |
166 } |
223 } |
167 |
224 |
|
225 func writeText(w io.Writer, ch <-chan *string) { |
|
226 if debug { |
|
227 pr, pw := io.Pipe() |
|
228 go tee(pr, w, "C: ") |
|
229 w = pw |
|
230 } |
|
231 defer tryClose(w, ch) |
|
232 |
|
233 for str := range ch { |
|
234 _, err := w.Write([]byte(*str)) |
|
235 if err != nil { |
|
236 log.Printf("writeStr: %v", err) |
|
237 break |
|
238 } |
|
239 } |
|
240 } |
|
241 |
|
242 func readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) { |
|
243 defer tryClose(srvIn, cliOut) |
|
244 |
|
245 for x := range srvIn { |
|
246 cliOut <- x |
|
247 } |
|
248 } |
|
249 |
|
250 func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) { |
|
251 defer tryClose(srvOut, cliIn) |
|
252 |
|
253 for x := range cliIn { |
|
254 srvOut <- x |
|
255 } |
|
256 } |
|
257 |
168 func tee(r io.Reader, w io.Writer, prefix string) { |
258 func tee(r io.Reader, w io.Writer, prefix string) { |
169 defer func(xs ...interface{}) { |
259 defer tryClose(r, w) |
170 for _, x := range xs { |
|
171 if c, ok := x.(io.Closer) ; ok { |
|
172 c.Close() |
|
173 } |
|
174 } |
|
175 }(r, w) |
|
176 |
260 |
177 buf := bytes.NewBuffer(nil) |
261 buf := bytes.NewBuffer(nil) |
178 for { |
262 for { |
179 var c [1]byte |
263 var c [1]byte |
180 n, _ := r.Read(c[:]) |
264 n, _ := r.Read(c[:]) |
181 if n == 0 { |
265 if n == 0 { |
182 break |
266 break |
183 } |
267 } |
184 n, _ = w.Write(c[:]) |
268 n, _ = w.Write(c[:n]) |
185 if n == 0 { |
269 if n == 0 { |
186 break |
270 break |
187 } |
271 } |
188 buf.Write(c[:]) |
272 buf.Write(c[:n]) |
189 if c[0] == '\n' { |
273 if c[0] == '\n' || c[0] == '>' { |
190 fmt.Printf("%s%s", prefix, buf.String()) |
274 fmt.Printf("%s%s\n", prefix, buf.String()) |
191 buf.Reset() |
275 buf.Reset() |
192 } |
276 } |
193 } |
277 } |
194 leftover := buf.String() |
278 leftover := buf.String() |
195 if leftover != "" { |
279 if leftover != "" { |
196 fmt.Printf("%s%s\n", prefix, leftover) |
280 fmt.Printf("%s%s\n", prefix, leftover) |
197 } |
281 } |
198 } |
282 } |
|
283 |
|
// tryClose makes a best-effort attempt to close every argument and
// never panics because of a channel that is nil or already closed.
//
// For each argument, in order:
//   - an io.Closer has its Close method called (the result is ignored,
//     since this is best-effort cleanup);
//   - a bidirectional or send-only channel of interface{} or *string
//     is closed;
//   - anything else, including receive-only channels, is ignored.
//
// Receive-only channels are skipped on purpose: the language does not
// allow a receiver to close a channel, and doing so would make the
// sending side panic. Closing is left to whoever holds the send side.
func tryClose(xs ...interface{}) {
	for _, x := range xs {
		if c, ok := x.(io.Closer); ok {
			c.Close()
			continue
		}
		closeChan(x)
	}
}

// closeChan closes x if it is a closable channel of a type used by this
// package, swallowing the panic raised when the channel is nil or has
// already been closed.
func closeChan(x interface{}) {
	defer func() {
		recover()
	}()

	// The dynamic type stored in the interface keeps its direction, so
	// each direction and element type needs its own case.
	switch ch := x.(type) {
	case chan interface{}:
		close(ch)
	case chan<- interface{}:
		close(ch)
	case chan *string:
		close(ch)
	case chan<- *string:
		close(ch)
	}
}