跳到主要内容
版本:dev

gRPC 支持

Fory 可以为包含 service 定义的 schema 生成 Python gRPC companion module。生成代码使用 grpcio 负责传输,request 和 response 对象使用 pyfory 序列化。

当两端都由同一份 Fory IDL、protobuf IDL 或 FlatBuffers IDL 生成,并且你希望使用 gRPC 传输语义与 Fory payload 编码时,可以使用这种模式。如果客户端或工具必须直接消费 protobuf message bytes,请使用标准 protobuf gRPC 代码生成。

当前生成的 Python companion 面向同步 grpcio API。请使用普通 def servicer 方法、 grpc.server(...)、标准 grpc.Channel 实例,并用 Python iterator/generator 处理 streaming RPC。 生成的 stub 可以接收应用自行配置的任意 channel。Compiler 不会生成 grpc.aio stub 或 service base,因此不要把生成 servicer 方法实现成 async def,除非你在生成 companion 外自行封装 adapter。 基于 grpc.aio 的 Python gRPC async 支持将在下一个 Fory 版本提供。

添加依赖

pip install pyfory grpcio

Fory Python package 不会把 gRPC 作为硬依赖;只有编译或运行生成 gRPC companion 的应用需要安装 grpcio

定义 Service

package demo.greeter;

message HelloRequest {
string name = 1;
}

message HelloReply {
string reply = 1;
}

service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
}

生成 Python model 和 gRPC companion:

foryc service.fdl --python_out=./generated/python --grpc

该 schema 会生成:

文件用途
demo_greeter.pyFory dataclass 和注册辅助逻辑
demo_greeter_grpc.pygrpcio stub、servicer base 和注册函数

Module 名称来自 Fory package,点号会替换成下划线;没有 package 的 schema 使用 generated.pygenerated_grpc.py

实现 Server

from concurrent import futures

import grpc

import demo_greeter
import demo_greeter_grpc


class Greeter(demo_greeter_grpc.GreeterServicer):
def say_hello(self, request, context):
return demo_greeter.HelloReply(reply=f"Hello, {request.name}")


def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=8))
demo_greeter_grpc.add_servicer(Greeter(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()


if __name__ == "__main__":
serve()

创建 Client

使用生成的 stub 和普通 grpcio channel。生产 client 通常传入配置了 TLS/认证的 channel:

import grpc

import demo_greeter
import demo_greeter_grpc


def main():
credentials = grpc.ssl_channel_credentials()
with grpc.secure_channel("api.example.com:443", credentials) as channel:
stub = demo_greeter_grpc.GreeterStub(channel)
reply = stub.say_hello(demo_greeter.HelloRequest(name="Fory"))
print(reply.reply)


if __name__ == "__main__":
main()

本地测试和开发可以显式使用 insecure channel:

# 仅用于本地测试和开发。
# 生产环境请使用配置了 TLS/认证的 grpc.Channel。
with grpc.insecure_channel("localhost:50051") as channel:
stub = demo_greeter_grpc.GreeterStub(channel)

Channel、credential、deadline、metadata、interceptor、retry 和 server lifecycle 都保持 grpcio 行为。

Streaming RPC

Fory service 可以使用 unary、server-streaming、client-streaming 和 bidirectional streaming:

service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
rpc LotsOfReplies (HelloRequest) returns (stream HelloReply);
rpc LotsOfGreetings (stream HelloRequest) returns (HelloReply);
rpc Chat (stream HelloRequest) returns (stream HelloReply);
}

生成 Python companion 遵循 grpcio 的 iterator/generator 约定:

IDL shapeServicer 方法形态Stub 方法形态
rpc A (Req) returns (Res)返回一个 response 对象返回一个 response 对象
rpc A (Req) returns (stream Res)yield 多个 response 对象返回 response iterator
rpc A (stream Req) returns (Res)消费 request iterator 并返回一个 response接收 request iterator
rpc A (stream Req) returns (stream Res)消费 request iterator 并 yield response接收并返回 iterator

Servicer 方法使用 snake_case 名称;生成 descriptor 会保留 IDL 中的 service 和 method 名称作为 gRPC path。每个 message frame 都通过 Fory serializer/deserializer 编码。

Server 可以直接使用 Python iterator:

class Greeter(demo_greeter_grpc.GreeterServicer):
def lots_of_replies(self, request, context):
yield demo_greeter.HelloReply(reply=f"Hello, {request.name}")
yield demo_greeter.HelloReply(reply=f"Welcome, {request.name}")

def lots_of_greetings(self, request_iterator, context):
names = [request.name for request in request_iterator]
return demo_greeter.HelloReply(reply=", ".join(names))

def chat(self, request_iterator, context):
for request in request_iterator:
yield demo_greeter.HelloReply(reply=f"Hello, {request.name}")

生成的 client 使用标准 grpcio streaming 调用形态:

credentials = grpc.ssl_channel_credentials()
with grpc.secure_channel("api.example.com:443", credentials) as channel:
stub = demo_greeter_grpc.GreeterStub(channel)

for reply in stub.lots_of_replies(
demo_greeter.HelloRequest(name="Fory")
):
print(reply.reply)

def greeting_requests():
yield demo_greeter.HelloRequest(name="Ada")
yield demo_greeter.HelloRequest(name="Grace")

summary = stub.lots_of_greetings(greeting_requests())
print(summary.reply)

def chat_requests():
yield demo_greeter.HelloRequest(name="Fory")
yield demo_greeter.HelloRequest(name="RPC")

for reply in stub.chat(chat_requests()):
print(reply.reply)

Service 行为

生成的 service companion 只提供 Fory serialization callback。Service 行为仍遵循标准 grpcio

  • Deadline 和取消
  • TLS 和认证 credential
  • Client/server interceptor
  • Status code、details 和 metadata
  • Channel/server 生命周期
  • 同步 server 的线程池大小

故障排查

缺少 grpc

安装 grpcio

pip install grpcio

UNIMPLEMENTED

确认生成的 servicer 已注册到 server,并且 client 与 server 来自相同 package、service 和 method 名称。

Protobuf Client 无法解码

Fory gRPC 使用 Fory 二进制协议 payload,不是 protobuf wire-format message。两端应使用同一份 schema 生成的 Fory gRPC companion。