69 func sendStream(sendXml chan<- interface{}, recvXmpp <-chan Stanza, |
68 func sendStream(sendXml chan<- interface{}, recvXmpp <-chan Stanza, |
70 control <-chan sendCmd) { |
69 control <-chan sendCmd) { |
71 defer close(sendXml) |
70 defer close(sendXml) |
72 |
71 |
73 var input <-chan Stanza |
72 var input <-chan Stanza |
74 Loop: |
|
75 for { |
73 for { |
76 select { |
74 select { |
77 case cmd := <-control: |
75 case cmd := <-control: |
78 switch cmd { |
76 switch cmd { |
79 case sendDeny: |
77 case sendDeny: |
80 input = nil |
78 input = nil |
81 case sendAllow: |
79 case sendAllow: |
82 input = recvXmpp |
80 input = recvXmpp |
|
81 case sendAbort: |
|
82 return |
|
83 default: |
|
84 panic(fmt.Sprintf("unknown cmd %d", cmd)) |
83 } |
85 } |
84 case x, ok := <-input: |
86 case x, ok := <-input: |
85 if !ok { |
87 if !ok { |
86 break Loop |
88 return |
87 } |
89 } |
88 if x == nil { |
90 if x == nil { |
89 Info.Log("Refusing to send nil stanza") |
91 Info.Log("Refusing to send nil stanza") |
90 continue |
92 continue |
91 } |
93 } |
97 func handleStream(ss *stream) { |
99 func handleStream(ss *stream) { |
98 } |
100 } |
99 |
101 |
100 func (cl *Client) handleStreamError(se *streamError) { |
102 func (cl *Client) handleStreamError(se *streamError) { |
101 Info.Logf("Received stream error: %v", se) |
103 Info.Logf("Received stream error: %v", se) |
102 cl.socket.Close() |
104 cl.inputControl <- sendAbort |
103 } |
105 } |
104 |
106 |
105 func (cl *Client) handleFeatures(fe *Features) { |
107 func (cl *Client) handleFeatures(fe *Features) { |
106 cl.Features = fe |
108 cl.Features = fe |
107 if fe.Starttls != nil { |
109 if fe.Starttls != nil { |
120 cl.bind(fe.Bind) |
122 cl.bind(fe.Bind) |
121 return |
123 return |
122 } |
124 } |
123 } |
125 } |
124 |
126 |
125 // readTransport() is running concurrently. We need to stop it, |
|
126 // negotiate TLS, then start it again. It calls waitForSocket() in |
|
127 // its inner loop; see below. |
|
128 func (cl *Client) handleTls(t *starttls) { |
127 func (cl *Client) handleTls(t *starttls) { |
129 tcp := cl.socket |
128 cl.layer1.startTls(&cl.tlsConfig) |
130 |
|
131 // Set the socket to nil, and wait for the reader routine to |
|
132 // signal that it's paused. |
|
133 cl.socket = nil |
|
134 cl.socketSync.Add(1) |
|
135 cl.socketSync.Wait() |
|
136 |
|
137 // Negotiate TLS with the server. |
|
138 tls := tls.Client(tcp, &cl.tlsConfig) |
|
139 |
|
140 // Make the TLS connection available to the reader, and wait |
|
141 // for it to signal that it's working again. |
|
142 cl.socketSync.Add(1) |
|
143 cl.socket = tls |
|
144 cl.socketSync.Wait() |
|
145 |
|
146 Info.Log("TLS negotiation succeeded.") |
|
147 cl.Features = nil |
|
148 |
129 |
149 // Now re-send the initial handshake message to start the new |
130 // Now re-send the initial handshake message to start the new |
150 // session. |
131 // session. |
151 hsOut := &stream{To: cl.Jid.Domain, Version: XMPPVersion} |
132 cl.sendXml <- &stream{To: cl.Jid.Domain, Version: XMPPVersion} |
152 cl.sendXml <- hsOut |
|
153 } |
|
154 |
|
155 // Synchronize with handleTls(). Called from readTransport() when |
|
156 // cl.socket is nil. |
|
157 func (cl *Client) waitForSocket() { |
|
158 // Signal that we've stopped reading from the socket. |
|
159 cl.socketSync.Done() |
|
160 |
|
161 // Wait until the socket is available again. |
|
162 for cl.socket == nil { |
|
163 time.Sleep(1e8) |
|
164 } |
|
165 |
|
166 // Signal that we're going back to the read loop. |
|
167 cl.socketSync.Done() |
|
168 } |
133 } |
169 |
134 |
170 // Register a callback to handle the next XMPP stanza (iq, message, or |
135 // Register a callback to handle the next XMPP stanza (iq, message, or |
171 // presence) with a given id. The provided function will not be called |
136 // presence) with a given id. The provided function will not be called |
172 // more than once. If it returns false, the stanza will not be made |
137 // more than once. If it returns false, the stanza will not be made |