RabbitMQ学习笔记

RabbitMQ

一、概述

1.1 JMS

Java Message Service,java平台中面向消息中间件的API,类似JDBC,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

1.2工作原理

RabbitMQ基本结构

image-20210223140145763

组成部分说明如下:

  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。

  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。. Queue :消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

  • Producer:消息生产者,即生产方客户端,生产方客户端将消费发送到MQ。. Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

----发送消息-----

1、生产者和Broker建立TCP连接。

2、生产者和Broker建立通道。

3、生产者通过通道消息发送Broker,由Exchange将消息进行转发。4、Exchange将消息转发到指定的Queue (队列)

4、Exchange将消息转发到指定的Queue

----接收消息----

1、消费者和Broker建立TCP连接

2、消费者和Broker建立通道

3、消费者监听指定的Queue (队列)

4、当有消息到达Queue时Broker默认将消息推送给消费者。5、消费者接收到消息。

二、生产者消费者实例

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private static final String QUEUE_NAME="hello_world";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");

factory.setVirtualHost("/");

Connection connection=null;
Channel channel=null;
try {
// 建立连接
connection = factory.newConnection();
// 创建会话通道,生产者和mq服务所有通信都在channel通道中完成
channel = connection.createChannel();
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
// 1.队列名称 2.持久化 3.是否独占连接(队列只允许在该连接中访问) 4.自动删除 5.扩展参数(存活时间等等)
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
// 发送消息
// 1.exchange 不指定则为默认交换机 2.routingKey 路由key,交换机根据key将消息转发到指定的队列。默认交换机,routingKey默认为队列名称
String msg="hello_world!!";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("已发送到MQ:"+msg);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

运行后可以在界面中看到创建了新的消息队列,有一条待发送消息

image-20210223191908528

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private static final String QUEUE_NAME="hello_world";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");

factory.setVirtualHost("/");

Connection connection;
try {
// 建立连接
connection = factory.newConnection();
// 创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 监听队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
// 消费方法
DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
// 接收到消息后,此方法会调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 交换机
String exchange = envelope.getExchange();
// 消息id
long deliveryTag = envelope.getDeliveryTag();
// 消息内容
String s = new String(body,"utf-8");
System.out.println("收到消息:"+s);
System.out.println("交换机:"+exchange);
System.out.println("消息id:"+deliveryTag);
}
};
// 消费 1.队列名称 2.autoAck 自动回复 3.消费方法:当消费者接收到消息要执行的方法
String s = channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}

生产者需要关闭通道和连接,但是消费者不用,因为消费者需要一直监听

image-20210223193142225

工作模式

RabbitMq有以下几种工作模式:

  • Work Queues 工作队列
  • Publish/Subscribe 发布订阅
  • Routing 路由
  • Topics 通配符
  • Header Header转发器
  • RPC 远程过程调用

工作队列

image-20210223194059827

多个消费端消费同一个队列中的消息。

  • 一个生产者将消息发送给一个队列,多个消费者同时监听一个队列的消息。

  • 消息不能被重复消费

  • 采用轮询的方式将消息平均发送给消费者

发布订阅模式

image-20210223194516223
  • 一个生产者将消息发送给交换机
  • 与交换机绑定的有多个队列,每个消费者监听自己的队列
  • 生产者将消息发送给交换机,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都会收到消息

交换机类型:

  • fanout:对应工作模式为:Publish/Subscribe
  • direct:对应工作模式为:Routing
  • topic:对应工作模式为:Topics
  • headers:对应工作模式为:headers

Routing路由模式

根据routingKey路由到响应的队列进行发送

Topics通配符工作模式

与路由模式类似,但是匹配队列的方式为通配符匹配,类似模糊查询

  • #:匹配一个或多个词
  • *:匹配一个词

Header模式

Header取消了RoutingKey,使用header中的key-value来匹配队列

RPC模式

RPC即客户端远程调用服务端的方法,使用MQ可以实现RP.C的异步调用,基于Direct交换机实现,

流程如下:

1、客户端既是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果

3、服务端将RPC方法的结果发送到RPC响应队列

MQ设计模型

设计理念

如何保证消息不丢失?

持久化机制、消息ack、 消息确认机制:消费者消费成功后再删除该消息

