equal
deleted
inserted
replaced
35 } |
35 } |
36 |
36 |
37 // BUG(cjyar) Review all these *Client receiver methods. They should |
37 // BUG(cjyar) Review all these *Client receiver methods. They should |
38 // probably either all be receivers, or none. |
38 // probably either all be receivers, or none. |
39 |
39 |
40 func (cl *Client) readTransport(w io.Writer) { |
40 func (cl *Client) readTransport(w io.WriteCloser) { |
41 defer fmt.Println("readTransport done") |
41 defer w.Close() |
42 defer tryClose(cl.socket, w) |
|
43 cl.socket.SetReadTimeout(1e8) |
42 cl.socket.SetReadTimeout(1e8) |
44 p := make([]byte, 1024) |
43 p := make([]byte, 1024) |
45 for { |
44 for { |
46 if cl.socket == nil { |
45 if cl.socket == nil { |
47 cl.waitForSocket() |
46 cl.waitForSocket() |
67 } |
66 } |
68 } |
67 } |
69 } |
68 } |
70 |
69 |
71 func (cl *Client) writeTransport(r io.Reader) { |
70 func (cl *Client) writeTransport(r io.Reader) { |
72 defer fmt.Println("writeTransport done") |
71 defer cl.socket.Close() |
73 defer tryClose(r, cl.socket) |
|
74 p := make([]byte, 1024) |
72 p := make([]byte, 1024) |
75 for { |
73 for { |
76 nr, err := r.Read(p) |
74 nr, err := r.Read(p) |
77 if nr == 0 { |
75 if nr == 0 { |
78 if Log != nil { |
76 if Log != nil { |
90 } |
88 } |
91 } |
89 } |
92 |
90 |
93 func readXml(r io.Reader, ch chan<- interface{}, |
91 func readXml(r io.Reader, ch chan<- interface{}, |
94 extStanza map[string] func(*xml.Name) interface{}) { |
92 extStanza map[string] func(*xml.Name) interface{}) { |
95 defer fmt.Println("readXml done") |
|
96 if Loglevel >= syslog.LOG_DEBUG { |
93 if Loglevel >= syslog.LOG_DEBUG { |
97 pr, pw := io.Pipe() |
94 pr, pw := io.Pipe() |
98 go tee(r, pw, "S: ") |
95 go tee(r, pw, "S: ") |
99 r = pr |
96 r = pr |
100 } |
97 } |
101 defer tryClose(r, ch) |
98 defer close(ch) |
102 |
99 |
103 p := xml.NewParser(r) |
100 p := xml.NewParser(r) |
104 Loop: |
101 Loop: |
105 for { |
102 for { |
106 // Sniff the next token on the stream. |
103 // Sniff the next token on the stream. |
220 if Loglevel >= syslog.LOG_DEBUG { |
217 if Loglevel >= syslog.LOG_DEBUG { |
221 pr, pw := io.Pipe() |
218 pr, pw := io.Pipe() |
222 go tee(pr, w, "C: ") |
219 go tee(pr, w, "C: ") |
223 w = pw |
220 w = pw |
224 } |
221 } |
225 defer tryClose(w, ch) |
222 defer func(w io.Writer) { |
|
223 if c, ok := w.(io.Closer) ; ok { |
|
224 c.Close() |
|
225 } |
|
226 }(w) |
226 |
227 |
227 for obj := range ch { |
228 for obj := range ch { |
228 err := xml.Marshal(w, obj) |
229 err := xml.Marshal(w, obj) |
229 if err != nil { |
230 if err != nil { |
230 if Log != nil { |
231 if Log != nil { |
234 } |
235 } |
235 } |
236 } |
236 } |
237 } |
237 |
238 |
238 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) { |
239 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) { |
239 defer fmt.Println("readStream done") |
|
240 defer close(cliOut) |
240 defer close(cliOut) |
241 defer tryClose(srvIn, cliOut) |
|
242 |
241 |
243 handlers := make(map[string] func(Stanza) bool) |
242 handlers := make(map[string] func(Stanza) bool) |
244 Loop: |
243 Loop: |
245 for { |
244 for { |
246 select { |
245 select { |
292 // the app might inject something inappropriate into our negotiations |
291 // the app might inject something inappropriate into our negotiations |
293 // with the server. The control channel controls this loop's |
292 // with the server. The control channel controls this loop's |
294 // activity. |
293 // activity. |
295 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza, |
294 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza, |
296 control <-chan int) { |
295 control <-chan int) { |
297 defer fmt.Println("writeStream done") |
|
298 defer close(srvOut) |
296 defer close(srvOut) |
299 |
297 |
300 var input <-chan Stanza |
298 var input <-chan Stanza |
301 Loop: |
299 Loop: |
302 for { |
300 for { |
310 case -1: |
308 case -1: |
311 break Loop |
309 break Loop |
312 } |
310 } |
313 case x, ok := <- input: |
311 case x, ok := <- input: |
314 if !ok { |
312 if !ok { |
315 fmt.Println("writeStream input closed") |
|
316 break Loop |
313 break Loop |
317 } |
314 } |
318 if x == nil { |
315 if x == nil { |
319 if Log != nil { |
316 if Log != nil { |
320 Log.Notice("Refusing to send" + |
317 Log.Notice("Refusing to send" + |
329 |
326 |
330 // Stanzas from the remote go up through a stack of filters to the |
327 // Stanzas from the remote go up through a stack of filters to the |
331 // app. This function manages the filters. |
328 // app. This function manages the filters. |
332 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza, |
329 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza, |
333 topFilter <-chan Stanza, app chan<- Stanza) { |
330 topFilter <-chan Stanza, app chan<- Stanza) { |
334 defer fmt.Println("filterTop done") |
|
335 defer close(app) |
331 defer close(app) |
336 Loop: |
332 Loop: |
337 for { |
333 for { |
338 select { |
334 select { |
339 case newFilterOut := <- filterOut: |
335 case newFilterOut := <- filterOut: |
355 } |
351 } |
356 } |
352 } |
357 } |
353 } |
358 |
354 |
359 func filterBottom(from <-chan Stanza, to chan<- Stanza) { |
355 func filterBottom(from <-chan Stanza, to chan<- Stanza) { |
360 defer fmt.Println("filterBottom done") |
|
361 defer close(to) |
356 defer close(to) |
362 for data := range(from) { |
357 for data := range(from) { |
363 to <- data |
358 to <- data |
364 } |
359 } |
365 } |
360 } |