stream.go
changeset 113 bee6cc131798
parent 112 bd56fb741f69
child 114 a058e33c1666
equal deleted inserted replaced
112:bd56fb741f69 113:bee6cc131798
    27 
    27 
    28 // Callback to handle a stanza with a particular id.
    28 // Callback to handle a stanza with a particular id.
    29 type stanzaHandler struct {
    29 type stanzaHandler struct {
    30 	id string
    30 	id string
    31 	// Return true means pass this to the application
    31 	// Return true means pass this to the application
    32 	f func(interface{}) bool
    32 	f func(Stanza) bool
    33 }
    33 }
    34 
    34 
    35 // BUG(cjyar) Review all these *Client receiver methods. They should
    35 // BUG(cjyar) Review all these *Client receiver methods. They should
    36 // probably either all be receivers, or none.
    36 // probably either all be receivers, or none.
    37 
    37 
   223 			}
   223 			}
   224 		}
   224 		}
   225 	}
   225 	}
   226 }
   226 }
   227 
   227 
   228 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) {
   228 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
   229 	defer close(cliOut)
   229 	defer close(cliOut)
   230 
   230 
   231 	handlers := make(map[string]func(interface{}) bool)
   231 	handlers := make(map[string]func(Stanza) bool)
   232 Loop:
   232 Loop:
   233 	for {
   233 	for {
   234 		select {
   234 		select {
   235 		case h := <-cl.handlers:
   235 		case h := <-cl.handlers:
   236 			handlers[h.id] = h.f
   236 			handlers[h.id] = h.f
   237 		case x, ok := <-srvIn:
   237 		case x, ok := <-srvIn:
   238 			if !ok {
   238 			if !ok {
   239 				break Loop
   239 				break Loop
   240 			}
   240 			}
   241 			var st *Header
       
   242 			switch obj := x.(type) {
   241 			switch obj := x.(type) {
   243 			case *stream:
   242 			case *stream:
   244 				handleStream(obj)
   243 				handleStream(obj)
   245 			case *streamError:
   244 			case *streamError:
   246 				cl.handleStreamError(obj)
   245 				cl.handleStreamError(obj)
   249 			case *starttls:
   248 			case *starttls:
   250 				cl.handleTls(obj)
   249 				cl.handleTls(obj)
   251 			case *auth:
   250 			case *auth:
   252 				cl.handleSasl(obj)
   251 				cl.handleSasl(obj)
   253 			case Stanza:
   252 			case Stanza:
   254 				st = obj.GetHeader()
   253 				send := true
       
   254 				id := obj.GetHeader().Id
       
   255 				if handlers[id] != nil {
       
   256 					f := handlers[id]
       
   257 					delete(handlers, id)
       
   258 					send = f(obj)
       
   259 				}
       
   260 				if send {
       
   261 					cliOut <- obj
       
   262 				}
   255 			default:
   263 			default:
   256 				Warn.Logf("Unhandled non-stanza: %T %#v", x, x)
   264 				Warn.Logf("Unhandled non-stanza: %T %#v", x, x)
   257 			}
       
   258 
       
   259 			if st == nil {
       
   260 				continue
       
   261 			}
       
   262 
       
   263 			send := true
       
   264 			if handlers[st.Id] != nil {
       
   265 				f := handlers[st.Id]
       
   266 				delete(handlers, st.Id)
       
   267 				send = f(x)
       
   268 			}
       
   269 			if send {
       
   270 				cliOut <- x
       
   271 			}
   265 			}
   272 		}
   266 		}
   273 	}
   267 	}
   274 }
   268 }
   275 
   269 
   276 // This loop is paused until resource binding is complete. Otherwise
   270 // This loop is paused until resource binding is complete. Otherwise
   277 // the app might inject something inappropriate into our negotiations
   271 // the app might inject something inappropriate into our negotiations
   278 // with the server. The control channel controls this loop's
   272 // with the server. The control channel controls this loop's
   279 // activity.
   273 // activity.
   280 func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{},
   274 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
   281 control <-chan int) {
   275 control <-chan int) {
   282 	defer close(srvOut)
   276 	defer close(srvOut)
   283 
   277 
   284 	var input <-chan interface{}
   278 	var input <-chan Stanza
   285 Loop:
   279 Loop:
   286 	for {
   280 	for {
   287 		select {
   281 		select {
   288 		case status := <-control:
   282 		case status := <-control:
   289 			switch status {
   283 			switch status {
   307 	}
   301 	}
   308 }
   302 }
   309 
   303 
   310 // Stanzas from the remote go up through a stack of filters to the
   304 // Stanzas from the remote go up through a stack of filters to the
   311 // app. This function manages the filters.
   305 // app. This function manages the filters.
   312 func filterTop(filterOut <-chan <-chan interface{}, filterIn chan<- <-chan interface{},
   306 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
   313 topFilter <-chan interface{}, app chan<- interface{}) {
   307 topFilter <-chan Stanza, app chan<- Stanza) {
   314 	defer close(app)
   308 	defer close(app)
   315 Loop:
   309 Loop:
   316 	for {
   310 	for {
   317 		select {
   311 		select {
   318 		case newFilterOut := <-filterOut:
   312 		case newFilterOut := <-filterOut:
   331 			app <- data
   325 			app <- data
   332 		}
   326 		}
   333 	}
   327 	}
   334 }
   328 }
   335 
   329 
   336 func filterBottom(from <-chan interface{}, to chan<- interface{}) {
   330 func filterBottom(from <-chan Stanza, to chan<- Stanza) {
   337 	defer close(to)
   331 	defer close(to)
   338 	for data := range from {
   332 	for data := range from {
   339 		to <- data
   333 		to <- data
   340 	}
   334 	}
   341 }
   335 }
   596 	if res != "" {
   590 	if res != "" {
   597 		bindReq.Resource = &res
   591 		bindReq.Resource = &res
   598 	}
   592 	}
   599 	msg := &Iq{Header: Header{Type: "set", Id: <-Id,
   593 	msg := &Iq{Header: Header{Type: "set", Id: <-Id,
   600 		Nested: []interface{}{bindReq}}}
   594 		Nested: []interface{}{bindReq}}}
   601 	f := func(st interface{}) bool {
   595 	f := func(st Stanza) bool {
   602 		iq, ok := st.(*Iq)
   596 		iq, ok := st.(*Iq)
   603 		if !ok {
   597 		if !ok {
   604 			Warn.Log("non-iq response")
   598 			Warn.Log("non-iq response")
   605 		}
   599 		}
   606 		if iq.Type == "error" {
   600 		if iq.Type == "error" {
   641 // presence) with a given id. The provided function will not be called
   635 // presence) with a given id. The provided function will not be called
   642 // more than once. If it returns false, the stanza will not be made
   636 // more than once. If it returns false, the stanza will not be made
   643 // available on the normal Client.In channel. The stanza handler
   637 // available on the normal Client.In channel. The stanza handler
   644 // must not read from that channel, as deliveries on it cannot proceed
   638 // must not read from that channel, as deliveries on it cannot proceed
   645 // until the handler returns true or false.
   639 // until the handler returns true or false.
   646 func (cl *Client) HandleStanza(id string, f func(interface{}) bool) {
   640 func (cl *Client) HandleStanza(id string, f func(Stanza) bool) {
   647 	h := &stanzaHandler{id: id, f: f}
   641 	h := &stanzaHandler{id: id, f: f}
   648 	cl.handlers <- h
   642 	cl.handlers <- h
   649 }
   643 }