1
/
5

gRPC Internal - gRPC の設計と内部実装から見えてくる世界

こんにちは、Wantedly の Infrastructure Team で Engineer をしている南(@south37)です。

今日は、WANTEDLY TECH BOOK 6 から「gRPC Internal」という章を抜粋して Blog にします。

「WANTEDLY TECH BOOK 1-7を一挙大公開」でも書いた通り、Wantedly では WANTEDLY TECH BOOK のうち最新版を除いた電子版を無料で配布する事にしました。Wantedly Engineer Blogでも過去記事の内容を順次公開予定であり、この Blog もその一環となっています。

WANTEDLY TECH BOOK を一挙大公開 | Wantedly Engineer Blog
Wantedly における Go 導入にまつわる技術背景 | Wantedly Engineer Blog (本記事は Go Conference 2019 Autumn にて無料配布した冊子『WANTEDLY TECHBOOK GoCon Edition vol.2』からの掲載です) 配布した冊子の前半では Go の導入にあたってどのような工夫をしてきたのかを紹介しました。そこに書かれていたように、新しいプログラミング言語を導入するにはそれなりの整備コストがかかります。それではなぜそこまでして Go を
https://www.wantedly.com/companies/wantedly/post_articles/214809

以下、「WANTEDLY TECH BOOK 6 - gRPC Internal」の内容です。

はじめに

この章では gRPC Internal と題しまして、gRPC という RPC フレームワークをターゲットにその概要、使い方、そして内部実装について説明したいと思います。

どうして概要や使い方だけではなく、内部実装にまで踏み込むのでしょうか。その理由は2つあります。

1つめは、ツールやミドルウェア、フレームワークを適切に使うには、往々にして内部実装の知識が求められるからです。たとえば、RDB などが分かりやすい例でしょう。内部実装を理解し、その制約やパフォーマンス特性を把握していなければ、ツールやミドルウェア、フレームワークを効率的に使うことはできません。

2つめは、内部実装を知ることそのものがエンジニアとしての成長に繋がるからです。仕組みを知り、そこで活用されているテクニックや工夫を知ることは、自身の引き出しを増やし、エンジニアとしてのレベルを上げてくれます。

gRPC は、今や効率的な RPC フレームワークのデファクトスタンダードになりつつある存在です。この章を読むことで、gRPC を深く理解してすぐに使い始められる様になり、 さらにはエンジニアとしての成長にまでつながれば幸いです。

1. gRPC とは

まずはじめに、gRPC の概要を簡単に説明したいと思います。

gRPC とは、google が開発した high-performance な Remote Procedure Call(RPC)フレームワークです。OSS として提供されており、さまざまな言語、さまざまなプラットフォームで利用することができます。

gRPC は、client-server 間の通信に利用されます。client が特定の method を呼び出すと、それが server への request となり、server で処理された結果が response として返されて RPC の返り値になります。


[gRPC を利用した通信の模式図。gRPC Guide(12)より引用]

gRPC は次のような優れた特徴を持っています。

  1. Protocol Buffers[13] を Interface Definition Language(IDL)として利用して、service の定義を記述することができる。service 定義は、RPC に対する static な型付けとして機能する。Protocol Buffers はまた、効率的な binary format への encode/decode にも利用することができる。
  2. HTTP/2[14] を transport として利用することで、1 TCP connection 上での multiplexing や bidirectional streaming が可能。Header compression も含めて、効率的な通信が実現できる

これらの特徴はなぜ必要とされたのでしょうか。その由来は、gRPC が作られた背景にあると考えられます。gRPC 誕生の背景については、gRPC Motivation and Design Principles[2] に次のように記されています。

Google は長年、多数の Microservice 間の通信に Stubby と呼ばれる RPC infrastructure を利用してきた。(中略)プラットフォーム間で統一された RPC infrustarcture を利用することで、効率性、セキュリティ、信頼性、行動分析の大幅な改善が可能となり、その期間の信じられないほどの成長を支えた。
Stubby は優れた機能を多数備えていたが、internal な infrastructure と密結合しており public release には適さなかった。しかし、SPDY, HTTP/2, QUIC の登場により 、多くの同等の機能が public standard となり、さらにはStubby が提供してない機能までもが提供された。この標準化を利用し、その適用範囲をモバイル、IoT、およびクラウドのユースケースに拡張する為に Stubby を作り直す時期が来たことが明らかとなった。

つまり、Google の巨大な system を支える基盤となった Stubby という RPC infrastructure が、gRPC 誕生の背景には存在した訳です。「HTTP/2 を利用した効率的な transport」などが gRPC の特徴となっているのも納得でしょう。

gRPC はさまざまなユースケースが考えられますが、主要なユースケースの1つは Microservice における service 間の comunication です。Microservice における gRPC の利点について、少し掘り下げてみます。

Microservice における gRPC の利点

Microservice のサービス同士の communication には、さまざまな方法が考えられます。たとえば、traditional な HTTP/REST/JSON を利用するというのもその1つです。それらと比較して、gRPC にはどういった利点があるのでしょうか?

Microservice の communication が備えているべき特徴について、書籍「マイクロサービスアーキテクチャ」では次の4つが紹介されています(この本の中では、「Microservice の理想的な統合技術の特徴」として紹介されています)。

  1. 破壊的変更を回避できる
  2. API が技術非依存
  3. コンシューマにとって単純なサービス
  4. 内部の実装詳細を隠す

順に解説します。

「1. 破壊的変更を回避できる」というのは、client を変更せずに server が変更できることを指しています。典型的なケースの1つは、server が自身の response に新しく field を追加したい時、client に影響を与えずにそれができることです。「複数の microservice を同時に更新する(デプロイする)」ことは極めて困難である為、これは重要な特徴となります。

「2. API を技術非依存にする」というのは、API が microservice の実装を制限してはいけない、ということを指しています。たとえば、「特定の言語でしか利用出来ない RPC library」などがその典型例です。そういった RPC library を Microservice 間の通信に利用してしまうと、特定の言語から逃れられなくなります。

「3. コンシューマにとって単純なサービス」というのは、server との結合度が高まってしまうような client library を提供すべきではない、ということを指しています。たとえば、ドメインロジックが client library に入ってしまうと、ドメインロジック更新の際に server と全 client を同時に更新する必要が出てきてしまいます。

「4. 内部の実装詳細を隠す」というのは、server の内部実装を client に露出させるべきではない、ということを指しています。内部実装が露出してしまうと結合度が高まってしまい、変更のコストが増大してしまいます。

gRPC は、次のようにこれら4つの特徴を満たしています。

  • 「1. 破壊的変更を回避できる」について
    • gRPC は後方互換性のある schema 変更を support している
  • 「2. API を技術非依存にする」について
    • gRPC はさまざまな言語やプラットフォームで動作する様に実装されている
  • 「3. コンシューマにとって単純なサービス」について
    • gRPC が自動生成する client (stub) コードにはそもそもドメインロジックは入り込まない。もちろん、gRPC を内部で利用する client library を作ることもできるが、それをするかどうかは利用者側の責任。
  • 「4. 内部の実装詳細を隠す」について
    • gRPC はあくまで IDL としての Protocol Buffers で記述された response を返すだけであるため、その response がどう組み立てられているかは露出させない。

Microservice 開発において、gRPC は優れた技術選択となるでしょう。

「1. gRPC とは」まとめ

gRPC は 2015 年に OSS として公開されて以来、存在感を増し続けています。もちろん、世界的に Microservices Architecture が注目を集める中で重要度が上がっていることも一因でしょう。しかしそれだけではなく、さまざまな用途での利用も広がっています。たとえば Tensorflow の内部実装には gRPC が利用されています[4]。また、Google Cloud Platform(GCP)の多くのサービスの API が gRPC として提供されるようになっています[5]etcd など、gRPC で API を提供する middleware も登場しました[6]。2017 年には Cloud Native Computing Foundation(CNCF)が host する project の 1 つになりました[7]

gRPC は効率的な RPC フレームワークのデファクトスタンダードになりつつあるといっても、過言では無いでしょう。gRPC を理解し、使いこなせる様になっておきたいところです 。

2. gRPC の使い方

ここまで、gRPC の概要について説明しました。次に、gRPC の利用方法についても、実例とともに簡単に解説したいと思います。

なお、ここでは Ruby から gRPC を利用する方法を解説します。Ruby 以外の言語で gRPC を利用したい場合は、gRPC のドキュメント[16] を参照してみてください。

2-1. gRPC を Ruby から利用する

Ruby において、gRPC は gem として提供されています。その為、gem install で簡単に install することができます。

リスト8.1 grpc の install

gem install grpc

さらに、gRPC は protocol buffer compiler(protoc)の plugin を提供しており、Protocol Buffers を IDL として記述された service definition file(.proto file)から client と server のコードを自動生成します。この protoc やその plugin は grpc-tools という gem に bundle されており、grpc-tools を install することで 利用できます。

リスト8.2 grpc-tools の install

gem install grpc-tools

これで準備は整いました。次に、これらを利用して実際に gRPC を動かしてみましょう。

2-2. gRPC を動かす

ここからは、次の 3 ステップで gRPC による通信を試します。

  1. Service definition を記述する
  2. client と server コードを自動生成する
  3. server と client の実装を書いて動かす

なお、ここで出てくるコードは全て GitHub で https://github.com/south37/grpc-echo-example として公開しています[15]。気になる方はそちらも参照してみてください。

1. Service definition を記述する

gRPC を利用する際、最初にするのは Service definition を .proto file に記述することす。今回は、もっともシンプルな例として、 1 つの文字列を parameter として受け取って1つの文字列を返す単純な echo service を定義してみましょう。なお、Protocol Buffers の文法についても簡単に説明しますが、詳細は Protocol Buffers のドキュメントを参照してみてください[13]

まず、service Echo {…} という記述で service の interface を定義します。引数や返り値の型としては message 型を指定するのですが、それぞれ 1 つの string だけ をもつ EchoRequestEchoResponse にしています。echo service として、「1つの文字列を受け取って同じ文字列を返す」挙動を表現しています。

これは、次のような極めてシンプルな記述になります。

リスト8.3 protos/echo.proto に記述した echo service の service definition

syntax = "proto3";

package echo;

service Echo {
  rpc echo (EchoRequest) returns (EchoResponse) {}
}

message EchoRequest {
  string message = 1;
}

message EchoResponse {
  string message = 1;
}

2. client と server コードを自動生成する

次は client と server コードの自動生成です。次のコマンドを実行して、コードを自動生成します。

リスト8.4 protoc を利用してコードを自動生成する

$ grpc_tools_ruby_protoc -I ./protos --ruby_out=lib --grpc_out=lib ./protos/echo.proto

これで、lib 以下に echo_pb.rbecho_services_pb.rb というファイルが生成されます。

