xmpp.go
author Chris Jones <chris@cjones.org>
Thu, 29 Dec 2011 11:02:21 -0700
changeset 29 a456133ed0ac
parent 28 78961db80bae
child 30 a77fc342e013
permissions -rw-r--r--
Don't accept data on Client.Out until resource binding is complete. StartSession() won't do its work until after this happens. That means the app can call StartSession() and wait for it to return before checking Client.Jid.

// Copyright 2011 The Go Authors.  All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package xmpp implements a simple XMPP client according to RFCs 3920
// and 3921, plus the various XEPs at http://xmpp.org/protocols/.
package xmpp

// BUG(cjyar) Figure out why the library doesn't exit when the server
// closes its stream to us.

import (
	"bytes"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"sync"
	"xml"
)

const (
	// Version of RFC 3920 that we implement. NewClient places it in
	// the Version field of the initial stream handshake.
	Version = "1.0"

	// Various XML namespaces.
	// Stream-level error conditions (RFC 3920).
	nsStreams = "urn:ietf:params:xml:ns:xmpp-streams"
	// The XML stream element itself.
	nsStream = "http://etherx.jabber.org/streams"
	// STARTTLS negotiation.
	nsTLS = "urn:ietf:params:xml:ns:xmpp-tls"
	// SASL authentication.
	nsSASL = "urn:ietf:params:xml:ns:xmpp-sasl"
	// Resource binding.
	nsBind = "urn:ietf:params:xml:ns:xmpp-bind"
	// Session establishment (RFC 3921); used by StartSession.
	nsSession = "urn:ietf:params:xml:ns:xmpp-session"

	// DNS SRV names
	// NOTE(review): serverSrv is not referenced in this part of the
	// file; presumably reserved for server-to-server use.
	serverSrv = "xmpp-server"
	// Service name NewClient passes to net.LookupSRV.
	clientSrv = "xmpp-client"

	// BUG(cjyar) Make this a parameter to NewClient, not a
	// constant. We should have both a log level and a
	// syslog.Writer, so the app can control how much time we
	// spend generating log messages, as well as where they go.
	debug = true
)

// The client in a client-server XMPP connection. Create one with
// NewClient, then exchange stanzas with the server through In and
// Out.
type Client struct {
	// This client's JID. It starts out as the JID given to
	// NewClient. This will be updated asynchronously by
	// the time StartSession() returns.
	Jid JID
	// The password given to NewClient.
	password string
	// The TCP connection to the server, opened by NewClient.
	socket net.Conn
	// NOTE(review): not used in this part of the file; presumably
	// coordinates the goroutines that share socket -- confirm.
	socketSync sync.WaitGroup
	// NOTE(review): saslExpected and authDone are not used in this
	// part of the file; presumably they track SASL negotiation
	// state -- confirm.
	saslExpected string
	authDone bool
	// idMutex guards nextId, the counter behind NextId().
	idMutex sync.Mutex
	nextId int64
	// Created by NewClient with a buffer of one.
	// NOTE(review): presumably fed by HandleStanza -- confirm.
	handlers chan *stanzaHandler
	// Unbuffered channel on which bindDone() signals that resource
	// binding has finished. It is handed to writeStream.
	inputControl chan int
	// Incoming XMPP stanzas from the server will be published on
	// this channel. Information which is only used by this
	// library to set up the XMPP stream will not appear here.
	In <-chan Stanza
	// Outgoing XMPP stanzas to the server should be sent to this
	// channel.
	Out chan<- Stanza
	// Values sent here are handed to writeXml (see
	// startXmlWriter); NewClient uses it for the initial
	// stream handshake.
	xmlOut chan<- interface{}
}
// Compile-time check that *Client implements io.Closer.
var _ io.Closer = &Client{}

