RocketMq基本使用

  • 导入MQ客户端依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.0</version>
</dependency>
  • 消息发送者步骤
1.创建消息生产者producer,并制定生产者组名
2.指定NameServer地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
  • 消息消费者步骤
1.创建消费者Consumer,并指定消费者组名
2.指定NameServer地址
3.订阅主题Topic和Tag
4.设置回调函数,处理信息
5.启动消费者consumer

1. 基本样例

消息发送

1. 发送同步消息

用于相应时间不敏感的场景

package com.example.rocketmq.producer;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class BaseProducer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("baseGroup");
        //2.指定NameServer地址
        producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            Message msg = new Message("baseMQ","A",("我是BaseMQ消息"+i).getBytes());
            //5.发送消息 (同步消息)
            SendResult result = producer.send(msg);
            System.out.println(result);
        }
        //6.关闭生产者producer
        producer.shutdown();
    }
}

2. 发送异步消息

用于对相应时间敏感的场景

package com.example.rocketmq.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("baseGroupA");
        //2.指定NameServer地址
        producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
        //3.启动producer
        producer.start();
        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            Message msg = new Message("baseMQ","B",("我是异步消息"+i).getBytes());
            //5.发送消息
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.print("发送成功:");
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.print("发送异常:");
                    System.out.println(throwable);
                }
            });
            //防止发送过快而导致链接失败
            Thread.sleep(1000L);
        }
        //6.关闭生产者producer
        producer.shutdown();
    }
}

3. 发送单向消息

对发送结果不关心的场景,如发送日志。

package com.example.rocketmq.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class OneWayProducer {
    public static void main(String[] args) throws Exception{
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("baseGroup");
        //2.指定NameServer地址
        producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
        //3.启动producer
        producer.start();
        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            Message msg = new Message("baseMQ","A",("我是BaseMQ单向消息"+i).getBytes());
            //5.发送消息
            producer.sendOneway(msg);
        }
        //6.关闭生产者producer
        producer.shutdown();
    }
}

消息消费

1. 负载均衡模式(默认)

所有消息的消费均平分给消费者集群中订阅相同的的每一台机器

package com.example.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,并指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BaseGroup");
        //2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("baseMQ","A");
        //4.设置回调函数,处理信息
        consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            for (MessageExt messageExt : list) {
                byte[] body = messageExt.getBody();
                String msg = null;
                try {
                    msg = new String(body,"utf-8");
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                System.out.println(msg);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        //5.启动消费者consumer
        consumer.start();
    }
}

2. 广播模式

所有消费者都能受到所有的消息,设置setMessageModelMessageModel.BROADCASTING即可开启广播模式。

package com.example.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.io.UnsupportedEncodingException;

public class BroConsumer {
    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,并指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BaseGroup");
        //2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("baseMQ","B");
        //设置广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //4.设置回调函数,处理信息
        consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            for (MessageExt messageExt : list) {
                byte[] body = messageExt.getBody();
                String msg = null;
                try {
                    msg = new String(body,"utf-8");
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                System.out.println(msg);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("广播模式消费者启动成功!");
    }
}

2.顺序消息

顺序消息消费主要是保证局部消息顺序。因为消费者是多线程消费borker中的所有队列,所以要保证消费者消费顺序,生产者保证顺序消息发送到同一个broker队列中(通过业务表示如orderID之类的),消费者保证每个broker队列都用一个线程来消费,这样就可以保证消息的顺序消费。以订单消息为例子:创建、支付、完成。

消息队列一般流程

所以需要将生产者的消息按照业务表示分别发送到broker的同一个队列中去,而消费者对每个队列的消息用单线程去读取。这样就可以实现消息的顺序读取了。

消息顺序读取流程

代码实现:

  • 创建一个订单流程pojo模拟订单发送流程
package com.example.rocketmq.pojo;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;

@Data
public class OrderStep {

    private int orderId;

    private String desc;
    
    public OrderStep(int orderId, String desc){
        this.orderId = orderId;
        this.desc = desc;
    }

