Ed1a13902f3c2be870ee0bd59b9d0c06
10 - grpc 协议编解码器

协议编解码器

一般的协议都会包括协议头和协议体,对于业务而言,一般只关心需要发送的业务数据。所以,协议头的内容一般是框架自动帮忙填充。将业务数据包装成指定协议格式的数据包就是编码的过程,从指定协议格式中的数据包中取出业务数据的过程就是解码的过程。

每个 rpc 框架基本都有自己的编解码器,下面我们就来说说 grpc 的编解码过程。

grpc 解码

我们还是从我们的 examples 目录下的 helloworld demo 中 server 的 main 函数入手

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterGreeterServer(s, &server{})
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

在 s.Serve(lis) ——> s.handleRawConn(rawConn) —— > s.serveStreams(st) ——> s.handleStream(st, stream, s.traceInfo(st, stream)) ——> s.processUnaryRPC(t, stream, srv, md, trInfo) 方法中有一段代码:

sh := s.opts.statsHandler
...
df := func(v interface{}) error {
        if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
            return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
        }
        if sh != nil {
            sh.HandleRPC(stream.Context(), &stats.InPayload{
                RecvTime:   time.Now(),
                Payload:    v,
                WireLength: payInfo.wireLength,
                Data:       d,
                Length:     len(d),
            })
        }
        if binlog != nil {
            binlog.Log(&binarylog.ClientMessage{
                Message: d,
            })
        }
        if trInfo != nil {
            trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
        }
        return nil
}

这段代码的逻辑先调 getCodec 获取解包类,然后调用这个类的 Unmarshal 方法进行解包。将业务数据取出来,然后调用 handler 进行处理。

func (s *Server) getCodec(contentSubtype string) baseCodec {
    if s.opts.codec != nil {
        return s.opts.codec
    }
    if contentSubtype == "" {
        return encoding.GetCodec(proto.Name)
    }
    codec := encoding.GetCodec(contentSubtype)
    if codec == nil {
        return encoding.GetCodec(proto.Name)
    }
    return codec
}

我们来看 getCodec 这个方法,它是通过 contentSubtype 这个字段来获取解包类的。假如不设置 contentSubtype ,那么默认会用名字为 proto 的解码器。

我们来看看 contentSubtype 是如何设置的。之前说到了 grpc 的底层默认是基于 http2 的。在 serveHttp 时调用了 NewServerHandlerTransport 这个方法来创建一个 ServerTransport,然后我们发现,其实就是根据 content-type 这个字段去生成的。

func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) {
    ...

    contentType := r.Header.Get("Content-Type")
    // TODO: do we assume contentType is lowercase? we did before
    contentSubtype, validContentType := contentSubtype(contentType)
    if !validContentType {
        return nil, errors.New("invalid gRPC request content-type")
    }
    if _, ok := w.(http.Flusher); !ok {
        return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
    }

    st := &serverHandlerTransport{
        rw:             w,
        req:            r,
        closedCh:       make(chan struct{}),
        writes:         make(chan func()),
        contentType:    contentType,
        contentSubtype: contentSubtype,
        stats:          stats,
    }
}

我们来看看 contentSubtype 这个方法 。

...
baseContentType = "application/grpc"
...
func contentSubtype(contentType string) (string, bool) {
    if contentType == baseContentType {
        return "", true
    }
    if !strings.HasPrefix(contentType, baseContentType) {
        return "", false
    }
    // guaranteed since != baseContentType and has baseContentType prefix
    switch contentType[len(baseContentType)] {
    case '+', ';':
        // this will return true for "application/grpc+" or "application/grpc;"
        // which the previous validContentType function tested to be valid, so we
        // just say that no content-subtype is specified in this case
        return contentType[len(baseContentType)+1:], true
    default:
        return "", false
    }
}

可以看到 grpc 协议默认以 application/grpc 开头,假如不一这个开头会返回错误,假如我们想使用 json 的解码器,应该设置 content-type = application/grpc+json 。下面是一个基于 grpc 协议的请求 request :

HEADERS (flags = END_HEADERS)
:method = POST
:scheme = http
top Created with Sketch.