Zbus Notes

http://blog.csdn.net/cx308679291/article/details/50113257

Zbus Study Notes


 ZBUS = MQ + RPC + PROXY 

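  • Supports message queues, publish/subscribe, RPC, and proxying (TCP/HTTP/DMZ)
  • Can hold a backlog on the order of hundreds of millions of messages; supports HA (high availability)
  • Ultra-lightweight: a single JAR of about 250 KB with no dependencies
  • Rich API with clients for Java, C, C++, C#, Python, and Node.js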
 
1. MQ
ZBus's message queue service connects producers and consumers, so that each side can call the other without knowing that it exists.
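The decoupling described above can be illustrated without zbus at all. The following is a minimal sketch in which a java.util.concurrent.BlockingQueue stands in for the broker's queue; the class name QueueDecouplingSketch, the "request-N" message format, and the stop marker are hypothetical and are not part of the zbus API. The producer and the consumer each hold a reference to the queue only, never to each other.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration: an in-process queue plays the role of the broker.
class QueueDecouplingSketch {
    // Marker message that tells the consumer no more messages will arrive.
    private static final String STOP = "__stop__";

    // Producer side: knows only the queue, not who will read from it.
    static void produce(BlockingQueue<String> queue, int count) throws InterruptedException {
        for (int i = 0; i < count; i++) {
            queue.put("request-" + i);
        }
        queue.put(STOP);
    }

    // Consumer side: knows only the queue, not who wrote to it.
    static List<String> consume(BlockingQueue<String> queue) throws InterruptedException {
        List<String> handled = new ArrayList<>();
        while (true) {
            String msg = queue.take(); // blocks until a message is available
            if (STOP.equals(msg)) {
                break;
            }
            handled.add(msg);
        }
        return handled;
    }

    // Runs the producer on its own thread and the consumer on the calling thread.
    static List<String> run(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                produce(queue, count);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            List<String> handled = consume(queue);
            producer.join();
            return handled;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

With zbus, the queue lives in a separate broker process, so the two sides can also run on different machines and start at different times.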
a. The producer sends messages
// Create the Broker proxy
BrokerConfig config = new BrokerConfig();
config.setServerAddress("192.168.1.169:15555");
final Broker broker = new SingleBroker(config);

// Create a producer for the queue "chenxing" and create the queue on the broker
Producer producer = new Producer(broker, "chenxing");
producer.createMQ();

// Date format for the askTime field
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

for (int i = 0; i < 100000; i++) {
    // Build the message body as a JSON object
    HashMap<String, Object> map = new HashMap<String, Object>();
    map.put("xm", "陈星");
    map.put("sex", "male");
    map.put("content", "Can you receive the request I sent? " + String.valueOf(i));
    map.put("askTime", df.format(new Date()));

    // Send the message synchronously
    Message msg = new Message();
    msg.setBody(JSON.toJSONString(map));
    msg = producer.sendSync(msg);
    Thread.sleep(1000); // one message per second
}
b. The consumer handles messages
// Create the Broker
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setServerAddress("192.168.1.169:15555");
Broker broker = new SingleBroker(brokerConfig);

// Point the consumer at the queue "chenxing"
MqConfig config = new MqConfig();
config.setBroker(broker);
config.setMq("chenxing");

// Create the consumer
@SuppressWarnings("resource")
final Consumer c = new Consumer(config);
c.onMessage(new MessageHandler() {
    @Override
    public void handle(Message msg, Session sess) throws IOException {
        System.out.println(msg);
        // To decode the JSON body:
        // HashMap<String,Object> map = JSON.parseObject(msg.getBodyString(), new TypeReference<HashMap<String,Object>>(){});
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        System.out.println("Done processing! " + df.format(new Date()));
    }
});

// Start consuming
c.start();
 
2. Remote RPC calls
a. Register the service
// Read settings from the command line, falling back to defaults
final String serverAddress = ConfigKit.option(args, "-b", "192.168.1.169:15555");
final int threadCount = ConfigKit.option(args, "-c", 32);
final String mq = ConfigKit.option(args, "-mq", "ceshi");

RpcProcessor processor = new RpcProcessor();
// Add a module; the module name must be specified when calling
processor.addModule(new HellowImpl());

// Configure the Broker
BrokerConfig brokerCfg = new BrokerConfig();
brokerCfg.setServerAddress(serverAddress);
brokerCfg.setMaxTotal(threadCount);
brokerCfg.setMinIdle(threadCount);

Broker broker = new SingleBroker(brokerCfg);

// Configure the service: requests arrive on the queue and are handled by the processor
ServiceConfig config = new ServiceConfig();
config.setConsumerCount(threadCount);
config.setMq(mq);
config.setBroker(broker);
config.setMessageProcessor(processor);

@SuppressWarnings("resource")
Service svc = new Service(config);
svc.start();
b. Define the interface and the implementation class
public interface HellowInterface {
    int GetResult(int a, int b);
}
 
public class HellowImpl implements HellowInterface {

    // Returns the sum of the two arguments
    @Override
    public int GetResult(int a, int b) {
        return a + b;
    }
}
 
c. Add the same interface definition on the caller side
public interface HellowInterface {
    int GetResult(int a, int b);
}
 
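d. The consumer (caller) invokes the service
BrokerConfig config = new BrokerConfig();
config.setTrackServerList("192.168.1.169:15555");

Broker broker = new SingleBroker(config);

// Requests are sent through the same queue the service was registered on ("ceshi")
MessageInvoker invoker = new MqInvoker(broker, "ceshi");

RpcFactory factory = new RpcFactory(invoker);

// Obtain an object that implements the interface and forwards calls to the service
HellowInterface hello = factory.getService(HellowInterface.class);

for (int i = 0; i < 10000; i++) {
    try {
        int result = hello.GetResult(i, 1);
        System.out.println("1 plus " + i + " equals " + result);
    } catch (Exception e) {
        // Report the failure and move on to the next call
        e.printStackTrace();
    }
}

Thread.sleep(1000);
broker.close();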
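The call factory.getService(HellowInterface.class) hands back an object that implements the interface even though the caller has no implementation class. One common way to do this in Java is a dynamic proxy (java.lang.reflect.Proxy); whether RpcFactory works exactly this way is an assumption, and the sketch below is not zbus code. The names RpcProxySketch, Invoker, and localInvoker are hypothetical, and the "remote" side is simulated by a local object rather than by a message queue.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical illustration of an interface-based RPC client built on a dynamic proxy.
class RpcProxySketch {
    // Same shape as the HellowInterface shared by both sides in the text.
    interface HellowInterface {
        int GetResult(int a, int b);
    }

    // Server-side implementation, as in the text.
    static class HellowImpl implements HellowInterface {
        @Override
        public int GetResult(int a, int b) {
            return a + b;
        }
    }

    // Hypothetical transport: carries a method name, parameter types, and arguments,
    // and returns the result. A real transport would serialize these into a message.
    interface Invoker {
        Object invoke(String method, Class<?>[] paramTypes, Object[] args) throws Exception;
    }

    // Stand-in for the remote service: looks up the method on a local object and calls it.
    static Invoker localInvoker(Object target) {
        return (method, paramTypes, args) ->
                target.getClass().getMethod(method, paramTypes).invoke(target, args);
    }

    // Builds an object implementing iface whose every call is forwarded to the invoker.
    @SuppressWarnings("unchecked")
    static <T> T getService(Class<T> iface, Invoker invoker) {
        InvocationHandler handler = (proxy, method, args) ->
                invoker.invoke(method.getName(), method.getParameterTypes(), args);
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    // Calls GetResult through the proxy, exactly as the caller in the text does.
    static int demo(int a, int b) {
        HellowInterface hello = getService(HellowInterface.class, localInvoker(new HellowImpl()));
        return hello.GetResult(a, b);
    }

    public static void main(String[] args) {
        System.out.println(demo(41, 1));
    }
}
```

This is why step c is needed: the caller must have the interface on its classpath so that a proxy of that type can be created, but it never needs HellowImpl.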
 
 
Service query addresses
http://zbus.org/MyRpc/Interface/getOrder
http://zbus.org/MyRpc/getOrder
http://zbus.org/MyRpc
http://zbus.org/MyRpc/?cmd=query
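The four addresses above differ only in how many path segments follow the host and in whether a query string is present. The sketch below splits such a URL into those parts using java.net.URI. Reading the segments as queue name, module, and method is a guess based on the URL shapes alone, not something the notes confirm; the class name QueryUrlSketch is hypothetical.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: breaks a service query URL into path segments and query string.
class QueryUrlSketch {
    // Returns the non-empty path segments, e.g. [MyRpc, Interface, getOrder].
    static List<String> pathSegments(String url) {
        List<String> segments = new ArrayList<>();
        String path = URI.create(url).getPath();
        if (path == null) {
            return segments;
        }
        for (String part : path.split("/")) {
            if (!part.isEmpty()) {
                segments.add(part);
            }
        }
        return segments;
    }

    // Returns the query string without the leading '?', or null if there is none.
    static String query(String url) {
        return URI.create(url).getQuery();
    }

    public static void main(String[] args) {
        String[] urls = {
            "http://zbus.org/MyRpc/Interface/getOrder",
            "http://zbus.org/MyRpc/getOrder",
            "http://zbus.org/MyRpc",
            "http://zbus.org/MyRpc/?cmd=query"
        };
        for (String url : urls) {
            System.out.println(pathSegments(url) + " query=" + query(url));
        }
    }
}
```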
Original article: https://www.cnblogs.com/donaldlee2008/p/5751701.html