リスト8.5 自動生成されたファイル

$ ls -1 lib
echo_pb.rb
echo_services_pb.rb

次に、それぞれの中身を見てみましょう。

lib/echo_pb.rb の中には、message 型の定義が ruby の protocol buffer library(google-protobuf gem)が読み取れる形式で記述されています。

このファイルを require することで、Echo::EchoRequestEcho::EchoResponse という class が使えるようになります。

リスト8.6 lib/echo_pb.rb

  1 # Generated by the protocol buffer compiler.  DO NOT EDIT!
  2 # source: echo.proto
  3
  4 require 'google/protobuf'
  5
  6 Google::Protobuf::DescriptorPool.generated_pool.build do
  7   add_message "echo.EchoRequest" do
  8     optional :message, :string, 1
  9   end
 10   add_message "echo.EchoResponse" do
 11     optional :message, :string, 1
 12   end
 13 end
 14
 15 module Echo
 16   EchoRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("echo.EchoRequest").msgclass
 17   EchoResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("echo.EchoResponse").msgclass
 18 end

Echo::EchoRequestEcho::EchoResponse は message class と呼ばれ、Protocol Buffers の binary format への serialize/deserialize 機能をもつ class になっています。

次のように pry で挙動を確認すると、#to_proto を呼ぶことで binary format へ serialize できることが分かります。また、実は google-protobuf gem は JSON への serialize 機能も提供しており、#to_json で JSON が生成されることも確認できます。ここまでで、message class の振る舞いが確認できました。

なお、Protocol Buffers の binary format については、Encoding のドキュメントで解説されています[17]。気になる方はそちらも参照してみてください。

リスト8.7 Echo::EchoRequest の挙動

$ pry
[1] pry(main)> require './lib/echo_pb'
=> true
[2] pry(main)> Echo::EchoRequest
=> Echo::EchoRequest
[3] pry(main)> echo_req = Echo::EchoRequest.new(message: "hello")
=> <Echo::EchoRequest: message: "hello”>
[4] pry(main)> echo_req.to_proto
=> "\n\x05hello"
[5] pry(main)> echo_req.to_json
=> "{\"message\":\"hello\"}"

次に、lib/echo_services_pb.rb の中身を見てみます。こちらには、echo service の定義が grpc gem が読み取れる形で記述されています。

リスト8.8 lib/echo_services_pb.rb

  1 # Generated by the protocol buffer compiler.  DO NOT EDIT!
  2 # Source: echo.proto for package 'echo'
  3
  4 require 'grpc'
  5 require 'echo_pb'
  6
  7 module Echo
  8   module Echo
  9     class Service
 10
 11       include GRPC::GenericService
 12
 13       self.marshal_class_method = :encode
 14       self.unmarshal_class_method = :decode
 15       self.service_name = 'echo.Echo'
 16
 17       rpc :echo, EchoRequest, EchoResponse
 18     end
 19
 20     Stub = Service.rpc_stub_class
 21   end
 22 end

Service class の中で rpc というメソッドが呼ばれていて、これによって「echo という RPC メソッドが EchoRequest を受け取り、EchoResponse を返す」という定義が行われます。

さらにその下では Stub という class が生成されています。これは、client が RPC のために利用する class です。詳しい利用方法は「3. server と client の実装を書いて動かす」で見てみたいと思います。

3. server と client の実装を書いて動かす

次に、gRPC の server と client の実装をそれぞれ書いてみたいと思います。

まずは server 実装です。コードは次のようになります。

リスト8.9 lib/echo_server.rb

  1 # Sample gRPC server that implements the Echo::Echo service.
  2 #
  3 # Usage: $ bundle exec ruby path/to/echo_server.rb
  4
  5 this_dir = File.expand_path(File.dirname(__FILE__))
  6 lib_dir = File.join(this_dir, 'lib')
  7 $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
  8
  9 require 'logger'
 10 require 'grpc'
 11 require 'echo_services_pb'
 12
 13 # EchoServer is simple server that implements the Echo::Echo service.
 14 class EchoServer < Echo::Echo::Service
 15   EchoLogger = Logger.new(STDOUT)
 16
 17   def echo(echo_req, _unused_call)
 18     EchoLogger.info("echo \"#{echo_req.message}\"")
 19     Echo::EchoResponse.new(message: echo_req.message)
 20   end
 21 end
 22
 23 # main starts an RpcServer that receives requests to EchoServer at the sample
 24 # server port.
 25 def main
 26   s = GRPC::RpcServer.new
 27   s.add_http2_port('0.0.0.0:50051', :this_port_is_insecure)
 28   logger = Logger.new(STDOUT)
 29   logger.info("... running insecurely on 0.0.0.0:50051")
 30   s.handle(EchoServer)
 31   # Runs the server with SIGHUP, SIGINT and SIGQUIT signal handlers to
 32   # gracefully shutdown.
 33   s.run_till_terminated_or_interrupted(['SIGHUP', 'SIGINT', 'SIGQUIT'])
 34 end
 35
 36 main

まず注目するのは 14 行目から 21 行目までの EchoServer class を定義している箇所です。この中で、echo という instance method として echo という RPC の振る舞いを定義しています。今回は単純な echo service なので、request の message を取り出してそれをそのまま response の message として返しています。実際には、ここで意味のある処理を行うことになります。たとえば、request parameter として id を受け取り、その id に紐づくリソースを取得して response として返す、などが分かりやすい例でしょう。このように、単純な method として RPC の実装を定義するというのが、gRPC の特徴の1つです。

23行目以降では、特定の port を listen する形で server を起動する処理が記述されています。これは、どちらかといえばボイラープレートに近いものでしょう。今回の例では、50051 port を listen して、さらに SIGHUP, SIGINT, SIGQUIT signall が送られたら gracefull shutdown する実装になっています。

このコードを次のコマンドで実行すると、gRPC server の process が立ち上がります。これで、gRPC の request を受け付ける準備ができました。

リスト8.10 gRPC server を起動

