简识MQ之Kafka、ActiveMQ、RabbitMQ、RocketMQ传递机制

news/2025/2/21 5:26:44

四种主流消息队列(Kafka、ActiveMQ、RabbitMQ、RocketMQ)的生产者与消费者传递信息的机制说明,以及实际使用中的注意事项和示例:


1. Apache Kafka

传递机制

  • 模型:基于 发布-订阅模型,生产者向 主题(Topic) 发送消息,消费者订阅主题并消费消息。
  • 核心流程
    1. 生产者将消息发送到 Kafka 集群的 Broker,根据 分区策略(如轮询、哈希)将消息写入对应的分区(Partition)。
    2. 消费者通过消费者组(Consumer Group)订阅主题,每个分区的数据会被分配给组内的消费者(通过 Rebalance 机制)。
    3. 消费者从分区中拉取消息(poll 方式)并处理。

示例代码(Kafka 生产者)

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "message"));
producer.close();

示例代码(Kafka 消费者)

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

注意事项

  1. 分区与顺序性

    • Kafka 不保证跨分区的消息顺序,但单个分区内的消息按顺序存储。
    • 示例:发送订单创建事件时,需将同一用户的消息发送到同一分区(通过 key)。
  2. 消费者组与 Rebalance

    • 消费者组内成员变化时(如新增消费者),会触发分区重新分配(Rebalance),可能导致短暂消息不可读。
    • 建议:避免频繁增减消费者实例。
  3. 消息持久化

    • 生产者可通过 acks=all 确保消息写入所有副本后返回成功,但会增加延迟。
    • 适用场景:对消息可靠性要求极高的场景(如金融交易)。

2. Apache ActiveMQ

传递机制

  • 模型:支持 点对点(Queue) 和 发布-订阅(Topic) 模型。
  • 核心流程
    1. 生产者发送消息到队列或主题。
    2. 消息通过 异步/同步 方式传递给消费者(默认异步)。
    3. 可启用 持久化,消息存储到磁盘以防 Broker宕机。

示例代码(ActiveMQ 生产者)

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("my-queue");

MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Hello ActiveMQ!");
producer.send(message);
connection.close();

示例代码(ActiveMQ 消费者)

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("my-queue");

MessageConsumer consumer = session.createConsumer(queue);
TextMessage message = (TextMessage) consumer.receive();
System.out.println("Received: " + message.getText());
consumer.acknowledge(); // 手动确认消息
connection.close();

注意事项

  1. 消息持久化

    • 需设置 DeliveryMode.PERSISTENT,否则消息可能丢失。
    • 示例:关键业务消息(如订单支付通知)必须持久化。
  2. 事务支持

    • 生产者和消费者可通过事务确保消息的原子性(发送/接收一致性)。
    • 风险:长事务可能导致性能下降。
  3. 死信队列(DLQ)

    • 配置 deadLetterExchange 和 deadLetterRoutingKey 处理无法消费的消息。
    • 示例:超过重试次数的消息自动进入 DLQ。

3. RabbitMQ

传递机制

  • 模型:灵活的消息路由模型,基于 交换器(Exchange) 和 绑定(Binding)
  • 核心流程
    1. 生产者将消息发送到交换器,并附带路由键(Routing Key)。
    2. 交换器根据类型(如 Direct、Topic、Headers)将消息路由到绑定的队列。
    3. 消费者从队列中拉取消息。

示例代码(RabbitMQ 生产者Producers)

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection()) {
    Channel channel = connection.createChannel();
    String exchangeName = "direct-exchange";
    channel.exchangeDeclare(exchangeName, "direct");
    
    String routingKey = "user.login";
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .contentType("text/plain")
        .deliveryMode(2) // 持久化
        .build();
    channel.basicPublish(exchangeName, routingKey, props, "Login Event".getBytes());
}

示例代码(RabbitMQ 消费者Consumers)

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection()) {
    Channel channel = connection.createChannel();
    String queueName = "user_queue";
    channel.queueDeclare(queueName, true, false, false, null);
    
    String exchangeName = "direct-exchange";
    channel.queueBind(queueName, exchangeName, "user.login");
    
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("Received: " + message);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    };
    channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
}

注意事项

  1. 消息确认机制

    • 消费者需发送 ACK 确认消息处理,避免重复消费。
    • 示例:使用 channel.basicAck() 或 channel.basicNack()
  2. 死信队列配置

    • 在队列声明时配置 x-dead-letter-exchange 和 x-dead-letter-routing-key
    • 示例:处理失败的消息进入专用队列。
  3. 内存限制

    • RabbitMQ 默认限制队列大小为内存中的一定比例,需根据业务调整 vm_memory_high_watermark

4. RocketMQ

传递机制

  • 模型:基于 主题(Topic) 和 队列(Queue) 的分布式模型。
  • 核心流程
    1. 生产者Producers发送消息到主题,主题将消息路由到多个队列(负载均衡)。
    2. 消费者Consumers通过消费者组(Consumer Group)订阅主题,从队列中拉取消息。
    • 顺序消息:同一队列内的消息按顺序消费。
    • 广播消息:消费者组内每个消费者都收到同一条消息(仅限 Topic 模型)。

示例代码(RocketMQ 生产者Producers)

