RocketMQ生产者一个应用不能发送多个NameServer消息解决

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

RocketMQ生产者一个应用不能发送多个NameServer消息解决

梦想实现家_Z   2022-11-30 我要评论

前言

目前有两套RocketMQ集群,集群A包含topic名称为cluster_A_topic,集群B包含topic名称为cluster_B_topic,在应用服务OrderApp上通过RocketMQ Client创建两个DefaultMQProducer实例发送消息给集群A和集群B

架构图如下:

根据上述架构图,我们给出的示例代码如下:

// 创建第一个DefaultMQProducer
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
    // 设置nameServer地址
    producer1.setNamesrvAddr("192.168.2.230:9876");
    try {
      producer1.start();
      // 发送消息
      SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8)));
      switch (result1.getSendStatus()) {
        case SEND_OK:
          System.out.println("cluster_A_topic 发送成功!");
          break;
        case FLUSH_DISK_TIMEOUT:
          System.out.println("cluster_A_topic 持久化失败!");
          break;
        case FLUSH_SLAVE_TIMEOUT:
          System.out.println("cluster_A_topic 同步slave失败!");
          break;
        case SLAVE_NOT_AVAILABLE:
          System.out.println("cluster_A_topic 副本不可用!");
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    // 创建第二个DefaultMQProducer
    DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2");
    // 设置nameServer地址
    producer2.setNamesrvAddr("192.168.2.231:9876");
    try {
      producer2.start();
      // 发送消息
      SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8)));
      switch (result2.getSendStatus()) {
        case SEND_OK:
          System.out.println("cluster_B_topic 发送成功!");
          break;
        case FLUSH_DISK_TIMEOUT:
          System.out.println("cluster_B_topic 持久化失败!");
          break;
        case FLUSH_SLAVE_TIMEOUT:
          System.out.println("cluster_B_topic 同步slave失败!");
          break;
        case SLAVE_NOT_AVAILABLE:
          System.out.println("cluster_B_topic 副本不可用!");
      }
      return "ok";
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      producer1.shutdown();
      producer2.shutdown();
    }

结果竟然报错了,报错内容时cluster_B_topic不存在:

经过不断的测试,发现只有放在最前面启动的DefaultMQProducer会生效,后面启动的DefaultMQProducer发送消息就报错说对应的topic不存在,而且报错的broker竟然是前面启动的DefaultMQProducer对应的broker。这就不科学了,难道RocketMQ不允许在一个应用上创建多个生产者?

问题定位

首先说明一下,当前使用的RocketMQ Client版本是4.8.0。为了确定是哪儿出了问题,不得不对源码来一波探索[哭泣脸

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们