gRPC 系列——grpc超时传递原理
[作者简介] 郑伟,小米信息技术部架构组
引子
有个业务方反馈说日志中偶尔出现 xorm 抛出来的 context deadline exceeded
的报错,想咨询下是什么原因。业务方实现的 gRPC Handler 大概代码如下:
1 2 3 4 5 6 7 8
| func (s Svc) BizHandler(ctx context.Context, r *projectv1.BizHandlerRequest) (*projectv1.BizHandlerResponse, error) { var bean dao.Bean if err := db.W().Find(ctx, &bean); err != nil { return nil, err } ... }
|
目前业务方使用过的 xorm 是我们改造过的,函数签名中都添加了 ctx 参数,目的是为了接入 OpenTracing 做分布式追踪。
业务方反馈的这个 context deadline execcded
问题应该是出在查询 bean 的时候使用了带 timeout 的 ctx,如果这个 ctx 的 timeout 时间很短,有可能会在执行查询操作前就抛出 context deadline execcded
错误。
xorm 底层使用的标准包 database/sql
,最终执行查询的函数可能是 ctxDriverQuery
或 ctxDriverStmtQuery
这两个函数。以 ctxDriverQuery
为例:
1 2 3 4 5 6 7 8 9 10 11 12
| func ctxDriverQuery(ctx context.Context, queryerCtx driver.QueryerContext, queryer driver.Queryer, query string, nvdargs []driver.NamedValue) (driver.Rows, error) { ... select { default: case <-ctx.Done(): return nil, ctx.Err() } return queryer.Query(query, dargs) }
|
可以看到如果 db.W().Find(ctx, &bean)
使用的 ctx 是设置了 timeout 的 ctx,那么是有可能在经过 xorm 的一些冗长的前置处理后,调用标准包的 ctxDriver
系列函数时产生 context deadline execcded
错误。
这个很好理解,但是业务方声称并未在 gRPC Handler 中主动为 context 设置 timeout。那么这个带 timeout 的 context 到底怎么产生的呢?
谁构造的带 timeout 的 context?
业务方的 gRPC handler 中对传入的 ctx 明显未做 context.WithTimeout()
处理,我们把目光投向客户端。业务方的 service graph 是这样:
1
| ServiceA -> ServiceB -> ServiceC -> xorm
|
当前反馈查询 xorm 报错的是 ServiceC,我们找到 ServiceB 看了下调用 ServiceC gRPC Handler 代码。ServiceB 中 ctx 来自 ServiceA,ServiceB 中拿到 ctx 后,也并未设置 timeout。
看来设置 timeout 的只可能是整个调用链发起方(即 ServiceA),通过 Review 代码我们发现 ServiceA 发起 RPC 调用时,确实传入了带 timeout 的 ctx:
1 2 3 4 5 6 7
| func InvokeServiceB() { ... ctx,_ := context.WithTimeout(ctx, 3*time.Second) response, err := grpcClient.ServicebBiz(ctx, request) ... }
|
我们将 ServiceA 中发起 RPC 调用的 ctx 超时设置成 10 秒,再测试发现 ServiceC 反馈的 context deadline execcded
报错消失了。
gRPC 超时如何做到跨进程传递?
我们测试发现,不仅是 Go gRPC 服务之间超时可以传递(如果你拿到上游的 ctx 继续往下透传的话)。Go 和 Java 服务之间,超时也会随着调用链传递。那么 gRPC 的超时是如何做到跨进程跨语言传递的?
有朋友可能想到了 metadata,是否 gRPC 请求链上游设置了超时后,gRPC 框架底层将过期时间放在 metadata 里了?遗憾的是我们打印 metadata 后发现并未发现 timeout 字段踪迹。那么超时时间到底是怎样传递的呢?以 grpc-go
源码为例,我们来找下线索。
我们知道 gRPC 基于 HTTP2,HTTP2 传输的最小单位是 Frame(帧)。HTTP2 的帧包含很多类型:DATA Frame
、HEADERS Frame
、PRIORITY Frame
、RST_STREAM Frame
、CONTINUATON Frame
等。一个 HTTP2 请求/响应可以被拆成多个帧并行发送,每一帧都有一个 StreamID 来标记属于哪个 Stream。服务端收到 Frame 后,根据 StreamID 组装出原始请求数据。
对于 gRPC 而言,Data Frame 用来存放请求的 response payload;Headers Frame 可用来存放一些需要进行跨进程传递的数据,比如 grpc-status(RPC 请求状态码)
、 :path(RPC 完整路径)
等。那么超时时间是否也通过 HEADERS Frame 传递呢?
客户端设置 timeout
我们知道,用户定义好 protobuf 并通过 protoc 生成桩代码后,桩代码中已经包含了 gRPCCient 接口的实现,每一个在 protobuf 中定义的 RPC,底层都会通过 ClientConn. Invoke 向服务端发起调用:
比如对于这样的 protobuf:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| syntax = "proto3";
package proto;
service DemoService { rpc SayHi(HiRequest) returns (HiResponse); }
message HiRequest { string name = 1; }
message HiResponse { string message = 1; }
|
生成的桩代码中已经包含了 Client 实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| type DemoServiceClient interface { SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error) }
type demoServiceClient struct { cc *grpc.ClientConn }
func NewDemoServiceClient(cc *grpc.ClientConn) DemoServiceClient { return &demoServiceClient{cc} }
func (c *demoServiceClient) SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error) { out := new(HiResponse) err := c.cc.Invoke(ctx, "/proto.DemoService/SayHi", in, out, opts...) if err != nil { return nil, err } return out, nil }
|
客户端发起 gRPC 请求时,最终会调用 invoke() 方法,invoke() 源码大概如下:
1 2 3 4 5 6 7 8 9 10 11 12
| func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) if err != nil { return err } if err := cs.SendMsg(req); err != nil { return err } return cs.RecvMsg(reply) }
|
我们看下 newClientStream 源码,newClientStream 源码比较复杂,我们挑重点看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { ... if err := cc.waitForResolvedAddrs(ctx); err != nil { return nil, err } ...
cs := &clientStream{ callHdr: callHdr, ctx: ctx, ... }
if err := cs.newAttemptLocked(sh, trInfo); err != nil { cs.finish(err) return nil, err } ... return cs, nil }
|
其中 csAttempt.newStream 实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| type csAttempt struct { cs *clientStream t transport.ClientTransport s *transport.Stream ... }
func (a *csAttempt) newStream() error { ... s, err := a.t.NewStream(cs.ctx, cs.callHdr) cs.attempt.s = s ... return nil }
|
transport.ClientTransport
是一个接口,gRPC 内部 internal/transport.http2Client
实现了此接口。
http2Client.NewStream()
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { ctx = peer.NewContext(ctx, t.getPeer()) headerFields, err := t.createHeaderFields(ctx, callHdr) ... hdr := &headerFrame{ hf: headerFields, endStream: false, ... } ... for { success, err := t.controlBuf.executeAndPut(func(it interface{}) bool { if !checkForStreamQuota(it) { return false } if !checkForHeaderListSize(it) { return false } return true }, hdr) ... return s, nil }
|
createHeaderFields
实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) { ... if dl, ok := ctx.Deadline(); ok { timeout := time.Until(dl) headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)}) } ... return headerFields, nil }
可以看到客户端发起请求时,如果设置了带 timeout 的ctx,则会导致底层 HTTP2 HEADERS Frame 中追加 `grpc-timeout` 字段
|
服务端解析 timeout
服务端通过 Serve
方法启动 grpc Server,监听来自客户端连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func (s *Server) Serve(lis net.Listener) error { ... for { rawConn, err := lis.Accept() ... s.serveWG.Add(1) go func() { s.handleRawConn(rawConn) s.serveWG.Done() }() } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| func (s *Server) handleRawConn(rawConn net.Conn) { ... st := s.newHTTP2Transport(conn, authInfo) go func() { s.serveStreams(st) s.removeConn(st) }() }
func (s *Server) serveStreams(st transport.ServerTransport) { defer st.Close() var wg sync.WaitGroup st.HandleStreams(func(stream *transport.Stream) { wg.Add(1) go func() { defer wg.Done() s.handleStream(st, stream, s.traceInfo(st, stream)) }() }, ...) wg.Wait() }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { defer close(t.readerDone) for { t.controlBuf.throttle() frame, err := t.framer.fr.ReadFrame() ... switch frame := frame.(type) { case *http2.MetaHeadersFrame: if t.operateHeaders(frame, handle, traceCtx) { t.Close() break } case *http2.DataFrame: t.handleData(frame) ... } } }
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { streamID := frame.Header().StreamID state := &decodeState{ serverSide: true, } if err := state.decodeHeader(frame); err != nil { if se, ok := status.FromError(err); ok { ... }
buf := newRecvBuffer() s := &Stream{ id: streamID, st: t, buf: buf, fc: &inFlow{limit: uint32(t.initialWindowSize)}, recvCompress: state.data.encoding, method: state.data.method, contentSubtype: state.data.contentSubtype, } ... if state.data.timeoutSet { s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout) } else { s.ctx, s.cancel = context.WithCancel(t.ctx) } ... t.controlBuf.put(®isterStream{ streamID: s.id, wq: s.wq, }) handle(s) return false }
|
decodeHeader
会遍历 frame 中所有 Fields,并调用 processHeaderField
对 HTTP2 HEADERS 帧中的特定的 Field 进行处理。
- 比如可以从
:path
中解析出包含 protobuf package、service name 和 RPC method name 的完整路径;
- 比如可以从
grpc-timeout
中解析出上游传递过来的 timeout;
decodeHeader
内部实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { ... for _, hf := range frame.Fields { d.processHeaderField(hf) } }
func (d *decodeState) processHeaderField(f hpack.HeaderField) { switch f.Name { ... case "grpc-timeout": d.data.timeoutSet = true var err error if d.data.timeout, err = decodeTimeout(f.Value); err != nil { d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed time-out: %v", err) } ... case ":path": d.data.method = f.Value } }
|
至此可以看到,gRPC 框架确实是通过 HTTP2 HEADERS Frame 中的 grpc-timeout
字段来实现跨进程传递超时时间。
总结
- 客户端客户端发起 RPC 调用时传入了带 timeout 的 ctx
- gRPC 框架底层通过 HTTP2 协议发送 RPC 请求时,将 timeout 值写入到
grpc-timeout
HEADERS Frame 中
- 服务端接收 RPC 请求时,gRPC 框架底层解析 HTTP2 HEADERS 帧,读取
grpc-timeout
值,并覆盖透传到实际处理 RPC 请求的业务 gPRC Handle 中
- 如果此时服务端又发起对其他 gRPC 服务的调用,且使用的是透传的 ctx,这个 timeout 会减去在本进程中耗时,从而导致这个 timeout 传递到下一个 gRPC 服务端时变短,这样即实现了所谓的
超时传递
。目前这个功能测试发现在 grpc-go
和 grpc-java
中实现, grpc-python
貌似暂未实现此功能(见 https://github.com/grpc/grpc/issues/18358)。
作者
郑伟,小米信息技术部架构组
招聘
信息部是小米公司整体系统规划建设的核心部门,支撑公司国内外的线上线下销售服务体系、供应链体系、ERP 体系、内网 OA 体系、数据决策体系等精细化管控的执行落地工作,服务小米内部所有的业务部门以及 40 家生态链公司。
同时部门承担大数据基础平台研发和微服务体系建设落,语言涉及 Java、Go,长年虚位以待对大数据处理、大型电商后端系统、微服务落地有深入理解和实践的各路英雄。
欢迎投递简历:jin.zhang(a)xiaomi.com(武汉)