RocketMQ普通消息的同步、异步、单向消息放送方式详解

奋斗吧
奋斗吧
擅长邻域:未填写

标签: RocketMQ普通消息的同步、异步、单向消息放送方式详解 JavaScript博客 51CTO博客

2023-06-24 18:24:02 248浏览

RocketMQ普通消息的同步、异步、单向消息放送方式详解,同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。这种可靠性同步地发送方式


普通消息-三种消息发送方式

1 发送同步消息

同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

RocketMQ普通消息的同步、异步、单向消息放送方式详解_java

代码演示

package com.neu.rocketmq.example.quickstart;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: yqq
 * @Date: 2023/06/19/18:31
 * @Description:
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //实例化生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group-test");
        //设置NameServer地址
        producer.setNamesrvAddr("node1:9876");
        //启动Producer实例
        producer.start();
        //同步方法发送10条消息
        for (int i = 0; i < 10; i++) {
            //创建消息,并指定Topic,Tag和消息体
            Message message = new Message("TopicTest", "TagA", ("Hello RocketMQ" + i).getBytes());
            //同步发送方式
            SendResult sr = producer.send(message);
            //通过sr返回消息是否送达
            System.out.printf("%s%n",sr);
        }
        //如果不再发送消息,关闭producer
        producer.shutdown();
    }
}
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B3A0000, offsetMsgId=C0A858A100002A9F00000000000012AE, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=2], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B3F0001, offsetMsgId=C0A858A100002A9F000000000000139C, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B410002, offsetMsgId=C0A858A100002A9F000000000000148A, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B420003, offsetMsgId=C0A858A100002A9F0000000000001578, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=1], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B440004, offsetMsgId=C0A858A100002A9F0000000000001666, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=2], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B440005, offsetMsgId=C0A858A100002A9F0000000000001754, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=3], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B460006, offsetMsgId=C0A858A100002A9F0000000000001842, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B480007, offsetMsgId=C0A858A100002A9F0000000000001930, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B480008, offsetMsgId=C0A858A100002A9F0000000000001A1E, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B4A0009, offsetMsgId=C0A858A100002A9F0000000000001B0C, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=3], queueOffset=4]
  • msgId

消息的全局唯一标识(RocketMQ的ID生成是使用机器IP和消息偏移量的组成),由消息队列 MQ 系统自动生成,唯一标识某条消息

  • sendStatus

发送的标识:成功,失败等

  • queueId

queueId是Topic的分区;Producer发送具体一条消息的时,对应选择的该Topic下的某一个Queue的标识ID。

  • queueOffset

Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是queueOffset,queueOffset是从0开始递增。

RocketMQ普通消息的同步、异步、单向消息放送方式详解_Test_02

2 发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

RocketMQ普通消息的同步、异步、单向消息放送方式详解_Test_03

代码演示

package com.neu.rocketmq.example.quickstart;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: yqq
 * @Date: 2023/06/19/18:54
 * @Description:
 */
public class AsynProducer {
    public static void main(String[] args) throws Exception {
        //实例化生产者
        DefaultMQProducer producer = new DefaultMQProducer("group-test01");
        //设置NameServer的地址
        producer.setNamesrvAddr("node1:9876");
        //启动Producer
        producer.start();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            //创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest01", "TagB", "OrderID888",
                    ("Hello world").getBytes(RemotingHelper.DEFAULT_CHARSET));
            //SendCallback接受异步返回结果的回调
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%s%n",sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    System.out.printf("% -10d Exception %s %n",index,e);
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(1000);
        //如果不再发送消息,关闭Producer实例
        producer.shutdown();
    }
}

发送结果分析跟发送同步消息相同。

3 单向发送

这种方式主要用在不特别关心发送结果的场景,例如日志发送。单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

RocketMQ普通消息的同步、异步、单向消息放送方式详解_java_04

package com.neu.rocketmq.example.quickstart;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: yqq
 * @Date: 2023/06/19/19:33
 * @Description:
 */
public class OnceProducer {
    public static void main(String[] args) throws Exception {
        //实例化生产者
        DefaultMQProducer producer = new DefaultMQProducer("group-test02");
        //设置NameServer地址
        producer.setNamesrvAddr("node1:9876");
        //启动生产者
        producer.start();
        for (int i = 0; i < 10; i++) {
            //创建消息,并指定Topic,Tag,和消息
            Message msg = new Message("TopicTest02", "TagC",
                    ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送单向消息,没有任何返回结果
            producer.sendOneway(msg);
        }
        //关闭producer
        producer.shutdown();
    }
}

RocketMQ普通消息的同步、异步、单向消息放送方式详解_rocketmq_05

4 消息发送的权衡

RocketMQ普通消息的同步、异步、单向消息放送方式详解_Test_06


好博客就要一起分享哦!分享海报

此处可发布评论

评论(0展开评论

暂无评论,快来写一下吧

展开评论

您可能感兴趣的博客

客服QQ 1913284695