Go微服务网关(三)Http服务及源码

Posted by YaPi on January 5, 2021

HTTP实例

服务端

func main() {
	// 创建路由器
	mux := http.NewServeMux()
	// 设置路由规则
	mux.HandleFunc("/bye", sayBye)
	// 创建服务器
	server := &http.Server{
		Addr:         Addr,
		WriteTimeout: time.Second * 3,
		Handler:      mux,
	}
	// 监听端口并提供服务
	log.Println("Starting httpserver at "+Addr)
	log.Fatal(server.ListenAndServe())
}

func sayBye(w http.ResponseWriter, r *http.Request) {
	time.Sleep(1 * time.Second)
	w.Write([]byte("bye bye ,this is httpServer"))
}

客户端

func main() {
	// 创建连接池
	transport := &http.Transport{
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second, //连接超时
			KeepAlive: 30 * time.Second, //探活时间 连接建立过后,TCP自动发送连接报文的时间间隔
		}).DialContext,
		MaxIdleConns:          100,              //最大空闲连接
		IdleConnTimeout:       90 * time.Second, //空闲超时时间
		TLSHandshakeTimeout:   10 * time.Second, //tls握手超时时间 使用HTTPS的话就会使用
		ExpectContinueTimeout: 1 * time.Second,  //100-continue状态码超时时间
	}
	// 创建客户端
	client := &http.Client{
		Timeout:   time.Second * 30, //请求超时时间
		Transport: transport,
	}
	// 请求数据
	resp, err := client.Get("http://127.0.0.1:1210/bye")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// 读取内容
	bds, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(bds))
}

源码解析

服务端源码

1. mux := http.NewServeMux()
创建路由器

主要构建ServeMux实例
type ServeMux struct {
	mu    sync.RWMutex //锁
	m     map[string]muxEntry // 路由map,里面包含了一些列路径,及对应调用方法的实体
	es    []muxEntry // slice of entries sorted from longest to shortest.
	hosts bool       // whether any patterns contain hostnames
}

muxEntry结构体
type muxEntry struct {
    // 具体方法
	h       Handler
	pattern string
}

2. 	mux.HandleFunc("/bye", sayBye)
添加路由及处理方法,这里第二个参数是一个函数,

func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
	if handler == nil {
		panic("http: nil handler")
	}
	// 主要构建逻辑
	// 首先构建成一个HandlerFunc结构体,此结构体会实现Handler接口
	// type Handler interface {
	//  ServeHTTP(ResponseWriter, *Request)
	// }
}
	mux.Handle(pattern, HandlerFunc(handler))
}

func (mux *ServeMux) Handle(pattern string, handler Handler) {
    // 锁处理及相关校验
	...
	// 校验当前路径是否已有处理方法
	if _, exist := mux.m[pattern]; exist {
		panic("http: multiple registrations for " + pattern)
	}
    // 判断此map是否存在,不存在就新建一个
    // 未放在上一步判断之前校验,map若未初始化,匹配值的时候不会报错,数组会
	if mux.m == nil {
		mux.m = make(map[string]muxEntry)
	}
	e := muxEntry{h: handler, pattern: pattern}
	mux.m[pattern] = e
	if pattern[len(pattern)-1] == '/' {
		mux.es = appendSorted(mux.es, e)
	}

	if pattern[0] != '/' {
		mux.hosts = true
	}
}

3. 创建服务器
绑定超时时间及路由
server := &http.Server{
    Addr:         Addr,
    WriteTimeout: time.Second * 3,
    Handler:      mux,
}

4. 启动服务创建监听
server.ListenAndServe()

处理方法
func (srv *Server) ListenAndServe() error {
    // 判断是否关闭
	if srv.shuttingDown() {
		return ErrServerClosed
	}
	// 获取监听地址
	addr := srv.Addr
	if addr == "" {
		addr = ":http"
	}
	// 创建tcp监听
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	return srv.Serve(ln)
}

