26 Subscription string `xml:"subscription,attr"` |
23 Subscription string `xml:"subscription,attr"` |
27 Name string `xml:"name,attr"` |
24 Name string `xml:"name,attr"` |
28 Group []string |
25 Group []string |
29 } |
26 } |
30 |
27 |
|
// rosterCb pairs a stanza id with a function to run. Values of this
// type are handed to the roster manager over Roster.callbacks.
type rosterCb struct {
	// id is the stanza id the callback is keyed on.
	id string
	// cb is the function invoked for that id.
	cb func()
}
|
32 |
|
33 type Roster struct { |
|
34 Extension |
|
35 get chan []RosterItem |
|
36 callbacks chan rosterCb |
|
37 toServer chan Stanza |
|
38 } |
|
39 |
31 type rosterClient struct { |
40 type rosterClient struct { |
32 rosterChan <-chan []RosterItem |
41 rosterChan <-chan []RosterItem |
33 rosterUpdate chan<- RosterItem |
42 rosterUpdate chan<- RosterItem |
34 } |
43 } |
35 |
|
36 var ( |
|
37 rosterClients = make(map[string]rosterClient) |
|
38 ) |
|
39 |
44 |
40 // Implicitly becomes part of NewClient's extStanza arg. |
45 // Implicitly becomes part of NewClient's extStanza arg. |
41 func newRosterQuery(name *xml.Name) interface{} { |
46 func newRosterQuery(name *xml.Name) interface{} { |
42 return &RosterQuery{} |
47 return &RosterQuery{} |
43 } |
48 } |
44 |
49 |
45 // Synchronously fetch this entity's roster from the server and cache |
50 func (r *Roster) rosterMgr(upd <-chan Stanza) { |
46 // that information. This is called once from a fairly deep call stack |
|
47 // as part of XMPP negotiation. |
|
48 func fetchRoster(client *Client) error { |
|
49 rosterUpdate := rosterClients[client.Uid].rosterUpdate |
|
50 |
|
51 iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get", |
|
52 Id: NextId(), Nested: []interface{}{RosterQuery{}}}} |
|
53 ch := make(chan error) |
|
54 f := func(v Stanza) bool { |
|
55 defer close(ch) |
|
56 iq, ok := v.(*Iq) |
|
57 if !ok { |
|
58 ch <- fmt.Errorf("response to iq wasn't iq: %s", v) |
|
59 return false |
|
60 } |
|
61 if iq.Type == "error" { |
|
62 ch <- iq.Error |
|
63 return false |
|
64 } |
|
65 var rq *RosterQuery |
|
66 for _, ele := range iq.Nested { |
|
67 if q, ok := ele.(*RosterQuery); ok { |
|
68 rq = q |
|
69 break |
|
70 } |
|
71 } |
|
72 if rq == nil { |
|
73 ch <- fmt.Errorf( |
|
74 "Roster query result not query: %v", v) |
|
75 return false |
|
76 } |
|
77 for _, item := range rq.Item { |
|
78 rosterUpdate <- item |
|
79 } |
|
80 ch <- nil |
|
81 return false |
|
82 } |
|
83 client.HandleStanza(iq.Id, f) |
|
84 client.Out <- iq |
|
85 // Wait for f to complete. |
|
86 return <-ch |
|
87 } |
|
88 |
|
89 // The roster filter updates the Client's representation of the |
|
90 // roster, but it lets the relevant stanzas through. This also starts |
|
91 // the roster feeder, which is the goroutine that provides data on |
|
92 // client.Roster. |
|
93 func startRosterFilter(client *Client) { |
|
94 out := make(chan Stanza) |
|
95 in := client.AddFilter(out) |
|
96 go func(in <-chan Stanza, out chan<- Stanza) { |
|
97 defer close(out) |
|
98 for st := range in { |
|
99 maybeUpdateRoster(client, st) |
|
100 out <- st |
|
101 } |
|
102 }(in, out) |
|
103 |
|
104 rosterCh := make(chan []RosterItem) |
|
105 rosterUpdate := make(chan RosterItem) |
|
106 rosterClients[client.Uid] = rosterClient{rosterChan: rosterCh, |
|
107 rosterUpdate: rosterUpdate} |
|
108 go feedRoster(rosterCh, rosterUpdate) |
|
109 } |
|
110 |
|
111 func maybeUpdateRoster(client *Client, st interface{}) { |
|
112 iq, ok := st.(*Iq) |
|
113 if !ok { |
|
114 return |
|
115 } |
|
116 |
|
117 rosterUpdate := rosterClients[client.Uid].rosterUpdate |
|
118 |
|
119 var rq *RosterQuery |
|
120 for _, ele := range iq.Nested { |
|
121 if q, ok := ele.(*RosterQuery); ok { |
|
122 rq = q |
|
123 break |
|
124 } |
|
125 } |
|
126 if iq.Type == "set" && rq != nil { |
|
127 for _, item := range rq.Item { |
|
128 rosterUpdate <- item |
|
129 } |
|
130 // Send a reply. |
|
131 reply := &Iq{Header: Header{To: iq.From, Id: iq.Id, |
|
132 Type: "result"}} |
|
133 client.Out <- reply |
|
134 } |
|
135 } |
|
136 |
|
137 func feedRoster(rosterCh chan<- []RosterItem, rosterUpdate <-chan RosterItem) { |
|
138 roster := make(map[string]RosterItem) |
51 roster := make(map[string]RosterItem) |
139 snapshot := []RosterItem{} |
52 waits := make(map[string]func()) |
|
53 var snapshot []RosterItem |
140 for { |
54 for { |
141 select { |
55 select { |
142 case newIt := <-rosterUpdate: |
56 case stan, ok := <- upd: |
143 if newIt.Subscription == "remove" { |
57 if !ok { |
144 delete(roster, newIt.Jid) |
58 return |
145 } else { |
|
146 roster[newIt.Jid] = newIt |
|
147 } |
59 } |
148 case rosterCh <- snapshot: |
60 hdr := stan.GetHeader() |
149 } |
61 if f := waits[hdr.Id] ; f != nil { |
150 snapshot = make([]RosterItem, 0, len(roster)) |
62 delete(waits, hdr.Id) |
151 for _, v := range roster { |
63 f() |
152 snapshot = append(snapshot, v) |
64 } |
|
65 iq, ok := stan.(*Iq) |
|
66 if iq.Type != "set" { |
|
67 continue |
|
68 } |
|
69 var rq *RosterQuery |
|
70 for _, ele := range iq.Nested { |
|
71 if q, ok := ele.(*RosterQuery); ok { |
|
72 rq = q |
|
73 break |
|
74 } |
|
75 } |
|
76 if rq == nil { |
|
77 continue |
|
78 } |
|
79 for _, item := range rq.Item { |
|
80 roster[item.Jid] = item |
|
81 } |
|
82 snapshot = []RosterItem{} |
|
83 for _, ri := range roster { |
|
84 snapshot = append(snapshot, ri) |
|
85 } |
|
86 case r.get <- snapshot: |
|
87 case cb := <- r.callbacks: |
|
88 waits[cb.id] = cb.cb |
153 } |
89 } |
154 } |
90 } |
155 } |
91 } |
156 |
92 |
157 // Retrieve a snapshot of the roster for the given Client. |
93 func (r *Roster) makeFilters() (Filter, Filter) { |
158 func Roster(client *Client) []RosterItem { |
94 rosterUpdate := make(chan Stanza) |
159 rosterChan := rosterClients[client.Uid].rosterChan |
95 go r.rosterMgr(rosterUpdate) |
160 return <-rosterChan |
96 recv := func(in <-chan Stanza, out chan<- Stanza) { |
|
97 defer close(out) |
|
98 for stan := range in { |
|
99 rosterUpdate <- stan |
|
100 out <- stan |
|
101 } |
|
102 } |
|
103 send := func(in <-chan Stanza, out chan<- Stanza) { |
|
104 defer close(out) |
|
105 for { |
|
106 select { |
|
107 case stan, ok := <- in: |
|
108 if !ok { |
|
109 return |
|
110 } |
|
111 out <- stan |
|
112 case stan := <- r.toServer: |
|
113 out <- stan |
|
114 } |
|
115 } |
|
116 } |
|
117 return recv, send |
161 } |
118 } |
|
119 |
|
120 func newRosterExt() *Roster { |
|
121 r := Roster{} |
|
122 r.StanzaHandlers = make(map[string]func(*xml.Name) interface{}) |
|
123 r.StanzaHandlers[NsRoster] = newRosterQuery |
|
124 r.RecvFilter, r.SendFilter = r.makeFilters() |
|
125 r.get = make(chan []RosterItem) |
|
126 r.callbacks = make(chan rosterCb) |
|
127 r.toServer = make(chan Stanza) |
|
128 return &r |
|
129 } |
|
130 |
|
131 // Return the most recent snapshot of the roster status. This is |
|
132 // updated automatically as roster updates are received from the |
|
133 // server, but especially in response to calls to Update(). |
|
134 func (r *Roster) Get() []RosterItem { |
|
135 return <-r.get |
|
136 } |
|
137 |
|
138 // Synchronously fetch this entity's roster from the server and cache |
|
139 // that information. The client can access the roster by watching for |
|
140 // RosterQuery objects or by calling Get(). |
|
141 func (r *Roster) Update(client *Client) { |
|
142 iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get", |
|
143 Id: NextId(), Nested: []interface{}{RosterQuery{}}}} |
|
144 waitchan := make(chan int) |
|
145 done := func() { |
|
146 close(waitchan) |
|
147 } |
|
148 r.waitFor(iq.Id, done) |
|
149 r.toServer <- iq |
|
150 <-waitchan |
|
151 } |
|
152 |
|
153 func (r *Roster) waitFor(id string, cb func()) { |
|
154 r.callbacks <- rosterCb{id: id, cb: cb} |
|
155 } |