roster.go
changeset 121 ebb86cbdd218
parent 118 fb9bb98a8d70
child 124 34e917ca6a11
--- a/roster.go	Sat Aug 31 23:08:21 2013 +0100
+++ b/roster.go	Mon Sep 02 20:38:02 2013 -0700
@@ -4,15 +4,12 @@
 
 package xmpp
 
+// This file contains support for roster management, RFC 3921, Section 7.
+
 import (
 	"encoding/xml"
-	"fmt"
 )
 
-// This file contains support for roster management, RFC 3921, Section 7.
-
-var rosterExt Extension = Extension{StanzaHandlers: map[string]func(*xml.Name) interface{}{NsRoster: newRosterQuery}, Start: startRosterFilter}
-
 // Roster query/result
 type RosterQuery struct {
 	XMLName xml.Name     `xml:"jabber:iq:roster query"`
@@ -28,134 +25,131 @@
 	Group        []string
 }
 
+type rosterCb struct {
+	id string
+	cb func()
+}
+
+type Roster struct {
+	Extension
+	get chan []RosterItem
+	callbacks chan rosterCb
+	toServer chan Stanza
+}
+
 type rosterClient struct {
 	rosterChan   <-chan []RosterItem
 	rosterUpdate chan<- RosterItem
 }
 
-var (
-	rosterClients = make(map[string]rosterClient)
-)
-
 // Implicitly becomes part of NewClient's extStanza arg.
 func newRosterQuery(name *xml.Name) interface{} {
 	return &RosterQuery{}
 }
 
