RocketMq基本使用

  • 导入MQ客户端依赖
1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
  • 消息发送者步骤
1
2
3
4
5
6
1.创建消息生产者producer,并制定生产者组名
2.指定NameServer地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
  • 消息消费者步骤
1
2
3
4
5
1.创建消费者Consumer,并指定消费者组名
2.指定NameServer地址
3.订阅主题Topic和Tag
4.设置回调函数,处理信息
5.启动消费者consumer

1. 基本样例

消息发送

1. 发送同步消息

用于相应时间不敏感的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.example.rocketmq.producer;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class BaseProducer {

public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("baseGroup");
//2.指定NameServer地址
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动producer
producer.start();

for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message("baseMQ","A",("我是BaseMQ消息"+i).getBytes());
//5.发送消息 (同步消息)
SendResult result = producer.send(msg);
System.out.println(result);
}
//6.关闭生产者producer
producer.shutdown();
}
}

2. 发送异步消息

用于对相应时间敏感的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.example.rocketmq.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("baseGroupA");
//2.指定NameServer地址
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message("baseMQ","B",("我是异步消息"+i).getBytes());
//5.发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.print("发送成功:");
System.out.println(sendResult);
}

@Override
public void onException(Throwable throwable) {
System.out.print("发送异常:");
System.out.println(throwable);
}
});
//防止发送过快而导致链接失败
Thread.sleep(1000L);
}
//6.关闭生产者producer
producer.shutdown();
}
}

3. 发送单向消息

对发送结果不关心的场景,如发送日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.example.rocketmq.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class OneWayProducer {
public static void main(String[] args) throws Exception{
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("baseGroup");
//2.指定NameServer地址
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message("baseMQ","A",("我是BaseMQ单向消息"+i).getBytes());
//5.发送消息
producer.sendOneway(msg);
}
//6.关闭生产者producer
producer.shutdown();
}
}

消息消费

1. 负载均衡模式(默认)

