34 // DNS SRV names |
33 // DNS SRV names |
35 serverSrv = "xmpp-server" |
34 serverSrv = "xmpp-server" |
36 clientSrv = "xmpp-client" |
35 clientSrv = "xmpp-client" |
37 ) |
36 ) |
38 |
37 |
// Status describes how far a client's connection to the server has
// progressed.
type Status int

// Untyped ordinals backing the exported Status values. They are
// numbered in the order the connection stages are reached, so a later
// stage always compares greater than an earlier one.
const (
	statusUnconnected = iota
	statusConnected
	statusConnectedTls
	statusAuthenticated
	statusBound
	statusRunning
	statusShutdown
)

// The exported status values are typed constants rather than
// variables, so no importing package can reassign them.
const (
	// The client has not yet connected, or it has been
	// disconnected from the server.
	StatusUnconnected Status = statusUnconnected
	// Initial connection established.
	StatusConnected Status = statusConnected
	// Like StatusConnected, but with TLS.
	StatusConnectedTls Status = statusConnectedTls
	// Authentication succeeded.
	StatusAuthenticated Status = statusAuthenticated
	// Resource binding complete.
	StatusBound Status = statusBound
	// Session has started and normal message traffic can be sent
	// and received.
	StatusRunning Status = statusRunning
	// The session has closed, or is in the process of closing.
	StatusShutdown Status = statusShutdown
)
|
70 |
|
// Filter is a function that can modify the XMPP traffic to or from the
// remote server. It's part of an Extension. The filter function will
// be called in a new goroutine, so it doesn't need to return. It
// receives stanzas on in and emits stanzas on out, and it should close
// out when in is closed.
type Filter func(in <-chan Stanza, out chan<- Stanza)
76 |
43 |
77 // Extensions can add stanza filters and/or new XML element types. |
44 // Extensions can add stanza filters and/or new XML element types. |
78 type Extension struct { |
45 type Extension struct { |
79 // Maps from an XML namespace to a function which constructs a |
46 // Maps from an XML name to a structure which holds stanza |
80 // structure to hold the contents of stanzas in that |
47 // contents with that name. |
81 // namespace. |
48 StanzaTypes map[xml.Name]reflect.Type |
82 StanzaHandlers map[xml.Name]reflect.Type |
|
83 // If non-nil, will be called once to start the filter |
49 // If non-nil, will be called once to start the filter |
84 // running. RecvFilter intercepts incoming messages on their |
50 // running. RecvFilter intercepts incoming messages on their |
85 // way from the remote server to the application; SendFilter |
51 // way from the remote server to the application; SendFilter |
86 // intercepts messages going the other direction. |
52 // intercepts messages going the other direction. |
87 RecvFilter Filter |
53 RecvFilter Filter |
99 // Incoming XMPP stanzas from the remote will be published on |
65 // Incoming XMPP stanzas from the remote will be published on |
100 // this channel. Information which is used by this library to |
66 // this channel. Information which is used by this library to |
101 // set up the XMPP stream will not appear here. |
67 // set up the XMPP stream will not appear here. |
102 Recv <-chan Stanza |
68 Recv <-chan Stanza |
103 // Outgoing XMPP stanzas to the server should be sent to this |
69 // Outgoing XMPP stanzas to the server should be sent to this |
104 // channel. |
70 // channel. The application should not close this channel; |
|
71 // rather, call Close(). |
105 Send chan<- Stanza |
72 Send chan<- Stanza |
106 sendXml chan<- interface{} |
73 sendRaw chan<- interface{} |
107 statmgr *statmgr |
74 statmgr *statmgr |
108 // The client's roster is also known as the buddy list. It's |
75 // The client's roster is also known as the buddy list. It's |
109 // the set of contacts which are known to this JID, or which |
76 // the set of contacts which are known to this JID, or which |
110 // this JID is known to. |
77 // this JID is known to. |
111 Roster Roster |
78 Roster Roster |
112 // Features advertised by the remote. |
79 // Features advertised by the remote. |
113 Features *Features |
80 Features *Features |
114 sendFilterAdd, recvFilterAdd chan Filter |
81 sendFilterAdd, recvFilterAdd chan Filter |
115 tlsConfig tls.Config |
82 tlsConfig tls.Config |
116 layer1 *layer1 |
83 layer1 *layer1 |
|
84 error chan error |
|
85 shutdownOnce sync.Once |
117 } |
86 } |
118 |
87 |
119 // Creates an XMPP client identified by the given JID, authenticating |
88 // Creates an XMPP client identified by the given JID, authenticating |
120 // with the provided password and TLS config. Zero or more extensions |
89 // with the provided password and TLS config. Zero or more extensions |
121 // may be specified. The initial presence will be broadcast. If status |
90 // may be specified. The initial presence will be broadcast. If status |
122 // is non-nil, connection progress information will be sent on it. |
91 // is non-nil, connection progress information will be sent on it. |
123 func NewClient(jid *JID, password string, tlsconf tls.Config, exts []Extension, |
92 func NewClient(jid *JID, password string, tlsconf tls.Config, exts []Extension, |
124 pr Presence, status chan<- Status) (*Client, error) { |
93 pr Presence, status chan<- Status) (*Client, error) { |
|
94 |
|
95 // Resolve the domain in the JID. |
|
96 _, srvs, err := net.LookupSRV(clientSrv, "tcp", jid.Domain()) |
|
97 if err != nil { |
|
98 return nil, fmt.Errorf("LookupSrv %s: %v", jid.Domain, err) |
|
99 } |
|
100 if len(srvs) == 0 { |
|
101 return nil, fmt.Errorf("LookupSrv %s: no results", jid.Domain) |
|
102 } |
|
103 |
|
104 var tcp *net.TCPConn |
|
105 for _, srv := range srvs { |
|
106 addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port) |
|
107 var addr *net.TCPAddr |
|
108 addr, err = net.ResolveTCPAddr("tcp", addrStr) |
|
109 if err != nil { |
|
110 err = fmt.Errorf("ResolveTCPAddr(%s): %s", |
|
111 addrStr, err.Error()) |
|
112 continue |
|
113 } |
|
114 tcp, err = net.DialTCP("tcp", nil, addr) |
|
115 if tcp != nil { |
|
116 break |
|
117 } |
|
118 } |
|
119 if tcp == nil { |
|
120 return nil, err |
|
121 } |
|
122 |
|
123 return newClient(tcp, jid, password, tlsconf, exts, pr, status) |
|
124 } |
|
125 |
|
126 // Connect to the specified host and port. This is otherwise identical |
|
127 // to NewClient. |
|
128 func NewClientFromHost(jid *JID, password string, tlsconf tls.Config, |
|
129 exts []Extension, pr Presence, status chan<- Status, host string, |
|
130 port int) (*Client, error) { |
|
131 |
|
132 addrStr := fmt.Sprintf("%s:%d", host, port) |
|
133 addr, err := net.ResolveTCPAddr("tcp", addrStr) |
|
134 if err != nil { |
|
135 return nil, err |
|
136 } |
|
137 tcp, err := net.DialTCP("tcp", nil, addr) |
|
138 if err != nil { |
|
139 return nil, err |
|
140 } |
|
141 |
|
142 return newClient(tcp, jid, password, tlsconf, exts, pr, status) |
|
143 } |
|
144 |
|
145 func newClient(tcp *net.TCPConn, jid *JID, password string, tlsconf tls.Config, |
|
146 exts []Extension, pr Presence, status chan<- Status) (*Client, error) { |
125 |
147 |
126 // Include the mandatory extensions. |
148 // Include the mandatory extensions. |
127 roster := newRosterExt() |
149 roster := newRosterExt() |
128 exts = append(exts, roster.Extension) |
150 exts = append(exts, roster.Extension) |
129 exts = append(exts, bindExt) |
151 exts = append(exts, bindExt) |
135 cl.handlers = make(chan *callback, 100) |
157 cl.handlers = make(chan *callback, 100) |
136 cl.tlsConfig = tlsconf |
158 cl.tlsConfig = tlsconf |
137 cl.sendFilterAdd = make(chan Filter) |
159 cl.sendFilterAdd = make(chan Filter) |
138 cl.recvFilterAdd = make(chan Filter) |
160 cl.recvFilterAdd = make(chan Filter) |
139 cl.statmgr = newStatmgr(status) |
161 cl.statmgr = newStatmgr(status) |
|
162 cl.error = make(chan error, 1) |
140 |
163 |
141 extStanza := make(map[xml.Name]reflect.Type) |
164 extStanza := make(map[xml.Name]reflect.Type) |
142 for _, ext := range exts { |
165 for _, ext := range exts { |
143 for k, v := range ext.StanzaHandlers { |
166 for k, v := range ext.StanzaTypes { |
144 if _, ok := extStanza[k]; ok { |
167 if _, ok := extStanza[k]; ok { |
145 return nil, fmt.Errorf("duplicate handler %s", |
168 return nil, fmt.Errorf("duplicate handler %s", |
146 k) |
169 k) |
147 } |
170 } |
148 extStanza[k] = v |
171 extStanza[k] = v |
149 } |
172 } |
150 } |
173 } |
151 |
174 |
152 // Resolve the domain in the JID. |
175 // The thing that called this made a TCP connection, so now we |
153 _, srvs, err := net.LookupSRV(clientSrv, "tcp", jid.Domain) |
176 // can signal that it's connected. |
154 if err != nil { |
|
155 return nil, fmt.Errorf("LookupSrv %s: %v", jid.Domain, err) |
|
156 } |
|
157 if len(srvs) == 0 { |
|
158 return nil, fmt.Errorf("LookupSrv %s: no results", jid.Domain) |
|
159 } |
|
160 |
|
161 var tcp *net.TCPConn |
|
162 for _, srv := range srvs { |
|
163 addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port) |
|
164 var addr *net.TCPAddr |
|
165 addr, err = net.ResolveTCPAddr("tcp", addrStr) |
|
166 if err != nil { |
|
167 err = fmt.Errorf("ResolveTCPAddr(%s): %s", |
|
168 addrStr, err.Error()) |
|
169 continue |
|
170 } |
|
171 tcp, err = net.DialTCP("tcp", nil, addr) |
|
172 if tcp != nil { |
|
173 break |
|
174 } |
|
175 } |
|
176 if tcp == nil { |
|
177 return nil, err |
|
178 } |
|
179 cl.setStatus(StatusConnected) |
177 cl.setStatus(StatusConnected) |
180 |
178 |
181 // Start the transport handler, initially unencrypted. |
179 // Start the transport handler, initially unencrypted. |
182 recvReader, recvWriter := io.Pipe() |
180 recvReader, recvWriter := io.Pipe() |
183 sendReader, sendWriter := io.Pipe() |
181 sendReader, sendWriter := io.Pipe() |
184 cl.layer1 = startLayer1(tcp, recvWriter, sendReader, |
182 cl.layer1 = cl.startLayer1(tcp, recvWriter, sendReader, |
185 cl.statmgr.newListener()) |
183 cl.statmgr.newListener()) |
186 |
184 |
187 // Start the reader and writer that convert to and from XML. |
185 // Start the reader and writer that convert to and from XML. |
188 recvXmlCh := make(chan interface{}) |
186 recvXmlCh := make(chan interface{}) |
189 go recvXml(recvReader, recvXmlCh, extStanza) |
187 go cl.recvXml(recvReader, recvXmlCh, extStanza) |
190 sendXmlCh := make(chan interface{}) |
188 sendXmlCh := make(chan interface{}) |
191 cl.sendXml = sendXmlCh |
189 cl.sendRaw = sendXmlCh |
192 go sendXml(sendWriter, sendXmlCh) |
190 go cl.sendXml(sendWriter, sendXmlCh) |
193 |
191 |
194 // Start the reader and writer that convert between XML and |
192 // Start the reader and writer that convert between XML and |
195 // XMPP stanzas. |
193 // XMPP stanzas. |
196 recvRawXmpp := make(chan Stanza) |
194 recvRawXmpp := make(chan Stanza) |
197 go cl.recvStream(recvXmlCh, recvRawXmpp, cl.statmgr.newListener()) |
195 go cl.recvStream(recvXmlCh, recvRawXmpp, cl.statmgr.newListener()) |
211 cl.AddRecvFilter(ext.RecvFilter) |
209 cl.AddRecvFilter(ext.RecvFilter) |
212 cl.AddSendFilter(ext.SendFilter) |
210 cl.AddSendFilter(ext.SendFilter) |
213 } |
211 } |
214 |
212 |
215 // Initial handshake. |
213 // Initial handshake. |
216 hsOut := &stream{To: jid.Domain, Version: XMPPVersion} |
214 hsOut := &stream{To: jid.Domain(), Version: XMPPVersion} |
217 cl.sendXml <- hsOut |
215 cl.sendRaw <- hsOut |
218 |
216 |
219 // Wait until resource binding is complete. |
217 // Wait until resource binding is complete. |
220 if err := cl.statmgr.awaitStatus(StatusBound); err != nil { |
218 if err := cl.statmgr.awaitStatus(StatusBound); err != nil { |
221 return nil, err |
219 return nil, cl.getError(err) |
222 } |
220 } |
|
221 |
|
222 // Forget about the password, for paranoia's sake. |
|
223 cl.password = "" |
223 |
224 |
224 // Initialize the session. |
225 // Initialize the session. |
225 id := NextId() |
226 id := NextId() |
226 iq := &Iq{Header: Header{To: cl.Jid.Domain, Id: id, Type: "set", |
227 iq := &Iq{Header: Header{To: JID(cl.Jid.Domain()), Id: id, Type: "set", |
227 Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}} |
228 Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}} |
228 ch := make(chan error) |
229 ch := make(chan error) |
229 f := func(st Stanza) { |
230 f := func(st Stanza) { |
230 iq, ok := st.(*Iq) |
231 iq, ok := st.(*Iq) |
231 if !ok { |
232 if !ok { |
232 Warn.Log("iq reply not iq; can't start session") |
233 ch <- fmt.Errorf("bad session start reply: %#v", st) |
233 ch <- errors.New("bad session start reply") |
|
234 } |
234 } |
235 if iq.Type == "error" { |
235 if iq.Type == "error" { |
236 Warn.Logf("Can't start session: %v", iq) |
236 ch <- fmt.Errorf("Can't start session: %v", iq.Error) |
237 ch <- iq.Error |
|
238 } |
237 } |
239 ch <- nil |
238 ch <- nil |
240 } |
239 } |
241 cl.SetCallback(id, f) |
240 cl.SetCallback(id, f) |
242 cl.sendXml <- iq |
241 cl.sendRaw <- iq |
243 // Now wait until the callback is called. |
242 // Now wait until the callback is called. |
244 if err := <-ch; err != nil { |
243 if err := <-ch; err != nil { |
245 return nil, err |
244 return nil, cl.getError(err) |
246 } |
245 } |
247 |
246 |
248 // This allows the client to receive stanzas. |
247 // This allows the client to receive stanzas. |
249 cl.setStatus(StatusRunning) |
248 cl.setStatus(StatusRunning) |
250 |
249 |
252 cl.Roster.update() |
251 cl.Roster.update() |
253 |
252 |
254 // Send the initial presence. |
253 // Send the initial presence. |
255 cl.Send <- &pr |
254 cl.Send <- &pr |
256 |
255 |
257 return cl, nil |
256 return cl, cl.getError(nil) |
258 } |
257 } |
259 |
258 |
260 func tee(r io.Reader, w io.Writer, prefix string) { |
259 func (cl *Client) Close() { |
261 defer func(w io.Writer) { |
260 // Shuts down the receivers: |
262 if c, ok := w.(io.Closer); ok { |
261 cl.setStatus(StatusShutdown) |
263 c.Close() |
262 |
264 } |
263 // Shuts down the senders: |
265 }(w) |
264 cl.shutdownOnce.Do(func() { close(cl.Send) }) |
266 |
265 } |
267 buf := bytes.NewBuffer([]uint8(prefix)) |
266 |
268 for { |
267 // If there's a buffered error in the channel, return it. Otherwise, |
269 var c [1]byte |
268 // return what was passed to us. The idea is that the error in the |
270 n, _ := r.Read(c[:]) |
269 // channel probably preceded (and caused) the one that's passed as an |
271 if n == 0 { |
270 // argument here. |
272 break |
271 func (cl *Client) getError(err1 error) error { |
273 } |
272 select { |
274 n, _ = w.Write(c[:n]) |
273 case err0 := <-cl.error: |
275 if n == 0 { |
274 return err0 |
276 break |
275 default: |
277 } |
276 return err1 |
278 buf.Write(c[:n]) |
277 } |
279 if c[0] == '\n' || c[0] == '>' { |
278 } |
280 Debug.Log(buf) |
279 |
281 buf = bytes.NewBuffer([]uint8(prefix)) |
280 // Register an error that happened in the internals somewhere. If |
282 } |
281 // there's already an error in the channel, discard the newer one in |
283 } |
282 // favor of the older. |
284 leftover := buf.String() |
283 func (cl *Client) setError(err error) { |
285 if leftover != "" { |
284 defer cl.Close() |
286 Debug.Log(buf) |
285 defer cl.setStatus(StatusError) |
287 } |
286 |
288 } |
287 if len(cl.error) > 0 { |
|
288 return |
|
289 } |
|
290 // If we're in a race between two calls to this function, |
|
291 // trying to set the "first" error, just arbitrarily let one |
|
292 // of them win. |
|
293 select { |
|
294 case cl.error <- err: |
|
295 default: |
|
296 } |
|
297 } |