Thrift学习

Thrift学习

一:thrift介绍

  Thrift是facebook开发的用来处理各不同系统之间数据通讯的rpc服务框架,后来成为apche的开源项目。thrift支持多种程序语言,包括Java,Python,Ruby,JavaScript,Node.js,Go,C,C++,C#,Erlang,Delphi,Perl,Php,SmallTalk,OCaml,Haxe,Haskell,D语言。Thrift采用IDL(Interface Defination Language)描述性语言来定义数据结构和接口。Thrift模型如下所示:
 
                                                  图 thrift模型图

 

二 thrift数据传输协议

TBinaryProtocol                 二进制传输协议
TCompactProtocol                使用VLQ编码进行压缩的数据传输协议
TJSONProtocol                   JSON格式的数据传输协议
TSimpleJSONProtocol             简单的JSON格式数据传输协议
TDebugProtocol                  调试时使用的文本传输协议

 

三 thrift传输层

TFramedTransport               按块的大小进行传输
TFileTransport                 按照文件的方式进行传输
TMemoryTransport               使用内存IO方式进行传输
TZlibTransport                 执行zlib压缩方式传输

四 thrift服务器端

TSimpleServer                  简单的单线程标准阻塞式服务器
TThreadPoolServer              多线程阻塞式服务器
TNonblockingServer             多线程非阻塞式服务器
THsHaServer                    半同步半异步服务器
其实传输层的传输只有阻塞和非阻塞,再加上具体的工作方式 单线程 多线程

五 thrift客户端

TClient                    简单单线程阻塞式客户端
TAsynClient                异步客户端(多线程)

六 thrift开发步骤

1服务器端

实现服务处理接口impl
创建TProcessor
创建TServerTransport(TServerSocket)   创建阻塞通信的还是非阻塞通信
创建TProtocol                                      数据传输协议
创建TServer                                       服务器类型 单工(单线程)  双工(多线程)  半单工半双工(多线程)
启动Server

2客户端

创建Transport(TSocket)               创建阻塞通信(客户端只有阻塞)
创建TProtocol                        数据传输协议
基于TTransport和TProtocol创建Client
调用Client的相应方法

七 thrift数据类型

1基本类型

bool:布尔值,true 或 false

byte:8 位有符号整数

i16:16 位有符号整数

i32:32 位有符号整数

i64:64 位有符号整数

double:64 位浮点数

string:utf-8编码的字符串

2结构体类型

struct:定义公共的对象

enum: 枚举类型

3容器类型

list:对应 Java 的 ArrayList

set:对应 Java 的 HashSet

map:对应 Java 的 HashMap

4异常类型

exception:对应 Java 的 Exception

5服务类型

service:对应服务的类  提供接口

八 thrift例子

enum 类型

复制代码
struct Student{
     1: required i32 id
     2: required string username
     3: required string password
     4: requried string number
     5: optional double age
}
复制代码

struct 类型

复制代码
struct School{
     1: required i32 id
     2: required string name
     3: required set<Student> students
     4: required list<Student> rank
     5: required map<string, string> number_name
}
复制代码

service 类型

service ThriftMysqlService{
     void addUser(1:Student user)
     list<Student> queryAllUser()
     Student queryOneUser(1:i32 id)
     map<string, string> queryOneArticle(1:i32 id)
}

具体代码

thrift.thrift 定义数据类型和接口的文件

复制代码
namespace java org.seava.thrift_example.thrift

struct User{
    1: required i32 userId
    2: required string username
    3: required string password
}

service ThriftService{
    void addUser(1:User user)
    User queryUser(1:i32 id)
    list<User> queryUserList()
    map<string, string> queryUserNamePass()
    map<i32, User> queryUserMap()
}
复制代码

到apache的官网下载thrift.exe程序, 下载地址 http://thrift.apache.org/ ,下下来通过cmd命令窗口去运行如下命令

thrift  -gen java xxx.thrift

接口实现 ThriftServiceImpl.java

