diff -r 9d7e8333948b -r ebb86cbdd218 xmpp.go --- a/xmpp.go Sat Aug 31 23:08:21 2013 +0100 +++ b/xmpp.go Mon Sep 02 20:38:02 2013 -0700 @@ -40,13 +40,24 @@ clientSrv = "xmpp-client" ) +// A filter can modify the XMPP traffic to or from the remote +// server. It's part of an Extension. The filter function will be +// called in a new goroutine, so it doesn't need to return. The filter +// should close its output when its input is closed. +type Filter func(in <-chan Stanza, out chan<- Stanza) + // Extensions can add stanza filters and/or new XML element types. type Extension struct { // Maps from an XML namespace to a function which constructs a // structure to hold the contents of stanzas in that // namespace. StanzaHandlers map[string]func(*xml.Name) interface{} - Start func(*Client) + // If non-nil, will be called once to start the filter + // running. RecvFilter intercepts incoming messages on their + // way from the remote server to the application; SendFilter + // intercepts messages going the other direction. + RecvFilter Filter + SendFilter Filter } // Allows the user to override the TLS configuration. @@ -76,13 +87,16 @@ // channel. Out chan<- Stanza xmlOut chan<- interface{} + // The client's roster is also known as the buddy list. It's + // the set of contacts which are known to this JID, or which + // this JID is known to. + Roster Roster // Features advertised by the remote. This will be updated // asynchronously as new features are received throughout the // connection process. It should not be updated once // StartSession() returns. Features *Features - filterOut chan<- <-chan Stanza - filterIn <-chan <-chan Stanza + sendFilterAdd, recvFilterAdd chan Filter } // Connect to the appropriate server and authenticate as the given JID @@ -93,7 +107,8 @@ // binding) is complete. func NewClient(jid *JID, password string, exts []Extension) (*Client, error) { // Include the mandatory extensions. - exts = append(exts, rosterExt) + roster := newRosterExt() + exts = append(exts, roster.Extension) exts = append(exts, bindExt) // Resolve the domain in the JID. @@ -123,6 +138,7 @@ } cl := new(Client) + cl.Roster = *roster cl.Uid = NextId() cl.password = password cl.Jid = *jid @@ -138,27 +154,33 @@ } // Start the transport handler, initially unencrypted. - tlsr, tlsw := cl.startTransport() - - // Start the reader and writers that convert to and from XML. - xmlIn := startXmlReader(tlsr, extStanza) - cl.xmlOut = startXmlWriter(tlsw) + recvReader, recvWriter := io.Pipe() + sendReader, sendWriter := io.Pipe() + go cl.readTransport(recvWriter) + go cl.writeTransport(sendReader) - // Start the XMPP stream handler which filters stream-level - // events and responds to them. - stIn := cl.startStreamReader(xmlIn, cl.xmlOut) - clOut := cl.startStreamWriter(cl.xmlOut) - cl.Out = clOut + // Start the reader and writer that convert to and from XML. + recvXml := make(chan interface{}) + go readXml(recvReader, recvXml, extStanza) + sendXml := make(chan interface{}) + cl.xmlOut = sendXml + go writeXml(sendWriter, sendXml) + + // Start the reader and writer that convert between XML and + // XMPP stanzas. + recvRawXmpp := make(chan Stanza) + go cl.readStream(recvXml, recvRawXmpp) + sendRawXmpp := make(chan Stanza) + go writeStream(sendXml, sendRawXmpp, cl.inputControl) // Start the manager for the filters that can modify what the // app sees. - clIn := cl.startFilter(stIn) - cl.In = clIn - - // Add filters for our extensions. - for _, ext := range exts { - ext.Start(cl) - } + recvFiltXmpp := make(chan Stanza) + cl.In = recvFiltXmpp + go filterMgr(cl.recvFilterAdd, recvRawXmpp, recvFiltXmpp) + sendFiltXmpp := make(chan Stanza) + cl.Out = sendFiltXmpp + go filterMgr(cl.sendFilterAdd, sendFiltXmpp, sendFiltXmpp) // Initial handshake. hsOut := &stream{To: jid.Domain, Version: XMPPVersion} @@ -167,51 +189,6 @@ return cl, nil } -func (cl *Client) startTransport() (io.Reader, io.WriteCloser) { - inr, inw := io.Pipe() - outr, outw := io.Pipe() - go cl.readTransport(inw) - go cl.writeTransport(outr) - return inr, outw -} - -func startXmlReader(r io.Reader, - extStanza map[string]func(*xml.Name) interface{}) <-chan interface{} { - ch := make(chan interface{}) - go readXml(r, ch, extStanza) - return ch -} - -func startXmlWriter(w io.WriteCloser) chan<- interface{} { - ch := make(chan interface{}) - go writeXml(w, ch) - return ch -} - -func (cl *Client) startStreamReader(xmlIn <-chan interface{}, srvOut chan<- interface{}) <-chan Stanza { - ch := make(chan Stanza) - go cl.readStream(xmlIn, ch) - return ch -} - -func (cl *Client) startStreamWriter(xmlOut chan<- interface{}) chan<- Stanza { - ch := make(chan Stanza) - go writeStream(xmlOut, ch, cl.inputControl) - return ch -} - -func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza { - cliIn := make(chan Stanza) - filterOut := make(chan (<-chan Stanza)) - filterIn := make(chan (<-chan Stanza)) - nullFilter := make(chan Stanza) - go filterBottom(srvIn, nullFilter) - go filterTop(filterOut, filterIn, nullFilter, cliIn) - cl.filterOut = filterOut - cl.filterIn = filterIn - return cliIn -} - func tee(r io.Reader, w io.Writer, prefix string) { defer func(w io.Writer) { if c, ok := w.(io.Closer); ok { @@ -253,8 +230,9 @@ // immediately after creating the Client in order to start the // session, retrieve the roster, and broadcast an initial // presence. The presence can be as simple as a newly-initialized -// Presence struct. See RFC 3921, Section 3. -func (cl *Client) StartSession(getRoster bool, pr *Presence) error { +// Presence struct. See RFC 3921, Section 3. After calling this, a +// normal client will want to call Roster.Update(). +func (cl *Client) StartSession(pr *Presence) error { id := NextId() iq := &Iq{Header: Header{To: cl.Jid.Domain, Id: id, Type: "set", Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}} @@ -281,24 +259,8 @@ if err := <-ch; err != nil { return err } - if getRoster { - err := fetchRoster(cl) - if err != nil { - return err - } - } if pr != nil { cl.Out <- pr } return nil } - -// AddFilter adds a new filter to the top of the stack through which -// incoming stanzas travel on their way up to the client. The new -// filter's output channel is given to this function, and it returns a -// new input channel which the filter should read from. When its input -// channel closes, the filter should close its output channel. -func (cl *Client) AddFilter(out <-chan Stanza) <-chan Stanza { - cl.filterOut <- out - return <-cl.filterIn -}