Rocketmq之一个JVM中producer的producerGroup要唯一

如果代码是这么写的

public static void main (String[] args) throws MQClientException {

        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setProducerGroup("operationLogGroup");
        defaultMQProducer.setInstanceName("Constant.operationLogInstance");
        defaultMQProducer.setNamesrvAddr("Constant.rocketQueneAddr");

        DefaultMQProducer defaultMQProducer2 = new DefaultMQProducer();
        defaultMQProducer2.setProducerGroup("operationLogGroup");
        defaultMQProducer2.setInstanceName("Constant.operationLogInstance");
        defaultMQProducer2.setNamesrvAddr("Constant.rocketQueneAddr");
        try {
            defaultMQProducer.start();
            defaultMQProducer2.start();
            Message message = new Message();

            defaultMQProducer.send(message);
        } catch (Exception e) {
            System.out.println("produce operation log message error" + e.getMessage());
        } finally {
            defaultMQProducer.shutdown();
        }
    }

一旦执行会报错

produce operation log message errorThe producer group[operationLogGroup] has been created before, specify another name please.

源代码是在这里

DefaultMQProducerImpl#start()

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
        if (null == group || null == producer) {
            return false;
        }

        MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
        if (prev != null) {
            log.warn("the producer group[{}] exist already.", group);
            return false;
        }

        return true;
    }

其中  private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>(); 

所以一个JVM中同一个producergroup不能有两个producer的

同样的是,在一个JVM进程中同一个消费组也不能有两个消费者。

 
原文地址:https://www.cnblogs.com/juniorMa/p/14756985.html