Changed the way filters work. They're now symmetrical, consisting of a paired send filter and receive filter.
authorChris Jones <>
Mon, 02 Sep 2013 20:38:02 -0700 (2013-09-03)
changeset 121 ebb86cbdd218
parent 120 9d7e8333948b
child 122 ab22b4285d12
Changed the way filters work. They're now symmetrical, consisting of a paired send filter and receive filter.
--- a/TODO.txt	Sat Aug 31 23:08:21 2013 +0100
+++ b/TODO.txt	Mon Sep 02 20:38:02 2013 -0700
@@ -1,10 +1,6 @@
 Extension.StanzaHandlers should use reflection, not constructor
-Rather than use Client.AddFilter(), and Extension.Start(), we should
-have a function in Extension that, if non-nil, accepts a stanza and
-returns a slice of stanzas.
 Review all these *Client receiver methods. They should probably either
 all be receivers, or none.
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/filter.go	Mon Sep 02 20:38:02 2013 -0700
@@ -0,0 +1,64 @@
+package xmpp
+// Manages the stack of filters that can read and modify stanzas on
+// their way from the remote to the application.
+// Receive new filters on filterAdd; those new filters get added to
+// the top of the stack. Receive stanzas at the bottom of the stack on
+// input. Send stanzas out the top of the stack on output.
+func filterMgr(filterAdd <-chan Filter, input <-chan Stanza, output chan<- Stanza) {
+	botFiltIn := output
+	topFiltOut := input
+	for {
+		select {
+		case stan, ok := <-input:
+			if !ok {
+				break loop
+			}
+			botFiltIn <- stan
+		case stan, ok := <-topFiltOut:
+			if !ok {
+				break loop
+			}
+			output <- stan
+		case filt := <-filterAdd:
+			newTop := make(chan Stanza)
+			go filt(topFiltOut, newTop)
+			topFiltOut = newTop
+		}
+	}
+	close(botFiltIn)
+// Starts the filter chain. Filters will all interpose themselves
+// between srvIn and cliOut.
+func (cl *Client) startFilters(srvIn, cliIn <-chan Stanza) (<-chan Stanza, <-chan Stanza) {
+	cliOut := make(chan Stanza)
+	srvOut := make(chan Stanza)
+	go filterMgr(cl.sendFilterAdd, srvIn, cliOut)
+	go filterMgr(cl.recvFilterAdd, cliIn, srvOut)
+	return cliOut, srvOut
+// AddRecvFilter adds a new filter to the top of the stack through which
+// incoming stanzas travel on their way up to the client.
+func (cl *Client) AddRecvFilter(filt Filter) {
+	if filt == nil {
+		return
+	}
+	cl.recvFilterAdd <- filt
+// AddSendFilter adds a new filter to the top of the stack through
+// which outgoing stanzas travel on their way down from the client to
+// the network.
+func (cl *Client) AddSendFilter(filt Filter) {
+	if filt == nil {
+		return
+	}
+	cl.sendFilterAdd <- filt
--- a/roster.go	Sat Aug 31 23:08:21 2013 +0100
+++ b/roster.go	Mon Sep 02 20:38:02 2013 -0700
@@ -4,15 +4,12 @@
 package xmpp
+// This file contains support for roster management, RFC 3921, Section 7.
 import (
-	"fmt"
-// This file contains support for roster management, RFC 3921, Section 7.
-var rosterExt Extension = Extension{StanzaHandlers: map[string]func(*xml.Name) interface{}{NsRoster: newRosterQuery}, Start: startRosterFilter}
 // Roster query/result
 type RosterQuery struct {
 	XMLName xml.Name     `xml:"jabber:iq:roster query"`
@@ -28,134 +25,131 @@
 	Group        []string
+type rosterCb struct {
+	id string
+	cb func()
+type Roster struct {
+	Extension
+	get chan []RosterItem
+	callbacks chan rosterCb
+	toServer chan Stanza
 type rosterClient struct {
 	rosterChan   <-chan []RosterItem
 	rosterUpdate chan<- RosterItem
-var (
-	rosterClients = make(map[string]rosterClient)
 // Implicitly becomes part of NewClient's extStanza arg.
 func newRosterQuery(name *xml.Name) interface{} {
 	return &RosterQuery{}
-// Synchronously fetch this entity's roster from the server and cache
-// that information. This is called once from a fairly deep call stack
-// as part of XMPP negotiation.
-func fetchRoster(client *Client) error {
-	rosterUpdate := rosterClients[client.Uid].rosterUpdate
-	iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get",
-		Id: NextId(), Nested: []interface{}{RosterQuery{}}}}
-	ch := make(chan error)
-	f := func(v Stanza) bool {
-		defer close(ch)
-		iq, ok := v.(*Iq)
-		if !ok {
-			ch <- fmt.Errorf("response to iq wasn't iq: %s", v)
-			return false
-		}
-		if iq.Type == "error" {
-			ch <- iq.Error
-			return false
-		}
-		var rq *RosterQuery
-		for _, ele := range iq.Nested {
-			if q, ok := ele.(*RosterQuery); ok {
-				rq = q
-				break
-			}
-		}
-		if rq == nil {
-			ch <- fmt.Errorf(
-				"Roster query result not query: %v", v)
-			return false
-		}
-		for _, item := range rq.Item {
-			rosterUpdate <- item
-		}
-		ch <- nil
-		return false
-	}
-	client.HandleStanza(iq.Id, f)
-	client.Out <- iq
-	// Wait for f to complete.
-	return <-ch
-// The roster filter updates the Client's representation of the
-// roster, but it lets the relevant stanzas through. This also starts
-// the roster feeder, which is the goroutine that provides data on
-// client.Roster.
-func startRosterFilter(client *Client) {
-	out := make(chan Stanza)
-	in := client.AddFilter(out)
-	go func(in <-chan Stanza, out chan<- Stanza) {
-		defer close(out)
-		for st := range in {
-			maybeUpdateRoster(client, st)
-			out <- st
-		}
-	}(in, out)
-	rosterCh := make(chan []RosterItem)
-	rosterUpdate := make(chan RosterItem)
-	rosterClients[client.Uid] = rosterClient{rosterChan: rosterCh,
-		rosterUpdate: rosterUpdate}
-	go feedRoster(rosterCh, rosterUpdate)
-func maybeUpdateRoster(client *Client, st interface{}) {
-	iq, ok := st.(*Iq)
-	if !ok {
-		return
-	}
-	rosterUpdate := rosterClients[client.Uid].rosterUpdate
-	var rq *RosterQuery
-	for _, ele := range iq.Nested {
-		if q, ok := ele.(*RosterQuery); ok {
-			rq = q
-			break
-		}
-	}
-	if iq.Type == "set" && rq != nil {
-		for _, item := range rq.Item {
-			rosterUpdate <- item
-		}
-		// Send a reply.
-		reply := &Iq{Header: Header{To: iq.From, Id: iq.Id,
-			Type: "result"}}
-		client.Out <- reply
-	}
-func feedRoster(rosterCh chan<- []RosterItem, rosterUpdate <-chan RosterItem) {
+func (r *Roster) rosterMgr(upd <-chan Stanza) {
 	roster := make(map[string]RosterItem)
-	snapshot := []RosterItem{}
+	waits := make(map[string]func())
+	var snapshot []RosterItem
 	for {
 		select {
-		case newIt := <-rosterUpdate:
-			if newIt.Subscription == "remove" {
-				delete(roster, newIt.Jid)
-			} else {
-				roster[newIt.Jid] = newIt
+		case stan, ok := <- upd:
+			if !ok {
+				return
+			}
+			hdr := stan.GetHeader()
+			if f := waits[hdr.Id] ; f != nil {
+				delete(waits, hdr.Id)
+				f()
+			}
+			iq, ok := stan.(*Iq)
+			if iq.Type != "set" {
+				continue
-		case rosterCh <- snapshot:
-		}
-		snapshot = make([]RosterItem, 0, len(roster))
-		for _, v := range roster {
-			snapshot = append(snapshot, v)
+			var rq *RosterQuery
+			for _, ele := range iq.Nested {
+				if q, ok := ele.(*RosterQuery); ok {
+					rq = q
+					break
+				}
+			}
+			if rq == nil {
+				continue
+			}
+			for _, item := range rq.Item {
+				roster[item.Jid] = item
+			}
+			snapshot = []RosterItem{}
+			for _, ri := range roster {
+				snapshot = append(snapshot, ri)
+			}
+		case r.get <- snapshot:
+		case cb := <- r.callbacks:
+			waits[] = cb.cb
-// Retrieve a snapshot of the roster for the given Client.
-func Roster(client *Client) []RosterItem {
-	rosterChan := rosterClients[client.Uid].rosterChan
-	return <-rosterChan
+func (r *Roster) makeFilters() (Filter, Filter) {
+	rosterUpdate := make(chan Stanza)
+	go r.rosterMgr(rosterUpdate)
+	recv := func(in <-chan Stanza, out chan<- Stanza) {
+		defer close(out)
+		for stan := range in {
+			rosterUpdate <- stan
+			out <- stan
+		}
+	}
+	send := func(in <-chan Stanza, out chan<- Stanza) {
+		defer close(out)
+		for {
+			select {
+			case stan, ok := <- in:
+				if !ok {
+					return
+				}
+				out <- stan
+			case stan := <- r.toServer:
+				out <- stan
+			}
+		}
+	}
+	return recv, send
+func newRosterExt() *Roster {
+	r := Roster{}
+	r.StanzaHandlers = make(map[string]func(*xml.Name) interface{})
+	r.StanzaHandlers[NsRoster] = newRosterQuery
+	r.RecvFilter, r.SendFilter = r.makeFilters()
+	r.get = make(chan []RosterItem)
+	r.callbacks = make(chan rosterCb)
+	r.toServer = make(chan Stanza)
+	return &r
+// Return the most recent snapshot of the roster status. This is
+// updated automatically as roster updates are received from the
+// server, but especially in response to calls to Update().
+func (r *Roster) Get() []RosterItem {
+	return <-r.get
+// Synchronously fetch this entity's roster from the server and cache
+// that information. The client can access the roster by watching for
+// RosterQuery objects or by calling Get().
+func (r *Roster) Update(client *Client) {
+	iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get",
+		Id: NextId(), Nested: []interface{}{RosterQuery{}}}}
+	waitchan := make(chan int)
+	done := func() {
+		close(waitchan)
+	}
+	r.waitFor(iq.Id, done)
+	r.toServer <- iq
+	<-waitchan
+func (r *Roster) waitFor(id string, cb func()) {
+	r.callbacks <- rosterCb{id: id, cb: cb}
--- a/stream.go	Sat Aug 31 23:08:21 2013 +0100
+++ b/stream.go	Mon Sep 02 20:38:02 2013 -0700
@@ -300,39 +300,6 @@
-// Stanzas from the remote go up through a stack of filters to the
-// app. This function manages the filters.
-func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
-	topFilter <-chan Stanza, app chan<- Stanza) {
-	defer close(app)
-	for {
-		select {
-		case newFilterOut := <-filterOut:
-			if newFilterOut == nil {
-				Warn.Log("Received nil filter")
-				filterIn <- nil
-				continue
-			}
-			filterIn <- topFilter
-			topFilter = newFilterOut
-		case data, ok := <-topFilter:
-			if !ok {
-				break Loop
-			}
-			app <- data
-		}
-	}
-func filterBottom(from <-chan Stanza, to chan<- Stanza) {
-	defer close(to)
-	for data := range from {
-		to <- data
-	}
 func handleStream(ss *stream) {
--- a/structs.go	Sat Aug 31 23:08:21 2013 +0100
+++ b/structs.go	Mon Sep 02 20:38:02 2013 -0700
@@ -269,8 +269,7 @@
 	return string(buf)
-var bindExt Extension = Extension{StanzaHandlers: map[string]func(*xml.Name) interface{}{NsBind: newBind},
-	Start: func(cl *Client) {}}
+var bindExt Extension = Extension{StanzaHandlers: map[string]func(*xml.Name) interface{}{NsBind: newBind}}
 func newBind(name *xml.Name) interface{} {
 	return &bindIq{}
--- a/xmpp.go	Sat Aug 31 23:08:21 2013 +0100
+++ b/xmpp.go	Mon Sep 02 20:38:02 2013 -0700
@@ -40,13 +40,24 @@
 	clientSrv = "xmpp-client"
+// A filter can modify the XMPP traffic to or from the remote
+// server. It's part of an Extension. The filter function will be
+// called in a new goroutine, so it doesn't need to return. The filter
+// should close its output when its input is closed.
+type Filter func(in <-chan Stanza, out chan<- Stanza)
 // Extensions can add stanza filters and/or new XML element types.
 type Extension struct {
 	// Maps from an XML namespace to a function which constructs a
 	// structure to hold the contents of stanzas in that
 	// namespace.
 	StanzaHandlers map[string]func(*xml.Name) interface{}
-	Start          func(*Client)
+	// If non-nil, will be called once to start the filter
+	// running. RecvFilter intercepts incoming messages on their
+	// way from the remote server to the application; SendFilter
+	// intercepts messages going the other direction.
+	RecvFilter Filter
+	SendFilter Filter
 // Allows the user to override the TLS configuration.
@@ -76,13 +87,16 @@
 	// channel.
 	Out    chan<- Stanza
 	xmlOut chan<- interface{}
+	// The client's roster is also known as the buddy list. It's
+	// the set of contacts which are known to this JID, or which
+	// this JID is known to.
+	Roster Roster
 	// Features advertised by the remote. This will be updated
 	// asynchronously as new features are received throughout the
 	// connection process. It should not be updated once
 	// StartSession() returns.
 	Features  *Features
-	filterOut chan<- <-chan Stanza
-	filterIn  <-chan <-chan Stanza
+	sendFilterAdd, recvFilterAdd chan Filter
 // Connect to the appropriate server and authenticate as the given JID
@@ -93,7 +107,8 @@
 // binding) is complete.
 func NewClient(jid *JID, password string, exts []Extension) (*Client, error) {
 	// Include the mandatory extensions.
-	exts = append(exts, rosterExt)
+	roster := newRosterExt()
+	exts = append(exts, roster.Extension)
 	exts = append(exts, bindExt)
 	// Resolve the domain in the JID.
@@ -123,6 +138,7 @@
 	cl := new(Client)
+	cl.Roster = *roster
 	cl.Uid = NextId()
 	cl.password = password
 	cl.Jid = *jid
@@ -138,27 +154,33 @@
 	// Start the transport handler, initially unencrypted.
-	tlsr, tlsw := cl.startTransport()
-	// Start the reader and writers that convert to and from XML.
-	xmlIn := startXmlReader(tlsr, extStanza)
-	cl.xmlOut = startXmlWriter(tlsw)
+	recvReader, recvWriter := io.Pipe()
+	sendReader, sendWriter := io.Pipe()
+	go cl.readTransport(recvWriter)
+	go cl.writeTransport(sendReader)
-	// Start the XMPP stream handler which filters stream-level
-	// events and responds to them.
-	stIn := cl.startStreamReader(xmlIn, cl.xmlOut)
-	clOut := cl.startStreamWriter(cl.xmlOut)
-	cl.Out = clOut
+	// Start the reader and writer that convert to and from XML.
+	recvXml := make(chan interface{})
+	go readXml(recvReader, recvXml, extStanza)
+	sendXml := make(chan interface{})
+	cl.xmlOut = sendXml
+	go writeXml(sendWriter, sendXml)
+	// Start the reader and writer that convert between XML and
+	// XMPP stanzas.
+	recvRawXmpp := make(chan Stanza)
+	go cl.readStream(recvXml, recvRawXmpp)
+	sendRawXmpp := make(chan Stanza)
+	go writeStream(sendXml, sendRawXmpp, cl.inputControl)
 	// Start the manager for the filters that can modify what the
 	// app sees.
-	clIn := cl.startFilter(stIn)
-	cl.In = clIn
-	// Add filters for our extensions.
-	for _, ext := range exts {
-		ext.Start(cl)
-	}
+	recvFiltXmpp := make(chan Stanza)
+	cl.In = recvFiltXmpp
+	go filterMgr(cl.recvFilterAdd, recvRawXmpp, recvFiltXmpp)
+	sendFiltXmpp := make(chan Stanza)
+	cl.Out = sendFiltXmpp
+	go filterMgr(cl.sendFilterAdd, sendFiltXmpp, sendFiltXmpp)
 	// Initial handshake.
 	hsOut := &stream{To: jid.Domain, Version: XMPPVersion}
@@ -167,51 +189,6 @@
 	return cl, nil
-func (cl *Client) startTransport() (io.Reader, io.WriteCloser) {
-	inr, inw := io.Pipe()
-	outr, outw := io.Pipe()
-	go cl.readTransport(inw)
-	go cl.writeTransport(outr)
-	return inr, outw
-func startXmlReader(r io.Reader,
-	extStanza map[string]func(*xml.Name) interface{}) <-chan interface{} {
-	ch := make(chan interface{})
-	go readXml(r, ch, extStanza)
-	return ch
-func startXmlWriter(w io.WriteCloser) chan<- interface{} {
-	ch := make(chan interface{})
-	go writeXml(w, ch)
-	return ch
-func (cl *Client) startStreamReader(xmlIn <-chan interface{}, srvOut chan<- interface{}) <-chan Stanza {
-	ch := make(chan Stanza)
-	go cl.readStream(xmlIn, ch)
-	return ch
-func (cl *Client) startStreamWriter(xmlOut chan<- interface{}) chan<- Stanza {
-	ch := make(chan Stanza)
-	go writeStream(xmlOut, ch, cl.inputControl)
-	return ch
-func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
-	cliIn := make(chan Stanza)
-	filterOut := make(chan (<-chan Stanza))
-	filterIn := make(chan (<-chan Stanza))
-	nullFilter := make(chan Stanza)
-	go filterBottom(srvIn, nullFilter)
-	go filterTop(filterOut, filterIn, nullFilter, cliIn)
-	cl.filterOut = filterOut
-	cl.filterIn = filterIn
-	return cliIn
 func tee(r io.Reader, w io.Writer, prefix string) {
 	defer func(w io.Writer) {
 		if c, ok := w.(io.Closer); ok {
@@ -253,8 +230,9 @@
 // immediately after creating the Client in order to start the
 // session, retrieve the roster, and broadcast an initial
 // presence. The presence can be as simple as a newly-initialized
-// Presence struct.  See RFC 3921, Section 3.
-func (cl *Client) StartSession(getRoster bool, pr *Presence) error {
+// Presence struct.  See RFC 3921, Section 3. After calling this, a
+// normal client will want to call Roster.Update().
+func (cl *Client) StartSession(pr *Presence) error {
 	id := NextId()
 	iq := &Iq{Header: Header{To: cl.Jid.Domain, Id: id, Type: "set",
 		Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}}
@@ -281,24 +259,8 @@
 	if err := <-ch; err != nil {
 		return err
-	if getRoster {
-		err := fetchRoster(cl)
-		if err != nil {
-			return err
-		}
-	}
 	if pr != nil {
 		cl.Out <- pr
 	return nil
-// AddFilter adds a new filter to the top of the stack through which
-// incoming stanzas travel on their way up to the client. The new
-// filter's output channel is given to this function, and it returns a
-// new input channel which the filter should read from. When its input
-// channel closes, the filter should close its output channel.
-func (cl *Client) AddFilter(out <-chan Stanza) <-chan Stanza {
-	cl.filterOut <- out
-	return <-cl.filterIn