Revamped how the roster works. We're now using a channel to transmit snapshots
of the roster, which is thread-safe. Also found and fixed a bug that was
preventing new filters from being sent more than one stanza ever.
--- a/examples/interact.go Thu Jan 05 19:53:37 2012 -0700
+++ b/examples/interact.go Thu Jan 05 23:14:14 2012 -0700
@@ -34,10 +34,10 @@
if err != nil {
log.Fatalf("StartSession: %v", err)
}
- roster := c.Roster()
+ roster := xmpp.Roster(c)
fmt.Printf("%d roster entries:\n", len(roster))
- for jid, entry := range(roster) {
- fmt.Printf("%s: %v\n", jid, entry)
+ for i, entry := range(roster) {
+ fmt.Printf("%d: %v\n", i, entry)
}
go func(ch <-chan xmpp.Stanza) {
--- a/roster.go Thu Jan 05 19:53:37 2012 -0700
+++ b/roster.go Thu Jan 05 23:14:14 2012 -0700
@@ -27,18 +27,31 @@
Group []string
}
+type rosterClient struct {
+ rosterChan <-chan []RosterItem
+ rosterUpdate chan<- RosterItem
+}
+
+var (
+ rosterClients = make(map[string] rosterClient)
+)
+
// Implicitly becomes part of NewClient's extStanza arg.
func newRosterQuery(name *xml.Name) interface{} {
return &RosterQuery{}
}
// Synchronously fetch this entity's roster from the server and cache
-// that information.
-func (cl *Client) fetchRoster() os.Error {
- iq := &Iq{From: cl.Jid.String(), Id: <- cl.Id, Type: "get",
+// that information. This is called once from a fairly deep call stack
+// as part of XMPP negotiation.
+func fetchRoster(client *Client) os.Error {
+ rosterUpdate := rosterClients[client.Uid].rosterUpdate
+
+ iq := &Iq{From: client.Jid.String(), Id: <- Id, Type: "get",
Nested: RosterQuery{}}
ch := make(chan os.Error)
f := func(st Stanza) bool {
+ defer close(ch)
if iq.Type == "error" {
ch <- iq.Error
return false
@@ -49,59 +62,74 @@
"Roster query result not query: %v", st))
return false
}
- cl.roster = make(map[string] *RosterItem, len(rq.Item))
- for i, item := range(rq.Item) {
- cl.roster[item.Jid] = &rq.Item[i]
+ for _, item := range(rq.Item) {
+ rosterUpdate <- item
}
ch <- nil
return false
}
- cl.HandleStanza(iq.Id, f)
- cl.Out <- iq
+ client.HandleStanza(iq.Id, f)
+ client.Out <- iq
// Wait for f to complete.
return <- ch
}
-// Returns the current roster of other entities which this one has a
-// relationship with. Changes to the roster will be signaled by an
-// appropriate Iq appearing on Client.In. See RFC 3921, Section 7.4.
-func (cl *Client) Roster() map[string] *RosterItem {
- r := make(map[string] *RosterItem)
- for key, val := range(cl.roster) {
- r[key] = val
- }
- return r
-}
-
// The roster filter updates the Client's representation of the
-// roster, but it lets the relevant stanzas through.
-func (cl *Client) startRosterFilter() {
+// roster, but it lets the relevant stanzas through. This also starts
+// the roster feeder, which is the goroutine that provides data on
+// client.Roster.
+func startRosterFilter(client *Client) {
out := make(chan Stanza)
- in := cl.AddFilter(out)
+ in := client.AddFilter(out)
go func(in <-chan Stanza, out chan<- Stanza) {
defer close(out)
for st := range(in) {
- cl.maybeUpdateRoster(st)
+ maybeUpdateRoster(client, st)
out <- st
}
}(in, out)
+
+ rosterCh := make(chan []RosterItem)
+ rosterUpdate := make(chan RosterItem)
+ rosterClients[client.Uid] = rosterClient{rosterChan: rosterCh,
+ rosterUpdate: rosterUpdate}
+ go feedRoster(rosterCh, rosterUpdate)
}
-// BUG(cjyar) This isn't getting updates.
-// BUG(cjyar) This isn't actually thread safe, though it's unlikely it
-// will fail in practice. Either the roster should be protected with a
-// mutex, or we should make the roster available on a channel instead
-// of via a method call.
// BUG(cjyar) RFC 3921, Section 7.4 says we need to reply.
-func (cl *Client) maybeUpdateRoster(st Stanza) {
+func maybeUpdateRoster(client *Client, st Stanza) {
+ rosterUpdate := rosterClients[client.Uid].rosterUpdate
+
rq, ok := st.GetNested().(*RosterQuery)
if st.GetName() == "iq" && st.GetType() == "set" && ok {
- for i, item := range(rq.Item) {
- if item.Subscription == "remove" {
- cl.roster[item.Jid] = nil
- } else {
- cl.roster[item.Jid] = &rq.Item[i]
- }
+ for _, item := range(rq.Item) {
+ rosterUpdate <- item
}
}
}
+
+func feedRoster(rosterCh chan<- []RosterItem, rosterUpdate <-chan RosterItem) {
+ roster := make(map[string] RosterItem)
+ snapshot := []RosterItem{}
+ for {
+ select {
+ case newIt := <-rosterUpdate:
+ if newIt.Subscription == "remove" {
+ roster[newIt.Jid] = RosterItem{}, false
+ } else {
+ roster[newIt.Jid] = newIt
+ }
+ case rosterCh <- snapshot:
+ }
+ snapshot = make([]RosterItem, 0, len(roster))
+ for _, v := range(roster) {
+ snapshot = append(snapshot, v)
+ }
+ }
+}
+
+// Retrieve a snapshot of the roster for the given Client.
+func Roster(client *Client) []RosterItem {
+ rosterChan := rosterClients[client.Uid].rosterChan
+ return <- rosterChan
+}
--- a/stream.go Thu Jan 05 19:53:37 2012 -0700
+++ b/stream.go Thu Jan 05 23:14:14 2012 -0700
@@ -309,6 +309,7 @@
continue
}
filterIn <- topFilter
+ topFilter = newFilterOut
case data, ok := <-topFilter:
if !ok {
@@ -593,7 +594,7 @@
if res != "" {
bindReq.Resource = &res
}
- msg := &Iq{Type: "set", Id: <- cl.Id, Nested: &bindReq}
+ msg := &Iq{Type: "set", Id: <- Id, Nested: &bindReq}
f := func(st Stanza) bool {
if st.GetType() == "error" {
log.Println("Resource binding failed")
--- a/xmpp.go Thu Jan 05 19:53:37 2012 -0700
+++ b/xmpp.go Thu Jan 05 23:14:14 2012 -0700
@@ -44,8 +44,30 @@
debug = false
)
+// This channel may be used as a convenient way to generate a unique
+// id for an iq, message, or presence stanza.
+var Id <-chan string
+
+func init() {
+ // Start the unique id generator.
+ idCh := make(chan string)
+ Id = idCh
+ go func(ch chan<- string) {
+ id := int64(1)
+ for {
+ str := fmt.Sprintf("id_%d", id)
+ ch <- str
+ id++
+ }
+ }(idCh)
+}
+
// The client in a client-server XMPP connection.
type Client struct {
+ // This client's unique ID. It's unique within the context of
+ // this process, so if multiple Client objects exist, each
+ // will be distinguishable by its Uid.
+ Uid string
// This client's JID. This will be updated asynchronously by
// the time StartSession() returns.
Jid JID
@@ -56,9 +78,6 @@
authDone bool
handlers chan *stanzaHandler
inputControl chan int
- // This channel may be used as a convenient way to generate a
- // unique id for an iq, message, or presence stanza.
- Id <-chan string
// Incoming XMPP stanzas from the server will be published on
// this channel. Information which is only used by this
// library to set up the XMPP stream will not appear here.
@@ -72,12 +91,15 @@
// connection process. It should not be updated once
// StartSession() returns.
Features *Features
- roster map[string] *RosterItem
filterOut chan<- <-chan Stanza
filterIn <-chan <-chan Stanza
}
var _ io.Closer = &Client{}
+// BUG(cjyar) Replace extStanza with a generalized extension interface
+// that handles starting filters, registering extended stanzes, and
+// anything else an extension has to do.
+
// Connect to the appropriate server and authenticate as the given JID
// with the given password. This function will return as soon as a TCP
// connection has been established, but before XMPP stream negotiation
@@ -114,13 +136,12 @@
}
cl := new(Client)
+ cl.Uid = <- Id
cl.password = password
cl.Jid = *jid
cl.socket = tcp
cl.handlers = make(chan *stanzaHandler, 100)
cl.inputControl = make(chan int)
- idCh := make(chan string)
- cl.Id = idCh
if extStanza == nil {
extStanza = make(map[string] func(*xml.Name) interface{})
@@ -128,9 +149,6 @@
extStanza[NsRoster] = newRosterQuery
extStanza[NsBind] = newBind
- // Start the unique id generator.
- go makeIds(idCh)
-
// Start the transport handler, initially unencrypted.
tlsr, tlsw := cl.startTransport()
@@ -146,7 +164,7 @@
// Start the manager for the filters that can modify what the
// app sees.
clIn := cl.startFilter(stIn)
- cl.startRosterFilter()
+ startRosterFilter(cl)
// Initial handshake.
hsOut := &stream{To: jid.Domain, Version: Version}
@@ -259,15 +277,6 @@
}
}
-func makeIds(ch chan<- string) {
- id := int64(1)
- for {
- str := fmt.Sprintf("id_%d", id)
- ch <- str
- id++
- }
-}
-
// bindDone is called when we've finished resource binding (and all
// the negotiations that precede it). Now we can start accepting
// traffic from the app.
@@ -281,7 +290,7 @@
// presence. The presence can be as simple as a newly-initialized
// Presence struct. See RFC 3921, Section 3.
func (cl *Client) StartSession(getRoster bool, pr *Presence) os.Error {
- id := <- cl.Id
+ id := <- Id
iq := &Iq{To: cl.Jid.Domain, Id: id, Type: "set", Any:
&Generic{XMLName: xml.Name{Space: NsSession, Local:
"session"}}}
@@ -303,7 +312,7 @@
return err
}
if getRoster {
- err := cl.fetchRoster()
+ err := fetchRoster(cl)
if err != nil {
return err
}