Revamped how the roster works. We're now using a channel to transmit snapshots
authorChris Jones <chris@cjones.org>
Thu, 05 Jan 2012 23:14:14 -0700
changeset 57 e6cb3f049137
parent 56 74686b8c9146
child 58 c0e8778bdb80
Revamped how the roster works. We're now using a channel to transmit snapshots of the roster, which is thread-safe. Also found and fixed a bug that was preventing new filters from being sent more than one stanza ever.
examples/interact.go
roster.go
stream.go
xmpp.go
--- a/examples/interact.go	Thu Jan 05 19:53:37 2012 -0700
+++ b/examples/interact.go	Thu Jan 05 23:14:14 2012 -0700
@@ -34,10 +34,10 @@
 	if err != nil {
 		log.Fatalf("StartSession: %v", err)
 	}
-	roster := c.Roster()
+	roster := xmpp.Roster(c)
 	fmt.Printf("%d roster entries:\n", len(roster))
-	for jid, entry := range(roster) {
-		fmt.Printf("%s: %v\n", jid, entry)
+	for i, entry := range(roster) {
+		fmt.Printf("%d: %v\n", i, entry)
 	}
 
 	go func(ch <-chan xmpp.Stanza) {
--- a/roster.go	Thu Jan 05 19:53:37 2012 -0700
+++ b/roster.go	Thu Jan 05 23:14:14 2012 -0700
@@ -27,18 +27,31 @@
 	Group []string
 }
 
+type rosterClient struct {
+	rosterChan <-chan []RosterItem
+	rosterUpdate chan<- RosterItem
+}
+
+var (
+	rosterClients = make(map[string] rosterClient)
+)
+
 // Implicitly becomes part of NewClient's extStanza arg.
 func newRosterQuery(name *xml.Name) interface{} {
 	return &RosterQuery{}
 }
 
 // Synchronously fetch this entity's roster from the server and cache
-// that information.
-func (cl *Client) fetchRoster() os.Error {
-	iq := &Iq{From: cl.Jid.String(), Id: <- cl.Id, Type: "get",
+// that information. This is called once from a fairly deep call stack
+// as part of XMPP negotiation.
+func fetchRoster(client *Client) os.Error {
+	rosterUpdate := rosterClients[client.Uid].rosterUpdate
+
+	iq := &Iq{From: client.Jid.String(), Id: <- Id, Type: "get",
 		Nested: RosterQuery{}}
 	ch := make(chan os.Error)
 	f := func(st Stanza) bool {
+		defer close(ch)
 		if iq.Type == "error" {
 			ch <- iq.Error
 			return false
@@ -49,59 +62,74 @@
 				"Roster query result not query: %v", st))
 			return false
 		}
-		cl.roster = make(map[string] *RosterItem, len(rq.Item))
-		for i, item := range(rq.Item) {
-			cl.roster[item.Jid] = &rq.Item[i]
+		for _, item := range(rq.Item) {
+			rosterUpdate <- item
 		}
 		ch <- nil
 		return false
 	}
-	cl.HandleStanza(iq.Id, f)
-	cl.Out <- iq
+	client.HandleStanza(iq.Id, f)
+	client.Out <- iq
 	// Wait for f to complete.
 	return <- ch
 }
 
-// Returns the current roster of other entities which this one has a
-// relationship with. Changes to the roster will be signaled by an
-// appropriate Iq appearing on Client.In. See RFC 3921, Section 7.4.
-func (cl *Client) Roster() map[string] *RosterItem {
-	r := make(map[string] *RosterItem)
-	for key, val := range(cl.roster) {
-		r[key] = val
-	}
-	return r
-}
-
 // The roster filter updates the Client's representation of the
-// roster, but it lets the relevant stanzas through.
-func (cl *Client) startRosterFilter() {
+// roster, but it lets the relevant stanzas through. This also starts
+// the roster feeder, which is the goroutine that provides data on
+// client.Roster.
+func startRosterFilter(client *Client) {
 	out := make(chan Stanza)
-	in := cl.AddFilter(out)
+	in := client.AddFilter(out)
 	go func(in <-chan Stanza, out chan<- Stanza) {
 		defer close(out)
 		for st := range(in) {
-			cl.maybeUpdateRoster(st)
+			maybeUpdateRoster(client, st)
 			out <- st
 		}
 	}(in, out)
+
+	rosterCh := make(chan []RosterItem)
+	rosterUpdate := make(chan RosterItem)
+	rosterClients[client.Uid] = rosterClient{rosterChan: rosterCh,
+		rosterUpdate: rosterUpdate}
+	go feedRoster(rosterCh, rosterUpdate)
 }
 
-// BUG(cjyar) This isn't getting updates.
-// BUG(cjyar) This isn't actually thread safe, though it's unlikely it
-// will fail in practice. Either the roster should be protected with a
-// mutex, or we should make the roster available on a channel instead
-// of via a method call.
 // BUG(cjyar) RFC 3921, Section 7.4 says we need to reply.
-func (cl *Client) maybeUpdateRoster(st Stanza) {
+func maybeUpdateRoster(client *Client, st Stanza) {
+	rosterUpdate := rosterClients[client.Uid].rosterUpdate
+
 	rq, ok := st.GetNested().(*RosterQuery)
 	if st.GetName() == "iq" && st.GetType() == "set" && ok {
-		for i, item := range(rq.Item) {
-			if item.Subscription == "remove" {
-				cl.roster[item.Jid] = nil
-			} else {
-				cl.roster[item.Jid] = &rq.Item[i]
-			}
+		for _, item := range(rq.Item) {
+			rosterUpdate <- item
 		}
 	}
 }
+
+func feedRoster(rosterCh chan<- []RosterItem, rosterUpdate <-chan RosterItem) {
+	roster := make(map[string] RosterItem)
+	snapshot := []RosterItem{}
+	for {
+		select {
+		case newIt := <-rosterUpdate:
+			if newIt.Subscription == "remove" {
+				roster[newIt.Jid] = RosterItem{}, false
+			} else {
+				roster[newIt.Jid] = newIt
+			}
+		case rosterCh <- snapshot:
+		}
+		snapshot = make([]RosterItem, 0, len(roster))
+		for _, v := range(roster) {
+			snapshot = append(snapshot, v)
+		}
+	}
+}
+
+// Retrieve a snapshot of the roster for the given Client.
+func Roster(client *Client) []RosterItem {
+	rosterChan := rosterClients[client.Uid].rosterChan
+	return <- rosterChan
+}
--- a/stream.go	Thu Jan 05 19:53:37 2012 -0700
+++ b/stream.go	Thu Jan 05 23:14:14 2012 -0700
@@ -309,6 +309,7 @@
 				continue
 			}
 			filterIn <- topFilter
+			topFilter = newFilterOut
 
 		case data, ok := <-topFilter:
 			if !ok {
@@ -593,7 +594,7 @@
 	if res != "" {
 		bindReq.Resource = &res
 	}
-	msg := &Iq{Type: "set", Id: <- cl.Id, Nested: &bindReq}
+	msg := &Iq{Type: "set", Id: <- Id, Nested: &bindReq}
 	f := func(st Stanza) bool {
 		if st.GetType() == "error" {
 			log.Println("Resource binding failed")
--- a/xmpp.go	Thu Jan 05 19:53:37 2012 -0700
+++ b/xmpp.go	Thu Jan 05 23:14:14 2012 -0700
@@ -44,8 +44,30 @@
 	debug = false
 )
 
+// This channel may be used as a convenient way to generate a unique
+// id for an iq, message, or presence stanza.
+var Id <-chan string
+
+func init() {
+	// Start the unique id generator.
+	idCh := make(chan string)
+	Id = idCh
+	go func(ch chan<- string) {
+		id := int64(1)
+		for {
+			str := fmt.Sprintf("id_%d", id)
+			ch <- str
+			id++
+		}
+	}(idCh)
+}
+
 // The client in a client-server XMPP connection.
 type Client struct {
+	// This client's unique ID. It's unique within the context of
+	// this process, so if multiple Client objects exist, each
+	// will be distinguishable by its Uid.
+	Uid string
 	// This client's JID. This will be updated asynchronously by
 	// the time StartSession() returns.
 	Jid JID
@@ -56,9 +78,6 @@
 	authDone bool
 	handlers chan *stanzaHandler
 	inputControl chan int
-	// This channel may be used as a convenient way to generate a
-	// unique id for an iq, message, or presence stanza.
-	Id <-chan string
 	// Incoming XMPP stanzas from the server will be published on
 	// this channel. Information which is only used by this
 	// library to set up the XMPP stream will not appear here.
@@ -72,12 +91,15 @@
 	// connection process. It should not be updated once
 	// StartSession() returns.
 	Features *Features
-	roster map[string] *RosterItem
 	filterOut chan<- <-chan Stanza
 	filterIn <-chan <-chan Stanza
 }
 var _ io.Closer = &Client{}
 
+// BUG(cjyar) Replace extStanza with a generalized extension interface
+// that handles starting filters, registering extended stanzes, and
+// anything else an extension has to do.
+
 // Connect to the appropriate server and authenticate as the given JID
 // with the given password. This function will return as soon as a TCP
 // connection has been established, but before XMPP stream negotiation
@@ -114,13 +136,12 @@
 	}
 
 	cl := new(Client)
+	cl.Uid = <- Id
 	cl.password = password
 	cl.Jid = *jid
 	cl.socket = tcp
 	cl.handlers = make(chan *stanzaHandler, 100)
 	cl.inputControl = make(chan int)
-	idCh := make(chan string)
-	cl.Id = idCh
 
 	if extStanza == nil {
 		extStanza = make(map[string] func(*xml.Name) interface{})
@@ -128,9 +149,6 @@
 	extStanza[NsRoster] = newRosterQuery
 	extStanza[NsBind] = newBind
 
-	// Start the unique id generator.
-	go makeIds(idCh)
-
 	// Start the transport handler, initially unencrypted.
 	tlsr, tlsw := cl.startTransport()
 
@@ -146,7 +164,7 @@
 	// Start the manager for the filters that can modify what the
 	// app sees.
 	clIn := cl.startFilter(stIn)
-	cl.startRosterFilter()
+	startRosterFilter(cl)
 
 	// Initial handshake.
 	hsOut := &stream{To: jid.Domain, Version: Version}
@@ -259,15 +277,6 @@
 	}
 }
 
-func makeIds(ch chan<- string) {
-	id := int64(1)
-	for {
-		str := fmt.Sprintf("id_%d", id)
-		ch <- str
-		id++
-	}
-}
-
 // bindDone is called when we've finished resource binding (and all
 // the negotiations that precede it). Now we can start accepting
 // traffic from the app.
@@ -281,7 +290,7 @@
 // presence. The presence can be as simple as a newly-initialized
 // Presence struct.  See RFC 3921, Section 3.
 func (cl *Client) StartSession(getRoster bool, pr *Presence) os.Error {
-	id := <- cl.Id
+	id := <- Id
 	iq := &Iq{To: cl.Jid.Domain, Id: id, Type: "set", Any:
 		&Generic{XMLName: xml.Name{Space: NsSession, Local:
 				"session"}}}
@@ -303,7 +312,7 @@
 		return err
 	}
 	if getRoster {
-		err := cl.fetchRoster()
+		err := fetchRoster(cl)
 		if err != nil {
 			return err
 		}