DefaultMQProducer producer = new DefaultMQProducer("my-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

Message msg = new Message("my-topic", "Order-123".getBytes(), "JSON".getBytes());
SendResult sendResult = producer.send(msg);
System.out.println("Send Result: " + sendResult);

producer.shutdown();

示例代码(RocketMQ 消费者Consumers)

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("my-topic", "*"); // 订阅所有队列

consumer.registerMessageListener(new MessageListener() {
    @Override
    public void consume(Message msg, ConsumeContext context) throws Exception {
        System.out.println("Received: " + new String(msg.getBody()));
        context.commitMessage(msg); // 提交消费位移
    }
});
consumer.start();

注意事项

  1. 事务消息

    • 生产者和消费者可通过事务确保消息的最终一致性。
    • 示例:订单创建成功后,发送支付通知(若失败则回滚)。
  2. 消息顺序性

    • 严格顺序场景需指定 MessageQueueSelector,确保同一订单的所有消息进入同一队列。
  3. 消息堆积

    • 消费者处理能力不足时,消息会堆积在队列中,需监控并扩容消费者实例。

总结对比

特性KafkaActiveMQRabbitMQRocketMQ
模型发布-订阅(仅 Topic)支持点对点和发布-订阅灵活路由(多种交换器)主题+队列(顺序/广播)
持久化支持分区副本支持消息持久化和事务支持队列和消息持久化支持消息持久化和事务
顺序性单分区有序不保证(除非事务)可通过队列保证单队列严格有序
适用场景高吞吐、日志/事件流通用、企业级消息系统复杂路由、多协议支持高可靠、顺序消息、分布式事务

通用注意事项

  1. 消息幂等性:防止重复消费(如订单支付场景)。
  2. 监控与告警:关注队列长度、消息堆积、消费者延迟。
  3. 序列化与压缩:选择高效的序列化方式(如 Protobuf)和压缩算法(如 GZIP)。
  4. 连接池管理:避免频繁创建/关闭连接,影响性能。

5、注意MQ的Kafka、ActiveMQ、RabbitMQ、RocketMQ区别;

        URL: 浅识MQ的 Kafka、ActiveMQ、RabbitMQ、RocketMQ区别-CSDN博客

6、注意:持久化策略

        URL:浅聊MQ之Kafka、RabbitMQ、ActiveMQ、RocketMQ持久化策略-CSDN博客

   

(望各位潘安、各位子健/各位彦祖、于晏不吝赐教!多多指正!🙏)


http://www.niftyadmin.cn/n/5860215.html

相关文章

AI IDE 使用体验及 AI 感受

近期感觉身边所有的人或事&#xff0c;全部都陷入到了 AI 焦虑中&#xff0c;从去年一年的 AI 猎奇&#xff0c;变成了 AI 好牛&#xff0c;但是与我有关吗&#xff1f;不行&#xff0c;必须强行与我有关的节奏&#xff0c;时代的发展正倒逼着我们去改变自己的工作范式&#xf…

DeepSeek私有化专家 | 云轴科技ZStack入选IDC中国生成式AI市场概览

DeepSeek 火爆全球AI生态圈&#xff0c;并引发企业用户大量私有化部署需求。 国际数据公司IDC近日发文《DeepSeek爆火的背后&#xff0c;大模型/生成式AI市场生态潜在影响引人关注》&#xff0c;认为中国市场DeepSeekAI模型的推出在大模型/生成式AI市场上引起了轰动&#xff0c…

C语言预处理学习笔记

1. 预处理器的功能 预处理器&#xff08;Preprocessor&#xff09;在编译C语言程序之前对源代码进行预处理。预处理指令以#号开头&#xff0c;主要包括文件包含、宏定义、条件编译等功能。 2. 文件包含 文件包含功能用于在一个文件中包含另一个文件的内容&#xff0c;通常用…

java毕业设计之医院门诊挂号系统(源码+文档)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的医院门诊挂号系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 医院门诊挂号系统的主要使用者…

nodejs链接redis

本篇文章介绍nodejs连接redis&#xff0c;以及redis的基本使用。 安装redis。 cnpm i redis -S封装redis配置信息。 config/db.js // redis 配置 let REDIS_CONF // 开发环境 if (env dev) {REDIS_CONF {port: 6639,host: 127.0.0.1} } // 生产环境 if (env production) {R…

Stack和Queue—模拟实现,实战应用全解析!

各位看官早安午安晚安呀 如果您觉得这篇文章对您有帮助的话 欢迎您一键三连&#xff0c;小编尽全力做到更好 欢迎您分享给更多人哦 大家好&#xff0c;我们今天来学习java数据结构的Stack和Queue&#xff08;栈和队列&#xff09; 一&#xff1a;栈 1.1&#xff1a;栈的概念 …

c/c++蓝桥杯经典编程题100道(23)最小生成树

最小生成树&#xff08;MST&#xff09;问题 ->返回c/c蓝桥杯经典编程题100道-目录 目录 最小生成树&#xff08;MST&#xff09;问题 一、题型解释 二、例题问题描述 三、C语言实现 解法1&#xff1a;Kruskal算法&#xff08;基于并查集&#xff0c;难度★★★&#…

Linux System V - 消息队列与责任链模式

概念 消息队列是一种以消息为单位的进程间通信机制&#xff0c;允许一个或多个进程向队列中发送消息&#xff0c;同时允许一个或多个进程从队列中接收消息。消息队列由内核维护&#xff0c;具有以下特点&#xff1a; 异步通信&#xff1a;发送方和接收方不需要同时运行&#x…