25 Subscription string `xml:"attr"` |
25 Subscription string `xml:"attr"` |
26 Name string `xml:"attr"` |
26 Name string `xml:"attr"` |
27 Group []string |
27 Group []string |
28 } |
28 } |
29 |
29 |
|
30 type rosterClient struct { |
|
31 rosterChan <-chan []RosterItem |
|
32 rosterUpdate chan<- RosterItem |
|
33 } |
|
34 |
|
35 var ( |
|
36 rosterClients = make(map[string] rosterClient) |
|
37 ) |
|
38 |
30 // Implicitly becomes part of NewClient's extStanza arg. |
39 // Implicitly becomes part of NewClient's extStanza arg. |
31 func newRosterQuery(name *xml.Name) interface{} { |
40 func newRosterQuery(name *xml.Name) interface{} { |
32 return &RosterQuery{} |
41 return &RosterQuery{} |
33 } |
42 } |
34 |
43 |
35 // Synchronously fetch this entity's roster from the server and cache |
44 // Synchronously fetch this entity's roster from the server and cache |
36 // that information. |
45 // that information. This is called once from a fairly deep call stack |
37 func (cl *Client) fetchRoster() os.Error { |
46 // as part of XMPP negotiation. |
38 iq := &Iq{From: cl.Jid.String(), Id: <- cl.Id, Type: "get", |
47 func fetchRoster(client *Client) os.Error { |
|
48 rosterUpdate := rosterClients[client.Uid].rosterUpdate |
|
49 |
|
50 iq := &Iq{From: client.Jid.String(), Id: <- Id, Type: "get", |
39 Nested: RosterQuery{}} |
51 Nested: RosterQuery{}} |
40 ch := make(chan os.Error) |
52 ch := make(chan os.Error) |
41 f := func(st Stanza) bool { |
53 f := func(st Stanza) bool { |
|
54 defer close(ch) |
42 if iq.Type == "error" { |
55 if iq.Type == "error" { |
43 ch <- iq.Error |
56 ch <- iq.Error |
44 return false |
57 return false |
45 } |
58 } |
46 rq, ok := st.GetNested().(*RosterQuery) |
59 rq, ok := st.GetNested().(*RosterQuery) |
47 if !ok { |
60 if !ok { |
48 ch <- os.NewError(fmt.Sprintf( |
61 ch <- os.NewError(fmt.Sprintf( |
49 "Roster query result not query: %v", st)) |
62 "Roster query result not query: %v", st)) |
50 return false |
63 return false |
51 } |
64 } |
52 cl.roster = make(map[string] *RosterItem, len(rq.Item)) |
65 for _, item := range(rq.Item) { |
53 for i, item := range(rq.Item) { |
66 rosterUpdate <- item |
54 cl.roster[item.Jid] = &rq.Item[i] |
|
55 } |
67 } |
56 ch <- nil |
68 ch <- nil |
57 return false |
69 return false |
58 } |
70 } |
59 cl.HandleStanza(iq.Id, f) |
71 client.HandleStanza(iq.Id, f) |
60 cl.Out <- iq |
72 client.Out <- iq |
61 // Wait for f to complete. |
73 // Wait for f to complete. |
62 return <- ch |
74 return <- ch |
63 } |
75 } |
64 |
76 |
65 // Returns the current roster of other entities which this one has a |
|
66 // relationship with. Changes to the roster will be signaled by an |
|
67 // appropriate Iq appearing on Client.In. See RFC 3921, Section 7.4. |
|
68 func (cl *Client) Roster() map[string] *RosterItem { |
|
69 r := make(map[string] *RosterItem) |
|
70 for key, val := range(cl.roster) { |
|
71 r[key] = val |
|
72 } |
|
73 return r |
|
74 } |
|
75 |
|
76 // The roster filter updates the Client's representation of the |
77 // The roster filter updates the Client's representation of the |
77 // roster, but it lets the relevant stanzas through. |
78 // roster, but it lets the relevant stanzas through. This also starts |
78 func (cl *Client) startRosterFilter() { |
79 // the roster feeder, which is the goroutine that provides data on |
|
80 // client.Roster. |
|
81 func startRosterFilter(client *Client) { |
79 out := make(chan Stanza) |
82 out := make(chan Stanza) |
80 in := cl.AddFilter(out) |
83 in := client.AddFilter(out) |
81 go func(in <-chan Stanza, out chan<- Stanza) { |
84 go func(in <-chan Stanza, out chan<- Stanza) { |
82 defer close(out) |
85 defer close(out) |
83 for st := range(in) { |
86 for st := range(in) { |
84 cl.maybeUpdateRoster(st) |
87 maybeUpdateRoster(client, st) |
85 out <- st |
88 out <- st |
86 } |
89 } |
87 }(in, out) |
90 }(in, out) |
|
91 |
|
92 rosterCh := make(chan []RosterItem) |
|
93 rosterUpdate := make(chan RosterItem) |
|
94 rosterClients[client.Uid] = rosterClient{rosterChan: rosterCh, |
|
95 rosterUpdate: rosterUpdate} |
|
96 go feedRoster(rosterCh, rosterUpdate) |
88 } |
97 } |
89 |
98 |
90 // BUG(cjyar) This isn't getting updates. |
|
91 // BUG(cjyar) This isn't actually thread safe, though it's unlikely it |
|
92 // will fail in practice. Either the roster should be protected with a |
|
93 // mutex, or we should make the roster available on a channel instead |
|
94 // of via a method call. |
|
95 // BUG(cjyar) RFC 3921, Section 7.4 says we need to reply. |
99 // BUG(cjyar) RFC 3921, Section 7.4 says we need to reply. |
96 func (cl *Client) maybeUpdateRoster(st Stanza) { |
100 func maybeUpdateRoster(client *Client, st Stanza) { |
|
101 rosterUpdate := rosterClients[client.Uid].rosterUpdate |
|
102 |
97 rq, ok := st.GetNested().(*RosterQuery) |
103 rq, ok := st.GetNested().(*RosterQuery) |
98 if st.GetName() == "iq" && st.GetType() == "set" && ok { |
104 if st.GetName() == "iq" && st.GetType() == "set" && ok { |
99 for i, item := range(rq.Item) { |
105 for _, item := range(rq.Item) { |
100 if item.Subscription == "remove" { |
106 rosterUpdate <- item |
101 cl.roster[item.Jid] = nil |
|
102 } else { |
|
103 cl.roster[item.Jid] = &rq.Item[i] |
|
104 } |
|
105 } |
107 } |
106 } |
108 } |
107 } |
109 } |
|
110 |
|
111 func feedRoster(rosterCh chan<- []RosterItem, rosterUpdate <-chan RosterItem) { |
|
112 roster := make(map[string] RosterItem) |
|
113 snapshot := []RosterItem{} |
|
114 for { |
|
115 select { |
|
116 case newIt := <-rosterUpdate: |
|
117 if newIt.Subscription == "remove" { |
|
118 roster[newIt.Jid] = RosterItem{}, false |
|
119 } else { |
|
120 roster[newIt.Jid] = newIt |
|
121 } |
|
122 case rosterCh <- snapshot: |
|
123 } |
|
124 snapshot = make([]RosterItem, 0, len(roster)) |
|
125 for _, v := range(roster) { |
|
126 snapshot = append(snapshot, v) |
|
127 } |
|
128 } |
|
129 } |
|
130 |
|
131 // Retrieve a snapshot of the roster for the given Client. |
|
132 func Roster(client *Client) []RosterItem { |
|
133 rosterChan := rosterClients[client.Uid].rosterChan |
|
134 return <- rosterChan |
|
135 } |