复制代码
 1 package org.seava.thrift_example.thrift;
 2 
 3 import java.util.ArrayList;
 4 import java.util.HashMap;
 5 import java.util.List;
 6 import java.util.Map;
 7 
 8 
 9 public class ThriftServiceImpl implements ThriftService.Iface {
10 
11       public void addUser(User user) throws org.apache.thrift.TException{
12           System.out.println(user.userId + "  " + user.username + "  " + user.password);
13       }
14 
15       public User queryUser(int id) throws org.apache.thrift.TException{
16           System.out.println(id);
17           User user = new User();
18           user.userId = 100;
19           user.username = "FFF";
20           user.password = "NNN";
21           return user;
22       }
23 
24       public List<User> queryUserList() throws org.apache.thrift.TException{
25           User user = new User();
26           user.userId = 100;
27           user.username = "FFF";
28           user.password = "NNN";
29           User user2 = new User();
30           user2.userId = 102;
31           user2.username = "FFF2";
32           user2.password = "NNN2";
33           List<User> list = new ArrayList<User>();
34           list.add(user2);
35           list.add(user);
36           return list;
37       }
38 
39       public Map<String,String> queryUserNamePass() throws org.apache.thrift.TException{
40           User user = new User();
41           user.userId = 100;
42           user.username = "FFF";
43           user.password = "NNN";
44           Map<String, String> map = new HashMap<String, String>();
45           map.put("password", user.password);
46           map.put("useranme", user.username);
47           return map;
48       }
49 
50       public Map<Integer,User> queryUserMap() throws org.apache.thrift.TException{
51           User user = new User();
52           user.userId = 100;
53           user.username = "FFF";
54           user.password = "NNN";
55           User user2 = new User();
56           user2.userId = 102;
57           user2.username = "FFF2";
58           user2.password = "NNN2";
59           Map<Integer, User> map = new HashMap<Integer, User>();
60           map.put(user.userId, user);
61           map.put(user2.userId, user2);
62           return map;
63       }
64 
65 }
复制代码

服务器 Server.java 

复制代码
  1 package org.seava.thrift_example.thrift;
  2 
  3 import org.apache.thrift.TProcessor;
  4 import org.apache.thrift.protocol.TBinaryProtocol;
  5 import org.apache.thrift.protocol.TCompactProtocol;
  6 import org.apache.thrift.server.THsHaServer;
  7 import org.apache.thrift.server.TNonblockingServer;
  8 import org.apache.thrift.server.TServer;
  9 import org.apache.thrift.server.TSimpleServer;
 10 import org.apache.thrift.server.TThreadPoolServer;
 11 import org.apache.thrift.transport.TFramedTransport;
 12 import org.apache.thrift.transport.TNonblockingServerSocket;
 13 import org.apache.thrift.transport.TNonblockingServerTransport;
 14 import org.apache.thrift.transport.TServerSocket;
 15 import org.apache.thrift.transport.TTransportException;
 16 
 17 
 18 public class Server {
 19     
 20     public static int port = 8090;
 21     
 22     /**
 23      * 简单服务器类型  阻塞单线程
 24      * 步骤
 25      * 创建TProcessor
 26      * 创建TServerTransport
 27      * 创建TProtocol
 28      * 创建TServer
 29      * 启动Server
 30      */
 31     public static void startSimpleServer(){
 32         //创建processor
 33         TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
 34         try {
 35             //创建transport 阻塞通信
 36             TServerSocket serverTransport = new TServerSocket(port);
 37             //创建protocol
 38             TBinaryProtocol.Factory protocol = new TBinaryProtocol.Factory();
 39             //将processor transport protocol设入到服务器server中
 40             TServer.Args args = new TServer.Args(serverTransport);
 41             args.processor(tprocessor);
 42             args.protocolFactory(protocol);
 43             //定义服务器类型 设定参数
 44             TServer server = new TSimpleServer(args);
 45             //开启服务
 46             server.serve();
 47         } catch (TTransportException e) {
 48             e.printStackTrace();
 49         }
 50     }
 51     
 52     /**
 53      * 多线程服务器   阻塞多线程
 54      */
 55     public static void startThreadPoolServer(){
 56         //创建processor
 57         TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
 58         try{
 59             //创建transport 阻塞通信
 60             TServerSocket serverTransport = new TServerSocket(port);
 61             //创建protocol  数据传输协议
 62             TBinaryProtocol.Factory protocol = new TBinaryProtocol.Factory();
 63             TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport);
 64             args.processor(tprocessor);
 65             args.protocolFactory(protocol);
 66             //创建服务器类型  多线程
 67             TServer server = new TThreadPoolServer(args);
 68             //开启服务
 69             server.serve();
 70         }catch(Exception e){
 71             e.printStackTrace();
 72         }
 73     }
 74     
 75     /**
 76      * 非阻塞I/O
 77      */
 78     public static void startTNonblockingServer(){
 79         //创建processor
 80         TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
 81         try{
 82             //创建transport 非阻塞 nonblocking
 83             TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);
 84             //创建protocol 数据传输协议
 85             TCompactProtocol.Factory protocol = new TCompactProtocol.Factory();
 86             //创建transport 数据传输方式  非阻塞需要用这种方式传输
 87             TFramedTransport.Factory transport = new TFramedTransport.Factory();
 88             TNonblockingServer.Args args = new TNonblockingServer.Args(serverTransport);
 89             args.processor(tprocessor);
 90             args.transportFactory(transport);
 91             args.protocolFactory(protocol);
 92             //创建服务器 类型是非阻塞
 93             TServer server = new TNonblockingServer(args);
 94             //开启服务
 95             server.serve();
 96         }catch(Exception e){
 97             e.printStackTrace();
 98         }
 99     }