$ bundle exec ruby echo_server.rb
I, [2019-03-29T23:42:42.461343 #69099]  INFO -- : ... running insecurely on 0.0.0.0:50051

次に、client 実装を見てみます。gRPC の request を送るコードは、次のようになります。

リスト8.11 lib/echo_client.rb

  1 # Sample app that connects to a Echo::Echo service.
  2 #
  3 # Usage: $ bundle exec ruby path/to/greeter_client.rb [message]
  4
  5 this_dir = File.expand_path(File.dirname(__FILE__))
  6 lib_dir = File.join(this_dir, 'lib')
  7 $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
  8
  9 require 'grpc'
 10 require 'echo_services_pb'
 11
 12 def main
 13   stub = Echo::Echo::Stub.new('localhost:50051', :this_channel_is_insecure)
 14   m = ARGV.size > 0 ?  ARGV[0] : 'hello'
 15   message = stub.echo(Echo::EchoRequest.new(message: m)).message
 16   print "echo response: \"#{message}\"\n"
 17 end
 18
 19 main

main の中で、自動生成した Echo::Echo::Stub を利用しています。これが RPC を行うための class になっており、Echo::Echo::Stub#echo を実行すると server へ gRPC の request が行われます。request parameter が Echo::EchoRequest, response が Echo::EchoResponse になっていて、response から message を取り出すことが できます。

次のように、新しく terminal を立ち上げて上記のコードを実行すると、echo service から response を受け取る様子が確認できます。

リスト8.12 gRPC server へ request を送る

$ bundle exec ruby echo_client.rb "Hello World"
echo response: "Hello World"

これで、gRPC を実際に動かしてみることができました!

2-3. gRPC の使い方まとめ

Ruby から gRPC を利用する方法を簡単に説明しました。gRPC が、想像以上に簡単に利用できることが分かったかと思います。

なお、今回説明したのは「1 つの request を送って 1 つの response を受け取る」というもっともシンプルなケースでした。ただし、gRPC は他にも「client から streaming で request message を送るケース」、「server から streaming で response message を受け取るケース」、「client から streaming で request message を送り、server から streaming で response message を受け取るケース」などがあります。それらの利用法については、gRPC のドキュメントを参照してみてください[18]

gRPC を単純に動かすだけなら、これまで見てきた様に簡単です。一方で、実際に production で利用しようと思うと、いくつかの問題について考える必要があります。その1つは gRPC の Load Balancing です。gRPC は client-server 間で connection を keep alive するので、HTTP/1.1 の通信とは違う考え方で Load Balancing する必要があります。これに関しては gRPC Blog がさまざまな方法をまとめているので、そちらを参照してみてください[19]

その他、gRPC を利用する上で有益な情報が gRPC Docs[16]gRPC Blog[20]awesome-grpc[21] などにまとまっています。本格的に利用する際は、それらを参照してみてく ださい。

3. gRPC を構成する重要技術: Protocol Buffers と HTTP/2

ここまで、gRPC の概要および使い方について解説しました。次は、この章の主題である「gRPC の設計および内部実装」にいきたいところですが、その前にここまでの説明で何度も登場した Protocol Buffers と HTTP/2 という2つの重要技術について、簡単に説明しておきたいと思います。

3-1. Protocol Buffers

まずは Protocol Buffers についてです。Protocol Buffers は端的にいえば、言語+コンパイラ+ライブラリの3つの要素から構成される serialization mechanism です。Protocol Buffers は独自の Inteface Definition Language(IDL)を持っており、その言語を利用して構造化データを message 型として記述すると、コンパイラ(protoc)が各プログラミング言語向けの実装を生成してくれます。その自動生成されたコードを Protocol Buffers ライブラリとともに利用することで、効率的な binary format への serialize/deserialize が行えます。

第2節の「gRPC の使い方」の中でも Protocol Buffers の利用方法について触れましたが、簡単なコード例を再度載せておきます。まず、message 型を .proto file の中で定義します。

リスト8.13 protos/echo.proto に message 型の定義を記述する

syntax = "proto3";

package echo;

message EchoRequest {
  string message = 1;
}

Protocol Buffers のコンパイラ(protoc)を利用して、次のコマンドでコードを自動生成します。

リスト8.14 protoc コマンドでコードを自動生成する

$ protoc -I ./protos --ruby_out=lib ./protos/echo.proto

これで、lib/echo_pb.rb が生成されました。この中で定義された Echo::EchoRequest という message class は、データを binary format へ serialize/deserialize す る為に利用することができます。このように、Protocol Buffers 単体でも gRPC と似たステップで簡単に利用することができます。

リスト8.15 lib/echo_pb.rb

# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: echo.proto

require 'google/protobuf'

Google::Protobuf::DescriptorPool.generated_pool.build do
  add_message "echo.EchoRequest" do
    optional :message, :string, 1
  end
end

module Echo
  EchoRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("echo.EchoRequest").msgclass
end

Protocol Buffers は2つの側面から優れたものだといえます。1つ目は、言語(protocol buffer language)とコンパイラ(protoc)を持っており、さまざまなものを自動生成できること、またその機能をプラグインで自由に拡張できるということです。

protocol buffer language はデータの format を必要最低限の簡潔な記述で .proto file として表現することができます。そして、protoc は .profo file からさまざまなものを自動生成することができます。protoc の拡張性はプラグイン機構として実現されており[22]、たとえば gRPC のコード生成機能は protoc のプラグインとして提供されています[23][24]。

gRPC の例を見てみましょう。第 2 節でコード生成の際に実行したコマンドは次のようなものでした。

リスト8.16 再掲: protoc を利用してコードを自動生成する

$ grpc_tools_ruby_protoc -I ./protos --ruby_out=lib --grpc_out=lib ./protos/echo.proto

これは、内部では次のようなコマンドとして実行されます。

リスト8.17 実際に実行される protoc コマンド

$ protoc --plugin=protoc-gen-grpc=/path/to/grpc_ruby_plugin -I ./protos --ruby_out=lib --grpc_out=lib ./protos/echo.proto

このコマンドの中で、--grpc_out=lib と書いている箇所が gRPC plugin の利用を意味しています。protoc は、--XXX_out=YYY というオプションを指定されると、protoc-gen-XXX という実行ファイルを探してそれをプラグインとみなして呼び出します。この例では、protoc-gen-grpc が探索されます。通常は $PATH から探索されますが、--plugin というオプションを渡すことでプラグインの path を指定することもできます。grpc_tools_ruby_protoc コマンドはこの path 指定機能を利用していることが分かります。

protoc-gen-grpc プラグインは、.proto file の中の service definition から XXX_services_pb.rb というファイルを生成します。これが、gRPC を利用する際に XXX_pb.rbXXX_services_pb.rb という2つのファイルが生成された理由です。protoc の「message class の実装を生成する」機能に加えて、「service class の実装を生成する」機能がプラグインとして追加されていたわけです。

protoc のプラグインとしては、gRPC 以外にもさまざまなものが存在しています。swagger を生成する grpc-gen-swagger や[25]gRPC server に対して RESTful JSON API の proxy を生成する grpc-gateway などもその1つです[26]。プラグインの形でさまざまな機能を提供できることが、Protocol Buffers の強力な機能の1つといえます。

Protocol Buffers の2つ目の優れている点は、その binary format です。Protocol Buffer の binary format は、データサイズが小さく、serialize/deserialize が早く、データの後方互換性が保ちやすいという優れた特徴を持っています。Protocol Buffers のドキュメントでは、XML と比べて3-10分の1のデータサイズで、20-100倍高速に処理できると説明されています[27]。また、Protocol Buffers は Google の Lingua Franca(共通言語)であるといわれています。Google の内部の RPC は Protocol Buffers 形式でやり取りをされ、ストレージにも Prrotocol Buffers 形式でデータが保存され、データ解析パイプラインや mobile client との通信も Protocol Buffers 形式で行われているそうです[28]。Google がこれだけ活用していることからも、Protocol Buffers が優れた binary format であることが分かります。

以上、Protocol Buffers について簡単に説明しました。gRPC にとって、Protocol Buffers は言語やコンパイラを提供するものであり、さらにその binary format はデフォルトの serialization/deserialization format となっています。gRPC とは切っても切り離せない重要技術であることが分かります。

3-2. HTTP/2

次に、HTTP/2 についてです。HTTP/2 は、HTTP プロトコルのメジャーバージョンの1つです。元々 Google で開発された SPDY という protocol が元になって標準化されており、HTTP/1.1 の semantics は残しつつ transport が大幅に改善されています。

ここでは、簡単に HTTP/2 の機能について説明したいと思います。より詳細が知りたい方は、HTTP/2 の RFC[29]Illya Grigorik と Surma による Introduction to HTTP/2 という解説記事[30] を参照してみてください。

HTTP/2 の主要な機能は次のようなものです。

  1. 多重化
  2. ヘッダ圧縮
  3. 優先度制御
  4. サーバープッシュ

これらの中でも、特に gRPC からは transport を効率化する機能である「1. 多重化」と「2. ヘッダ圧縮」が大いに活用されています。この2つに絞って説明します。

まずは多重化についてです。これは、HTTP/2 では 1 TCP connection 上で複数の request - response を並列して行うことができることを意味しています。元々 HTTP/1.1 は 多重化の機能を持っておらず、request - response は sequential に行う以外の方法がありませんでした。これは request の並列化に並列数と同じだけの TCP connection を必要とするなど、いくつかの問題を抱えていました。HTTP/2 はこれらの問題を解決しています。

多重化は、stream という仕組みによって実現されています。HTTP/2 においては、1つの TCP connection 内には複数の stream が存在し、それぞれの stream 内では request-response が frame という単位に分割して送信されます。stream は unique な id を持っており、frame はどの stream に所属しているかを stream id として保持していま す。そのため、1 TCP connection 上で複数の stream の frame が混在して送信されても、きちんと stream ごとに処理を進めることができる様になっています。

このような機構によって、多重化は実現されています。

[HTTP/2 の Stream と Frame の模式図の模式図。Introduction to HTTP/2(30) より引用]


[HTTP/2 において、さまざまな Stream の Frame が 1 つの TCP Connection 上で送信される様子。Introduction to HTTP/2(30) より引用]

次にヘッダ圧縮についてです。元々 HTTP/1.1 では、HTTP ヘッダは plain text として送信されていました。これは、request ごとに 500-800 byte の overhead となっており、パフォーマンスやリソース使用量の観点から無視できないものでした。HTTP/2 は HPACK と呼ばれる圧縮アルゴリズムでヘッダを圧縮することで、この問題を解決しています。HPACK は Static Huffman Encoding を利用してヘッダを encode することで、送信量を減らします。さらに、一度送信した header のリストを client と server が共有し、 それを参照して更なる圧縮を行います。HPACK について詳しく知りたい方は、RFC も参照してみてください[31]。

以上、HTTP/2 とその機能の一部である「多重化」や「ヘッダ圧縮」について説明しました。gRPC は、これらの HTTP/2 の機能を利用して、双方向 streaming などを実現して います。HTTP/2 を理解することで、gRPC の効率的な通信がどう実現されているのか、理解が深まると思います。

4. gRPC の設計と内部実装

ここまで、gRPC の概要および使い方、さらには gRPC を構成する重要技術である Protocol Buffers と HTTP/2 についてまで説明してきました。次はいよいよ、gRPC の設計および内部実装に注目してみたいと思います。

4-1. gRPC の設計原則

まず、gRPC の設計原則について簡単にご紹介します。gRPC Blog の gRPC Motivation and Design Principles [2] という記事では、gRPC の設計原則として次の 16 項目が紹介されています。

  1. Services not Objects, Messages not References
  2. Coverage & Simplicity
  3. Free & Open
  4. Interoperability & Reach
  5. General Purpose & Performant
  6. Layered
  7. Payload Agnostic
  8. Streaming
  9. Blocking & Non-Blocking
  10. Cancellation & Timeout
  11. Lameducking
  12. Flow-Control
  13. Pluggable
  14. Extensions as APIs
  15. Metadata Exchange
  16. Standardized Status Codes

重要だと思われる一部を抜粋して、簡単に解説します。

  • 「1. Services not Objects, Messages not References」というのは、分散システム内の通信をどう実現させるかの方針を示しています。通信方法として、世の中には分散 object などのアプローチも存在しますが、そうではなく RPC として「システム間で粒度の粗い message 交換を行う」方針をとることを示しています。この部分の説明には分散 object の落とし穴[9] という Matin Fowler 氏のブログポストがリンクされています。気になる方はそちらも参照してみてください。
  • 「2. Coverage & Simplicy」は広いプラットフォームで利用できるということを表しています。また、「3. Fee & Open」は、オープンソースにすることで全ての人が自由に 使えるということを表しています。
  • 「4. Interoperability & Reach」は、一般的な Internet インフラ上で通信可能な protocol であるということを表しています。これは、internal な cluster 内の通信だけでなく、internet 上の通信にも gRPC を使える様にするという方針の表れでしょう。
  • 「5. General Purpose & Performant」は、幅広いユースケースに適用できるものでありながら、パフォーマンスの犠牲がほとんど無いものにするという指針を示しています 。
  • 「6. Layered」は gRPC を構成する技術スタックがそれぞれ独立して変更できる設計にするという指針を示しています。一例として、通信の際の data format の変更は、application layer の binding に影響を与えずにできるべき、という例が紹介されています。
  • 「7. Payload Agnostic」は、Protocol Buffers, JSON, XML などさまざまな encoding を使える様にするという指針を示しています。同様に、payload の圧縮機構も pluggable であるべきと述べられています。
  • 「8. Streaming」や「12. Flow-Control」は、streaming でデータをやり取りしつつ、フロー制御で流量の調節を行うことを表しています。大きなデータをやり取りしたい時や、時間軸で変化する message sequence を表現したいときに、これらは有用です。
  • 「9. Blocking & Non-Blocking」は、server-client 間の message 交換の方法として同期処理と非同期処理の両方をサポートするということを表しています。
  • 「10. Cancellation & Timeout」は、cancelation や timeout を可能にするという指針を示しています。
  • 「11. Lameducking」は、server が graceful shutdown できるということを示しています。
  • 「13. Pluggable」は、セキュリティ、ヘルスチェック、負荷分散、フェイルオーバー、監視、トレース、ロギングなどさまざまな機能をプラグインとして追加できる設計であるべきという指針を示しています。
  • 「16. Standardized Status Codes」は、status code で error の種類が大別できることを表しています。domain specific な詳細は、メタデータとして表現されます。

これらの設計指針には、これまでに「1. gRPC とは」で述べた「多数の言語から multi platform で利用可能」、「HTTP/2 を利用した効率的な transport」という特徴が表れています。一方で、「6. Layerd」、「7. Payload Agnostic」、「13. Pluggable」などはこれまでには注目して来なかった部分です。gRPC は技術スタックを意識した作りになっており、それぞれのスタック(レイヤー)が疎結合かつ Pluggable になっていることが分かります

実装を追いながら、これらの設計指針がどのように実現されているのかを見てみましょう。

4-2. gRPC の言語ごとの実装について

gRPC は、数多くのプログラミング言語から利用できるようになっています。実は、プログラミング言語ごとに、gRPC 実装自体は異なるものになっています。

https://github.com/grpc/ という grpc の Github organization の repository 一覧を見ると、grpc という prefix をもつ repository がいくつも存在することがわかりま す。2019年3月時点では次の 9 つが存在します。それぞれ、「どの言語向けの実装を提供しているのか」が違います。

これらのうち、grpc (C-core), grpc-java, grpc-go の 3 つがメインの実装として gRPC Blog では紹介されています[8]。この 3 つの実装について概観してみましょう。

grpc (C-core)

まず、grpc (C-core) について。これは、C++, Python, Ruby, Objective-C, PHP, C# という 6 つのプログラミング言語向けの実装を提供しています。設計としては、次の図に示したように、「C 言語で書かれた gRPC C Surface and C/C++ Core」がさまざまな言語の library で wrap され、利用できるようになっています。

[grpc (C-core) のスタックの模式図。Visualizing gRPC Language Stacks(8) より引用]

たとえば、Python アプリケーションから利用するケースを考えてみましょう。Python アプリケーションからは、Python 向けに gRPC の CLI が自動生成した client (Stub) コードを利用して RPC を行います。この call は interceptor を経由して Python library (wrapping library) に到達し、ここで「C Surface and C/C++ Core」の C call に変換されます。C-Core は RPC を HTTP/2 に encode し、さらに場合によっては TLS で暗号化し、ネットワークソケットに書き込みます。

上図から分かる様に、gRPC はスタックのさまざまな要素が差し替え可能になっています。たとえば、プログラミング言語として C# を利用し、transport として HTTP/2 ではなく In Process を利用するということができます。In Process を利用することで、同一 process 内であればより効率的な通信を行うことができます。あるいは、transport として Cronet と呼ばれる chromium の network library を利用することもできます。Cronet は 内部で QUIC という効率的な network protocol を利用できます。gRPC は、環境に合わせてさまざまな transport 実装を差し替えられる flexibility を持っているといえます。

grpc-go

次に、grpc-go について。これは、go 向けの実装を提供しています。C-core と比較すると、support する言語が1つだけである為にかなり simple な作りになっています。また、transport も HTTP/2 だけを support している様です。

[grpc-go のスタックの模式図。Visualizing gRPC Language Stacks(8) より引用]

grpc-java

次に、grpc-java について。これは、java 向けの実装を提供しています。次のようなスタックになっています。

[grpc-java のスタックの模式図。Visualizing gRPC Language Stacks(8) より引用]

grpc-java は、C-core と同様に HTTP/2, QUIC, In Process の3つの transport 実装をサポートしています。C-core との大きな違いとしては、Application が gRPC の生成する client (Stub) コードや Interceptor を経由せずに、直接 gRPC Java Core library を利用することができる様になっています。grpc-java はまた、HTTP/2 実装自体も pluggable な library として切り出しているそうです[8]

gRPC の言語ごとの実装まとめ

ここまでに見てきた様に、gRPC は言語ごとに必要に合わせて別の実装が存在します。この設計は、言語ごとの機能差を生み出してしまいやすいというデメリットもありますが 、一方で「肥大化した共通ライブラリ」を持たず迅速に開発を進めやすい、また言語ごとに言語の機能を活かした実装を行いやすいというメリットがあります。おそらくこれらのメリットを活かすために、こういった「言語ごとに実装する」という戦略を取っているのだと思います。

また、grpc (C-core) や grpc-java では HTTP/2, Cronet, In Process など複数の transport 実装から必要なものを選べる様になっていることも印象的でした。この特徴は、gRPC の 16 の設計原則のうち「6. Layerd」などの現れでしょう。設計原則を念頭において、実装が進められていることが分かります。

4-3. gRPCの Ruby 実装を読む

gRPC に対して更なる理解を深める為に、実際にコードを読んでみましょう。ここでは、grpc (C-core) の v1.19.1 を対象に、Ruby の grpc gem 実装のコードリーディングを行います。その中では C-core の実装が利用されているので、必要に応じて C-core の説明も行います。

4-3-1. gRPC client の実装コードを読む

まずは、gRPC client の実装がどう行われているか、コードを読んでみましょう。入り口としては、「2. gRPC の使い方」で出てきたコードから始めるのが良さそうです。

リスト8.18 再掲:「gRPC の使い方」の lib/echo_services_pb.rb

# lib/echo_services_pb.rb
  1 # Generated by the protocol buffer compiler.  DO NOT EDIT!
  2 # Source: echo.proto for package 'echo'
  3
  4 require 'grpc'
  5 require 'echo_pb'
  6
  7 module Echo
  8   module Echo
  9     class Service
 10
 11       include GRPC::GenericService
 12
 13       self.marshal_class_method = :encode
 14       self.unmarshal_class_method = :decode
 15       self.service_name = 'echo.Echo'
 16
 17       rpc :echo, EchoRequest, EchoResponse
 18     end
 19
 20     Stub = Service.rpc_stub_class
 21   end
 22 end

リスト8.19 再掲:「gRPC の使い方」の lib/echo_client.rb

# lib/echo_client.rb
 12 def main
 13   stub = Echo::Echo::Stub.new('localhost:50051', :this_channel_is_insecure)
 14   m = ARGV.size > 0 ?  ARGV[0] : 'hello'
 15   message = stub.echo(Echo::EchoRequest.new(message: m)).message
 16   print "echo response: \"#{message}\"\n"
 17 end
 18
 19 main

client からは、Service.rpc_stub_class method によって作られた Stub class が使われています。Service の中では、Service.rpc method によって echo という service method が定義されています。まずは、この Service.rpcService.rpc_stub_class に注目してみます。

Service class は GRPC::GenericService を include しており、.rpc.rpc_stub_class もこの中で定義されています(厳密には、Dsl という module の中でこれらは定義されていて、include のタイミングで extend されています)。.rpc は次のような定義になっています。注目するのは 95 行目で、ここで name, input, output の情報を集約した RpcDesc という class の object を作り、rpc_descs という hash に格納しています。RpcDesc は重要な class で、client と server 両方の処理において利用されます。詳細はまた後で説明します。

リスト8.20 lib/grpc/generic/service.rb

 79       # Adds an RPC spec.
 80       #
 81       # Takes the RPC name and the classes representing the types to be
 82       # serialized, and adds them to the including classes rpc_desc hash.
 83       #
 84       # input and output should both have the methods #marshal and #unmarshal
 85       # that are responsible for writing and reading an object instance from a
 86       # byte buffer respectively.
 87       #
 88       # @param name [String] the name of the rpc
 89       # @param input [Object] the input parameter's class
 90       # @param output [Object] the output parameter's class
 91       def rpc(name, input, output)
 92         fail(DuplicateRpcName, name) if rpc_descs.key? name
 93         assert_can_marshal(input)
 94         assert_can_marshal(output)
 95         rpc_descs[name] = RpcDesc.new(name, input, output,
 96                                       marshal_class_method,
 97                                       unmarshal_class_method)
 98         define_method(GenericService.underscore(name.to_s).to_sym) do |*|
 99           fail GRPC::BadStatus.new_status_exception(
100             GRPC::Core::StatusCodes::UNIMPLEMENTED)
101         end
102       end

リスト8.21 lib/grpc/generic/service.rb

140       # the RpcDescs defined for this GenericService, keyed by name.
141       def rpc_descs
142         @rpc_descs ||= {}
143       end

.rpc がやっていることはそれ以外には引数の値のチェックくらいで、特筆すべきものはありません。ただし、98 行目で define_method で「service method のデフォルト実装を与えていること」は知っておくと良いと思います。これは server 実装において利用されます。「gRPC の使い方」で説明した様に、gRPC の server を実装する際は、 GRPC::GenericService を include した Service class を継承して、server class を用意するのでした。その際は、上記のデフォルト実装を override する形で「service method の実装を行う」ことになっています。その為、たとえば .proto file に記述されているのにまだ実装されていない service method へ client から request した場合 には、100 行目のGRPC::Core::StatusCodes::UNIMPLEMENTED という error code が返されることになります。

リスト8.22 再掲:「gRPC の使い方」の lib/echo_server.rb

 13 # EchoServer is simple server that implements the Echo::Echo service.
 14 class EchoServer < Echo::Echo::Service
 15   EchoLogger = Logger.new(STDOUT)
 16
 17   def echo(echo_req, _unused_call)  # ここで `Echo::Echo::Service` で定義された `echo` method を override
 18     EchoLogger.info("echo \"#{echo_req.message}\"")
 19     Echo::EchoResponse.new(message: echo_req.message)
 20   end
 21 end

次に、.rpc_stub_class の実装を見てみます。コードは長いですが、よく見るとやっていることはシンプルです。ClientStub を継承した class を 150 行目の Class.new で作成し、その中で 160 行目以降で上記で説明した rpc_descs の内容をもとに instance method を定義して、最後にその class を返しています。ここで定義される instance method が、RPC の request を送る為に呼び出される method です。

リスト8.23 lib/grpc/generic/service.rb

145       # Creates a rpc client class with methods for accessing the methods
146       # currently in rpc_descs.
147       def rpc_stub_class
148         descs = rpc_descs
149         route_prefix = service_name
150         Class.new(ClientStub) do
151           # @param host [String] the host the stub connects to
152           # @param creds [Core::ChannelCredentials|Symbol] The channel
153           #     credentials to use, or :this_channel_is_insecure otherwise
154           # @param kw [KeywordArgs] the channel arguments, plus any optional
155           #                         args for configuring the client's channel
156           def initialize(host, creds, **kw)
157             super(host, creds, **kw)
158           end
159
160           # Used define_method to add a method for each rpc_desc.  Each method
161           # calls the base class method for the given descriptor.
162           descs.each_pair do |name, desc|
163             mth_name = GenericService.underscore(name.to_s).to_sym
164             marshal = desc.marshal_proc
165             unmarshal = desc.unmarshal_proc(:output)
166             route = "/#{route_prefix}/#{name}"
167             if desc.request_response?
168               define_method(mth_name) do |req, metadata = {}|
169                 GRPC.logger.debug("calling #{@host}:#{route}")
170                 request_response(route, req, marshal, unmarshal, metadata)
171               end
172             elsif desc.client_streamer?
173               define_method(mth_name) do |reqs, metadata = {}|
174                 GRPC.logger.debug("calling #{@host}:#{route}")
175                 client_streamer(route, reqs, marshal, unmarshal, metadata)
176               end
177             elsif desc.server_streamer?
178               define_method(mth_name) do |req, metadata = {}, &blk|
179                 GRPC.logger.debug("calling #{@host}:#{route}")
180                 server_streamer(route, req, marshal, unmarshal, metadata, &blk)
181               end
182             else  # is a bidi_stream
183               define_method(mth_name) do |reqs, metadata = {}, &blk|
184                 GRPC.logger.debug("calling #{@host}:#{route}")
185                 bidi_streamer(route, reqs, marshal, unmarshal, metadata, &blk)
186               end
187             end
188           end
189         end
190       end

instance method の内容は、RpcDesc の種類に応じて異なります。このチェックは RpcDesc#request_response? などのメソッドで行われていますが、端的にいうと「request と response がそれぞれ streaming かどうか」で 2 x 2 = 4 種類あり、そのうちのどれなのかで分岐しています。このチェックのメソッドは lib/grpc/generic/rpc_desc.rb の 175-189 行目で定義されています。

リスト8.24 lib/grpc/generic/rpc_desc.rb

175     def request_response?
176       !input.is_a?(Stream) && !output.is_a?(Stream)
177     end
178
179     def client_streamer?
180       input.is_a?(Stream) && !output.is_a?(Stream)
181     end
182
183     def server_streamer?
184       !input.is_a?(Stream) && output.is_a?(Stream)
185     end
186
187     def bidi_streamer?
188       input.is_a?(Stream) && output.is_a?(Stream)
189     end

もっとも単純な例である「request, response 共に単一の message 型であるケース(request, response ともに stream でないケース)」をみてみましょう。これは、lib/grpc/generic/service.rb 167行目の desc.request_response? が true を返すケースで、168-171行目がここで定義される instance method の実装になっています。この中では、シンプルに request_response という method が呼び出されます。これは、ClientStub class の中で定義されています。

ClientStub#request_response は、次のような実装になっています。この中では、まず 154 行目で #new_active_call メソッドが呼び出されて ActiveCall class のオ ブジェクトが生成されます。この ActiveCall は、RPC の request を送信する為に利用されるオブジェクトです。詳細はこの後で説明します。gRPC は interceptor と呼ばれ る RPC をフックする機構を持っており、158行目以降で interceptor による処理が行われた上で、最終的には ActiveCall#request_response が呼び出されます。なお、165 行目でチェックしている return_op は「返り値を object ではなく Operation として受け取るかどうか」を表すフラグです。デフォルトでは false なので、ここでは false の場合のコードだけを読むことにします。その場合には、 ActiveCall#request_response の結果が response として返されます。

リスト8.25 lib/grpc/generic/client_stub.rb

111     # request_response sends a request to a GRPC server, and returns the
112     # response.
113     #
114     # == Flow Control ==
115     # This is a blocking call.
116     #
117     # * it does not return until a response is received.
118     #
119     # * the requests is sent only when GRPC cores flow control allows it to
120     #   be sent.
121     #
122     # == Errors ==
123     # An RuntimeError is raised if
124     #
125     # * the server responds with a non-OK status
126     #
127     # * the deadline is exceeded
128     #
129     # == Return Value ==
130     #
131     # If return_op is false, the call returns the response
132     #
133     # If return_op is true, the call returns an Operation, calling execute
134     # on the Operation returns the response.
135     #
136     # @param method [String] the RPC method to call on the GRPC server
137     # @param req [Object] the request sent to the server
138     # @param marshal [Function] f(obj)->string that marshals requests
139     # @param unmarshal [Function] f(string)->obj that unmarshals responses
140     # @param deadline [Time] (optional) the time the request should complete
141     # @param return_op [true|false] return an Operation if true
142     # @param parent [Core::Call] a prior call whose reserved metadata
143     #   will be propagated by this one.
144     # @param credentials [Core::CallCredentials] credentials to use when making
145     #   the call
146     # @param metadata [Hash] metadata to be sent to the server
147     # @return [Object] the response received from the server
148     def request_response(method, req, marshal, unmarshal,
149                          deadline: nil,
150                          return_op: false,
151                          parent: nil,
152                          credentials: nil,
153                          metadata: {})
154       c = new_active_call(method, marshal, unmarshal,
155                           deadline: deadline,
156                           parent: parent,
157                           credentials: credentials)
158       interception_context = @interceptors.build_context
159       intercept_args = {
160         method: method,
161         request: req,
162         call: c.interceptable,
163         metadata: metadata
164       }
165       if return_op
166         # return the operation view of the active_call; define #execute as a
167         # new method for this instance that invokes #request_response.
168         c.merge_metadata_to_send(metadata)
169         op = c.operation
170         op.define_singleton_method(:execute) do
171           interception_context.intercept!(:request_response, intercept_args) do
172             c.request_response(req, metadata: metadata)
173           end
174         end
175         op
176       else
177         interception_context.intercept!(:request_response, intercept_args) do
178           c.request_response(req, metadata: metadata)
179         end
180       end
181     end

ActiveCall が生成される #new_active_call の実装も見ておきましょう。この中では、491 行目の Channel#create_call によって Call class のオブジェクトが生成され、それを引数として 497 行目で ActiveCall オブジェクトが生成され、method の返り値となっています。

リスト8.26 lib/grpc/generic/client_stub.rb

476     # Creates a new active stub
477     #
478     # @param method [string] the method being called.
479     # @param marshal [Function] f(obj)->string that marshals requests
480     # @param unmarshal [Function] f(string)->obj that unmarshals responses
481     # @param parent [Grpc::Call] a parent call, available when calls are
482     #   made from server
483     # @param credentials [Core::CallCredentials] credentials to use when making
484     #   the call
485     def new_active_call(method, marshal, unmarshal,
486                         deadline: nil,
487                         parent: nil,
488                         credentials: nil)
489       deadline = from_relative_time(@timeout) if deadline.nil?
490       # Provide each new client call with its own completion queue
491       call = @ch.create_call(parent, # parent call
492                              @propagate_mask, # propagation options
493                              method,
494                              nil, # host use nil,
495                              deadline)
496       call.set_credentials! credentials unless credentials.nil?
497       ActiveCall.new(call, marshal, unmarshal, deadline,
498                      started: false)
499     end

ChannelCall という新しい 2 つの class が出てきました。実は、この2つは gRPC において非常に重要なものです。gRPC の C-core はその機能を C の struct およ び function として提供しているのですが、ChannelCall は C-core から提供される grpc_channelgrpc_call という C の struct をそれぞれラップしたものとなっています。その為、どちらも gem の native extention として定義されています。

grpc_channel は client-server 間の connection に相当する struct です。grpc_call は1つの RPC request に相当する struct で、grpc_call を invoke することで request が送られます。grpc_call は1つの grpc_channel に紐づく形で生成されます。それぞれの description は grpc/grpc の include/grpc/impl/codegen/grpc_types.h に記載されています(定義は別のファイルにあります)。

リスト8.27 grpc/grpc の include/grpc/impl/codegen/grpc_types.h

 61 /** The Channel interface allows creation of Call objects. */
 62 typedef struct grpc_channel grpc_channel;
.
.
.
 67 /** A Call represents an RPC. When created, it is in a configuration state
 68     allowing properties to be set until it is invoked. After invoke, the Call
 69     can have messages written to it and read from it. */
 70 typedef struct grpc_call grpc_call;

ClientStub#request_response の実装に戻ります。その中では、ActiveCall#request_response が呼ばれるのでした。その実装は次のようなものになっています。この中で、実際に RPC の request が送られます。

リスト8.28 lib/grpc/generic/active_call.rb

343     # request_response sends a request to a GRPC server, and returns the
344     # response.
345     #
346     # @param req [Object] the request sent to the server
347     # @param metadata [Hash] metadata to be sent to the server. If a value is
348     # a list, multiple metadata for its key are sent
349     # @return [Object] the response received from the server
350     def request_response(req, metadata: {})
351       raise_error_if_already_executed
352       ops = {
353         SEND_MESSAGE => @marshal.call(req),
354         SEND_CLOSE_FROM_CLIENT => nil,
355         RECV_INITIAL_METADATA => nil,
356         RECV_MESSAGE => nil,
357         RECV_STATUS_ON_CLIENT => nil
358       }
359       @send_initial_md_mutex.synchronize do
360         # Metadata might have already been sent if this is an operation view
361         unless @metadata_sent
362           ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
363         end
364         @metadata_sent = true
365       end
366
367       begin
368         batch_result = @call.run_batch(ops)
369         # no need to check for cancellation after a CallError because this
370         # batch contains a RECV_STATUS op
371       ensure
372         set_input_stream_done
373         set_output_stream_done
374       end
375
376       @call.metadata = batch_result.metadata
377       attach_status_results_and_complete_call(batch_result)
378       get_message_from_batch_result(batch_result)
379     end

ActiveCall#request_response の中では、352 行目で ops という hash が組み立てられて、それが 368 行目の Call#run_batch の呼び出しによって処理されています。この中で、server との通信が行われます。server で処理をして帰ってきた結果は batch_result となり、378 行目で get_message_from_batch_result によって処理されて返り値となります。

ここで、ops という新しい概念が出てきました。実は、gRPC の通信は内部 (C-core) では「grpc_op という struct で表現される operation の batch 処理」として抽象化されています。

SEND_MESSAGE などは operation の type を意味しています。これらは C-core の中で定義されていて、次のようなものになっています。

リスト8.29 grpc/grpc の include/grpc/impl/codegen/grpc_types.h

511 typedef enum {
512   /** Send initial metadata: one and only one instance MUST be sent for each
513       call, unless the call was cancelled - in which case this can be skipped.
514       This op completes after all bytes of metadata have been accepted by
515       outgoing flow control. */
516   GRPC_OP_SEND_INITIAL_METADATA = 0,
517   /** Send a message: 0 or more of these operations can occur for each call.
518       This op completes after all bytes for the message have been accepted by
519       outgoing flow control. */
520   GRPC_OP_SEND_MESSAGE,
521   /** Send a close from the client: one and only one instance MUST be sent from
522       the client, unless the call was cancelled - in which case this can be
523       skipped. This op completes after all bytes for the call
524       (including the close) have passed outgoing flow control. */
525   GRPC_OP_SEND_CLOSE_FROM_CLIENT,
526   /** Send status from the server: one and only one instance MUST be sent from
527       the server unless the call was cancelled - in which case this can be
528       skipped. This op completes after all bytes for the call
529       (including the status) have passed outgoing flow control. */
530   GRPC_OP_SEND_STATUS_FROM_SERVER,
531   /** Receive initial metadata: one and only one MUST be made on the client,
532       must not be made on the server.
533       This op completes after all initial metadata has been read from the
534       peer. */
535   GRPC_OP_RECV_INITIAL_METADATA,
536   /** Receive a message: 0 or more of these operations can occur for each call.
537       This op completes after all bytes of the received message have been
538       read, or after a half-close has been received on this call. */
539   GRPC_OP_RECV_MESSAGE,
540   /** Receive status on the client: one and only one must be made on the client.
541       This operation always succeeds, meaning ops paired with this operation
542       will also appear to succeed, even though they may not have. In that case
543       the status will indicate some failure.
544       This op completes after all activity on the call has completed. */
545   GRPC_OP_RECV_STATUS_ON_CLIENT,
546   /** Receive close on the server: one and only one must be made on the
547       server. This op completes after the close has been received by the
548       server. This operation always succeeds, meaning ops paired with
549       this operation will also appear to succeed, even though they may not
550       have. */
551   GRPC_OP_RECV_CLOSE_ON_SERVER
552 } grpc_op_type;

ActiveCall#request_response の中では operation としてさまざまなものを渡しています。SEND_MESSAGE には server へ送信するデータが入っており、これは 353行目 で呼び出される @marshal.call(req) になっています。ここで、デフォルトでは Protocol Buffers による serialize が行われます。GRPC_OP_SEND_MESSAGE の説明にも記載されてる様に、この operartion はデータがすべて送信されると完了します。次に RECV_MESSAGE ですが、GRPC_OP_RECV_MESSAGE の説明にも記載されてる様に、server から受信したデータがすべて読み出されるとこの operation は完了します。他にもさまざまな operation が指定されていますが、基本的にはこの2つの operation によって blocking する形での request-response が実現されています。

ops が送信される実態である Call#run_batch についても少し見ておきます。これは ext/grpc/rb_call.c の中で grpc_rb_call_run_batch という c function として定義されています。

795-811 行目のコメントで説明がある様に、これは operation を post して、その完了を待つものになっています。少々長いですが、注目するべき処理は 841行目の grpc_call_start_batch という function call と、852 行目の rb_completion_queue_pluck という function call だけです。grpc_call_start_batch は C-core によって提供 されている function で、ops の実行を開始します。rb_completion_queue_pluck は grpc gem の中で定義されている function ですが、内部では C-core が提供する grpc_completion_queue_pluck という function を呼び出しています。

ここで、completion queue というワードが新たに登場しました。これは、gRPC の C-core が結果を受け取るために用意しているインターフェースです。grpc_completion_queue という struct が queue として用意されていて、operation が完了するとその通知が queue に post されます。grpc_rb_call_run_batchrb_completion_queue_pluck を呼び出して、ops 完了の待ち合わせを行なっています。そして、859 行目で ops の結果を result として受け取り、862 行目でその result を return しています。

リスト8.30 ext/grpc/rb_call.c

 795 /* call-seq:
 796    ops = {
 797      GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>,
 798      GRPC::Core::CallOps::SEND_MESSAGE => <op_value>,
 799      ...
 800    }
 801    tag = Object.new
 802    timeout = 10
 803    call.start_batch(tag, timeout, ops)
 804
 805    Start a batch of operations defined in the array ops; when complete, post a
 806    completion of type 'tag' to the completion queue bound to the call.
 807
 808    Also waits for the batch to complete, until timeout is reached.
 809    The order of ops specified in the batch has no significance.
 810    Only one operation of each type can be active at once in any given
 811    batch */
 812 static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
 813   run_batch_stack* st = NULL;
 814   grpc_rb_call* call = NULL;
 815   grpc_event ev;
 816   grpc_call_error err;
 817   VALUE result = Qnil;
 818   VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
 819   unsigned write_flag = 0;
 820   void* tag = (void*)&st;
 821
 822   grpc_ruby_fork_guard();
 823   if (RTYPEDDATA_DATA(self) == NULL) {
 824     rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
 825     return Qnil;
 826   }
 827   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
 828
 829   /* Validate the ops args, adding them to a ruby array */
 830   if (TYPE(ops_hash) != T_HASH) {
 831     rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
 832     return Qnil;
 833   }
 834   if (rb_write_flag != Qnil) {
 835     write_flag = NUM2UINT(rb_write_flag);
 836   }
 837   st = gpr_malloc(sizeof(run_batch_stack));
 838   grpc_run_batch_stack_init(st, write_flag);
 839   grpc_run_batch_stack_fill_ops(st, ops_hash);
 840
 841   /* call grpc_call_start_batch, then wait for it to complete using
 842    * pluck_event */
 843   err = grpc_call_start_batch(call->wrapped, st->ops, st->op_num, tag, NULL);
 844   if (err != GRPC_CALL_OK) {
 845     grpc_run_batch_stack_cleanup(st);
 846     gpr_free(st);
 847     rb_raise(grpc_rb_eCallError,
 848              "grpc_call_start_batch failed with %s (code=%d)",
 849              grpc_call_error_detail_of(err), err);
 850     return Qnil;
 851   }
 852   ev = rb_completion_queue_pluck(call->queue, tag,
 853                                  gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
 854   if (!ev.success) {
 855     rb_raise(grpc_rb_eCallError, "call#run_batch failed somehow");
 856   }
 857   /* Build and return the BatchResult struct result,
 858      if there is an error, it's reflected in the status */
 859   result = grpc_run_batch_stack_build_result(st);
 860   grpc_run_batch_stack_cleanup(st);
 861   gpr_free(st);
 862   return result;
 863 }

リスト8.31 grpc/grpc の include/grpc/impl/codegen/grpc_types.h

 54 /** Completion Queues enable notification of the completion of
 55  * asynchronous actions. */
 56 typedef struct grpc_completion_queue grpc_completion_queue;

Channel#run_batch の実装を見ました。再び ActiveCall#request_response に戻ります。

batch の結果は、ActiveCall#get_message_from_batch_result によって整形されて返り値となります。この method の実装は次のようになります。recv_message_batch_resultnil のときは返り値も nil になりますが、それ以外では @unmarshal.call(recv_message_batch_result.message) の結果が返されます。ここで、デフォルトでは Protocol Buffers による deserialize が行われます。nil になるのは、267 行目の debug log にもある様に、「streaming ですべての message が送信されたことを通 知されたケース」などです。

リスト8.32 lib/grpc/generic/active_call.rb

262     def get_message_from_batch_result(recv_message_batch_result)
263       unless recv_message_batch_result.nil? ||
264              recv_message_batch_result.message.nil?
265         return @unmarshal.call(recv_message_batch_result.message)
266       end
267       GRPC.logger.debug('found nil; the final response has been sent')
268       nil
269     end

これで、client から gRPC で request を送り、response を受け取るまでのひととおりの流れをコードから理解することができました。

4-3-2. gRPC server の実装コードを読む

次に、gRPC server の実装コードを読んでみましょう。再び、「gRPC の使い方」で出てきたコードを見てみます。

lib/echo_server.rb の中では、GRPC::RpcServer#handle 呼び出しで、EchoServer を handler として登録しています。

リスト8.33 再掲:「gRPC の使い方」の lib/echo_server.rb

 14 class EchoServer < Echo::Echo::Service
 15   EchoLogger = Logger.new(STDOUT)
 16
 17   def echo(echo_req, _unused_call)
 18     EchoLogger.info("echo \"#{echo_req.message}\"")
 19     Echo::EchoResponse.new(message: echo_req.message)
 20   end
 21 end
 22
 23 # main starts an RpcServer that receives requests to EchoServer at the sample
 24 # server port.
 25 def main
 26   s = GRPC::RpcServer.new
 27   s.add_http2_port('0.0.0.0:50051', :this_port_is_insecure)
 28   logger = Logger.new(STDOUT)
 29   logger.info("... running insecurely on 0.0.0.0:50051")
 30   s.handle(EchoServer)
 31   # Runs the server with SIGHUP, SIGINT and SIGQUIT signal handlers to
 32   # gracefully shutdown.
 33   s.run_till_terminated_or_interrupted(['SIGHUP', 'SIGINT', 'SIGQUIT'])
 34 end
 35
 36 main

GRPC::RpcServer#handle の実装は次のようになっています。この中では、340行目で #add_rpc_descs_for が呼び出されています。

リスト8.34 lib/grpc/generic/rpc_server.rb

301     # handle registration of classes
302     #
303     # service is either a class that includes GRPC::GenericService and whose
304     # #new function can be called without argument or any instance of such a
305     # class.
306     #
307     # E.g, after
308     #
309     # class Divider
310     #   include GRPC::GenericService
311     #   rpc :div DivArgs, DivReply    # single request, single response
312     #   def initialize(optional_arg='default option') # no args
313     #     ...
314     #   end
315     #
316     # srv = GRPC::RpcServer.new(...)
317     #
318     # # Either of these works
319     #
320     # srv.handle(Divider)
321     #
322     # # or
323     #
324     # srv.handle(Divider.new('replace optional arg'))
325     #
326     # It raises RuntimeError:
327     # - if service is not valid service class or object
328     # - its handler methods are already registered
329     # - if the server is already running
330     #
331     # @param service [Object|Class] a service class or object as described
332     #        above
333     def handle(service)
334       @run_mutex.synchronize do
335         unless @running_state == :not_started
336           fail 'cannot add services if the server has been started'
337         end
338         cls = service.is_a?(Class) ? service : service.class
339         assert_valid_service_class(cls)
340         add_rpc_descs_for(service)
341       end
342     end

#add_rpc_descs_for は次のような実装になっています。まず注目するのは 537 行目と 541 行目で、ここで rps_specsRpcDesc class の object をセットしていま す。この RpcDesc class は、client 実装で登場したのと同じものです。

次に注目するのは 543-547 行目で、ここで service class に定義された method を handler として登録しています。

面白いのは method メソッドが利用されていることです。これは、「既存の object の method から Method class の object を作るメソッド」で、Method class の object は Proc などと同様に #call で内部の処理を呼び出すことができます。つまり、ここで handlers[route] に格納した処理が適切なタイミングで call されて、 そこで gRPC の利用者(開発者)が定義した service method が呼ばれる訳です。なお、#add_rpc_descs_for の 543-547 行目の処理から分かる様に、実は GRPC::RpcServer#handle
には class では無く object を渡すこともできます。必要になった際にはこの機能を活用してみてください。

リスト8.35 lib/grpc/generic/rpc_server.rb

534     # This should be called while holding @run_mutex
535     def add_rpc_descs_for(service)
536       cls = service.is_a?(Class) ? service : service.class
537       specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
538       cls.rpc_descs.each_pair do |name, spec|
539         route = "/#{cls.service_name}/#{name}".to_sym
540         fail "already registered: rpc #{route} from #{spec}" if specs.key? route
541         specs[route] = spec
542         rpc_name = GenericService.underscore(name.to_s).to_sym
543         if service.is_a?(Class)
544           handlers[route] = cls.new.method(rpc_name)
545         else
546           handlers[route] = service.method(rpc_name)
547         end
548         GRPC.logger.info("handling #{route} with #{handlers[route]}")
549       end
550     end

リスト8.36 #method メソッドの動作

$ irb
irb(main):001:0> p_method = method(:p)
=> #<Method: main.p>
irb(main):002:0> p_method.call("Hello")
"Hello"
=> "Hello"

service class が handler に登録されるまでの処理はわかりました。次は、GRPC::RpcServer#s.run_till_terminated_or_interrupted で server を running にする処理を 見てみます。

GRPC::RpcServer#s.run_till_terminated_or_interrupted は次のような実装になっています。392-411 行目で signal handler を登録したあとで、413 行目の #run で server が running になります。

リスト8.37 lib/grpc/generic/rpc_server.rb

364     # runs the server with signal handlers
365     # @param signals
366     #     List of String, Integer or both representing signals that the user
367     #     would like to send to the server for graceful shutdown
368     # @param wait_interval (optional)
369     #     Integer seconds that user would like stop_server_thread to poll
370     #     stop_server
371     def run_till_terminated_or_interrupted(signals, wait_interval = 60)
372       @stop_server = false
373       @stop_server_mu = Mutex.new
374       @stop_server_cv = ConditionVariable.new
375
376       @stop_server_thread = Thread.new do
377         loop do
378           break if @stop_server
379           @stop_server_mu.synchronize do
380             @stop_server_cv.wait(@stop_server_mu, wait_interval)
381           end
382         end
383
384         # stop is surrounded by mutex, should handle multiple calls to stop
385         #   correctly
386         stop
387       end
388
389       valid_signals = Signal.list
390
391       # register signal handlers
392       signals.each do |sig|
393         # input validation
394         if sig.class == String
395           sig.upcase!
396           if sig.start_with?('SIG')
397             # cut out the SIG prefix to see if valid signal
398             sig = sig[3..-1]
399           end
400         end
401
402         # register signal traps for all valid signals
403         if valid_signals.value?(sig) || valid_signals.key?(sig)
404           Signal.trap(sig) do
405             @stop_server = true
406             @stop_server_cv.broadcast
407           end
408         else
409           fail "#{sig} not a valid signal"
410         end
411       end
412
413       run
414
415       @stop_server_thread.join
416     end

#run は次のような実装になっています。359 行目で #loop_handle_server_calls が呼ばれて、handler を処理する loop が回り始めます。

リスト8.38 lib/grpc/generic/rpc_server.rb

344     # runs the server
345     #
346     # - if no rpc_descs are registered, this exits immediately, otherwise it
347     #   continues running permanently and does not return until program exit.
348     #
349     # - #running? returns true after this is called, until #stop cause the
350     #   the server to stop.
351     def run
352       @run_mutex.synchronize do
353         fail 'cannot run without registering services' if rpc_descs.size.zero?
354         @pool.start
355         @server.start
356         transition_running_state(:running)
357         @run_cond.broadcast
358       end
359       loop_handle_server_calls
360     end

#loop_handle_server_calls は次のような実装になっています。451 行目から471行目が while loop となっています。簡単に全体像を説明すると、まず 453 行目で Core::Server#request_call が呼ばれて、NewServerRpc という struct を返します。次に、455 行目で #new_active_server_call が呼ばれて、ActiveCall class の object が生成されます。ここで出てくる ActiveCall class は、client 実装で出てきたものと同じものです。ActiveCall object は、worker pool の中の worker thread で 処理されます。worker pool への schedule は 457 行目の Pool#schedule で行われます。worker は、rpc_descs[mth] で request された rpc method に対応する RpcDesc オブジェクトを取り出し、RpcDesc#run_server_method を呼び出します。この中で、実際の rpc method の処理が行われます。

リスト8.39 lib/grpc/generic/rpc_server.rb

448     # handles calls to the server
449     def loop_handle_server_calls
450       fail 'not started' if running_state == :not_started
451       while running_state == :running
452         begin
453           an_rpc = @server.request_call
454           break if (!an_rpc.nil?) && an_rpc.call.nil?
455           active_call = new_active_server_call(an_rpc)
456           unless active_call.nil?
457             @pool.schedule(active_call) do |ac|
458               c, mth = ac
459               begin
460                 rpc_descs[mth].run_server_method(
461                   c,
462                   rpc_handlers[mth],
463                   @interceptors.build_context
464                 )
465               rescue StandardError
466                 c.send_status(GRPC::Core::StatusCodes::INTERNAL,
467                               'Server handler failed')
468               end
469             end
470           end
471         rescue Core::CallError, RuntimeError => e
472           # these might happen for various reasons.  The correct behavior of
473           # the server is to log them and continue, if it's not shutting down.
474           if running_state == :running
475             GRPC.logger.warn("server call failed: #{e}")
476           end
477           next
478         end
479       end
480       # @running_state should be :stopping here
481       @run_mutex.synchronize do
482         transition_running_state(:stopped)
483         GRPC.logger.info("stopped: #{self}")
484         @server.close
485       end
486     end

RpcDesc#run_server_method は次のような実装になっています。この中では、client 実装と同様に、RpcDesc が streaming かそうでないかで dispatch が行われています。一番シンプルな「client, server ともに streaming でないケース」では、#request_response? が true を返し #handle_request_response が呼ばれます。

リスト8.40 lib/grpc/generic/rpc_desc.rb

110     ##
111     # @param [GRPC::ActiveCall] active_call The current active call object
112     #   for the request
113     # @param [Method] mth The current RPC method being called
114     # @param [GRPC::InterceptionContext] inter_ctx The interception context
115     #   being executed
116     #
117     def run_server_method(active_call, mth, inter_ctx = InterceptionContext.new)
118       # While a server method is running, it might be cancelled, its deadline
119       # might be reached, the handler could throw an unknown error, or a
120       # well-behaved handler could throw a StatusError.
121       if request_response?
122         handle_request_response(active_call, mth, inter_ctx)
123       elsif client_streamer?
124         handle_client_streamer(active_call, mth, inter_ctx)
125       elsif server_streamer?
126         handle_server_streamer(active_call, mth, inter_ctx)
127       else  # is a bidi_stream
128         handle_bidi_streamer(active_call, mth, inter_ctx)
129       end
130     rescue BadStatus => e
131       # this is raised by handlers that want GRPC to send an application error
132       # code and detail message and some additional app-specific metadata.
133       GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}")
134       send_status(active_call, e.code, e.details, e.metadata)
135     rescue Core::CallError => e
136       # This is raised by GRPC internals but should rarely, if ever happen.
137       # Log it, but don't notify the other endpoint..
138       GRPC.logger.warn("failed call: #{active_call}\n#{e}")
139     rescue Core::OutOfTime
140       # This is raised when active_call#method.call exceeds the deadline
141       # event.  Send a status of deadline exceeded
142       GRPC.logger.warn("late call: #{active_call}")
143       send_status(active_call, DEADLINE_EXCEEDED, 'late')
144     rescue StandardError, NotImplementedError => e
145       # This will usuaally be an unhandled error in the handling code.
146       # Send back a UNKNOWN status to the client
147       #
148       # Note: this intentionally does not map NotImplementedError to
149       # UNIMPLEMENTED because NotImplementedError is intended for low-level
150       # OS interaction (e.g. syscalls) not supported by the current OS.
151       GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
152       GRPC.logger.warn(e)
153       send_status(active_call, UNKNOWN, "#{e.class}: #{e.message}")
154     end

