Netty章节十四:Grpc四种服务方法的Java远程调用

Grpc四种服务方法的Java远程调用

快速入门安装grpc请参考官方案例

详细说明

也可参考官方GitHub的grpc-java说明

下载/添加JAR包

添加JAR包。或对于非Android的Maven,添加到您的pom.xml

<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-netty-shaded</artifactId>
  <version>1.29.0</version>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-protobuf</artifactId>
  <version>1.29.0</version>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-stub</artifactId>
  <version>1.29.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
  <groupId>org.apache.tomcat</groupId>
  <artifactId>annotations-api</artifactId>
  <version>6.0.53</version>
  <scope>provided</scope>
</dependency>

或者对于非android的Gradle,增加你的依赖

implementation 'io.grpc:grpc-netty-shaded:1.29.0'
implementation 'io.grpc:grpc-protobuf:1.29.0'
implementation 'io.grpc:grpc-stub:1.29.0'
compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+

对于Android客户端,使用grpc-okhttp代替grpc-net -,使用grpc-protobuf-lite代替grpc-protobuf

implementation 'io.grpc:grpc-okhttp:1.29.0'
implementation 'io.grpc:grpc-protobuf-lite:1.29.0'
implementation 'io.grpc:grpc-stub:1.29.0'
compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+

生成代码

对与基于prtobuf的代码生成你可以将proto文件放置到src/main/protosrc/test/proto 的目录下同时搭配上恰当的插件。

对于使用Maven构建系统集成的基于protobuf的代码生成,您可以使用 protobuf-maven-plugin(Eclipse 与 NetBeans 还应查看IDE文档)

<build>
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.6.2</version>
    </extension>
  </extensions>
  <plugins>
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.6.1</version>
      <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.11.0:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.29.0:exe:${os.detected.classifier}</pluginArtifact>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>compile-custom</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

对于使用Gradle构建系统集成的基于protobuf的代码生成,您可以使用 protobuf-gradle-plugin

plugins {
    id 'com.google.protobuf' version '0.8.8'
}

protobuf {
  protoc {
    artifact = "com.google.protobuf:protoc:3.11.0"
  }
  plugins {
    grpc {
      artifact = 'io.grpc:protoc-gen-grpc-java:1.29.0'
    }
  }
  generateProtoTasks {
    all()*.plugins {
      grpc {}
    }
  }
}

protobuf-gradle-plugin:

  1. 他会使用protoc命令行工具,根据你的*.proto文件来生成Java源文件
  2. 它将生成的Java源文件添加到相应的Java编译单元(Java项目中的sourceSet;,以便它们可以与Java源代码一起编译。

传输层

传输层负责将字节从连线中取出和放入的繁重工作。它的接口是足够的抽象,可以加入不同的实现,传输是以stream/流工厂的形式来进行的建模,注意,传输层API被认为是gRPC内部的API,它比包io.grpc下的核心API具有更弱的API保证。

  1. 基于Netty的传输是基于Netty的主要传输实现。它同时适用于客户机和服务器。
  2. 基于OkHttp的传输是一种基于OkHttp的轻量级传输。它主要用于Android系统,只适用于客户端。
  3. 进程内传输用于服务器与客户端位于同一进程中的情况。它对于测试非常有用,同时对于生产使用也是安全的。

加入依赖

plugins {
    id 'java'
    id 'com.google.protobuf' version '0.8.8'
}

group 'com.sakura'
version '1.0'

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    testCompile (
            "junit:junit:4.12"

    )
    compile (
            "io.netty:netty-all:4.1.46.Final",
            'com.google.protobuf:protobuf-java:3.11.4',
            'com.google.protobuf:protobuf-java-util:3.11.4',
            'io.grpc:grpc-netty-shaded:1.28.0',
            'io.grpc:grpc-protobuf:1.28.0',
            'io.grpc:grpc-stub:1.28.0'
    )
}

//构建protobuf插件配置
protobuf {
    //输出目录的根目录名,生成的java文件的位置,会在指定的目录下的main下生成
    generateProtoTasks.generatedFilesBaseDir = "$projectDir/src"

    protoc {
        artifact = "com.google.protobuf:protoc:3.11.0"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.28.0'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {
                //grpc的接口类的生成目录,默认是grpc
                outputSubDir = 'java'
            }
        }
    }
}

编写一个Proto文件

syntax = "proto3";//定义使用的proto版本

package com.sakura.proto;//所有语言适用的包路径定义语法
option java_package = "com.sakura.proto";//java包路径 优先级高于package
option java_outer_classname = "Student";//生成的外部类名
option java_multiple_files = true;//是否生成多个文件


