stream.go
changeset 20 e119444a1119
parent 19 e923f28d65aa
child 21 8f6ae5cfc9b9
equal deleted inserted replaced
19:e923f28d65aa 20:e119444a1119
    32 	id string
    32 	id string
    33 	// Return true means pass this to the application
    33 	// Return true means pass this to the application
    34 	f func(Stanza) bool
    34 	f func(Stanza) bool
    35 }
    35 }
    36 
    36 
    37 // TODO Review all these *Client receiver methods. They should
    37 // BUG(cjyar) Review all these *Client receiver methods. They should
    38 // probably either all be receivers, or none.
    38 // probably either all be receivers, or none.
    39 
    39 
    40 func (cl *Client) readTransport(w io.Writer) {
    40 func (cl *Client) readTransport(w io.Writer) {
    41 	defer tryClose(cl.socket, w)
    41 	defer tryClose(cl.socket, w)
    42 	cl.socket.SetReadTimeout(1e8)
    42 	cl.socket.SetReadTimeout(1e8)
   142 		if err != nil {
   142 		if err != nil {
   143 			log.Printf("unmarshal: %v", err)
   143 			log.Printf("unmarshal: %v", err)
   144 			break
   144 			break
   145 		}
   145 		}
   146 
   146 
   147 		// TODO If it's a Stanza, use reflection to search for
   147 		// BUG(cjyar) If it's a Stanza, use reflection to
   148 		// any Unrecognized elements and fill in their
   148 		// search for any Unrecognized elements and fill in
   149 		// attributes.
   149 		// their attributes.
   150 
   150 
   151 		// Put it on the channel.
   151 		// Put it on the channel.
   152 		ch <- obj
   152 		ch <- obj
   153 	}
   153 	}
   154 }
   154 }
   168 			break
   168 			break
   169 		}
   169 		}
   170 	}
   170 	}
   171 }
   171 }
   172 
   172 
   173 // TODO This should go away. We shouldn't allow writing of
   173 // BUG(cjyar) This should go away. We shouldn't allow writing of
   174 // unstructured data.
   174 // unstructured data.
   175 func writeText(w io.Writer, ch <-chan *string) {
   175 func writeText(w io.Writer, ch <-chan *string) {
   176 	if debug {
   176 	if debug {
   177 		pr, pw := io.Pipe()
   177 		pr, pw := io.Pipe()
   178 		go tee(pr, w, "C: ")
   178 		go tee(pr, w, "C: ")
   191 
   191 
   192 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) {
   192 func (cl *Client) readStream(srvIn <-chan interface{}, cliOut chan<- interface{}) {
   193 	defer tryClose(srvIn, cliOut)
   193 	defer tryClose(srvIn, cliOut)
   194 
   194 
   195 	handlers := make(map[string] func(Stanza) bool)
   195 	handlers := make(map[string] func(Stanza) bool)
   196 	// TODO This for loop will never terminate, even when the
   196 	// BUG(cjyar) This for loop will never terminate, even when
   197 	// channels are closed.
   197 	// the channels are closed.
   198 	for {
   198 	for {
   199 		select {
   199 		select {
   200 		case h := <- cl.handlers:
   200 		case h := <- cl.handlers:
   201 			handlers[h.id] = h.f
   201 			handlers[h.id] = h.f
   202 		case x := <- srvIn:
   202 		case x := <- srvIn:
   224 			}
   224 			}
   225 		}
   225 		}
   226 	}
   226 	}
   227 }
   227 }
   228 
   228 
   229 // TODO Disable this loop until resource binding is
   229 // BUG(cjyar) Disable this loop until resource binding is
   230 // complete. Otherwise the app might inject something weird into our
   230 // complete. Otherwise the app might inject something weird into our
   231 // negotiation stream.
   231 // negotiation stream.
   232 func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) {
   232 func writeStream(srvOut chan<- interface{}, cliIn <-chan interface{}) {
   233 	defer tryClose(srvOut, cliIn)
   233 	defer tryClose(srvOut, cliIn)
   234 
   234 
   305 
   305 
   306 	// Signal that we're going back to the read loop.
   306 	// Signal that we're going back to the read loop.
   307 	cl.socketSync.Done()
   307 	cl.socketSync.Done()
   308 }
   308 }
   309 
   309 
   310 // TODO
       
   311 // BUG(cjyar) Doesn't implement TLS/SASL EXTERNAL.
   310 // BUG(cjyar) Doesn't implement TLS/SASL EXTERNAL.
   312 func (cl *Client) chooseSasl(fe *Features) {
   311 func (cl *Client) chooseSasl(fe *Features) {
   313 	var digestMd5 bool
   312 	var digestMd5 bool
   314 	for _, m := range(fe.Mechanisms.Mechanism) {
   313 	for _, m := range(fe.Mechanisms.Mechanism) {
   315 		switch strings.ToLower(m) {
   314 		switch strings.ToLower(m) {