# HG changeset patch # User Chris Jones # Date 1325469621 25200 # Node ID abf958bcc201774e9cabb0bc5d744f8c02289bf7 # Parent dd6f5cc27d52510718b6fe947ad9da91e020b690 Added a stack of filters which can intercept data before it gets to the app. diff -r dd6f5cc27d52 -r abf958bcc201 stream.go --- a/stream.go Sun Jan 01 17:28:00 2012 -0700 +++ b/stream.go Sun Jan 01 19:00:21 2012 -0700 @@ -293,6 +293,55 @@ } } +// Stanzas from the remote go up through a stack of filters to the +// app. This function manages the filters. +func filter(srvIn2 <-chan Stanza, cliOut2 chan<- Stanza, + filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza) { + defer close(cliOut2) + var srvIn1, topFilterOut1, topFilterOut2 <-chan Stanza + var cliOut1, botFilterIn1, botFilterIn2 chan<- Stanza + ch := make(chan Stanza, 1) + topFilterOut2 = ch + botFilterIn2 = ch + topFilterOut1 = topFilterOut2 + srvIn1 = srvIn2 + var botItem Stanza + var topItem Stanza + var ok bool + for { + select { + case newFilterOut := <- filterOut: + filterIn <- topFilterOut2 + topFilterOut2 = newFilterOut + if topFilterOut1 != nil { + topFilterOut1 = topFilterOut2 + } + + case topItem, ok = <-topFilterOut1: + if !ok { + break + } + topFilterOut1 = nil + cliOut1 = cliOut2 + case cliOut1 <- topItem: + topFilterOut1 = topFilterOut2 + cliOut1 = nil + + case botItem, ok = <-srvIn1: + if !ok { + close(botFilterIn2) + srvIn1 = nil + continue + } + srvIn1 = nil + botFilterIn1 = botFilterIn2 + case botFilterIn1 <- botItem: + srvIn1 = srvIn2 + botFilterIn1 = nil + } + } +} + func handleStream(ss *stream) { } diff -r dd6f5cc27d52 -r abf958bcc201 xmpp.go --- a/xmpp.go Sun Jan 01 17:28:00 2012 -0700 +++ b/xmpp.go Sun Jan 01 19:00:21 2012 -0700 @@ -73,6 +73,8 @@ // StartSession() returns. Features *Features roster map[string] *RosterItem + filterOut chan<- <-chan Stanza + filterIn <-chan <-chan Stanza } var _ io.Closer = &Client{} @@ -138,9 +140,13 @@ // Start the XMPP stream handler which filters stream-level // events and responds to them. - clIn := cl.startStreamReader(xmlIn, cl.xmlOut) + stIn := cl.startStreamReader(xmlIn, cl.xmlOut) clOut := cl.startStreamWriter(cl.xmlOut) + // Start the manager for the filters that can modify what the + // app sees. + clIn := cl.startFilter(stIn) + // Initial handshake. hsOut := &stream{To: jid.Domain, Version: Version} cl.xmlOut <- hsOut @@ -189,6 +195,14 @@ return ch } +func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza { + cliOut := make(chan Stanza) + filterOut := make(chan (<-chan Stanza)) + filterIn := make(chan (<-chan Stanza)) + go filter(srvIn, cliOut, filterOut, filterIn) + return cliOut +} + func tee(r io.Reader, w io.Writer, prefix string) { defer tryClose(r, w) @@ -294,3 +308,13 @@ } return nil } + +// AddFilter adds a new filter to the top of the stack through which +// incoming stanzas travel on their way up to the client. The new +// filter's output channel is given to this function, and it returns a +// new input channel which the filter should read from. When its input +// channel closes, the filter should close its output channel. +func (cl *Client) AddFilter(out <-chan Stanza) <-chan Stanza { + cl.filterOut <- out + return <- cl.filterIn +}