xmpp/roster.go
changeset 153 bbd4166df95d
parent 142 0ff033eed887
child 155 fdd637733628
equal deleted inserted replaced
152:69c5b4382e39 153:bbd4166df95d
    20 	Subscription string   `xml:"subscription,attr"`
    20 	Subscription string   `xml:"subscription,attr"`
    21 	Name         string   `xml:"name,attr"`
    21 	Name         string   `xml:"name,attr"`
    22 	Group        []string
    22 	Group        []string
    23 }
    23 }
    24 
    24 
    25 type rosterCb struct {
       
    26 	id string
       
    27 	cb func()
       
    28 }
       
    29 
       
    30 type Roster struct {
    25 type Roster struct {
    31 	Extension
    26 	Extension
    32 	get       chan []RosterItem
    27 	get       chan []RosterItem
    33 	callbacks chan rosterCb
       
    34 	toServer  chan Stanza
    28 	toServer  chan Stanza
    35 }
    29 }
    36 
    30 
    37 type rosterClient struct {
    31 type rosterClient struct {
    38 	rosterChan   <-chan []RosterItem
    32 	rosterChan   <-chan []RosterItem
    39 	rosterUpdate chan<- RosterItem
    33 	rosterUpdate chan<- RosterItem
    40 }
    34 }
    41 
    35 
    42 func (r *Roster) rosterMgr(upd <-chan Stanza) {
    36 func (r *Roster) rosterMgr(upd <-chan Stanza) {
    43 	roster := make(map[string]RosterItem)
    37 	roster := make(map[string]RosterItem)
    44 	waits := make(map[string]func())
       
    45 	var snapshot []RosterItem
    38 	var snapshot []RosterItem
       
    39 	var get chan<- []RosterItem
    46 	for {
    40 	for {
    47 		select {
    41 		select {
       
    42 		case get <- snapshot:
       
    43 
    48 		case stan, ok := <-upd:
    44 		case stan, ok := <-upd:
    49 			if !ok {
    45 			if !ok {
    50 				return
    46 				return
    51 			}
       
    52 			hdr := stan.GetHeader()
       
    53 			if f := waits[hdr.Id]; f != nil {
       
    54 				delete(waits, hdr.Id)
       
    55 				f()
       
    56 			}
    47 			}
    57 			iq, ok := stan.(*Iq)
    48 			iq, ok := stan.(*Iq)
    58 			if !ok {
    49 			if !ok {
    59 				continue
    50 				continue
    60 			}
    51 			}
    61 			if iq.Type != "result" {
    52 			if iq.Type != "result" && iq.Type != "set" {
    62 				continue
    53 				continue
    63 			}
    54 			}
    64 			var rq *RosterQuery
    55 			var rq *RosterQuery
    65 			for _, ele := range iq.Nested {
    56 			for _, ele := range iq.Nested {
    66 				if q, ok := ele.(*RosterQuery); ok {
    57 				if q, ok := ele.(*RosterQuery); ok {
    76 			}
    67 			}
    77 			snapshot = []RosterItem{}
    68 			snapshot = []RosterItem{}
    78 			for _, ri := range roster {
    69 			for _, ri := range roster {
    79 				snapshot = append(snapshot, ri)
    70 				snapshot = append(snapshot, ri)
    80 			}
    71 			}
    81 		case r.get <- snapshot:
    72 			get = r.get
    82 		case cb := <-r.callbacks:
       
    83 			waits[cb.id] = cb.cb
       
    84 		}
    73 		}
    85 	}
    74 	}
    86 }
    75 }
    87 
    76 
    88 func (r *Roster) makeFilters() (Filter, Filter) {
    77 func (r *Roster) makeFilters() (Filter, Filter) {
   117 	r.StanzaHandlers = make(map[xml.Name]reflect.Type)
   106 	r.StanzaHandlers = make(map[xml.Name]reflect.Type)
   118 	rName := xml.Name{Space: NsRoster, Local: "query"}
   107 	rName := xml.Name{Space: NsRoster, Local: "query"}
   119 	r.StanzaHandlers[rName] = reflect.TypeOf(RosterQuery{})
   108 	r.StanzaHandlers[rName] = reflect.TypeOf(RosterQuery{})
   120 	r.RecvFilter, r.SendFilter = r.makeFilters()
   109 	r.RecvFilter, r.SendFilter = r.makeFilters()
   121 	r.get = make(chan []RosterItem)
   110 	r.get = make(chan []RosterItem)
   122 	r.callbacks = make(chan rosterCb)
       
   123 	r.toServer = make(chan Stanza)
   111 	r.toServer = make(chan Stanza)
   124 	return &r
   112 	return &r
   125 }
   113 }
   126 
   114 
   127 // Return the most recent snapshot of the roster status. This is
   115 // Return the most recent snapshot of the roster status. This is
   128 // updated automatically as roster updates are received from the
   116 // updated automatically as roster updates are received from the
   129 // server, but especially in response to calls to Update().
   117 // server. This function may block immediately after the XMPP
       
   118 // connection has been established, until the first roster update is
       
   119 // received from the server.
   130 func (r *Roster) Get() []RosterItem {
   120 func (r *Roster) Get() []RosterItem {
   131 	return <-r.get
   121 	return <-r.get
   132 }
   122 }
   133 
   123 
   134 // Synchronously fetch this entity's roster from the server and cache
   124 // Asynchronously fetch this entity's roster from the server.
   135 // that information. The client can access the roster by watching for
   125 func (r *Roster) update() {
   136 // RosterQuery objects or by calling Get().
       
   137 func (r *Roster) Update() {
       
   138 	iq := &Iq{Header: Header{Type: "get", Id: NextId(),
   126 	iq := &Iq{Header: Header{Type: "get", Id: NextId(),
   139 		Nested: []interface{}{RosterQuery{}}}}
   127 		Nested: []interface{}{RosterQuery{}}}}
   140 	waitchan := make(chan int)
       
   141 	done := func() {
       
   142 		close(waitchan)
       
   143 	}
       
   144 	r.waitFor(iq.Id, done)
       
   145 	r.toServer <- iq
   128 	r.toServer <- iq
   146 	<-waitchan
       
   147 }
   129 }
   148 
       
   149 func (r *Roster) waitFor(id string, cb func()) {
       
   150 	r.callbacks <- rosterCb{id: id, cb: cb}
       
   151 }