Go微服务网关(六)Hystrix解析

Posted by YaPi on January 8, 2021

中间件

意义

提高复用、隔离业务,可随意组合,拓展相关功能

实现

中间件一般都封装在路由上,路由是URL请求分发的管理器

就构建模式来说,一般有两种方式

  1. 基于链表构建中间件,缺点,实现复杂,调用方式不灵活
  2. 使用数组构建中间件,控制灵活方便,比如:GIN框架

中间件实例

限流

高并发系统三大利器:缓存、降级、限流

  • 缓存:提升系统访问速度和增大处理容量,为相应业务增加缓存
  • 降价:当服务器压力巨增时,根据业务策略降级,以此释放服务资源保证业务正常
  • 限流:通过并发限速,达到排队等待、降级等处理

限流分类

  • 漏桶限流

    每次请求时计算桶流量,超过阀值则降级请求 golang 官方 time/rate

  • 令牌桶限流

    每次请求时从令牌桶里取令牌,取不到则降级请求

time/rate使用


type Limiter struct {
	limit Limit // 每秒token数
	burst int // 桶数

	mu     sync.Mutex
	tokens float64 // 令牌数
	last time.Time // 最后一次令牌更新时间
	lastEvent time.Time // 最后一次限速的时间
}


rate.NewLimiter(limit,burst)

主要方法: 
wait(context): 阻塞多少秒
Reserve(): 返回阻塞的具体时间
Allow(): 判断当前是否能取到token(实际使用较多)

time/rate 主要源码逻辑

  • 计算上次请求和当前请求时间差
  • 计算时间差内生产的token数+旧token数
  • 如果token为负,计算等待时间
  • token为正,则请求后token-1
主要源码逻辑在reserveN方法里面,上述三种方法底层都是调用此方法


// 当前时间,请求token数量,最大等待时间
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
    
    // 若此limit无限制,则可随时获取
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
    
    // 根据当前时间获取至上次获取token后应增加的token数量
	now, last, tokens := lim.advance(now)
    
    // 减去本次获取数量
	tokens -= float64(n)

	var waitDuration time.Duration
	// token小于0,说明本次获取会将所有token获取完,需等待能够生成对应token数量的时间
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}
    
    // 获取数量不能大于总桶容量,并且 计算等待时间小于最大等待时间
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}
熔断与降级

熔断器是当依赖的服务已经出现故障时,为了保证自身服务的正常运行不再访问依赖的服务,防止雪崩效应。

降级的意义是,当服务器压力巨增时,根据业务策略降级,以此释放服务资源保证业务正常。

熔断器的三种状态

  • 关闭状态

    服务正常,维护失败率统计,达到失败率阀值时,转到开启状态

  • 开启状态

    服务异常,调用fallback函数,一段时间后,进入半开启状态

  • 半开启状态

    尝试恢复服务,失败率高于阀值,进入开启状态,低于阀值,进入关闭状态。在等待一段时间后重新执行 业务逻辑,若业务逻辑执行成功,那么熔断器状态由开启状态转到关闭状态

hystrix-go

hystrix-go 是熔断、降级、限流集成类库

流量统计


type DefaultMetricCollector struct {
	mutex *sync.RWMutex

	numRequests *rolling.Number
	errors      *rolling.Number
    
	successes               *rolling.Number
	failures                *rolling.Number
	rejects                 *rolling.Number
	shortCircuits           *rolling.Number
	timeouts                *rolling.Number
	contextCanceled         *rolling.Number
	contextDeadlineExceeded *rolling.Number

	fallbackSuccesses *rolling.Number
	fallbackFailures  *rolling.Number
	totalDuration     *rolling.Timing
	runDuration       *rolling.Timing
}



type Number struct {
    // 存储统计的时候的时间戳及对应的数量
    // 他会统计每一秒中,上述结构体每一个指标的数量
    // key就是当前时间戳
    // 同时,只会保存最近10秒的数据
    
    // 这样,若我需要统计当前成功数,只需要将此map遍历一遍
    // 就能取到所有的值了
	Buckets map[int64]*numberBucket
	Mutex   *sync.RWMutex
}