    //创建一系列订单模拟数据  直接用订单desc来模拟订单业务
    public static List<OrderStep>  buildOrders(){
        //模拟消息流
        List<OrderStep> orderSteps = new ArrayList<>();
        //订单1 的流程
        OrderStep order1Step1 = new OrderStep(134,"创建");
        OrderStep order1Step2 = new OrderStep(134, "支付");
        OrderStep order1Step3 = new OrderStep(134, "推送");
        OrderStep order1Step4 = new OrderStep(134, "完成");
        //订单2 的流程
        OrderStep order2Step1 = new OrderStep(135,"创建");
        OrderStep order2Step2 = new OrderStep(135, "支付");
        OrderStep order2Step3 = new OrderStep(135, "推送");
        OrderStep order2Step4 = new OrderStep(135, "完成");
        //订单3 的流程
        OrderStep order3Step1 = new OrderStep(136,"创建");
        OrderStep order3Step2 = new OrderStep(136, "支付");
        OrderStep order3Step3 = new OrderStep(136, "推送");
        //模拟发送队列中 3条订单消息交叉发送
        // 订单1创建
        orderSteps.add(order1Step1);
            //订单2创建
        orderSteps.add(order2Step1);
                //订单3创建
        orderSteps.add(order3Step1);
        //订单1支付
        orderSteps.add(order1Step2);
            //订单2支付
        orderSteps.add(order2Step2);
            //订单2推送
        orderSteps.add(order2Step3);
        //订单1推送
        orderSteps.add(order1Step3);
        //订单1完成
        orderSteps.add(order1Step4);
                //订单3支付
        orderSteps.add(order3Step2);
            //订单2完成
        orderSteps.add(order2Step4);
                //订单3推送
        orderSteps.add(order3Step3);
        return orderSteps;
    }
}
  • 消息生产者发送顺序消息
package com.example.rocketmq.producer;

import com.example.rocketmq.pojo.OrderStep;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class OrderProducer {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("OrderGroup");
        //2.指定NameServer地址
        producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
        //3.启动producer
        producer.start();
        List<OrderStep> orderSteps = OrderStep.buildOrders();
        for (OrderStep orderStep : orderSteps) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            Message msg = new Message("Order", "shop", orderStep.toString().getBytes());
            //5.发送顺序消息
            /**
             *  参数msg:消息体
             *  参数MessageQueueSelector :消息队列选择器
             *  参数arg:业务参数
             */
            producer.send(msg, new MessageQueueSelector() {
                /**
                 * @param list    返回的所有的broker消息队列
                 * @param message 传入的消息对象
                 * @param arg     传入的业务参数
                 * @return 选择哪个消息队列来发送
                 */
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
                    //写选择逻辑
                    OrderStep orderStep = (OrderStep) arg;
                    //纯数字orderId 可根据余数来选择哪个 队列发送消息
                    int index = orderStep.getOrderId() % list.size();
                    System.out.println("orderId:" + orderStep.getOrderId() + "desc:" + orderStep.getDesc() + " 处于队列:" + index);
                    return list.get(index);
                }
            }, orderStep);

            Thread.sleep(1000L);
        }
        //6.无消息发送则关闭生产者producer
        producer.shutdown();
    }
}
  • 消息消费者顺序消费消息
package com.example.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderConsumer {
    public static void main(String[] args) throws Exception {
        //1.创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderGroup");
        //2.指定NameServer
        consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
        //3.订阅topic和tag  (*为订阅所有tag,tag为*时无法顺序消费消息)
        consumer.subscribe("Order", "shop");
        //4.设置监听器,处理消息  监听器用这个MessageListenerOrderly保证同一个队列单线程
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println("消费者线程【" + Thread.currentThread().getName() + "】消费消息:" + new String(messageExt.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //5.启动消费者
        consumer.start();
        System.out.println("消费者已启动!");
    }
}

3.延时消息

  • 延迟消息发送只需要设置延迟级别即可 message.setDelayTimeLevel(3)
  • 设置延时级别只能在这下面些中选 1级别对应1s以此类推
  • 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
package com.example.rocketmq.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        //1.创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("baseGroup");
        //2.链接Nameserver
        producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
        //3.启动生产者
        producer.start();
        for (int i = 0; i < 10; i++) {
            //4.创建发送消息,并指定topic,tag
            Message msg = new Message("delay","tagA",("我是第"+i+"条延迟消息"+" 消息发送时间为: "+System.currentTimeMillis()).getBytes());
            //设置延时级别 3 为延迟10s
            msg.setDelayTimeLevel(3);
            //5.发送消息
            producer.send(msg);
        }
        //6.无消息发送,则关闭生产者
        producer.shutdown();
    }
}
  • 消费者消费
  • 测试发现设定延时级别为10s时,落盘时间 - 发送时间 = 9s消费者消费的当前时间 - 发送时间 = 10s
