grpc server callback 方式异步调用 java 实现
时间: 2023-08-05 16:23:48 浏览: 124
gRPC 的服务端回调(Server-side Callbacks)可以实现异步调用。在 Java 中,可以通过实现 `StreamObserver` 接口并在服务端将其作为参数传递来实现服务端回调。
下面是一个简单的例子:
1. 定义 proto 文件
```
syntax = "proto3";
package example;
service ExampleService {
rpc ExampleMethod(ExampleRequest) returns (stream ExampleResponse) {}
}
message ExampleRequest {
string message = 1;
}
message ExampleResponse {
string message = 1;
}
```
2. 生成 Java 代码
使用 `protoc` 工具生成 Java 代码:
```
$ protoc --proto_path=. --java_out=. example.proto
```
3. 实现服务端回调
在服务端实现 `ExampleService` 接口,并在 `ExampleMethod` 方法中传递 `StreamObserver` 实例作为参数:
```
public class ExampleServiceImpl extends ExampleServiceGrpc.ExampleServiceImplBase {
@Override
public void exampleMethod(ExampleRequest request, StreamObserver<ExampleResponse> responseObserver) {
// 异步处理请求
CompletableFuture.supplyAsync(() -> {
// 模拟处理过程
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return request.getMessage().toUpperCase();
}).thenAccept(result -> {
// 回调通知客户端
responseObserver.onNext(ExampleResponse.newBuilder().setMessage(result).build());
responseObserver.onCompleted();
});
}
}
```
在 `exampleMethod` 方法中,通过 `CompletableFuture.supplyAsync` 异步处理请求,并在处理完成后使用 `responseObserver.onNext` 和 `responseObserver.onCompleted` 方法通知客户端处理结果。
4. 启动服务端
启动服务端并监听端口:
```
public class ExampleServer {
public static void main(String[] args) throws IOException, InterruptedException {
Server server = ServerBuilder.forPort(8080).addService(new ExampleServiceImpl()).build();
server.start();
System.out.println("Server started");
server.awaitTermination();
}
}
```
5. 客户端调用
在客户端中使用 `ManagedChannel` 和 `ExampleServiceGrpc.newStub` 方法创建 `ExampleService` 的 stub,并使用 `StreamObserver` 处理服务端回调:
```
public class ExampleClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();
ExampleServiceGrpc.ExampleServiceStub stub = ExampleServiceGrpc.newStub(channel);
CountDownLatch latch = new CountDownLatch(1);
stub.exampleMethod(ExampleRequest.newBuilder().setMessage("hello").build(), new StreamObserver<ExampleResponse>() {
@Override
public void onNext(ExampleResponse value) {
System.out.println(value.getMessage());
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
});
latch.await();
channel.shutdown();
}
}
```
在 `exampleMethod` 方法中,调用 `stub.exampleMethod` 方法并传递 `StreamObserver` 实例作为参数。在 `StreamObserver` 中处理服务端回调。
阅读全文