RpcDesc#handle_request_response は次のような実装になっています。51 行目で ActiveCall#read_unary_request を呼んで request parameter として送られた object を取得します。これはすでに deserialize された Ruby object となっています。そして、60 行目で handler である Method object の #call が呼ばれて、request object が処理されます。ここで、gRPC 開発者が定義した service method による処理が行われる訳です。最後に 61-64 行目でActiveCall#server_unary_response で response を返します。ここまでが、gRPC server による処理になります。

なお、52 行目で active_call.single_req_view として生成した object を 61 行目の Method#call の際に渡していることも分かるかと思います。これは、触れる attribute を制限する為に SingleReqView という class の object でラップした ActiveCall object です。service method が metadata などにアクセスしたいときは、この object を利用することができます。

リスト8.41 lib/grpc/generic/rpc_desc.rb

 50     def handle_request_response(active_call, mth, inter_ctx)
 51       req = active_call.read_unary_request
 52       call = active_call.single_req_view
 53
 54       inter_ctx.intercept!(
 55         :request_response,
 56         method: mth,
 57         call: call,
 58         request: req
 59       ) do
 60         resp = mth.call(req, call)
 61         active_call.server_unary_response(
 62           resp,
 63           trailing_metadata: active_call.output_metadata
 64         )
 65       end
 66     end