-// Synchronously fetch this entity's roster from the server and cache
-// that information. This is called once from a fairly deep call stack
-// as part of XMPP negotiation.
-func fetchRoster(client *Client) error {
-	rosterUpdate := rosterClients[client.Uid].rosterUpdate
-
-	iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get",
-		Id: NextId(), Nested: []interface{}{RosterQuery{}}}}
-	ch := make(chan error)
-	f := func(v Stanza) bool {
-		defer close(ch)
-		iq, ok := v.(*Iq)
-		if !ok {
-			ch <- fmt.Errorf("response to iq wasn't iq: %s", v)
-			return false
-		}
-		if iq.Type == "error" {
-			ch <- iq.Error
-			return false
-		}
-		var rq *RosterQuery
-		for _, ele := range iq.Nested {
-			if q, ok := ele.(*RosterQuery); ok {
-				rq = q
-				break
-			}
-		}
-		if rq == nil {
-			ch <- fmt.Errorf(
-				"Roster query result not query: %v", v)
-			return false
-		}
-		for _, item := range rq.Item {
-			rosterUpdate <- item
-		}
-		ch <- nil
-		return false
-	}
-	client.HandleStanza(iq.Id, f)
-	client.Out <- iq
-	// Wait for f to complete.
-	return <-ch
-}
-
-// The roster filter updates the Client's representation of the
-// roster, but it lets the relevant stanzas through. This also starts
-// the roster feeder, which is the goroutine that provides data on
-// client.Roster.
-func startRosterFilter(client *Client) {
-	out := make(chan Stanza)
-	in := client.AddFilter(out)
-	go func(in <-chan Stanza, out chan<- Stanza) {
-		defer close(out)
-		for st := range in {
-			maybeUpdateRoster(client, st)
-			out <- st
-		}
-	}(in, out)
-
-	rosterCh := make(chan []RosterItem)
-	rosterUpdate := make(chan RosterItem)
-	rosterClients[client.Uid] = rosterClient{rosterChan: rosterCh,
-		rosterUpdate: rosterUpdate}
-	go feedRoster(rosterCh, rosterUpdate)
-}
-
-func maybeUpdateRoster(client *Client, st interface{}) {
-	iq, ok := st.(*Iq)
-	if !ok {
-		return
-	}
-
-	rosterUpdate := rosterClients[client.Uid].rosterUpdate
-
-	var rq *RosterQuery
-	for _, ele := range iq.Nested {
-		if q, ok := ele.(*RosterQuery); ok {
-			rq = q
-			break
-		}
-	}
-	if iq.Type == "set" && rq != nil {
-		for _, item := range rq.Item {
-			rosterUpdate <- item
-		}
-		// Send a reply.
-		reply := &Iq{Header: Header{To: iq.From, Id: iq.Id,
-			Type: "result"}}
-		client.Out <- reply
-	}
-}
-
-func feedRoster(rosterCh chan<- []RosterItem, rosterUpdate <-chan RosterItem) {
+func (r *Roster) rosterMgr(upd <-chan Stanza) {
 	roster := make(map[string]RosterItem)
-	snapshot := []RosterItem{}
+	waits := make(map[string]func())
+	var snapshot []RosterItem
 	for {
 		select {
-		case newIt := <-rosterUpdate:
-			if newIt.Subscription == "remove" {
-				delete(roster, newIt.Jid)
-			} else {
-				roster[newIt.Jid] = newIt
+		case stan, ok := <- upd:
+			if !ok {
+				return
+			}
+			hdr := stan.GetHeader()
+			if f := waits[hdr.Id] ; f != nil {
+				delete(waits, hdr.Id)
+				f()
+			}
+			iq, ok := stan.(*Iq)
+			if iq.Type != "set" {
+				continue
 			}
-		case rosterCh <- snapshot:
-		}
-		snapshot = make([]RosterItem, 0, len(roster))
-		for _, v := range roster {
-			snapshot = append(snapshot, v)
+			var rq *RosterQuery
+			for _, ele := range iq.Nested {
+				if q, ok := ele.(*RosterQuery); ok {
+					rq = q
+					break
+				}
+			}
+			if rq == nil {
+				continue
+			}
+			for _, item := range rq.Item {
+				roster[item.Jid] = item
+			}
+			snapshot = []RosterItem{}
+			for _, ri := range roster {
+				snapshot = append(snapshot, ri)
+			}
+		case r.get <- snapshot:
+		case cb := <- r.callbacks:
+			waits[cb.id] = cb.cb
 		}
 	}
 }
 
-// Retrieve a snapshot of the roster for the given Client.
-func Roster(client *Client) []RosterItem {
-	rosterChan := rosterClients[client.Uid].rosterChan
-	return <-rosterChan
+func (r *Roster) makeFilters() (Filter, Filter) {
+	rosterUpdate := make(chan Stanza)
+	go r.rosterMgr(rosterUpdate)
+	recv := func(in <-chan Stanza, out chan<- Stanza) {
+		defer close(out)
+		for stan := range in {
+			rosterUpdate <- stan
+			out <- stan
+		}
+	}
+	send := func(in <-chan Stanza, out chan<- Stanza) {
+		defer close(out)
+		for {
+			select {
+			case stan, ok := <- in:
+				if !ok {
+					return
+				}
+				out <- stan
+			case stan := <- r.toServer:
+				out <- stan
+			}
+		}
+	}
+	return recv, send
 }
+
+func newRosterExt() *Roster {
+	r := Roster{}
+	r.StanzaHandlers = make(map[string]func(*xml.Name) interface{})
+	r.StanzaHandlers[NsRoster] = newRosterQuery
+	r.RecvFilter, r.SendFilter = r.makeFilters()
+	r.get = make(chan []RosterItem)
+	r.callbacks = make(chan rosterCb)
+	r.toServer = make(chan Stanza)
+	return &r
+}
+
+// Return the most recent snapshot of the roster status. This is
+// updated automatically as roster updates are received from the
+// server, but especially in response to calls to Update().
+func (r *Roster) Get() []RosterItem {
+	return <-r.get
+}
+
+// Synchronously fetch this entity's roster from the server and cache
+// that information. The client can access the roster by watching for
+// RosterQuery objects or by calling Get().
+func (r *Roster) Update(client *Client) {
+	iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get",
+		Id: NextId(), Nested: []interface{}{RosterQuery{}}}}
+	waitchan := make(chan int)
+	done := func() {
+		close(waitchan)
+	}
+	r.waitFor(iq.Id, done)
+	r.toServer <- iq
+	<-waitchan
+}
+
+func (r *Roster) waitFor(id string, cb func()) {
+	r.callbacks <- rosterCb{id: id, cb: cb}
+}