双向流模式

syntax = "proto3";
package services;
import "Models.proto";
message UserScoreRequest {
    repeated UserInfo users = 1;
}
message UserScoreResponse {
    repeated UserInfo users = 1;
}
service UserService {
    rpc GetUserScore (UserScoreRequest) returns (UserScoreResponse);
    rpc GetUserScoreByServerStream (UserScoreRequest) returns (stream UserScoreResponse);
    rpc GetUserScoreByClientSteam (stream UserScoreRequest) returns (UserScoreResponse);
    rpc GetUserScoreByTWS (stream UserScoreRequest) returns (stream UserScoreResponse); //双向流模式
}

服务端代码

package services

import (
    "context"
    "io"
    "log"
)

type UserService struct {
}

func (this *UserService) GetUserScore(ctx context.Context, in *UserScoreRequest) (*UserScoreResponse, error) {
    var score int32 = 101
    users := make([]*UserInfo, 0)
    for _, user := range in.Users {
        user.UserScore = score
        score++
        users = append(users, user)
    }
    return &UserScoreResponse{Users: users}, nil
}

//服务端流
func (this *UserService) GetUserScoreByServerStream(in *UserScoreRequest, stream UserService_GetUserScoreByServerStreamServer) error {
    var score int32 = 101
    users := make([]*UserInfo, 0)
    for index, user := range in.Users {
        user.UserScore = score
        score++
        users = append(users, user)
        if (index+1)%2 == 0 && index > 0 { //吗每隔两条发送,>0是第一条不处理
            err := stream.Send(&UserScoreResponse{Users: users})
            if err != nil {
                return err
            }
            users = users[0:0]
        }
    }
    if len(users) > 0 { //因为每两条发送一次,如果是奇数最后一条数据就没有发送出去 就漏掉了,所以这里要补发
        err := stream.Send(&UserScoreResponse{Users: users})
        if err != nil {
            return err
        }
    }
    return nil
}

//客户端流
func (this *UserService) GetUserScoreByClientSteam(stream UserService_GetUserScoreByClientSteamServer) error {
    //客户端流一般用于服务端接收数据耗时比较小,速度比较快,但是客户端发送的比较慢,所以为了避免服务端在等待客户端发送的过程中浪费时间,可以先按批次处理客户端发送过来的数据,最后再完整的返回给客户端
    var score int32 = 101
    users := make([]*UserInfo, 0)
    for {
        req, err := stream.Recv()
        if err == io.EOF { //接收完了,我再客户端设置的一次发送五条,而服务端会先处理完每次的五条,直到客户端发送完所有的数据,才会走这个判断,这时候才会把处理好的数据全部发送给客户端
            return stream.SendAndClose(&UserScoreResponse{Users: users}) //发送并关闭流
        }
        if err != nil {
            return err
        }
        for _, user := range req.Users {
            user.UserScore = score //好比是服务端做的业务处理
            score++
            users = append(users, user)
        }
    }
}

//双向流
func (this *UserService) GetUserScoreByTWS(stream UserService_GetUserScoreByTWSServer) error {
    var score int32 = 101
    users := make([]*UserInfo, 0)
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        for _, user := range req.Users {
            user.UserScore = score //好比是服务端做的业务处理
            score++
            users = append(users, user)
        }
        err = stream.Send(&UserScoreResponse{Users: users}) //接收客户端分批发过来的数据,收到一批处理完直接返回,然后再去接收下一批再处理
        if err != nil {
            log.Println(err)
            return err
        }
        users = (users)[0:0] //清空本次发送的数据
        //return 服务端如果想关闭stream设置好关闭的条件然后return就可以了
    }
}

客户端代码

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "grpccli/helper"
    "grpccli/services"
    "io"
    "log"
)

func main() {
    //creds, err := credentials.NewClientTLSFromFile("keys/server.crt", "localhost")
    //if err != nil {
    //    log.Fatal(err)+
    //}

    creds := helper.GetClientCreds()

    conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(creds))
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    userClient := services.NewUserServiceClient(conn)
    var i int32
    stream, err := userClient.GetUserScoreByTWS(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    var uid int32 = 1
    for {
        req := services.UserScoreRequest{}
        req.Users = make([]*services.UserInfo, 0)
        for i = 0; i < 5; i++ { //加入5条信息,假设是一个耗时的操作
            req.Users = append(req.Users, &services.UserInfo{UserId: uid})
            uid++
        }
        err := stream.Send(&req) //发送部分数据先给服务器去处理
        if err != nil {
            log.Println(err)
        }
        res, err := stream.Recv() //先接收服务器处理后返回的部分数据
        if err == io.EOF {
            fmt.Println(res.Users)
            break
        }
        if err != nil {
            fmt.Println(res.Users)
            log.Println(err)
        }
        fmt.Println(res.Users)

    }
}




原文地址:https://www.cnblogs.com/hualou/p/12070297.html