187 cl.xmlOut <- hsOut |
187 cl.xmlOut <- hsOut |
188 |
188 |
189 return cl, nil |
189 return cl, nil |
190 } |
190 } |
191 |
191 |
192 func (cl *Client) startTransport() (io.Reader, io.Writer) { |
192 func (cl *Client) startTransport() (io.Reader, io.WriteCloser) { |
193 inr, inw := io.Pipe() |
193 inr, inw := io.Pipe() |
194 outr, outw := io.Pipe() |
194 outr, outw := io.Pipe() |
195 go cl.readTransport(inw) |
195 go cl.readTransport(inw) |
196 go cl.writeTransport(outr) |
196 go cl.writeTransport(outr) |
197 return inr, outw |
197 return inr, outw |
221 go writeStream(xmlOut, ch, cl.inputControl) |
221 go writeStream(xmlOut, ch, cl.inputControl) |
222 return ch |
222 return ch |
223 } |
223 } |
224 |
224 |
225 func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza { |
225 func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza { |
226 cliOut := make(chan Stanza) |
226 cliIn := make(chan Stanza) |
227 filterOut := make(chan (<-chan Stanza)) |
227 filterOut := make(chan (<-chan Stanza)) |
228 filterIn := make(chan (<-chan Stanza)) |
228 filterIn := make(chan (<-chan Stanza)) |
229 nullFilter := make(chan Stanza) |
229 nullFilter := make(chan Stanza) |
230 go filterBottom(srvIn, nullFilter) |
230 go filterBottom(srvIn, nullFilter) |
231 go filterTop(filterOut, filterIn, nullFilter, cliOut) |
231 go filterTop(filterOut, filterIn, nullFilter, cliIn) |
232 cl.filterOut = filterOut |
232 cl.filterOut = filterOut |
233 cl.filterIn = filterIn |
233 cl.filterIn = filterIn |
234 return cliOut |
234 return cliIn |
235 } |
235 } |
236 |
236 |
237 func tee(r io.Reader, w io.Writer, prefix string) { |
237 func tee(r io.Reader, w io.Writer, prefix string) { |
238 defer tryClose(r, w) |
238 defer func(w io.Writer) { |
|
239 if c, ok := w.(io.Closer) ; ok { |
|
240 c.Close() |
|
241 } |
|
242 }(w) |
239 |
243 |
240 buf := bytes.NewBuffer([]uint8(prefix)) |
244 buf := bytes.NewBuffer([]uint8(prefix)) |
241 for { |
245 for { |
242 var c [1]byte |
246 var c [1]byte |
243 n, _ := r.Read(c[:]) |
247 n, _ := r.Read(c[:]) |
255 } |
259 } |
256 } |
260 } |
257 leftover := buf.String() |
261 leftover := buf.String() |
258 if leftover != "" { |
262 if leftover != "" { |
259 Log.Debug(buf.String()) |
263 Log.Debug(buf.String()) |
260 } |
|
261 } |
|
262 |
|
263 func tryClose(xs ...interface{}) { |
|
264 f1 := func(ch chan<- interface{}) { |
|
265 defer func() { |
|
266 recover() |
|
267 }() |
|
268 close(ch) |
|
269 } |
|
270 f2 := func(ch <-chan interface{}) { |
|
271 defer func() { |
|
272 recover() |
|
273 }() |
|
274 close(ch) |
|
275 } |
|
276 |
|
277 for _, x := range xs { |
|
278 if c, ok := x.(io.Closer) ; ok { |
|
279 c.Close() |
|
280 } else if ch, ok := x.(chan<- interface{}) ; ok { |
|
281 f1(ch) |
|
282 } else if ch, ok := x.(<-chan interface{}) ; ok { |
|
283 f2(ch) |
|
284 } |
|
285 } |
264 } |
286 } |
265 } |
287 |
266 |
288 // bindDone is called when we've finished resource binding (and all |
267 // bindDone is called when we've finished resource binding (and all |
289 // the negotiations that precede it). Now we can start accepting |
268 // the negotiations that precede it). Now we can start accepting |