B1a9867fc0ff4f5fcf6cd4eeaa25a065
6 - grpc 负载均衡

grpc负载均衡

负载均衡流程

grpc 官方的 doc 中介绍了,grpc 的负载均衡是基于一次请求而不是一次连接的。也就是说,假如所有的请求都来自同一个客户端的连接,这些请求还是会被均衡到所有服务器。

整个 grpc 负载均衡流程如下图:


1、启动时,grpc client 通过服名字解析服务得到一个 address list,每个 address 将指示它是服务器地址还是负载平衡器地址,以及指示要哪个客户端负载平衡策略的服务配置(例如,round_robin 或 grpclb)

2、客户端实例化负载均衡策略
如果解析程序返回的任何一个地址是负载均衡器地址,则无论 service config 中定义了什么负载均衡策略,客户端都将使用grpclb策略。否则,客户端将使用 service config 中定义的负载均衡策略。如果服务配置未请求负载均衡策略,则客户端将默认使用选择第一个可用服务器地址的策略。

3、负载平衡策略为每个服务器地址创建一个 subchannel,假如是 grpclb 策略,客户端会根据名字解析服务返回的地址列表,请求负载均衡器,由负载均衡器决定请求哪个 subConn,然后打开一个数据流,对这个 subConn 中的所有服务器 adress 都建立连接,从而实现 client stream 的效果

4、当有rpc请求时,负载均衡策略决定哪个子通道即grpc服务器将接收请求,当可用服务器为空时客户端的请求将被阻塞。

源码实现

接下来我们来看一下源码里面关于 grpc 负载均衡的实现,这里主要分为初始化 balancer 和寻址两步

1. 初始化 balancer

之前介绍 grpc 服务发现时,我们知道了通过 dns_resolver 的 lookup 方法可以得到一个 address list,那么拿到地址列表之后具体干了什么事呢?下面我们来看看

        result, sc := d.lookup()
        // Next lookup should happen within an interval defined by d.freq. It may be
        // more often due to exponential retry on empty address list.
        if len(result) == 0 {
            d.retryCount++
            d.t.Reset(d.backoff.Backoff(d.retryCount))
        } else {
            d.retryCount = 0
            d.t.Reset(d.freq)
        }
        d.cc.NewServiceConfig(sc)
        d.cc.NewAddress(result)

这里调用了 NewAddress 方法,在 NewAddress 这个方法里面又调用了 updateResolverState 这个方法,对负载均衡器的初始化就是在这个方法中进行的,如下:

    if cc.dopts.balancerBuilder == nil {
        // Only look at balancer types and switch balancer if balancer dial
        // option is not set.
        var newBalancerName string
        if cc.sc != nil && cc.sc.lbConfig != nil {
            newBalancerName = cc.sc.lbConfig.name
            balCfg = cc.sc.lbConfig.cfg
        } else {
            var isGRPCLB bool
            for _, a := range s.Addresses {
                if a.Type == resolver.GRPCLB {
                    isGRPCLB = true
                    break
                }
            }
            if isGRPCLB {
                newBalancerName = grpclbName
            } else if cc.sc != nil && cc.sc.LB != nil {
                newBalancerName = *cc.sc.LB
            } else {
                newBalancerName = PickFirstBalancerName
            }
        }
        cc.switchBalancer(newBalancerName)
    } else if cc.balancerWrapper == nil {
        // Balancer dial option was set, and this is the first time handling
        // resolved addresses. Build a balancer with dopts.balancerBuilder.
        cc.curBalancerName = cc.dopts.balancerBuilder.Name()
        cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
    }

    cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})

之前说到了我们在 dns_resolver 中查找 address 时是通过 grpclb 去进行查找的,所以它返回的 resolver 的策略就是 grpclb 策略。这里会进入到 switchBalancer 方法,我们来看看这个方法干了啥

func (cc *ClientConn) switchBalancer(name string) {

    builder := balancer.Get(name)

    ...

    cc.curBalancerName = builder.Name()
    cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}

这里通过 grpclb 这个 name 去获取到了 grpclb 策略的一个 balancer 实现,然后调用了 newCCBalancerWrapper 这个方法,继续跟踪

func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
    ccb := &ccBalancerWrapper{
        cc:               cc,
        stateChangeQueue: newSCStateUpdateBuffer(),
        ccUpdateCh:       make(chan *balancer.ClientConnState, 1),
        done:             make(chan struct{}),
        subConns:         make(map[*acBalancerWrapper]struct{}),
    }
    go ccb.watcher()
    ccb.balancer = b.Build(ccb, bopts)
    return ccb
}

来看一下这里的 Build 方法,去 grpclb 这个策略的实现类里面看,发现它返回了一个 lbBalancer 实例

func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
    ...
    lb := &lbBalancer{
        cc:              newLBCacheClientConn(cc),
        target:          opt.Target.Endpoint,
        opt:             opt,
        fallbackTimeout: b.fallbackTimeout,
        doneCh:          make(chan struct{}),

        manualResolver: r,
        subConns:       make(map[resolver.Address]balancer.SubConn),
        scStates:       make(map[balancer.SubConn]connectivity.State),
        picker:         &errPicker{err: balancer.ErrNoSubConnAvailable},
        clientStats:    newRPCStats(),
        backoff:        defaultBackoffConfig, // TODO: make backoff configurable.
    }
    ...
    return lb
}

2. 寻址

之前我们说到了,helloworld demo 中 client 发送请求主要分为三步,对 balancer 的初始化其实是在第一步 grpc.Dial 时初始化 dialContext 时完成的。那么寻址过程,就是在第三步调用 sayHello 时完成的。

我们进入 sayHello ——> c.cc.Invoke ——> invoke ——> newClientStream 方法中,有下面一段代码:

if err := cs.newAttemptLocked(sh, trInfo); err != nil {
        cs.finish(err)
        return nil, err
    }

进入 newAttemptLocked 方法,如下:

func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) error {
    cs.attempt = &csAttempt{
        cs:           cs,
        dc:           cs.cc.dopts.dc,
        statsHandler: sh,
        trInfo:       trInfo,
    }

    if err := cs.ctx.Err(); err != nil {
        return toRPCErr(err)
    }
    t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
    ...
}

我们发现它调用了 getTransport 方法,进入这个方法,我们找到了 pick 方法的调用

top Created with Sketch.