09e9db687637c022352c4ca34bddb9f9
12 - grpc 数据流转

grpc 数据流转

阅读本文的前提是你对 grpc 协议的编解码和 协议打解包过程都比较清楚了,假如不是很了解可以先去阅读 《10 - grpc 协议编解码器》《11 - grpc 协议解包过程全剖析》

再谈协议

我们知道协议是一款 rpc 框架的基础。协议里面定义了一次客户端需要携带的信息,包括请求的后端服务名 ServiceName,方法名 Method、超时时间 Timeout、编码 Encoding、认证信息 Authority 等等。

前面我们已经说到了,grpc 是基于 http2 协议的,我们来看看 grpc 协议里面的一些关键信息:

可以看到,一次请求需要携带这么多信息,server 会根据 client 携带的这些信息来进行相应的处理。那么这些协议里面定义的内容要如何被传递下去呢?

数据承载体

为了回答上面的问题,我们需要一个数据承载体结构,来保存协议里面的一些需要透传的一些重要信息,比如 Method 等。在 grpc 中,这个结构就是 Stream, 我们来看一下 Stream 的定义。

// Stream represents an RPC in the transport layer.
type Stream struct {
    id           uint32
    st           ServerTransport    // nil for client side Stream
    ctx          context.Context    // the associated context of the stream
    cancel       context.CancelFunc // always nil for client side Stream
    done         chan struct{}      // closed at the end of stream to unblock writers. On the client side.
    ctxDone      <-chan struct{}    // same as done chan but for server side. Cache of ctx.Done() (for performance)
    method       string             // the associated RPC method of the stream
    recvCompress string
    sendCompress string
    buf          *recvBuffer
    trReader     io.Reader
    fc           *inFlow
    wq           *writeQuota

    // Callback to state application's intentions to read data. This
    // is used to adjust flow control, if needed.
    requestRead func(int)

    headerChan       chan struct{} // closed to indicate the end of header metadata.
    headerChanClosed uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.

    // hdrMu protects header and trailer metadata on the server-side.
    hdrMu sync.Mutex
    // On client side, header keeps the received header metadata.
    //
    // On server side, header keeps the header set by SetHeader(). The complete
    // header will merged into this after t.WriteHeader() is called.
    header  metadata.MD
    trailer metadata.MD // the key-value map of trailer metadata.

    noHeaders bool // set if the client never received headers (set only after the stream is done).

    // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
    headerSent uint32

    state streamState

    // On client-side it is the status error received from the server.
    // On server-side it is unused.
    status *status.Status

    bytesReceived uint32 // indicates whether any bytes have been received on this stream
    unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream

    // contentSubtype is the content-subtype for requests.
    // this must be lowercase or the behavior is undefined.
    contentSubtype string
}

server 端 Stream 的构造

接下来我们来看看 server 端 Stream 的构造。前面的内容已经说过 server 的处理流程了。我们直接进入 serveStreams 这个方法。路径为:s.Serve(lis) ——> s.handleRawConn(rawConn) ——> s.serveStreams(st)

func (s *Server) serveStreams(st transport.ServerTransport) {
    defer st.Close()
    var wg sync.WaitGroup
    st.HandleStreams(func(stream *transport.Stream) {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.handleStream(st, stream, s.traceInfo(st, stream))
        }()
    }, func(ctx context.Context, method string) context.Context {
        if !EnableTracing {
            return ctx
        }
        tr := trace.New("grpc.Recv."+methodFamily(method), method)
        return trace.NewContext(ctx, tr)
    })
    wg.Wait()
}

最上层 HandleStreams 是对 http2 数据帧的处理。grpc 一共处理了 MetaHeadersFrame 、DataFrame、RSTStreamFrame、SettingsFrame、PingFrame、WindowUpdateFrame、GoAwayFrame 等 7 种帧。

// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
    defer close(t.readerDone)
    for {
        frame, err := t.framer.fr.ReadFrame()
        atomic.StoreUint32(&t.activity, 1)
        if err != nil {
            if se, ok := err.(http2.StreamError); ok {
                warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
                t.mu.Lock()
                s := t.activeStreams[se.StreamID]
                t.mu.Unlock()
                if s != nil {
                    t.closeStream(s, true, se.Code, false)
                } else {
                    t.controlBuf.put(&cleanupStream{
                        streamID: se.StreamID,
                        rst:      true,
                        rstCode:  se.Code,
                        onWrite:  func() {},
                    })
                }
                continue
            }
            if err == io.EOF || err == io.ErrUnexpectedEOF {
                t.Close()
                return
top Created with Sketch.