Java回调机制在RPC框架中的应用示例

完整源码:

应用场景描述:

服务提供者在项目启动时,创建并启动一个TCP服务器,然后将自己提供的所有服务注册到注册中心。

这里有3个角色:

1.服务提供者
2.TCP服务器
3.注册中心

引出回调:

服务提供者,要求TCP服务器启动成功之后,回调一下"注册服务"的逻辑。

开始撸码:

首先定义一个回调基类,这是一个抽象类,里面只有一个抽象的回调函数,是将来回调时要实现的方法

 1 /**
 2  * 回调基类
 3  *
 4  * @author syj
 5  */
 6 public abstract class BaseCallBack {
 7     /**
 8      * 回调执行逻辑
 9      *
10      * @throws Exception
11      */
12     public abstract void run() throws Exception;
13 }

谁来调用这个回调函数呢,前面说了,TCP服务器启动之后要调用这个回调函数,所以回调的操作要在TCP服务器中完成。

TCP服务器是一个具体的服务实现,我们可能会有多种服务器的实现,所以先定义一个抽象的服务类:

 1 /**
 2  * 服务抽象
 3  *
 4  * @author syj
 5  */
 6 public abstract class Server {
 7 
 8     // 服务启动后回调
 9     private BaseCallBack startedCallBack;
10 
11     // 服务停止后回调
12     private BaseCallBack stopedCallBack;
13 
14     /**
15      * 设置服务启动后的回调逻辑
16      *
17      * @param startedCallBack
18      */
19     public void setStartedCallBack(BaseCallBack startedCallBack) {
20         this.startedCallBack = startedCallBack;
21     }
22 
23     /**
24      * 设置服务停止后的回调逻辑
25      *
26      * @param stopedCallBack
27      */
28     public void setStopedCallBack(BaseCallBack stopedCallBack) {
29         this.stopedCallBack = stopedCallBack;
30     }
31 
32     /**
33      * 服务启动后回调
34      */
35     public void onStarted() {
36         if (startedCallBack != null) {
37             try {
38                 startedCallBack.run();
39             } catch (Exception e) {
40                 e.printStackTrace();
41             }
42         }
43     }
44 
45     /**
46      * 服务停止后回调
47      */
48     public void onStoped() {
49         if (stopedCallBack != null) {
50             try {
51                 stopedCallBack.run();
52             } catch (Exception e) {
53                 e.printStackTrace();
54             }
55         }
56     }
57 
58     /**
59      * 启动服务
60      *
61      * @param provider
62      * @throws Exception
63      */
64     public abstract void start(Provider provider) throws Exception;
65 
66     /**
67      * 停止服务
68      *
69      * @throws Exception
70      */
71     public abstract void stop() throws Exception;
72 
73 }

这个服务器抽象类,主要有启动服务和停止服务的方法,还持有两个回调对象,一个是服务器启动后的回调对象,一个是服务器停止后的回调对象。并有两个方法分别去调用这两个回调对象的run方法。

下面定义一个TCP服务器类,它是上面服务器抽象类的一个具体实现类:

 1 /**
 2  * 服务实现类
 3  *
 4  * @author syj
 5  */
 6 public class TcpServerImpl extends Server {
 7 
 8 
 9     /**
10      * 启动服务
11      *
12      * @param provider
13      */
14     @Override
15     public void start(Provider provider) {
16         System.out.println(">>>> start! " + provider.getTcpSrvAddr() + ":" + provider.getTcpSrvPort());
17         // 启动后回调
18         onStarted();
19 
20     }
21 
22     /**
23      * 停止服务
24      *
25      */
26     @Override
27     public void stop() {
28         System.out.println(">>>> stop!");
29         // 停止后回调
30         onStoped();
31     }
32 }

该类主要有两个功能,一个是启动TCP服务,一个是停止TCP服务,这两个方法的最后都需要触发回调逻辑的执行;