MQ服务端向消费者推送消息 和 消费主动拉取消息 的区别。

  • 前者消费者已经和MQ保持了长连接
  • 后者为消费者第一次启动

如何对抗高并发

消费者根据自身的能力,拉取MQ服务器端信息进行消费。实现流量削峰。

缺点:存在延迟的问题

如何避免消息堆积

  • 提高消费者速率
  • 消费者批量获取消息

如何避免消费者重复消费(幂等问题)

全局id+业务场景保证唯一性

如何保证消息顺序一致性

绑定同一个消费者和队列

MQ推与拉取架构模型

多线程版本MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class MQThreadVersion {
private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
//生产者线程
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
for (int i = 0; i < 10; i++) {
msgs.offer("消息" + i);
System.out.println("生产者投递了消息" + i);
}

} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 消费者线程
executorService.execute(() -> {
while (true) {
if (!msgs.isEmpty()) {
System.out.println(Thread.currentThread().getName() + "拿到消息:" + msgs.poll());
}
}

});
}
}

基于网络通讯版本MQ(netty)

首先有三个端,MQServerMQProducerMQConsumer

maven项目导入netty依赖

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class NettyMQConsumer {

public static void main(String[] args) {
int port = 9008;
NettyMQConsumer consumer = new NettyMQConsumer();
consumer.connect(port,"127.0.0.1");
}
public void connect(int port, String host) {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {

client.group(group)
// 设置为netty客户端
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyMQConsumer.NettyClientHandler());
}
});

// 绑定端口,异步链接操作
ChannelFuture future = client.connect(host, port).sync();
// 等待客户端连接端口关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 优雅关闭线程组
group.shutdownGracefully();
}
}

public class NettyClientHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
JSONObject json = new JSONObject();
json.put("type", "consumer");
// 生产发送数据
byte[] req = json.toJSONString().getBytes();
ByteBuf firstMsg = Unpooled.buffer(req.length);
firstMsg.writeBytes(req);
ctx.writeAndFlush(firstMsg);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class NettyMQProducer {
public void connect(int port, String host) {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {

client.group(group)
// 设置为netty客户端
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyMQProducer.NettyClientHandler());
}
});

// 绑定端口,异步链接操作
ChannelFuture future = client.connect(host, port).sync();
// 等待客户端连接端口关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 优雅关闭线程组
group.shutdownGracefully();
}
}

public class NettyClientHandler extends ChannelHandlerAdapter {


@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

JSONObject data = new JSONObject();
data.put("type", "producer");
JSONObject msg = new JSONObject();
msg.put("userId", "123456");
msg.put("age", "23");
data.put("msg", msg);
// 生产发送数据
byte[] req = data.toJSONString().getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}


/**
* 客户端读取服务器端数据
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
new String(bytes,"UTF-8");
System.out.println("客户端接收到服务器端请求");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
public class NettyMQServer {
public static void main(String[] args) throws Exception {
int port = 9008;
new NettyMQServer().bind(port);
}
public void bind(int port) throws Exception {
/**
* Netty 抽象出两组线程池BossGroup和WorkerGroup
* BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(bossGroup, workerGroup)
// 设定NioServerSocketChannel 为服务器端
.channel(NioServerSocketChannel.class)
//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
//用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
.option(ChannelOption.SO_BACKLOG, 100)
// 服务器端监听数据回调Handler
.childHandler(new NettyMQServer.ChildChannelHandler());
//绑定端口, 同步等待成功;
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("当前服务器端启动成功...");
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
//优雅关闭 线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 设置异步回调监听
ch.pipeline().addLast(new NettyMQServer.NettyServerHandler());
}
}

public static final String type_producer = "producer";
public static final String type_consumer = "consumer";
private static LinkedBlockingDeque<String> msgs =new LinkedBlockingDeque<>();
private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();

public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {

/**
* 服务器接收客户端请求
*
* @param ctx
* @param data
* @throws Exception
*/
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object data)
throws Exception {
JSONObject clientMsg = getData(data);
String type = clientMsg.getString("type");
switch (type) {
case type_producer:
producer(clientMsg);
break;
case type_consumer:
consumer(ctx);
break;
default:
break;
}
}

