xmpp/layer3.go
changeset 153 bbd4166df95d
parent 150 fa7f6ff10c67
child 162 7b5586a5e109
equal deleted inserted replaced
152:69c5b4382e39 153:bbd4166df95d
     3 
     3 
     4 package xmpp
     4 package xmpp
     5 
     5 
     6 import (
     6 import (
     7 	"encoding/xml"
     7 	"encoding/xml"
     8 	"fmt"
       
     9 )
     8 )
    10 
     9 
    11 // Callback to handle a stanza with a particular id.
    10 // Callback to handle a stanza with a particular id.
    12 type callback struct {
    11 type callback struct {
    13 	id string
    12 	id string
    14 	// Return true means pass this to the application
    13 	f  func(Stanza)
    15 	f func(Stanza) bool
       
    16 }
    14 }
    17 
    15 
    18 // Receive XMPP stanzas from the client and send them on to the
    16 // Receive XMPP stanzas from the client and send them on to the
    19 // remote. Don't allow the client to send us any stanzas until
    17 // remote. Don't allow the client to send us any stanzas until
    20 // negotiation has completed.  This loop is paused until resource
    18 // negotiation has completed.  This loop is paused until resource
    21 // binding is complete. Otherwise the app might inject something
    19 // binding is complete. Otherwise the app might inject something
    22 // inappropriate into our negotiations with the server. The control
    20 // inappropriate into our negotiations with the server. The control
    23 // channel controls this loop's activity.
    21 // channel controls this loop's activity.
    24 func sendStream(sendXml chan<- interface{}, recvXmpp <-chan Stanza,
    22 func sendStream(sendXml chan<- interface{}, recvXmpp <-chan Stanza,
    25 	control <-chan sendCmd) {
    23 	status <-chan Status) {
    26 	defer close(sendXml)
    24 	defer close(sendXml)
    27 
    25 
    28 	var input <-chan Stanza
    26 	var input <-chan Stanza
    29 	for {
    27 	for {
    30 		select {
    28 		select {
    31 		case cmd := <-control:
    29 		case stat := <-status:
    32 			switch cmd {
    30 			switch stat {
    33 			case sendDeny:
    31 			default:
    34 				input = nil
    32 				input = nil
    35 			case sendAllow:
    33 			case StatusRunning:
    36 				input = recvXmpp
    34 				input = recvXmpp
    37 			case sendAbort:
       
    38 				return
       
    39 			default:
       
    40 				panic(fmt.Sprintf("unknown cmd %d", cmd))
       
    41 			}
    35 			}
    42 		case x, ok := <-input:
    36 		case x, ok := <-input:
    43 			if !ok {
    37 			if !ok {
    44 				return
    38 				return
    45 			}
    39 			}
    51 		}
    45 		}
    52 	}
    46 	}
    53 }
    47 }
    54 
    48 
    55 // Receive XMLish structures, handle all the stream-related ones, and
    49 // Receive XMLish structures, handle all the stream-related ones, and
    56 // send XMPP stanzas on to the client.
    50 // send XMPP stanzas on to the client once the connection is running.
    57 func (cl *Client) recvStream(recvXml <-chan interface{}, sendXmpp chan<- Stanza) {
    51 func (cl *Client) recvStream(recvXml <-chan interface{}, sendXmpp chan<- Stanza,
       
    52 	status <-chan Status) {
    58 	defer close(sendXmpp)
    53 	defer close(sendXmpp)
       
    54 	defer cl.statmgr.close()
    59 
    55 
    60 	handlers := make(map[string]func(Stanza) bool)
    56 	handlers := make(map[string]func(Stanza))
       
    57 	doSend := false
    61 	for {
    58 	for {
    62 		select {
    59 		select {
       
    60 		case stat := <-status:
       
    61 			switch stat {
       
    62 			default:
       
    63 				doSend = false
       
    64 			case StatusRunning:
       
    65 				doSend = true
       
    66 			}
    63 		case h := <-cl.handlers:
    67 		case h := <-cl.handlers:
    64 			handlers[h.id] = h.f
    68 			handlers[h.id] = h.f
    65 		case x, ok := <-recvXml:
    69 		case x, ok := <-recvXml:
    66 			if !ok {
    70 			if !ok {
    67 				return
    71 				return
    76 			case *starttls:
    80 			case *starttls:
    77 				cl.handleTls(obj)
    81 				cl.handleTls(obj)
    78 			case *auth:
    82 			case *auth:
    79 				cl.handleSasl(obj)
    83 				cl.handleSasl(obj)
    80 			case Stanza:
    84 			case Stanza:
    81 				send := true
       
    82 				id := obj.GetHeader().Id
    85 				id := obj.GetHeader().Id
    83 				if handlers[id] != nil {
    86 				if handlers[id] != nil {
    84 					f := handlers[id]
    87 					f := handlers[id]
    85 					delete(handlers, id)
    88 					delete(handlers, id)
    86 					send = f(obj)
    89 					f(obj)
    87 				}
    90 				}
    88 				if send {
    91 				if doSend {
    89 					sendXmpp <- obj
    92 					sendXmpp <- obj
    90 				}
    93 				}
    91 			default:
    94 			default:
    92 				Warn.Logf("Unhandled non-stanza: %T %#v", x, x)
    95 				Warn.Logf("Unhandled non-stanza: %T %#v", x, x)
    93 			}
    96 			}
    95 	}
    98 	}
    96 }
    99 }
    97 
   100 
    98 func (cl *Client) handleStreamError(se *streamError) {
   101 func (cl *Client) handleStreamError(se *streamError) {
    99 	Info.Logf("Received stream error: %v", se)
   102 	Info.Logf("Received stream error: %v", se)
   100 	cl.inputControl <- sendAbort
   103 	cl.setStatus(StatusShutdown)
   101 }
   104 }
   102 
   105 
   103 func (cl *Client) handleFeatures(fe *Features) {
   106 func (cl *Client) handleFeatures(fe *Features) {
   104 	cl.Features = fe
   107 	cl.Features = fe
   105 	if fe.Starttls != nil {
   108 	if fe.Starttls != nil {
   121 }
   124 }
   122 
   125 
   123 func (cl *Client) handleTls(t *starttls) {
   126 func (cl *Client) handleTls(t *starttls) {
   124 	cl.layer1.startTls(&cl.tlsConfig)
   127 	cl.layer1.startTls(&cl.tlsConfig)
   125 
   128 
       
   129 	cl.setStatus(StatusConnectedTls)
       
   130 
   126 	// Now re-send the initial handshake message to start the new
   131 	// Now re-send the initial handshake message to start the new
   127 	// session.
   132 	// session.
   128 	cl.sendXml <- &stream{To: cl.Jid.Domain, Version: XMPPVersion}
   133 	cl.sendXml <- &stream{To: cl.Jid.Domain, Version: XMPPVersion}
   129 }
   134 }
   130 
   135 
   135 	if res != "" {
   140 	if res != "" {
   136 		bindReq.Resource = &res
   141 		bindReq.Resource = &res
   137 	}
   142 	}
   138 	msg := &Iq{Header: Header{Type: "set", Id: NextId(),
   143 	msg := &Iq{Header: Header{Type: "set", Id: NextId(),
   139 		Nested: []interface{}{bindReq}}}
   144 		Nested: []interface{}{bindReq}}}
   140 	f := func(st Stanza) bool {
   145 	f := func(st Stanza) {
   141 		iq, ok := st.(*Iq)
   146 		iq, ok := st.(*Iq)
   142 		if !ok {
   147 		if !ok {
   143 			Warn.Log("non-iq response")
   148 			Warn.Log("non-iq response")
   144 		}
   149 		}
   145 		if iq.Type == "error" {
   150 		if iq.Type == "error" {
   146 			Warn.Log("Resource binding failed")
   151 			Warn.Log("Resource binding failed")
   147 			return false
       
   148 		}
   152 		}
   149 		var bindRepl *bindIq
   153 		var bindRepl *bindIq
   150 		for _, ele := range iq.Nested {
   154 		for _, ele := range iq.Nested {
   151 			if b, ok := ele.(*bindIq); ok {
   155 			if b, ok := ele.(*bindIq); ok {
   152 				bindRepl = b
   156 				bindRepl = b
   153 				break
   157 				break
   154 			}
   158 			}
   155 		}
   159 		}
   156 		if bindRepl == nil {
   160 		if bindRepl == nil {
   157 			Warn.Logf("Bad bind reply: %#v", iq)
   161 			Warn.Logf("Bad bind reply: %#v", iq)
   158 			return false
       
   159 		}
   162 		}
   160 		jidStr := bindRepl.Jid
   163 		jidStr := bindRepl.Jid
   161 		if jidStr == nil || *jidStr == "" {
   164 		if jidStr == nil || *jidStr == "" {
   162 			Warn.Log("Can't bind empty resource")
   165 			Warn.Log("Can't bind empty resource")
   163 			return false
       
   164 		}
   166 		}
   165 		jid := new(JID)
   167 		jid := new(JID)
   166 		if err := jid.Set(*jidStr); err != nil {
   168 		if err := jid.Set(*jidStr); err != nil {
   167 			Warn.Logf("Can't parse JID %s: %s", *jidStr, err)
   169 			Warn.Logf("Can't parse JID %s: %s", *jidStr, err)
   168 			return false
       
   169 		}
   170 		}
   170 		cl.Jid = *jid
   171 		cl.Jid = *jid
   171 		Info.Logf("Bound resource: %s", cl.Jid.String())
   172 		Info.Logf("Bound resource: %s", cl.Jid.String())
   172 		cl.bindDone()
   173 		cl.setStatus(StatusBound)
   173 		return false
       
   174 	}
   174 	}
   175 	cl.SetCallback(msg.Id, f)
   175 	cl.SetCallback(msg.Id, f)
   176 	cl.sendXml <- msg
   176 	cl.sendXml <- msg
   177 }
   177 }
   178 
   178 
   180 // presence) with a given id. The provided function will not be called
   180 // presence) with a given id. The provided function will not be called
   181 // more than once. If it returns false, the stanza will not be made
   181 // more than once. If it returns false, the stanza will not be made
   182 // available on the normal Client.Recv channel. The callback must not
   182 // available on the normal Client.Recv channel. The callback must not
   183 // read from that channel, as deliveries on it cannot proceed until
   183 // read from that channel, as deliveries on it cannot proceed until
   184 // the handler returns true or false.
   184 // the handler returns true or false.
   185 func (cl *Client) SetCallback(id string, f func(Stanza) bool) {
   185 func (cl *Client) SetCallback(id string, f func(Stanza)) {
   186 	h := &callback{id: id, f: f}
   186 	h := &callback{id: id, f: f}
   187 	cl.handlers <- h
   187 	cl.handlers <- h
   188 }
   188 }