中间件
意义
提高复用、隔离业务,可随意组合,拓展相关功能
实现
中间件一般都封装在路由上,路由是URL请求分发的管理器
就构建模式来说,一般有两种方式
- 基于链表构建中间件,缺点,实现复杂,调用方式不灵活
- 使用数组构建中间件,控制灵活方便,比如:GIN框架
中间件实例
限流
高并发系统三大利器:缓存、降级、限流
- 缓存:提升系统访问速度和增大处理容量,为相应业务增加缓存
- 降价:当服务器压力巨增时,根据业务策略降级,以此释放服务资源保证业务正常
- 限流:通过并发限速,达到排队等待、降级等处理
限流分类
- 漏桶限流
每次请求时计算桶流量,超过阀值则降级请求 golang 官方 time/rate
- 令牌桶限流
每次请求时从令牌桶里取令牌,取不到则降级请求
time/rate使用
type Limiter struct {
limit Limit // 每秒token数
burst int // 桶数
mu sync.Mutex
tokens float64 // 令牌数
last time.Time // 最后一次令牌更新时间
lastEvent time.Time // 最后一次限速的时间
}
rate.NewLimiter(limit,burst)
主要方法:
wait(context): 阻塞多少秒
Reserve(): 返回阻塞的具体时间
Allow(): 判断当前是否能取到token(实际使用较多)
time/rate 主要源码逻辑
- 计算上次请求和当前请求时间差
- 计算时间差内生产的token数+旧token数
- 如果token为负,计算等待时间
- token为正,则请求后token-1
主要源码逻辑在reserveN方法里面,上述三种方法底层都是调用此方法
// 当前时间,请求token数量,最大等待时间
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
// 若此limit无限制,则可随时获取
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
// 根据当前时间获取至上次获取token后应增加的token数量
now, last, tokens := lim.advance(now)
// 减去本次获取数量
tokens -= float64(n)
var waitDuration time.Duration
// token小于0,说明本次获取会将所有token获取完,需等待能够生成对应token数量的时间
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// 获取数量不能大于总桶容量,并且 计算等待时间小于最大等待时间
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
熔断与降级
熔断器是当依赖的服务已经出现故障时,为了保证自身服务的正常运行不再访问依赖的服务,防止雪崩效应。
降级的意义是,当服务器压力巨增时,根据业务策略降级,以此释放服务资源保证业务正常。
熔断器的三种状态
- 关闭状态
服务正常,维护失败率统计,达到失败率阀值时,转到开启状态
- 开启状态
服务异常,调用fallback函数,一段时间后,进入半开启状态
- 半开启状态
尝试恢复服务,失败率高于阀值,进入开启状态,低于阀值,进入关闭状态。在等待一段时间后重新执行 业务逻辑,若业务逻辑执行成功,那么熔断器状态由开启状态转到关闭状态
hystrix-go
hystrix-go 是熔断、降级、限流集成类库
流量统计
type DefaultMetricCollector struct {
mutex *sync.RWMutex
numRequests *rolling.Number
errors *rolling.Number
successes *rolling.Number
failures *rolling.Number
rejects *rolling.Number
shortCircuits *rolling.Number
timeouts *rolling.Number
contextCanceled *rolling.Number
contextDeadlineExceeded *rolling.Number
fallbackSuccesses *rolling.Number
fallbackFailures *rolling.Number
totalDuration *rolling.Timing
runDuration *rolling.Timing
}
type Number struct {
// 存储统计的时候的时间戳及对应的数量
// 他会统计每一秒中,上述结构体每一个指标的数量
// key就是当前时间戳
// 同时,只会保存最近10秒的数据
// 这样,若我需要统计当前成功数,只需要将此map遍历一遍
// 就能取到所有的值了
Buckets map[int64]*numberBucket
Mutex *sync.RWMutex
}
type numberBucket struct {
Value float64
}
流量控制
type executorPool struct {
// 熔断器的名称
Name string
// 统计
Metrics *poolMetrics
// 最大并发数
Max int
// 令牌channel
Tickets chan *struct{}
}
func newExecutorPool(name string) *executorPool {
p := &executorPool{}
p.Name = name
p.Metrics = newPoolMetrics(name)
// 取熔断器定义最大数量
p.Max = getSettings(name).MaxConcurrentRequests
p.Tickets = make(chan *struct{}, p.Max)
// 放令牌数个数据到channel
for i := 0; i < p.Max; i++ {
p.Tickets <- &struct{}{}
}
return p
}
// 调用完成后执行
func (p *executorPool) Return(ticket *struct{}) {
if ticket == nil {
return
}
// 指标统计
// 面板就可以查看了
p.Metrics.Updates <- poolMetricsUpdate{
activeCount: p.ActiveCount(),
}
// 归还令牌
p.Tickets <- ticket
}
实时统计数据流
func (sh *StreamHandler) loop() {
// 每一秒将统计信息推送进去
tick := time.Tick(1 * time.Second)
for {
select {
case <-tick:
circuitBreakersMutex.RLock()
for _, cb := range circuitBreakers {
sh.publishMetrics(cb)
sh.publishThreadPools(cb.executorPool)
}
circuitBreakersMutex.RUnlock()
case <-sh.done:
return
}
}
}
// 读取到收到的信息
// 将接受的消息推送过去
func (sh *StreamHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
f, ok := rw.(http.Flusher)
if !ok {
http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
return
}
events := sh.register(req)
defer sh.unregister(req)
notify := rw.(http.CloseNotifier).CloseNotify()
// 设置返回的消息头
rw.Header().Add("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
for {
select {
case <-notify:
// client is gone
return
case event := <-events:
_, err := rw.Write(event)
if err != nil {
return
}
f.Flush()
}
}
}
websocket代理实例
import (
"flag"
"html/template"
"log"
"net/http"
"github.com/gorilla/websocket" // 第三方websoket库
)
var addr = flag.String("addr", "localhost:2003", "http service address")
var upgrader = websocket.Upgrader{} // use default options
func echo(w http.ResponseWriter, r *http.Request) {
// 调用第三方库 将协议升级为websocket协议
// 这个方法的主要步骤
// 1. 校验参数header, Connection = upgrade,Upgrade = websocket, Sec-Websocket-Key 必须存在
// 2. 将请求request及返回response封装成自己连接类型,并实现ReadMessage、WriteMessage方法
// 3. 返回,状态码为101 Switching Protocols,返回Sec-WebSocket-Accept等
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
// 必须关闭连接
defer c.Close()
for {
// 循环读取信息
mt, message, err := c.ReadMessage()
if err != nil {
log.Println("read:", err)
break
}
log.Printf("recv: %s", message)
// 发送消息
err = c.WriteMessage(mt, message)
if err != nil {
log.Println("write:", err)
break
}
}
}
func home(w http.ResponseWriter, r *http.Request) {
homeTemplate.Execute(w, "ws://"+r.Host+"/echo")
}
func main() {
flag.Parse()
log.SetFlags(0)
http.HandleFunc("/echo", echo)
http.HandleFunc("/", home)
log.Println("Starting websocket server at " + *addr)
log.Fatal(http.ListenAndServe(*addr, nil))
}