Changed the way filters work. They're now symmetrical, consisting of a paired send filter and receive filter.
--- a/TODO.txt Sat Aug 31 23:08:21 2013 +0100
+++ b/TODO.txt Mon Sep 02 20:38:02 2013 -0700
@@ -1,10 +1,6 @@
Extension.StanzaHandlers should use reflection, not constructor
functions.
-Rather than use Client.AddFilter(), and Extension.Start(), we should
-have a function in Extension that, if non-nil, accepts a stanza and
-returns a slice of stanzas.
-
Review all these *Client receiver methods. They should probably either
all be receivers, or none.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/filter.go Mon Sep 02 20:38:02 2013 -0700
@@ -0,0 +1,64 @@
+package xmpp
+
+// Manages the stack of filters that can read and modify stanzas on
+// their way from the remote to the application.
+
+// Receive new filters on filterAdd; those new filters get added to
+// the top of the stack. Receive stanzas at the bottom of the stack on
+// input. Send stanzas out the top of the stack on output.
+func filterMgr(filterAdd <-chan Filter, input <-chan Stanza, output chan<- Stanza) {
+ botFiltIn := output
+ topFiltOut := input
+
+loop:
+ for {
+ select {
+ case stan, ok := <-input:
+ if !ok {
+ break loop
+ }
+ botFiltIn <- stan
+
+ case stan, ok := <-topFiltOut:
+ if !ok {
+ break loop
+ }
+ output <- stan
+
+ case filt := <-filterAdd:
+ newTop := make(chan Stanza)
+ go filt(topFiltOut, newTop)
+ topFiltOut = newTop
+ }
+ }
+ close(botFiltIn)
+}
+
+// Starts the filter chain. Filters will all interpose themselves
+// between srvIn and cliOut.
+func (cl *Client) startFilters(srvIn, cliIn <-chan Stanza) (<-chan Stanza, <-chan Stanza) {
+ cliOut := make(chan Stanza)
+ srvOut := make(chan Stanza)
+ go filterMgr(cl.sendFilterAdd, srvIn, cliOut)
+ go filterMgr(cl.recvFilterAdd, cliIn, srvOut)
+ return cliOut, srvOut
+}
+
+// AddRecvFilter adds a new filter to the top of the stack through which
+// incoming stanzas travel on their way up to the client.
+func (cl *Client) AddRecvFilter(filt Filter) {
+ if filt == nil {
+ return
+ }
+ cl.recvFilterAdd <- filt
+}
+
+// AddSendFilter adds a new filter to the top of the stack through
+// which outgoing stanzas travel on their way down from the client to
+// the network.
+func (cl *Client) AddSendFilter(filt Filter) {
+ if filt == nil {
+ return
+ }
+ cl.sendFilterAdd <- filt
+}
--- a/roster.go Sat Aug 31 23:08:21 2013 +0100
+++ b/roster.go Mon Sep 02 20:38:02 2013 -0700
@@ -4,15 +4,12 @@
package xmpp
+// This file contains support for roster management, RFC 3921, Section 7.
+
import (
"encoding/xml"
- "fmt"
)
-// This file contains support for roster management, RFC 3921, Section 7.
-
-var rosterExt Extension = Extension{StanzaHandlers: map[string]func(*xml.Name) interface{}{NsRoster: newRosterQuery}, Start: startRosterFilter}
-
// Roster query/result
type RosterQuery struct {
XMLName xml.Name `xml:"jabber:iq:roster query"`
@@ -28,134 +25,131 @@
Group []string
}
+type rosterCb struct {
+ id string
+ cb func()
+}
+
+type Roster struct {
+ Extension
+ get chan []RosterItem
+ callbacks chan rosterCb
+ toServer chan Stanza
+}
+
type rosterClient struct {
rosterChan <-chan []RosterItem
rosterUpdate chan<- RosterItem
}
-var (
- rosterClients = make(map[string]rosterClient)
-)
-
// Implicitly becomes part of NewClient's extStanza arg.
func newRosterQuery(name *xml.Name) interface{} {
return &RosterQuery{}
}
-// Synchronously fetch this entity's roster from the server and cache
-// that information. This is called once from a fairly deep call stack
-// as part of XMPP negotiation.
-func fetchRoster(client *Client) error {
- rosterUpdate := rosterClients[client.Uid].rosterUpdate
-
- iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get",
- Id: NextId(), Nested: []interface{}{RosterQuery{}}}}
- ch := make(chan error)
- f := func(v Stanza) bool {
- defer close(ch)
- iq, ok := v.(*Iq)
- if !ok {
- ch <- fmt.Errorf("response to iq wasn't iq: %s", v)
- return false
- }
- if iq.Type == "error" {
- ch <- iq.Error
- return false
- }
- var rq *RosterQuery
- for _, ele := range iq.Nested {
- if q, ok := ele.(*RosterQuery); ok {
- rq = q
- break
- }
- }
- if rq == nil {
- ch <- fmt.Errorf(
- "Roster query result not query: %v", v)
- return false
- }
- for _, item := range rq.Item {
- rosterUpdate <- item
- }
- ch <- nil
- return false
- }
- client.HandleStanza(iq.Id, f)
- client.Out <- iq
- // Wait for f to complete.
- return <-ch
-}
-
-// The roster filter updates the Client's representation of the
-// roster, but it lets the relevant stanzas through. This also starts
-// the roster feeder, which is the goroutine that provides data on
-// client.Roster.
-func startRosterFilter(client *Client) {
- out := make(chan Stanza)
- in := client.AddFilter(out)
- go func(in <-chan Stanza, out chan<- Stanza) {
- defer close(out)
- for st := range in {
- maybeUpdateRoster(client, st)
- out <- st
- }
- }(in, out)
-
- rosterCh := make(chan []RosterItem)
- rosterUpdate := make(chan RosterItem)
- rosterClients[client.Uid] = rosterClient{rosterChan: rosterCh,
- rosterUpdate: rosterUpdate}
- go feedRoster(rosterCh, rosterUpdate)
-}
-
-func maybeUpdateRoster(client *Client, st interface{}) {
- iq, ok := st.(*Iq)
- if !ok {
- return
- }
-
- rosterUpdate := rosterClients[client.Uid].rosterUpdate
-
- var rq *RosterQuery
- for _, ele := range iq.Nested {
- if q, ok := ele.(*RosterQuery); ok {
- rq = q
- break
- }
- }
- if iq.Type == "set" && rq != nil {
- for _, item := range rq.Item {
- rosterUpdate <- item
- }
- // Send a reply.
- reply := &Iq{Header: Header{To: iq.From, Id: iq.Id,
- Type: "result"}}
- client.Out <- reply
- }
-}
-
-func feedRoster(rosterCh chan<- []RosterItem, rosterUpdate <-chan RosterItem) {
+func (r *Roster) rosterMgr(upd <-chan Stanza) {
roster := make(map[string]RosterItem)
- snapshot := []RosterItem{}
+ waits := make(map[string]func())
+ var snapshot []RosterItem
for {
select {
- case newIt := <-rosterUpdate:
- if newIt.Subscription == "remove" {
- delete(roster, newIt.Jid)
- } else {
- roster[newIt.Jid] = newIt
+ case stan, ok := <- upd:
+ if !ok {
+ return
+ }
+ hdr := stan.GetHeader()
+ if f := waits[hdr.Id] ; f != nil {
+ delete(waits, hdr.Id)
+ f()
+ }
+ iq, ok := stan.(*Iq)
+ if iq.Type != "set" {
+ continue
}
- case rosterCh <- snapshot:
- }
- snapshot = make([]RosterItem, 0, len(roster))
- for _, v := range roster {
- snapshot = append(snapshot, v)
+ var rq *RosterQuery
+ for _, ele := range iq.Nested {
+ if q, ok := ele.(*RosterQuery); ok {
+ rq = q
+ break
+ }
+ }
+ if rq == nil {
+ continue
+ }
+ for _, item := range rq.Item {
+ roster[item.Jid] = item
+ }
+ snapshot = []RosterItem{}
+ for _, ri := range roster {
+ snapshot = append(snapshot, ri)
+ }
+ case r.get <- snapshot:
+ case cb := <- r.callbacks:
+ waits[cb.id] = cb.cb
}
}
}
-// Retrieve a snapshot of the roster for the given Client.
-func Roster(client *Client) []RosterItem {
- rosterChan := rosterClients[client.Uid].rosterChan
- return <-rosterChan
+func (r *Roster) makeFilters() (Filter, Filter) {
+ rosterUpdate := make(chan Stanza)
+ go r.rosterMgr(rosterUpdate)
+ recv := func(in <-chan Stanza, out chan<- Stanza) {
+ defer close(out)
+ for stan := range in {
+ rosterUpdate <- stan
+ out <- stan
+ }
+ }
+ send := func(in <-chan Stanza, out chan<- Stanza) {
+ defer close(out)
+ for {
+ select {
+ case stan, ok := <- in:
+ if !ok {
+ return
+ }
+ out <- stan
+ case stan := <- r.toServer:
+ out <- stan
+ }
+ }
+ }
+ return recv, send
}
+
+func newRosterExt() *Roster {
+ r := Roster{}
+ r.StanzaHandlers = make(map[string]func(*xml.Name) interface{})
+ r.StanzaHandlers[NsRoster] = newRosterQuery
+ r.RecvFilter, r.SendFilter = r.makeFilters()
+ r.get = make(chan []RosterItem)
+ r.callbacks = make(chan rosterCb)
+ r.toServer = make(chan Stanza)
+ return &r
+}
+
+// Return the most recent snapshot of the roster status. This is
+// updated automatically as roster updates are received from the
+// server, but especially in response to calls to Update().
+func (r *Roster) Get() []RosterItem {
+ return <-r.get
+}
+
+// Synchronously fetch this entity's roster from the server and cache
+// that information. The client can access the roster by watching for
+// RosterQuery objects or by calling Get().
+func (r *Roster) Update(client *Client) {
+ iq := &Iq{Header: Header{From: client.Jid.String(), Type: "get",
+ Id: NextId(), Nested: []interface{}{RosterQuery{}}}}
+ waitchan := make(chan int)
+ done := func() {
+ close(waitchan)
+ }
+ r.waitFor(iq.Id, done)
+ r.toServer <- iq
+ <-waitchan
+}
+
+func (r *Roster) waitFor(id string, cb func()) {
+ r.callbacks <- rosterCb{id: id, cb: cb}
+}
--- a/stream.go Sat Aug 31 23:08:21 2013 +0100
+++ b/stream.go Mon Sep 02 20:38:02 2013 -0700
@@ -300,39 +300,6 @@
}
}
-// Stanzas from the remote go up through a stack of filters to the
-// app. This function manages the filters.
-func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
- topFilter <-chan Stanza, app chan<- Stanza) {
- defer close(app)
-Loop:
- for {
- select {
- case newFilterOut := <-filterOut:
- if newFilterOut == nil {
- Warn.Log("Received nil filter")
- filterIn <- nil
- continue
- }
- filterIn <- topFilter
- topFilter = newFilterOut
-
- case data, ok := <-topFilter:
- if !ok {
- break Loop
- }
- app <- data
- }
- }
-}
-
-func filterBottom(from <-chan Stanza, to chan<- Stanza) {
- defer close(to)
- for data := range from {
- to <- data
- }
-}
-
func handleStream(ss *stream) {
}
--- a/structs.go Sat Aug 31 23:08:21 2013 +0100
+++ b/structs.go Mon Sep 02 20:38:02 2013 -0700
@@ -269,8 +269,7 @@
return string(buf)
}
-var bindExt Extension = Extension{StanzaHandlers: map[string]func(*xml.Name) interface{}{NsBind: newBind},
- Start: func(cl *Client) {}}
+var bindExt Extension = Extension{StanzaHandlers: map[string]func(*xml.Name) interface{}{NsBind: newBind}}
func newBind(name *xml.Name) interface{} {
return &bindIq{}
--- a/xmpp.go Sat Aug 31 23:08:21 2013 +0100
+++ b/xmpp.go Mon Sep 02 20:38:02 2013 -0700
@@ -40,13 +40,24 @@
clientSrv = "xmpp-client"
)
+// A filter can modify the XMPP traffic to or from the remote
+// server. It's part of an Extension. The filter function will be
+// called in a new goroutine, so it doesn't need to return. The filter
+// should close its output when its input is closed.
+type Filter func(in <-chan Stanza, out chan<- Stanza)
+
// Extensions can add stanza filters and/or new XML element types.
type Extension struct {
// Maps from an XML namespace to a function which constructs a
// structure to hold the contents of stanzas in that
// namespace.
StanzaHandlers map[string]func(*xml.Name) interface{}
- Start func(*Client)
+ // If non-nil, will be called once to start the filter
+ // running. RecvFilter intercepts incoming messages on their
+ // way from the remote server to the application; SendFilter
+ // intercepts messages going the other direction.
+ RecvFilter Filter
+ SendFilter Filter
}
// Allows the user to override the TLS configuration.
@@ -76,13 +87,16 @@
// channel.
Out chan<- Stanza
xmlOut chan<- interface{}
+ // The client's roster is also known as the buddy list. It's
+ // the set of contacts which are known to this JID, or which
+ // this JID is known to.
+ Roster Roster
// Features advertised by the remote. This will be updated
// asynchronously as new features are received throughout the
// connection process. It should not be updated once
// StartSession() returns.
Features *Features
- filterOut chan<- <-chan Stanza
- filterIn <-chan <-chan Stanza
+ sendFilterAdd, recvFilterAdd chan Filter
}
// Connect to the appropriate server and authenticate as the given JID
@@ -93,7 +107,8 @@
// binding) is complete.
func NewClient(jid *JID, password string, exts []Extension) (*Client, error) {
// Include the mandatory extensions.
- exts = append(exts, rosterExt)
+ roster := newRosterExt()
+ exts = append(exts, roster.Extension)
exts = append(exts, bindExt)
// Resolve the domain in the JID.
@@ -123,6 +138,7 @@
}
cl := new(Client)
+ cl.Roster = *roster
cl.Uid = NextId()
cl.password = password
cl.Jid = *jid
@@ -138,27 +154,33 @@
}
// Start the transport handler, initially unencrypted.
- tlsr, tlsw := cl.startTransport()
-
- // Start the reader and writers that convert to and from XML.
- xmlIn := startXmlReader(tlsr, extStanza)
- cl.xmlOut = startXmlWriter(tlsw)
+ recvReader, recvWriter := io.Pipe()
+ sendReader, sendWriter := io.Pipe()
+ go cl.readTransport(recvWriter)
+ go cl.writeTransport(sendReader)
- // Start the XMPP stream handler which filters stream-level
- // events and responds to them.
- stIn := cl.startStreamReader(xmlIn, cl.xmlOut)
- clOut := cl.startStreamWriter(cl.xmlOut)
- cl.Out = clOut
+ // Start the reader and writer that convert to and from XML.
+ recvXml := make(chan interface{})
+ go readXml(recvReader, recvXml, extStanza)
+ sendXml := make(chan interface{})
+ cl.xmlOut = sendXml
+ go writeXml(sendWriter, sendXml)
+
+ // Start the reader and writer that convert between XML and
+ // XMPP stanzas.
+ recvRawXmpp := make(chan Stanza)
+ go cl.readStream(recvXml, recvRawXmpp)
+ sendRawXmpp := make(chan Stanza)
+ go writeStream(sendXml, sendRawXmpp, cl.inputControl)
// Start the manager for the filters that can modify what the
// app sees.
- clIn := cl.startFilter(stIn)
- cl.In = clIn
-
- // Add filters for our extensions.
- for _, ext := range exts {
- ext.Start(cl)
- }
+ recvFiltXmpp := make(chan Stanza)
+ cl.In = recvFiltXmpp
+ go filterMgr(cl.recvFilterAdd, recvRawXmpp, recvFiltXmpp)
+ sendFiltXmpp := make(chan Stanza)
+ cl.Out = sendFiltXmpp
+ go filterMgr(cl.sendFilterAdd, sendFiltXmpp, sendFiltXmpp)
// Initial handshake.
hsOut := &stream{To: jid.Domain, Version: XMPPVersion}
@@ -167,51 +189,6 @@
return cl, nil
}
-func (cl *Client) startTransport() (io.Reader, io.WriteCloser) {
- inr, inw := io.Pipe()
- outr, outw := io.Pipe()
- go cl.readTransport(inw)
- go cl.writeTransport(outr)
- return inr, outw
-}
-
-func startXmlReader(r io.Reader,
- extStanza map[string]func(*xml.Name) interface{}) <-chan interface{} {
- ch := make(chan interface{})
- go readXml(r, ch, extStanza)
- return ch
-}
-
-func startXmlWriter(w io.WriteCloser) chan<- interface{} {
- ch := make(chan interface{})
- go writeXml(w, ch)
- return ch
-}
-
-func (cl *Client) startStreamReader(xmlIn <-chan interface{}, srvOut chan<- interface{}) <-chan Stanza {
- ch := make(chan Stanza)
- go cl.readStream(xmlIn, ch)
- return ch
-}
-
-func (cl *Client) startStreamWriter(xmlOut chan<- interface{}) chan<- Stanza {
- ch := make(chan Stanza)
- go writeStream(xmlOut, ch, cl.inputControl)
- return ch
-}
-
-func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
- cliIn := make(chan Stanza)
- filterOut := make(chan (<-chan Stanza))
- filterIn := make(chan (<-chan Stanza))
- nullFilter := make(chan Stanza)
- go filterBottom(srvIn, nullFilter)
- go filterTop(filterOut, filterIn, nullFilter, cliIn)
- cl.filterOut = filterOut
- cl.filterIn = filterIn
- return cliIn
-}
-
func tee(r io.Reader, w io.Writer, prefix string) {
defer func(w io.Writer) {
if c, ok := w.(io.Closer); ok {
@@ -253,8 +230,9 @@
// immediately after creating the Client in order to start the
// session, retrieve the roster, and broadcast an initial
// presence. The presence can be as simple as a newly-initialized
-// Presence struct. See RFC 3921, Section 3.
-func (cl *Client) StartSession(getRoster bool, pr *Presence) error {
+// Presence struct. See RFC 3921, Section 3. After calling this, a
+// normal client will want to call Roster.Update().
+func (cl *Client) StartSession(pr *Presence) error {
id := NextId()
iq := &Iq{Header: Header{To: cl.Jid.Domain, Id: id, Type: "set",
Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}}
@@ -281,24 +259,8 @@
if err := <-ch; err != nil {
return err
}
- if getRoster {
- err := fetchRoster(cl)
- if err != nil {
- return err
- }
- }
if pr != nil {
cl.Out <- pr
}
return nil
}
-
-// AddFilter adds a new filter to the top of the stack through which
-// incoming stanzas travel on their way up to the client. The new
-// filter's output channel is given to this function, and it returns a
-// new input channel which the filter should read from. When its input
-// channel closes, the filter should close its output channel.
-func (cl *Client) AddFilter(out <-chan Stanza) <-chan Stanza {
- cl.filterOut <- out
- return <-cl.filterIn
-}