Netty章节十:Apache Thrift 安装及快速入门/框架介绍

Apache Thrift是什么?

Apache Thrift软件框架用于可扩展的跨语言服务开发,将软件堆栈与代码生成引擎相结合,构建可在C ++,Java,Python,PHP,Ruby,Erlang,Perl,Haskell,C#之间高效无缝工作的服务, Cocoa,JavaScript,Node.js,Smalltalk,OCaml和Delphi等语言。

Thrift最初由facebook研发,主要用于各个服务之间的RPC通信,支持跨语言,支持的语言有C++,Java,Python,PHP,Ruby,Erlang,PErl,Haskell,C#,Cocoa,JavaScript,Node.js,
Smalltalk,and OCaml都支持。

Thrift是一个典型的CS(客户端/服务端)结构,客户端和服务端可以使用不同的语言开发。既然客户端和服务器端能使用不同的语言开发,那么一定就要有一种中间语言来关联客户端和服务器端的语言。这种语言就是IDL(Interface Description Language)。

Thrift不支持无符号类型,因为很多编程语言不存在无符号类型,比如说java。一个RPC框架如果支持多种语言,那么这个RPC框架所支持的数据类型一定是这个RPC框架多语言支持的数据类型的交集。

Apache Thrift 概念

Thrift支持的数据类型

  • bool: 布尔类型(true或者false)
  • byte: 有符号字节
  • i16: 16位有符号整数
  • i32: 32位有符号整数
  • i64: 64位有符号整数
  • double: 64位浮点数
  • string: 字符串

集合中的元素可以是除了service之外的任何类型,包括exception。

Thrift支持三种组件分别是

  1. Structs(结构体,编译生成之后就是类)
  2. Service(客户端和服务端通信的接口)
  3. exception(客户端和服务端通信接口抛出的异常)

结构体(struct)

就像C语言一样,Thrift支持struct类型,目的就是将一些数据聚合在一起,方便传输管理,struct的定义形式如下:

struct People{
    1:string name;
    2:i32 age;
    3:string gender;
 }

异常(exception)

Thrift支持自定义exception,规则与struct一样

exception RequestException{
    1: i32 code;
    2: string reason;
 }

服务(service)

Thrift定义服务相当于Java中创建Interface一样,创建的service经过代码生成命令之后就会生成客户端和服务器端的框架代码。定义形式如下:

service HelloWorldService{
    //service中定义的函数,相当于Java Interface中定义的方法
    string hello(1:required String username),

}

枚举(enum)

枚举的定义形式和Java的Enum定义类似

enum Gender{
    MALE,
    FEMALE
}

常量(const)

Thrift也支持常量定义,使用const关键字

const i32 MIN_GATE=30
const string URL="https://www.google.com"

类型定义/别名

Thrift支持类似C++一样的typedef定义,比如我们对i32不熟悉,我们就使用int类代替i32,比如我们对i64不熟悉,我们就使用long代替i64

typedef i32 int 
typedef i64 long

文件包含
Thrift也支持文件包含,相当于C/C++中的include,java中的import。使用关键字include定义:

include "thrift.generated"

支持的传输格式(协议)

  1. TBinaryProtocol 二进制格式
  2. TCompactProtocol 压缩格式(优于二进制格式) 推荐使用
  3. TJSONProtocol JSON格式
  4. TSimpleJSONProtocol 提供JSON只写协议,生成的文件很容易通过脚本语言解析(无法通过程序很容易的读取,因为他却少必要的元数据信息(metadata),也就是对他解码的时候没有一个参照的标准不知道该怎么解析,‘只写’:他可以生成TSimpleJSONProtocol协议要求的格式(可以写出去),但是对端无法再将数据读取回来(无法解析))
  5. TDebugProtocol 使用易懂的可读的文本格式,以便于debug

支持的数据传输方式

  1. TSocket 阻塞式socket(类似Java中的ServerScoket)
  2. TFramedTransport 以frame为单位进行传输,非阻塞式服务中使用(他会将一端传输给另一端的数据分成一个一个的frame,类似于WebSocket) 推荐使用
  3. TFileTransport 以文件形式进行传输
  4. TMenoryTransport 将内存用于I/O Java实现时内部实际使用了简单的ByteArrayOutputStream
  5. TZlibTransport 使用zlib协议进行压缩,与其他传输方式联合使用,当前无Java实现

