--- a/roster.go Sat Aug 31 23:08:21 2013 +0100
+++ b/roster.go Mon Sep 02 20:38:02 2013 -0700
@@ -4,15 +4,12 @@
package xmpp
+// This file contains support for roster management, RFC 3921, Section 7.
+
import (
"encoding/xml"
- "fmt"
)
-// This file contains support for roster management, RFC 3921, Section 7.
-
-var rosterExt Extension = Extension{StanzaHandlers: map[string]func(*xml.Name) interface{}{NsRoster: newRosterQuery}, Start: startRosterFilter}
-
// Roster query/result
type RosterQuery struct {
XMLName xml.Name `xml:"jabber:iq:roster query"`
@@ -28,134 +25,131 @@
Group []string
}
+type rosterCb struct {
+ id string
+ cb func()
+}
+
+type Roster struct {
+ Extension
+ get chan []RosterItem
+ callbacks chan rosterCb
+ toServer chan Stanza
+}
+
type rosterClient struct {
rosterChan <-chan []RosterItem
rosterUpdate chan<- RosterItem
}
-var (
- rosterClients = make(map[string]rosterClient)
-)
-
// Implicitly becomes part of NewClient's extStanza arg.
func newRosterQuery(name *xml.Name) interface{} {
return &RosterQuery{}
}
-// Synchronously fetch this entity's roster from the server and cache
-// that information. This is called once from a fairly deep call stack
-// as part of XMPP negotiation.
-func fetchRoster(client *Client) error {
- rosterUpdate := rosterClients[client.Uid].rosterUpdate
-
- iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get",
- Id: NextId(), Nested: []interface{}{RosterQuery{}}}}
- ch := make(chan error)
- f := func(v Stanza) bool {
- defer close(ch)
- iq, ok := v.(*Iq)
- if !ok {
- ch <- fmt.Errorf("response to iq wasn't iq: %s", v)
- return false
- }
- if iq.Type == "error" {
- ch <- iq.Error
- return false
- }
- var rq *RosterQuery
- for _, ele := range iq.Nested {
- if q, ok := ele.(*RosterQuery); ok {
- rq = q
- break
- }
- }
- if rq == nil {
- ch <- fmt.Errorf(
- "Roster query result not query: %v", v)
- return false
- }
- for _, item := range rq.Item {
- rosterUpdate <- item
- }
- ch <- nil
- return false
- }
- client.HandleStanza(iq.Id, f)
- client.Out <- iq
- // Wait for f to complete.
- return <-ch
-}
-
-// The roster filter updates the Client's representation of the
-// roster, but it lets the relevant stanzas through. This also starts
-// the roster feeder, which is the goroutine that provides data on
-// client.Roster.
-func startRosterFilter(client *Client) {
- out := make(chan Stanza)
- in := client.AddFilter(out)
- go func(in <-chan Stanza, out chan<- Stanza) {
- defer close(out)
- for st := range in {
- maybeUpdateRoster(client, st)
- out <- st
- }
- }(in, out)
-
- rosterCh := make(chan []RosterItem)
- rosterUpdate := make(chan RosterItem)
- rosterClients[client.Uid] = rosterClient{rosterChan: rosterCh,
- rosterUpdate: rosterUpdate}
- go feedRoster(rosterCh, rosterUpdate)
-}
-
-func maybeUpdateRoster(client *Client, st interface{}) {
- iq, ok := st.(*Iq)
- if !ok {
- return
- }
-
- rosterUpdate := rosterClients[client.Uid].rosterUpdate
-
- var rq *RosterQuery
- for _, ele := range iq.Nested {
- if q, ok := ele.(*RosterQuery); ok {
- rq = q
- break
- }
- }
- if iq.Type == "set" && rq != nil {
- for _, item := range rq.Item {
- rosterUpdate <- item
- }
- // Send a reply.
- reply := &Iq{Header: Header{To: iq.From, Id: iq.Id,
- Type: "result"}}
- client.Out <- reply
- }
-}
-
-func feedRoster(rosterCh chan<- []RosterItem, rosterUpdate <-chan RosterItem) {
+func (r *Roster) rosterMgr(upd <-chan Stanza) {
roster := make(map[string]RosterItem)
- snapshot := []RosterItem{}
+ waits := make(map[string]func())
+ var snapshot []RosterItem
for {
select {
- case newIt := <-rosterUpdate:
- if newIt.Subscription == "remove" {
- delete(roster, newIt.Jid)
- } else {
- roster[newIt.Jid] = newIt
+ case stan, ok := <- upd:
+ if !ok {
+ return
+ }
+ hdr := stan.GetHeader()
+ if f := waits[hdr.Id] ; f != nil {
+ delete(waits, hdr.Id)
+ f()
+ }
+ iq, ok := stan.(*Iq)
+ if iq.Type != "set" {
+ continue
}
- case rosterCh <- snapshot:
- }
- snapshot = make([]RosterItem, 0, len(roster))
- for _, v := range roster {
- snapshot = append(snapshot, v)
+ var rq *RosterQuery
+ for _, ele := range iq.Nested {
+ if q, ok := ele.(*RosterQuery); ok {
+ rq = q
+ break
+ }
+ }
+ if rq == nil {
+ continue
+ }
+ for _, item := range rq.Item {
+ roster[item.Jid] = item
+ }
+ snapshot = []RosterItem{}
+ for _, ri := range roster {
+ snapshot = append(snapshot, ri)
+ }
+ case r.get <- snapshot:
+ case cb := <- r.callbacks:
+ waits[cb.id] = cb.cb
}
}
}
-// Retrieve a snapshot of the roster for the given Client.
-func Roster(client *Client) []RosterItem {
- rosterChan := rosterClients[client.Uid].rosterChan
- return <-rosterChan
+func (r *Roster) makeFilters() (Filter, Filter) {
+ rosterUpdate := make(chan Stanza)
+ go r.rosterMgr(rosterUpdate)
+ recv := func(in <-chan Stanza, out chan<- Stanza) {
+ defer close(out)
+ for stan := range in {
+ rosterUpdate <- stan
+ out <- stan
+ }
+ }
+ send := func(in <-chan Stanza, out chan<- Stanza) {
+ defer close(out)
+ for {
+ select {
+ case stan, ok := <- in:
+ if !ok {
+ return
+ }
+ out <- stan
+ case stan := <- r.toServer:
+ out <- stan
+ }
+ }
+ }
+ return recv, send
}
+
+func newRosterExt() *Roster {
+ r := Roster{}
+ r.StanzaHandlers = make(map[string]func(*xml.Name) interface{})
+ r.StanzaHandlers[NsRoster] = newRosterQuery
+ r.RecvFilter, r.SendFilter = r.makeFilters()
+ r.get = make(chan []RosterItem)
+ r.callbacks = make(chan rosterCb)
+ r.toServer = make(chan Stanza)
+ return &r
+}
+
+// Return the most recent snapshot of the roster status. This is
+// updated automatically as roster updates are received from the
+// server, but especially in response to calls to Update().
+func (r *Roster) Get() []RosterItem {
+ return <-r.get
+}
+
+// Synchronously fetch this entity's roster from the server and cache
+// that information. The client can access the roster by watching for
+// RosterQuery objects or by calling Get().
+func (r *Roster) Update(client *Client) {
+ iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get",
+ Id: NextId(), Nested: []interface{}{RosterQuery{}}}}
+ waitchan := make(chan int)
+ done := func() {
+ close(waitchan)
+ }
+ r.waitFor(iq.Id, done)
+ r.toServer <- iq
+ <-waitchan
+}
+
+func (r *Roster) waitFor(id string, cb func()) {
+ r.callbacks <- rosterCb{id: id, cb: cb}
+}