100     
101     /**
102      * 半同步半异步的非阻塞I/O
103      */
104     public static void startTHsHaServer(){
105         //创建processor
106         TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
107         try{
108             //创建transport  非阻塞
109             TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);
110             //非阻塞需要的传输方式
111             TFramedTransport.Factory transport = new TFramedTransport.Factory();
112             //数据传输协议
113             TCompactProtocol.Factory protocol = new TCompactProtocol.Factory();
114             //创建半同步半异步服务
115             THsHaServer.Args args = new THsHaServer.Args(serverTransport);
116             args.processor(tprocessor);
117             args.transportFactory(transport);
118             args.protocolFactory(protocol);
119             //创建 服务类型
120             TServer server = new THsHaServer(args);
121             //开启服务
122             server.serve();
123         }catch(Exception e){
124             e.printStackTrace();
125         }
126     }
127     
128     public static void main(String args[]){
129         //开启简单服务器
130 //        Server.startSimpleServer();
131         //开启多线程服务器
132 //        Server.startThreadPoolServer();
133 //        Server.startTNonblockingServer();
134 //        Server.startTHsHaServer();
135         Server.startTNonblockingServer();
136     }
137 }
复制代码

Server.java实现了简单服务器(阻塞单线程)   阻塞多线程   非阻塞   半同步半异步非阻塞

注意: 非阻塞时传输层需要选择TFramedTransport           

客户端 Client.java

