Updates to BUG comments.
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// This package implements a simple XMPP client according to RFCs 3920
// and 3921, plus the various XEPs at http://xmpp.org/protocols/.
package xmpp
import (
"bytes"
"crypto/tls"
"encoding/xml"
"errors"
"fmt"
"io"
"log"
"log/syslog"
"net"
"sync"
)
const (
// Version of RFC 3920 that we implement.
Version = "1.0"
// Various XML namespaces.
NsStreams = "urn:ietf:params:xml:ns:xmpp-streams"
NsStream = "http://etherx.jabber.org/streams"
NsTLS = "urn:ietf:params:xml:ns:xmpp-tls"
NsSASL = "urn:ietf:params:xml:ns:xmpp-sasl"
NsBind = "urn:ietf:params:xml:ns:xmpp-bind"
NsSession = "urn:ietf:params:xml:ns:xmpp-session"
NsRoster = "jabber:iq:roster"
// DNS SRV names
serverSrv = "xmpp-server"
clientSrv = "xmpp-client"
)
var (
// If non-nil when NewClient() is called, log messages will be
// sent to this writer.
Log *log.Logger
// Threshold for which messages are logged.
Loglevel syslog.Priority = syslog.LOG_NOTICE
// Use this Config to negotiate TLS connections.
TLSConfig *tls.Config = nil
)
// This channel may be used as a convenient way to generate a unique
// id for an iq, message, or presence stanza.
var Id <-chan string
func init() {
// Start the unique id generator.
idCh := make(chan string)
Id = idCh
go func(ch chan<- string) {
id := int64(1)
for {
str := fmt.Sprintf("id_%d", id)
ch <- str
id++
}
}(idCh)
}
// Extensions can add stanza filters and/or new XML element types.
type Extension struct {
StanzaHandlers map[string]func(*xml.Name) interface{}
Start func(*Client)
}
// The client in a client-server XMPP connection.
type Client struct {
// This client's unique ID. It's unique within the context of
// this process, so if multiple Client objects exist, each
// will be distinguishable by its Uid.
Uid string
// This client's JID. This will be updated asynchronously by
// the time StartSession() returns.
Jid JID
password string
socket net.Conn
socketSync sync.WaitGroup
saslExpected string
authDone bool
handlers chan *stanzaHandler
inputControl chan int
// Incoming XMPP stanzas from the server will be published on
// this channel. Information which is only used by this
// library to set up the XMPP stream will not appear here.
In <-chan Stanza
// Outgoing XMPP stanzas to the server should be sent to this
// channel.
Out chan<- Stanza
xmlOut chan<- interface{}
// Features advertised by the remote. This will be updated
// asynchronously as new features are received throughout the
// connection process. It should not be updated once
// StartSession() returns.
Features *Features
filterOut chan<- <-chan Stanza
filterIn <-chan <-chan Stanza
}
// Connect to the appropriate server and authenticate as the given JID
// with the given password. This function will return as soon as a TCP
// connection has been established, but before XMPP stream negotiation
// has completed. The negotiation will occur asynchronously, and any
// send operation to Client.Out will block until negotiation (resource
// binding) is complete.
func NewClient(jid *JID, password string, exts []Extension) (*Client,
error) {
// Include the mandatory extensions.
exts = append(exts, rosterExt)
exts = append(exts, bindExt)
// Resolve the domain in the JID.
_, srvs, err := net.LookupSRV(clientSrv, "tcp", jid.Domain)
if err != nil {
return nil, errors.New("LookupSrv " + jid.Domain +
": " + err.Error())
}
var tcp *net.TCPConn
for _, srv := range srvs {
addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
addr, err := net.ResolveTCPAddr("tcp", addrStr)
if err != nil {
err = errors.New(fmt.Sprintf("ResolveTCPAddr(%s): %s",
addrStr, err.Error()))
continue
}
tcp, err = net.DialTCP("tcp", nil, addr)
if err != nil {
err = errors.New(fmt.Sprintf("DialTCP(%s): %s",
addr, err.Error()))
continue
}
}
if tcp == nil {
return nil, err
}
cl := new(Client)
cl.Uid = <-Id
cl.password = password
cl.Jid = *jid
cl.socket = tcp
cl.handlers = make(chan *stanzaHandler, 100)
cl.inputControl = make(chan int)
extStanza := make(map[string]func(*xml.Name) interface{})
for _, ext := range exts {
for k, v := range ext.StanzaHandlers {
extStanza[k] = v
}
}
// Start the transport handler, initially unencrypted.
tlsr, tlsw := cl.startTransport()
// Start the reader and writers that convert to and from XML.
xmlIn := startXmlReader(tlsr, extStanza)
cl.xmlOut = startXmlWriter(tlsw)
// Start the XMPP stream handler which filters stream-level
// events and responds to them.
stIn := cl.startStreamReader(xmlIn, cl.xmlOut)
clOut := cl.startStreamWriter(cl.xmlOut)
cl.Out = clOut
// Start the manager for the filters that can modify what the
// app sees.
clIn := cl.startFilter(stIn)
cl.In = clIn
// Add filters for our extensions.
for _, ext := range exts {
ext.Start(cl)
}
// Initial handshake.
cl.xmlOut <- openStream(jid)
return cl, nil
}
func (cl *Client) startTransport() (io.Reader, io.WriteCloser) {
inr, inw := io.Pipe()
outr, outw := io.Pipe()
go cl.readTransport(inw)
go cl.writeTransport(outr)
return inr, outw
}
func startXmlReader(r io.Reader,
extStanza map[string]func(*xml.Name) interface{}) <-chan interface{} {
ch := make(chan interface{})
go readXml(r, ch, extStanza)
return ch
}
func startXmlWriter(w io.WriteCloser) chan<- interface{} {
ch := make(chan interface{})
go writeXml(w, ch)
return ch
}
func (cl *Client) startStreamReader(xmlIn <-chan interface{}, srvOut chan<- interface{}) <-chan Stanza {
ch := make(chan Stanza)
go cl.readStream(xmlIn, ch)
return ch
}
func (cl *Client) startStreamWriter(xmlOut chan<- interface{}) chan<- Stanza {
ch := make(chan Stanza)
go writeStream(xmlOut, ch, cl.inputControl)
return ch
}
func (cl *Client) startFilter(srvIn <-chan Stanza) <-chan Stanza {
cliIn := make(chan Stanza)
filterOut := make(chan (<-chan Stanza))
filterIn := make(chan (<-chan Stanza))
nullFilter := make(chan Stanza)
go filterBottom(srvIn, nullFilter)
go filterTop(filterOut, filterIn, nullFilter, cliIn)
cl.filterOut = filterOut
cl.filterIn = filterIn
return cliIn
}
func tee(r io.Reader, w io.Writer, prefix string) {
defer func(w io.Writer) {
if c, ok := w.(io.Closer); ok {
c.Close()
}
}(w)
buf := bytes.NewBuffer([]uint8(prefix))
for {
var c [1]byte
n, _ := r.Read(c[:])
if n == 0 {
break
}
n, _ = w.Write(c[:n])
if n == 0 {
break
}
buf.Write(c[:n])
if c[0] == '\n' || c[0] == '>' {
Log.Println(buf.String())
buf.Reset()
}
}
leftover := buf.String()
if leftover != "" {
Log.Println(buf.String())
}
}
// bindDone is called when we've finished resource binding (and all
// the negotiations that precede it). Now we can start accepting
// traffic from the app.
func (cl *Client) bindDone() {
cl.inputControl <- 1
}
// Start an XMPP session. A typical XMPP client should call this
// immediately after creating the Client in order to start the
// session, retrieve the roster, and broadcast an initial
// presence. The presence can be as simple as a newly-initialized
// Presence struct. See RFC 3921, Section 3.
func (cl *Client) StartSession(getRoster bool, pr *Presence) error {
id := <-Id
iq := &Iq{To: cl.Jid.Domain, Id: id, Type: "set", Nested: []interface{}{Generic{XMLName: xml.Name{Space: NsSession, Local: "session"}}}}
ch := make(chan error)
f := func(st Stanza) bool {
if st.GetType() == "error" {
if Log != nil {
Log.Printf("Can't start session: %v",
st)
}
ch <- st.GetError()
return false
}
ch <- nil
return false
}
cl.HandleStanza(id, f)
cl.Out <- iq
// Now wait until the callback is called.
if err := <-ch; err != nil {
return err
}
if getRoster {
err := fetchRoster(cl)
if err != nil {
return err
}
}
if pr != nil {
cl.Out <- pr
}
return nil
}
// AddFilter adds a new filter to the top of the stack through which
// incoming stanzas travel on their way up to the client. The new
// filter's output channel is given to this function, and it returns a
// new input channel which the filter should read from. When its input
// channel closes, the filter should close its output channel.
func (cl *Client) AddFilter(out <-chan Stanza) <-chan Stanza {
cl.filterOut <- out
return <-cl.filterIn
}