Go微服务网关(五)HTTP代理

Posted by YaPi on January 7, 2021

HTTP代理

前置知识点

特殊Header头
  • X-Forwarded-For

    记录最后直连实际服务器之前,整个代理过程,比如从原始ip到代理服务器1,再到代理服务器2,再到访问地址IP 那么它的值就是:原始IP,代理ip1,代理ip2,能伪造,原始访问端可以直接设置

  • X-Real-IP

    真是IP,在第一层代理服务器就可以拿到。原始访问端不能伪造

  • Connection
  • TE
  • Trailer
hop-by-hop

第一代理除去标准的逐段传输头 此类头部字段只对单次转发有效。会因为转发给缓存/代理服务器而失效。 HTTP 1.1 版本之后,如果要使用Hop-by-hop头部字段则需要提供Connection字段。 除了一下8个字段为逐跳字段,其余均为端到端字段。

  • Connection
  • Keep-Alive
  • Proxy-Authenticate
  • Proxy-Authenrization
  • Trailer
  • TE
  • Tranfer-Encoding
  • Upgrade

ReverseProxy

ReverseProxy是Golang官方提供的一个HTTP代理实现

ReverseProxy功能点
  • 更改内容
  • 错误信息回调
  • 支持自定义负载均衡
  • url重写
  • 连接池
  • 支持websocket服务
  • 支持https代理
ReverseProxy实现

var addr = "127.0.0.1:2002"

func main() {
	//请求访问地址:127.0.0.1:2002/xxx
	//实际转发地址:127.0.0.1:2003/base/xxx
	rs1 := "http://127.0.0.1:2003/base"
	url1, err1 := url.Parse(rs1)
	if err1 != nil {
		log.Println(err1)
	}
	proxy := httputil.NewSingleHostReverseProxy(url1)
	log.Println("Starting httpserver at " + addr)
	// 直接加入使用
	log.Fatal(http.ListenAndServe(addr, proxy))
}
ReverseProxy源码分析

使用http包下面的方法新建一个简单的ReverseProxy实现


type ReverseProxy struct {
    // 控制器,可以用此对函数内部数据进行修改
	Director func(*http.Request)
    // 连接池,若为nil,则使用http.DefaultTransPort
	Transport http.RoundTripper
    // 刷新到客户端的刷新间隔
	FlushInterval time.Duration
    // 错误记录器
	ErrorLog *log.Logger
    // 定义一个缓冲池,在复制http响应时使用,用来提高效率
	BufferPool BufferPool
	// 修改Response函数
	ModifyResponse func(*http.Response) error
    // 错误处理回调
	ErrorHandler func(http.ResponseWriter, *http.Request, error)
}

创建此结构体
proxy := httputil.NewSingleHostReverseProxy(url1)

此结构体实现了Handler接口的ServerHTTP方法,可以直接加入server使用

主要代理实现方法:ServerHTTP

