stream.go
changeset 69 a38173c1c8b2
parent 66 4558994ab3b3
child 72 53f15893a1a7
equal deleted inserted replaced
62:6e2eea62ccca 69:a38173c1c8b2
    35 }
    35 }
    36 
    36 
    37 // BUG(cjyar) Review all these *Client receiver methods. They should
    37 // BUG(cjyar) Review all these *Client receiver methods. They should
    38 // probably either all be receivers, or none.
    38 // probably either all be receivers, or none.
    39 
    39 
    40 func (cl *Client) readTransport(w io.Writer) {
    40 func (cl *Client) readTransport(w io.WriteCloser) {
    41 	defer tryClose(cl.socket, w)
    41 	defer w.Close()
    42 	cl.socket.SetReadTimeout(1e8)
    42 	cl.socket.SetReadTimeout(1e8)
    43 	p := make([]byte, 1024)
    43 	p := make([]byte, 1024)
    44 	for {
    44 	for {
    45 		if cl.socket == nil {
    45 		if cl.socket == nil {
    46 			cl.waitForSocket()
    46 			cl.waitForSocket()
    66 		}
    66 		}
    67 	}
    67 	}
    68 }
    68 }
    69 
    69 
    70 func (cl *Client) writeTransport(r io.Reader) {
    70 func (cl *Client) writeTransport(r io.Reader) {
    71 	defer tryClose(r, cl.socket)
    71 	defer cl.socket.Close()
    72 	p := make([]byte, 1024)
    72 	p := make([]byte, 1024)
    73 	for {
    73 	for {
    74 		nr, err := r.Read(p)
    74 		nr, err := r.Read(p)
    75 		if nr == 0 {
    75 		if nr == 0 {
    76 			if Log != nil {
    76 			if Log != nil {
    93 	if Loglevel >= syslog.LOG_DEBUG {
    93 	if Loglevel >= syslog.LOG_DEBUG {
    94 		pr, pw := io.Pipe()
    94 		pr, pw := io.Pipe()
    95 		go tee(r, pw, "S: ")
    95 		go tee(r, pw, "S: ")
    96 		r = pr
    96 		r = pr
    97 	}
    97 	}
    98 	defer tryClose(r, ch)
    98 	defer close(ch)
    99 
    99 
   100 	p := xml.NewParser(r)
   100 	p := xml.NewParser(r)
       
   101 Loop:
   101 	for {
   102 	for {
   102 		// Sniff the next token on the stream.
   103 		// Sniff the next token on the stream.
   103 		t, err := p.Token()
   104 		t, err := p.Token()
   104 		if t == nil {
   105 		if t == nil {
   105 			if err != os.EOF {
   106 			if err != os.EOF {
   123 			if err != nil {
   124 			if err != nil {
   124 				if Log != nil {
   125 				if Log != nil {
   125 					Log.Err("unmarshal stream: " +
   126 					Log.Err("unmarshal stream: " +
   126 						err.String())
   127 						err.String())
   127 				}
   128 				}
   128 				break
   129 				break Loop
   129 			}
   130 			}
   130 			ch <- st
   131 			ch <- st
   131 			continue
   132 			continue
   132 		case "stream error", NsStream + " error":
   133 		case "stream error", NsStream + " error":
   133 			obj = &streamError{}
   134 			obj = &streamError{}
   156 		err = p.Unmarshal(obj, &se)
   157 		err = p.Unmarshal(obj, &se)
   157 		if err != nil {
   158 		if err != nil {
   158 			if Log != nil {
   159 			if Log != nil {
   159 				Log.Err("unmarshal: " + err.String())
   160 				Log.Err("unmarshal: " + err.String())
   160 			}
   161 			}
   161 			break
   162 			break Loop
   162 		}
   163 		}
   163 
   164 
   164 		// If it's a Stanza, we try to unmarshal its innerxml
   165 		// If it's a Stanza, we try to unmarshal its innerxml
   165 		// into objects of the appropriate respective
   166 		// into objects of the appropriate respective
   166 		// types. This is specified by our extensions.
   167 		// types. This is specified by our extensions.
   169 			if err != nil {
   170 			if err != nil {
   170 				if Log != nil {
   171 				if Log != nil {
   171 					Log.Err("ext unmarshal: " +
   172 					Log.Err("ext unmarshal: " +
   172 						err.String())
   173 						err.String())
   173 				}
   174 				}
   174 				break
   175 				break Loop
   175 			}
   176 			}
   176 		}
   177 		}
   177 
   178 
   178 		// Put it on the channel.
   179 		// Put it on the channel.
   179 		ch <- obj
   180 		ch <- obj
   216 	if Loglevel >= syslog.LOG_DEBUG {
   217 	if Loglevel >= syslog.LOG_DEBUG {
   217 		pr, pw := io.Pipe()
   218 		pr, pw := io.Pipe()
   218 		go tee(pr, w, "C: ")
   219 		go tee(pr, w, "C: ")
   219 		w = pw
   220 		w = pw
   220 	}
   221 	}
   221 	defer tryClose(w, ch)
   222 	defer func(w io.Writer) {
       
   223 		if c, ok := w.(io.Closer) ; ok {
       
   224 			c.Close()
       
   225 		}
       
   226 	}(w)
   222 
   227 
   223 	for obj := range ch {
   228 	for obj := range ch {
   224 		err := xml.Marshal(w, obj)
   229 		err := xml.Marshal(w, obj)
   225 		if err != nil {
   230 		if err != nil {
   226 			if Log != nil {
   231 			if Log != nil {
   230 		}
   235 		}
   231 	}
   236 	}
   232 }
   237 }
   233 
   238 
   234 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
   239 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
   235 	defer tryClose(srvIn, cliOut)
   240 	defer close(cliOut)
   236 
   241 
   237 	handlers := make(map[string] func(Stanza) bool)
   242 	handlers := make(map[string] func(Stanza) bool)
       
   243 Loop:
   238 	for {
   244 	for {
   239 		select {
   245 		select {
   240 		case h := <- cl.handlers:
   246 		case h := <- cl.handlers:
   241 			handlers[h.id] = h.f
   247 			handlers[h.id] = h.f
   242 		case x := <- srvIn:
   248 		case x, ok := <- srvIn:
   243 			if x == nil {
   249 			if !ok {
   244 				break
   250 				break Loop
   245 			}
   251 			}
   246 			send := false
   252 			send := false
   247 			switch obj := x.(type) {
   253 			switch obj := x.(type) {
   248 			case *stream:
   254 			case *stream:
   249 				handleStream(obj)
   255 				handleStream(obj)
   285 // the app might inject something inappropriate into our negotiations
   291 // the app might inject something inappropriate into our negotiations
   286 // with the server. The control channel controls this loop's
   292 // with the server. The control channel controls this loop's
   287 // activity.
   293 // activity.
   288 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
   294 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
   289 	control <-chan int) {
   295 	control <-chan int) {
   290 	defer tryClose(srvOut, cliIn)
   296 	defer close(srvOut)
   291 
   297 
   292 	var input <-chan Stanza
   298 	var input <-chan Stanza
       
   299 Loop:
   293 	for {
   300 	for {
   294 		select {
   301 		select {
   295 		case status := <- control:
   302 		case status := <- control:
   296 			switch status {
   303 			switch status {
   297 			case 0:
   304 			case 0:
   298 				input = nil
   305 				input = nil
   299 			case 1:
   306 			case 1:
   300 				input = cliIn
   307 				input = cliIn
   301 			case -1:
   308 			case -1:
   302 				break
   309 				break Loop
   303 			}
   310 			}
   304 		case x := <- input:
   311 		case x, ok := <- input:
       
   312 			if !ok {
       
   313 				break Loop
       
   314 			}
   305 			if x == nil {
   315 			if x == nil {
   306 				if Log != nil {
   316 				if Log != nil {
   307 					Log.Notice("Refusing to send" +
   317 					Log.Notice("Refusing to send" +
   308 						" nil stanza")
   318 						" nil stanza")
   309 				}
   319 				}
   317 // Stanzas from the remote go up through a stack of filters to the
   327 // Stanzas from the remote go up through a stack of filters to the
   318 // app. This function manages the filters.
   328 // app. This function manages the filters.
   319 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
   329 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
   320 	topFilter <-chan Stanza, app chan<- Stanza) {
   330 	topFilter <-chan Stanza, app chan<- Stanza) {
   321 	defer close(app)
   331 	defer close(app)
       
   332 Loop:
   322 	for {
   333 	for {
   323 		select {
   334 		select {
   324 		case newFilterOut := <- filterOut:
   335 		case newFilterOut := <- filterOut:
   325 			if newFilterOut == nil {
   336 			if newFilterOut == nil {
   326 				if Log != nil {
   337 				if Log != nil {
   332 			filterIn <- topFilter
   343 			filterIn <- topFilter
   333 			topFilter = newFilterOut
   344 			topFilter = newFilterOut
   334 
   345 
   335 		case data, ok := <-topFilter:
   346 		case data, ok := <-topFilter:
   336 			if !ok {
   347 			if !ok {
   337 				break
   348 				break Loop
   338 			}
   349 			}
   339 			app <- data
   350 			app <- data
   340 		}
   351 		}
   341 	}
   352 	}
   342 }
   353 }
   353 
   364 
   354 func (cl *Client) handleStreamError(se *streamError) {
   365 func (cl *Client) handleStreamError(se *streamError) {
   355 	if Log != nil {
   366 	if Log != nil {
   356 		Log.Notice(fmt.Sprintf("Received stream error: %v", se))
   367 		Log.Notice(fmt.Sprintf("Received stream error: %v", se))
   357 	}
   368 	}
   358 	cl.Close()
   369 	close(cl.Out)
   359 }
   370 }
   360 
   371 
   361 func (cl *Client) handleFeatures(fe *Features) {
   372 func (cl *Client) handleFeatures(fe *Features) {
   362 	cl.Features = fe
   373 	cl.Features = fe
   363 	if fe.Starttls != nil {
   374 	if fe.Starttls != nil {