stream.go
changeset 23 b5de44679389
parent 22 d6b7b4cbf50d
child 25 7437d6eed227
equal deleted inserted replaced
22:d6b7b4cbf50d 23:b5de44679389
   187 			break
   187 			break
   188 		}
   188 		}
   189 	}
   189 	}
   190 }
   190 }
   191 
   191 
   192 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) {
   192 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
   193 	defer tryClose(srvIn, cliOut)
   193 	defer tryClose(srvIn, cliOut)
   194 
   194 
   195 	handlers := make(map[string] func(Stanza) bool)
   195 	handlers := make(map[string] func(Stanza) bool)
   196 	// BUG(cjyar) This for loop will never terminate, even when
   196 	// BUG(cjyar) This for loop will never terminate, even when
   197 	// the channels are closed.
   197 	// the channels are closed.
   211 			case *auth:
   211 			case *auth:
   212 				cl.handleSasl(obj)
   212 				cl.handleSasl(obj)
   213 			default:
   213 			default:
   214 				send = true
   214 				send = true
   215 			}
   215 			}
   216 			if st, ok := x.(Stanza) ; ok &&
   216 			st, ok := x.(Stanza)
   217 				handlers[st.XId()] != nil {
   217 			if !ok {
       
   218 				log.Printf("Unhandled non-stanza: %v",
       
   219 					x)
       
   220 				continue
       
   221 			}
       
   222 			if handlers[st.XId()] != nil {
   218 				f := handlers[st.XId()]
   223 				f := handlers[st.XId()]
   219 				handlers[st.XId()] = nil
   224 				handlers[st.XId()] = nil
   220 				send = f(st)
   225 				send = f(st)
   221 			}
   226 			}
   222 			if send {
   227 			if send {
   223 				cliOut <- x
   228 				cliOut <- st
   224 			}
   229 			}
   225 		}
   230 		}
   226 	}
   231 	}
   227 }
   232 }
   228 
   233 
   229 // BUG(cjyar) Disable this loop until resource binding is
   234 // BUG(cjyar) Disable this loop until resource binding is
   230 // complete. Otherwise the app might inject something weird into our
   235 // complete. Otherwise the app might inject something weird into our
   231 // negotiation stream.
   236 // negotiation stream.
   232 func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) {
   237 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza) {
   233 	defer tryClose(srvOut, cliIn)
   238 	defer tryClose(srvOut, cliIn)
   234 
   239 
   235 	for x := range cliIn {
   240 	for x := range cliIn {
   236 		srvOut <- x
   241 		srvOut <- x
   237 	}
   242 	}