支持的服务模型

  1. TSimpleServer 简单的单线程服务模型,常用于测试
  2. TThreadPoolServer 多线程服务模型,使用标准的阻塞式IO
  3. TNonblockingServer 多线程服务模型,使用非阻塞式IO(需使用TFramedTransport数据传输方式)
  4. THsHaServer THsHa引入了线程池去处理,其模型把读写任务放到线程池去处理。Half-sync/Half-async(半同步半异步)的处理模式,Half-aysnc是在处理IO事件上(accept/read/write io),Half-sync用于handler对rpc的同步处理(需使用TFramedTransport数据传输方式) 推荐使用

最佳结合方案

使用TCompactProtocol作为传输格式,使用TFramedTransport作为传输方式,使用THsHaServer作为服务模型

Thrift支持的容器类型

  1. list:一系列由T类型的数据组成的有序列表,元素可以重复。

  2. set:一系列由T类型的数据组成的无序集合,元素不可重复。

  3. map:一个字典结构,key为k类型,value为V类型,相当于java中的HashMap

    以上集合容器都可以使用泛型的。

Thrift 架构

请输入图片描述
请输入图片描述

Thrift框架实际上实现了C/S通信模型

  • 通过代码生成工具,生成客户端和服务端代码(可以为不同语言),实现跨语言支持
  • 生成的代码主要完成数据结构化解析、发送和接收,通过processor调用服务端处理逻辑
  • TProtocal为协议层,主要实现各种格式的序列化协议,如二进制、JSON和压缩格式等
  • TTransport为传输层,主要实现了阻塞IO和非阻塞IO的实现
  • 底层IO传输,主要使用socket、http等一些传输协议

Thrift 组件

Thrift的核心组件, 主要包含以下几个方面

  • IDL服务描述组件,负责完成跨平台和跨语言(针对不同语言完成了Server层和Client代码的生成)
  • TServer和Client,服务端和客户端组件的实现
  • TProtocal 协议和解编码组件
  • TTransport 传输组件
  • TProcessor 服务调用组件,完成对服务实现的调用

Thrift Server

  • Thrift Server的职责是将Thrift支持的各种特性结合起来。
    • 创建传输Transport并为Transport创建输入或输出TProtocal
    • 创建基于输入或输出的处理器processor(process调用服务端业务实现)
    • 等待连接建立并将数据交给处理器processor,处理完成返回client
  • Thrift服务端的实现,目前主要有TSimpleServer、TNonblockingServer、THsHaServer、TThreadPoolServer、TThreadSelectorServer的实现,当前生产环境中主要使用的是TThreadPoolServer的实现。

TSimpleServer

请输入图片描述

TSimpleServer的工作模式最简单地阻塞IO,一次只能接收和处理一个Socket连接,效率比较低,生产中并不会使用这种Server的实现

TNonblockingServer

非阻塞服务模式实现,对所有客户端的调用几乎是公平,该服务模式采用的是单线程工作,但采用的时NIO的实现方式。
请输入图片描述

  • 该工作模式效率提升主要体现在IO多路复用上, 采用nio同时监听多个socket的状态变化
  • 仍然采用单线程顺序执行,在业务处理复杂和耗时的情况下,效率仍然是不高的

THsHaServer

半同步半异步模式,THsHaServer是TNonblockingServer的子类,因为TNonblockingServer仍然采用一个县城完成socket的监听和业务处理,效率相对较低。THsHaServer引入了线程池专门进行业务处理

请输入图片描述

  • 主线程只读取数据,业务处理交给线程池完成处理,主线程效率大大提升
  • 主线程仍然要对所有的socket监听和读取,当并发大和发送数据较多的情况下,监听的socket请求不能及时接受

TThreadPoolServer

