# HG changeset patch # User Chris Jones # Date 1325830454 25200 # Node ID e6cb3f0491373f3a54fbd21ce908697e0abcd554 # Parent 74686b8c9146b5a72fb59496f383399ec47b3b90 Revamped how the roster works. We're now using a channel to transmit snapshots of the roster, which is thread-safe. Also found and fixed a bug that was preventing new filters from being sent more than one stanza ever. diff -r 74686b8c9146 -r e6cb3f049137 examples/interact.go --- a/examples/interact.go Thu Jan 05 19:53:37 2012 -0700 +++ b/examples/interact.go Thu Jan 05 23:14:14 2012 -0700 @@ -34,10 +34,10 @@ if err != nil { log.Fatalf("StartSession: %v", err) } - roster := c.Roster() + roster := xmpp.Roster(c) fmt.Printf("%d roster entries:\n", len(roster)) - for jid, entry := range(roster) { - fmt.Printf("%s: %v\n", jid, entry) + for i, entry := range(roster) { + fmt.Printf("%d: %v\n", i, entry) } go func(ch <-chan xmpp.Stanza) { diff -r 74686b8c9146 -r e6cb3f049137 roster.go --- a/roster.go Thu Jan 05 19:53:37 2012 -0700 +++ b/roster.go Thu Jan 05 23:14:14 2012 -0700 @@ -27,18 +27,31 @@ Group []string } +type rosterClient struct { + rosterChan <-chan []RosterItem + rosterUpdate chan<- RosterItem +} + +var ( + rosterClients = make(map[string] rosterClient) +) + // Implicitly becomes part of NewClient's extStanza arg. func newRosterQuery(name *xml.Name) interface{} { return &RosterQuery{} } // Synchronously fetch this entity's roster from the server and cache -// that information. -func (cl *Client) fetchRoster() os.Error { - iq := &Iq{From: cl.Jid.String(), Id: <- cl.Id, Type: "get", +// that information. This is called once from a fairly deep call stack +// as part of XMPP negotiation. +func fetchRoster(client *Client) os.Error { + rosterUpdate := rosterClients[client.Uid].rosterUpdate + + iq := &Iq{From: client.Jid.String(), Id: <- Id, Type: "get", Nested: RosterQuery{}} ch := make(chan os.Error) f := func(st Stanza) bool { + defer close(ch) if iq.Type == "error" { ch <- iq.Error return false @@ -49,59 +62,74 @@ "Roster query result not query: %v", st)) return false } - cl.roster = make(map[string] *RosterItem, len(rq.Item)) - for i, item := range(rq.Item) { - cl.roster[item.Jid] = &rq.Item[i] + for _, item := range(rq.Item) { + rosterUpdate <- item } ch <- nil return false } - cl.HandleStanza(iq.Id, f) - cl.Out <- iq + client.HandleStanza(iq.Id, f) + client.Out <- iq // Wait for f to complete. return <- ch } -// Returns the current roster of other entities which this one has a -// relationship with. Changes to the roster will be signaled by an -// appropriate Iq appearing on Client.In. See RFC 3921, Section 7.4. -func (cl *Client) Roster() map[string] *RosterItem { - r := make(map[string] *RosterItem) - for key, val := range(cl.roster) { - r[key] = val - } - return r -} - // The roster filter updates the Client's representation of the -// roster, but it lets the relevant stanzas through. -func (cl *Client) startRosterFilter() { +// roster, but it lets the relevant stanzas through. This also starts +// the roster feeder, which is the goroutine that provides data on +// client.Roster. +func startRosterFilter(client *Client) { out := make(chan Stanza) - in := cl.AddFilter(out) + in := client.AddFilter(out) go func(in <-chan Stanza, out chan<- Stanza) { defer close(out) for st := range(in) { - cl.maybeUpdateRoster(st) + maybeUpdateRoster(client, st) out <- st } }(in, out) + + rosterCh := make(chan []RosterItem) + rosterUpdate := make(chan RosterItem) + rosterClients[client.Uid] = rosterClient{rosterChan: rosterCh, + rosterUpdate: rosterUpdate} + go feedRoster(rosterCh, rosterUpdate) } -// BUG(cjyar) This isn't getting updates. -// BUG(cjyar) This isn't actually thread safe, though it's unlikely it -// will fail in practice. Either the roster should be protected with a -// mutex, or we should make the roster available on a channel instead -// of via a method call. // BUG(cjyar) RFC 3921, Section 7.4 says we need to reply. -func (cl *Client) maybeUpdateRoster(st Stanza) { +func maybeUpdateRoster(client *Client, st Stanza) { + rosterUpdate := rosterClients[client.Uid].rosterUpdate + rq, ok := st.GetNested().(*RosterQuery) if st.GetName() == "iq" && st.GetType() == "set" && ok { - for i, item := range(rq.Item) { - if item.Subscription == "remove" { - cl.roster[item.Jid] = nil - } else { - cl.roster[item.Jid] = &rq.Item[i] - } + for _, item := range(rq.Item) { + rosterUpdate <- item } } } + +func feedRoster(rosterCh chan<- []RosterItem, rosterUpdate <-chan RosterItem) { + roster := make(map[string] RosterItem) + snapshot := []RosterItem{} + for { + select { + case newIt := <-rosterUpdate: + if newIt.Subscription == "remove" { + roster[newIt.Jid] = RosterItem{}, false + } else { + roster[newIt.Jid] = newIt + } + case rosterCh <- snapshot: + } + snapshot = make([]RosterItem, 0, len(roster)) + for _, v := range(roster) { + snapshot = append(snapshot, v) + } + } +} + +// Retrieve a snapshot of the roster for the given Client. +func Roster(client *Client) []RosterItem { + rosterChan := rosterClients[client.Uid].rosterChan + return <- rosterChan +} diff -r 74686b8c9146 -r e6cb3f049137 stream.go --- a/stream.go Thu Jan 05 19:53:37 2012 -0700 +++ b/stream.go Thu Jan 05 23:14:14 2012 -0700 @@ -309,6 +309,7 @@ continue } filterIn <- topFilter + topFilter = newFilterOut case data, ok := <-topFilter: if !ok { @@ -593,7 +594,7 @@ if res != "" { bindReq.Resource = &res } - msg := &Iq{Type: "set", Id: <- cl.Id, Nested: &bindReq} + msg := &Iq{Type: "set", Id: <- Id, Nested: &bindReq} f := func(st Stanza) bool { if st.GetType() == "error" { log.Println("Resource binding failed") diff -r 74686b8c9146 -r e6cb3f049137 xmpp.go --- a/xmpp.go Thu Jan 05 19:53:37 2012 -0700 +++ b/xmpp.go Thu Jan 05 23:14:14 2012 -0700 @@ -44,8 +44,30 @@ debug = false ) +// This channel may be used as a convenient way to generate a unique +// id for an iq, message, or presence stanza. +var Id <-chan string + +func init() { + // Start the unique id generator. + idCh := make(chan string) + Id = idCh + go func(ch chan<- string) { + id := int64(1) + for { + str := fmt.Sprintf("id_%d", id) + ch <- str + id++ + } + }(idCh) +} + // The client in a client-server XMPP connection. type Client struct { + // This client's unique ID. It's unique within the context of + // this process, so if multiple Client objects exist, each + // will be distinguishable by its Uid. + Uid string // This client's JID. This will be updated asynchronously by // the time StartSession() returns. Jid JID @@ -56,9 +78,6 @@ authDone bool handlers chan *stanzaHandler inputControl chan int - // This channel may be used as a convenient way to generate a - // unique id for an iq, message, or presence stanza. - Id <-chan string // Incoming XMPP stanzas from the server will be published on // this channel. Information which is only used by this // library to set up the XMPP stream will not appear here. @@ -72,12 +91,15 @@ // connection process. It should not be updated once // StartSession() returns. Features *Features - roster map[string] *RosterItem filterOut chan<- <-chan Stanza filterIn <-chan <-chan Stanza } var _ io.Closer = &Client{} +// BUG(cjyar) Replace extStanza with a generalized extension interface +// that handles starting filters, registering extended stanzes, and +// anything else an extension has to do. + // Connect to the appropriate server and authenticate as the given JID // with the given password. This function will return as soon as a TCP // connection has been established, but before XMPP stream negotiation @@ -114,13 +136,12 @@ } cl := new(Client) + cl.Uid = <- Id cl.password = password cl.Jid = *jid cl.socket = tcp cl.handlers = make(chan *stanzaHandler, 100) cl.inputControl = make(chan int) - idCh := make(chan string) - cl.Id = idCh if extStanza == nil { extStanza = make(map[string] func(*xml.Name) interface{}) @@ -128,9 +149,6 @@ extStanza[NsRoster] = newRosterQuery extStanza[NsBind] = newBind - // Start the unique id generator. - go makeIds(idCh) - // Start the transport handler, initially unencrypted. tlsr, tlsw := cl.startTransport() @@ -146,7 +164,7 @@ // Start the manager for the filters that can modify what the // app sees. clIn := cl.startFilter(stIn) - cl.startRosterFilter() + startRosterFilter(cl) // Initial handshake. hsOut := &stream{To: jid.Domain, Version: Version} @@ -259,15 +277,6 @@ } } -func makeIds(ch chan<- string) { - id := int64(1) - for { - str := fmt.Sprintf("id_%d", id) - ch <- str - id++ - } -} - // bindDone is called when we've finished resource binding (and all // the negotiations that precede it). Now we can start accepting // traffic from the app. @@ -281,7 +290,7 @@ // presence. The presence can be as simple as a newly-initialized // Presence struct. See RFC 3921, Section 3. func (cl *Client) StartSession(getRoster bool, pr *Presence) os.Error { - id := <- cl.Id + id := <- Id iq := &Iq{To: cl.Jid.Domain, Id: id, Type: "set", Any: &Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}} @@ -303,7 +312,7 @@ return err } if getRoster { - err := cl.fetchRoster() + err := fetchRoster(cl) if err != nil { return err }