private void consumer(ChannelHandlerContext ctx) {
// 保存消费者连接
ctxs.add(ctx);
// 主动拉取mq服务器端缓存中没有被消费的消息
String data = msgs.poll();
if (StringUtils.isEmpty(data)) {
return;
}
// 将该消息发送给消费者
byte[] req = data.getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}

private void producer(JSONObject clientMsg) {
// 缓存生产者投递 消息
String msg = clientMsg.getString("msg");
msgs.offer(msg);

//需要将该消息推送消费者
ctxs.forEach((ctx) -> {
// 将该消息发送给消费者
String data = msgs.poll();
if (data == null) {
return;
}
byte[] req = data.getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
});
}

private JSONObject getData(Object data) throws UnsupportedEncodingException {
ByteBuf buf = (ByteBuf) data;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
return JSONObject.parseObject(body);
}


@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {

ctx.close();
}
}
}

RabbitMQ概述

VirtualHost:分类,可以划分为多个空间,防止出现重名队列。如会员virtualhost和普通用户virtualhost。

image-20210524114536780
image-20210524120727764

5672: MQ内部生产者消费者通讯的端口号

15672:MQ控制台管理平台Http协议端口号

25672:MQ集群通信端口号

RabbitMQ如何保证消息不丢失

默认情况下,MQ服务器端都会对队列种的消息进行持久化

  • 生产者角度:确保消息投递到MQ服务器端成功

消息ack确认机制

image-20210524125133355

缺点:如果使用同步机制,会影响效率,如果服务器端一直没有回复确认会导致生产者阻塞。

异步:生产者投递消息后不被阻塞,使用观察者模式(监听),等待服务器端的回调。

另外还可以设置超时重传,类似tcp。

  • 消费者角度:确保消息消费成功

RabbitMQ如果消费者消费成功通知服务器端会立即删除该条消息,但是Kafka不会立即删除,会继续缓存在队列中。

Kafka消息删除与消费者是否消费无关,与数据最长保存时间以及数据最大量有关

log.retention.hours=48 #数据最多保存48小时 log.retention.bytes=1073741824 #数据最多1G

kafka中记录了消费者消费消息的偏移量,记录应该从哪一条消息开始消费

Channel的Confirm模式

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息;

开启confirm模式的方式

生产者通过调用channel.confirmSelect()方法将channel设置为confirm模式,(注意一点,已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的),如果没有设置no-wait标志的话,broker会返回confirm.select-ok表示同意发送者将当前channel信道设置为confirm模式(从目前RabbitMQ最新版本3.6来看,如果调用了channel.confirmSelect方法,默认情况下是直接将no-wait设置成false的,也就是默认情况下broker是必须回传confirm.select-ok的,而且我也没找到我们自己能够设置no-wait标志的方法);

(1):普通confirm模式,每发送一条消息,调用waitForConfirms()方法等待服务端confirm,这实际上是一种串行的confirm,每publish一条消息之后就等待服务端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传;

(2):批量confirm模式,每发送一批消息之后,调用waitForConfirms()方法,等待服务端confirm,这种批量确认的模式极大的提高了confirm效率,但是如果一旦出现confirm返回false或者超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降;

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class RabbitMQConnection {
public static Connection getConnection() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/retain");
Connection connection = null;
try {
connection = factory.newConnection();

} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
}

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Producer {
static Channel channel = null;
public static final String QUEUE_NAME = "ack";

public static void main(String[] args) {
Connection connection = RabbitMQConnection.getConnection();
try {
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
String msg = "宋大佬nb";
// 将当前channel设置为confirm模式
channel.confirmSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
// 等待服务端回调(同步)
boolean ack = channel.waitForConfirms();
if (ack) {
System.out.println("消息投递到服务端成功");
}else {
System.out.println("消息投递到服务端失败");

}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

验证生产者是否投递到服务端成功:

image-20210524131747182
image-20210524131757514

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Consumer {
public static final String QUEUE_NAME = "ack";

public static void main(String[] args) throws IOException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获得消息:" + msg);
// 消费成功回调,通知服务端删除该条消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 设置autoAck为false
channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
}
}

验证消费者消费后回调结果:

image-20210524132413217
image-20210524132451489

结果显示消费成功已经被删除。

事务消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 事务消息
* @param args
*/
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQConnection.getConnection();
try {
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
String msg = "宋大佬nb";
// 开启事务消息模式
channel.txSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
// 提交事务
int i = 1/0; // 模拟出错,会进行回滚
channel.txCommit();
System.out.println("消息投递到服务端成功");
} catch (Exception e) {
if (channel!=null) {
System.out.println("消息投递到服务端失败");
channel.txRollback();
}
e.printStackTrace();
}
}
}
image-20210524133248449
image-20210524133306602