复制代码
  1 package org.seava.thrift_example.thrift;
  2 
  3 import java.util.List;
  4 import java.util.Map;
  5 import java.util.concurrent.CountDownLatch;
  6 import java.util.concurrent.TimeUnit;
  7 
  8 import org.apache.thrift.async.TAsyncClientManager;
  9 import org.apache.thrift.protocol.TBinaryProtocol;
 10 import org.apache.thrift.protocol.TCompactProtocol;
 11 import org.apache.thrift.protocol.TProtocol;
 12 import org.apache.thrift.protocol.TProtocolFactory;
 13 import org.apache.thrift.transport.TFramedTransport;
 14 import org.apache.thrift.transport.TNonblockingSocket;
 15 import org.apache.thrift.transport.TNonblockingTransport;
 16 import org.apache.thrift.transport.TSocket;
 17 import org.apache.thrift.transport.TTransport;
 18 
 19 
 20 public class Client implements Runnable {
 21 
 22     public static String ip = "localhost";
 23     public static int port = 8090;
 24     public static int time_out = 30000;
 25     
 26     /**
 27      * 客户端设置
 28      * 创建Transport
 29      * 创建TProtocol
 30      * 基于TTransport和TProtocol创建Client
 31      * 调用Client的相应方法
 32      */
 33     public static void startSimpleClient(){
 34         TTransport transport = null;
 35         try{
 36             //创建Transport
 37             transport = new TSocket(ip, port, time_out);
 38             //创建TProtocol
 39             TProtocol protocol = new TBinaryProtocol(transport);
 40             //基于TTransport和TProtocol创建Client
 41             ThriftService.Client client = new ThriftService.Client(protocol);
 42             transport.open();
 43             //调用client方法
 44             List<User> list = client.queryUserList();
 45             for(User user : list){
 46                 System.out.println(user.userId + " " + user.username + " " + user.password);
 47             }
 48             Map<String, String> map = client.queryUserNamePass();
 49             System.out.println(map);
 50             User user = client.queryUser(10);
 51             System.out.println(user.userId + " " + user.username + " " + user.password);
 52             Map<Integer, User> map_u = client.queryUserMap();
 53             System.out.println(map_u);
 54             User uu = new User();
 55             uu.userId = 1111;
 56             uu.username = "mmbbmmbb";
 57             uu.password = "ppbbppbb";
 58             client.addUser(uu);
 59         }catch(Exception e){
 60             e.printStackTrace();
 61         }
 62     }
 63     
 64     /**
 65      * 调用阻塞服务器的客户端
 66      */
 67     public static void startNonblockingClient(){
 68         TTransport transport = null;
 69         try{
 70             transport = new TFramedTransport(new TSocket(ip, port));
 71             TCompactProtocol protocol = new TCompactProtocol(transport);
 72             ThriftService.Client client = new ThriftService.Client(protocol);
 73             transport.open();
 74             //调用client方法
 75             List<User> list = client.queryUserList();
 76             for(User user : list){
 77                 System.out.println(user.userId + " " + user.username + " " + user.password);
 78             }
 79             Map<String, String> map = client.queryUserNamePass();
 80             System.out.println(map);
 81             User user = client.queryUser(10);
 82             System.out.println(user.userId + " " + user.username + " " + user.password);
 83             Map<Integer, User> map_u = client.queryUserMap();
 84             System.out.println(map_u);
 85             User uu = new User();
 86             uu.userId = 1111;
 87             uu.username = "mmbbmmbb";
 88             uu.password = "ppbbppbb";
 89             client.addUser(uu);
 90         }catch(Exception e){
 91             e.printStackTrace();
 92         }
 93     }
 94     
 95     public static void startAsynClient(){
 96         try{
 97             TAsyncClientManager clientManager = new TAsyncClientManager();
 98             TNonblockingTransport transport = new TNonblockingSocket(ip, port, time_out);
 99             TProtocolFactory tprotocol = new TCompactProtocol.Factory();
100             ThriftService.AsyncClient asyncClient = new ThriftService.AsyncClient(tprotocol, clientManager, transport);
101             System.out.println("Client start ...");
102             CountDownLatch latch = new CountDownLatch(1);
103             AsynCallback callBack = new AsynCallback(latch);
104             System.out.println("call method queryUser start ...");
105             asyncClient.queryUser(100, callBack);
106             System.out.println("call method queryUser end");
107             boolean wait = latch.await(30, TimeUnit.SECONDS);
108             System.out.println("latch.await =:" + wait);
109         }catch(Exception e){
110             e.printStackTrace();
111         }
112     }
113     
114     public void run(){
115         Client.startSimpleClient();
116     }
117     
118     public static void main(String args[]){
119         //调用简单服务器 
120 //        Client.startSimpleClient();
121         /*Client c1 = new Client();
122         Client c2 = new Client();
123         
124         new Thread(c1).start();
125         new Thread(c2).start();*/
126         
127 //        Client.startNonblockingClient();
128 //        Client.startNonblockingClient();
129         Client.startAsynClient();
130     }
131 }
复制代码

客户端实现了 阻塞单线程  和 异步客户端

具体代码在github上: https://github.com/WaterHsu/thrift-example.git

 
 
标签: thriftjava
原文地址:https://www.cnblogs.com/Leo_wl/p/4103492.html