20 Subscription string `xml:"subscription,attr"` |
20 Subscription string `xml:"subscription,attr"` |
21 Name string `xml:"name,attr"` |
21 Name string `xml:"name,attr"` |
22 Group []string |
22 Group []string |
23 } |
23 } |
24 |
24 |
25 type rosterCb struct { |
|
26 id string |
|
27 cb func() |
|
28 } |
|
29 |
|
30 type Roster struct { |
25 type Roster struct { |
31 Extension |
26 Extension |
32 get chan []RosterItem |
27 get chan []RosterItem |
33 callbacks chan rosterCb |
|
34 toServer chan Stanza |
28 toServer chan Stanza |
35 } |
29 } |
36 |
30 |
37 type rosterClient struct { |
31 type rosterClient struct { |
38 rosterChan <-chan []RosterItem |
32 rosterChan <-chan []RosterItem |
39 rosterUpdate chan<- RosterItem |
33 rosterUpdate chan<- RosterItem |
40 } |
34 } |
41 |
35 |
42 func (r *Roster) rosterMgr(upd <-chan Stanza) { |
36 func (r *Roster) rosterMgr(upd <-chan Stanza) { |
43 roster := make(map[string]RosterItem) |
37 roster := make(map[string]RosterItem) |
44 waits := make(map[string]func()) |
|
45 var snapshot []RosterItem |
38 var snapshot []RosterItem |
|
39 var get chan<- []RosterItem |
46 for { |
40 for { |
47 select { |
41 select { |
|
42 case get <- snapshot: |
|
43 |
48 case stan, ok := <-upd: |
44 case stan, ok := <-upd: |
49 if !ok { |
45 if !ok { |
50 return |
46 return |
51 } |
|
52 hdr := stan.GetHeader() |
|
53 if f := waits[hdr.Id]; f != nil { |
|
54 delete(waits, hdr.Id) |
|
55 f() |
|
56 } |
47 } |
57 iq, ok := stan.(*Iq) |
48 iq, ok := stan.(*Iq) |
58 if !ok { |
49 if !ok { |
59 continue |
50 continue |
60 } |
51 } |
61 if iq.Type != "result" { |
52 if iq.Type != "result" && iq.Type != "set" { |
62 continue |
53 continue |
63 } |
54 } |
64 var rq *RosterQuery |
55 var rq *RosterQuery |
65 for _, ele := range iq.Nested { |
56 for _, ele := range iq.Nested { |
66 if q, ok := ele.(*RosterQuery); ok { |
57 if q, ok := ele.(*RosterQuery); ok { |
76 } |
67 } |
77 snapshot = []RosterItem{} |
68 snapshot = []RosterItem{} |
78 for _, ri := range roster { |
69 for _, ri := range roster { |
79 snapshot = append(snapshot, ri) |
70 snapshot = append(snapshot, ri) |
80 } |
71 } |
81 case r.get <- snapshot: |
72 get = r.get |
82 case cb := <-r.callbacks: |
|
83 waits[cb.id] = cb.cb |
|
84 } |
73 } |
85 } |
74 } |
86 } |
75 } |
87 |
76 |
88 func (r *Roster) makeFilters() (Filter, Filter) { |
77 func (r *Roster) makeFilters() (Filter, Filter) { |
117 r.StanzaHandlers = make(map[xml.Name]reflect.Type) |
106 r.StanzaHandlers = make(map[xml.Name]reflect.Type) |
118 rName := xml.Name{Space: NsRoster, Local: "query"} |
107 rName := xml.Name{Space: NsRoster, Local: "query"} |
119 r.StanzaHandlers[rName] = reflect.TypeOf(RosterQuery{}) |
108 r.StanzaHandlers[rName] = reflect.TypeOf(RosterQuery{}) |
120 r.RecvFilter, r.SendFilter = r.makeFilters() |
109 r.RecvFilter, r.SendFilter = r.makeFilters() |
121 r.get = make(chan []RosterItem) |
110 r.get = make(chan []RosterItem) |
122 r.callbacks = make(chan rosterCb) |
|
123 r.toServer = make(chan Stanza) |
111 r.toServer = make(chan Stanza) |
124 return &r |
112 return &r |
125 } |
113 } |
126 |
114 |
127 // Return the most recent snapshot of the roster status. This is |
115 // Return the most recent snapshot of the roster status. This is |
128 // updated automatically as roster updates are received from the |
116 // updated automatically as roster updates are received from the |
129 // server, but especially in response to calls to Update(). |
117 // server. This function may block immediately after the XMPP |
|
118 // connection has been established, until the first roster update is |
|
119 // received from the server. |
130 func (r *Roster) Get() []RosterItem { |
120 func (r *Roster) Get() []RosterItem { |
131 return <-r.get |
121 return <-r.get |
132 } |
122 } |
133 |
123 |
134 // Synchronously fetch this entity's roster from the server and cache |
124 // Asynchronously fetch this entity's roster from the server. |
135 // that information. The client can access the roster by watching for |
125 func (r *Roster) update() { |
136 // RosterQuery objects or by calling Get(). |
|
137 func (r *Roster) Update() { |
|
138 iq := &Iq{Header: Header{Type: "get", Id: NextId(), |
126 iq := &Iq{Header: Header{Type: "get", Id: NextId(), |
139 Nested: []interface{}{RosterQuery{}}}} |
127 Nested: []interface{}{RosterQuery{}}}} |
140 waitchan := make(chan int) |
|
141 done := func() { |
|
142 close(waitchan) |
|
143 } |
|
144 r.waitFor(iq.Id, done) |
|
145 r.toServer <- iq |
128 r.toServer <- iq |
146 <-waitchan |
|
147 } |
129 } |
148 |
|
149 func (r *Roster) waitFor(id string, cb func()) { |
|
150 r.callbacks <- rosterCb{id: id, cb: cb} |
|
151 } |
|