equal
deleted
inserted
replaced
71 // asynchronously as new features are received throughout the |
71 // asynchronously as new features are received throughout the |
72 // connection process. It should not be updated once |
72 // connection process. It should not be updated once |
73 // StartSession() returns. |
73 // StartSession() returns. |
74 Features *Features |
74 Features *Features |
75 roster map[string] *RosterItem |
75 roster map[string] *RosterItem |
|
76 filterOut chan<- <-chan Stanza |
|
77 filterIn <-chan <-chan Stanza |
76 } |
78 } |
77 var _ io.Closer = &Client{} |
79 var _ io.Closer = &Client{} |
78 |
80 |
79 // Connect to the appropriate server and authenticate as the given JID |
81 // Connect to the appropriate server and authenticate as the given JID |
80 // with the given password. This function will return as soon as a TCP |
82 // with the given password. This function will return as soon as a TCP |
136 xmlIn := startXmlReader(tlsr, extStanza) |
138 xmlIn := startXmlReader(tlsr, extStanza) |
137 cl.xmlOut = startXmlWriter(tlsw) |
139 cl.xmlOut = startXmlWriter(tlsw) |
138 |
140 |
139 // Start the XMPP stream handler which filters stream-level |
141 // Start the XMPP stream handler which filters stream-level |
140 // events and responds to them. |
142 // events and responds to them. |
141 clIn := cl.startStreamReader(xmlIn, cl.xmlOut) |
143 stIn := cl.startStreamReader(xmlIn, cl.xmlOut) |
142 clOut := cl.startStreamWriter(cl.xmlOut) |
144 clOut := cl.startStreamWriter(cl.xmlOut) |
|
145 |
|
146 // Start the manager for the filters that can modify what the |
|
147 // app sees. |
|
148 clIn := cl.startFilter(stIn) |
143 |
149 |
144 // Initial handshake. |
150 // Initial handshake. |
145 hsOut := &stream{To: jid.Domain, Version: Version} |
151 hsOut := &stream{To: jid.Domain, Version: Version} |
146 cl.xmlOut <- hsOut |
152 cl.xmlOut <- hsOut |
147 |
153 |
185 |
191 |
186 func (cl *Client) startStreamWriter(xmlOut chan<- interface{}) chan<- Stanza { |
192 func (cl *Client) startStreamWriter(xmlOut chan<- interface{}) chan<- Stanza { |
187 ch := make(chan Stanza) |
193 ch := make(chan Stanza) |
188 go writeStream(xmlOut, ch, cl.inputControl) |
194 go writeStream(xmlOut, ch, cl.inputControl) |
189 return ch |
195 return ch |
|
196 } |
|
197 |
|
198 func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza { |
|
199 cliOut := make(chan Stanza) |
|
200 filterOut := make(chan (<-chan Stanza)) |
|
201 filterIn := make(chan (<-chan Stanza)) |
|
202 go filter(srvIn, cliOut, filterOut, filterIn) |
|
203 return cliOut |
190 } |
204 } |
191 |
205 |
192 func tee(r io.Reader, w io.Writer, prefix string) { |
206 func tee(r io.Reader, w io.Writer, prefix string) { |
193 defer tryClose(r, w) |
207 defer tryClose(r, w) |
194 |
208 |
292 if pr != nil { |
306 if pr != nil { |
293 cl.Out <- pr |
307 cl.Out <- pr |
294 } |
308 } |
295 return nil |
309 return nil |
296 } |
310 } |
|
311 |
|
312 // AddFilter adds a new filter to the top of the stack through which |
|
313 // incoming stanzas travel on their way up to the client. The new |
|
314 // filter's output channel is given to this function, and it returns a |
|
315 // new input channel which the filter should read from. When its input |
|
316 // channel closes, the filter should close its output channel. |
|
317 func (cl *Client) AddFilter(out <-chan Stanza) <-chan Stanza { |
|
318 cl.filterOut <- out |
|
319 return <- cl.filterIn |
|
320 } |