func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
    // 判断当前ReverseProxy是否设置连接池,没有的话使用默认的连接池
	transport := p.Transport
	if transport == nil {
		transport = http.DefaultTransport
	}
   	// 获取请求上下文,
	ctx := req.Context()
	// 判断上游http.ResponseWriter返回是否结束,若结束,将执行请求Request请求中上下文的取消方法
	if cn, ok := rw.(http.CloseNotifier); ok {
		var cancel context.CancelFunc
		ctx, cancel = context.WithCancel(ctx)
		defer cancel()
		notifyChan := cn.CloseNotify()
		go func() {
			select {
			case <-notifyChan:
				cancel()
			case <-ctx.Done():
			}
		}()
	}
    // 拷贝请求上下文
	outreq := req.Clone(ctx)
	if req.ContentLength == 0 {
		outreq.Body = nil // Issue 16036: nil Body for http.Transport retries
	}
	if outreq.Header == nil {
		outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate
	}
    // 执行请求内容修改方法
	p.Director(outreq)
	outreq.Close = false
    
    // 判断协议是否升级
	reqUpType := upgradeType(outreq.Header)
	// 删除请求header
	removeConnectionHeaders(outreq.Header)

	// 删除逐跳首部,作为代理来说,需要保持持久的连接
	// 无论客户端发送建立的是持久的或者非持久的连接
	for _, h := range hopHeaders {
		hv := outreq.Header.Get(h)
		if hv == "" {
			continue
		}
		// 跳过
		if h == "Te" && hv == "trailers" {
			continue
		}
		outreq.Header.Del(h)
	}

	// 删除所有的逐跳首部过后,添加某些协议升级必须的请求头,如:websocket
	if reqUpType != "" {
		outreq.Header.Set("Connection", "Upgrade")
		outreq.Header.Set("Upgrade", reqUpType)
	}
    
    // 设置X-Forwarded-For
	if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
		// If we aren't the first proxy retain prior
		// X-Forwarded-For information as a comma+space
		// separated list and fold multiple headers into one.
		prior, ok := outreq.Header["X-Forwarded-For"]
		omit := ok && prior == nil // Issue 38079: nil now means don't populate the header
		if len(prior) > 0 {
			clientIP = strings.Join(prior, ", ") + ", " + clientIP
		}
		if !omit {
			outreq.Header.Set("X-Forwarded-For", clientIP)
		}
	}
    
    // 调用
	res, err := transport.RoundTrip(outreq)
	if err != nil {
		p.getErrorHandler()(rw, outreq, err)
		return
	}

	// Deal with 101 Switching Protocols responses: (WebSocket, h2c, etc)
	// 处理升级协议的返回
	if res.StatusCode == http.StatusSwitchingProtocols {
	    // 判断此请求和返回协议是否一致
		if !p.modifyResponse(rw, res, outreq) {
			return
		}
		// 升级协议
		// 主要实现就是 维持与真实服务器的连接,与客户端的连接
		// 将客户端的连接写入真实服务端
		// 将真实服务端返回的结果写到客户端
		p.handleUpgradeResponse(rw, outreq, res)
		// 若升级了,直接返回
		return
	}
    // 删除相关请求头
	removeConnectionHeaders(res.Header)

	for _, h := range hopHeaders {
		res.Header.Del(h)
	}

	if !p.modifyResponse(rw, res, outreq) {
		return
	}
    // 拷贝返回的header
	copyHeader(rw.Header(), res.Header)
    
    // 不重要,略
	// The "Trailer" header isn't included in the Transport's response,
	// at least for *http.Transport. Build it up from Trailer.
	announcedTrailers := len(res.Trailer)
	if announcedTrailers > 0 {
		trailerKeys := make([]string, 0, len(res.Trailer))
		for k := range res.Trailer {
			trailerKeys = append(trailerKeys, k)
		}
		rw.Header().Add("Trailer", strings.Join(trailerKeys, ", "))
	}

	rw.WriteHeader(res.StatusCode)

	err = p.copyResponse(rw, res.Body, p.flushInterval(req, res))
	if err != nil {
		defer res.Body.Close()
		// Since we're streaming the response, if we run into an error all we can do
		// is abort the request. Issue 23643: ReverseProxy should use ErrAbortHandler
		// on read error while copying body.
		if !shouldPanicOnCopyError(req) {
			p.logf("suppressing panic for copyResponse error in test; copy error: %v", err)
			return
		}
		panic(http.ErrAbortHandler)
	}
	res.Body.Close() // close now, instead of defer, to populate res.Trailer

	if len(res.Trailer) > 0 {
		// Force chunking if we saw a response trailer.
		// This prevents net/http from calculating the length for short
		// bodies and adding a Content-Length.
		if fl, ok := rw.(http.Flusher); ok {
			fl.Flush()
		}
	}

	if len(res.Trailer) == announcedTrailers {
		copyHeader(rw.Header(), res.Trailer)
		return
	}

	for k, vv := range res.Trailer {
		k = http.TrailerPrefix + k
		for _, v := range vv {
			rw.Header().Add(k, v)
		}
	}
}
ReverseProxy拓展

