stream.go
branch20120108-close
changeset 63 c7f2edd25f4a
parent 62 6e2eea62ccca
child 64 ac0639692317
equal deleted inserted replaced
62:6e2eea62ccca 63:c7f2edd25f4a
    36 
    36 
    37 // BUG(cjyar) Review all these *Client receiver methods. They should
    37 // BUG(cjyar) Review all these *Client receiver methods. They should
    38 // probably either all be receivers, or none.
    38 // probably either all be receivers, or none.
    39 
    39 
    40 func (cl *Client) readTransport(w io.Writer) {
    40 func (cl *Client) readTransport(w io.Writer) {
       
    41 	defer fmt.Println("readTransport done")
    41 	defer tryClose(cl.socket, w)
    42 	defer tryClose(cl.socket, w)
    42 	cl.socket.SetReadTimeout(1e8)
    43 	cl.socket.SetReadTimeout(1e8)
    43 	p := make([]byte, 1024)
    44 	p := make([]byte, 1024)
    44 	for {
    45 	for {
    45 		if cl.socket == nil {
    46 		if cl.socket == nil {
    66 		}
    67 		}
    67 	}
    68 	}
    68 }
    69 }
    69 
    70 
    70 func (cl *Client) writeTransport(r io.Reader) {
    71 func (cl *Client) writeTransport(r io.Reader) {
       
    72 	defer fmt.Println("writeTransport done")
    71 	defer tryClose(r, cl.socket)
    73 	defer tryClose(r, cl.socket)
    72 	p := make([]byte, 1024)
    74 	p := make([]byte, 1024)
    73 	for {
    75 	for {
    74 		nr, err := r.Read(p)
    76 		nr, err := r.Read(p)
    75 		if nr == 0 {
    77 		if nr == 0 {
    88 	}
    90 	}
    89 }
    91 }
    90 
    92 
    91 func readXml(r io.Reader, ch chan<- interface{},
    93 func readXml(r io.Reader, ch chan<- interface{},
    92 	extStanza map[string] func(*xml.Name) interface{}) {
    94 	extStanza map[string] func(*xml.Name) interface{}) {
       
    95 	defer fmt.Println("readXml done")
    93 	if Loglevel >= syslog.LOG_DEBUG {
    96 	if Loglevel >= syslog.LOG_DEBUG {
    94 		pr, pw := io.Pipe()
    97 		pr, pw := io.Pipe()
    95 		go tee(r, pw, "S: ")
    98 		go tee(r, pw, "S: ")
    96 		r = pr
    99 		r = pr
    97 	}
   100 	}
    98 	defer tryClose(r, ch)
   101 	defer tryClose(r, ch)
    99 
   102 
   100 	p := xml.NewParser(r)
   103 	p := xml.NewParser(r)
       
   104 Loop:
   101 	for {
   105 	for {
   102 		// Sniff the next token on the stream.
   106 		// Sniff the next token on the stream.
   103 		t, err := p.Token()
   107 		t, err := p.Token()
   104 		if t == nil {
   108 		if t == nil {
   105 			if err != os.EOF {
   109 			if err != os.EOF {
   123 			if err != nil {
   127 			if err != nil {
   124 				if Log != nil {
   128 				if Log != nil {
   125 					Log.Err("unmarshal stream: " +
   129 					Log.Err("unmarshal stream: " +
   126 						err.String())
   130 						err.String())
   127 				}
   131 				}
   128 				break
   132 				break Loop
   129 			}
   133 			}
   130 			ch <- st
   134 			ch <- st
   131 			continue
   135 			continue
   132 		case "stream error", NsStream + " error":
   136 		case "stream error", NsStream + " error":
   133 			obj = &streamError{}
   137 			obj = &streamError{}
   156 		err = p.Unmarshal(obj, &se)
   160 		err = p.Unmarshal(obj, &se)
   157 		if err != nil {
   161 		if err != nil {
   158 			if Log != nil {
   162 			if Log != nil {
   159 				Log.Err("unmarshal: " + err.String())
   163 				Log.Err("unmarshal: " + err.String())
   160 			}
   164 			}
   161 			break
   165 			break Loop
   162 		}
   166 		}
   163 
   167 
   164 		// If it's a Stanza, we try to unmarshal its innerxml
   168 		// If it's a Stanza, we try to unmarshal its innerxml
   165 		// into objects of the appropriate respective
   169 		// into objects of the appropriate respective
   166 		// types. This is specified by our extensions.
   170 		// types. This is specified by our extensions.
   169 			if err != nil {
   173 			if err != nil {
   170 				if Log != nil {
   174 				if Log != nil {
   171 					Log.Err("ext unmarshal: " +
   175 					Log.Err("ext unmarshal: " +
   172 						err.String())
   176 						err.String())
   173 				}
   177 				}
   174 				break
   178 				break Loop
   175 			}
   179 			}
   176 		}
   180 		}
   177 
   181 
   178 		// Put it on the channel.
   182 		// Put it on the channel.
   179 		ch <- obj
   183 		ch <- obj
   230 		}
   234 		}
   231 	}
   235 	}
   232 }
   236 }
   233 
   237 
   234 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
   238 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
       
   239 	defer fmt.Println("readStream done")
       
   240 	defer close(cliOut)
   235 	defer tryClose(srvIn, cliOut)
   241 	defer tryClose(srvIn, cliOut)
   236 
   242 
   237 	handlers := make(map[string] func(Stanza) bool)
   243 	handlers := make(map[string] func(Stanza) bool)
       
   244 Loop:
   238 	for {
   245 	for {
   239 		select {
   246 		select {
   240 		case h := <- cl.handlers:
   247 		case h := <- cl.handlers:
   241 			handlers[h.id] = h.f
   248 			handlers[h.id] = h.f
   242 		case x := <- srvIn:
   249 		case x, ok := <- srvIn:
   243 			if x == nil {
   250 			if !ok {
   244 				break
   251 				break Loop
   245 			}
   252 			}
   246 			send := false
   253 			send := false
   247 			switch obj := x.(type) {
   254 			switch obj := x.(type) {
   248 			case *stream:
   255 			case *stream:
   249 				handleStream(obj)
   256 				handleStream(obj)
   285 // the app might inject something inappropriate into our negotiations
   292 // the app might inject something inappropriate into our negotiations
   286 // with the server. The control channel controls this loop's
   293 // with the server. The control channel controls this loop's
   287 // activity.
   294 // activity.
   288 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
   295 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
   289 	control <-chan int) {
   296 	control <-chan int) {
   290 	defer tryClose(srvOut, cliIn)
   297 	defer fmt.Println("writeStream done")
       
   298 	defer close(srvOut)
   291 
   299 
   292 	var input <-chan Stanza
   300 	var input <-chan Stanza
       
   301 Loop:
   293 	for {
   302 	for {
   294 		select {
   303 		select {
   295 		case status := <- control:
   304 		case status := <- control:
   296 			switch status {
   305 			switch status {
   297 			case 0:
   306 			case 0:
   298 				input = nil
   307 				input = nil
   299 			case 1:
   308 			case 1:
   300 				input = cliIn
   309 				input = cliIn
   301 			case -1:
   310 			case -1:
   302 				break
   311 				break Loop
   303 			}
   312 			}
   304 		case x := <- input:
   313 		case x, ok := <- input:
       
   314 			if !ok {
       
   315 				fmt.Println("writeStream input closed")
       
   316 				break Loop
       
   317 			}
   305 			if x == nil {
   318 			if x == nil {
   306 				if Log != nil {
   319 				if Log != nil {
   307 					Log.Notice("Refusing to send" +
   320 					Log.Notice("Refusing to send" +
   308 						" nil stanza")
   321 						" nil stanza")
   309 				}
   322 				}
   316 
   329 
   317 // Stanzas from the remote go up through a stack of filters to the
   330 // Stanzas from the remote go up through a stack of filters to the
   318 // app. This function manages the filters.
   331 // app. This function manages the filters.
   319 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
   332 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
   320 	topFilter <-chan Stanza, app chan<- Stanza) {
   333 	topFilter <-chan Stanza, app chan<- Stanza) {
       
   334 	defer fmt.Println("filterTop done")
   321 	defer close(app)
   335 	defer close(app)
       
   336 Loop:
   322 	for {
   337 	for {
   323 		select {
   338 		select {
   324 		case newFilterOut := <- filterOut:
   339 		case newFilterOut := <- filterOut:
   325 			if newFilterOut == nil {
   340 			if newFilterOut == nil {
   326 				if Log != nil {
   341 				if Log != nil {
   332 			filterIn <- topFilter
   347 			filterIn <- topFilter
   333 			topFilter = newFilterOut
   348 			topFilter = newFilterOut
   334 
   349 
   335 		case data, ok := <-topFilter:
   350 		case data, ok := <-topFilter:
   336 			if !ok {
   351 			if !ok {
   337 				break
   352 				break Loop
   338 			}
   353 			}
   339 			app <- data
   354 			app <- data
   340 		}
   355 		}
   341 	}
   356 	}
   342 }
   357 }
   343 
   358 
   344 func filterBottom(from <-chan Stanza, to chan<- Stanza) {
   359 func filterBottom(from <-chan Stanza, to chan<- Stanza) {
       
   360 	defer fmt.Println("filterBottom done")
   345 	defer close(to)
   361 	defer close(to)
   346 	for data := range(from) {
   362 	for data := range(from) {
   347 		to <- data
   363 		to <- data
   348 	}
   364 	}
   349 }
   365 }
   353 
   369 
   354 func (cl *Client) handleStreamError(se *streamError) {
   370 func (cl *Client) handleStreamError(se *streamError) {
   355 	if Log != nil {
   371 	if Log != nil {
   356 		Log.Notice(fmt.Sprintf("Received stream error: %v", se))
   372 		Log.Notice(fmt.Sprintf("Received stream error: %v", se))
   357 	}
   373 	}
   358 	cl.Close()
   374 	close(cl.Out)
   359 }
   375 }
   360 
   376 
   361 func (cl *Client) handleFeatures(fe *Features) {
   377 func (cl *Client) handleFeatures(fe *Features) {
   362 	cl.Features = fe
   378 	cl.Features = fe
   363 	if fe.Starttls != nil {
   379 	if fe.Starttls != nil {
   396 	// Make the TLS connection available to the reader, and wait
   412 	// Make the TLS connection available to the reader, and wait
   397 	// for it to signal that it's working again.
   413 	// for it to signal that it's working again.
   398 	cl.socketSync.Add(1)
   414 	cl.socketSync.Add(1)
   399 	cl.socket = tls
   415 	cl.socket = tls
   400 	cl.socketSync.Wait()
   416 	cl.socketSync.Wait()
   401 
       
   402 	// Reset the read timeout on the (underlying) socket so the
       
   403 	// reader doesn't get woken up unnecessarily.
       
   404 	tcp.SetReadTimeout(0)
       
   405 
   417 
   406 	if Log != nil {
   418 	if Log != nil {
   407 		Log.Info("TLS negotiation succeeded.")
   419 		Log.Info("TLS negotiation succeeded.")
   408 	}
   420 	}
   409 	cl.Features = nil
   421 	cl.Features = nil