ActiveCall#read_unary_requestActiveCall#server_unary_response などの説明は割愛しますが、client 実装と同様に内部で ops を組み立てて、batch として送信しています。また、ActiveCall の内部で serialize/deserialize の処理も行われています。気になる方は、実装を読んでみてください。

4-3-3. gRPC の Ruby 実装を読む: まとめ

gRPC の Ruby 実装を読んで、gRPC client, gRPC server における処理について理解を深めました。RPC を operation の batch として表現していることや、completion queue を利用した待ち合わせなど、gRPC をただ利用しているだけでは見えなかった C-core の振る舞いについて垣間見ることができました。また、serialize/deserialize の処理がどう行われているのか、その hook point、service method の引数の意味や実行される環境・タイミングなどについても理解を深めることができました。これらは、gRPC を利用する上できっと有用な知識となると思います。

4-4. C-core の実装を概観する

C-core 自体の実装も見てみましょう。C-core の実装は膨大で、コードを追っていくのは現実的ではありません。そこで、ここでは C-core の内容についていくつかの観点から簡単に概説をしつつ、参考になるドキュメントを紹介する形式にしたいと思います。

4-4-1. C-core のコードについて

v1.19.1 の C-core のコードは https://github.com/grpc/grpc/tree/v1.19.1/src/core にあります。この中で、lib directory の中に gRPC C-core のメインの実装が、ext directory の中に optional plugin の実装が入っています。HTTP/2、Cronet、In-Process の3つの差し替え可能な transport 実装は、ext/transport の中に入っています。

