stream.go
changeset 26 4d0a369079ce
parent 25 7437d6eed227
child 29 a456133ed0ac
equal deleted inserted replaced
25:7437d6eed227 26:4d0a369079ce
   168 			break
   168 			break
   169 		}
   169 		}
   170 	}
   170 	}
   171 }
   171 }
   172 
   172 
   173 // BUG(cjyar) This should go away. We shouldn't allow writing of
       
   174 // unstructured data.
       
   175 func writeText(w io.Writer, ch <-chan *string) {
       
   176 	if debug {
       
   177 		pr, pw := io.Pipe()
       
   178 		go tee(pr, w, "C: ")
       
   179 		w = pw
       
   180 	}
       
   181 	defer tryClose(w, ch)
       
   182 
       
   183 	for str := range ch {
       
   184 		_, err := w.Write([]byte(*str))
       
   185 		if err != nil {
       
   186 			log.Printf("writeStr: %v", err)
       
   187 			break
       
   188 		}
       
   189 	}
       
   190 }
       
   191 
       
   192 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
   173 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
   193 	defer tryClose(srvIn, cliOut)
   174 	defer tryClose(srvIn, cliOut)
   194 
   175 
   195 	handlers := make(map[string] func(Stanza) bool)
   176 	handlers := make(map[string] func(Stanza) bool)
   196 	// BUG(cjyar) This for loop will never terminate, even when
       
   197 	// the channels are closed.
       
   198 	for {
   177 	for {
   199 		select {
   178 		select {
   200 		case h := <- cl.handlers:
   179 		case h := <- cl.handlers:
   201 			handlers[h.id] = h.f
   180 			handlers[h.id] = h.f
   202 		case x := <- srvIn:
   181 		case x := <- srvIn:
       
   182 			if x == nil {
       
   183 				break
       
   184 			}
   203 			send := false
   185 			send := false
   204 			switch obj := x.(type) {
   186 			switch obj := x.(type) {
   205 			case *stream:
   187 			case *stream:
   206 				handleStream(obj)
   188 				handleStream(obj)
   207 			case *Features:
   189 			case *Features: