gRPC的通信方式-客户端流式、服务端流式、双向流式在Java的调用示例

场景

gPRC简介以及Java中使用gPRC实现客户端与服务端通信(附代码下载):

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108711541

在上面的博客中介绍了gRPC以及使用最基本的rpc通信方式即一个请求对象返回一个响应的方式进行通信。

除此之外gRPC还有以下三种方式。

服务端流式

一个请求对象,服务端返回多个结果对象

proto示例语法

rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}

客户端流式

客户端传入多个请求对象,服务端返回一个响应结果。

proto示例语法

rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList) {}

双向流式

传入多个对象可以返回多个响应对象

注:

博客:
https://blog.csdn.net/badao_liumang_qizhi
关注公众号
霸道的程序猿
获取编程相关电子书、教程推送与免费下载。

实现

服务端流式实现

在上面博客的基础上,打开Person.proto文件

message StudentRequest {
    int32 age = 1;
}

message StudentResponse {
    string name = 1;
    int32 age = 2;
    string city = 3;
}

添加两个message作为请求和响应对象。

因为gRPC的请求和响应对象必须在message中定义,不能直接使用string或者int32这种作为参数。

然后在新建接口方法

service PersonService {
    rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}
}

此方法是要请求参数为一个age,然后返回多个学生对象。

然后调用插件生成代码。

然后来到PersonServiceImpl中对接口方法进行实现

    @Override
    public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
        System.out.println("接收到的客户端消息为:"+request.getAge());

        responseObserver.onNext(StudentResponse.newBuilder().setName("1公众号:霸道的程序猿")
                .setAge(30)
                .setCity("北京")
                .build());

        responseObserver.onNext(StudentResponse.newBuilder().setName("2公众号:霸道的程序猿")
                .setAge(40)
                .setCity("上海")
                .build());

        responseObserver.onNext(StudentResponse.newBuilder().setName("3公众号:霸道的程序猿")
                .setAge(50)
                .setCity("广州")
                .build());
        responseObserver.onCompleted();
    }

然后来到客户端中

       ManagedChannel managedChannel  = ManagedChannelBuilder.forAddress("localhost",8899)
                .usePlaintext().build();
        PersonServiceGrpc.PersonServiceBlockingStub blockingStub = PersonServiceGrpc.newBlockingStub(managedChannel);

        System.out.println("请求-流式响应,调用getRealNameByUsername");
        Iterator<StudentResponse> iter = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(20).build());
        while (iter.hasNext())
        {
            StudentResponse studentResponse = iter.next();
            System.out.println(studentResponse.getName());
            System.out.println(studentResponse.getAge());
            System.out.println(studentResponse.getCity());
        }

然后运行服务端后再运行客户端

此时服务端

客户端流式实现

打开proto文件

message StudentRequest {
    int32 age = 1;
}

message StudentResponse {
    string name = 1;
    int32 age = 2;
    string city = 3;
}

message StudentResponseList {
    repeated StudentResponse studentResponse = 1;
}

添加响应的list,要实现客户端发动流式多个请求参数(年龄),服务端返回单个list对象,其中每个List的数据是学生对象。

添加接口方法

service PersonService {
    rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList) {}
}

然后调用插件生成代码。打开PersonServiceImpl进行方法的实现

   @Override
    public StreamObserver<StudentRequest> getStudentsWrapperByAges(final StreamObserver<StudentResponseList> responseObserver) {
        return new StreamObserver<StudentRequest>() {
            public void onNext(StudentRequest studentRequest) {
                System.out.println("onNext:"+studentRequest.getAge());
            }

            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }

            public void onCompleted() {
                StudentResponse studentResponse = StudentResponse.newBuilder()
                        .setName("公众号:霸道的程序猿")
                        .setAge(20)
                        .setCity("北京").build();
                StudentResponse studentResponse1 = StudentResponse.newBuilder()
                        .setName("1公众号:霸道的程序猿")
                        .setAge(30)
                        .setCity("上海").build();

                StudentResponseList studentResponseList = StudentResponseList.newBuilder()
                        .addStudentResponse(studentResponse).addStudentResponse(studentResponse1).build();

                responseObserver.onNext(studentResponseList);
                responseObserver.onCompleted();

            }
        };
    }

与上面不同,客户端如果是流式请求的话,那么客户端必须使用异步的stub

PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(managedChannel);

客户端代码为

        ManagedChannel managedChannel  = ManagedChannelBuilder.forAddress("localhost",8899)
                .usePlaintext().build();

        PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(managedChannel);
  
        System.out.println("-----------------------------");
        System.out.println("流式请求-响应,调用GetStudentsWrapperByAges");
        StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
            public void onNext(StudentResponseList studentResponseList) {
                studentResponseList.getStudentResponseList().forEach(studengResponse ->{
                    System.out.println(studengResponse.getName());
                    System.out.println(studengResponse.getAge());
                    System.out.println(studengResponse.getCity());
                });
            }

            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }

            public void onCompleted() {
                System.out.println("completed");
            }
        };
        StreamObserver<StudentRequest> studentRequestStreamObserver = stub.getStudentsWrapperByAges(studentResponseListStreamObserver);
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(20).build());
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(30).build());
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(40).build());
        studentRequestStreamObserver.onCompleted();

        try {
            Thread.sleep(50000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

因为是异步的所以必须使进程进行休眠才能看到效果

运行服务端后运行客户端

此时服务端

双向流式实现

打开proto文件

message StreamRequest {
    string request_info = 1;
}

message StreamResponse {
    string response_info = 1;
}

新建流式请求与响应参数,然后新建接口方法

service PersonService {
    rpc BiTalk(stream StreamRequest) returns (stream StreamResponse) {}
}

然后实现接口方法

    @Override
    public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
        return new StreamObserver<StreamRequest>() {
            @Override
            public void onNext(StreamRequest streamRequest) {
                System.out.println(streamRequest.getRequestInfo());
                responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }

在客户端中

package com.badao.grpcjava;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.time.LocalDate;
import java.util.Iterator;

public class GrpcClient {
    public static void main(String[] args) {
        ManagedChannel managedChannel  = ManagedChannelBuilder.forAddress("localhost",8899)
                .usePlaintext().build();
        PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(managedChannel);

        System.out.println("-----------------------------");
        System.out.println("流式请求-流式响应,调用BiTalk");

        StreamObserver<StreamRequest> requestStreamObserver = stub.biTalk(new StreamObserver<StreamResponse>() {
            @Override
            public void onNext(StreamResponse streamResponse) {
                System.out.println(streamResponse.getResponseInfo());
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onComplated");
            }
        });


        for(int i =0;i<10;i++)
        {
            requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDate.now().toString()).build());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        try {
            Thread.sleep(50000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行服务端后运行客户端

 

 

示例代码下载

https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/12883063

博客园: https://www.cnblogs.com/badaoliumangqizhi/ 关注公众号 霸道的程序猿 获取编程相关电子书、教程推送与免费下载。
原文地址:https://www.cnblogs.com/badaoliumangqizhi/p/13723374.html