RocketMQ学习笔记(三)
技术分析
1.技术选型
- SpringBoot
- Dubbo
- Zookeeper
- RocketMQ
- Mysql
2.SpringBoot整合RocketMQ
1.生产者项目(SpringBoot项目)
1.引入项目依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<!-- 可选 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
2.修改配置文件
- application.properties
rocketmq.name-server=192.168.2.104:9876;192.168.2.105:9876
rocketmq.producer.group="BaseProducer"
3.发送消息测试
- 利用
rocketMQTemplate
发送消息测试
package com.tangseng.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest()
@Slf4j
class ProducerApplicationTests {
@Autowired
RocketMQTemplate template;
@Test
void testSendMsg() {
template.convertAndSend("test", "这是一条消息测试");
log.info("消息发送成功");
}
@Test
void contextLoads() {
}
}
2.消费者项目(SpringBoot项目)
1.引入所需依赖
<!-- 消费者监听消息,所以需要引入web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
2.修改配置文件
- application.properties
rocketmq.name-server=192.168.2.105:9876;192.168.2.104:9876
#用于配置listener时的参数
rocketmq.consumer.group=BaseConsumerGroup
3.测试消费消息
- 新建一个listenter来监听消息,指定topic,消息监听mode等,该消息监听者为哪个集群的
package com.tangseng.rocketmq.consumer.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RocketMQMessageListener(topic = "test",consumerGroup = "${rocketmq.consumer.group}")
public class TestConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("接收到消息【{}】",s);
}
}
- 启动服务
3.Spring整合Dubbo
本笔记是整合com.alibaba项目的dubbo,如果需要整合org.apache下的dubbo,看这个Github说明文档
Dubbo 目前有如图所示的 5 个分支,其中 2.7.1-release 只是一个临时分支,忽略不计,对其他 4 个分支进行介绍。
- 2.5.x 近期已经通过投票,Dubbo 社区即将停止对其的维护。
- 2.6.x 为长期支持的版本,也是 Dubbo 贡献给 Apache 之前的版本,其包名前缀为:com.alibaba,JDK 版本对应 1.6。
- 3.x-dev 是前瞻性的版本,对 Dubbo 进行一些高级特性的补充,如支持 rx 特性。
- master 为长期支持的版本,版本号为 2.7.x,也是 Dubbo 贡献给 Apache 的开发版本,其包名前缀为:org.apache,JDK 版本对应 1.8。
引用自徐靖峰|个人博客
1.搭建Zookeeper集群
1.准备工作
准备集群主机
序号 机器IP zookeeper1 192.168.2.105 zookeeper2 192.168.2.106 配置每台主机
安装JDK。
下载 Zookeeper,上传到服务器中,解压。
配置zookeeper
#进入配置文件目录 cd /usr/local/apache-zookeeper-3.6.0-bin/conf #以默认模板 新建配置文件 cp zoo_sample.cfg zoo.cfg #新建目录保存zookeeper数据 cp ../ mkdir data #修改配置文件 vim conf/zoo.cfg
修改zoo.cfg的data目录为新建目录。
dataDir=/usr/local/apache-zookeeper-3.6.0-bin/data
2.配置集群
在每个zookeeper的data目录下创建一个myid文件,内容分别时 1、2、3以此类推,记录每个服务器的ID
cd /usr/local/apache-zookeeper-3.6.0-bin/data vim myid
zookeeper1 的 myid:
1
zookeeper2 的 myid:
2
为每个zookeeper服务添加集群服务器IP列表
修改zoo.cfg配置文件
server.1=192.168.2.105:10101:20201 server.2=192.168.2.106:10102:20202
说明:server.
myid的值
=主机id
:服务端口(clientPort)
:用于选举的端口
;注意端口配置防火墙开放,或配置开放安全组。
3.启动集群
启动服务
#进入bin目录 cd /usr/local/apache-zookeeper-3.6.0-bin/bin #启动服务 sh zkServer.sh start
启动成功,输出日志:
/usr/bin/java ZooKeeper JMX enabled by default Using config: /usr/local/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
查询集群状态
sh zkServer.sh status
状态结果:
ZooKeeper JMX enabled by default Using config: /usr/local/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: leader
根据
Mode
中是leader
或follower
确定当前zookeeper服务是主还是从。
2.RPC服务接口
新建一个普通 maven 项目 命名为dubbo-demo-common 或者 dubbo-demo-api。该项目是RPC服务接口项目,也可以说是公共服务包。
简单RPC服务 测试接口
package com.tangseng.demo.common.service; public interface IUserService { String sayHello(String name); }
3.dubbo服务生产者
1. 项目结构选择
单个项目做demo的话,可以直接新建一个SpringBoot项目,命名为dubbo-demo-producer 链接zookeeper,再在依赖中引入dubbo-demo-common 依赖来实现 RPC接口,即可将服务注册到zookeeper中。
2.生产者依赖配置
- 添加依赖
<!--dubbo整合SpringBoot包-->
<dependency>
<groupId>com.alibaba.spring.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<!--zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
<!--RPC接口包-->
<dependency>
<groupId>com.tangseng</groupId>
<artifactId>dubbo-demo-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- web项目包 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
- 添加配置文件
spring.application.name=dubbo-demo-dsc
#dubbo配置
spring.dubbo.application.name=dubbo-demo-dsc
spring.dubbo.application.id=dubbo-demo-dsc
spring.dubbo.registry.address=zookeeper://192.168.2.105:2181;zookeeper://192.168.2.106:2181;
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20880
- 开启dubbo注解配置 启动类中加入
@EnableDubboConfiguration
注解。
package com.tangseng.dubbodemodsc;
import com.alibaba.dubbo.config.spring.context.annotation.EnableDubboConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableDubboConfiguration
@SpringBootApplication
public class DubboDemoDscApplication {
public static void main(String[] args) {
SpringApplication.run(DubboDemoDscApplication.class, args);
}
}
- 实现RPC接口
package com.tangseng.demo.producer.service.impl;
import com.tangseng.demo.common.service.IUserService;
import org.springframework.stereotype.Service;
@Service
@com.alibaba.dubbo.config.annotation.Service(interfaceClass = IUserService.class)
public class IUserServiceImpl implements IUserService {
@Override
public String sayHello(String name) {
return "Hello world ! " +name;
}
}
5.dubbo服务消费者
- 添加依赖
<!--dubbo整合SpringBoot包-->
<dependency>
<groupId>com.alibaba.spring.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<!--zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
<!--RPC接口包-->
<dependency>
<groupId>com.tangseng</groupId>
<artifactId>dubbo-demo-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- web项目包 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
- 添加配置
spring.application.name=dubbo-demo-comsumer
server.port=8088
logging.level.root=debug
#dubbo配置
spring.dubbo.application.name=dubbo-demo-comsumer
spring.dubbo.application.id=dubbo-demo-comsumer
spring.dubbo.registry.address=zookeeper://192.168.2.105:2181;zookeeper://192.168.2.106:2181
- 启用dubbo注解
在启动类中加入@EnableDubboConfiguration
注解
package com.tangseng.demo.consumer;
import com.alibaba.dubbo.config.spring.context.annotation.EnableDubboConfig;
import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableDubboConfiguration
@SpringBootApplication
public class DubboDemoConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(DubboDemoConsumerApplication.class, args);
}
}
- 调用RPC接口 (注意==@Reference==为dubbo包下的注解,而不是Spring的注解)
package com.tangseng.demo.consumer.controller;
import com.alibaba.dubbo.config.annotation.Reference;
import com.tangseng.demo.common.service.IUserService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/user")
public class UserController {
@Reference
IUserService service;
@GetMapping("/say")
public String sayHello(String name){
return service.sayHello(name);
}
}
高级特性
1. 消息存储
RocketMQ消息存储方式:
- 关系型数据库 (千万级别的不推荐用)
- 文件系统 (默认)
在性能上 文件系统 > 关系型数据库
文件系统
- CommitLog:存储消息元数据 (包含了ConsumerQueue)。
- ConsumerQueue:消息逻辑队列 存储队列索引,consumer通过ConsumerQueue快速确定CommitLog中消息位置,来加快消息读取速度。
- IndexFile:索引文件,提供consumer通过 key,时间区间等去找CommitLog中消息。
文件系统中采用==顺序写==,可以提高消息写入速率,而在消息读取方面采用==”零拷贝“==,跳过用户态复制提高消息读取效率,由于零拷贝
作用文件大小范围为 1.5~2G,所以一般设置CommitLog大小为1G。
2. 刷盘机制
为了消息能在断电等异常状态中可以恢复,所以将消息保存到磁盘中。
- 同步刷盘:生产者发送消息给MQ后,MQ将消息保存到磁盘中才告诉生产者消息发送成功。该方式安全,服务器宕机,断电等可以快速恢复消息,但是性能较差。
- 异步刷盘:生产者发送消息给MQ后,MQ接收到后直接返回成功,之后再异步去保存到磁盘中。该方式性能好,可以容纳更多的吞吐量,但是消息相对不安全。如果服务器断电,可能消息会丢失。
在配置文件中指定刷盘方式:
#刷盘方式
# ASYNC_FLUSH 异步刷盘
# SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
3.高可用配置
保证系统高可用性策略
- 搭建NameServer 集群
- 采用主-从节点的方式搭建 Broker
- 消费者消费消息自动切换master、slave
- 采用双主双从模式Borker
主-从复制模式配置
#Broker角色
# ASYNC_MASTER 异步复制Master
# SYNC_MASTER 同步双写Master
# SLAVE
brokerRole=SYNC_MASTER
异步复制性能高,同步双写安全性高。
推荐配置为:刷盘模式采用异步刷盘,主从复制模式采用同步双写。
4.负载均衡
生产者发送消息时会轮询发送到每一个borker的MessageQueue中,实现负载均衡。MQ中默认内部已经实现。
消费者负载均衡模式消费消息
5.消息重试
消费者消费消息失败或者消费者认为设置在下面三种情况消息就会重试消费:
- 消费时 返回 Action.ReconsumerLater。
- 消费时 返回 null。
- 消费时 出现异常。
消息默认重试16次每次间隔时间不一,当重试了16次仍然不成功,则消息进入死信队列
。
顺序消息重试: 为了保证消息的顺序,在顺序消息发生重试时,不会消费该MessageQueue的其他消息,这样就会导致该MessageQueue中的消息阻塞,需要在消费代码中避免消息阻塞。
无序消息重试: 集群模式重试。
可通过返回Action.commitMessage
来通知MQ不再重试消息。
6.死信队列
在消息达到最大重试次数时,该消息就会被移至死信队列
,死信队列
中的消息不再消费,默认保存3天后,自动删除。需要定期处理死信队列中的消息,①重发该消息。②用特定的消费者定期处理死信队列中的消息。
7.消息幂等性
MQ中可能会产生重复的消息,在消费者消费消息时需要对消息幂等消费。
产生消息重复消息:
- 发送消息时,MQ在收到生产者发送的消息后,还没返回发送成功状态给生产者时发生宕机或断电。生产者以为消息发送失败,重新发送消息。
- 消息消费时,重复消费消息。
- 消费者负载均衡时,设备宕机等引发的Rebalance,会出现不同消费者消费同一个消息的情况。
解决方法:传入业务唯一性标识,将消息及消费该消息状态持久化到数据库中。每次在消费前根据业务ID来判断该消息是否已经被消费。