64 // Session has started and normal message traffic can be sent |
63 // Session has started and normal message traffic can be sent |
65 // and received. |
64 // and received. |
66 StatusRunning Status = statusRunning |
65 StatusRunning Status = statusRunning |
67 // The session has closed, or is in the process of closing. |
66 // The session has closed, or is in the process of closing. |
68 StatusShutdown Status = statusShutdown |
67 StatusShutdown Status = statusShutdown |
|
68 // The session has encountered an error. Otherwise identical |
|
69 // to StatusShutdown. |
|
70 StatusError Status = statusError |
69 ) |
71 ) |
|
72 |
|
73 func (s Status) fatal() bool { |
|
74 switch s { |
|
75 default: |
|
76 return false |
|
77 case StatusShutdown, StatusError: |
|
78 return true |
|
79 } |
|
80 } |
70 |
81 |
71 // A filter can modify the XMPP traffic to or from the remote |
82 // A filter can modify the XMPP traffic to or from the remote |
72 // server. It's part of an Extension. The filter function will be |
83 // server. It's part of an Extension. The filter function will be |
73 // called in a new goroutine, so it doesn't need to return. The filter |
84 // called in a new goroutine, so it doesn't need to return. The filter |
74 // should close its output when its input is closed. |
85 // should close its output when its input is closed. |
100 // set up the XMPP stream will not appear here. |
111 // set up the XMPP stream will not appear here. |
101 Recv <-chan Stanza |
112 Recv <-chan Stanza |
102 // Outgoing XMPP stanzas to the server should be sent to this |
113 // Outgoing XMPP stanzas to the server should be sent to this |
103 // channel. |
114 // channel. |
104 Send chan<- Stanza |
115 Send chan<- Stanza |
105 sendXml chan<- interface{} |
116 sendRaw chan<- interface{} |
106 statmgr *statmgr |
117 statmgr *statmgr |
107 // The client's roster is also known as the buddy list. It's |
118 // The client's roster is also known as the buddy list. It's |
108 // the set of contacts which are known to this JID, or which |
119 // the set of contacts which are known to this JID, or which |
109 // this JID is known to. |
120 // this JID is known to. |
110 Roster Roster |
121 Roster Roster |
111 // Features advertised by the remote. |
122 // Features advertised by the remote. |
112 Features *Features |
123 Features *Features |
113 sendFilterAdd, recvFilterAdd chan Filter |
124 sendFilterAdd, recvFilterAdd chan Filter |
114 tlsConfig tls.Config |
125 tlsConfig tls.Config |
115 layer1 *layer1 |
126 layer1 *layer1 |
|
127 error chan error |
116 } |
128 } |
117 |
129 |
118 // Creates an XMPP client identified by the given JID, authenticating |
130 // Creates an XMPP client identified by the given JID, authenticating |
119 // with the provided password and TLS config. Zero or more extensions |
131 // with the provided password and TLS config. Zero or more extensions |
120 // may be specified. The initial presence will be broadcast. If status |
132 // may be specified. The initial presence will be broadcast. If status |
134 cl.handlers = make(chan *callback, 100) |
146 cl.handlers = make(chan *callback, 100) |
135 cl.tlsConfig = tlsconf |
147 cl.tlsConfig = tlsconf |
136 cl.sendFilterAdd = make(chan Filter) |
148 cl.sendFilterAdd = make(chan Filter) |
137 cl.recvFilterAdd = make(chan Filter) |
149 cl.recvFilterAdd = make(chan Filter) |
138 cl.statmgr = newStatmgr(status) |
150 cl.statmgr = newStatmgr(status) |
|
151 cl.error = make(chan error, 1) |
139 |
152 |
140 extStanza := make(map[xml.Name]reflect.Type) |
153 extStanza := make(map[xml.Name]reflect.Type) |
141 for _, ext := range exts { |
154 for _, ext := range exts { |
142 for k, v := range ext.StanzaTypes { |
155 for k, v := range ext.StanzaTypes { |
143 if _, ok := extStanza[k]; ok { |
156 if _, ok := extStanza[k]; ok { |
178 cl.setStatus(StatusConnected) |
191 cl.setStatus(StatusConnected) |
179 |
192 |
180 // Start the transport handler, initially unencrypted. |
193 // Start the transport handler, initially unencrypted. |
181 recvReader, recvWriter := io.Pipe() |
194 recvReader, recvWriter := io.Pipe() |
182 sendReader, sendWriter := io.Pipe() |
195 sendReader, sendWriter := io.Pipe() |
183 cl.layer1 = startLayer1(tcp, recvWriter, sendReader, |
196 cl.layer1 = cl.startLayer1(tcp, recvWriter, sendReader, |
184 cl.statmgr.newListener()) |
197 cl.statmgr.newListener()) |
185 |
198 |
186 // Start the reader and writer that convert to and from XML. |
199 // Start the reader and writer that convert to and from XML. |
187 recvXmlCh := make(chan interface{}) |
200 recvXmlCh := make(chan interface{}) |
188 go recvXml(recvReader, recvXmlCh, extStanza) |
201 go cl.recvXml(recvReader, recvXmlCh, extStanza) |
189 sendXmlCh := make(chan interface{}) |
202 sendXmlCh := make(chan interface{}) |
190 cl.sendXml = sendXmlCh |
203 cl.sendRaw = sendXmlCh |
191 go sendXml(sendWriter, sendXmlCh) |
204 go cl.sendXml(sendWriter, sendXmlCh) |
192 |
205 |
193 // Start the reader and writer that convert between XML and |
206 // Start the reader and writer that convert between XML and |
194 // XMPP stanzas. |
207 // XMPP stanzas. |
195 recvRawXmpp := make(chan Stanza) |
208 recvRawXmpp := make(chan Stanza) |
196 go cl.recvStream(recvXmlCh, recvRawXmpp, cl.statmgr.newListener()) |
209 go cl.recvStream(recvXmlCh, recvRawXmpp, cl.statmgr.newListener()) |
211 cl.AddSendFilter(ext.SendFilter) |
224 cl.AddSendFilter(ext.SendFilter) |
212 } |
225 } |
213 |
226 |
214 // Initial handshake. |
227 // Initial handshake. |
215 hsOut := &stream{To: jid.Domain, Version: XMPPVersion} |
228 hsOut := &stream{To: jid.Domain, Version: XMPPVersion} |
216 cl.sendXml <- hsOut |
229 cl.sendRaw <- hsOut |
217 |
230 |
218 // Wait until resource binding is complete. |
231 // Wait until resource binding is complete. |
219 if err := cl.statmgr.awaitStatus(StatusBound); err != nil { |
232 if err := cl.statmgr.awaitStatus(StatusBound); err != nil { |
220 return nil, err |
233 return nil, cl.getError(err) |
221 } |
234 } |
222 |
235 |
223 // Forget about the password, for paranoia's sake. |
236 // Forget about the password, for paranoia's sake. |
224 cl.password = "" |
237 cl.password = "" |
225 |
238 |
229 Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}} |
242 Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}} |
230 ch := make(chan error) |
243 ch := make(chan error) |
231 f := func(st Stanza) { |
244 f := func(st Stanza) { |
232 iq, ok := st.(*Iq) |
245 iq, ok := st.(*Iq) |
233 if !ok { |
246 if !ok { |
234 Warn.Log("iq reply not iq; can't start session") |
247 ch <- fmt.Errorf("bad session start reply: %#v", st) |
235 ch <- errors.New("bad session start reply") |
|
236 } |
248 } |
237 if iq.Type == "error" { |
249 if iq.Type == "error" { |
238 Warn.Logf("Can't start session: %v", iq) |
250 ch <- fmt.Errorf("Can't start session: %v", iq.Error) |
239 ch <- iq.Error |
|
240 } |
251 } |
241 ch <- nil |
252 ch <- nil |
242 } |
253 } |
243 cl.SetCallback(id, f) |
254 cl.SetCallback(id, f) |
244 cl.sendXml <- iq |
255 cl.sendRaw <- iq |
245 // Now wait until the callback is called. |
256 // Now wait until the callback is called. |
246 if err := <-ch; err != nil { |
257 if err := <-ch; err != nil { |
247 return nil, err |
258 return nil, cl.getError(err) |
248 } |
259 } |
249 |
260 |
250 // This allows the client to receive stanzas. |
261 // This allows the client to receive stanzas. |
251 cl.setStatus(StatusRunning) |
262 cl.setStatus(StatusRunning) |
252 |
263 |
254 cl.Roster.update() |
265 cl.Roster.update() |
255 |
266 |
256 // Send the initial presence. |
267 // Send the initial presence. |
257 cl.Send <- &pr |
268 cl.Send <- &pr |
258 |
269 |
259 return cl, nil |
270 return cl, cl.getError(nil) |
260 } |
|
261 |
|
262 func tee(r io.Reader, w io.Writer, prefix string) { |
|
263 defer func(w io.Writer) { |
|
264 if c, ok := w.(io.Closer); ok { |
|
265 c.Close() |
|
266 } |
|
267 }(w) |
|
268 |
|
269 buf := bytes.NewBuffer([]uint8(prefix)) |
|
270 for { |
|
271 var c [1]byte |
|
272 n, _ := r.Read(c[:]) |
|
273 if n == 0 { |
|
274 break |
|
275 } |
|
276 n, _ = w.Write(c[:n]) |
|
277 if n == 0 { |
|
278 break |
|
279 } |
|
280 buf.Write(c[:n]) |
|
281 if c[0] == '\n' || c[0] == '>' { |
|
282 Debug.Log(buf) |
|
283 buf = bytes.NewBuffer([]uint8(prefix)) |
|
284 } |
|
285 } |
|
286 leftover := buf.String() |
|
287 if leftover != "" { |
|
288 Debug.Log(buf) |
|
289 } |
|
290 } |
271 } |
291 |
272 |
292 func (cl *Client) Close() { |
273 func (cl *Client) Close() { |
293 // Shuts down the receivers: |
274 // Shuts down the receivers: |
294 cl.setStatus(StatusShutdown) |
275 cl.setStatus(StatusShutdown) |
295 // Shuts down the senders: |
276 // Shuts down the senders: |
296 close(cl.Send) |
277 close(cl.Send) |
297 } |
278 } |
|
279 |
|
280 // If there's a buffered error in the channel, return it. Otherwise, |
|
281 // return what was passed to us. The idea is that the error in the |
|
282 // channel probably preceded (and caused) the one that's passed as an |
|
283 // argument here. |
|
284 func (cl *Client) getError(err1 error) error { |
|
285 select { |
|
286 case err0 := <-cl.error: |
|
287 return err0 |
|
288 default: |
|
289 return err1 |
|
290 } |
|
291 } |
|
292 |
|
293 // Register an error that happened in the internals somewhere. If |
|
294 // there's already an error in the channel, discard the newer one in |
|
295 // favor of the older. |
|
296 func (cl *Client) setError(err error) { |
|
297 cl.Close() |
|
298 cl.setStatus(StatusError) |
|
299 if len(cl.error) > 0 { |
|
300 return |
|
301 } |
|
302 // If we're in a race between two calls to this function, |
|
303 // trying to set the "first" error, just arbitrarily let one |
|
304 // of them win. |
|
305 select { |
|
306 case cl.error <- err: |
|
307 default: |
|
308 } |
|
309 } |