stream.go
changeset 110 7696e6a01709
parent 105 aa895dfae3f6
child 111 36287f2cf06e
equal deleted inserted replaced
109:3887d7ad19c1 110:7696e6a01709
    27 
    27 
    28 // Callback to handle a stanza with a particular id.
    28 // Callback to handle a stanza with a particular id.
    29 type stanzaHandler struct {
    29 type stanzaHandler struct {
    30 	id string
    30 	id string
    31 	// Return true means pass this to the application
    31 	// Return true means pass this to the application
    32 	f func(Stanza) bool
    32 	f func(interface{}) bool
    33 }
    33 }
    34 
    34 
    35 // BUG(cjyar) Review all these *Client receiver methods. They should
    35 // BUG(cjyar) Review all these *Client receiver methods. They should
    36 // probably either all be receivers, or none.
    36 // probably either all be receivers, or none.
    37 
    37 
   146 		}
   146 		}
   147 
   147 
   148 		// If it's a Stanza, we try to unmarshal its innerxml
   148 		// If it's a Stanza, we try to unmarshal its innerxml
   149 		// into objects of the appropriate respective
   149 		// into objects of the appropriate respective
   150 		// types. This is specified by our extensions.
   150 		// types. This is specified by our extensions.
   151 		if st, ok := obj.(Stanza); ok {
   151 		if st := getStanza(obj) ; st != nil {
   152 			err = parseExtended(st, extStanza)
   152 			err = parseExtended(st, extStanza)
   153 			if err != nil {
   153 			if err != nil {
   154 				Warn.Logf("ext unmarshal: %s", err)
   154 				Warn.Logf("ext unmarshal: %s", err)
   155 				break Loop
   155 				break Loop
   156 			}
   156 			}
   159 		// Put it on the channel.
   159 		// Put it on the channel.
   160 		ch <- obj
   160 		ch <- obj
   161 	}
   161 	}
   162 }
   162 }
   163 
   163 
   164 func parseExtended(st Stanza, extStanza map[string]func(*xml.Name) interface{}) error {
   164 func parseExtended(st *Stanza, extStanza map[string]func(*xml.Name) interface{}) error {
   165 	// Now parse the stanza's innerxml to find the string that we
   165 	// Now parse the stanza's innerxml to find the string that we
   166 	// can unmarshal this nested element from.
   166 	// can unmarshal this nested element from.
   167 	reader := strings.NewReader(st.innerxml())
   167 	reader := strings.NewReader(st.Innerxml)
   168 	p := xml.NewDecoder(reader)
   168 	p := xml.NewDecoder(reader)
   169 	for {
   169 	for {
   170 		t, err := p.Token()
   170 		t, err := p.Token()
   171 		if err == io.EOF {
   171 		if err == io.EOF {
   172 			break
   172 			break
   183 				// stuff it back into the stanza.
   183 				// stuff it back into the stanza.
   184 				err := p.DecodeElement(nested, &se)
   184 				err := p.DecodeElement(nested, &se)
   185 				if err != nil {
   185 				if err != nil {
   186 					return err
   186 					return err
   187 				}
   187 				}
   188 				st.addNested(nested)
   188 				st.Nested = append(st.Nested, nested)
   189 			}
   189 			}
   190 		}
   190 		}
   191 	}
   191 	}
   192 
   192 
   193 	return nil
   193 	return nil
   223 			}
   223 			}
   224 		}
   224 		}
   225 	}
   225 	}
   226 }
   226 }
   227 
   227 
   228 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- Stanza) {
   228 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) {
   229 	defer close(cliOut)
   229 	defer close(cliOut)
   230 
   230 
   231 	handlers := make(map[string]func(Stanza) bool)
   231 	handlers := make(map[string]func(interface{}) bool)
   232 Loop:
   232 Loop:
   233 	for {
   233 	for {
   234 		select {
   234 		select {
   235 		case h := <-cl.handlers:
   235 		case h := <-cl.handlers:
   236 			handlers[h.id] = h.f
   236 			handlers[h.id] = h.f
   237 		case x, ok := <-srvIn:
   237 		case x, ok := <-srvIn:
   238 			if !ok {
   238 			if !ok {
   239 				break Loop
   239 				break Loop
   240 			}
   240 			}
   241 			send := false
   241 			var st *Stanza
   242 			switch obj := x.(type) {
   242 			switch obj := x.(type) {
   243 			case *stream:
   243 			case *stream:
   244 				handleStream(obj)
   244 				handleStream(obj)
   245 			case *streamError:
   245 			case *streamError:
   246 				cl.handleStreamError(obj)
   246 				cl.handleStreamError(obj)
   248 				cl.handleFeatures(obj)
   248 				cl.handleFeatures(obj)
   249 			case *starttls:
   249 			case *starttls:
   250 				cl.handleTls(obj)
   250 				cl.handleTls(obj)
   251 			case *auth:
   251 			case *auth:
   252 				cl.handleSasl(obj)
   252 				cl.handleSasl(obj)
       
   253 			case *Iq, *Message, *Presence:
       
   254 				st = getStanza(obj)
   253 			default:
   255 			default:
   254 				send = true
   256 				Warn.Logf("Unhandled non-stanza: %T %#v", x, x)
   255 			}
   257 			}
   256 			if !send {
   258 
       
   259 			if st == nil {
   257 				continue
   260 				continue
   258 			}
   261 			}
   259 			st, ok := x.(Stanza)
   262 
   260 			if !ok {
   263 			send := true
   261 				Warn.Logf("Unhandled non-stanza: %v", x)
   264 			if handlers[st.Id] != nil {
   262 				continue
   265 				f := handlers[st.Id]
   263 			}
   266 				delete(handlers, st.Id)
   264 			if handlers[st.GetId()] != nil {
   267 				send = f(x)
   265 				f := handlers[st.GetId()]
       
   266 				delete(handlers, st.GetId())
       
   267 				send = f(st)
       
   268 			}
   268 			}
   269 			if send {
   269 			if send {
   270 				cliOut <- st
   270 				cliOut <- x
   271 			}
   271 			}
   272 		}
   272 		}
   273 	}
   273 	}
   274 }
   274 }
   275 
   275 
   276 // This loop is paused until resource binding is complete. Otherwise
   276 // This loop is paused until resource binding is complete. Otherwise
   277 // the app might inject something inappropriate into our negotiations
   277 // the app might inject something inappropriate into our negotiations
   278 // with the server. The control channel controls this loop's
   278 // with the server. The control channel controls this loop's
   279 // activity.
   279 // activity.
   280 func writeStream(srvOut chan<- interface{}, cliIn <-chan Stanza,
   280 func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{},
   281 control <-chan int) {
   281 control <-chan int) {
   282 	defer close(srvOut)
   282 	defer close(srvOut)
   283 
   283 
   284 	var input <-chan Stanza
   284 	var input <-chan interface{}
   285 Loop:
   285 Loop:
   286 	for {
   286 	for {
   287 		select {
   287 		select {
   288 		case status := <-control:
   288 		case status := <-control:
   289 			switch status {
   289 			switch status {
   307 	}
   307 	}
   308 }
   308 }
   309 
   309 
   310 // Stanzas from the remote go up through a stack of filters to the
   310 // Stanzas from the remote go up through a stack of filters to the
   311 // app. This function manages the filters.
   311 // app. This function manages the filters.
   312 func filterTop(filterOut <-chan <-chan Stanza, filterIn chan<- <-chan Stanza,
   312 func filterTop(filterOut <-chan <-chan interface{}, filterIn chan<- <-chan interface{},
   313 topFilter <-chan Stanza, app chan<- Stanza) {
   313 topFilter <-chan interface{}, app chan<- interface{}) {
   314 	defer close(app)
   314 	defer close(app)
   315 Loop:
   315 Loop:
   316 	for {
   316 	for {
   317 		select {
   317 		select {
   318 		case newFilterOut := <-filterOut:
   318 		case newFilterOut := <-filterOut:
   331 			app <- data
   331 			app <- data
   332 		}
   332 		}
   333 	}
   333 	}
   334 }
   334 }
   335 
   335 
   336 func filterBottom(from <-chan Stanza, to chan<- Stanza) {
   336 func filterBottom(from <-chan interface{}, to chan<- interface{}) {
   337 	defer close(to)
   337 	defer close(to)
   338 	for data := range from {
   338 	for data := range from {
   339 		to <- data
   339 		to <- data
   340 	}
   340 	}
   341 }
   341 }
   594 	res := cl.Jid.Resource
   594 	res := cl.Jid.Resource
   595 	bindReq := &bindIq{}
   595 	bindReq := &bindIq{}
   596 	if res != "" {
   596 	if res != "" {
   597 		bindReq.Resource = &res
   597 		bindReq.Resource = &res
   598 	}
   598 	}
   599 	msg := &Iq{Type: "set", Id: <-Id, Nested: []interface{}{bindReq}}
   599 	msg := &Iq{Stanza: Stanza{Type: "set", Id: <-Id,
   600 	f := func(st Stanza) bool {
   600 		Nested: []interface{}{bindReq}}}
       
   601 	f := func(st interface{}) bool {
   601 		iq, ok := st.(*Iq)
   602 		iq, ok := st.(*Iq)
   602 		if !ok {
   603 		if !ok {
   603 			Warn.Log("non-iq response")
   604 			Warn.Log("non-iq response")
   604 		}
   605 		}
   605 		if iq.Type == "error" {
   606 		if iq.Type == "error" {
   612 				bindRepl = b
   613 				bindRepl = b
   613 				break
   614 				break
   614 			}
   615 			}
   615 		}
   616 		}
   616 		if bindRepl == nil {
   617 		if bindRepl == nil {
   617 			Warn.Logf("Bad bind reply: %v", iq)
   618 			Warn.Logf("Bad bind reply: %#v", iq)
   618 			return false
   619 			return false
   619 		}
   620 		}
   620 		jidStr := bindRepl.Jid
   621 		jidStr := bindRepl.Jid
   621 		if jidStr == nil || *jidStr == "" {
   622 		if jidStr == nil || *jidStr == "" {
   622 			Warn.Log("Can't bind empty resource")
   623 			Warn.Log("Can't bind empty resource")
   640 // presence) with a given id. The provided function will not be called
   641 // presence) with a given id. The provided function will not be called
   641 // more than once. If it returns false, the stanza will not be made
   642 // more than once. If it returns false, the stanza will not be made
   642 // available on the normal Client.In channel. The stanza handler
   643 // available on the normal Client.In channel. The stanza handler
   643 // must not read from that channel, as deliveries on it cannot proceed
   644 // must not read from that channel, as deliveries on it cannot proceed
   644 // until the handler returns true or false.
   645 // until the handler returns true or false.
   645 func (cl *Client) HandleStanza(id string, f func(Stanza) bool) {
   646 func (cl *Client) HandleStanza(id string, f func(interface{}) bool) {
   646 	h := &stanzaHandler{id: id, f: f}
   647 	h := &stanzaHandler{id: id, f: f}
   647 	cl.handlers <- h
   648 	cl.handlers <- h
   648 }
   649 }