//定义rpc的方法
service StudentService{
    //1.客户端发出一个普通的请求,服务器的返回一个普通的响应
    rpc GetRealNameByUsername(MyRequest) returns (MyResponse);
    //grpc的请求以及响应不能是基本数据类型,必须是一个message类型,不管请求里有几个参数
    //他必须是你定义的一个message类型的
    //2.根据学生的年龄获取与这个年龄相等的学生对象客户端发生一个普通的请求,服务器的以流的形式返回
    rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse);
    //3.以流式的方式请求一个StudentRequest服务器会返回一个StudentResponseList
    rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList){}
    //4.客户源与服务端都以流式的方式,双向的数据流传递
    rpc BiTalk(stream StreamRequest) returns (stream StreamResponse);
}


//消息
message MyRequest{
    string username = 1;
}
message MyResponse{
    string realname = 2;
}

//单向流使用的消息
message StudentResponse{
    string name = 1;
    int32 age = 2;
    string city = 3;
}
message StudentRequest{
    int32 age = 1;
}
message StudentResponseList{
    repeated StudentResponse studentResponse = 1;
}

//双向数据流传递使用的消息
message StreamRequest{
    string request_info = 1;
}

message StreamResponse{
    string response_info = 1;
}

服务端代码

public class GrpcServer {

    private Server server;

    public static void main(String[] args) throws Exception{
        GrpcServer server = new GrpcServer();

        server.start();
        server.awaitTermination();

    }

    private void start()throws Exception{
        //创建服务通道配置端口,传入映射的方法的实现类,然后构建并启动
        this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl())
                .build().start();

        System.out.println("server started!");
        //设置一个回调钩子
        Runtime.getRuntime().addShutdownHook(new Thread(() ->{
            System.out.println("JVM 关闭");
            try {
                this.stop();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
        System.out.println("执行到这里");
    }

    private void stop() throws InterruptedException {
        if(this.server != null){
            //关闭服务
            this.server.shutdown();
        }
    }

    private void awaitTermination() throws InterruptedException {
        if(this.server != null){
            //等待终止,让服务不停止,可以设置超时时长
            this.server.awaitTermination();
            //this.server.awaitTermination(3000, TimeUnit.MILLISECONDS);
        }
    }

}

服务接口实现类

/**
 * @ClassName : StudentServiceImpl
 * @Description : 远程调用的方法的具体实现,实现生成代码中的内部抽象类
 */
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {

    /**
     * 重写父类的方法
     * @param request 客户端发来的数据
     * @param responseObserver 响应观察者 用于响应客户端的对象
     */
    @Override
    public void getRealNameByUsername(MyRequest request, StreamObserver<MyResponse> responseObserver) {
        System.out.println("接收到客户端信息:" + request.getUsername());
        /*
            onCompleted()   标示这个方法调用结束,只能调用一次
            onError()   异常时调用
            onNext()    接下来要做什么事,可以用于结果返回
         */
        //构造响应对象,并返回
        responseObserver.onNext(MyResponse.newBuilder().setRealname("星空").build());
        //标示服务器处理结束
        responseObserver.onCompleted();
    }


    @Override
    public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
        System.out.println("接受到客户端信息:" + request.getAge());
        responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海").setAge(18).setCity("北京").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海2").setAge(20).setCity("上海").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海3").setAge(22).setCity("广州").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海4").setAge(18).setCity("深圳").build());

        responseObserver.onCompleted();
    }

    @Override
    public StreamObserver<StudentRequest> getStudentsWrapperByAges(StreamObserver<StudentResponseList> responseObserver) {
        //实现StreamObserver接口,实现方法当特定的事件触发时,回调方法就会的到调用
        return new StreamObserver<StudentRequest>() {
            /**
             * 接收客户端的请求,请求到来时被调用
             * 每来一次请求,onNext()方法就会被调用一次
             * 因为请求是流式的,onNext会被调用多次
             * @param value
             */
            @Override
            public void onNext(StudentRequest value) {
                System.out.println("onNext:" + value.getAge());
            }

            /**
             * 出现异常时被调用
             * @param t
             */
            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            /**
             * 表示客户端将流式数据全部发给服务器端之后,客户端就会有一个onCompleted事件,服务器端就会感知到
             * 然后服务器端在onCompleted中为客户端返回最终结果
             */
            @Override
            public void onCompleted() {
                StudentResponse.Builder studentResponse =
                        StudentResponse.newBuilder().setName("彩虹海1").setAge(18).setCity("宇宙");
                StudentResponse.Builder studentResponse2 =
                        StudentResponse.newBuilder().setName("彩虹海2").setAge(20).setCity("宇宙");
                StudentResponseList studentResponseList = StudentResponseList.newBuilder().addStudentResponse(studentResponse)
                        .addStudentResponse(studentResponse2).build();
                //返回结果
                responseObserver.onNext(studentResponseList);
                //表示处理完成
                responseObserver.onCompleted();
            }
        };
    }

    @Override
    public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
        return new StreamObserver<StreamRequest>() {
            /**
             * 客户端发来请求时被调用,每请求一次则被调用一次
             * @param value 客户端发来的数据
             */
            @Override
            public void onNext(StreamRequest value) {
                //打印客户端发来的数据
                System.out.println(value.getRequestInfo());

                //向客户端返回数据
                responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            /**
             * 客户端的onCompleted方法被调用时,被调用
             */
            @Override
            public void onCompleted() {
                //双向的数据流传递,虽然是在两个不同的流中传递互不干扰
                //但是当一方的流被关闭时另一方也要关闭与之交互的流
                responseObserver.onCompleted();
            }
        };
    }
}

