跳到主要内容
版本:dev

gRPC 支持

Fory 可以为包含 service 定义的 schema 生成 Java gRPC service companion。生成的 service code 使用普通 grpc-java channel、server、deadline、status code、interceptor 和 transport security, 但 request/response 对象使用 Fory 序列化,而不是 protobuf。

当两端都由同一份 Fory IDL、protobuf IDL 或 FlatBuffers IDL 生成,并且你希望使用 gRPC 传输语义与 Fory payload 编码时,可以使用这种模式。如果 API 必须被通用 protobuf client、 reflection 工具或期望 protobuf message bytes 的组件消费,请使用标准 protobuf gRPC 代码生成。

Scala 生成的 grpc-java companion 见 Scala gRPC 支持。Kotlin coroutine stub 和 service base 见 Kotlin gRPC 支持

添加依赖

生成的 Java service 文件编译时需要 grpc-java。Fory Java artifact 不会把 gRPC 作为硬依赖, 因此请在应用中添加 grpc-java 依赖:

<dependency>
<groupId>org.apache.fory</groupId>
<artifactId>fory-core</artifactId>
<version>${fory.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>

Gradle 示例:

dependencies {
implementation("org.apache.fory:fory-core:$foryVersion")
implementation("io.grpc:grpc-api:$grpcVersion")
implementation("io.grpc:grpc-stub:$grpcVersion")
implementation("io.grpc:grpc-netty-shaded:$grpcVersion")
}

定义 Service

package demo.greeter;

message HelloRequest {
string name = 1;
}

message HelloReply {
string reply = 1;
}

service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
}

使用 --grpc 生成 Java model 和 gRPC companion:

foryc service.fdl --java_out=./generated/java --grpc

该 schema 会生成:

文件用途
HelloRequest.javarequest 的 Fory model 类型
HelloReply.javaresponse 的 Fory model 类型
GreeterForyModule.java生成类型的 Fory 注册 module
GreeterGrpc.javagrpc-java service base、stub 和 codec

生成的 method descriptor 使用 Fory-backed MethodDescriptor.Marshaller,因此不会调用 protobuf parser。

实现 Server

实现生成的 service base,并注册到标准 grpc-java Server

package demo.greeter;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;

final class GreeterService extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(
HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = new HelloReply();
reply.setReply("Hello, " + request.getName());
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

public final class GreeterServer {
public static void main(String[] args) throws Exception {
Server server =
ServerBuilder.forPort(50051)
.addService(new GreeterService())
.build()
.start();
server.awaitTermination();
}
}

生成代码负责注册和序列化 request/response 类型,service 实现不需要手动创建 Fory 实例。

创建 Client

使用普通 grpc-java channel 和生成 stub:

package demo.greeter;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public final class GreeterClient {
public static void main(String[] args) {
ManagedChannel channel =
ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
try {
GreeterGrpc.GreeterBlockingStub stub =
GreeterGrpc.newBlockingStub(channel);

HelloRequest request = new HelloRequest();
request.setName("Fory");
HelloReply reply = stub.sayHello(request);
System.out.println(reply.getReply());
} finally {
channel.shutdownNow();
}
}
}

Channel lifecycle、deadline、credential、metadata、load balancing、retry 和 interceptor 都保持 grpc-java 行为。

Streaming RPC

Fory service 可以使用 gRPC 的所有 streaming shape:

service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
rpc LotsOfReplies (HelloRequest) returns (stream HelloReply);
rpc LotsOfGreetings (stream HelloRequest) returns (HelloReply);
rpc Chat (stream HelloRequest) returns (stream HelloReply);
}

生成 Java service 方法遵循 grpc-java 约定:

IDL shapeServer 方法形态Client 方法形态
rpc A (Req) returns (Res)void a(Req request, StreamObserver<Res> responses)blocking、async、future unary stub
rpc A (Req) returns (stream Res)void a(Req request, StreamObserver<Res> responses)blocking iterator 或 async observer
rpc A (stream Req) returns (Res)StreamObserver<Req> a(StreamObserver<Res> responses)async request observer
rpc A (stream Req) returns (stream Res)StreamObserver<Req> a(StreamObserver<Res> responses)async request observer

Server 可以直接实现生成的 streaming 方法:

package demo.greeter;

import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;

final class GreeterService extends GreeterGrpc.GreeterImplBase {
@Override
public void lotsOfReplies(
HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply first = new HelloReply();
first.setReply("Hello, " + request.getName());
responseObserver.onNext(first);

HelloReply second = new HelloReply();
second.setReply("Welcome, " + request.getName());
responseObserver.onNext(second);
responseObserver.onCompleted();
}

@Override
public StreamObserver<HelloRequest> lotsOfGreetings(
StreamObserver<HelloReply> responseObserver) {
List<String> names = new ArrayList<>();
return new StreamObserver<>() {
@Override
public void onNext(HelloRequest request) {
names.add(request.getName());
}

@Override
public void onError(Throwable error) {
responseObserver.onError(error);
}

@Override
public void onCompleted() {
HelloReply reply = new HelloReply();
reply.setReply(String.join(", ", names));
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
};
}

@Override
public StreamObserver<HelloRequest> chat(
StreamObserver<HelloReply> responseObserver) {
return new StreamObserver<>() {
@Override
public void onNext(HelloRequest request) {
HelloReply reply = new HelloReply();
reply.setReply("Hello, " + request.getName());
responseObserver.onNext(reply);
}

@Override
public void onError(Throwable error) {
responseObserver.onError(error);
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}

生成的 client 返回标准 grpc-java 调用形态:

package demo.greeter;

import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class StreamingClient {
private final GreeterGrpc.GreeterBlockingStub blockingStub;
private final GreeterGrpc.GreeterStub asyncStub;

StreamingClient(
GreeterGrpc.GreeterBlockingStub blockingStub,
GreeterGrpc.GreeterStub asyncStub) {
this.blockingStub = blockingStub;
this.asyncStub = asyncStub;
}

void run() throws InterruptedException {
Iterator<HelloReply> replies =
blockingStub.lotsOfReplies(newRequest("Fory"));
while (replies.hasNext()) {
System.out.println(replies.next().getReply());
}

CountDownLatch greetingsDone = new CountDownLatch(1);
StreamObserver<HelloRequest> greetings =
asyncStub.lotsOfGreetings(new StreamObserver<>() {
@Override
public void onNext(HelloReply reply) {
System.out.println(reply.getReply());
}

@Override
public void onError(Throwable error) {
greetingsDone.countDown();
}

@Override
public void onCompleted() {
greetingsDone.countDown();
}
});
greetings.onNext(newRequest("Ada"));
greetings.onNext(newRequest("Grace"));
greetings.onCompleted();
greetingsDone.await(5, TimeUnit.SECONDS);

CountDownLatch chatDone = new CountDownLatch(1);
StreamObserver<HelloRequest> chat =
asyncStub.chat(new StreamObserver<>() {
@Override
public void onNext(HelloReply reply) {
System.out.println(reply.getReply());
}

@Override
public void onError(Throwable error) {
chatDone.countDown();
}

@Override
public void onCompleted() {
chatDone.countDown();
}
});
chat.onNext(newRequest("Fory"));
chat.onCompleted();
chatDone.await(5, TimeUnit.SECONDS);
}

private static HelloRequest newRequest(String name) {
HelloRequest request = new HelloRequest();
request.setName(name);
return request;
}
}

生成 descriptor 会保留 IDL 中的 service 和 method 名称作为 gRPC path。

Service 行为

生成的 service code 只替换 request/response 序列化。常规 gRPC service 行为仍由 grpc-java 提供:

  • Deadline 和取消
  • TLS 和认证
  • 名称解析与负载均衡
  • Client/server interceptor
  • Status code 和 metadata
  • Channel 池化与生命周期管理

故障排查

缺少 io.grpc 或 Guava 类

添加上面的 grpc-java 依赖。生成的 Fory service 文件导入 grpc-java API,但 Fory Java artifact 不会自动依赖 gRPC。

UNIMPLEMENTED

确认生成的 service 实现已通过 ServerBuilder.addService(...) 注册,并且 client 与 server 来自相同 package、service 和 method 名称。

Protobuf Client 无法解码

Fory gRPC companion 不使用 protobuf wire encoding。请使用 Fory 生成的 client 调用 Fory 生成的 service,或提供单独的 protobuf endpoint。