关于具体回调逻辑的定义写在哪里呢?自然是服务的提供者最清楚,所以写在服务提供者类中最合适:

 1 import java.util.TreeSet;
 2 
 3 /**
 4  * 服务操作
 5  *
 6  * @author syj
 7  */
 8 public class Provider {
 9 
10     // 模拟要注册的服务列表
11     public static TreeSet<String> serviceKeys = new TreeSet<String>() {{
12         add("userService");
13         add("productService");
14         add("orderService");
15     }};
16 
17     // 模拟本机http服务使用的ip和端口
18     public static String localAddress = "127.0.0.1:8081";
19 
20     // TCP服务器地址
21     private String tcpSrvAddr;
22 
23     // TCP服务器端口
24     private int tcpSrvPort;
25 
26     public String getTcpSrvAddr() {
27         return tcpSrvAddr;
28     }
29 
30     public int getTcpSrvPort() {
31         return tcpSrvPort;
32     }
33 
34     private Server server;
35     private Registry registry;
36 
37     public Provider() {
38     }
39 
40     /**
41      * 初始化配置
42      *
43      * @param tcpSrvAddr
44      * @param tcpSrvPort
45      */
46     public void initConfig(String tcpSrvAddr, int tcpSrvPort) {
47         this.tcpSrvAddr = tcpSrvAddr;
48         this.tcpSrvPort = tcpSrvPort;
49     }
50 
51     /**
52      * 启动服务
53      */
54     public void start() {
55         try {
56             registry = Registry.class.newInstance();
57             server = TcpServerImpl.class.newInstance();
58             // 设置服务启动后回调逻辑
59             server.setStartedCallBack(new BaseCallBack() {
60                 @Override
61                 public void run() {
62                     System.out.println(">>>> setStartedCallBack:" + serviceKeys + ":" + localAddress);
63                     // 注册服务
64                     registry.start();
65                     registry.registry(serviceKeys, localAddress);
66                 }
67             });
68 
69             // 设置服务停止后回调逻辑
70             server.setStopedCallBack(new BaseCallBack() {
71                 @Override
72                 public void run() {
73                     System.out.println(">>>> setStopedCallBack:" + tcpSrvAddr + ":" + tcpSrvPort);
74                     registry.remove(serviceKeys, localAddress);
75                 }
76             });
77 
78             // 启动服务
79             server.start(this);
80         } catch (Exception e) {
81             e.printStackTrace();
82         }
83     }
84 
85     /**
86      * 停止服务
87      */
88     public void stop() {
89         try {
90             server.stop();
91         } catch (Exception e) {
92             e.printStackTrace();
93         }
94     }
95 }

由于服务提供者需要启动TCP服务器,所以它依赖一个Server对象,在他的start方法中,启动TCP服务之前,先给这个TCP服务设置两个回调回调具体逻辑,就是前面说的一个是TCP服务器启动之后要执行的逻辑,一个是TCP服务器停止之后要执行的逻辑。

TCP服务器启动成功之后,要将服务提供者的所有服务注册到注册中心,TCP服务器停止之后要从注册中心移除自己的所有服务。

下面是注册中心类,它只负责服务的注册和移除,别的事不管:

 1 import java.util.HashMap;
 2 import java.util.Map;
 3 import java.util.Set;
 4 import java.util.TreeSet;
 5 
 6 /**
 7  * 服务注册中心
 8  *
 9  * @author syj
10  */
11 public class Registry {
12 
13     private Map<String, TreeSet<String>> registryData;
14 
15 
16     public void start() {
17         registryData = new HashMap<String, TreeSet<String>>();
18         System.out.println(">>>> 注册中心创建成功");
19     }
20 
21     public void stop() {
22         registryData.clear();
23     }
24 
25 
26     public boolean registry(Set<String> keys, String value) {
27         if (keys==null || keys.size()==0 || value==null || value.trim().length()==0) {
28             return false;
29         }
30         for (String key : keys) {
31             TreeSet<String> values = registryData.get(key);
32             if (values == null) {
33                 values = new TreeSet<>();
34                 registryData.put(key, values);
35             }
36             values.add(value);
37         }
38         System.out.println(">>>> 服务注册成功");
39         return true;
40     }
41 
42     public boolean remove(Set<String> keys, String value) {
43         if (keys==null || keys.size()==0 || value==null || value.trim().length()==0) {
44             return false;
45         }
46         for (String key : keys) {
47             TreeSet<String> values = registryData.get(key);
48             if (values != null) {
49                 values.remove(value);
50             }
51         }
52         System.out.println(">>>> 服务移除成功");
53         return true;
54     }
55 }

写个测试类测试一下:

 1 /**
 2  * 测试类
 3  *
 4  * @author syj
 5  */
 6 public class App {
 7     // TCP 服务器IP
 8     public static String tcpSrvAddr = "192.168.11.23";
 9     // TCP 服务端口
10     public static int tcpSrvPort = 9090;
11 
12     public static void main(String[] args) {
13         Provider provider = new Provider();
14         provider.initConfig(tcpSrvAddr, tcpSrvPort);
15         provider.start();
16         provider.stop();
17     }
18 }

输出结果:

>>>> start! 192.168.11.23:9090
>>>> setStartedCallBack:[orderService, productService, userService]:127.0.0.1:8081
>>>> 注册中心创建成功
>>>> 服务注册成功
>>>> stop!
>>>> setStopedCallBack:192.168.11.23:9090
>>>> 服务移除成功

总结:

A类调用B类的某个方法,B类的该方法中又调用了A类的某个方法,这就是回调。
这仅是一个形象的描述,具体的回调机制在实际应用时会很灵活。

比如这个例子中,Provider类的start方法调用啦Server类的start方法,而Server类的start方法中又调用了onStarted方法,
虽然这个onStarted方法不是Provider中的方法,但其执行的回调逻辑是在Provider中通过setStartedCallBack()来设置的。
原文地址:https://www.cnblogs.com/jun1019/p/10925644.html