func (srv *Server) Serve(l net.Listener) error {
    // 上下文等操作
	...
	// 循环监听
	for {
	
		rw, err := l.Accept()
		if err != nil {
			select {
			case <-srv.getDoneChan():
				return ErrServerClosed
			default:
			}
			// 判断是否出错
			// 并设置等待时间,重试,若重试超最大值报错
			// 否则继续
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
				time.Sleep(tempDelay)
				continue
			}
			return err
		}
		// 获取连接上下文信息
		connCtx := ctx
		if cc := srv.ConnContext; cc != nil {
			connCtx = cc(connCtx, rw)
			if connCtx == nil {
				panic("ConnContext returned nil")
			}
		}
		tempDelay = 0
		// 将上下文信息及连接构建成conn对象,并调用serve方法,进行具体的操作
		// 在具体处理过程中调用server的ServeHTTP方法,即,前面构建路由时,构建的HandlerFunc对象
		// 它实现了此接口,间接调用了,创建路由的时候,传入的函数方法
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew) // before Serve can return
		go c.serve(connCtx)
	}
}

客户端源码

1. 连接池相关结构体
http.Transport{}结构体主要属性

type Transport struct {
    ...
	idleMu       sync.Mutex
	closeIdle    bool                                // 用户是否已关闭所有空闲连接
	idleConn     map[connectMethodKey][]*persistConn // most recently used at end
	idleConnWait map[connectMethodKey]wantConnQueue  // waiting getConns
}

// connectMethodKey可以看作是connectMethod的描述,包含了其基础信息
type connectMethodKey struct {
	proxy, scheme, addr string
	onlyH1              bool
}
// 包含了连接地址,代理地址,请求类型等消息
type connectMethod struct {
	_            incomparable
	proxyURL     *url.URL // nil for no proxy, else full proxy URL
	targetScheme string   // "http" or "https"
	//如果proxyURL指定了http或https代理,而targetScheme是http(不是https),
    //那么targetAddr不包含在connect方法键中,因为套接字可以
    //可重用于不同的targetAddr值。
	targetAddr string
	onlyH1     bool // whether to disable HTTP/2 and force HTTP/1
}



roundTrip流程

func (t *Transport)roundTrip(req *Request)

// 获取一个持久连接
// 参数,包装Request后的transportRequest和connectMethod
pconn,err := t.getConn(treq,cm)

type persistConn struct {
    br      *bufio.Reader   // 从建立的TCP连接中读取
    bw      *bufio.Writer   // 向建立的TCP连接中写数据
    reqch   chan requestAndChan     // read by readLoop
    writech   chan writeRequest     // read by writeLoop
}

获取到一个持久化连接过后,就可以尝试去获取一个真是的连接

1. 尝试获取闲置的连接
pc, idelSince := t.getIdleConn(cm)

2. select监听事件,获取可用连接
    2.1 case <- t.incHostConnCount(cmKey),判断主机是否能突破单个限制?有限制,不用当前持久化连接,而是去使用一个非持久化连接
    2.2 case pc := <- t.getIdelConnch(cm): 尝试获取连接(获取的过程中若有其他空闲出来的连接就使用)
    2.3 case <- req.Context().Done(): 监听当前获取连接上下文是否取消了获取连接,若取消了直接返回
3. 异步创建连接
go func(){pc,err := t.dialConn(ctx,cm)}   
在这个异步连接中就可以:
go pconn.readLoop() 一边读数据 -> 监听pconn.reqch, 写入conn
go pconn.writeLoop() 一边写数据 -> 监听pconn.writech, 写入conn

4. 监听连接是否创建成功

select {
    case v:= <- dialc: 新增连接成功
    case pc:= <- idleConnCh: 有可用的连接
}

当连接创建成功过后调用persistConn的roundTrip方法

resp,err := pconn.roundTrip(treq) -> pc.writech <- writeRequest 

写入的数据就会在persistConn 的 writeLoop()方法中监听读取返回的东西
TransPort RoundTrip流程

avatar

Client超时时间设置

avatar