C-core のコードについては、その実装の一部を説明するドキュメントが次の URL のリンク先にあります。これらも理解の助けになるはずです。

https://github.com/grpc/grpc/tree/v1.19.1/doc
https://github.com/grpc/grpc/tree/v1.19.1/doc/core

4-4-2. grpc_channel_filter について

grpc gem のソースコードリーディングでは、grpc_channel, grpc_call, grpc_completion_queue という struct が C-core の重要な要素として登場しました。その他にも、いくつか重要な struct は存在します。その中でも、特に重要なのが grpc_channel_filter です。

grpc_channel_filter は src/core/lib/channel/channel_stack.h の中で定義されている struct です。コード中のコメントでは次のように説明されており、channel の機能を pluggable な形で実装するために利用されていることが分かります。

A channel filter defines how operations on a channel are implemented. Channel filters are chained together to create full channels, and if those chains are linear, then channel stacks provide a mechanism to minimize allocations for that chain. Call stacks are created by channel stacks and represent the per-call data for that stack.

gRPC が動作するために必要な主要な channel filter は src/core/lib の中で定義されていますが、src/core/ext/filters の中でもいくつか channel filter が実装されています。次のリストが、src/core/ext/filters 以下の directory 一覧を示しています。

リスト8.42 src/core/ext/filters の中の directory 一覧

