27 |
27 |
28 // Callback to handle a stanza with a particular id. |
28 // Callback to handle a stanza with a particular id. |
29 type stanzaHandler struct { |
29 type stanzaHandler struct { |
30 id string |
30 id string |
31 // Return true means pass this to the application |
31 // Return true means pass this to the application |
32 f func(Stanza) bool |
32 f func(interface{}) bool |
33 } |
33 } |
34 |
34 |
35 // BUG(cjyar) Review all these *Client receiver methods. They should |
35 // BUG(cjyar) Review all these *Client receiver methods. They should |
36 // probably either all be receivers, or none. |
36 // probably either all be receivers, or none. |
37 |
37 |
146 } |
146 } |
147 |
147 |
148 // If it's a Stanza, we try to unmarshal its innerxml |
148 // If it's a Stanza, we try to unmarshal its innerxml |
149 // into objects of the appropriate respective |
149 // into objects of the appropriate respective |
150 // types. This is specified by our extensions. |
150 // types. This is specified by our extensions. |
151 if st, ok := obj.(Stanza); ok { |
151 if st := getStanza(obj) ; st != nil { |
152 err = parseExtended(st, extStanza) |
152 err = parseExtended(st, extStanza) |
153 if err != nil { |
153 if err != nil { |
154 Warn.Logf("ext unmarshal: %s", err) |
154 Warn.Logf("ext unmarshal: %s", err) |
155 break Loop |
155 break Loop |
156 } |
156 } |
159 // Put it on the channel. |
159 // Put it on the channel. |
160 ch <- obj |
160 ch <- obj |
161 } |
161 } |
162 } |
162 } |
163 |
163 |
164 func parseExtended(st Stanza, extStanza map[string]func(*xml.Name) interface{}) error { |
164 func parseExtended(st *Stanza, extStanza map[string]func(*xml.Name) interface{}) error { |
165 // Now parse the stanza's innerxml to find the string that we |
165 // Now parse the stanza's innerxml to find the string that we |
166 // can unmarshal this nested element from. |
166 // can unmarshal this nested element from. |
167 reader := strings.NewReader(st.innerxml()) |
167 reader := strings.NewReader(st.Innerxml) |
168 p := xml.NewDecoder(reader) |
168 p := xml.NewDecoder(reader) |
169 for { |
169 for { |
170 t, err := p.Token() |
170 t, err := p.Token() |
171 if err == io.EOF { |
171 if err == io.EOF { |
172 break |
172 break |
223 } |
223 } |
224 } |
224 } |
225 } |
225 } |
226 } |
226 } |
227 |
227 |
228 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) { |
228 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) { |
229 defer close(cliOut) |
229 defer close(cliOut) |
230 |
230 |
231 handlers := make(map[string]func(Stanza) bool) |
231 handlers := make(map[string]func(interface{}) bool) |
232 Loop: |
232 Loop: |
233 for { |
233 for { |
234 select { |
234 select { |
235 case h := <-cl.handlers: |
235 case h := <-cl.handlers: |
236 handlers[h.id] = h.f |
236 handlers[h.id] = h.f |
237 case x, ok := <-srvIn: |
237 case x, ok := <-srvIn: |
238 if !ok { |
238 if !ok { |
239 break Loop |
239 break Loop |
240 } |
240 } |
241 send := false |
241 var st *Stanza |
242 switch obj := x.(type) { |
242 switch obj := x.(type) { |
243 case *stream: |
243 case *stream: |
244 handleStream(obj) |
244 handleStream(obj) |
245 case *streamError: |
245 case *streamError: |
246 cl.handleStreamError(obj) |
246 cl.handleStreamError(obj) |
248 cl.handleFeatures(obj) |
248 cl.handleFeatures(obj) |
249 case *starttls: |
249 case *starttls: |
250 cl.handleTls(obj) |
250 cl.handleTls(obj) |
251 case *auth: |
251 case *auth: |
252 cl.handleSasl(obj) |
252 cl.handleSasl(obj) |
|
253 case *Iq, *Message, *Presence: |
|
254 st = getStanza(obj) |
253 default: |
255 default: |
254 send = true |
256 Warn.Logf("Unhandled non-stanza: %T %#v", x, x) |
255 } |
257 } |
256 if !send { |
258 |
|
259 if st == nil { |
257 continue |
260 continue |
258 } |
261 } |
259 st, ok := x.(Stanza) |
262 |
260 if !ok { |
263 send := true |
261 Warn.Logf("Unhandled non-stanza: %v", x) |
264 if handlers[st.Id] != nil { |
262 continue |
265 f := handlers[st.Id] |
263 } |
266 delete(handlers, st.Id) |
264 if handlers[st.GetId()] != nil { |
267 send = f(x) |
265 f := handlers[st.GetId()] |
|
266 delete(handlers, st.GetId()) |
|
267 send = f(st) |
|
268 } |
268 } |
269 if send { |
269 if send { |
270 cliOut <- st |
270 cliOut <- x |
271 } |
271 } |
272 } |
272 } |
273 } |
273 } |
274 } |
274 } |
275 |
275 |
276 // This loop is paused until resource binding is complete. Otherwise |
276 // This loop is paused until resource binding is complete. Otherwise |
277 // the app might inject something inappropriate into our negotiations |
277 // the app might inject something inappropriate into our negotiations |
278 // with the server. The control channel controls this loop's |
278 // with the server. The control channel controls this loop's |
279 // activity. |
279 // activity. |
280 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza, |
280 func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}, |
281 control <-chan int) { |
281 control <-chan int) { |
282 defer close(srvOut) |
282 defer close(srvOut) |
283 |
283 |
284 var input <-chan Stanza |
284 var input <-chan interface{} |
285 Loop: |
285 Loop: |
286 for { |
286 for { |
287 select { |
287 select { |
288 case status := <-control: |
288 case status := <-control: |
289 switch status { |
289 switch status { |
307 } |
307 } |
308 } |
308 } |
309 |
309 |
310 // Stanzas from the remote go up through a stack of filters to the |
310 // Stanzas from the remote go up through a stack of filters to the |
311 // app. This function manages the filters. |
311 // app. This function manages the filters. |
312 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza, |
312 func filterTop(filterOut <-chan <-chan interface{}, filterIn chan<- <-chan interface{}, |
313 topFilter <-chan Stanza, app chan<- Stanza) { |
313 topFilter <-chan interface{}, app chan<- interface{}) { |
314 defer close(app) |
314 defer close(app) |
315 Loop: |
315 Loop: |
316 for { |
316 for { |
317 select { |
317 select { |
318 case newFilterOut := <-filterOut: |
318 case newFilterOut := <-filterOut: |
594 res := cl.Jid.Resource |
594 res := cl.Jid.Resource |
595 bindReq := &bindIq{} |
595 bindReq := &bindIq{} |
596 if res != "" { |
596 if res != "" { |
597 bindReq.Resource = &res |
597 bindReq.Resource = &res |
598 } |
598 } |
599 msg := &Iq{Type: "set", Id: <-Id, Nested: []interface{}{bindReq}} |
599 msg := &Iq{Stanza: Stanza{Type: "set", Id: <-Id, |
600 f := func(st Stanza) bool { |
600 Nested: []interface{}{bindReq}}} |
|
601 f := func(st interface{}) bool { |
601 iq, ok := st.(*Iq) |
602 iq, ok := st.(*Iq) |
602 if !ok { |
603 if !ok { |
603 Warn.Log("non-iq response") |
604 Warn.Log("non-iq response") |
604 } |
605 } |
605 if iq.Type == "error" { |
606 if iq.Type == "error" { |
640 // presence) with a given id. The provided function will not be called |
641 // presence) with a given id. The provided function will not be called |
641 // more than once. If it returns false, the stanza will not be made |
642 // more than once. If it returns false, the stanza will not be made |
642 // available on the normal Client.In channel. The stanza handler |
643 // available on the normal Client.In channel. The stanza handler |
643 // must not read from that channel, as deliveries on it cannot proceed |
644 // must not read from that channel, as deliveries on it cannot proceed |
644 // until the handler returns true or false. |
645 // until the handler returns true or false. |
645 func (cl *Client) HandleStanza(id string, f func(Stanza) bool) { |
646 func (cl *Client) HandleStanza(id string, f func(interface{}) bool) { |
646 h := &stanzaHandler{id: id, f: f} |
647 h := &stanzaHandler{id: id, f: f} |
647 cl.handlers <- h |
648 cl.handlers <- h |
648 } |
649 } |