stream.go
branch20120108-close
changeset 64 ac0639692317
parent 63 c7f2edd25f4a
child 66 4558994ab3b3
equal deleted inserted replaced
63:c7f2edd25f4a 64:ac0639692317
    35 }
    35 }
    36 
    36 
    37 // BUG(cjyar) Review all these *Client receiver methods. They should
    37 // BUG(cjyar) Review all these *Client receiver methods. They should
    38 // probably either all be receivers, or none.
    38 // probably either all be receivers, or none.
    39 
    39 
    40 func (cl *Client) readTransport(w io.Writer) {
    40 func (cl *Client) readTransport(w io.WriteCloser) {
    41 	defer fmt.Println("readTransport done")
    41 	defer w.Close()
    42 	defer tryClose(cl.socket, w)
       
    43 	cl.socket.SetReadTimeout(1e8)
    42 	cl.socket.SetReadTimeout(1e8)
    44 	p := make([]byte, 1024)
    43 	p := make([]byte, 1024)
    45 	for {
    44 	for {
    46 		if cl.socket == nil {
    45 		if cl.socket == nil {
    47 			cl.waitForSocket()
    46 			cl.waitForSocket()
    67 		}
    66 		}
    68 	}
    67 	}
    69 }
    68 }
    70 
    69 
    71 func (cl *Client) writeTransport(r io.Reader) {
    70 func (cl *Client) writeTransport(r io.Reader) {
    72 	defer fmt.Println("writeTransport done")
    71 	defer cl.socket.Close()
    73 	defer tryClose(r, cl.socket)
       
    74 	p := make([]byte, 1024)
    72 	p := make([]byte, 1024)
    75 	for {
    73 	for {
    76 		nr, err := r.Read(p)
    74 		nr, err := r.Read(p)
    77 		if nr == 0 {
    75 		if nr == 0 {
    78 			if Log != nil {
    76 			if Log != nil {
    90 	}
    88 	}
    91 }
    89 }
    92 
    90 
    93 func readXml(r io.Reader, ch chan<- interface{},
    91 func readXml(r io.Reader, ch chan<- interface{},
    94 	extStanza map[string] func(*xml.Name) interface{}) {
    92 	extStanza map[string] func(*xml.Name) interface{}) {
    95 	defer fmt.Println("readXml done")
       
    96 	if Loglevel >= syslog.LOG_DEBUG {
    93 	if Loglevel >= syslog.LOG_DEBUG {
    97 		pr, pw := io.Pipe()
    94 		pr, pw := io.Pipe()
    98 		go tee(r, pw, "S: ")
    95 		go tee(r, pw, "S: ")
    99 		r = pr
    96 		r = pr
   100 	}
    97 	}
   101 	defer tryClose(r, ch)
    98 	defer close(ch)
   102 
    99 
   103 	p := xml.NewParser(r)
   100 	p := xml.NewParser(r)
   104 Loop:
   101 Loop:
   105 	for {
   102 	for {
   106 		// Sniff the next token on the stream.
   103 		// Sniff the next token on the stream.
   220 	if Loglevel >= syslog.LOG_DEBUG {
   217 	if Loglevel >= syslog.LOG_DEBUG {
   221 		pr, pw := io.Pipe()
   218 		pr, pw := io.Pipe()
   222 		go tee(pr, w, "C: ")
   219 		go tee(pr, w, "C: ")
   223 		w = pw
   220 		w = pw
   224 	}
   221 	}
   225 	defer tryClose(w, ch)
   222 	defer func(w io.Writer) {
       
   223 		if c, ok := w.(io.Closer) ; ok {
       
   224 			c.Close()
       
   225 		}
       
   226 	}(w)
   226 
   227 
   227 	for obj := range ch {
   228 	for obj := range ch {
   228 		err := xml.Marshal(w, obj)
   229 		err := xml.Marshal(w, obj)
   229 		if err != nil {
   230 		if err != nil {
   230 			if Log != nil {
   231 			if Log != nil {
   234 		}
   235 		}
   235 	}
   236 	}
   236 }
   237 }
   237 
   238 
   238 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
   239 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
   239 	defer fmt.Println("readStream done")
       
   240 	defer close(cliOut)
   240 	defer close(cliOut)
   241 	defer tryClose(srvIn, cliOut)
       
   242 
   241 
   243 	handlers := make(map[string] func(Stanza) bool)
   242 	handlers := make(map[string] func(Stanza) bool)
   244 Loop:
   243 Loop:
   245 	for {
   244 	for {
   246 		select {
   245 		select {
   292 // the app might inject something inappropriate into our negotiations
   291 // the app might inject something inappropriate into our negotiations
   293 // with the server. The control channel controls this loop's
   292 // with the server. The control channel controls this loop's
   294 // activity.
   293 // activity.
   295 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
   294 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
   296 	control <-chan int) {
   295 	control <-chan int) {
   297 	defer fmt.Println("writeStream done")
       
   298 	defer close(srvOut)
   296 	defer close(srvOut)
   299 
   297 
   300 	var input <-chan Stanza
   298 	var input <-chan Stanza
   301 Loop:
   299 Loop:
   302 	for {
   300 	for {
   310 			case -1:
   308 			case -1:
   311 				break Loop
   309 				break Loop
   312 			}
   310 			}
   313 		case x, ok := <- input:
   311 		case x, ok := <- input:
   314 			if !ok {
   312 			if !ok {
   315 				fmt.Println("writeStream input closed")
       
   316 				break Loop
   313 				break Loop
   317 			}
   314 			}
   318 			if x == nil {
   315 			if x == nil {
   319 				if Log != nil {
   316 				if Log != nil {
   320 					Log.Notice("Refusing to send" +
   317 					Log.Notice("Refusing to send" +
   329 
   326 
   330 // Stanzas from the remote go up through a stack of filters to the
   327 // Stanzas from the remote go up through a stack of filters to the
   331 // app. This function manages the filters.
   328 // app. This function manages the filters.
   332 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
   329 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
   333 	topFilter <-chan Stanza, app chan<- Stanza) {
   330 	topFilter <-chan Stanza, app chan<- Stanza) {
   334 	defer fmt.Println("filterTop done")
       
   335 	defer close(app)
   331 	defer close(app)
   336 Loop:
   332 Loop:
   337 	for {
   333 	for {
   338 		select {
   334 		select {
   339 		case newFilterOut := <- filterOut:
   335 		case newFilterOut := <- filterOut:
   355 		}
   351 		}
   356 	}
   352 	}
   357 }
   353 }
   358 
   354 
   359 func filterBottom(from <-chan Stanza, to chan<- Stanza) {
   355 func filterBottom(from <-chan Stanza, to chan<- Stanza) {
   360 	defer fmt.Println("filterBottom done")
       
   361 	defer close(to)
   356 	defer close(to)
   362 	for data := range(from) {
   357 	for data := range(from) {
   363 		to <- data
   358 		to <- data
   364 	}
   359 	}
   365 }
   360 }