$ ls -1 src/core/ext/filters
census
client_channel
deadline
http
load_reporting
max_age
message_size
workarounds

これらの directory を見ると、channel filter としてさまざまな機能が提供されていることが分かります。

注目すべきは http directory です。この中で、grpc_http_client_filtergrpc_http_server_filter という channel filter が定義されています。これらは、コード 中で

Processes metadata on the client side for HTTP2 transports

Processes metadata on the server side for HTTP2 transports

というコメントで振る舞いが説明されており、gRPC における metadata(HTTP/2 における header)を処理する実装となっています。実際、実装を見ていると grpc_http_client_filter の一処理として定義された hc_start_transport_stream_op_batch では、HTTP header の組み立て処理が記述されています。

このように、gRPC の様々な機能が channel filter として実装されていることが分かりました。channel filter の実装を見れば、gRPC の機能に対する理解が深まるはずで す。

4-4-3. gRPC の pluggable な transport について

gRPC は、HTTP/2, Cronet, In-process の 3 つから transport 実装を選ぶことができる、pluggable な仕組みになっています。

transport の振る舞いや、transport が満たすべき条件については、Transport Explainer というドキュメントにまとまっています[32]。ここでは、このドキュメントを元に gRPC の transport について簡単に説明したいと思います。

gRPC の transport は、grpc_transport_vtable という struct の function としていくつかの function を実装している必要があります(この struct は function pointer を保持するコンテナとなっています)。もっとも重要なものは、perform_stream_op と呼ばれる function です。function signature は次のようなものなります。