type numberBucket struct {
	Value float64
}

流量控制


type executorPool struct {
    // 熔断器的名称
	Name    string
	// 统计
	Metrics *poolMetrics
	// 最大并发数
	Max     int
	// 令牌channel
	Tickets chan *struct{}
}

func newExecutorPool(name string) *executorPool {
	p := &executorPool{}
	p.Name = name
	p.Metrics = newPoolMetrics(name)
	// 取熔断器定义最大数量
	p.Max = getSettings(name).MaxConcurrentRequests

	p.Tickets = make(chan *struct{}, p.Max)
	// 放令牌数个数据到channel
	for i := 0; i < p.Max; i++ {
		p.Tickets <- &struct{}{}
	}

	return p
}

// 调用完成后执行
func (p *executorPool) Return(ticket *struct{}) {
	if ticket == nil {
		return
	}
    // 指标统计
    // 面板就可以查看了
	p.Metrics.Updates <- poolMetricsUpdate{
		activeCount: p.ActiveCount(),
	}
	// 归还令牌
	p.Tickets <- ticket
}

实时统计数据流

func (sh *StreamHandler) loop() {
    // 每一秒将统计信息推送进去
	tick := time.Tick(1 * time.Second)
	for {
		select {
		case <-tick:
			circuitBreakersMutex.RLock()
			for _, cb := range circuitBreakers {
				sh.publishMetrics(cb)
				sh.publishThreadPools(cb.executorPool)
			}
			circuitBreakersMutex.RUnlock()
		case <-sh.done:
			return
		}
	}
}


// 读取到收到的信息
// 将接受的消息推送过去
func (sh *StreamHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
    f, ok := rw.(http.Flusher)
    if !ok {
    http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
        return
    }
    events := sh.register(req)
    defer sh.unregister(req)
    notify := rw.(http.CloseNotifier).CloseNotify()
    // 设置返回的消息头
    rw.Header().Add("Content-Type", "text/event-stream")
    rw.Header().Set("Cache-Control", "no-cache")
    rw.Header().Set("Connection", "keep-alive")
    for {
        select {
        case <-notify:
            // client is gone
            return
        case event := <-events:
            _, err := rw.Write(event)
            if err != nil {
                return
            }
            f.Flush()
        }
    }
}

websocket代理实例

import (
	"flag"
	"html/template"
	"log"
	"net/http"

	"github.com/gorilla/websocket" // 第三方websoket库
)

var addr = flag.String("addr", "localhost:2003", "http service address")

var upgrader = websocket.Upgrader{} // use default options

func echo(w http.ResponseWriter, r *http.Request) {
    // 调用第三方库 将协议升级为websocket协议
    // 这个方法的主要步骤
    // 1. 校验参数header, Connection = upgrade,Upgrade = websocket, Sec-Websocket-Key 必须存在
    // 2. 将请求request及返回response封装成自己连接类型,并实现ReadMessage、WriteMessage方法
    // 3. 返回,状态码为101 Switching Protocols,返回Sec-WebSocket-Accept等
	c, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Print("upgrade:", err)
		return
	}
	// 必须关闭连接
	defer c.Close()
	for {
	    // 循环读取信息
		mt, message, err := c.ReadMessage()
		if err != nil {
			log.Println("read:", err)
			break
		}
		log.Printf("recv: %s", message)
		// 发送消息
		err = c.WriteMessage(mt, message)
		if err != nil {
			log.Println("write:", err)
			break
		}
	}
}

func home(w http.ResponseWriter, r *http.Request) {
	homeTemplate.Execute(w, "ws://"+r.Host+"/echo")
}

func main() {
	flag.Parse()
	log.SetFlags(0)
	http.HandleFunc("/echo", echo)
	http.HandleFunc("/", home)
	log.Println("Starting websocket server at " + *addr)
	log.Fatal(http.ListenAndServe(*addr, nil))
}