equal
deleted
inserted
replaced
36 |
36 |
37 // BUG(cjyar) Review all these *Client receiver methods. They should |
37 // BUG(cjyar) Review all these *Client receiver methods. They should |
38 // probably either all be receivers, or none. |
38 // probably either all be receivers, or none. |
39 |
39 |
40 func (cl *Client) readTransport(w io.Writer) { |
40 func (cl *Client) readTransport(w io.Writer) { |
|
41 defer fmt.Println("readTransport done") |
41 defer tryClose(cl.socket, w) |
42 defer tryClose(cl.socket, w) |
42 cl.socket.SetReadTimeout(1e8) |
43 cl.socket.SetReadTimeout(1e8) |
43 p := make([]byte, 1024) |
44 p := make([]byte, 1024) |
44 for { |
45 for { |
45 if cl.socket == nil { |
46 if cl.socket == nil { |
66 } |
67 } |
67 } |
68 } |
68 } |
69 } |
69 |
70 |
70 func (cl *Client) writeTransport(r io.Reader) { |
71 func (cl *Client) writeTransport(r io.Reader) { |
|
72 defer fmt.Println("writeTransport done") |
71 defer tryClose(r, cl.socket) |
73 defer tryClose(r, cl.socket) |
72 p := make([]byte, 1024) |
74 p := make([]byte, 1024) |
73 for { |
75 for { |
74 nr, err := r.Read(p) |
76 nr, err := r.Read(p) |
75 if nr == 0 { |
77 if nr == 0 { |
88 } |
90 } |
89 } |
91 } |
90 |
92 |
91 func readXml(r io.Reader, ch chan<- interface{}, |
93 func readXml(r io.Reader, ch chan<- interface{}, |
92 extStanza map[string] func(*xml.Name) interface{}) { |
94 extStanza map[string] func(*xml.Name) interface{}) { |
|
95 defer fmt.Println("readXml done") |
93 if Loglevel >= syslog.LOG_DEBUG { |
96 if Loglevel >= syslog.LOG_DEBUG { |
94 pr, pw := io.Pipe() |
97 pr, pw := io.Pipe() |
95 go tee(r, pw, "S: ") |
98 go tee(r, pw, "S: ") |
96 r = pr |
99 r = pr |
97 } |
100 } |
98 defer tryClose(r, ch) |
101 defer tryClose(r, ch) |
99 |
102 |
100 p := xml.NewParser(r) |
103 p := xml.NewParser(r) |
|
104 Loop: |
101 for { |
105 for { |
102 // Sniff the next token on the stream. |
106 // Sniff the next token on the stream. |
103 t, err := p.Token() |
107 t, err := p.Token() |
104 if t == nil { |
108 if t == nil { |
105 if err != os.EOF { |
109 if err != os.EOF { |
123 if err != nil { |
127 if err != nil { |
124 if Log != nil { |
128 if Log != nil { |
125 Log.Err("unmarshal stream: " + |
129 Log.Err("unmarshal stream: " + |
126 err.String()) |
130 err.String()) |
127 } |
131 } |
128 break |
132 break Loop |
129 } |
133 } |
130 ch <- st |
134 ch <- st |
131 continue |
135 continue |
132 case "stream error", NsStream + " error": |
136 case "stream error", NsStream + " error": |
133 obj = &streamError{} |
137 obj = &streamError{} |
156 err = p.Unmarshal(obj, &se) |
160 err = p.Unmarshal(obj, &se) |
157 if err != nil { |
161 if err != nil { |
158 if Log != nil { |
162 if Log != nil { |
159 Log.Err("unmarshal: " + err.String()) |
163 Log.Err("unmarshal: " + err.String()) |
160 } |
164 } |
161 break |
165 break Loop |
162 } |
166 } |
163 |
167 |
164 // If it's a Stanza, we try to unmarshal its innerxml |
168 // If it's a Stanza, we try to unmarshal its innerxml |
165 // into objects of the appropriate respective |
169 // into objects of the appropriate respective |
166 // types. This is specified by our extensions. |
170 // types. This is specified by our extensions. |
169 if err != nil { |
173 if err != nil { |
170 if Log != nil { |
174 if Log != nil { |
171 Log.Err("ext unmarshal: " + |
175 Log.Err("ext unmarshal: " + |
172 err.String()) |
176 err.String()) |
173 } |
177 } |
174 break |
178 break Loop |
175 } |
179 } |
176 } |
180 } |
177 |
181 |
178 // Put it on the channel. |
182 // Put it on the channel. |
179 ch <- obj |
183 ch <- obj |
230 } |
234 } |
231 } |
235 } |
232 } |
236 } |
233 |
237 |
234 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) { |
238 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) { |
|
239 defer fmt.Println("readStream done") |
|
240 defer close(cliOut) |
235 defer tryClose(srvIn, cliOut) |
241 defer tryClose(srvIn, cliOut) |
236 |
242 |
237 handlers := make(map[string] func(Stanza) bool) |
243 handlers := make(map[string] func(Stanza) bool) |
|
244 Loop: |
238 for { |
245 for { |
239 select { |
246 select { |
240 case h := <- cl.handlers: |
247 case h := <- cl.handlers: |
241 handlers[h.id] = h.f |
248 handlers[h.id] = h.f |
242 case x := <- srvIn: |
249 case x, ok := <- srvIn: |
243 if x == nil { |
250 if !ok { |
244 break |
251 break Loop |
245 } |
252 } |
246 send := false |
253 send := false |
247 switch obj := x.(type) { |
254 switch obj := x.(type) { |
248 case *stream: |
255 case *stream: |
249 handleStream(obj) |
256 handleStream(obj) |
285 // the app might inject something inappropriate into our negotiations |
292 // the app might inject something inappropriate into our negotiations |
286 // with the server. The control channel controls this loop's |
293 // with the server. The control channel controls this loop's |
287 // activity. |
294 // activity. |
288 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza, |
295 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza, |
289 control <-chan int) { |
296 control <-chan int) { |
290 defer tryClose(srvOut, cliIn) |
297 defer fmt.Println("writeStream done") |
|
298 defer close(srvOut) |
291 |
299 |
292 var input <-chan Stanza |
300 var input <-chan Stanza |
|
301 Loop: |
293 for { |
302 for { |
294 select { |
303 select { |
295 case status := <- control: |
304 case status := <- control: |
296 switch status { |
305 switch status { |
297 case 0: |
306 case 0: |
298 input = nil |
307 input = nil |
299 case 1: |
308 case 1: |
300 input = cliIn |
309 input = cliIn |
301 case -1: |
310 case -1: |
302 break |
311 break Loop |
303 } |
312 } |
304 case x := <- input: |
313 case x, ok := <- input: |
|
314 if !ok { |
|
315 fmt.Println("writeStream input closed") |
|
316 break Loop |
|
317 } |
305 if x == nil { |
318 if x == nil { |
306 if Log != nil { |
319 if Log != nil { |
307 Log.Notice("Refusing to send" + |
320 Log.Notice("Refusing to send" + |
308 " nil stanza") |
321 " nil stanza") |
309 } |
322 } |
316 |
329 |
317 // Stanzas from the remote go up through a stack of filters to the |
330 // Stanzas from the remote go up through a stack of filters to the |
318 // app. This function manages the filters. |
331 // app. This function manages the filters. |
319 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza, |
332 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza, |
320 topFilter <-chan Stanza, app chan<- Stanza) { |
333 topFilter <-chan Stanza, app chan<- Stanza) { |
|
334 defer fmt.Println("filterTop done") |
321 defer close(app) |
335 defer close(app) |
|
336 Loop: |
322 for { |
337 for { |
323 select { |
338 select { |
324 case newFilterOut := <- filterOut: |
339 case newFilterOut := <- filterOut: |
325 if newFilterOut == nil { |
340 if newFilterOut == nil { |
326 if Log != nil { |
341 if Log != nil { |
332 filterIn <- topFilter |
347 filterIn <- topFilter |
333 topFilter = newFilterOut |
348 topFilter = newFilterOut |
334 |
349 |
335 case data, ok := <-topFilter: |
350 case data, ok := <-topFilter: |
336 if !ok { |
351 if !ok { |
337 break |
352 break Loop |
338 } |
353 } |
339 app <- data |
354 app <- data |
340 } |
355 } |
341 } |
356 } |
342 } |
357 } |
343 |
358 |
344 func filterBottom(from <-chan Stanza, to chan<- Stanza) { |
359 func filterBottom(from <-chan Stanza, to chan<- Stanza) { |
|
360 defer fmt.Println("filterBottom done") |
345 defer close(to) |
361 defer close(to) |
346 for data := range(from) { |
362 for data := range(from) { |
347 to <- data |
363 to <- data |
348 } |
364 } |
349 } |
365 } |
353 |
369 |
354 func (cl *Client) handleStreamError(se *streamError) { |
370 func (cl *Client) handleStreamError(se *streamError) { |
355 if Log != nil { |
371 if Log != nil { |
356 Log.Notice(fmt.Sprintf("Received stream error: %v", se)) |
372 Log.Notice(fmt.Sprintf("Received stream error: %v", se)) |
357 } |
373 } |
358 cl.Close() |
374 close(cl.Out) |
359 } |
375 } |
360 |
376 |
361 func (cl *Client) handleFeatures(fe *Features) { |
377 func (cl *Client) handleFeatures(fe *Features) { |
362 cl.Features = fe |
378 cl.Features = fe |
363 if fe.Starttls != nil { |
379 if fe.Starttls != nil { |
396 // Make the TLS connection available to the reader, and wait |
412 // Make the TLS connection available to the reader, and wait |
397 // for it to signal that it's working again. |
413 // for it to signal that it's working again. |
398 cl.socketSync.Add(1) |
414 cl.socketSync.Add(1) |
399 cl.socket = tls |
415 cl.socket = tls |
400 cl.socketSync.Wait() |
416 cl.socketSync.Wait() |
401 |
|
402 // Reset the read timeout on the (underlying) socket so the |
|
403 // reader doesn't get woken up unnecessarily. |
|
404 tcp.SetReadTimeout(0) |
|
405 |
417 |
406 if Log != nil { |
418 if Log != nil { |
407 Log.Info("TLS negotiation succeeded.") |
419 Log.Info("TLS negotiation succeeded.") |
408 } |
420 } |
409 cl.Features = nil |
421 cl.Features = nil |