リスト8.43 perform_stream_op の function signature

void perform_stream_op(grpc_transport* self, grpc_stream* stream, grpc_transport_stream_op_batch* op);

grpc_stream は stream の unique な identifier です。grpc_transport_stream_op_batch は stream operation の batch を表現する値です。これらを引数にもつ perform_stream_op は、1つの stream 上の op batch を処理します。

この perform_stream_op の中で、transport 実装ごとの方法で「stream operation の処理」、「metadata の送信」、「message を byte stream に変換して送信」、「completion callback 呼び出しの schedule」などを行います。

stream operation としては、以下が存在します。

  • send_initial_metadata
  • recv_initial_metadata
  • send_message (zero or more)
  • recv_message (zero or more)
  • send_trailing_metadata
  • recv_trailing_metadata
  • cancel_stream
  • collect_stats

transport 実装は、このそれぞれの stream operation を適切に処理することが求められます。

その他、次の function も transport では定義されます。また、cancellations などをうまく処理することも求められる様です。

  • perform_transport_op
  • init_stream
  • destroy_stream, destroy_transport
  • set_pollset, set_pollset_set, get_endpoint

ここまで、gRPC が transport 実装に求めるものについて概観しました。gRPC は transport 実装に期待する振る舞いを src/core/lib/transport/transport_impl.h の中で grpc_transport_vtable という struct として定義しており、その振る舞いを実装さえすれば transport 実装を差し替えることが可能です。

gRPC の pluggable な transport という特徴がどう実現されているのか、イメージが掴めたかと思います。

4-4-4. grpc_transport_stream_op_batch と grpc_op の関係について

先ほど登場した grpc_transport_stream_op_batch は、transport に送る operation を batch としてまとめた struct です。「operation を batch としてまとめる」という話は、grpc gem のコードリーディングで出てきた grpc_op の batch と似ています。実は、この2つは関係しています。

grpc_op は C-core のインターフェース(surface)として提供されている struct です。grpc gem の中では、grpc_op の batch は

grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops, size_t nops, void* tag, void* reserved)

という C の関数に渡されて、内部で処理されるのでした。grpc_call_start_batch は内部で

static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, size_t nops, void* notify_tag, int is_notify_tag_closure)

を呼び出し、その中で grpc_op の内容を反映して grpc_transport_stream_op_batch の組み立てが行われます。組み立てた grpc_transport_stream_op_batch は、grpc_connected_filter と呼ばれる channel filter の

static void con_start_transport_stream_op_batch(grpc_call_element* elem, grpc_transport_stream_op_batch* batch)

という処理の中で、transport layer に渡されます。

このように、これまでに登場した C-core の surface や channel filter を通して、grpc_opgrpc_transport_stream_op_batch は繋がっています。これで、gRPC の動作について、application から transport まで動作を理解することができました。

4-4-5. gRPC over HTTP/2

gRPC Concepts Overview [34]gRPC over HTTP/2 [33] というドキュメントでは、gRPC が HTTP/2 上の通信としてどう表現されているかが記述されています。HTTP/2 用のツールを用いて gRPC をデバッグする際など、「gRPC と HTTP/2 の関係の理解」が役立つ機会はきっとあるでしょう。これらの内容について、簡単に説明したいと思います。

gRPC は bidirectional な streaming ができる様になっています。これは、HTTP/2 上の stream として実現されています。gRPC の Call Header(どの service method への RPC なのかを表す情報)や Metadata は HTTP/2 の header として、HPACK で圧縮されて送信されます。Payload Message(request parameter や response)は byte stream へと serialize されて、HTTP/2 の DATA frame として送信されます。response の際には、Payload Message の後で Status が送信されます。

これらの値について、もう少し詳細に見てみます。

gRPC の request は、path が "/<service-name>/<method-name>" の HTTP/2 request として送信されます。Metadata は user が定義するもの以外にも、gRPC library がその機能の実現のために設定するものも含めて、header として送信されます。次に、Request の際に送られる header の一例を示します。

  • Timeout の値は grpc-timeout header として表現
  • Content-Type は "application/grpc+proto" や "application/grpc+json" と表現
  • gRPC で利用される圧縮方式は grpc-encoding header として表現

user が定義する metadata は基本的には key-value pair であり、そのまま header として表現されます。ただし、grpc- という prefix をもつ key は gRPC が予約しているため、利用してはいけないことになっています。また、HTTP/2 の header は value として任意長の byte 列をセットすることが出来ないので、binary を metadata として送りたい場合は key の suffix を -bin にするというルールがあります。そうすると、Runtime library が binary header であると検知して、base64 encoding/decoding を行ってくれます。

単純な unary-call(client, server ともに stream では無い RPC)においては、たとえば次のような request/response が行われます。

リスト8.44 gRPC request の HTTP/2 上での表現

# Request
HEADERS (flags = END_HEADERS)
:method = POST
:scheme = http
:path = /google.pubsub.v2.PublisherService/CreateTopic
:authority = pubsub.googleapis.com
grpc-timeout = 1S
content-type = application/grpc+proto
grpc-encoding = gzip
authorization = Bearer y235.wef315yfh138vh31hv93hv8h3v

DATA (flags = END_STREAM)
<Length-Prefixed Message>

リスト8.45 gRPC response の HTTP/2 上での表現

# Response
HEADERS (flags = END_HEADERS)
:status = 200
grpc-encoding = gzip
content-type = application/grpc+proto

DATA
<Length-Prefixed Message>

HEADERS (flags = END_STREAM, END_HEADERS)
grpc-status = 0 # OK
trace-proto-bin = jher831yy13JHy3hc

このように、完全に HTTP/2 にマッピングする形で gRPC が実装されていることが分かったかと思います。これによって、gRPC は transport の機能は完全に HTTP/2 に任せて、より上位のレイヤーの関心ごとに注力できる様になっています。

4-4-6. C-core の実装について: まとめ

以上、C-core の実装について、ドキュメントやコードを元に簡単に説明しました。gRPC の機能がどう実装されているのか、pluggable な transport はどう実現されているのか、また HTTP/2 との対応関係がどうなっているのかなどについて理解が深まったと思います。さらに詳細が気になる方は、自身でもドキュメントやコードを参照してみてください。

5. まとめ

以上、gRPC の概要、使い方、そして設計と内部実装について説明しました。gRPC は今後も重要技術であり続けると思われます。この章を読むことで、gRPC に対する理解が深まることを願っています。また、内部実装を知ることで、エンジニアとしてより成長を感じていただければ幸いです。

6. 参考文献

[1] https://platformlab.stanford.edu/Seminar%20Talks/gRPC.pdf

[2] https://grpc.io/blog/principles

[3] 書籍「マイクロサービスアーキテクチャ」

[4] https://www.tensorflow.org/guide/extend/architecture

[5] https://cloud.google.com/apis/docs/overview#multiple-surfaces-rest-and-grpc

[6] https://coreos.com/etcd/docs/latest/learning/api.html

[7] https://www.cncf.io/blog/2017/03/01/cloud-native-computing-foundation-host-grpc-google/

[8] https://grpc.io/blog/grpc-stacks

[9] https://martinfowler.com/articles/distributed-objects-microservices.html

[10] https://github.com/grpc/grpc/tree/v1.19.1/src/core/ext/transport/chttp2

[11] https://github.com/grpc/grpc/tree/v1.19.1/src/core/lib

[12] https://grpc.io/docs/guides/

[13] https://developers.google.com/protocol-buffers/

[14] https://en.wikipedia.org/wiki/HTTP/2

[15] https://github.com/south37/grpc-echo-example

[16] https://grpc.io/docs/

[17] https://developers.google.com/protocol-buffers/docs/encoding

[18] https://grpc.io/docs/tutorials/basic/ruby.html

[19] https://grpc.io/blog/loadbalancing

[20] https://grpc.io/blog/

[21] https://github.com/grpc-ecosystem/awesome-grpc

[22] https://developers.google.com/protocol-buffers/docs/reference/other#plugins

[23] https://grpc.io/docs/guides/#working-with-protocol-buffers

[24] https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.compiler.plugin.pb

[25] https://github.com/grpc-ecosystem/grpc-gateway/tree/master/protoc-gen-swagger

[26] https://github.com/grpc-ecosystem/grpc-gateway

[27] https://developers.google.com/protocol-buffers/docs/overview#whynotxml

[28] https://opensource.google.com/projects/protobuf

[29] https://tools.ietf.org/html/rfc7540

[30] https://developers.google.com/web/fundamentals/performance/http2/

[31] https://tools.ietf.org/html/draft-ietf-httpbis-header-compression-12

[32] https://github.com/grpc/grpc/blob/v1.19.1/doc/core/transport_explainer.md

[33] https://github.com/grpc/grpc/blob/v1.19.1/doc/PROTOCOL-HTTP2.md

[34] https://github.com/grpc/grpc/blob/master/CONCEPTS.md

[35] Photo Credit: Joel Filipe https://unsplash.com/photos/4NZlogMPIp0

Wantedly, Inc.からお誘い
この話題に共感したら、メンバーと話してみませんか?
Wantedly, Inc.では一緒に働く仲間を募集しています
100 いいね!
100 いいね!

同じタグの記事

今週のランキング

南 直さんにいいねを伝えよう
南 直さんや会社があなたに興味を持つかも