diff -r 9d7e8333948b -r ebb86cbdd218 roster.go --- a/roster.go Sat Aug 31 23:08:21 2013 +0100 +++ b/roster.go Mon Sep 02 20:38:02 2013 -0700 @@ -4,15 +4,12 @@ package xmpp +// This file contains support for roster management, RFC 3921, Section 7. + import ( "encoding/xml" - "fmt" ) -// This file contains support for roster management, RFC 3921, Section 7. - -var rosterExt Extension = Extension{StanzaHandlers: map[string]func(*xml.Name) interface{}{NsRoster: newRosterQuery}, Start: startRosterFilter} - // Roster query/result type RosterQuery struct { XMLName xml.Name `xml:"jabber:iq:roster query"` @@ -28,134 +25,131 @@ Group []string } +type rosterCb struct { + id string + cb func() +} + +type Roster struct { + Extension + get chan []RosterItem + callbacks chan rosterCb + toServer chan Stanza +} + type rosterClient struct { rosterChan <-chan []RosterItem rosterUpdate chan<- RosterItem } -var ( - rosterClients = make(map[string]rosterClient) -) - // Implicitly becomes part of NewClient's extStanza arg. func newRosterQuery(name *xml.Name) interface{} { return &RosterQuery{} } -// Synchronously fetch this entity's roster from the server and cache -// that information. This is called once from a fairly deep call stack -// as part of XMPP negotiation. -func fetchRoster(client *Client) error { - rosterUpdate := rosterClients[client.Uid].rosterUpdate - - iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get", - Id: NextId(), Nested: []interface{}{RosterQuery{}}}} - ch := make(chan error) - f := func(v Stanza) bool { - defer close(ch) - iq, ok := v.(*Iq) - if !ok { - ch <- fmt.Errorf("response to iq wasn't iq: %s", v) - return false - } - if iq.Type == "error" { - ch <- iq.Error - return false - } - var rq *RosterQuery - for _, ele := range iq.Nested { - if q, ok := ele.(*RosterQuery); ok { - rq = q - break - } - } - if rq == nil { - ch <- fmt.Errorf( - "Roster query result not query: %v", v) - return false - } - for _, item := range rq.Item { - rosterUpdate <- item - } - ch <- nil - return false - } - client.HandleStanza(iq.Id, f) - client.Out <- iq - // Wait for f to complete. - return <-ch -} - -// The roster filter updates the Client's representation of the -// roster, but it lets the relevant stanzas through. This also starts -// the roster feeder, which is the goroutine that provides data on -// client.Roster. -func startRosterFilter(client *Client) { - out := make(chan Stanza) - in := client.AddFilter(out) - go func(in <-chan Stanza, out chan<- Stanza) { - defer close(out) - for st := range in { - maybeUpdateRoster(client, st) - out <- st - } - }(in, out) - - rosterCh := make(chan []RosterItem) - rosterUpdate := make(chan RosterItem) - rosterClients[client.Uid] = rosterClient{rosterChan: rosterCh, - rosterUpdate: rosterUpdate} - go feedRoster(rosterCh, rosterUpdate) -} - -func maybeUpdateRoster(client *Client, st interface{}) { - iq, ok := st.(*Iq) - if !ok { - return - } - - rosterUpdate := rosterClients[client.Uid].rosterUpdate - - var rq *RosterQuery - for _, ele := range iq.Nested { - if q, ok := ele.(*RosterQuery); ok { - rq = q - break - } - } - if iq.Type == "set" && rq != nil { - for _, item := range rq.Item { - rosterUpdate <- item - } - // Send a reply. - reply := &Iq{Header: Header{To: iq.From, Id: iq.Id, - Type: "result"}} - client.Out <- reply - } -} - -func feedRoster(rosterCh chan<- []RosterItem, rosterUpdate <-chan RosterItem) { +func (r *Roster) rosterMgr(upd <-chan Stanza) { roster := make(map[string]RosterItem) - snapshot := []RosterItem{} + waits := make(map[string]func()) + var snapshot []RosterItem for { select { - case newIt := <-rosterUpdate: - if newIt.Subscription == "remove" { - delete(roster, newIt.Jid) - } else { - roster[newIt.Jid] = newIt + case stan, ok := <- upd: + if !ok { + return + } + hdr := stan.GetHeader() + if f := waits[hdr.Id] ; f != nil { + delete(waits, hdr.Id) + f() + } + iq, ok := stan.(*Iq) + if iq.Type != "set" { + continue } - case rosterCh <- snapshot: - } - snapshot = make([]RosterItem, 0, len(roster)) - for _, v := range roster { - snapshot = append(snapshot, v) + var rq *RosterQuery + for _, ele := range iq.Nested { + if q, ok := ele.(*RosterQuery); ok { + rq = q + break + } + } + if rq == nil { + continue + } + for _, item := range rq.Item { + roster[item.Jid] = item + } + snapshot = []RosterItem{} + for _, ri := range roster { + snapshot = append(snapshot, ri) + } + case r.get <- snapshot: + case cb := <- r.callbacks: + waits[cb.id] = cb.cb } } } -// Retrieve a snapshot of the roster for the given Client. -func Roster(client *Client) []RosterItem { - rosterChan := rosterClients[client.Uid].rosterChan - return <-rosterChan +func (r *Roster) makeFilters() (Filter, Filter) { + rosterUpdate := make(chan Stanza) + go r.rosterMgr(rosterUpdate) + recv := func(in <-chan Stanza, out chan<- Stanza) { + defer close(out) + for stan := range in { + rosterUpdate <- stan + out <- stan + } + } + send := func(in <-chan Stanza, out chan<- Stanza) { + defer close(out) + for { + select { + case stan, ok := <- in: + if !ok { + return + } + out <- stan + case stan := <- r.toServer: + out <- stan + } + } + } + return recv, send } + +func newRosterExt() *Roster { + r := Roster{} + r.StanzaHandlers = make(map[string]func(*xml.Name) interface{}) + r.StanzaHandlers[NsRoster] = newRosterQuery + r.RecvFilter, r.SendFilter = r.makeFilters() + r.get = make(chan []RosterItem) + r.callbacks = make(chan rosterCb) + r.toServer = make(chan Stanza) + return &r +} + +// Return the most recent snapshot of the roster status. This is +// updated automatically as roster updates are received from the +// server, but especially in response to calls to Update(). +func (r *Roster) Get() []RosterItem { + return <-r.get +} + +// Synchronously fetch this entity's roster from the server and cache +// that information. The client can access the roster by watching for +// RosterQuery objects or by calling Get(). +func (r *Roster) Update(client *Client) { + iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get", + Id: NextId(), Nested: []interface{}{RosterQuery{}}}} + waitchan := make(chan int) + done := func() { + close(waitchan) + } + r.waitFor(iq.Id, done) + r.toServer <- iq + <-waitchan +} + +func (r *Roster) waitFor(id string, cb func()) { + r.callbacks <- rosterCb{id: id, cb: cb} +}