TThreadPoolServer模式采用阻塞socket方式工作,主线程负责阻塞监听新socket,业务处理交给线程池处理
请输入图片描述

  • 线程池模式中,数据读取和业务处理都交给线程池处理,主线程只负责监听,因此在并发量较大情况下也能及时接受
  • 线程池处理模式,比较适合服务端能够预知多少客户端并发的情况,这样每个请求都能够及时处理,性能也相对理想
  • 线程池模式的处理能够受限于线程池的工作能力,在高并发情况下,新的请求只能够排队等待

TThreadSelectorServer

ThreadSelectorServer是目前Thrift提供的最高级的工作模式,其内部主要的工作流程如下

  • 一个accept thread线程对象,专门用于处理监听socket新连接

  • 若干个selector thread线程对象,专门用于处理业务socket上得IO,所有网络读写都由selector thread完成

  • 一个负载均衡器(SelectorThreadLocadBalancer),主要用于accept thread接收到新socket请求时,决定分配请求到selector thread

  • ExecutorService工作线程池,用于业务处理,在selector thread 读取socket请求数据,交给业务线程池具体执行
    请输入图片描述

  • 专门的accept thread用于接收新socket请求,可以接受大量的请求

  • socket请求经过负载均衡器分散到selector thread,可以应对io读写较大的情况

  • executor工作线程池,具体执行业务逻辑,可以发挥服务端最大的工作能力

TTransport

  • TTransport传输层提供了和网络之间交互的读写抽象,这使得Thrift能够将底层传输和系统其他部分(例如序列化和反序列化)分离开来。
  • Transport暴露的接口主要有open、close、read、write、flush等
  • 除了Transport提供的上卖弄接口,Thrift提供了用于接收和创建原始对象的ServerTransport接口,主要用于服务端为传入的链接创建新的传输对象。open、listen、accept和close等
  • 同时Thrift还提供了文件传输和HTTP传输等传输实现

客户端Transport实现

  • 客户端的传输实现主要分为两类,阻塞传输实现和非阻塞传输实现
  • 阻塞传输实现主要在TIOStreamTransport和TSocket中实现
    • TIOStreamTransport是最常用的传输层实现,它通过一个输入流和输出流实现了传输层的所有操作,其和Java的结构完美兼容(Java实现了各种IO流)
    • TSocket是通过Socket完成对Thrift传输实现,是客户端Socket连接实现和服务端传输的连接实现
  • 阻塞传输相关类TNonblockingTransport(接口定义)和TNonblockingSocket(java nio中SocketChannel的包装实现)
  • THttpClient是http的传输实现,主要用于服务端是HTTP服务,作为thrift的客户端的请求读取实现

服务端Transport实现

  • TServerSocket是通过包装ServerSocket的传输实现,是一种阻塞的传输实现
  • TNonblockserServerSocket是一种通过包装nio的ServerSocketChannel的实现,基础传输还是ServerSocket

缓存传输实现

  • TMemoryInputTransport 封装了字节数组byte[]作为输入流的封装,从系统缓冲区读取数据,不支持写缓存。TMemoryBuffer则通过TByteArrayOutputStream作为输出流的封装,支持缓存读也支持往缓冲区写入数据。
  • TFrameTransport是一种缓冲的Transport实现,它通过在每个消息前都有一个4个字节的帧消息来保证每次读取完整的消息
    • 封装TMemoryInputTransport作为输入流、TByteArrayOutputStream作为输出流,作为内存缓冲区的封装
    • TFrameTransport的flush执行时,会先写4byte的消息头,然后写入消息体
    • 在读取消息时,也会先读取4byte的长度,然后在读取消息体
  • TFastFramedTransport是一种内存利用率更高的内存读写实现,它使用自动增长的byte[](长度不够时才new),而不是每次都new一个byte[],从而提升了内存的使用率。其余实现和TFramedTransport一样,也会有消息头作为帧来记录消息的长度

其他传输实现介绍

  • TFileTransport 文件传输实现,基于Event的异步实现
  • TZlibTransport 基于zlib库的解压缩传输实现,通过压缩减少网络传输
  • TSaslTransport 是基于Simple Authentication Security Layer的认证实现