所有消息的消费均平分给消费者集群中订阅相同的的每一台机器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.example.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BaseGroup");
//2.指定NameServer地址
consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("baseMQ","A");
//4.设置回调函数,处理信息
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for (MessageExt messageExt : list) {
byte[] body = messageExt.getBody();
String msg = null;
try {
msg = new String(body,"utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//5.启动消费者consumer
consumer.start();
}
}

2. 广播模式

所有消费者都能受到所有的消息,设置setMessageModelMessageModel.BROADCASTING即可开启广播模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.example.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.io.UnsupportedEncodingException;

public class BroConsumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BaseGroup");
//2.指定NameServer地址
consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("baseMQ","B");
//设置广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//4.设置回调函数,处理信息
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for (MessageExt messageExt : list) {
byte[] body = messageExt.getBody();
String msg = null;
try {
msg = new String(body,"utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//5.启动消费者consumer
consumer.start();
System.out.println("广播模式消费者启动成功!");
}
}

2.顺序消息

顺序消息消费主要是保证局部消息顺序。因为消费者是多线程消费borker中的所有队列,所以要保证消费者消费顺序,生产者保证顺序消息发送到同一个broker队列中(通过业务表示如orderID之类的),消费者保证每个broker队列都用一个线程来消费,这样就可以保证消息的顺序消费。以订单消息为例子:创建、支付、完成。

消息队列一般流程

所以需要将生产者的消息按照业务表示分别发送到broker的同一个队列中去,而消费者对每个队列的消息用单线程去读取。这样就可以实现消息的顺序读取了。

消息顺序读取流程

代码实现:

  • 创建一个订单流程pojo模拟订单发送流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.example.rocketmq.pojo;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;

@Data
public class OrderStep {

private int orderId;

private String desc;

public OrderStep(int orderId, String desc){
this.orderId = orderId;
this.desc = desc;
}

//创建一系列订单模拟数据 直接用订单desc来模拟订单业务
public static List<OrderStep> buildOrders(){
//模拟消息流
List<OrderStep> orderSteps = new ArrayList<>();
//订单1 的流程
OrderStep order1Step1 = new OrderStep(134,"创建");
OrderStep order1Step2 = new OrderStep(134, "支付");
OrderStep order1Step3 = new OrderStep(134, "推送");
OrderStep order1Step4 = new OrderStep(134, "完成");
//订单2 的流程
OrderStep order2Step1 = new OrderStep(135,"创建");
OrderStep order2Step2 = new OrderStep(135, "支付");
OrderStep order2Step3 = new OrderStep(135, "推送");
OrderStep order2Step4 = new OrderStep(135, "完成");
//订单3 的流程
OrderStep order3Step1 = new OrderStep(136,"创建");
OrderStep order3Step2 = new OrderStep(136, "支付");
OrderStep order3Step3 = new OrderStep(136, "推送");
//模拟发送队列中 3条订单消息交叉发送
// 订单1创建
orderSteps.add(order1Step1);
//订单2创建
orderSteps.add(order2Step1);
//订单3创建
orderSteps.add(order3Step1);
//订单1支付
orderSteps.add(order1Step2);
//订单2支付
orderSteps.add(order2Step2);
//订单2推送
orderSteps.add(order2Step3);
//订单1推送
orderSteps.add(order1Step3);
//订单1完成
orderSteps.add(order1Step4);
//订单3支付
orderSteps.add(order3Step2);
//订单2完成
orderSteps.add(order2Step4);
//订单3推送
orderSteps.add(order3Step3);
return orderSteps;
}
}

  • 消息生产者发送顺序消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.example.rocketmq.producer;

import com.example.rocketmq.pojo.OrderStep;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class OrderProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("OrderGroup");
//2.指定NameServer地址
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动producer
producer.start();
List<OrderStep> orderSteps = OrderStep.buildOrders();
for (OrderStep orderStep : orderSteps) {
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message("Order", "shop", orderStep.toString().getBytes());
//5.发送顺序消息
/**
* 参数msg:消息体
* 参数MessageQueueSelector :消息队列选择器
* 参数arg:业务参数
*/
producer.send(msg, new MessageQueueSelector() {
/**
* @param list 返回的所有的broker消息队列
* @param message 传入的消息对象
* @param arg 传入的业务参数
* @return 选择哪个消息队列来发送
*/
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
//写选择逻辑
OrderStep orderStep = (OrderStep) arg;
//纯数字orderId 可根据余数来选择哪个 队列发送消息
int index = orderStep.getOrderId() % list.size();
System.out.println("orderId:" + orderStep.getOrderId() + "desc:" + orderStep.getDesc() + " 处于队列:" + index);
return list.get(index);
}
}, orderStep);

Thread.sleep(1000L);
}
//6.无消息发送则关闭生产者producer
producer.shutdown();
}
}

  • 消息消费者顺序消费消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.example.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderConsumer {
public static void main(String[] args) throws Exception {
//1.创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderGroup");
//2.指定NameServer
consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.订阅topic和tag (*为订阅所有tag,tag为*时无法顺序消费消息)
consumer.subscribe("Order", "shop");
//4.设置监听器,处理消息 监听器用这个MessageListenerOrderly保证同一个队列单线程
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt messageExt : list) {
System.out.println("消费者线程【" + Thread.currentThread().getName() + "】消费消息:" + new String(messageExt.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//5.启动消费者
consumer.start();
System.out.println("消费者已启动!");
}
}

3.延时消息

  • 延迟消息发送只需要设置延迟级别即可 message.setDelayTimeLevel(3)
  • 设置延时级别只能在这下面些中选 1级别对应1s以此类推
  • 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.example.rocketmq.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class DelayProducer {
public static void main(String[] args) throws Exception {
//1.创建生产者
DefaultMQProducer producer = new DefaultMQProducer("baseGroup");
//2.链接Nameserver
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建发送消息,并指定topic,tag
Message msg = new Message("delay","tagA",("我是第"+i+"条延迟消息"+" 消息发送时间为: "+System.currentTimeMillis()).getBytes());
//设置延时级别 3 为延迟10s
msg.setDelayTimeLevel(3);
//5.发送消息
producer.send(msg);
}
//6.无消息发送,则关闭生产者
producer.shutdown();
}
}
  • 消费者消费
  • 测试发现设定延时级别为10s时,落盘时间 - 发送时间 = 9s消费者消费的当前时间 - 发送时间 = 10s
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.example.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;

public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("baseGroup");
//2.指定NameServer地址
consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("delay","tagA");
//4.设置回调函数,处理信息
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for (MessageExt messageExt : list) {
byte[] body = messageExt.getBody();
String msg = null;
try {
msg = new String(body,"utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println(msg+ " 落盘时间为:"+(messageExt.getStoreTimestamp())+" 时间差:"+ (System.currentTimeMillis() - Long.parseLong(msg.substring(msg.length()-13)))/1000);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//5.启动消费者consumer
consumer.start();
System.out.println("启动成功!");
}
}
  • 输出结果

4.批量消息

  • 生产者将发送消息保存到 List 中,直接发送List即可
  • 一般来说批量消息发送 消息大小不大于4m,大于的时候可以对消息分割后再分别发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.example.rocketmq.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;

public class BatchProducer {

public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("baseGroup");
//2.指定NameServer地址
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动producer
producer.start();
List<Message> msgs = new ArrayList<>();
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg1 = new Message("batch","A",("我是批量消息A").getBytes());
Message msg2 = new Message("batch","A",("我是批量消息B").getBytes());
Message msg3 = new Message("batch","A",("我是批量消息C").getBytes());
msgs.add(msg1);
msgs.add(msg2);
msgs.add(msg3);
//5.发送消息 批量发送消息
SendResult result = producer.send(msgs);
System.out.println(result);
//6.关闭生产者producer
producer.shutdown();
}
}
  • 消息消费和普通一样消费即可

5.过滤消息

  • 消息消费的时候可以对消息进行过滤来,消费特定的消息。

  • 在订阅时,subExpression就时过滤条件,一般可以通过 tags来过滤,MessageSelector.byTags

    1
    2
    3
    consumer.subscribe("batch","tagA || tagB || tagC");
    //或者
    consumer.subscribe("batch",MessageSelector.byTags("tagA || tagB || tagC"));
  • 通过 sql 语法来过滤特定信息, MessageSelector.bySql

    • 注意:报错:The broker does not support consumer to filter message by SQL92 时,请修改broker.conf配置文件为enablePropertyFilter=true
    1
    consumer.subscribe("baseMQ", MessageSelector.bySql("i is not null  and i < 3 and name = 'tangseng'"));
  • 生产者发送消息时 在消息中设置过滤参数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    for (int i = 0; i < 10; i++) {
    //4.创建消息对象,指定主题Topic、Tag和消息体
    Message msg = new Message("baseMQ",("我是BaseMQ消息"+i).getBytes());
    msg.putUserProperty("i",i+"");
    //设置过滤参数
    if(i < 5){
    msg.putUserProperty("name","tangseng");
    }
    //5.发送消息
    SendResult result = producer.send(msg);
    System.out.println(result);
    }

    6.事务消息

  • 事务消息流程

  • 生产者模拟发送 三条消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.example.rocketmq.producer;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TranProducer {

public static void main(String[] args) throws Exception {
//1.创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("Transaction");
//2.设置NameServer
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动生产者
producer.start();
//设置回调监听者
producer.setTransactionListener(new TransactionListener() {
/**
* 本地事务处理
* @param message 发送的消息体
* @param o 发送消息传入的参数
* @return 消息事务状态
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//模拟 事务三种状态 Tag A的消息 commit ,Tag B的消息 rollback ,Tag C的消息unknow
if (message.getTags().equals("A")) {
return LocalTransactionState.COMMIT_MESSAGE;
}
if (message.getTags().equals("B")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}

/**
* 不明事务状态的回调 查询事务状态
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("消息事务状态不明,主动检测事务状态");
System.out.println(messageExt.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
});

String[] tags = {"A", "B", "C"};
for (int i = 0; i < 3; i++) {
//4.创建消息 并指定 topic tag
Message message = new Message("Tran", tags[i], ("我是事务消息 " + i).getBytes());
//5.发送消息
producer.sendMessageInTransaction(message, null);
Thread.sleep(4000L);
}
}
}
  • 生产者发送结果

  • 消费者消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.example.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("baseGroup");
//2.指定NameServer地址
consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("Tran", "*");
//4.设置回调函数,处理信息
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for (MessageExt messageExt : list) {
System.out.println(new String(messageExt.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//5.启动消费者consumer
consumer.start();
System.out.println("启动成功!");
}
}
  • 消费者消费结果

因为【我是事务消息 1】是Tag B,在生产者事务监听器中模拟 tagB 为事务回滚,所以 tag B的消息 由于rollback而被删除。