# HG changeset patch # User Chris Jones # Date 1378179482 25200 # Node ID ebb86cbdd218560cad1ffdd966e744b69be874e4 # Parent 9d7e8333948bc28cfdd8388c9e6a465830072ea1 Changed the way filters work. They're now symmetrical, consisting of a paired send filter and receive filter. diff -r 9d7e8333948b -r ebb86cbdd218 TODO.txt --- a/TODO.txt Sat Aug 31 23:08:21 2013 +0100 +++ b/TODO.txt Mon Sep 02 20:38:02 2013 -0700 @@ -1,10 +1,6 @@ Extension.StanzaHandlers should use reflection, not constructor functions. -Rather than use Client.AddFilter(), and Extension.Start(), we should -have a function in Extension that, if non-nil, accepts a stanza and -returns a slice of stanzas. - Review all these *Client receiver methods. They should probably either all be receivers, or none. diff -r 9d7e8333948b -r ebb86cbdd218 filter.go --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/filter.go Mon Sep 02 20:38:02 2013 -0700 @@ -0,0 +1,64 @@ +package xmpp + +// Manages the stack of filters that can read and modify stanzas on +// their way from the remote to the application. + +// Receive new filters on filterAdd; those new filters get added to +// the top of the stack. Receive stanzas at the bottom of the stack on +// input. Send stanzas out the top of the stack on output. +func filterMgr(filterAdd <-chan Filter, input <-chan Stanza, output chan<- Stanza) { + botFiltIn := output + topFiltOut := input + +loop: + for { + select { + case stan, ok := <-input: + if !ok { + break loop + } + botFiltIn <- stan + + case stan, ok := <-topFiltOut: + if !ok { + break loop + } + output <- stan + + case filt := <-filterAdd: + newTop := make(chan Stanza) + go filt(topFiltOut, newTop) + topFiltOut = newTop + } + } + close(botFiltIn) +} + +// Starts the filter chain. Filters will all interpose themselves +// between srvIn and cliOut. +func (cl *Client) startFilters(srvIn, cliIn <-chan Stanza) (<-chan Stanza, <-chan Stanza) { + cliOut := make(chan Stanza) + srvOut := make(chan Stanza) + go filterMgr(cl.sendFilterAdd, srvIn, cliOut) + go filterMgr(cl.recvFilterAdd, cliIn, srvOut) + return cliOut, srvOut +} + +// AddRecvFilter adds a new filter to the top of the stack through which +// incoming stanzas travel on their way up to the client. +func (cl *Client) AddRecvFilter(filt Filter) { + if filt == nil { + return + } + cl.recvFilterAdd <- filt +} + +// AddSendFilter adds a new filter to the top of the stack through +// which outgoing stanzas travel on their way down from the client to +// the network. +func (cl *Client) AddSendFilter(filt Filter) { + if filt == nil { + return + } + cl.sendFilterAdd <- filt +} diff -r 9d7e8333948b -r ebb86cbdd218 roster.go --- a/roster.go Sat Aug 31 23:08:21 2013 +0100 +++ b/roster.go Mon Sep 02 20:38:02 2013 -0700 @@ -4,15 +4,12 @@ package xmpp +// This file contains support for roster management, RFC 3921, Section 7. + import ( "encoding/xml" - "fmt" ) -// This file contains support for roster management, RFC 3921, Section 7. - -var rosterExt Extension = Extension{StanzaHandlers: map[string]func(*xml.Name) interface{}{NsRoster: newRosterQuery}, Start: startRosterFilter} - // Roster query/result type RosterQuery struct { XMLName xml.Name `xml:"jabber:iq:roster query"` @@ -28,134 +25,131 @@ Group []string } +type rosterCb struct { + id string + cb func() +} + +type Roster struct { + Extension + get chan []RosterItem + callbacks chan rosterCb + toServer chan Stanza +} + type rosterClient struct { rosterChan <-chan []RosterItem rosterUpdate chan<- RosterItem } -var ( - rosterClients = make(map[string]rosterClient) -) - // Implicitly becomes part of NewClient's extStanza arg. func newRosterQuery(name *xml.Name) interface{} { return &RosterQuery{} } -// Synchronously fetch this entity's roster from the server and cache -// that information. This is called once from a fairly deep call stack -// as part of XMPP negotiation. -func fetchRoster(client *Client) error { - rosterUpdate := rosterClients[client.Uid].rosterUpdate - - iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get", - Id: NextId(), Nested: []interface{}{RosterQuery{}}}} - ch := make(chan error) - f := func(v Stanza) bool { - defer close(ch) - iq, ok := v.(*Iq) - if !ok { - ch <- fmt.Errorf("response to iq wasn't iq: %s", v) - return false - } - if iq.Type == "error" { - ch <- iq.Error - return false - } - var rq *RosterQuery - for _, ele := range iq.Nested { - if q, ok := ele.(*RosterQuery); ok { - rq = q - break - } - } - if rq == nil { - ch <- fmt.Errorf( - "Roster query result not query: %v", v) - return false - } - for _, item := range rq.Item { - rosterUpdate <- item - } - ch <- nil - return false - } - client.HandleStanza(iq.Id, f) - client.Out <- iq - // Wait for f to complete. - return <-ch -} - -// The roster filter updates the Client's representation of the -// roster, but it lets the relevant stanzas through. This also starts -// the roster feeder, which is the goroutine that provides data on -// client.Roster. -func startRosterFilter(client *Client) { - out := make(chan Stanza) - in := client.AddFilter(out) - go func(in <-chan Stanza, out chan<- Stanza) { - defer close(out) - for st := range in { - maybeUpdateRoster(client, st) - out <- st - } - }(in, out) - - rosterCh := make(chan []RosterItem) - rosterUpdate := make(chan RosterItem) - rosterClients[client.Uid] = rosterClient{rosterChan: rosterCh, - rosterUpdate: rosterUpdate} - go feedRoster(rosterCh, rosterUpdate) -} - -func maybeUpdateRoster(client *Client, st interface{}) { - iq, ok := st.(*Iq) - if !ok { - return - } - - rosterUpdate := rosterClients[client.Uid].rosterUpdate - - var rq *RosterQuery - for _, ele := range iq.Nested { - if q, ok := ele.(*RosterQuery); ok { - rq = q - break - } - } - if iq.Type == "set" && rq != nil { - for _, item := range rq.Item { - rosterUpdate <- item - } - // Send a reply. - reply := &Iq{Header: Header{To: iq.From, Id: iq.Id, - Type: "result"}} - client.Out <- reply - } -} - -func feedRoster(rosterCh chan<- []RosterItem, rosterUpdate <-chan RosterItem) { +func (r *Roster) rosterMgr(upd <-chan Stanza) { roster := make(map[string]RosterItem) - snapshot := []RosterItem{} + waits := make(map[string]func()) + var snapshot []RosterItem for { select { - case newIt := <-rosterUpdate: - if newIt.Subscription == "remove" { - delete(roster, newIt.Jid) - } else { - roster[newIt.Jid] = newIt + case stan, ok := <- upd: + if !ok { + return + } + hdr := stan.GetHeader() + if f := waits[hdr.Id] ; f != nil { + delete(waits, hdr.Id) + f() + } + iq, ok := stan.(*Iq) + if iq.Type != "set" { + continue } - case rosterCh <- snapshot: - } - snapshot = make([]RosterItem, 0, len(roster)) - for _, v := range roster { - snapshot = append(snapshot, v) + var rq *RosterQuery + for _, ele := range iq.Nested { + if q, ok := ele.(*RosterQuery); ok { + rq = q + break + } + } + if rq == nil { + continue + } + for _, item := range rq.Item { + roster[item.Jid] = item + } + snapshot = []RosterItem{} + for _, ri := range roster { + snapshot = append(snapshot, ri) + } + case r.get <- snapshot: + case cb := <- r.callbacks: + waits[cb.id] = cb.cb } } } -// Retrieve a snapshot of the roster for the given Client. -func Roster(client *Client) []RosterItem { - rosterChan := rosterClients[client.Uid].rosterChan - return <-rosterChan +func (r *Roster) makeFilters() (Filter, Filter) { + rosterUpdate := make(chan Stanza) + go r.rosterMgr(rosterUpdate) + recv := func(in <-chan Stanza, out chan<- Stanza) { + defer close(out) + for stan := range in { + rosterUpdate <- stan + out <- stan + } + } + send := func(in <-chan Stanza, out chan<- Stanza) { + defer close(out) + for { + select { + case stan, ok := <- in: + if !ok { + return + } + out <- stan + case stan := <- r.toServer: + out <- stan + } + } + } + return recv, send } + +func newRosterExt() *Roster { + r := Roster{} + r.StanzaHandlers = make(map[string]func(*xml.Name) interface{}) + r.StanzaHandlers[NsRoster] = newRosterQuery + r.RecvFilter, r.SendFilter = r.makeFilters() + r.get = make(chan []RosterItem) + r.callbacks = make(chan rosterCb) + r.toServer = make(chan Stanza) + return &r +} + +// Return the most recent snapshot of the roster status. This is +// updated automatically as roster updates are received from the +// server, but especially in response to calls to Update(). +func (r *Roster) Get() []RosterItem { + return <-r.get +} + +// Synchronously fetch this entity's roster from the server and cache +// that information. The client can access the roster by watching for +// RosterQuery objects or by calling Get(). +func (r *Roster) Update(client *Client) { + iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get", + Id: NextId(), Nested: []interface{}{RosterQuery{}}}} + waitchan := make(chan int) + done := func() { + close(waitchan) + } + r.waitFor(iq.Id, done) + r.toServer <- iq + <-waitchan +} + +func (r *Roster) waitFor(id string, cb func()) { + r.callbacks <- rosterCb{id: id, cb: cb} +} diff -r 9d7e8333948b -r ebb86cbdd218 stream.go --- a/stream.go Sat Aug 31 23:08:21 2013 +0100 +++ b/stream.go Mon Sep 02 20:38:02 2013 -0700 @@ -300,39 +300,6 @@ } } -// Stanzas from the remote go up through a stack of filters to the -// app. This function manages the filters. -func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza, - topFilter <-chan Stanza, app chan<- Stanza) { - defer close(app) -Loop: - for { - select { - case newFilterOut := <-filterOut: - if newFilterOut == nil { - Warn.Log("Received nil filter") - filterIn <- nil - continue - } - filterIn <- topFilter - topFilter = newFilterOut - - case data, ok := <-topFilter: - if !ok { - break Loop - } - app <- data - } - } -} - -func filterBottom(from <-chan Stanza, to chan<- Stanza) { - defer close(to) - for data := range from { - to <- data - } -} - func handleStream(ss *stream) { } diff -r 9d7e8333948b -r ebb86cbdd218 structs.go --- a/structs.go Sat Aug 31 23:08:21 2013 +0100 +++ b/structs.go Mon Sep 02 20:38:02 2013 -0700 @@ -269,8 +269,7 @@ return string(buf) } -var bindExt Extension = Extension{StanzaHandlers: map[string]func(*xml.Name) interface{}{NsBind: newBind}, - Start: func(cl *Client) {}} +var bindExt Extension = Extension{StanzaHandlers: map[string]func(*xml.Name) interface{}{NsBind: newBind}} func newBind(name *xml.Name) interface{} { return &bindIq{} diff -r 9d7e8333948b -r ebb86cbdd218 xmpp.go --- a/xmpp.go Sat Aug 31 23:08:21 2013 +0100 +++ b/xmpp.go Mon Sep 02 20:38:02 2013 -0700 @@ -40,13 +40,24 @@ clientSrv = "xmpp-client" ) +// A filter can modify the XMPP traffic to or from the remote +// server. It's part of an Extension. The filter function will be +// called in a new goroutine, so it doesn't need to return. The filter +// should close its output when its input is closed. +type Filter func(in <-chan Stanza, out chan<- Stanza) + // Extensions can add stanza filters and/or new XML element types. type Extension struct { // Maps from an XML namespace to a function which constructs a // structure to hold the contents of stanzas in that // namespace. StanzaHandlers map[string]func(*xml.Name) interface{} - Start func(*Client) + // If non-nil, will be called once to start the filter + // running. RecvFilter intercepts incoming messages on their + // way from the remote server to the application; SendFilter + // intercepts messages going the other direction. + RecvFilter Filter + SendFilter Filter } // Allows the user to override the TLS configuration. @@ -76,13 +87,16 @@ // channel. Out chan<- Stanza xmlOut chan<- interface{} + // The client's roster is also known as the buddy list. It's + // the set of contacts which are known to this JID, or which + // this JID is known to. + Roster Roster // Features advertised by the remote. This will be updated // asynchronously as new features are received throughout the // connection process. It should not be updated once // StartSession() returns. Features *Features - filterOut chan<- <-chan Stanza - filterIn <-chan <-chan Stanza + sendFilterAdd, recvFilterAdd chan Filter } // Connect to the appropriate server and authenticate as the given JID @@ -93,7 +107,8 @@ // binding) is complete. func NewClient(jid *JID, password string, exts []Extension) (*Client, error) { // Include the mandatory extensions. - exts = append(exts, rosterExt) + roster := newRosterExt() + exts = append(exts, roster.Extension) exts = append(exts, bindExt) // Resolve the domain in the JID. @@ -123,6 +138,7 @@ } cl := new(Client) + cl.Roster = *roster cl.Uid = NextId() cl.password = password cl.Jid = *jid @@ -138,27 +154,33 @@ } // Start the transport handler, initially unencrypted. - tlsr, tlsw := cl.startTransport() - - // Start the reader and writers that convert to and from XML. - xmlIn := startXmlReader(tlsr, extStanza) - cl.xmlOut = startXmlWriter(tlsw) + recvReader, recvWriter := io.Pipe() + sendReader, sendWriter := io.Pipe() + go cl.readTransport(recvWriter) + go cl.writeTransport(sendReader) - // Start the XMPP stream handler which filters stream-level - // events and responds to them. - stIn := cl.startStreamReader(xmlIn, cl.xmlOut) - clOut := cl.startStreamWriter(cl.xmlOut) - cl.Out = clOut + // Start the reader and writer that convert to and from XML. + recvXml := make(chan interface{}) + go readXml(recvReader, recvXml, extStanza) + sendXml := make(chan interface{}) + cl.xmlOut = sendXml + go writeXml(sendWriter, sendXml) + + // Start the reader and writer that convert between XML and + // XMPP stanzas. + recvRawXmpp := make(chan Stanza) + go cl.readStream(recvXml, recvRawXmpp) + sendRawXmpp := make(chan Stanza) + go writeStream(sendXml, sendRawXmpp, cl.inputControl) // Start the manager for the filters that can modify what the // app sees. - clIn := cl.startFilter(stIn) - cl.In = clIn - - // Add filters for our extensions. - for _, ext := range exts { - ext.Start(cl) - } + recvFiltXmpp := make(chan Stanza) + cl.In = recvFiltXmpp + go filterMgr(cl.recvFilterAdd, recvRawXmpp, recvFiltXmpp) + sendFiltXmpp := make(chan Stanza) + cl.Out = sendFiltXmpp + go filterMgr(cl.sendFilterAdd, sendFiltXmpp, sendFiltXmpp) // Initial handshake. hsOut := &stream{To: jid.Domain, Version: XMPPVersion} @@ -167,51 +189,6 @@ return cl, nil } -func (cl *Client) startTransport() (io.Reader, io.WriteCloser) { - inr, inw := io.Pipe() - outr, outw := io.Pipe() - go cl.readTransport(inw) - go cl.writeTransport(outr) - return inr, outw -} - -func startXmlReader(r io.Reader, - extStanza map[string]func(*xml.Name) interface{}) <-chan interface{} { - ch := make(chan interface{}) - go readXml(r, ch, extStanza) - return ch -} - -func startXmlWriter(w io.WriteCloser) chan<- interface{} { - ch := make(chan interface{}) - go writeXml(w, ch) - return ch -} - -func (cl *Client) startStreamReader(xmlIn <-chan interface{}, srvOut chan<- interface{}) <-chan Stanza { - ch := make(chan Stanza) - go cl.readStream(xmlIn, ch) - return ch -} - -func (cl *Client) startStreamWriter(xmlOut chan<- interface{}) chan<- Stanza { - ch := make(chan Stanza) - go writeStream(xmlOut, ch, cl.inputControl) - return ch -} - -func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza { - cliIn := make(chan Stanza) - filterOut := make(chan (<-chan Stanza)) - filterIn := make(chan (<-chan Stanza)) - nullFilter := make(chan Stanza) - go filterBottom(srvIn, nullFilter) - go filterTop(filterOut, filterIn, nullFilter, cliIn) - cl.filterOut = filterOut - cl.filterIn = filterIn - return cliIn -} - func tee(r io.Reader, w io.Writer, prefix string) { defer func(w io.Writer) { if c, ok := w.(io.Closer); ok { @@ -253,8 +230,9 @@ // immediately after creating the Client in order to start the // session, retrieve the roster, and broadcast an initial // presence. The presence can be as simple as a newly-initialized -// Presence struct. See RFC 3921, Section 3. -func (cl *Client) StartSession(getRoster bool, pr *Presence) error { +// Presence struct. See RFC 3921, Section 3. After calling this, a +// normal client will want to call Roster.Update(). +func (cl *Client) StartSession(pr *Presence) error { id := NextId() iq := &Iq{Header: Header{To: cl.Jid.Domain, Id: id, Type: "set", Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}} @@ -281,24 +259,8 @@ if err := <-ch; err != nil { return err } - if getRoster { - err := fetchRoster(cl) - if err != nil { - return err - } - } if pr != nil { cl.Out <- pr } return nil } - -// AddFilter adds a new filter to the top of the stack through which -// incoming stanzas travel on their way up to the client. The new -// filter's output channel is given to this function, and it returns a -// new input channel which the filter should read from. When its input -// channel closes, the filter should close its output channel. -func (cl *Client) AddFilter(out <-chan Stanza) <-chan Stanza { - cl.filterOut <- out - return <-cl.filterIn -}