传输层实现总结

  • Thrift的传输层采用装饰器模式实现了包装IO流,可以通过包装流和节点流的概念区分各种Transport实现
  • 节点流表示自身采用byte[]提供IO读写的实现,包装流表示封装类其他传输实现提供IO的读写
  • 包装流主要是TFrame的传输实现,其实现是在写完消息flush时,回家上4byte的消息头,读消息的时候也会读取4byte的消息头
  • Thrift协议和具体的传输对象绑定,协议使用具体的Transport来实现数据的读取

TProtocol

协议抽象定义了将内存数据映射到有线格式的机制。换句话说,协议规定了数据类型如何使用底层传输对自身进行编码/解码。因为,协议实现了管理编码方案并负责(反)序列化。这里指的序列化协议的例子包含JSON、XML、纯文本、紧凑二进制等。 Thrift实现的协议如下:

  • 二进制,字段的长度和类型编码为字节码
  • 压缩实现 THRIFT-110
  • JSON实现

TBinaryProtocol

是一种字节流读取的实现,String类型读取是通过nio实现,其余类型通过原生数据直接读取实现。核心代码如下:

public ByteBuffer readBinary() throws TException {
    int size = readI32();

    checkStringReadLength(size);

    if (trans_.getBytesRemainingInBuffer() >= size) {
      ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), size);
      trans_.consumeBuffer(size);
      return bb;
    }

    byte[] buf = new byte[size];
    trans_.readAll(buf, 0, size);
    return ByteBuffer.wrap(buf);
  }

  private void checkStringReadLength(int length) throws TProtocolException {
    if (length < 0) {
      throw new TProtocolException(TProtocolException.NEGATIVE_SIZE,
                                   "Negative length: " + length);
    }
    if (stringLengthLimit_ != NO_LENGTH_LIMIT && length > stringLengthLimit_) {
      throw new TProtocolException(TProtocolException.SIZE_LIMIT,
                                   "Length exceeded max allowed: " + length);
    }
  }

TCompactProtocol

TCompactProtocol协议作为TBinaryProtocol协议的升级强化版,都作为二进制编码传输方式,采用了一种乐器MIDI文件的编码方法。详细描述参见 THRIFT-110

  1. ZigZag——有符号数编码
编码前 编码后
0 0
-1 1
1 2
-2 3
2 4
-3 5

其效果等效于正数等于原先 * 2,负数变正数

32bits int = (i « 1) ^ (i » 31), 64bits long = (l « 1) ^ (l » 63)

  1. VLQ——编码压缩 A variable-length quantity (VLQ) 是一种通用编码,使用任意数量的二进制八位字节(8bit字节)来表示一个任意大的整数,其没定义为MIDI格式以节省空间资源。这种编码也被用于表示表式扩展音乐格式(XMF)中。即VLQ本质上就是用一个无符号的最大128来表示一个无符号的整数,并增加了一个第八位来表示字节是否继续。 即一字节的最高位(MHB)为标志位,不参与具体的内容,意思数值的大小仅仅有其它七位来表示。当最高位bit为1时,表示下一个byte也是该数值的内容(下一个byte的低七位bits);当最高位bit为0时,下一个byte不参与其中。通过这样的方式,而不是int固定的4个bytes,long 8个bytes来讲,对于小数,能节约不少的空间大小;但凡事有利有弊,当数值比较大时,就要占用更多的空间,例如较大的int ,需要5bytes,较大的long需要10bytes. 编码假定八位位组(八位字节),其中最高有效位(MSB)(通常也称为符号位)被保留以指示是否有另一个VLQ八位组

VLQ 八位字节

7 6 5 4 3 2 1 0
2^7 2^6 2^5 2^4 2^3 2^2 2^1 2^0
A Bn

如果A是0,那么这是整数的最后一个VLQ八位字节。如果A是1,则接下来是另一个VLQ字节。 B是7位数字[0x00,0x7F],n是VLQ八位字节的位置,其中B 0是最不重要的。VLQ八位组在流中首先排列得最重要

两种编码的结合

当VLQ编码遇到负数时,例如:long -1; 0XFFFFFFFFFFFFFFFF,就需要10bytes了,通过和ZigZag的结合,把负数转变相应的正数。当正数,负数的 数值 较小时,都可以通过两者的结合,有效的压缩占用的空间大小。但同上,数值较大不可避免的占用比平常正常编码更多的空间。

