xmpp.go
changeset 121 ebb86cbdd218
parent 120 9d7e8333948b
child 123 42a9995faa38
equal deleted inserted replaced
120:9d7e8333948b 121:ebb86cbdd218
    38 	// DNS SRV names
    38 	// DNS SRV names
    39 	serverSrv = "xmpp-server"
    39 	serverSrv = "xmpp-server"
    40 	clientSrv = "xmpp-client"
    40 	clientSrv = "xmpp-client"
    41 )
    41 )
    42 
    42 
       
    43 // A filter can modify the XMPP traffic to or from the remote
       
    44 // server. It's part of an Extension. The filter function will be
       
    45 // called in a new goroutine, so it doesn't need to return. The filter
       
    46 // should close its output when its input is closed.
       
    47 type Filter func(in <-chan Stanza, out chan<- Stanza)
       
    48 
    43 // Extensions can add stanza filters and/or new XML element types.
    49 // Extensions can add stanza filters and/or new XML element types.
    44 type Extension struct {
    50 type Extension struct {
    45 	// Maps from an XML namespace to a function which constructs a
    51 	// Maps from an XML namespace to a function which constructs a
    46 	// structure to hold the contents of stanzas in that
    52 	// structure to hold the contents of stanzas in that
    47 	// namespace.
    53 	// namespace.
    48 	StanzaHandlers map[string]func(*xml.Name) interface{}
    54 	StanzaHandlers map[string]func(*xml.Name) interface{}
    49 	Start          func(*Client)
    55 	// If non-nil, will be called once to start the filter
       
    56 	// running. RecvFilter intercepts incoming messages on their
       
    57 	// way from the remote server to the application; SendFilter
       
    58 	// intercepts messages going the other direction.
       
    59 	RecvFilter Filter
       
    60 	SendFilter Filter
    50 }
    61 }
    51 
    62 
    52 // Allows the user to override the TLS configuration.
    63 // Allows the user to override the TLS configuration.
    53 var TlsConfig tls.Config
    64 var TlsConfig tls.Config
    54 
    65 
    74 	In <-chan Stanza
    85 	In <-chan Stanza
    75 	// Outgoing XMPP stanzas to the server should be sent to this
    86 	// Outgoing XMPP stanzas to the server should be sent to this
    76 	// channel.
    87 	// channel.
    77 	Out    chan<- Stanza
    88 	Out    chan<- Stanza
    78 	xmlOut chan<- interface{}
    89 	xmlOut chan<- interface{}
       
    90 	// The client's roster is also known as the buddy list. It's
       
    91 	// the set of contacts which are known to this JID, or which
       
    92 	// this JID is known to.
       
    93 	Roster Roster
    79 	// Features advertised by the remote. This will be updated
    94 	// Features advertised by the remote. This will be updated
    80 	// asynchronously as new features are received throughout the
    95 	// asynchronously as new features are received throughout the
    81 	// connection process. It should not be updated once
    96 	// connection process. It should not be updated once
    82 	// StartSession() returns.
    97 	// StartSession() returns.
    83 	Features  *Features
    98 	Features  *Features
    84 	filterOut chan<- <-chan Stanza
    99 	sendFilterAdd, recvFilterAdd chan Filter
    85 	filterIn  <-chan <-chan Stanza
       
    86 }
   100 }
    87 
   101 
    88 // Connect to the appropriate server and authenticate as the given JID
   102 // Connect to the appropriate server and authenticate as the given JID
    89 // with the given password. This function will return as soon as a TCP
   103 // with the given password. This function will return as soon as a TCP
    90 // connection has been established, but before XMPP stream negotiation
   104 // connection has been established, but before XMPP stream negotiation
    91 // has completed. The negotiation will occur asynchronously, and any
   105 // has completed. The negotiation will occur asynchronously, and any
    92 // send operation to Client.Out will block until negotiation (resource
   106 // send operation to Client.Out will block until negotiation (resource
    93 // binding) is complete.
   107 // binding) is complete.
    94 func NewClient(jid *JID, password string, exts []Extension) (*Client, error) {
   108 func NewClient(jid *JID, password string, exts []Extension) (*Client, error) {
    95 	// Include the mandatory extensions.
   109 	// Include the mandatory extensions.
    96 	exts = append(exts, rosterExt)
   110 	roster := newRosterExt()
       
   111 	exts = append(exts, roster.Extension)
    97 	exts = append(exts, bindExt)
   112 	exts = append(exts, bindExt)
    98 
   113 
    99 	// Resolve the domain in the JID.
   114 	// Resolve the domain in the JID.
   100 	_, srvs, err := net.LookupSRV(clientSrv, "tcp", jid.Domain)
   115 	_, srvs, err := net.LookupSRV(clientSrv, "tcp", jid.Domain)
   101 	if err != nil {
   116 	if err != nil {
   121 	if tcp == nil {
   136 	if tcp == nil {
   122 		return nil, err
   137 		return nil, err
   123 	}
   138 	}
   124 
   139 
   125 	cl := new(Client)
   140 	cl := new(Client)
       
   141 	cl.Roster = *roster
   126 	cl.Uid = NextId()
   142 	cl.Uid = NextId()
   127 	cl.password = password
   143 	cl.password = password
   128 	cl.Jid = *jid
   144 	cl.Jid = *jid
   129 	cl.socket = tcp
   145 	cl.socket = tcp
   130 	cl.handlers = make(chan *stanzaHandler, 100)
   146 	cl.handlers = make(chan *stanzaHandler, 100)
   136 			extStanza[k] = v
   152 			extStanza[k] = v
   137 		}
   153 		}
   138 	}
   154 	}
   139 
   155 
   140 	// Start the transport handler, initially unencrypted.
   156 	// Start the transport handler, initially unencrypted.
   141 	tlsr, tlsw := cl.startTransport()
   157 	recvReader, recvWriter := io.Pipe()
   142 
   158 	sendReader, sendWriter := io.Pipe()
   143 	// Start the reader and writers that convert to and from XML.
   159 	go cl.readTransport(recvWriter)
   144 	xmlIn := startXmlReader(tlsr, extStanza)
   160 	go cl.writeTransport(sendReader)
   145 	cl.xmlOut = startXmlWriter(tlsw)
   161 
   146 
   162 	// Start the reader and writer that convert to and from XML.
   147 	// Start the XMPP stream handler which filters stream-level
   163 	recvXml := make(chan interface{})
   148 	// events and responds to them.
   164 	go readXml(recvReader, recvXml, extStanza)
   149 	stIn := cl.startStreamReader(xmlIn, cl.xmlOut)
   165 	sendXml := make(chan interface{})
   150 	clOut := cl.startStreamWriter(cl.xmlOut)
   166 	cl.xmlOut = sendXml
   151 	cl.Out = clOut
   167 	go writeXml(sendWriter, sendXml)
       
   168 
       
   169 	// Start the reader and writer that convert between XML and
       
   170 	// XMPP stanzas.
       
   171 	recvRawXmpp := make(chan Stanza)
       
   172 	go cl.readStream(recvXml, recvRawXmpp)
       
   173 	sendRawXmpp := make(chan Stanza)
       
   174 	go writeStream(sendXml, sendRawXmpp, cl.inputControl)
   152 
   175 
   153 	// Start the manager for the filters that can modify what the
   176 	// Start the manager for the filters that can modify what the
   154 	// app sees.
   177 	// app sees.
   155 	clIn := cl.startFilter(stIn)
   178 	recvFiltXmpp := make(chan Stanza)
   156 	cl.In = clIn
   179 	cl.In = recvFiltXmpp
   157 
   180 	go filterMgr(cl.recvFilterAdd, recvRawXmpp, recvFiltXmpp)
   158 	// Add filters for our extensions.
   181 	sendFiltXmpp := make(chan Stanza)
   159 	for _, ext := range exts {
   182 	cl.Out = sendFiltXmpp
   160 		ext.Start(cl)
   183 	go filterMgr(cl.sendFilterAdd, sendFiltXmpp, sendFiltXmpp)
   161 	}
       
   162 
   184 
   163 	// Initial handshake.
   185 	// Initial handshake.
   164 	hsOut := &stream{To: jid.Domain, Version: XMPPVersion}
   186 	hsOut := &stream{To: jid.Domain, Version: XMPPVersion}
   165 	cl.xmlOut <- hsOut
   187 	cl.xmlOut <- hsOut
   166 
   188 
   167 	return cl, nil
   189 	return cl, nil
   168 }
       
   169 
       
   170 func (cl *Client) startTransport() (io.Reader, io.WriteCloser) {
       
   171 	inr, inw := io.Pipe()
       
   172 	outr, outw := io.Pipe()
       
   173 	go cl.readTransport(inw)
       
   174 	go cl.writeTransport(outr)
       
   175 	return inr, outw
       
   176 }
       
   177 
       
   178 func startXmlReader(r io.Reader,
       
   179 	extStanza map[string]func(*xml.Name) interface{}) <-chan interface{} {
       
   180 	ch := make(chan interface{})
       
   181 	go readXml(r, ch, extStanza)
       
   182 	return ch
       
   183 }
       
   184 
       
   185 func startXmlWriter(w io.WriteCloser) chan<- interface{} {
       
   186 	ch := make(chan interface{})
       
   187 	go writeXml(w, ch)
       
   188 	return ch
       
   189 }
       
   190 
       
   191 func (cl *Client) startStreamReader(xmlIn <-chan interface{}, srvOut chan<- interface{}) <-chan Stanza {
       
   192 	ch := make(chan Stanza)
       
   193 	go cl.readStream(xmlIn, ch)
       
   194 	return ch
       
   195 }
       
   196 
       
   197 func (cl *Client) startStreamWriter(xmlOut chan<- interface{}) chan<- Stanza {
       
   198 	ch := make(chan Stanza)
       
   199 	go writeStream(xmlOut, ch, cl.inputControl)
       
   200 	return ch
       
   201 }
       
   202 
       
   203 func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
       
   204 	cliIn := make(chan Stanza)
       
   205 	filterOut := make(chan (<-chan Stanza))
       
   206 	filterIn := make(chan (<-chan Stanza))
       
   207 	nullFilter := make(chan Stanza)
       
   208 	go filterBottom(srvIn, nullFilter)
       
   209 	go filterTop(filterOut, filterIn, nullFilter, cliIn)
       
   210 	cl.filterOut = filterOut
       
   211 	cl.filterIn = filterIn
       
   212 	return cliIn
       
   213 }
   190 }
   214 
   191 
   215 func tee(r io.Reader, w io.Writer, prefix string) {
   192 func tee(r io.Reader, w io.Writer, prefix string) {
   216 	defer func(w io.Writer) {
   193 	defer func(w io.Writer) {
   217 		if c, ok := w.(io.Closer); ok {
   194 		if c, ok := w.(io.Closer); ok {
   251 
   228 
   252 // Start an XMPP session. A typical XMPP client should call this
   229 // Start an XMPP session. A typical XMPP client should call this
   253 // immediately after creating the Client in order to start the
   230 // immediately after creating the Client in order to start the
   254 // session, retrieve the roster, and broadcast an initial
   231 // session, retrieve the roster, and broadcast an initial
   255 // presence. The presence can be as simple as a newly-initialized
   232 // presence. The presence can be as simple as a newly-initialized
   256 // Presence struct.  See RFC 3921, Section 3.
   233 // Presence struct.  See RFC 3921, Section 3. After calling this, a
   257 func (cl *Client) StartSession(getRoster bool, pr *Presence) error {
   234 // normal client will want to call Roster.Update().
       
   235 func (cl *Client) StartSession(pr *Presence) error {
   258 	id := NextId()
   236 	id := NextId()
   259 	iq := &Iq{Header: Header{To: cl.Jid.Domain, Id: id, Type: "set",
   237 	iq := &Iq{Header: Header{To: cl.Jid.Domain, Id: id, Type: "set",
   260 		Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}}
   238 		Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}}
   261 	ch := make(chan error)
   239 	ch := make(chan error)
   262 	f := func(st Stanza) bool {
   240 	f := func(st Stanza) bool {
   279 
   257 
   280 	// Now wait until the callback is called.
   258 	// Now wait until the callback is called.
   281 	if err := <-ch; err != nil {
   259 	if err := <-ch; err != nil {
   282 		return err
   260 		return err
   283 	}
   261 	}
   284 	if getRoster {
       
   285 		err := fetchRoster(cl)
       
   286 		if err != nil {
       
   287 			return err
       
   288 		}
       
   289 	}
       
   290 	if pr != nil {
   262 	if pr != nil {
   291 		cl.Out <- pr
   263 		cl.Out <- pr
   292 	}
   264 	}
   293 	return nil
   265 	return nil
   294 }
   266 }
   295 
       
   296 // AddFilter adds a new filter to the top of the stack through which
       
   297 // incoming stanzas travel on their way up to the client. The new
       
   298 // filter's output channel is given to this function, and it returns a
       
   299 // new input channel which the filter should read from. When its input
       
   300 // channel closes, the filter should close its output channel.
       
   301 func (cl *Client) AddFilter(out <-chan Stanza) <-chan Stanza {
       
   302 	cl.filterOut <- out
       
   303 	return <-cl.filterIn
       
   304 }