RocketMq入门 配置的acl没有生效

修改RocketMq源码  distribution/conf/broker.conf 和 distribution/conf/plain_acl.yml之后,配置文件未生效

在启动broker.startup时,配置运行环境

 Program arguments: 配置文件 输入 -c C:UsersAdministratorDesktop ocketmq-all-4.7.1-source-releasedistributionconfroker.conf

Environment variables:设置环境变量 输入 ROCKETMQ_HOME=C:UsersAdministratorDesktop ocketmq-all-4.7.1-source-releasedistribution

rocketmq中只有broker需要设置环境变量

producer使用acl:

public static void main(String[] args) throws Exception {

    //Instantiate with a producer group name.
    DefaultMQProducer producer = new
            DefaultMQProducer("please_rename_unique_group_name",getAclRPCHook());

    // Specify name server addresses.
    producer.setNamesrvAddr("localhost:9876");

    //Launch the instance.
    producer.start();

    for (int i = 0; i < 10; i++) {
        //Create a message instance, specifying topic, tag and message body.
        Message msg = new Message("TopicTest",
                "TagA",
                ("Hello RocketMQ wangshuai").getBytes(RemotingHelper.DEFAULT_CHARSET));

        //Call send message to deliver message to one of brokers.
        SendResult sendResult = producer.send(msg);

        System.out.printf("%s%n", sendResult);
    }
    //Shut down once the producer instance is not longer in use.
    producer.shutdown();
}
static RPCHook getAclRPCHook(){
    return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
}

consumer使用acl:

public static void main(String[] args) throws InterruptedException, MQClientException {

   // Instantiate with specified consumer group name.
   DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name",getAclRPCHook(),new AllocateMessageQueueAveragely());

   // Specify name server addresses.
   consumer.setNamesrvAddr("localhost:9876");

   // Subscribe one more more topics to consume.
   consumer.subscribe("TopicTest", "*");

   // Register callback to execute on arrival of messages fetched from brokers.
//      注册回调 以便获取到达broker的消息
   consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
           System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

     //Launch the consumer instance.
     consumer.start();

     System.out.printf("Consumer Started.%n");
}
static RPCHook getAclRPCHook(){
    return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
}

  

  

原文地址:https://www.cnblogs.com/pass-ion/p/13489705.html