客户端代码

/**
 * @ClassName : GrpcClient
 * @Description : grpc client
 */
public class GrpcClient {


    public static void main(String[] args) throws InterruptedException {
        //usePlaintext()使用纯文本的方式,不加密
        ManagedChannel managedChannel =
                ManagedChannelBuilder.forTarget("localhost:8899").usePlaintext().build();
        //客户端与服务端交互的对象  server与client通信的对象
        //blockingStub 阻塞的方式/同步  发出一个请求一定要等到另一端返回了响应才继续往下执行
        StudentServiceGrpc.StudentServiceBlockingStub blockingStub =
                StudentServiceGrpc.newBlockingStub(managedChannel);
        //只要是客户端是以流式的方式向服务器发送请求,这种请求一定以异步的
        //blockingStub是同步的阻塞的,则不会被提供方法
        //获取一个异步的通信对象
        //创建一个支持该服务的所有呼叫类型的新异步存根,不会等待对方响应会一直向下执行
        StudentServiceGrpc.StudentServiceStub stub =
                StudentServiceGrpc.newStub(managedChannel);
/*        //构建消息
        MyRequest request = MyRequest.newBuilder().setUsername("出发,目标彩虹海").build();
        //调用具体方法,接收到响应
        MyResponse response = blockingStub.getRealNameByUsername(request);
        System.out.println("接收到服务器信息:" + response.getRealname());

        System.out.println("--------------------普通请求与响应 结束----------------------");
*/
 /*       
        //返回一个流式的响应就是一个迭代器,每返回一个对象就进入到迭代器中,再返回对象再进入迭代器,以此类推
        Iterator<StudentResponse> iter =
                blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(18).build());
        //iter.hasNext()  还有没有下一个
        while (iter.hasNext()){
            StudentResponse studentResponse = iter.next();
            System.out.println(studentResponse.getName() + " , " +
                    studentResponse.getAge() + " , " + studentResponse.getCity());
        }
        System.out.println("-----------------------普通请求 流式响应 结束-------------------");
        
*/
        //客户端请求一个steam(流式) blockingStub(同步)无法使用 只有使用异步形式
        //构造接收服务端信息的方法
/*      StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
            /**
             * 服务端向客户端响应结果时会被调用
             * 服务端返回的数据,每返回一次数据则被调用一次
             * 如果服务器端也是流式的并且返回了多个数据,那么每次返回数据的时候都会被调用一次
             * @param value
             *//*
            @Override
            public void onNext(StudentResponseList value) {
                value.getStudentResponseList().forEach(studentResponse -> {
                    System.out.println(studentResponse.getName() + " , " +
                            studentResponse.getAge() + " , " + studentResponse.getCity());
                    System.out.println("**************");
                });
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }
        };

        //构造客户端向服务端发送的数据
        //getStudentsWrapperByAges(传入处理服务端返回数据的回调对象)
        StreamObserver<StudentRequest> studentsWrapperByAges =
                stub.getStudentsWrapperByAges(studentResponseListStreamObserver);
        //发送数据
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(18).build());
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(28).build());
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(38).build());
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(48).build());
        //表示客户端调用结束
        studentsWrapperByAges.onCompleted();
        System.out.println("-----------------------流式请求 普通响应 结束-------------------");
*/

        StreamObserver<StreamRequest> streamRequestStreamObserver =
                stub.biTalk(new StreamObserver<StreamResponse>() {
            /**
             * 收到服务器响应结果时,被调用
             * @param value 服务器返回的数据
             */
            @Override
            public void onNext(StreamResponse value) {
                System.out.println(value.getResponseInfo());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            /**
             * 服务器端onCompleted()被调用时,被触发
             */
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }
        });

        for (int i = 0; i < 10; i++) {
            streamRequestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
            Thread.sleep(1000);
        }

        streamRequestStreamObserver.onCompleted();
		System.out.println("-----------------------双向数据流传递 结束-------------------");

        //客户端向服务器端发送数据,数据还没发送就继续向下执行走到了onCompleted(),
        //然后在数据还未发出时程序就正常执行完成结束了
        //线程睡眠,强制让程序等待,等待studentsWrapperByAges将数据全部发送给服务器端
        //如果不睡眠的话,数据还没发送给服务器端,jvm就停止了,也就发送不出去了
        Thread.sleep(5000);
    }
}

原文地址:https://www.cnblogs.com/mikisakura/p/12983862.html