// Connect to the appropriate server and authenticate as the given JID
// with the given password. This function will return as soon as a TCP
// connection has been established, but before XMPP stream negotiation
// has completed. The negotiation will occur asynchronously, and any
// send operation to Client.Out will block until negotiation (resource
// binding) is complete.
//
// The server is located through the xmpp-client SRV records of the
// JID's domain. The targets are tried in the order returned by the
// resolver and the first one that accepts a TCP connection is used.
// If none can be reached, the error from the last attempt is
// returned; a nil *Client is never returned together with a nil
// error.
func NewClient(jid *JID, password string) (*Client, os.Error) {
	// Resolve the domain in the JID.
	_, srvs, err := net.LookupSRV(clientSrv, "tcp", jid.Domain)
	if err != nil {
		return nil, os.NewError("LookupSrv " + jid.Domain +
			": " + err.String())
	}

	var tcp *net.TCPConn
	// lastErr remembers why the most recent target was rejected. It
	// is kept separate from the per-iteration errors so that it
	// cannot be lost to shadowing.
	var lastErr os.Error
	for _, srv := range srvs {
		addrStr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
		addr, resolveErr := net.ResolveTCPAddr("tcp", addrStr)
		if resolveErr != nil {
			lastErr = os.NewError(fmt.Sprintf("ResolveTCPAddr(%s): %s",
				addrStr, resolveErr.String()))
			continue
		}
		conn, dialErr := net.DialTCP("tcp", nil, addr)
		if dialErr != nil {
			lastErr = os.NewError(fmt.Sprintf("DialTCP(%s): %s",
				addr, dialErr.String()))
			continue
		}
		// Stop at the first success. Dialing the remaining
		// targets would leak this connection and could replace
		// it with a failed one.
		tcp = conn
		break
	}
	if tcp == nil {
		if lastErr == nil {
			// The lookup succeeded but returned no targets.
			lastErr = os.NewError("LookupSrv " + jid.Domain +
				": no SRV records")
		}
		return nil, lastErr
	}

	cl := new(Client)
	cl.password = password
	cl.Jid = *jid
	cl.socket = tcp
	cl.handlers = make(chan *stanzaHandler, 1)
	cl.inputControl = make(chan int)

	// Start the transport handler, initially unencrypted.
	tlsr, tlsw := cl.startTransport()

	// Start the reader and writers that convert to and from XML.
	xmlIn := startXmlReader(tlsr)
	cl.xmlOut = startXmlWriter(tlsw)

	// Start the XMPP stream handler which filters stream-level
	// events and responds to them.
	clIn := cl.startStreamReader(xmlIn, cl.xmlOut)
	clOut := cl.startStreamWriter(cl.xmlOut)

	// Initial handshake.
	hsOut := &stream{To: jid.Domain, Version: Version}
	cl.xmlOut <- hsOut

	cl.In = clIn
	cl.Out = clOut

	return cl, nil
}

// Close shuts down the client's outgoing stanza channel, Out. It
// always returns nil, and calling it more than once is harmless.
//
// tryClose cannot be used here: it only recognizes io.Closer values
// and channels of interface{}, so it silently ignores the Stanza
// channels. In is receive-only and is left to the goroutine that
// feeds it; only the sending side of a channel may close it.
//
// NOTE(review): presumably the stream writer started by
// startStreamWriter notices the close and tears down the rest of
// the pipeline -- confirm against writeStream.
func (cl *Client) Close() os.Error {
	defer func() {
		// Closing a nil or already-closed channel panics; treat
		// that as "nothing left to do".
		recover()
	}()
	close(cl.Out)
	return nil
}

// startTransport creates two in-memory pipes and starts the
// goroutines that service them. readTransport is given the write end
// of the inbound pipe and writeTransport the read end of the outbound
// pipe. The caller receives the inbound pipe's reader and the
// outbound pipe's writer.
func (cl *Client) startTransport() (io.Reader, io.Writer) {
	// Inbound: readTransport writes, the caller reads.
	recvReader, recvWriter := io.Pipe()
	go cl.readTransport(recvWriter)

	// Outbound: the caller writes, writeTransport reads.
	sendReader, sendWriter := io.Pipe()
	go cl.writeTransport(sendReader)

	return recvReader, sendWriter
}

// startXmlReader runs readXml on r in its own goroutine and returns
// the receive side of the unbuffered channel handed to it.
func startXmlReader(r io.Reader) <-chan interface{} {
	parsed := make(chan interface{})
	go readXml(r, parsed)
	return parsed
}

// startXmlWriter runs writeXml on w in its own goroutine and returns
// the send side of the unbuffered channel handed to it.
func startXmlWriter(w io.Writer) chan<- interface{} {
	pending := make(chan interface{})
	go writeXml(w, pending)
	return pending
}

// startStreamReader runs cl.readStream in its own goroutine, feeding
// it from xmlIn, and returns the receive side of the unbuffered
// Stanza channel handed to it. srvOut is accepted but not used.
func (cl *Client) startStreamReader(xmlIn <-chan interface{}, srvOut chan<- interface{}) <-chan Stanza {
	stanzas := make(chan Stanza)
	go cl.readStream(xmlIn, stanzas)
	return stanzas
}

