Added a stack of filters which can intercept data before it gets to
the app.
--- a/stream.go Sun Jan 01 17:28:00 2012 -0700
+++ b/stream.go Sun Jan 01 19:00:21 2012 -0700
@@ -293,6 +293,55 @@
}
}
+// Stanzas from the remote go up through a stack of filters to the
+// app. This function manages the filters.
+func filter(srvIn2 <-chan Stanza, cliOut2 chan<- Stanza,
+ filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza) {
+ defer close(cliOut2)
+ var srvIn1, topFilterOut1, topFilterOut2 <-chan Stanza
+ var cliOut1, botFilterIn1, botFilterIn2 chan<- Stanza
+ ch := make(chan Stanza, 1)
+ topFilterOut2 = ch
+ botFilterIn2 = ch
+ topFilterOut1 = topFilterOut2
+ srvIn1 = srvIn2
+ var botItem Stanza
+ var topItem Stanza
+ var ok bool
+ for {
+ select {
+ case newFilterOut := <- filterOut:
+ filterIn <- topFilterOut2
+ topFilterOut2 = newFilterOut
+ if topFilterOut1 != nil {
+ topFilterOut1 = topFilterOut2
+ }
+
+ case topItem, ok = <-topFilterOut1:
+ if !ok {
+ break
+ }
+ topFilterOut1 = nil
+ cliOut1 = cliOut2
+ case cliOut1 <- topItem:
+ topFilterOut1 = topFilterOut2
+ cliOut1 = nil
+
+ case botItem, ok = <-srvIn1:
+ if !ok {
+ close(botFilterIn2)
+ srvIn1 = nil
+ continue
+ }
+ srvIn1 = nil
+ botFilterIn1 = botFilterIn2
+ case botFilterIn1 <- botItem:
+ srvIn1 = srvIn2
+ botFilterIn1 = nil
+ }
+ }
+}
+
func handleStream(ss *stream) {
}
--- a/xmpp.go Sun Jan 01 17:28:00 2012 -0700
+++ b/xmpp.go Sun Jan 01 19:00:21 2012 -0700
@@ -73,6 +73,8 @@
// StartSession() returns.
Features *Features
roster map[string] *RosterItem
+ filterOut chan<- <-chan Stanza
+ filterIn <-chan <-chan Stanza
}
var _ io.Closer = &Client{}
@@ -138,9 +140,13 @@
// Start the XMPP stream handler which filters stream-level
// events and responds to them.
- clIn := cl.startStreamReader(xmlIn, cl.xmlOut)
+ stIn := cl.startStreamReader(xmlIn, cl.xmlOut)
clOut := cl.startStreamWriter(cl.xmlOut)
+ // Start the manager for the filters that can modify what the
+ // app sees.
+ clIn := cl.startFilter(stIn)
+
// Initial handshake.
hsOut := &stream{To: jid.Domain, Version: Version}
cl.xmlOut <- hsOut
@@ -189,6 +195,14 @@
return ch
}
+func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
+ cliOut := make(chan Stanza)
+ filterOut := make(chan (<-chan Stanza))
+ filterIn := make(chan (<-chan Stanza))
+ go filter(srvIn, cliOut, filterOut, filterIn)
+ return cliOut
+}
+
func tee(r io.Reader, w io.Writer, prefix string) {
defer tryClose(r, w)
@@ -294,3 +308,13 @@
}
return nil
}
+
+// AddFilter adds a new filter to the top of the stack through which
+// incoming stanzas travel on their way up to the client. The new
+// filter's output channel is given to this function, and it returns a
+// new input channel which the filter should read from. When its input
+// channel closes, the filter should close its output channel.
+func (cl *Client) AddFilter(out <-chan Stanza) <-chan Stanza {
+ cl.filterOut <- out
+ return <- cl.filterIn
+}