定义基础路由存储

type RandomBalance struct {
	curIndex int
	rss      []string
}

func (r *RandomBalance) Add(params ...string) error {
	if len(params) == 0 {
		return errors.New("param len 1 at least")
	}
	addr := params[0]
	r.rss = append(r.rss, addr)
	return nil
}

func (r *RandomBalance) Get(key string) (string, error) {
	return r.Next(), nil
}

  • 4种负载轮询类型实现及接口封装
  1. 随机
func (r *RandomBalance) Next() string {
	if len(r.rss) == 0 {
		return ""
	}
	// 随机逻辑
	r.curIndex = rand.Intn(len(r.rss))
	return r.rss[r.curIndex]
}
  1. 轮询
func (r *RandomBalance) Next() string {
	if len(r.rss) == 0 {
		return ""
	}
	lens := len(r.rss)
	if r.curIndex >= lens {
		r.curIndex = 0
	}
	curAddr := r.rss[r.curIndex]
	r.curIndex = (r.curIndex + 1) % lens
	return curAddr
}
  1. 权重轮询 一般权重轮询有几个重要的参数
  • Wight: 初始化时对节点约定的权重
  • currentWeight: 节点临时权重,每轮都会变化
  • effectiveWeight: 节点有效权重,默认与Weight相同,若其中有一台机子故障过,其权重就会降低
  • totalWeight: 所有节点有效权重之和: sum(effectiveWeight)

参照Nginx加权轮询策略: 假如当前有a,b,c三台机器,权重分别为4,3,2 初始化设置 currentWeight = effectiveWeight 第一轮:所有节点执行, currentWeight += effectiveWeight 可得:a:8,b:6,c:4 选取currentWeight最大的一台机器,即选中a机器

当选中一台机器过后,currentWeight -= totalWeight, totalWeight为所有effectiveWeight之和,即为 9

那么此时a机器的currentWeight权重为 8-9 = -1

第二轮:a:-1,b:6,c:4 同上,所有节点执行, currentWeight += effectiveWeight 可得,a: -1 + 4 = 3, b: 6+3 = 9, c : 4+2 = 6

选中b机器,执行 currentWeight -= totalWeight

9 - (4+3+2) = 0

第三轮:a:3,b:0,c:6 同上,所有节点执行, currentWeight += effectiveWeight 可得,a:7,b:3,c:8 选中c,执行 currentWeight -= totalWeight,c变为-1

第四轮:a:7,b:3,c:-1

type WeightRoundRobinBalance struct {
	curIndex int
	rss      []*WeightNode
}

type WeightNode struct {
	addr            string
	weight          int //权重值
	currentWeight   int //节点当前权重
	effectiveWeight int //有效权重
}

func (r *WeightRoundRobinBalance) Add(params ...string) error {
	if len(params) != 2 {
		return errors.New("param len need 2")
	}
	parInt, err := strconv.ParseInt(params[1], 10, 64)
	if err != nil {
		return err
	}
	node := &WeightNode{addr: params[0], weight: int(parInt)}
	node.effectiveWeight = node.weight
	r.rss = append(r.rss, node)
	return nil
}

func (r *WeightRoundRobinBalance) Next() string {
	total := 0
	var best *WeightNode
	for i := 0; i < len(r.rss); i++ {
		w := r.rss[i]
		//step 1 统计所有有效权重之和
		total += w.effectiveWeight

		//step 2 变更节点临时权重为的节点临时权重+节点有效权重
		w.currentWeight += w.effectiveWeight

		//step 3 有效权重默认与权重相同,通讯异常时-1, 通讯成功+1,直到恢复到weight大小
		if w.effectiveWeight < w.weight {
			w.effectiveWeight++
		}
		//step 4 选择最大临时权重点节点
		if best == nil || w.currentWeight > best.currentWeight {
			best = w
		}
	}
	if best == nil {
		return ""
	}
	//step 5 变更临时权重为 临时权重-有效权重之和
	best.currentWeight -= total
	return best.addr
}

  1. Hash一致性轮询
  • 拓展中间件支持:限流、熔断、权限、数据统计