// startStreamWriter runs writeStream in its own goroutine and
// returns the send side of the unbuffered Stanza channel handed to
// it. writeStream is also given xmlOut and the client's inputControl
// channel, the one bindDone signals on.
func (cl *Client) startStreamWriter(xmlOut chan<- interface{}) chan<- Stanza {
	stanzas := make(chan Stanza)
	go writeStream(xmlOut, stanzas, cl.inputControl)
	return stanzas
}

// tee copies r to w one byte at a time and echoes the traffic to
// standard output for debugging. Copied bytes are collected into a
// line that is printed, preceded by prefix, whenever a newline or
// '>' is seen; anything still pending when copying stops is printed
// as well.
//
// Copying stops when r or w reports an error (including end of
// stream) or when w accepts no data. Both r and w are then passed to
// tryClose.
func tee(r io.Reader, w io.Writer, prefix string) {
	defer tryClose(r, w)

	var line bytes.Buffer
	var c [1]byte
	for {
		n, readErr := r.Read(c[:])
		if n > 0 {
			wn, writeErr := w.Write(c[:n])
			line.Write(c[:wn])
			if wn == 0 || writeErr != nil {
				break
			}
			if c[0] == '\n' || c[0] == '>' {
				fmt.Printf("%s%s\n", prefix, line.String())
				line.Reset()
			}
		}
		// A read of zero bytes with no error means "nothing
		// happened", not end of stream, so only an error ends
		// the loop.
		if readErr != nil {
			break
		}
	}
	if line.Len() > 0 {
		fmt.Printf("%s%s\n", prefix, line.String())
	}
}

// tryClose makes a best-effort attempt to close each of its
// arguments:
//
//   - an io.Closer has its Close method called; the result is
//     ignored;
//   - a send-only channel of interface{} is closed, and the panic
//     raised by closing a nil or already-closed channel is swallowed;
//   - anything else, including nil and receive-only channels, is
//     ignored. Only the sending side of a channel may close it, and
//     Go 1 rejects close on a receive-only channel at compile time.
func tryClose(xs ...interface{}) {
	closeChan := func(ch chan<- interface{}) {
		defer func() {
			recover()
		}()
		close(ch)
	}

	for _, x := range xs {
		switch v := x.(type) {
		case io.Closer:
			v.Close()
		case chan<- interface{}:
			closeChan(v)
		}
	}
}

// NextId returns a fresh identifier of the form "id_N" for use in
// the Id fields of iq, message, and presence stanzas. N comes from a
// per-client counter that is incremented under idMutex on every
// call.
// BUG(cjyar) This should be replaced with a goroutine that feeds a
// channel.
func (cl *Client) NextId() string {
	cl.idMutex.Lock()
	n := cl.nextId
	cl.nextId = n + 1
	cl.idMutex.Unlock()

	return fmt.Sprintf("id_%d", n)
}

// bindDone is called when we've finished resource binding (and all
// the negotiations that precede it). Now we can start accepting
// traffic from the app.
//
// The signal is a single value sent on inputControl. NewClient
// creates that channel unbuffered, so this call blocks until the
// value has been received.
// NOTE(review): the receiver is presumably writeStream, which is
// handed inputControl by startStreamWriter -- confirm.
func (cl *Client) bindDone() {
	cl.inputControl <- 1
}

// Start an XMPP session. This should typically be done immediately
// after creating the new Client. See RFC 3921, Section 3.
//
// A session-establishment iq of type "set" is sent to the server and
// the call blocks until the stanza handler registered for its id has
// run. If the reply is of type "error", it is logged and its XError()
// value is returned. Otherwise pr, when non-nil, is sent on Out as
// the initial presence (a newly-initialized Presence struct will do)
// and nil is returned.
func (cl *Client) StartSession(pr *Presence) os.Error {
	id := cl.NextId()
	session := &Generic{XMLName: xml.Name{Space: nsSession, Local: "session"}}
	request := &Iq{To: cl.Jid.Domain, Id: id, Type: "set", Any: session}

	// The handler reports the outcome on result exactly once. It
	// returns false in every case.
	result := make(chan os.Error)
	cl.HandleStanza(id, func(reply Stanza) bool {
		if reply.XType() != "error" {
			if pr != nil {
				cl.Out <- pr
			}
			result <- nil
			return false
		}
		log.Printf("Can't start session: %v", reply)
		result <- reply.XError()
		return false
	})

	cl.Out <- request
	// Now wait until the callback is called.
	return <-result
}