106903转化为VLQ字节码例子

请输入图片描述

其他转换例子

Integer Variable-length quantity
0x00000000 0x00
0x0000007F 0x7F
0x00000080 0x81 0x00
0x00002000 0xC0 0x00
0x00003FFF 0xFF 0x7F
0x00004000 0x81 0x80 0x00
0x001FFFFF 0xFF 0xFF 0x7F
0x00200000 0x81 0x80 0x80 0x00
0x08000000 0xC0 0x80 0x80 0x00
0x0FFFFFFF 0xFF 0xFF 0xFF 0x7F

writeVarint32实现实现

private void writeVarint32(int n) throws TException { 
    int idx = 0; 
    while (true) { 
      if ((n & ~0x7F) == 0) { 
        temp[idx++] = (byte)n; 
        // writeByteDirect((byte)n); 
        break; 
        // return; 
      } else { 
        temp[idx++] = (byte)((n & 0x7F) | 0x80); 
        // writeByteDirect((byte)((n & 0x7F) | 0x80)); 
        n >>>= 7; 
      } 
    } 
    trans_.write(temp, 0, idx);
}

variable-length-quantiry

TJSONProtocal实现

  • TJSONProtocol 和 TSimpleJSONProtocol 两种实现。
  • 实现比较简单,不再赘述。

Processor

  • Processor封装了从输入流中读取数据并写入输出流的能力。
  • 输入流和输出流由协议对象表示,处理结构非常接单
interface TProcessor {
    bool process(TProtocol in, TProtocol out) throws TException
}
  • 服务响应的处理器由编译器生成的代码,并由服务端业务实现。
  • 处理器实际上是从线路(通过协议输入流)读取数据,然后委托给处理程序(用户实现执行)
  • 处理程序结果,通过线路(通过协议输出流),写入响应中,客户端得到结果

TBaseProcessor实现

public abstract class TBaseProcessor<I> implements TProcessor {
  private final I iface;
  private final Map<String,ProcessFunction<I, ? extends TBase>> processMap;

  protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
    this.iface = iface;
    this.processMap = processFunctionMap;
  }

  public Map<String,ProcessFunction<I, ? extends TBase>> getProcessMapView() {
    return Collections.unmodifiableMap(processMap);
  }

  @Override
  public boolean process(TProtocol in, TProtocol out) throws TException {
    TMessage msg = in.readMessageBegin();
    ProcessFunction fn = processMap.get(msg.name);
    if (fn == null) {
      TProtocolUtil.skip(in, TType.STRUCT);
      in.readMessageEnd();
      TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
      x.write(out);
      out.writeMessageEnd();
      out.getTransport().flush();
      return true;
    }
    fn.process(msg.seqid, in, out, iface);
    return true;
  }
}

TBaseAsyncProcessor

public class TBaseAsyncProcessor<I> implements TAsyncProcessor, TProcessor {
    protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());

    final I iface;
    final Map<String,AsyncProcessFunction<I, ? extends TBase,?>> processMap;

    public TBaseAsyncProcessor(I iface, Map<String, AsyncProcessFunction<I, ? extends TBase,?>> processMap) {
        this.iface = iface;
        this.processMap = processMap;
    }

    public Map<String,AsyncProcessFunction<I, ? extends TBase,?>> getProcessMapView() {
        return Collections.unmodifiableMap(processMap);
    }

    public boolean process(final AsyncFrameBuffer fb) throws TException {

        final TProtocol in = fb.getInputProtocol();
        final TProtocol out = fb.getOutputProtocol();

        //Find processing function
        final TMessage msg = in.readMessageBegin();
        AsyncProcessFunction fn = processMap.get(msg.name);
        if (fn == null) {
            TProtocolUtil.skip(in, TType.STRUCT);
            in.readMessageEnd();
            if (!fn.isOneway()) {
              TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
              out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
              x.write(out);
              out.writeMessageEnd();
              out.getTransport().flush();
            }
            fb.responseReady();
            return true;
        }

        //Get Args
        TBase args = fn.getEmptyArgsInstance();

        try {
            args.read(in);
        } catch (TProtocolException e) {
            in.readMessageEnd();
            if (!fn.isOneway()) {
              TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
              out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
              x.write(out);
              out.writeMessageEnd();
              out.getTransport().flush();
            }
            fb.responseReady();
            return true;
        }
        in.readMessageEnd();

        if (fn.isOneway()) {
          fb.responseReady();
        }

        //start off processing function
        AsyncMethodCallback resultHandler = fn.getResultHandler(fb, msg.seqid);
        try {
          fn.start(iface, args, resultHandler);
        } catch (Exception e) {
          resultHandler.onError(e);
        }
        return true;
    }

    @Override
    public boolean process(TProtocol in, TProtocol out) throws TException {
        return false;
    }
}

