--- a/xmpp.go Sat Aug 31 23:08:21 2013 +0100
+++ b/xmpp.go Mon Sep 02 20:38:02 2013 -0700
@@ -40,13 +40,24 @@
clientSrv = "xmpp-client"
)
+// A filter can modify the XMPP traffic to or from the remote
+// server. It's part of an Extension. The filter function will be
+// called in a new goroutine, so it doesn't need to return. The filter
+// should close its output when its input is closed.
+type Filter func(in <-chan Stanza, out chan<- Stanza)
+
// Extensions can add stanza filters and/or new XML element types.
type Extension struct {
// Maps from an XML namespace to a function which constructs a
// structure to hold the contents of stanzas in that
// namespace.
StanzaHandlers map[string]func(*xml.Name) interface{}
- Start func(*Client)
+ // If non-nil, will be called once to start the filter
+ // running. RecvFilter intercepts incoming messages on their
+ // way from the remote server to the application; SendFilter
+ // intercepts messages going the other direction.
+ RecvFilter Filter
+ SendFilter Filter
}
// Allows the user to override the TLS configuration.
@@ -76,13 +87,16 @@
// channel.
Out chan<- Stanza
xmlOut chan<- interface{}
+ // The client's roster is also known as the buddy list. It's
+ // the set of contacts which are known to this JID, or which
+ // this JID is known to.
+ Roster Roster
// Features advertised by the remote. This will be updated
// asynchronously as new features are received throughout the
// connection process. It should not be updated once
// StartSession() returns.
Features *Features
- filterOut chan<- <-chan Stanza
- filterIn <-chan <-chan Stanza
+ sendFilterAdd, recvFilterAdd chan Filter
}
// Connect to the appropriate server and authenticate as the given JID
@@ -93,7 +107,8 @@
// binding) is complete.
func NewClient(jid *JID, password string, exts []Extension) (*Client, error) {
// Include the mandatory extensions.
- exts = append(exts, rosterExt)
+ roster := newRosterExt()
+ exts = append(exts, roster.Extension)
exts = append(exts, bindExt)
// Resolve the domain in the JID.
@@ -123,6 +138,7 @@
}
cl := new(Client)
+ cl.Roster = *roster
cl.Uid = NextId()
cl.password = password
cl.Jid = *jid
@@ -138,27 +154,33 @@
}
// Start the transport handler, initially unencrypted.
- tlsr, tlsw := cl.startTransport()
-
- // Start the reader and writers that convert to and from XML.
- xmlIn := startXmlReader(tlsr, extStanza)
- cl.xmlOut = startXmlWriter(tlsw)
+ recvReader, recvWriter := io.Pipe()
+ sendReader, sendWriter := io.Pipe()
+ go cl.readTransport(recvWriter)
+ go cl.writeTransport(sendReader)
- // Start the XMPP stream handler which filters stream-level
- // events and responds to them.
- stIn := cl.startStreamReader(xmlIn, cl.xmlOut)
- clOut := cl.startStreamWriter(cl.xmlOut)
- cl.Out = clOut
+ // Start the reader and writer that convert to and from XML.
+ recvXml := make(chan interface{})
+ go readXml(recvReader, recvXml, extStanza)
+ sendXml := make(chan interface{})
+ cl.xmlOut = sendXml
+ go writeXml(sendWriter, sendXml)
+
+ // Start the reader and writer that convert between XML and
+ // XMPP stanzas.
+ recvRawXmpp := make(chan Stanza)
+ go cl.readStream(recvXml, recvRawXmpp)
+ sendRawXmpp := make(chan Stanza)
+ go writeStream(sendXml, sendRawXmpp, cl.inputControl)
// Start the manager for the filters that can modify what the
// app sees.
- clIn := cl.startFilter(stIn)
- cl.In = clIn
-
- // Add filters for our extensions.
- for _, ext := range exts {
- ext.Start(cl)
- }
+ recvFiltXmpp := make(chan Stanza)
+ cl.In = recvFiltXmpp
+ go filterMgr(cl.recvFilterAdd, recvRawXmpp, recvFiltXmpp)
+ sendFiltXmpp := make(chan Stanza)
+ cl.Out = sendFiltXmpp
+ go filterMgr(cl.sendFilterAdd, sendFiltXmpp, sendFiltXmpp)
// Initial handshake.
hsOut := &stream{To: jid.Domain, Version: XMPPVersion}
@@ -167,51 +189,6 @@
return cl, nil
}
-func (cl *Client) startTransport() (io.Reader, io.WriteCloser) {
- inr, inw := io.Pipe()
- outr, outw := io.Pipe()
- go cl.readTransport(inw)
- go cl.writeTransport(outr)
- return inr, outw
-}
-
-func startXmlReader(r io.Reader,
- extStanza map[string]func(*xml.Name) interface{}) <-chan interface{} {
- ch := make(chan interface{})
- go readXml(r, ch, extStanza)
- return ch
-}
-
-func startXmlWriter(w io.WriteCloser) chan<- interface{} {
- ch := make(chan interface{})
- go writeXml(w, ch)
- return ch
-}
-
-func (cl *Client) startStreamReader(xmlIn <-chan interface{}, srvOut chan<- interface{}) <-chan Stanza {
- ch := make(chan Stanza)
- go cl.readStream(xmlIn, ch)
- return ch
-}
-
-func (cl *Client) startStreamWriter(xmlOut chan<- interface{}) chan<- Stanza {
- ch := make(chan Stanza)
- go writeStream(xmlOut, ch, cl.inputControl)
- return ch
-}
-
-func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
- cliIn := make(chan Stanza)
- filterOut := make(chan (<-chan Stanza))
- filterIn := make(chan (<-chan Stanza))
- nullFilter := make(chan Stanza)
- go filterBottom(srvIn, nullFilter)
- go filterTop(filterOut, filterIn, nullFilter, cliIn)
- cl.filterOut = filterOut
- cl.filterIn = filterIn
- return cliIn
-}
-
func tee(r io.Reader, w io.Writer, prefix string) {
defer func(w io.Writer) {
if c, ok := w.(io.Closer); ok {
@@ -253,8 +230,9 @@
// immediately after creating the Client in order to start the
// session, retrieve the roster, and broadcast an initial
// presence. The presence can be as simple as a newly-initialized
-// Presence struct. See RFC 3921, Section 3.
-func (cl *Client) StartSession(getRoster bool, pr *Presence) error {
+// Presence struct. See RFC 3921, Section 3. After calling this, a
+// normal client will want to call Roster.Update().
+func (cl *Client) StartSession(pr *Presence) error {
id := NextId()
iq := &Iq{Header: Header{To: cl.Jid.Domain, Id: id, Type: "set",
Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}}
@@ -281,24 +259,8 @@
if err := <-ch; err != nil {
return err
}
- if getRoster {
- err := fetchRoster(cl)
- if err != nil {
- return err
- }
- }
if pr != nil {
cl.Out <- pr
}
return nil
}
-
-// AddFilter adds a new filter to the top of the stack through which
-// incoming stanzas travel on their way up to the client. The new
-// filter's output channel is given to this function, and it returns a
-// new input channel which the filter should read from. When its input
-// channel closes, the filter should close its output channel.
-func (cl *Client) AddFilter(out <-chan Stanza) <-chan Stanza {
- cl.filterOut <- out
- return <-cl.filterIn
-}