[转载] Thrift-server与spring集成

转载自http://shift-alt-ctrl.iteye.com/blog/1990026

Thrift服务server端,其实就是一个ServerSocket线程 + 处理器,当Thrift-client端建立链接之后,处理器负责解析socket流信息,并根据其指定的"方法名"+参数列表,来调用"服务实现类"的方法,并将执行结果(或者异常)写入到socket中.

    一个server,就需要创建一个ServerSocket,并侦听本地的一个端口,这种情况对分布式部署,有一些额外的要求:client端需要知道一个"service"被部署在了那些server上.

设计思路:

    1) 每个server内部采用threadPool的方式,来提升并发能力.

    2) 当server启动成功后,向zookeeper注册服务节点,此后client端就可以"感知到"服务的状态

    3) 通过spring的方式,配置thrift-server服务类.

    其中zookeepeer注册是可选选项

1.pom.xml

Java代码  收藏代码
  1. <dependencies>  
  2.     <dependency>  
  3.         <groupId>org.springframework</groupId>  
  4.         <artifactId>spring-context</artifactId>  
  5.         <version>3.0.7.RELEASE</version>  
  6.     </dependency>  
  7.     <dependency>  
  8.         <groupId>org.apache.zookeeper</groupId>  
  9.         <artifactId>zookeeper</artifactId>  
  10.         <version>3.4.5</version>  
  11.         <!--<exclusions>-->  
  12.             <!--<exclusion>-->  
  13.                 <!--<groupId>log4j</groupId>-->  
  14.                 <!--<artifactId>log4j</artifactId>-->  
  15.             <!--</exclusion>-->  
  16.         <!--</exclusions>-->  
  17.     </dependency>  
  18.     <!--  
  19.     <dependency>  
  20.         <groupId>com.101tec</groupId>  
  21.         <artifactId>zkclient</artifactId>  
  22.         <version>0.4</version>  
  23.     </dependency>  
  24.     -->  
  25.     <dependency>  
  26.         <groupId>org.apache.thrift</groupId>  
  27.         <artifactId>libthrift</artifactId>  
  28.         <version>0.9.1</version>  
  29.     </dependency>  
  30.     <dependency>  
  31.         <groupId>org.apache.curator</groupId>  
  32.         <artifactId>curator-recipes</artifactId>  
  33.         <version>2.3.0</version>  
  34.     </dependency>  
  35.     <dependency>  
  36.         <groupId>commons-pool</groupId>  
  37.         <artifactId>commons-pool</artifactId>  
  38.         <version>1.6</version>  
  39.     </dependency>  
  40.   
  41. </dependencies>  

    本实例,使用了apache-curator作为zookeeper客户端.

2. spring-thrift-server.xml

Java代码  收藏代码
  1. <!-- zookeeper -->  
  2. <bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">  
  3.     <property name="connectString" value="127.0.0.1:2181"></property>  
  4.     <property name="namespace" value="demo/thrift-service"></property>  
  5. </bean>  
  6. <bean id="sericeAddressReporter" class="com.demo.thrift.support.impl.DynamicAddressReporter" destroy-method="close">  
  7.     <property name="zookeeper" ref="thriftZookeeper"></property>  
  8. </bean>  
  9. <bean id="userService" class="com.demo.service.UserServiceImpl"/>  
  10. <bean class="com.demo.thrift.ThriftServiceServerFactory" destroy-method="close">  
  11.     <property name="service" ref="userService"></property>  
  12.     <property name="configPath" value="UserServiceImpl"></property>  
  13.     <property name="port" value="9090"></property>  
  14.     <property name="addressReporter" ref="sericeAddressReporter"></property>  
  15. </bean>  

3. ThriftServiceServerFactory.java

    此类严格上说并不是一个工厂类,它的主要作用就是封装指定的"service" ,然后启动一个server的过程,其中"service"属性表示服务的实现类,addressReporter表示当server启动成功后,需要指定的操作(比如,向zookeeper发送service的IP信息).

    究竟当前server的ip地址是多少,在不同的设计中,有所不同,比如:有些管理员喜欢将本机的IP地址写入到os下的某个文件中,如果上层应用需要获取可靠的IP信息,就需要读取这个文件...你可以实现自己的ThriftServerIpTransfer来获取当前server的IP.

    为了减少xml中的配置信息,在factory中,使用了反射机制来构建"Processor"类.

Java代码  收藏代码
  1. public class ThriftServiceServerFactory implements InitializingBean {  
  2.   
  3.     private Integer port;  
  4.   
  5.     private Integer priority = 1;// default  
  6.   
  7.     private Object service;// serice实现类  
  8.   
  9.     private ThriftServerIpTransfer ipTransfer;  
  10.   
  11.     private ThriftServerAddressReporter addressReporter;  
  12.       
  13.     private ServerThread serverThread;  
  14.       
  15.     private String configPath;  
  16.   
  17.     public void setService(Object service) {  
  18.         this.service = service;  
  19.     }  
  20.   
  21.     public void setPriority(Integer priority) {  
  22.         this.priority = priority;  
  23.     }  
  24.   
  25.     public void setPort(Integer port) {  
  26.         this.port = port;  
  27.     }  
  28.   
  29.     public void setIpTransfer(ThriftServerIpTransfer ipTransfer) {  
  30.         this.ipTransfer = ipTransfer;  
  31.     }  
  32.   
  33.     public void setAddressReporter(ThriftServerAddressReporter addressReporter) {  
  34.         this.addressReporter = addressReporter;  
  35.     }  
  36.       
  37.   
  38.     public void setConfigPath(String configPath) {  
  39.         this.configPath = configPath;  
  40.     }  
  41.   
  42.     @Override  
  43.     public void afterPropertiesSet() throws Exception {  
  44.         if (ipTransfer == null) {  
  45.             ipTransfer = new LocalNetworkIpTransfer();  
  46.         }  
  47.         String ip = ipTransfer.getIp();  
  48.         if (ip == null) {  
  49.             throw new NullPointerException("cant find server ip...");  
  50.         }  
  51.         String hostname = ip + ":" + port + ":" + priority;  
  52.         Class serviceClass = service.getClass();  
  53.         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();  
  54.         Class<?>[] interfaces = serviceClass.getInterfaces();  
  55.         if (interfaces.length == 0) {  
  56.             throw new IllegalClassFormatException("service-class should implements Iface");  
  57.         }  
  58.   
  59.         // reflect,load "Processor";  
  60.         Processor processor = null;  
  61.         for (Class clazz : interfaces) {  
  62.             String cname = clazz.getSimpleName();  
  63.             if (!cname.equals("Iface")) {  
  64.                 continue;  
  65.             }  
  66.             String pname = clazz.getEnclosingClass().getName() + "$Processor";  
  67.             try {  
  68.                 Class pclass = classLoader.loadClass(pname);  
  69.                 if (!pclass.isAssignableFrom(Processor.class)) {  
  70.                     continue;  
  71.                 }  
  72.                 Constructor constructor = pclass.getConstructor(clazz);  
  73.                 processor = (Processor) constructor.newInstance(service);  
  74.                 break;  
  75.             } catch (Exception e) {  
  76.                 //  
  77.             }  
  78.         }  
  79.   
  80.         if (processor == null) {  
  81.             throw new IllegalClassFormatException("service-class should implements Iface");  
  82.         }  
  83.         //需要单独的线程,因为serve方法是阻塞的.  
  84.         serverThread = new ServerThread(processor, port);  
  85.         serverThread.start();  
  86.         // report  
  87.         if (addressReporter != null) {  
  88.             addressReporter.report(configPath, hostname);  
  89.         }  
  90.   
  91.     }  
  92.   
  93.     class ServerThread extends Thread {  
  94.         private TServer server;  
  95.   
  96.         ServerThread(Processor processor, int port) throws Exception {  
  97.             TServerSocket serverTransport = new TServerSocket(port);  
  98.             Factory portFactory = new TBinaryProtocol.Factory(true, true);  
  99.             Args args = new Args(serverTransport);  
  100.             args.processor(processor);  
  101.             args.protocolFactory(portFactory);  
  102.             server = new TThreadPoolServer(args);  
  103.         }  
  104.   
  105.         @Override  
  106.         public void run(){  
  107.             try{  
  108.                 server.serve();  
  109.             }catch(Exception e){  
  110.                 //  
  111.             }  
  112.         }  
  113.           
  114.         public void stopServer(){  
  115.             server.stop();  
  116.         }  
  117.     }  
  118.   
  119.     public void close() {  
  120.         serverThread.stopServer();  
  121.     }  
  122.   
  123. }  

4. DynamicAddressReporter.java

    在ThriftServiceServerFactory中,有个可选的属性:addressReporter, DynamicAddressReporter提供了向zookeeper注册service信息的能力,当server启动正常后,把server的IP + port发送到zookeeper中;那么此后服务消费client,就可以从zookeeper中获取server列表,并与它们建立链接(池).这样client端只需要关注zookeeper的节点名称即可,不需要配置大量的ip+port.

Java代码  收藏代码
  1. public class DynamicAddressReporter implements ThriftServerAddressReporter {  
  2.       
  3.     private CuratorFramework zookeeper;  
  4.       
  5.     public DynamicAddressReporter(){}  
  6.       
  7.     public DynamicAddressReporter(CuratorFramework zookeeper){  
  8.         this.zookeeper = zookeeper;  
  9.     }  
  10.   
  11.   
  12.     public void setZookeeper(CuratorFramework zookeeper) {  
  13.         this.zookeeper = zookeeper;  
  14.     }  
  15.   
  16.     @Override  
  17.     public void report(String service, String address) throws Exception {  
  18.         if(zookeeper.getState() == CuratorFrameworkState.LATENT){  
  19.             zookeeper.start();  
  20.             zookeeper.newNamespaceAwareEnsurePath(service);  
  21.         }  
  22.         zookeeper.create()  
  23.             .creatingParentsIfNeeded()  
  24.             .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)  
  25.             .forPath(service +"/i_",address.getBytes("utf-8"));  
  26.     }  
  27.       
  28.       
  29.     public void close(){  
  30.         zookeeper.close();  
  31.     }  
  32.   
  33. }  

5. 测试类

Java代码  收藏代码
  1. public class ServiceMain {  
  2.   
  3.     /** 
  4.      * @param args 
  5.      */  
  6.     public static void main(String[] args) {  
  7.         try {  
  8.             ApplicationContext context = new ClassPathXmlApplicationContext("spring-thrift-server.xml");  
  9.             Thread.sleep(3000000);  
  10.         } catch (Exception e) {  
  11.             e.printStackTrace();  
  12.         }  
  13.   
  14.     }  
  15.   
  16. }  

  本文就不在展示如何使用thrift文件生成service API的过程,请参考[Thrift简介]

   Thrift-client端代码开发与配合,请参见[Thrift-client]

   更多代码,请参考附件.

原文地址:https://www.cnblogs.com/scott19820130/p/4919129.html