xmpp/layer3.go
changeset 147 d7679d991b17
parent 146 aa9a0ae8f875
child 148 b1b4900eee5b
equal deleted inserted replaced
146:aa9a0ae8f875 147:d7679d991b17
    14 	id string
    14 	id string
    15 	// Return true means pass this to the application
    15 	// Return true means pass this to the application
    16 	f func(Stanza) bool
    16 	f func(Stanza) bool
    17 }
    17 }
    18 
    18 
    19 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
    19 // Receive XMLish structures, handle all the stream-related ones, and
    20 	defer close(cliOut)
    20 // send XMPP stanzas on to the client.
       
    21 func (cl *Client) recvStream(recvXml <-chan interface{}, sendXmpp chan<- Stanza) {
       
    22 	defer close(sendXmpp)
    21 
    23 
    22 	handlers := make(map[string]func(Stanza) bool)
    24 	handlers := make(map[string]func(Stanza) bool)
    23 Loop:
    25 Loop:
    24 	for {
    26 	for {
    25 		select {
    27 		select {
    26 		case h := <-cl.handlers:
    28 		case h := <-cl.handlers:
    27 			handlers[h.id] = h.f
    29 			handlers[h.id] = h.f
    28 		case x, ok := <-srvIn:
    30 		case x, ok := <-recvXml:
    29 			if !ok {
    31 			if !ok {
    30 				break Loop
    32 				break Loop
    31 			}
    33 			}
    32 			switch obj := x.(type) {
    34 			switch obj := x.(type) {
    33 			case *stream:
    35 			case *stream:
    47 					f := handlers[id]
    49 					f := handlers[id]
    48 					delete(handlers, id)
    50 					delete(handlers, id)
    49 					send = f(obj)
    51 					send = f(obj)
    50 				}
    52 				}
    51 				if send {
    53 				if send {
    52 					cliOut <- obj
    54 					sendXmpp <- obj
    53 				}
    55 				}
    54 			default:
    56 			default:
    55 				Warn.Logf("Unhandled non-stanza: %T %#v", x, x)
    57 				Warn.Logf("Unhandled non-stanza: %T %#v", x, x)
    56 			}
    58 			}
    57 		}
    59 		}
    58 	}
    60 	}
    59 }
    61 }
    60 
    62 
    61 // This loop is paused until resource binding is complete. Otherwise
    63 // Receive XMPP stanzas from the client and send them on to the
    62 // the app might inject something inappropriate into our negotiations
    64 // remote. Don't allow the client to send us any stanzas until
    63 // with the server. The control channel controls this loop's
    65 // negotiation has completed.  This loop is paused until resource
    64 // activity.
    66 // binding is complete. Otherwise the app might inject something
    65 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
    67 // inappropriate into our negotiations with the server. The control
       
    68 // channel controls this loop's activity.
       
    69 func sendStream(sendXml chan<- interface{}, recvXmpp <-chan Stanza,
    66 	control <-chan sendCmd) {
    70 	control <-chan sendCmd) {
    67 	defer close(srvOut)
    71 	defer close(sendXml)
    68 
    72 
    69 	var input <-chan Stanza
    73 	var input <-chan Stanza
    70 Loop:
    74 Loop:
    71 	for {
    75 	for {
    72 		select {
    76 		select {
    73 		case cmd := <-control:
    77 		case cmd := <-control:
    74 			switch cmd {
    78 			switch cmd {
    75 			case sendDeny:
    79 			case sendDeny:
    76 				input = nil
    80 				input = nil
    77 			case sendAllow:
    81 			case sendAllow:
    78 				input = cliIn
    82 				input = recvXmpp
    79 			}
    83 			}
    80 		case x, ok := <-input:
    84 		case x, ok := <-input:
    81 			if !ok {
    85 			if !ok {
    82 				break Loop
    86 				break Loop
    83 			}
    87 			}
    84 			if x == nil {
    88 			if x == nil {
    85 				Info.Log("Refusing to send nil stanza")
    89 				Info.Log("Refusing to send nil stanza")
    86 				continue
    90 				continue
    87 			}
    91 			}
    88 			srvOut <- x
    92 			sendXml <- x
    89 		}
    93 		}
    90 	}
    94 	}
    91 }
    95 }
    92 
    96 
    93 func handleStream(ss *stream) {
    97 func handleStream(ss *stream) {