equal
deleted
inserted
replaced
35 } |
35 } |
36 |
36 |
37 // BUG(cjyar) Review all these *Client receiver methods. They should |
37 // BUG(cjyar) Review all these *Client receiver methods. They should |
38 // probably either all be receivers, or none. |
38 // probably either all be receivers, or none. |
39 |
39 |
40 func (cl *Client) readTransport(w io.Writer) { |
40 func (cl *Client) readTransport(w io.WriteCloser) { |
41 defer tryClose(cl.socket, w) |
41 defer w.Close() |
42 cl.socket.SetReadTimeout(1e8) |
42 cl.socket.SetReadTimeout(1e8) |
43 p := make([]byte, 1024) |
43 p := make([]byte, 1024) |
44 for { |
44 for { |
45 if cl.socket == nil { |
45 if cl.socket == nil { |
46 cl.waitForSocket() |
46 cl.waitForSocket() |
66 } |
66 } |
67 } |
67 } |
68 } |
68 } |
69 |
69 |
70 func (cl *Client) writeTransport(r io.Reader) { |
70 func (cl *Client) writeTransport(r io.Reader) { |
71 defer tryClose(r, cl.socket) |
71 defer cl.socket.Close() |
72 p := make([]byte, 1024) |
72 p := make([]byte, 1024) |
73 for { |
73 for { |
74 nr, err := r.Read(p) |
74 nr, err := r.Read(p) |
75 if nr == 0 { |
75 if nr == 0 { |
76 if Log != nil { |
76 if Log != nil { |
93 if Loglevel >= syslog.LOG_DEBUG { |
93 if Loglevel >= syslog.LOG_DEBUG { |
94 pr, pw := io.Pipe() |
94 pr, pw := io.Pipe() |
95 go tee(r, pw, "S: ") |
95 go tee(r, pw, "S: ") |
96 r = pr |
96 r = pr |
97 } |
97 } |
98 defer tryClose(r, ch) |
98 defer close(ch) |
99 |
99 |
100 p := xml.NewParser(r) |
100 p := xml.NewParser(r) |
|
101 Loop: |
101 for { |
102 for { |
102 // Sniff the next token on the stream. |
103 // Sniff the next token on the stream. |
103 t, err := p.Token() |
104 t, err := p.Token() |
104 if t == nil { |
105 if t == nil { |
105 if err != os.EOF { |
106 if err != os.EOF { |
123 if err != nil { |
124 if err != nil { |
124 if Log != nil { |
125 if Log != nil { |
125 Log.Err("unmarshal stream: " + |
126 Log.Err("unmarshal stream: " + |
126 err.String()) |
127 err.String()) |
127 } |
128 } |
128 break |
129 break Loop |
129 } |
130 } |
130 ch <- st |
131 ch <- st |
131 continue |
132 continue |
132 case "stream error", NsStream + " error": |
133 case "stream error", NsStream + " error": |
133 obj = &streamError{} |
134 obj = &streamError{} |
156 err = p.Unmarshal(obj, &se) |
157 err = p.Unmarshal(obj, &se) |
157 if err != nil { |
158 if err != nil { |
158 if Log != nil { |
159 if Log != nil { |
159 Log.Err("unmarshal: " + err.String()) |
160 Log.Err("unmarshal: " + err.String()) |
160 } |
161 } |
161 break |
162 break Loop |
162 } |
163 } |
163 |
164 |
164 // If it's a Stanza, we try to unmarshal its innerxml |
165 // If it's a Stanza, we try to unmarshal its innerxml |
165 // into objects of the appropriate respective |
166 // into objects of the appropriate respective |
166 // types. This is specified by our extensions. |
167 // types. This is specified by our extensions. |
169 if err != nil { |
170 if err != nil { |
170 if Log != nil { |
171 if Log != nil { |
171 Log.Err("ext unmarshal: " + |
172 Log.Err("ext unmarshal: " + |
172 err.String()) |
173 err.String()) |
173 } |
174 } |
174 break |
175 break Loop |
175 } |
176 } |
176 } |
177 } |
177 |
178 |
178 // Put it on the channel. |
179 // Put it on the channel. |
179 ch <- obj |
180 ch <- obj |
216 if Loglevel >= syslog.LOG_DEBUG { |
217 if Loglevel >= syslog.LOG_DEBUG { |
217 pr, pw := io.Pipe() |
218 pr, pw := io.Pipe() |
218 go tee(pr, w, "C: ") |
219 go tee(pr, w, "C: ") |
219 w = pw |
220 w = pw |
220 } |
221 } |
221 defer tryClose(w, ch) |
222 defer func(w io.Writer) { |
|
223 if c, ok := w.(io.Closer) ; ok { |
|
224 c.Close() |
|
225 } |
|
226 }(w) |
222 |
227 |
223 for obj := range ch { |
228 for obj := range ch { |
224 err := xml.Marshal(w, obj) |
229 err := xml.Marshal(w, obj) |
225 if err != nil { |
230 if err != nil { |
226 if Log != nil { |
231 if Log != nil { |
230 } |
235 } |
231 } |
236 } |
232 } |
237 } |
233 |
238 |
234 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) { |
239 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) { |
235 defer tryClose(srvIn, cliOut) |
240 defer close(cliOut) |
236 |
241 |
237 handlers := make(map[string] func(Stanza) bool) |
242 handlers := make(map[string] func(Stanza) bool) |
|
243 Loop: |
238 for { |
244 for { |
239 select { |
245 select { |
240 case h := <- cl.handlers: |
246 case h := <- cl.handlers: |
241 handlers[h.id] = h.f |
247 handlers[h.id] = h.f |
242 case x := <- srvIn: |
248 case x, ok := <- srvIn: |
243 if x == nil { |
249 if !ok { |
244 break |
250 break Loop |
245 } |
251 } |
246 send := false |
252 send := false |
247 switch obj := x.(type) { |
253 switch obj := x.(type) { |
248 case *stream: |
254 case *stream: |
249 handleStream(obj) |
255 handleStream(obj) |
285 // the app might inject something inappropriate into our negotiations |
291 // the app might inject something inappropriate into our negotiations |
286 // with the server. The control channel controls this loop's |
292 // with the server. The control channel controls this loop's |
287 // activity. |
293 // activity. |
288 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza, |
294 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza, |
289 control <-chan int) { |
295 control <-chan int) { |
290 defer tryClose(srvOut, cliIn) |
296 defer close(srvOut) |
291 |
297 |
292 var input <-chan Stanza |
298 var input <-chan Stanza |
|
299 Loop: |
293 for { |
300 for { |
294 select { |
301 select { |
295 case status := <- control: |
302 case status := <- control: |
296 switch status { |
303 switch status { |
297 case 0: |
304 case 0: |
298 input = nil |
305 input = nil |
299 case 1: |
306 case 1: |
300 input = cliIn |
307 input = cliIn |
301 case -1: |
308 case -1: |
302 break |
309 break Loop |
303 } |
310 } |
304 case x := <- input: |
311 case x, ok := <- input: |
|
312 if !ok { |
|
313 break Loop |
|
314 } |
305 if x == nil { |
315 if x == nil { |
306 if Log != nil { |
316 if Log != nil { |
307 Log.Notice("Refusing to send" + |
317 Log.Notice("Refusing to send" + |
308 " nil stanza") |
318 " nil stanza") |
309 } |
319 } |
317 // Stanzas from the remote go up through a stack of filters to the |
327 // Stanzas from the remote go up through a stack of filters to the |
318 // app. This function manages the filters. |
328 // app. This function manages the filters. |
319 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza, |
329 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza, |
320 topFilter <-chan Stanza, app chan<- Stanza) { |
330 topFilter <-chan Stanza, app chan<- Stanza) { |
321 defer close(app) |
331 defer close(app) |
|
332 Loop: |
322 for { |
333 for { |
323 select { |
334 select { |
324 case newFilterOut := <- filterOut: |
335 case newFilterOut := <- filterOut: |
325 if newFilterOut == nil { |
336 if newFilterOut == nil { |
326 if Log != nil { |
337 if Log != nil { |
332 filterIn <- topFilter |
343 filterIn <- topFilter |
333 topFilter = newFilterOut |
344 topFilter = newFilterOut |
334 |
345 |
335 case data, ok := <-topFilter: |
346 case data, ok := <-topFilter: |
336 if !ok { |
347 if !ok { |
337 break |
348 break Loop |
338 } |
349 } |
339 app <- data |
350 app <- data |
340 } |
351 } |
341 } |
352 } |
342 } |
353 } |
353 |
364 |
354 func (cl *Client) handleStreamError(se *streamError) { |
365 func (cl *Client) handleStreamError(se *streamError) { |
355 if Log != nil { |
366 if Log != nil { |
356 Log.Notice(fmt.Sprintf("Received stream error: %v", se)) |
367 Log.Notice(fmt.Sprintf("Received stream error: %v", se)) |
357 } |
368 } |
358 cl.Close() |
369 close(cl.Out) |
359 } |
370 } |
360 |
371 |
361 func (cl *Client) handleFeatures(fe *Features) { |
372 func (cl *Client) handleFeatures(fe *Features) { |
362 cl.Features = fe |
373 cl.Features = fe |
363 if fe.Starttls != nil { |
374 if fe.Starttls != nil { |