stream.go
changeset 121 ebb86cbdd218
parent 119 712aa5780660
equal deleted inserted replaced
120:9d7e8333948b 121:ebb86cbdd218
   298 			srvOut <- x
   298 			srvOut <- x
   299 		}
   299 		}
   300 	}
   300 	}
   301 }
   301 }
   302 
   302 
   303 // Stanzas from the remote go up through a stack of filters to the
       
   304 // app. This function manages the filters.
       
   305 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
       
   306 	topFilter <-chan Stanza, app chan<- Stanza) {
       
   307 	defer close(app)
       
   308 Loop:
       
   309 	for {
       
   310 		select {
       
   311 		case newFilterOut := <-filterOut:
       
   312 			if newFilterOut == nil {
       
   313 				Warn.Log("Received nil filter")
       
   314 				filterIn <- nil
       
   315 				continue
       
   316 			}
       
   317 			filterIn <- topFilter
       
   318 			topFilter = newFilterOut
       
   319 
       
   320 		case data, ok := <-topFilter:
       
   321 			if !ok {
       
   322 				break Loop
       
   323 			}
       
   324 			app <- data
       
   325 		}
       
   326 	}
       
   327 }
       
   328 
       
   329 func filterBottom(from <-chan Stanza, to chan<- Stanza) {
       
   330 	defer close(to)
       
   331 	for data := range from {
       
   332 		to <- data
       
   333 	}
       
   334 }
       
   335 
       
   336 func handleStream(ss *stream) {
   303 func handleStream(ss *stream) {
   337 }
   304 }
   338 
   305 
   339 func (cl *Client) handleStreamError(se *streamError) {
   306 func (cl *Client) handleStreamError(se *streamError) {
   340 	Info.Logf("Received stream error: %v", se)
   307 	Info.Logf("Received stream error: %v", se)