结果事务进行回滚,队列中没有消息。

消费者均摊消息的缺点

image-20210524133452700

如果采用传统的公平队列,服务端会将消息均分给每个消费者。

假设两个消费者都消费5条消息,可能会存在消费者1先消费完后进行等待,消费者2还没有消费完的情况。所以采用能者多劳的模式。

解决方法:工作队列模式

注意:msg消息传对象时,首先要实现序列化接口,然后必须保证对象的全类名相等,否则会出现反序列化失败异常。

生产者如何获取消费结果

  1. 根据业务来定

    如果结果是数据库新增数据,那么根据自己生产的全局唯一id查询是否存在这条记录,存在则表示消费成功。但是这样会给数据库增加压力。

  2. RocketMQ自带全局消息id,能够根据全局消息id获取消费结果

    原理:生产者投递消息到mq服务器,mq服务器端会返回一个全局的消息id,消费者消费该消息成功之后,会给服务端发通知标记消息消费成功。

    生产者获取到该全局消息id,每隔2s调用mq接口查询是否被消费成功。

RabbitMQ死信队列

死信队列俗称备胎队列,消息中间件由于某种原因拒收该消息后,可以转移到死信队列中释放。

产生死信队列的原因

  • 消息已经过期
  • 队列达到最大长度
  • 消费者多次消费该消息失败,则转移到死信队列
image-20210524160044245

死信队列架构原理

死信队列和普通队列的区别不大

都有自己独立的交换机和路由key、队列和消费者。

区别:

  1. 先投递到普通交换机,普通交换机投递到普通队列缓存起来,被普通消费者消费。
  2. 如果一直没有消费者消费,将消息转移到死信交换机中,投递到死信队列,被死信消费者消费。

相当于死信队列是另外一个普通队列,目的是防止出现正式消费者挂掉后没有消费者去消费消息,出现意外。

死信队列消费者是正式消费者的后援,正式消费者挂了,死信消费者就上。

死信队列应用场景

  1. 30分种订单超时设计
    1. Redis过期key
    2. 死信延迟队列:创建普通队列没有对应消费者消费,在30分钟后会将消息转移到死信备胎消费者实现消费。死信消费者会根据订单号码查询是否支付,没有支付则回滚库存操作。

RabbitMQ消息幂等问题

幂等:重复,保证数据结果唯一

消息自动重试机制

当消费者处理业务代码的时候,抛出异常则mq会自动触发重试机制。

默认情况下是无限次数重试。同时如果在报异常之前执行了业务操作,同时还会因为重试被执行多次。

需要人为指定重试次数

1
2
3
4
5
6
7
8
9
listener:
simple:
retry:
#开启重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔
initial-interbal: 3000

什么情况下需要实现重试策略?

  • 消费者获取消息后调用第三方接口失败了,是否需要重试?
    • 需要,因为重试多次可能会调用成功。
  • 消费者获取消息后,因为代码问题造成数据异常,是否需要重试?
    • 不需要,就算重试多次仍然会失败。可以用日志记录该条消息存放起来,存放在数据库db种,后期通过定时任务或者人工实现消息补偿,需要发布消费者版本。
    • 也可将该消息存放在死信队列种,单独写一个死信消费者进行消费。

如何保证RPC接口幂等性问题

全局id

生成全局id进行判断,但是会存在问题:

  • 如果多个请求同时阻塞,还是会插入多条数据。
  • 线程安全问题

实际上最终还需要在数据库层面进行控制

如insert操作:使用唯一主键约束

update操作:使用乐观锁机制

锁的机制

消费者手动ack模式

添加acknowledge-mode: manual

如果需要重试,抛出异常即可触发mq自动重试机制。

如果需要中止重试,或正常ack,使用channel.basicAck()进行通知消费者已经收到消息


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!