以上两种Processor的实现细节都在 FrameBuffer 和 AsyncFrameBuffer

  • FrameBuffer是Thrift NIO服务器端的一个核心组件,它一方面承担了NIO编程中的缓冲区的功能,另一方面还承担了RPC方法调用的职责。
    • 实现了客户端和服务端交互的状态机
    • 管理读取帧的大小和帧数据,将其作为一个包装数据进行数据传递,然后将响应数据写会客户端
    • 在这个过程中,它管理为客户端管理翻动选中key区域数据的读写
  • AsyncFrameBuffer是FrameBuffer的子类,主要功能和FrameBuffer,主要实现了异步的处理器的读写

Thrfit 服务过程解析

Server端

HelloServiceServer启动过程和客户端调用过程

请输入图片描述
过程详解

  1. 程序调用TheadPoolServer的serve方法后,server进入阻塞监听状态,阻塞在TServerSocket的accept方法上
  2. 当接收到客户端的调用请求,服务端创建新线程处理请求,原线程再次进入阻塞状态
  3. 新线程中同步TBinaryProtocol协议读取消息内容,调用HelloServerImpl的helloVoid方法,并将helloVoid_result中传回客户端

Client端

HelloServiceClient调用过程和接收返回结果过程

请输入图片描述

  1. 程序调用Hello.Client的helloVoid方法
  2. 在helloVoid中通过send_helloVoid发送对服务端请求,通过recv_helloVoid方法接收对服务请求后返回的结果

Thrift的安装

官方网站提供的下载安装地址,根据不同的操作系统选择自己的安装方式

Linux电脑可使用系统的包管理器安装,本次示例系统为 Archlinux

安装Apache Thrift:

sudo pacman -S thrift

查看Thrift 信息

❯ thrift -version                                                       
Thrift version 0.13.0

快速入门/第一个Thrift程序

定义IDL文件

#定义命名空间: namespace 语言名 路径
namespace java thrift.generated
namespace py py.thrift.generated

#定义类型别名
typedef i16 short
typedef i32 int
typedef i64 long
typedef bool boolean
typedef string String

#定义一个消息/对象/结构体  关键字struct
struct Person{
    1:optional String username,
    2:optional int age,
    3:optional boolean married
}
/*说明:
    唯一标记数:修饰符 数据类型 属性名
  修饰符: 默认就是optional
       required 必须的,必须存在,必须赋值
       optional 可选的,可以不使用
*/

#定义一个异常,数据传递时/方法调用是可能出现的异常 关键字exception
#服务端如果出现异常的话直接抛给客户端,让客户端catch处理
exception DataException{
    1:optional String message;
    2:optional String callStack;
    3:optional String date;
}

#定义服务接口 关键字service
#定义一系列方法,就是客户端于服务端进行交互所调用的方法,具体实现由服务端完成
service PersonService{
    //返回值 方法名(参数) throws (异常)
    Person getPersonByUsername(1:required String username) throws (1:DataException dataException),
    void savePersion(1:required Person person) throws (1:DataException dataException)
}

使用thrift编译器生成编译文件

格式:thrift --gen 要生成的语言 IDL文件

thrift --gen java src/thrift/data.thrift

请输入图片描述

将生成的代码复制到src/main目录下,并加入thrift依赖,本案例使用gradle作为包管理工具

org.apache.thrift:libthrift:0.13.0

Java编写客户端与服务器端

编写接口实现类

实际开发中放在服务端

