Trying to be smarter about closing channels: Ask the application not to close the Send channel, but let us do that from Close() instead.
// This package implements a simple XMPP client according to RFCs 3920// and 3921, plus the various XEPs at http://xmpp.org/protocols/. The// implementation is structured as a stack of layers, with TCP at the// bottom and the application at the top. The application receives and// sends structures representing XMPP stanzas. Additional stanza// parsers can be inserted into the stack of layers as extensions.packagexmppimport("crypto/tls""encoding/xml""fmt""io""net""reflect""sync")const(// Version of RFC 3920 that we implement.XMPPVersion="1.0"// Various XML namespaces.NsClient="jabber:client"NsStreams="urn:ietf:params:xml:ns:xmpp-streams"NsStream="http://etherx.jabber.org/streams"NsTLS="urn:ietf:params:xml:ns:xmpp-tls"NsSASL="urn:ietf:params:xml:ns:xmpp-sasl"NsBind="urn:ietf:params:xml:ns:xmpp-bind"NsSession="urn:ietf:params:xml:ns:xmpp-session"NsRoster="jabber:iq:roster"// DNS SRV namesserverSrv="xmpp-server"clientSrv="xmpp-client")// A filter can modify the XMPP traffic to or from the remote// server. It's part of an Extension. The filter function will be// called in a new goroutine, so it doesn't need to return. The filter// should close its output when its input is closed.typeFilterfunc(in<-chanStanza,outchan<-Stanza)// Extensions can add stanza filters and/or new XML element types.typeExtensionstruct{// Maps from an XML name to a structure which holds stanza// contents with that name.StanzaTypesmap[xml.Name]reflect.Type// If non-nil, will be called once to start the filter// running. RecvFilter intercepts incoming messages on their// way from the remote server to the application; SendFilter// intercepts messages going the other direction.RecvFilterFilterSendFilterFilter}// The client in a client-server XMPP connection.typeClientstruct{// This client's full JID, including resourceJidJIDpasswordstringsaslExpectedstringauthDoneboolhandlerschan*callback// Incoming XMPP stanzas from the remote will be published on// this channel. Information which is used by this library to// set up the XMPP stream will not appear here.Recv<-chanStanza// Outgoing XMPP stanzas to the server should be sent to this// channel. The application should not close this channel;// rather, call Close().Sendchan<-StanzasendRawchan<-interface{}statmgr*statmgr// The client's roster is also known as the buddy list. It's// the set of contacts which are known to this JID, or which// this JID is known to.RosterRoster// Features advertised by the remote.Features*FeaturessendFilterAdd,recvFilterAddchanFiltertlsConfigtls.Configlayer1*layer1errorchanerrorshutdownOncesync.Once}// Creates an XMPP client identified by the given JID, authenticating// with the provided password and TLS config. Zero or more extensions// may be specified. The initial presence will be broadcast. If status// is non-nil, connection progress information will be sent on it.funcNewClient(jid*JID,passwordstring,tlsconftls.Config,exts[]Extension,prPresence,statuschan<-Status)(*Client,error){// Include the mandatory extensions.roster:=newRosterExt()exts=append(exts,roster.Extension)exts=append(exts,bindExt)cl:=new(Client)cl.Roster=*rostercl.password=passwordcl.Jid=*jidcl.handlers=make(chan*callback,100)cl.tlsConfig=tlsconfcl.sendFilterAdd=make(chanFilter)cl.recvFilterAdd=make(chanFilter)cl.statmgr=newStatmgr(status)cl.error=make(chanerror,1)extStanza:=make(map[xml.Name]reflect.Type)for_,ext:=rangeexts{fork,v:=rangeext.StanzaTypes{if_,ok:=extStanza[k];ok{returnnil,fmt.Errorf("duplicate handler %s",k)}extStanza[k]=v}}// Resolve the domain in the JID._,srvs,err:=net.LookupSRV(clientSrv,"tcp",jid.Domain)iferr!=nil{returnnil,fmt.Errorf("LookupSrv %s: %v",jid.Domain,err)}iflen(srvs)==0{returnnil,fmt.Errorf("LookupSrv %s: no results",jid.Domain)}vartcp*net.TCPConnfor_,srv:=rangesrvs{addrStr:=fmt.Sprintf("%s:%d",srv.Target,srv.Port)varaddr*net.TCPAddraddr,err=net.ResolveTCPAddr("tcp",addrStr)iferr!=nil{err=fmt.Errorf("ResolveTCPAddr(%s): %s",addrStr,err.Error())continue}tcp,err=net.DialTCP("tcp",nil,addr)iftcp!=nil{break}}iftcp==nil{returnnil,err}cl.setStatus(StatusConnected)// Start the transport handler, initially unencrypted.recvReader,recvWriter:=io.Pipe()sendReader,sendWriter:=io.Pipe()cl.layer1=cl.startLayer1(tcp,recvWriter,sendReader,cl.statmgr.newListener())// Start the reader and writer that convert to and from XML.recvXmlCh:=make(chaninterface{})gocl.recvXml(recvReader,recvXmlCh,extStanza)sendXmlCh:=make(chaninterface{})cl.sendRaw=sendXmlChgocl.sendXml(sendWriter,sendXmlCh)// Start the reader and writer that convert between XML and// XMPP stanzas.recvRawXmpp:=make(chanStanza)gocl.recvStream(recvXmlCh,recvRawXmpp,cl.statmgr.newListener())sendRawXmpp:=make(chanStanza)gosendStream(sendXmlCh,sendRawXmpp,cl.statmgr.newListener())// Start the managers for the filters that can modify what the// app sees or sends.recvFiltXmpp:=make(chanStanza)cl.Recv=recvFiltXmppgofilterMgr(cl.recvFilterAdd,recvRawXmpp,recvFiltXmpp)sendFiltXmpp:=make(chanStanza)cl.Send=sendFiltXmppgofilterMgr(cl.sendFilterAdd,sendFiltXmpp,sendRawXmpp)// Set up the initial filters.for_,ext:=rangeexts{cl.AddRecvFilter(ext.RecvFilter)cl.AddSendFilter(ext.SendFilter)}// Initial handshake.hsOut:=&stream{To:jid.Domain,Version:XMPPVersion}cl.sendRaw<-hsOut// Wait until resource binding is complete.iferr:=cl.statmgr.awaitStatus(StatusBound);err!=nil{returnnil,cl.getError(err)}// Forget about the password, for paranoia's sake.cl.password=""// Initialize the session.id:=NextId()iq:=&Iq{Header:Header{To:cl.Jid.Domain,Id:id,Type:"set",Nested:[]interface{}{Generic{XMLName:xml.Name{Space:NsSession,Local:"session"}}}}}ch:=make(chanerror)f:=func(stStanza){iq,ok:=st.(*Iq)if!ok{ch<-fmt.Errorf("bad session start reply: %#v",st)}ifiq.Type=="error"{ch<-fmt.Errorf("Can't start session: %v",iq.Error)}ch<-nil}cl.SetCallback(id,f)cl.sendRaw<-iq// Now wait until the callback is called.iferr:=<-ch;err!=nil{returnnil,cl.getError(err)}// This allows the client to receive stanzas.cl.setStatus(StatusRunning)// Request the roster.cl.Roster.update()// Send the initial presence.cl.Send<-&prreturncl,cl.getError(nil)}func(cl*Client)Close(){// Shuts down the receivers:cl.setStatus(StatusShutdown)// Shuts down the senders:cl.shutdownOnce.Do(func(){close(cl.Send)})}// If there's a buffered error in the channel, return it. Otherwise,// return what was passed to us. The idea is that the error in the// channel probably preceded (and caused) the one that's passed as an// argument here.func(cl*Client)getError(err1error)error{select{caseerr0:=<-cl.error:returnerr0default:returnerr1}}// Register an error that happened in the internals somewhere. If// there's already an error in the channel, discard the newer one in// favor of the older.func(cl*Client)setError(errerror){defercl.Close()defercl.setStatus(StatusError)iflen(cl.error)>0{return}// If we're in a race between two calls to this function,// trying to set the "first" error, just arbitrarily let one// of them win.select{casecl.error<-err:default:}}