type Hash func(data []byte) uint32

type UInt32Slice []uint32

func (s UInt32Slice) Len() int {
	return len(s)
}

func (s UInt32Slice) Less(i, j int) bool {
	return s[i] < s[j]
}

func (s UInt32Slice) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

type ConsistentHashBanlance struct {
	mux      sync.RWMutex
	hash     Hash
	replicas int               //复制因子
	keys     UInt32Slice       //已排序的节点hash切片
	hashMap  map[uint32]string //节点哈希和Key的map,键是hash值,值是节点key

	//观察主体
	conf LoadBalanceConf
}

func NewConsistentHashBanlance(replicas int, fn Hash) *ConsistentHashBanlance {
	m := &ConsistentHashBanlance{
		replicas: replicas,
		hash:     fn,
		hashMap:  make(map[uint32]string),
	}
	if m.hash == nil {
		//最多32位,保证是一个2^32-1环
		m.hash = crc32.ChecksumIEEE
	}
	return m
}

// 验证是否为空
func (c *ConsistentHashBanlance) IsEmpty() bool {
	return len(c.keys) == 0
}

// Add 方法用来添加缓存节点,参数为节点key,比如使用IP
func (c *ConsistentHashBanlance) Add(params ...string) error {
	if len(params) == 0 {
		return errors.New("param len 1 at least")
	}
	addr := params[0]
	c.mux.Lock()
	defer c.mux.Unlock()
	// 结合复制因子计算所有虚拟节点的hash值,并存入m.keys中,同时在m.hashMap中保存哈希值和key的映射
	for i := 0; i < c.replicas; i++ {
		hash := c.hash([]byte(strconv.Itoa(i) + addr))
		c.keys = append(c.keys, hash)
		c.hashMap[hash] = addr
	}
	// 对所有虚拟节点的哈希值进行排序,方便之后进行二分查找
	sort.Sort(c.keys)
	return nil
}

// Get 方法根据给定的对象获取最靠近它的那个节点
func (c *ConsistentHashBanlance) Get(key string) (string, error) {
	if c.IsEmpty() {
		return "", errors.New("node is empty")
	}
	hash := c.hash([]byte(key))

	// 通过二分查找获取最优节点,第一个"服务器hash"值大于"数据hash"值的就是最优"服务器节点"
	idx := sort.Search(len(c.keys), func(i int) bool { return c.keys[i] >= hash })

	// 如果查找结果 大于 服务器节点哈希数组的最大索引,表示此时该对象哈希值位于最后一个节点之后,那么放入第一个节点中
	if idx == len(c.keys) {
		idx = 0
	}
	c.mux.RLock()
	defer c.mux.RUnlock()
	return c.hashMap[c.keys[idx]], nil
}

func (c *ConsistentHashBanlance) SetConf(conf LoadBalanceConf) {
	c.conf = conf
}

func (c *ConsistentHashBanlance) Update() {
	if conf, ok := c.conf.(*LoadBalanceZkConf); ok {
		fmt.Println("Update get conf:", conf.GetConf())
		c.keys = nil
		c.hashMap = nil
		for _, ip := range conf.GetConf() {
			c.Add(strings.Split(ip, ",")...)
		}
	}
	if conf, ok := c.conf.(*LoadBalanceCheckConf); ok {
		fmt.Println("Update get conf:", conf.GetConf())
		c.keys = nil
		c.hashMap = map[uint32]string{}
		for _, ip := range conf.GetConf() {
			c.Add(strings.Split(ip, ",")...)
		}
	}
}