//thrift生成的接口文件的实现类
public class PersonServiceImpl implements PersonService.Iface {

    @Override
    public Person getPersonByUsername(String username) throws DataException, TException {
        System.out.println("Got Client Param:" + username);

        Person person = new Person().setUsername(username)
                .setAge(18).setMarried(false);
        return person;
    }

    @Override
    public void savePersion(Person person) throws DataException, TException {
        System.out.println("Got Client Param:");
        System.out.println(person.getUsername());
        System.out.println(person.getAge());
        System.out.println(person.isMarried());
    }
}

服务器端

/*
	服务端  实现client远程调用Server方法,这个结果说明了方法还是在Server端。
 	Client调用方法,方法走了一遍,但其实还是在server端走,最后的结果通过网络传输到Client
   	方法体里的打印值,还是在Server端打印。只有Client端自己打印的值,才会出现在Client中。
*/
public class ThriftServer {
    public static void main(String[] args) throws Exception {
        //非阻塞的socket  绑定端口号8899,表示客户端与服务端建立的连接
        TNonblockingServerSocket socket = new TNonblockingServerSocket(8899);
        //高可用的server,并设置工作线程的最大值和最小值  arg作用就是构建一系列信息
        THsHaServer.Args arg = new THsHaServer.Args(socket)
                .minWorkerThreads(2).maxWorkerThreads(4);
        //设置处理器(Processor),将实现接口作为泛型,因为客户端那边调用的就是这个,
        //所以后面传输的也是这个对象new PersonServiceImpl()
        PersonService.Processor<PersonServiceImpl> processor =
                new PersonService.Processor<>(new PersonServiceImpl());

        //设置协议工厂
        //协议层:表示数据传输格式,这里TCompactProtocol(二进制压缩协议)表示压缩格式,速率很快
        arg.protocolFactory(new TCompactProtocol.Factory());
        //传输层:表示数据的传输方式,这里TFramedTransport是以frame为单位传输,非阻塞式传输
        arg.transportFactory(new TFramedTransport.Factory());
        arg.processorFactory(new TProcessorFactory(processor));

        //启动server  支持的服务模型:THsHaServer半同步,半异步Server
        TServer server = new THsHaServer(arg);

        System.out.println("Thrift Server Started!");

        //一个异步死循环,永远不会退出
        server.serve();
    }
}

客户端

public class ThriftClient {
    public static void main(String[] args) {
        //传输层/传输协议:要和服务端的传输协议保持一致,设置地址,端口号,和超时时间,是一个连接/socket
        TTransport transport = new TFramedTransport(
                new TSocket("localhost",8899),600);
        //协议层设置,设置数据传输格式,传入传输层,要与服务端保持一致
        TProtocol protocol = new TCompactProtocol(transport);
        //获得thrift自动生成的Client对象,可以与服务端进行远程调用的对象
        PersonService.Client client = new PersonService.Client(protocol);
        try {
            //打开socket
            transport.open();
            //关键:client本来就没有getPersonByUsername方法,这是通过网络传输调用
            Person person = client.getPersonByUsername("星空");

            System.out.println(person.getUsername());
            System.out.println(person.getAge());
            //对于boolean型,不是get,而是is开头.但是set都一样
            System.out.println(person.isMarried());

            System.out.println("------------------");

            Person person1 = new Person().setUsername("测试")
                .setAge(18).setMarried(false);
            client.savePersion(person1);

        }catch (Exception e){
            throw new RuntimeException(e.getMessage(),e);
        }finally {
            //最后关闭transport
            transport.close();
        }
    }
}

测试

启动服务器,再启动客户端
服务器端打印:

Thrift Server Started!
Got Client Param:星空
Got Client Param:
测试
18
false

客户端打印:

星空
18
false
------------------

于Google Protobuf相比,Google Protobuf只是进行编解码(序列化与反序列)操作,使用netty作为网络载体进行远程方法调用。而Thrift不仅仅既可以进行编解码工作,还提供传输对象功能,并且可以自己定义业务接口。

thrift等于

  1. 一个 code generator 代码生成器
  2. 序列化、反序列化
  3. server层。
原文地址:https://www.cnblogs.com/mikisakura/p/12983564.html