|
1 // This layer of the XMPP protocol reads XMLish structures and |
|
2 // responds to them. It negotiates TLS and authentication. |
|
3 |
|
4 package xmpp |
|
5 |
|
6 import ( |
|
7 "encoding/xml" |
|
8 "crypto/tls" |
|
9 "time" |
|
10 ) |
|
11 |
|
12 // Callback to handle a stanza with a particular id. |
|
13 type stanzaHandler struct { |
|
14 id string |
|
15 // Return true means pass this to the application |
|
16 f func(Stanza) bool |
|
17 } |
|
18 |
|
19 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) { |
|
20 defer close(cliOut) |
|
21 |
|
22 handlers := make(map[string]func(Stanza) bool) |
|
23 Loop: |
|
24 for { |
|
25 select { |
|
26 case h := <-cl.handlers: |
|
27 handlers[h.id] = h.f |
|
28 case x, ok := <-srvIn: |
|
29 if !ok { |
|
30 break Loop |
|
31 } |
|
32 switch obj := x.(type) { |
|
33 case *stream: |
|
34 handleStream(obj) |
|
35 case *streamError: |
|
36 cl.handleStreamError(obj) |
|
37 case *Features: |
|
38 cl.handleFeatures(obj) |
|
39 case *starttls: |
|
40 cl.handleTls(obj) |
|
41 case *auth: |
|
42 cl.handleSasl(obj) |
|
43 case Stanza: |
|
44 send := true |
|
45 id := obj.GetHeader().Id |
|
46 if handlers[id] != nil { |
|
47 f := handlers[id] |
|
48 delete(handlers, id) |
|
49 send = f(obj) |
|
50 } |
|
51 if send { |
|
52 cliOut <- obj |
|
53 } |
|
54 default: |
|
55 Warn.Logf("Unhandled non-stanza: %T %#v", x, x) |
|
56 } |
|
57 } |
|
58 } |
|
59 } |
|
60 |
|
61 // This loop is paused until resource binding is complete. Otherwise |
|
62 // the app might inject something inappropriate into our negotiations |
|
63 // with the server. The control channel controls this loop's |
|
64 // activity. |
|
65 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza, |
|
66 control <-chan int) { |
|
67 defer close(srvOut) |
|
68 |
|
69 var input <-chan Stanza |
|
70 Loop: |
|
71 for { |
|
72 select { |
|
73 case status := <-control: |
|
74 switch status { |
|
75 case 0: |
|
76 input = nil |
|
77 case 1: |
|
78 input = cliIn |
|
79 case -1: |
|
80 break Loop |
|
81 } |
|
82 case x, ok := <-input: |
|
83 if !ok { |
|
84 break Loop |
|
85 } |
|
86 if x == nil { |
|
87 Info.Log("Refusing to send nil stanza") |
|
88 continue |
|
89 } |
|
90 srvOut <- x |
|
91 } |
|
92 } |
|
93 } |
|
94 |
|
95 func handleStream(ss *stream) { |
|
96 } |
|
97 |
|
98 func (cl *Client) handleStreamError(se *streamError) { |
|
99 Info.Logf("Received stream error: %v", se) |
|
100 cl.socket.Close() |
|
101 } |
|
102 |
|
103 func (cl *Client) handleFeatures(fe *Features) { |
|
104 cl.Features = fe |
|
105 if fe.Starttls != nil { |
|
106 start := &starttls{XMLName: xml.Name{Space: NsTLS, |
|
107 Local: "starttls"}} |
|
108 cl.sendXml <- start |
|
109 return |
|
110 } |
|
111 |
|
112 if len(fe.Mechanisms.Mechanism) > 0 { |
|
113 cl.chooseSasl(fe) |
|
114 return |
|
115 } |
|
116 |
|
117 if fe.Bind != nil { |
|
118 cl.bind(fe.Bind) |
|
119 return |
|
120 } |
|
121 } |
|
122 |
|
123 // readTransport() is running concurrently. We need to stop it, |
|
124 // negotiate TLS, then start it again. It calls waitForSocket() in |
|
125 // its inner loop; see below. |
|
126 func (cl *Client) handleTls(t *starttls) { |
|
127 tcp := cl.socket |
|
128 |
|
129 // Set the socket to nil, and wait for the reader routine to |
|
130 // signal that it's paused. |
|
131 cl.socket = nil |
|
132 cl.socketSync.Add(1) |
|
133 cl.socketSync.Wait() |
|
134 |
|
135 // Negotiate TLS with the server. |
|
136 tls := tls.Client(tcp, &cl.tlsConfig) |
|
137 |
|
138 // Make the TLS connection available to the reader, and wait |
|
139 // for it to signal that it's working again. |
|
140 cl.socketSync.Add(1) |
|
141 cl.socket = tls |
|
142 cl.socketSync.Wait() |
|
143 |
|
144 Info.Log("TLS negotiation succeeded.") |
|
145 cl.Features = nil |
|
146 |
|
147 // Now re-send the initial handshake message to start the new |
|
148 // session. |
|
149 hsOut := &stream{To: cl.Jid.Domain, Version: XMPPVersion} |
|
150 cl.sendXml <- hsOut |
|
151 } |
|
152 |
|
153 // Synchronize with handleTls(). Called from readTransport() when |
|
154 // cl.socket is nil. |
|
155 func (cl *Client) waitForSocket() { |
|
156 // Signal that we've stopped reading from the socket. |
|
157 cl.socketSync.Done() |
|
158 |
|
159 // Wait until the socket is available again. |
|
160 for cl.socket == nil { |
|
161 time.Sleep(1e8) |
|
162 } |
|
163 |
|
164 // Signal that we're going back to the read loop. |
|
165 cl.socketSync.Done() |
|
166 } |
|
167 |
|
168 // Register a callback to handle the next XMPP stanza (iq, message, or |
|
169 // presence) with a given id. The provided function will not be called |
|
170 // more than once. If it returns false, the stanza will not be made |
|
171 // available on the normal Client.In channel. The stanza handler |
|
172 // must not read from that channel, as deliveries on it cannot proceed |
|
173 // until the handler returns true or false. |
|
174 func (cl *Client) HandleStanza(id string, f func(Stanza) bool) { |
|
175 h := &stanzaHandler{id: id, f: f} |
|
176 cl.handlers <- h |
|
177 } |
|
178 |
|
179 // Send a request to bind a resource. RFC 3920, section 7. |
|
180 func (cl *Client) bind(bindAdv *bindIq) { |
|
181 res := cl.Jid.Resource |
|
182 bindReq := &bindIq{} |
|
183 if res != "" { |
|
184 bindReq.Resource = &res |
|
185 } |
|
186 msg := &Iq{Header: Header{Type: "set", Id: NextId(), |
|
187 Nested: []interface{}{bindReq}}} |
|
188 f := func(st Stanza) bool { |
|
189 iq, ok := st.(*Iq) |
|
190 if !ok { |
|
191 Warn.Log("non-iq response") |
|
192 } |
|
193 if iq.Type == "error" { |
|
194 Warn.Log("Resource binding failed") |
|
195 return false |
|
196 } |
|
197 var bindRepl *bindIq |
|
198 for _, ele := range iq.Nested { |
|
199 if b, ok := ele.(*bindIq); ok { |
|
200 bindRepl = b |
|
201 break |
|
202 } |
|
203 } |
|
204 if bindRepl == nil { |
|
205 Warn.Logf("Bad bind reply: %#v", iq) |
|
206 return false |
|
207 } |
|
208 jidStr := bindRepl.Jid |
|
209 if jidStr == nil || *jidStr == "" { |
|
210 Warn.Log("Can't bind empty resource") |
|
211 return false |
|
212 } |
|
213 jid := new(JID) |
|
214 if err := jid.Set(*jidStr); err != nil { |
|
215 Warn.Logf("Can't parse JID %s: %s", *jidStr, err) |
|
216 return false |
|
217 } |
|
218 cl.Jid = *jid |
|
219 Info.Logf("Bound resource: %s", cl.Jid.String()) |
|
220 cl.bindDone() |
|
221 return false |
|
222 } |
|
223 cl.HandleStanza(msg.Id, f) |
|
224 cl.sendXml <- msg |
|
225 } |