package com.example.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,并指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("baseGroup");
        //2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("delay","tagA");
        //4.设置回调函数,处理信息
        consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            for (MessageExt messageExt : list) {
                byte[] body = messageExt.getBody();
                String msg = null;
                try {
                    msg = new String(body,"utf-8");
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                System.out.println(msg+ " 落盘时间为:"+(messageExt.getStoreTimestamp())+" 时间差:"+ (System.currentTimeMillis() - Long.parseLong(msg.substring(msg.length()-13)))/1000);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("启动成功!");
    }
}
  • 输出结果

4.批量消息

  • 生产者将发送消息保存到 List 中,直接发送List即可
  • 一般来说批量消息发送 消息大小不大于4m,大于的时候可以对消息分割后再分别发送
package com.example.rocketmq.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;

public class BatchProducer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("baseGroup");
        //2.指定NameServer地址
        producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
        //3.启动producer
        producer.start();
        List<Message> msgs = new ArrayList<>();
        //4.创建消息对象,指定主题Topic、Tag和消息体
        Message msg1 = new Message("batch","A",("我是批量消息A").getBytes());
        Message msg2 = new Message("batch","A",("我是批量消息B").getBytes());
        Message msg3 = new Message("batch","A",("我是批量消息C").getBytes());
        msgs.add(msg1);
        msgs.add(msg2);
        msgs.add(msg3);
        //5.发送消息 批量发送消息
        SendResult result = producer.send(msgs);
        System.out.println(result);
        //6.关闭生产者producer
        producer.shutdown();
    }
}
  • 消息消费和普通一样消费即可

5.过滤消息

  • 消息消费的时候可以对消息进行过滤来,消费特定的消息。

  • 在订阅时,subExpression就时过滤条件,一般可以通过 tags来过滤,MessageSelector.byTags

    consumer.subscribe("batch","tagA || tagB || tagC");
    //或者
    consumer.subscribe("batch",MessageSelector.byTags("tagA || tagB || tagC"));
  • 通过 sql 语法来过滤特定信息, MessageSelector.bySql

    • 注意:报错:The broker does not support consumer to filter message by SQL92 时,请修改broker.conf配置文件为enablePropertyFilter=true
    consumer.subscribe("baseMQ", MessageSelector.bySql("i is not null  and i < 3 and name = 'tangseng'"));
  • 生产者发送消息时 在消息中设置过滤参数

    for (int i = 0; i < 10; i++) {
              //4.创建消息对象,指定主题Topic、Tag和消息体
              Message msg = new Message("baseMQ",("我是BaseMQ消息"+i).getBytes());
              msg.putUserProperty("i",i+"");
              //设置过滤参数
              if(i < 5){
                  msg.putUserProperty("name","tangseng");
              }
              //5.发送消息
              SendResult result = producer.send(msg);
              System.out.println(result);
          }

6.事务消息

  • 事务消息流程

  • 生产者模拟发送 三条消息
package com.example.rocketmq.producer;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TranProducer {

    public static void main(String[] args) throws Exception {
        //1.创建事务消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("Transaction");
        //2.设置NameServer
        producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
        //3.启动生产者
        producer.start();
        //设置回调监听者
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 本地事务处理
             * @param message  发送的消息体
             * @param o        发送消息传入的参数
             * @return 消息事务状态
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                //模拟 事务三种状态   Tag A的消息 commit ,Tag B的消息 rollback ,Tag C的消息unknow
                if (message.getTags().equals("A")) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                if (message.getTags().equals("B")) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return LocalTransactionState.UNKNOW;
            }

            /**
             * 不明事务状态的回调 查询事务状态
             * @param messageExt
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("消息事务状态不明,主动检测事务状态");
                System.out.println(messageExt.getTags());
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        String[] tags = {"A", "B", "C"};
        for (int i = 0; i < 3; i++) {
            //4.创建消息 并指定 topic tag
            Message message = new Message("Tran", tags[i], ("我是事务消息 " + i).getBytes());
            //5.发送消息
            producer.sendMessageInTransaction(message, null);
            Thread.sleep(4000L);
        }
    }
}
  • 生产者发送结果

  • 消费者消费
package com.example.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,并指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("baseGroup");
        //2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("Tran", "*");
        //4.设置回调函数,处理信息
        consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            for (MessageExt messageExt : list) {
                System.out.println(new String(messageExt.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("启动成功!");
    }
}
  • 消费者消费结果

因为【我是事务消息 1】是Tag B,在生产者事务监听器中模拟 tagB 为事务回滚,所以 tag B的消息 由于rollback而被删除。