こんにちは、Wantedly の Infrastructure Team で Engineer をしている南(@south37)です。
今日は、WANTEDLY TECH BOOK 6 から「gRPC Internal」という章を抜粋して Blog にします。
「WANTEDLY TECH BOOK 1-7を一挙大公開」でも書いた通り、Wantedly では WANTEDLY TECH BOOK のうち最新版を除いた電子版を無料で配布する事にしました。Wantedly Engineer Blogでも過去記事の内容を順次公開予定であり、この Blog もその一環となっています。
次に、lib/echo_services_pb.rb の中身を見てみます。こちらには、echo service の定義が grpc gem が読み取れる形で記述されています。
リスト8.8 lib/echo_services_pb.rb
1# Generated by the protocol buffer compiler. DO NOT EDIT!2# Source: echo.proto for package 'echo'34require'grpc'5require'echo_pb'67moduleEcho8moduleEcho9classService1011includeGRPC::GenericService
1213self.marshal_class_method =:encode14self.unmarshal_class_method =:decode15self.service_name ='echo.Echo'1617 rpc :echo, EchoRequest, EchoResponse
18end1920 Stub = Service.rpc_stub_class
21end22end
Service class の中で rpc というメソッドが呼ばれていて、これによって「echo という RPC メソッドが EchoRequest を受け取り、EchoResponse を返す」という定義が行われます。
さらにその下では Stub という class が生成されています。これは、client が RPC のために利用する class です。詳しい利用方法は「3. server と client の実装を書いて動かす」で見てみたいと思います。
3. server と client の実装を書いて動かす
次に、gRPC の server と client の実装をそれぞれ書いてみたいと思います。
まずは server 実装です。コードは次のようになります。
リスト8.9 lib/echo_server.rb
1# Sample gRPC server that implements the Echo::Echo service.2#3# Usage: $ bundle exec ruby path/to/echo_server.rb45 this_dir =File.expand_path(File.dirname(__FILE__))6 lib_dir =File.join(this_dir,'lib')7$LOAD_PATH.unshift(lib_dir)unless$LOAD_PATH.include?(lib_dir)89require'logger'10require'grpc'11require'echo_services_pb'1213# EchoServer is simple server that implements the Echo::Echo service.14classEchoServer< Echo::Echo::Service
15 EchoLogger =Logger.new(STDOUT)1617defecho(echo_req, _unused_call)18 EchoLogger.info("echo \"#{echo_req.message}\"")19 Echo::EchoResponse.new(message: echo_req.message)20end21end2223# main starts an RpcServer that receives requests to EchoServer at the sample24# server port.25defmain26 s =GRPC::RpcServer.new27 s.add_http2_port('0.0.0.0:50051',:this_port_is_insecure)28 logger =Logger.new(STDOUT)29 logger.info("... running insecurely on 0.0.0.0:50051")30 s.handle(EchoServer)31# Runs the server with SIGHUP, SIGINT and SIGQUIT signal handlers to32# gracefully shutdown.33 s.run_till_terminated_or_interrupted(['SIGHUP','SIGINT','SIGQUIT'])34end3536 main
まず注目するのは 14 行目から 21 行目までの EchoServer class を定義している箇所です。この中で、echo という instance method として echo という RPC の振る舞いを定義しています。今回は単純な echo service なので、request の message を取り出してそれをそのまま response の message として返しています。実際には、ここで意味のある処理を行うことになります。たとえば、request parameter として id を受け取り、その id に紐づくリソースを取得して response として返す、などが分かりやすい例でしょう。このように、単純な method として RPC の実装を定義するというのが、gRPC の特徴の1つです。
23行目以降では、特定の port を listen する形で server を起動する処理が記述されています。これは、どちらかといえばボイラープレートに近いものでしょう。今回の例では、50051 port を listen して、さらに SIGHUP, SIGINT, SIGQUIT signall が送られたら gracefull shutdown する実装になっています。
このコードを次のコマンドで実行すると、gRPC server の process が立ち上がります。これで、gRPC の request を受け付ける準備ができました。
リスト8.10 gRPC server を起動
$ bundle exec ruby echo_server.rb
I, [2019-03-29T23:42:42.461343 #69099] INFO -- : ... running insecurely on 0.0.0.0:50051
これで、lib/echo_pb.rb が生成されました。この中で定義された Echo::EchoRequest という message class は、データを binary format へ serialize/deserialize す る為に利用することができます。このように、Protocol Buffers 単体でも gRPC と似たステップで簡単に利用することができます。
リスト8.15 lib/echo_pb.rb
# Generated by the protocol buffer compiler. DO NOT EDIT!# source: echo.protorequire'google/protobuf'
Google::Protobuf::DescriptorPool.generated_pool.build do
add_message "echo.EchoRequest"do
optional :message,:string,1endendmoduleEcho
EchoRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("echo.EchoRequest").msgclass
end
# lib/echo_services_pb.rb1# Generated by the protocol buffer compiler. DO NOT EDIT!2# Source: echo.proto for package 'echo'34require'grpc'5require'echo_pb'67moduleEcho8moduleEcho9classService1011includeGRPC::GenericService
1213self.marshal_class_method =:encode14self.unmarshal_class_method =:decode15self.service_name ='echo.Echo'1617 rpc :echo, EchoRequest, EchoResponse
18end1920 Stub = Service.rpc_stub_class
21end22end
リスト8.19 再掲:「gRPC の使い方」の lib/echo_client.rb
# lib/echo_client.rb12defmain13 stub = Echo::Echo::Stub.new('localhost:50051',:this_channel_is_insecure)14 m =ARGV.size >0?ARGV[0]:'hello'15 message = stub.echo(Echo::EchoRequest.new(message: m)).message
16 print "echo response: \"#{message}\"\n"17end1819 main
client からは、Service.rpc_stub_class method によって作られた Stub class が使われています。Service の中では、Service.rpc method によって echo という service method が定義されています。まずは、この Service.rpc と Service.rpc_stub_class に注目してみます。
Service class は GRPC::GenericService を include しており、.rpc と .rpc_stub_class もこの中で定義されています(厳密には、Dsl という module の中でこれらは定義されていて、include のタイミングで extend されています)。.rpc は次のような定義になっています。注目するのは 95 行目で、ここで name, input, output の情報を集約した RpcDesc という class の object を作り、rpc_descs という hash に格納しています。RpcDesc は重要な class で、client と server 両方の処理において利用されます。詳細はまた後で説明します。
リスト8.20 lib/grpc/generic/service.rb
79# Adds an RPC spec.80#81# Takes the RPC name and the classes representing the types to be82# serialized, and adds them to the including classes rpc_desc hash.83#84# input and output should both have the methods #marshal and #unmarshal85# that are responsible for writing and reading an object instance from a86# byte buffer respectively.87#88# @param name [String] the name of the rpc89# @param input [Object] the input parameter's class90# @param output [Object] the output parameter's class91defrpc(name, input, output)92 fail(DuplicateRpcName, name)if rpc_descs.key? name
93 assert_can_marshal(input)94 assert_can_marshal(output)95 rpc_descs[name]=RpcDesc.new(name, input, output,96 marshal_class_method,97 unmarshal_class_method)98define_method(GenericService.underscore(name.to_s).to_sym)do|*|99 fail GRPC::BadStatus.new_status_exception(100GRPC::Core::StatusCodes::UNIMPLEMENTED)101end102end
リスト8.21 lib/grpc/generic/service.rb
140# the RpcDescs defined for this GenericService, keyed by name.141defrpc_descs142@rpc_descs||={}143end
.rpc がやっていることはそれ以外には引数の値のチェックくらいで、特筆すべきものはありません。ただし、98 行目で define_method で「service method のデフォルト実装を与えていること」は知っておくと良いと思います。これは server 実装において利用されます。「gRPC の使い方」で説明した様に、gRPC の server を実装する際は、 GRPC::GenericService を include した Service class を継承して、server class を用意するのでした。その際は、上記のデフォルト実装を override する形で「service method の実装を行う」ことになっています。その為、たとえば .proto file に記述されているのにまだ実装されていない service method へ client から request した場合 には、100 行目のGRPC::Core::StatusCodes::UNIMPLEMENTED という error code が返されることになります。
リスト8.22 再掲:「gRPC の使い方」の lib/echo_server.rb
13# EchoServer is simple server that implements the Echo::Echo service.14classEchoServer< Echo::Echo::Service
15 EchoLogger =Logger.new(STDOUT)1617defecho(echo_req, _unused_call)# ここで `Echo::Echo::Service` で定義された `echo` method を override18 EchoLogger.info("echo \"#{echo_req.message}\"")19 Echo::EchoResponse.new(message: echo_req.message)20end21end
145# Creates a rpc client class with methods for accessing the methods146# currently in rpc_descs.147defrpc_stub_class148 descs = rpc_descs
149 route_prefix = service_name
150Class.new(ClientStub)do151# @param host [String] the host the stub connects to152# @param creds [Core::ChannelCredentials|Symbol] The channel153# credentials to use, or :this_channel_is_insecure otherwise154# @param kw [KeywordArgs] the channel arguments, plus any optional155# args for configuring the client's channel156definitialize(host, creds,**kw)157super(host, creds,**kw)158end159160# Used define_method to add a method for each rpc_desc. Each method161# calls the base class method for the given descriptor.162 descs.each_pair do|name, desc|163 mth_name = GenericService.underscore(name.to_s).to_sym
164 marshal = desc.marshal_proc
165 unmarshal = desc.unmarshal_proc(:output)166 route ="/#{route_prefix}/#{name}"167if desc.request_response?168define_method(mth_name)do|req, metadata ={}|169GRPC.logger.debug("calling #{@host}:#{route}")170 request_response(route, req, marshal, unmarshal, metadata)171end172elsif desc.client_streamer?173define_method(mth_name)do|reqs, metadata ={}|174GRPC.logger.debug("calling #{@host}:#{route}")175 client_streamer(route, reqs, marshal, unmarshal, metadata)176end177elsif desc.server_streamer?178define_method(mth_name)do|req, metadata ={},&blk|179GRPC.logger.debug("calling #{@host}:#{route}")180 server_streamer(route, req, marshal, unmarshal, metadata,&blk)181end182else# is a bidi_stream183define_method(mth_name)do|reqs, metadata ={},&blk|184GRPC.logger.debug("calling #{@host}:#{route}")185 bidi_streamer(route, reqs, marshal, unmarshal, metadata,&blk)186end187end188end189end190end
111# request_response sends a request to a GRPC server, and returns the112# response.113#114# == Flow Control ==115# This is a blocking call.116#117# * it does not return until a response is received.118#119# * the requests is sent only when GRPC cores flow control allows it to120# be sent.121#122# == Errors ==123# An RuntimeError is raised if124#125# * the server responds with a non-OK status126#127# * the deadline is exceeded128#129# == Return Value ==130#131# If return_op is false, the call returns the response132#133# If return_op is true, the call returns an Operation, calling execute134# on the Operation returns the response.135#136# @param method [String] the RPC method to call on the GRPC server137# @param req [Object] the request sent to the server138# @param marshal [Function] f(obj)->string that marshals requests139# @param unmarshal [Function] f(string)->obj that unmarshals responses140# @param deadline [Time] (optional) the time the request should complete141# @param return_op [true|false] return an Operation if true142# @param parent [Core::Call] a prior call whose reserved metadata143# will be propagated by this one.144# @param credentials [Core::CallCredentials] credentials to use when making145# the call146# @param metadata [Hash] metadata to be sent to the server147# @return [Object] the response received from the server148defrequest_response(method, req, marshal, unmarshal,149 deadline:nil,150 return_op:false,151 parent:nil,152 credentials:nil,153 metadata:{})154 c = new_active_call(method, marshal, unmarshal,155 deadline: deadline,156 parent: parent,157 credentials: credentials)158 interception_context =@interceptors.build_context
159 intercept_args ={160 method: method,161 request: req,162 call: c.interceptable,163 metadata: metadata
164}165if return_op
166# return the operation view of the active_call; define #execute as a167# new method for this instance that invokes #request_response.168 c.merge_metadata_to_send(metadata)169 op = c.operation
170 op.define_singleton_method(:execute)do171 interception_context.intercept!(:request_response, intercept_args)do172 c.request_response(req,metadata: metadata)173end174end175 op
176else177 interception_context.intercept!(:request_response, intercept_args)do178 c.request_response(req,metadata: metadata)179end180end181end
476# Creates a new active stub477#478# @param method [string] the method being called.479# @param marshal [Function] f(obj)->string that marshals requests480# @param unmarshal [Function] f(string)->obj that unmarshals responses481# @param parent [Grpc::Call] a parent call, available when calls are482# made from server483# @param credentials [Core::CallCredentials] credentials to use when making484# the call485defnew_active_call(method, marshal, unmarshal,486 deadline:nil,487 parent:nil,488 credentials:nil)489 deadline = from_relative_time(@timeout)if deadline.nil?490# Provide each new client call with its own completion queue491 call =@ch.create_call(parent,# parent call492@propagate_mask,# propagation options493 method,494nil,# host use nil,495 deadline)496 call.set_credentials! credentials unless credentials.nil?497ActiveCall.new(call, marshal, unmarshal, deadline,498 started:false)499end
Channel と Call という新しい 2 つの class が出てきました。実は、この2つは gRPC において非常に重要なものです。gRPC の C-core はその機能を C の struct およ び function として提供しているのですが、Channel と Call は C-core から提供される grpc_channel と grpc_call という C の struct をそれぞれラップしたものとなっています。その為、どちらも gem の native extention として定義されています。
61/** The Channel interface allows creation of Call objects. */62typedefstructgrpc_channel grpc_channel;...67/** A Call represents an RPC. When created, it is in a configuration state
68 allowing properties to be set until it is invoked. After invoke, the Call
69 can have messages written to it and read from it. */70typedefstructgrpc_call grpc_call;
343# request_response sends a request to a GRPC server, and returns the344# response.345#346# @param req [Object] the request sent to the server347# @param metadata [Hash] metadata to be sent to the server. If a value is348# a list, multiple metadata for its key are sent349# @return [Object] the response received from the server350defrequest_response(req,metadata:{})351 raise_error_if_already_executed
352 ops ={353SEND_MESSAGE=>@marshal.call(req),354SEND_CLOSE_FROM_CLIENT=>nil,355RECV_INITIAL_METADATA=>nil,356RECV_MESSAGE=>nil,357RECV_STATUS_ON_CLIENT=>nil358}359@send_initial_md_mutex.synchronize do360# Metadata might have already been sent if this is an operation view361unless@metadata_sent362 ops[SEND_INITIAL_METADATA]=@metadata_to_send.merge!(metadata)363end364@metadata_sent=true365end366367begin368 batch_result =@call.run_batch(ops)369# no need to check for cancellation after a CallError because this370# batch contains a RECV_STATUS op371ensure372 set_input_stream_done
373 set_output_stream_done
374end375376@call.metadata = batch_result.metadata
377 attach_status_results_and_complete_call(batch_result)378 get_message_from_batch_result(batch_result)379end
511 typedef enum {512/** Send initial metadata: one and only one instance MUST be sent foreach513 call,unless the call was cancelled -in which case this can be skipped.514 This op completes after all bytes of metadata have been accepted by
515 outgoing flow control.*/516GRPC_OP_SEND_INITIAL_METADATA=0,517/** Send a message:0or more of these operations can occur foreach call.518 This op completes after all bytes for the message have been accepted by
519 outgoing flow control.*/520GRPC_OP_SEND_MESSAGE,521/** Send a close from the client: one and only one instance MUST be sent from
522 the client,unless the call was cancelled -in which case this can be
523 skipped. This op completes after all bytes for the call
524(including the close) have passed outgoing flow control.*/525GRPC_OP_SEND_CLOSE_FROM_CLIENT,526/** Send status from the server: one and only one instance MUST be sent from
527 the server unless the call was cancelled -in which case this can be
528 skipped. This op completes after all bytes for the call
529(including the status) have passed outgoing flow control.*/530GRPC_OP_SEND_STATUS_FROM_SERVER,531/** Receive initial metadata: one and only one MUST be made on the client,532 must not be made on the server.533 This op completes after all initial metadata has been read from the
534 peer.*/535GRPC_OP_RECV_INITIAL_METADATA,536/** Receive a message:0or more of these operations can occur foreach call.537 This op completes after all bytes of the received message have been
538 read,or after a half-close has been received on this call.*/539GRPC_OP_RECV_MESSAGE,540/** Receive status on the client: one and only one must be made on the client.541 This operation always succeeds, meaning ops paired with this operation
542 will also appear to succeed, even though they may not have. In that case543 the status will indicate some failure.544 This op completes after all activity on the call has completed.*/545GRPC_OP_RECV_STATUS_ON_CLIENT,546/** Receive close on the server: one and only one must be made on the
547 server. This op completes after the close has been received by the
548 server. This operation always succeeds, meaning ops paired with
549 this operation will also appear to succeed, even though they may not550 have.*/551GRPC_OP_RECV_CLOSE_ON_SERVER552} grpc_op_type;
ops が送信される実態である Call#run_batch についても少し見ておきます。これは ext/grpc/rb_call.c の中で grpc_rb_call_run_batch という c function として定義されています。
795-811 行目のコメントで説明がある様に、これは operation を post して、その完了を待つものになっています。少々長いですが、注目するべき処理は 841行目の grpc_call_start_batch という function call と、852 行目の rb_completion_queue_pluck という function call だけです。grpc_call_start_batch は C-core によって提供 されている function で、ops の実行を開始します。rb_completion_queue_pluck は grpc gem の中で定義されている function ですが、内部では C-core が提供する grpc_completion_queue_pluck という function を呼び出しています。
ここで、completion queue というワードが新たに登場しました。これは、gRPC の C-core が結果を受け取るために用意しているインターフェースです。grpc_completion_queue という struct が queue として用意されていて、operation が完了するとその通知が queue に post されます。grpc_rb_call_run_batch は rb_completion_queue_pluck を呼び出して、ops 完了の待ち合わせを行なっています。そして、859 行目で ops の結果を result として受け取り、862 行目でその result を return しています。
リスト8.30 ext/grpc/rb_call.c
795/* call-seq:
796 ops = {
797 GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>,
798 GRPC::Core::CallOps::SEND_MESSAGE => <op_value>,
799 ...
800 }
801 tag = Object.new
802 timeout = 10
803 call.start_batch(tag, timeout, ops)
804
805 Start a batch of operations defined in the array ops; when complete, post a
806 completion of type 'tag' to the completion queue bound to the call.
807
808 Also waits for the batch to complete, until timeout is reached.
809 The order of ops specified in the batch has no significance.
810 Only one operation of each type can be active at once in any given
811 batch */812static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash){813 run_batch_stack* st =NULL;814 grpc_rb_call* call =NULL;815 grpc_event ev;816 grpc_call_error err;817 VALUE result = Qnil;818 VALUE rb_write_flag =rb_ivar_get(self, id_write_flag);819unsigned write_flag =0;820void* tag =(void*)&st;821822grpc_ruby_fork_guard();823if(RTYPEDDATA_DATA(self)==NULL){824rb_raise(grpc_rb_eCallError,"Cannot run batch on closed call");825return Qnil;826}827TypedData_Get_Struct(self, grpc_rb_call,&grpc_call_data_type, call);828829/* Validate the ops args, adding them to a ruby array */830if(TYPE(ops_hash)!= T_HASH){831rb_raise(rb_eTypeError,"call#run_batch: ops hash should be a hash");832return Qnil;833}834if(rb_write_flag != Qnil){835 write_flag =NUM2UINT(rb_write_flag);836}837 st =gpr_malloc(sizeof(run_batch_stack));838grpc_run_batch_stack_init(st, write_flag);839grpc_run_batch_stack_fill_ops(st, ops_hash);840841/* call grpc_call_start_batch, then wait for it to complete using
842 * pluck_event */843 err =grpc_call_start_batch(call->wrapped, st->ops, st->op_num, tag,NULL);844if(err != GRPC_CALL_OK){845grpc_run_batch_stack_cleanup(st);846gpr_free(st);847rb_raise(grpc_rb_eCallError,848"grpc_call_start_batch failed with %s (code=%d)",849grpc_call_error_detail_of(err), err);850return Qnil;851}852 ev =rb_completion_queue_pluck(call->queue, tag,853gpr_inf_future(GPR_CLOCK_REALTIME),NULL);854if(!ev.success){855rb_raise(grpc_rb_eCallError,"call#run_batch failed somehow");856}857/* Build and return the BatchResult struct result,
858 if there is an error, it's reflected in the status */859 result =grpc_run_batch_stack_build_result(st);860grpc_run_batch_stack_cleanup(st);861gpr_free(st);862return result;863}
262defget_message_from_batch_result(recv_message_batch_result)263unless recv_message_batch_result.nil?||264 recv_message_batch_result.message.nil?265return@unmarshal.call(recv_message_batch_result.message)266end267GRPC.logger.debug('found nil; the final response has been sent')268nil269end
14classEchoServer< Echo::Echo::Service
15 EchoLogger =Logger.new(STDOUT)1617defecho(echo_req, _unused_call)18 EchoLogger.info("echo \"#{echo_req.message}\"")19 Echo::EchoResponse.new(message: echo_req.message)20end21end2223# main starts an RpcServer that receives requests to EchoServer at the sample24# server port.25defmain26 s =GRPC::RpcServer.new27 s.add_http2_port('0.0.0.0:50051',:this_port_is_insecure)28 logger =Logger.new(STDOUT)29 logger.info("... running insecurely on 0.0.0.0:50051")30 s.handle(EchoServer)31# Runs the server with SIGHUP, SIGINT and SIGQUIT signal handlers to32# gracefully shutdown.33 s.run_till_terminated_or_interrupted(['SIGHUP','SIGINT','SIGQUIT'])34end3536 main
301# handle registration of classes302#303# service is either a class that includes GRPC::GenericService and whose304# #new function can be called without argument or any instance of such a305# class.306#307# E.g, after308#309# class Divider310# include GRPC::GenericService311# rpc :div DivArgs, DivReply # single request, single response312# def initialize(optional_arg='default option') # no args313# ...314# end315#316# srv = GRPC::RpcServer.new(...)317#318# # Either of these works319#320# srv.handle(Divider)321#322# # or323#324# srv.handle(Divider.new('replace optional arg'))325#326# It raises RuntimeError:327# - if service is not valid service class or object328# - its handler methods are already registered329# - if the server is already running330#331# @param service [Object|Class] a service class or object as described332# above333defhandle(service)334@run_mutex.synchronize do335unless@running_state==:not_started336 fail 'cannot add services if the server has been started'337end338 cls = service.is_a?(Class)? service : service.class339 assert_valid_service_class(cls)340 add_rpc_descs_for(service)341end342end
#add_rpc_descs_for は次のような実装になっています。まず注目するのは 537 行目と 541 行目で、ここで rps_specs へ RpcDesc class の object をセットしていま す。この RpcDesc class は、client 実装で登場したのと同じものです。
次に注目するのは 543-547 行目で、ここで service class に定義された method を handler として登録しています。
面白いのは method メソッドが利用されていることです。これは、「既存の object の method から Method class の object を作るメソッド」で、Method class の object は Proc などと同様に #call で内部の処理を呼び出すことができます。つまり、ここで handlers[route] に格納した処理が適切なタイミングで call されて、 そこで gRPC の利用者(開発者)が定義した service method が呼ばれる訳です。なお、#add_rpc_descs_for の 543-547 行目の処理から分かる様に、実は GRPC::RpcServer#handle には class では無く object を渡すこともできます。必要になった際にはこの機能を活用してみてください。
リスト8.35 lib/grpc/generic/rpc_server.rb
534# This should be called while holding @run_mutex535defadd_rpc_descs_for(service)536 cls = service.is_a?(Class)? service : service.class537 specs, handlers =(@rpc_descs||={}),(@rpc_handlers||={})538 cls.rpc_descs.each_pair do|name, spec|539 route ="/#{cls.service_name}/#{name}".to_sym
540 fail "already registered: rpc #{route} from #{spec}"if specs.key? route
541 specs[route]= spec
542 rpc_name = GenericService.underscore(name.to_s).to_sym
543if service.is_a?(Class)544 handlers[route]= cls.new.method(rpc_name)545else546 handlers[route]= service.method(rpc_name)547end548GRPC.logger.info("handling #{route} with #{handlers[route]}")549end550end
service class が handler に登録されるまでの処理はわかりました。次は、GRPC::RpcServer#s.run_till_terminated_or_interrupted で server を running にする処理を 見てみます。
GRPC::RpcServer#s.run_till_terminated_or_interrupted は次のような実装になっています。392-411 行目で signal handler を登録したあとで、413 行目の #run で server が running になります。
リスト8.37 lib/grpc/generic/rpc_server.rb
364# runs the server with signal handlers365# @param signals366# List of String, Integer or both representing signals that the user367# would like to send to the server for graceful shutdown368# @param wait_interval (optional)369# Integer seconds that user would like stop_server_thread to poll370# stop_server371defrun_till_terminated_or_interrupted(signals, wait_interval =60)372@stop_server=false373@stop_server_mu=Mutex.new374@stop_server_cv=ConditionVariable.new375376@stop_server_thread=Thread.newdo377 loop do378breakif@stop_server379@stop_server_mu.synchronize do380@stop_server_cv.wait(@stop_server_mu, wait_interval)381end382end383384# stop is surrounded by mutex, should handle multiple calls to stop385# correctly386 stop
387end388389 valid_signals = Signal.list
390391# register signal handlers392 signals.eachdo|sig|393# input validation394if sig.class==String395 sig.upcase!396if sig.start_with?('SIG')397# cut out the SIG prefix to see if valid signal398 sig = sig[3..-1]399end400end401402# register signal traps for all valid signals403if valid_signals.value?(sig)|| valid_signals.key?(sig)404 Signal.trap(sig)do405@stop_server=true406@stop_server_cv.broadcast
407end408else409 fail "#{sig} not a valid signal"410end411end412413 run
414415@stop_server_thread.join
416end
344# runs the server345#346# - if no rpc_descs are registered, this exits immediately, otherwise it347# continues running permanently and does not return until program exit.348#349# - #running? returns true after this is called, until #stop cause the350# the server to stop.351defrun352@run_mutex.synchronize do353 fail 'cannot run without registering services'if rpc_descs.size.zero?354@pool.start
355@server.start
356 transition_running_state(:running)357@run_cond.broadcast
358end359 loop_handle_server_calls
360end
448# handles calls to the server449defloop_handle_server_calls450 fail 'not started'if running_state ==:not_started451while running_state ==:running452begin453 an_rpc =@server.request_call
454breakif(!an_rpc.nil?)&& an_rpc.call.nil?455 active_call = new_active_server_call(an_rpc)456unless active_call.nil?457@pool.schedule(active_call)do|ac|458 c, mth = ac
459begin460 rpc_descs[mth].run_server_method(461 c,462 rpc_handlers[mth],463@interceptors.build_context
464)465rescue StandardError
466 c.send_status(GRPC::Core::StatusCodes::INTERNAL,467'Server handler failed')468end469end470end471rescue Core::CallError, RuntimeError => e
472# these might happen for various reasons. The correct behavior of473# the server is to log them and continue, if it's not shutting down.474if running_state ==:running475GRPC.logger.warn("server call failed: #{e}")476end477next478end479end480# @running_state should be :stopping here481@run_mutex.synchronize do482 transition_running_state(:stopped)483GRPC.logger.info("stopped: #{self}")484@server.close
485end486end
110##111# @param [GRPC::ActiveCall] active_call The current active call object112# for the request113# @param [Method] mth The current RPC method being called114# @param [GRPC::InterceptionContext] inter_ctx The interception context115# being executed116#117defrun_server_method(active_call, mth, inter_ctx =InterceptionContext.new)118# While a server method is running, it might be cancelled, its deadline119# might be reached, the handler could throw an unknown error, or a120# well-behaved handler could throw a StatusError.121if request_response?122 handle_request_response(active_call, mth, inter_ctx)123elsif client_streamer?124 handle_client_streamer(active_call, mth, inter_ctx)125elsif server_streamer?126 handle_server_streamer(active_call, mth, inter_ctx)127else# is a bidi_stream128 handle_bidi_streamer(active_call, mth, inter_ctx)129end130rescue BadStatus => e
131# this is raised by handlers that want GRPC to send an application error132# code and detail message and some additional app-specific metadata.133GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}")134 send_status(active_call, e.code, e.details, e.metadata)135rescue Core::CallError => e
136# This is raised by GRPC internals but should rarely, if ever happen.137# Log it, but don't notify the other endpoint..138GRPC.logger.warn("failed call: #{active_call}\n#{e}")139rescue Core::OutOfTime
140# This is raised when active_call#method.call exceeds the deadline141# event. Send a status of deadline exceeded142GRPC.logger.warn("late call: #{active_call}")143 send_status(active_call,DEADLINE_EXCEEDED,'late')144rescue StandardError, NotImplementedError => e
145# This will usuaally be an unhandled error in the handling code.146# Send back a UNKNOWN status to the client147#148# Note: this intentionally does not map NotImplementedError to149# UNIMPLEMENTED because NotImplementedError is intended for low-level150# OS interaction (e.g. syscalls) not supported by the current OS.151GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")152GRPC.logger.warn(e)153 send_status(active_call,UNKNOWN,"#{e.class}: #{e.message}")154end
A channel filter defines how operations on a channel are implemented. Channel filters are chained together to create full channels, and if those chains are linear, then channel stacks provide a mechanism to minimize allocations for that chain. Call stacks are created by channel stacks and represent the per-call data for that stack.
gRPC の transport は、grpc_transport_vtable という struct の function としていくつかの function を実装している必要があります(この struct は function pointer を保持するコンテナとなっています)。もっとも重要なものは、perform_stream_op と呼ばれる function です。function signature は次のようなものなります。
transport 実装は、このそれぞれの stream operation を適切に処理することが求められます。
その他、次の function も transport では定義されます。また、cancellations などをうまく処理することも求められる様です。
perform_transport_op
init_stream
destroy_stream, destroy_transport
set_pollset, set_pollset_set, get_endpoint
ここまで、gRPC が transport 実装に求めるものについて概観しました。gRPC は transport 実装に期待する振る舞いを src/core/lib/transport/transport_impl.h の中で grpc_transport_vtable という struct として定義しており、その振る舞いを実装さえすれば transport 実装を差し替えることが可能です。
gRPC の pluggable な transport という特徴がどう実現されているのか、イメージが掴めたかと思います。