3ed70b38d82ba31e91677a2215ebf52b
3 - grpc hello world server 解析

grpc 输出 hello world 的处理过程

我们介绍 grpc quick start 时,通过快速启动一个 grpc server 端和 client 端,然后以 rpc 调用的方式输出一个 hello world。那么输出 hello world 需要经过哪些方法的处理呢?.......这个我也不知道,所以我们先去瞅瞅源码,探究一下 hello world 的背后是连接是如何建立的,然后一起来解读这个问题哈哈。

这节内容我们先来研究一下 server 端连接建立过程。

先放上 server 端的 main 函数。

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterGreeterServer(s, &server{})
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

我们发现其实 server 端连接的建立主要包括三步:

(1)创建 server

(2)server 的注册

(3)调用 Serve 监听端口并处理请求

ok,弄清楚主流程之后下面我们进入每个步骤里面去看一下代码实现。

1、创建 server

server 的创建比较简单,其实就下面一个方法:

func NewServer(opt ...ServerOption) *Server {
    opts := defaultServerOptions
    for _, o := range opt {
        o.apply(&opts)
    }
    s := &Server{
        lis:    make(map[net.Listener]bool),
        opts:   opts,
        conns:  make(map[transport.ServerTransport]bool),
        m:      make(map[string]*service),
        quit:   grpcsync.NewEvent(),
        done:   grpcsync.NewEvent(),
        czData: new(channelzData),
    }
    s.cv = sync.NewCond(&s.mu)
    if EnableTracing {
        _, file, line, _ := runtime.Caller(1)
        s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
    }

    if channelz.IsOn() {
        s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
    }
    return s
}

这个方法的核心无非是创建了一个 server 结构体,然后为结构体的属性赋值。我们顺便来瞅瞅 server 的结构:

// Server is a gRPC server to serve RPC requests.
type Server struct {
    // serverOptions 就是描述协议的各种参数选项,包括发送和接收的消息大小、buffer大小等等各种,跟 http Headers 类似,我们这里就暂时先不管
    opts serverOptions

    // 一个互斥锁
    mu     sync.Mutex // guards following
    // listener map
    lis    map[net.Listener]bool
    // connections map
    conns  map[transport.ServerTransport]bool
    // server 是否在处理请求的一个状态位
    serve  bool
    drain  bool
    cv     *sync.Cond          // signaled when connections close for GracefulStop
    // service map
    m      map[string]*service // service name -> service info
    events trace.EventLog

    quit               *grpcsync.Event
    done               *grpcsync.Event
    channelzRemoveOnce sync.Once
    serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

    channelzID int64 // channelz unique identification number
    czData     *channelzData
}

虽然 server 结构体里面各种乱起八糟的字段,但是我们可以先不管哈哈哈,比较重要的无非就是三个 map 表分别用来存放多个 listener 、connection 和 service。其他字段都是为了实现协议描述或者并发控制的功能。我们重点关注下

m      map[string]*service 

这个结构,service 中主要包含了 MethodDesc 和 StreamDesc 这两个 map

type service struct {
    server interface{} // the server for service methods
    md     map[string]*MethodDesc
    sd     map[string]*StreamDesc
    mdata  interface{}
}

enter image description here

enter image description here

2、server 注册

server 的注册调用了 RegisterGreeterServer 方法,这个方法是 pb.go 文件里面的,如下:

func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
    s.RegisterService(&_Greeter_serviceDesc, srv)
}

这个方法调用了 server 的 RegisterService 方法,然后传入了一个 ServiceDesc 的数据结构,如下 :

var _Greeter_serviceDesc = grpc.ServiceDesc{
    ServiceName: "helloworld.Greeter",
    HandlerType: (*GreeterServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "SayHello",
            Handler:    _Greeter_SayHello_Handler,
        },
        {
            MethodName: "SayHelloAgain",
            Handler:    _Greeter_SayHelloAgain_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "helloworld.proto",
}

我们来看看 RegisterService 这个方法,可以看到主要是调用了 register 方法,register 方法则按照方法名为 key,将方法注入到 server 的 service map 中。看到这里我们其实可以预测一下,server 不同 rpc 请求的处理,也是根据 service 中不同的 serviceName 去 service map 中取出不同的 handler 进行处理

func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
    ht := reflect.TypeOf(sd.HandlerType).Elem()
    st := reflect.TypeOf(ss)
    if !st.Implements(ht) {
        grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
    }
    s.register(sd, ss)
}

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.printf("RegisterService(%q)", sd.ServiceName)
    if s.serve {
        grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
    }
    if _, ok := s.m[sd.ServiceName]; ok {
        grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
    }
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc),
        sd:     make(map[string]*StreamDesc),
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        d := &sd.Streams[i]
        srv.sd[d.StreamName] = d
    }
    s.m[sd.ServiceName] = srv
}

3、Serve 过程

回想所有 C/S 模式下,client 和 server 的通信基本是类似的。大致过程无非是 server 通过死循环的方式在某一个端口实现监听,然后 client 对这个端口发起连接请求,握手成功后建立连接,然后 server 处理 client 发送过来的请求数据,根据请求类型和请求参数,调用不同的 handler 进行处理,回写响应数据。

所以,对 server 端来说,主要是了解其如何实现监听,如何为请求分配不同的 handler 和 回写响应数据。

top Created with Sketch.