消息中间件——RabbitMQ—无消息丢失和幂等性(五)

消息无丢失?

生产者丢失?

RabbitMQ使⽤发送⽅确认模式,确保消息正确地发送到RabbitMQ。

发送⽅确认模式:将信道设置成confirm模式(发送⽅确认模式),则所有在信道上发布的消息

都会被指派⼀个唯⼀的ID。⼀旦消息被投递到⽬的队列后,或者消息被写⼊磁盘后(可持久化

的消息),信道会发送⼀个确认给⽣产者(包含消息唯⼀ID)。

如果RabbitMQ发⽣内部错误,从⽽导致消息丢失,会发送⼀条nack(not acknowledged,未确认)消息。

发送⽅确认模式是异步的,⽣产者应⽤程序在等待确认的同时,可以继续发送消息。当确认消息到达⽣产者应⽤程序,⽣产者应⽤程序的回调⽅法就会被触发来处理确认消息。

消费者丢失?

接收⽅消息确认机制:消费者接收每一条消息后都必须进行确认(ack确认机制)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。

这⾥并没有⽤到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer⾜够⻓的时间来处理消息。

特殊情况:

  • 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要根据bizId去重)
  • 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。

ack确认机制:

多个消费者同时收取消息,比如消息接收到一半的时候,一个消费者死掉了(逻辑复杂时间太长,超时了或者消费被停机或者网络断开链接),如何保证消息不丢?
这个使用就要使用Message acknowledgment 机制,就是消费端消费完成要通知服务端,服务端才把消息从内存删除。
这样就解决了,及时一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不丢的case。

RabbitMQ内部丢失?

  1. 镜像队列集群
  2. 消息持久化

幂等性

上面我们在保证无消息丢失的情况下,难免造成了重复消费的情况。

生产者幂等

在消息⽣产时,MQ内部针对每条⽣产者发送的消息⽣成⼀个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进⼊队列;

消费者幂等

在消息消费时,要求消息体中必须要有⼀个bizId(对于同⼀业务全局唯⼀,如⽀付ID、订单ID、帖⼦ID等)作为去重和幂等的依据,避免同⼀条消息被重复消费。

消息持久化

ACK确认机制

设置集群镜像模式

消息补偿机制

为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,
但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

产线网络环境太复杂,所以不知数太多,消息补偿机制需要建立在消息要写入DB日志,发送日志,接受日志,两者的状态必须记录。

然后根据DB日志记录check 消息发送消费是否成功,不成功,进行消息补偿措施,重新发送消息处理。

消息确认

消息的确认做有很多法,其中包括事务机制、批量确认、异步确认等。

  1. 事务机制:我们在channel对象中可以看到 txSelect(),txCommit(),txrollback() 这些方法,分别对应着开启事务,提交事务,回滚。由于使用事务会造成生产者与Broker交互次数增加,造成性能资源的浪费,而且事务机制是阻塞的,在发送一条消息后需要等待RabbitMq回应,之后才能发送下一条,因此事务机制不提倡,大家在网上也很少看到RabbitMq使用事务进行消息确认的。

  2. 批量确认:批量其实是一个节约资源的操作,但是在RabbitMq中我们使用批量操作会造成消息重复消费,原因是批量操作是使客户端程序定期或者消息达到一定量,来调用方法等待Broker返回,这样其实是一个提高效率的做法,但是如果出现消息重发的情况,当前这批次的消息都需要重发,这就造成了重复消费,因此批量确认的操作性能没有提高反而下降。

  3. 异步确认:异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,笔者接触过RocketMq,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。

我们大概分为两大类,发送方确认和接收方确认,其中发送方确认又分为生产者到交换器到确认和交换器到队列的确认。

发布者确认

ConfirmCallback

是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。

我们需要在生产者的配置中添加下面配置,表示开启发布者确认

1
spring.rabbitmq.publisher-confirms=true

然后在生产者的Java配置类实现该接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Component

public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{

@Autowired

private RabbitTemplate rabbitTemplate;



@PostConstruct

public void initRabbitTemplate() {

// 设置生产者消息确认

rabbitTemplate.setConfirmCallback(this);



}



/**

* 消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中

*

* @param correlationData

* @param b

* @param s

*/

@Override

public void confirm(@Nullable CorrelationData correlationData, boolean b, @Nullable String s) {

System.out.println("ack:[{}]" + b);

if (b) {

System.out.println("消息到达rabbitmq服务器");

} else {

System.out.println("消息可能未到达rabbitmq服务器");

}

}

ReturnCallback

通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调,该方法可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到Broker后几乎不存在绑定队列失败,除非你代码写错了。

使用此接口需要在生产者配置中加入一下配置,表示发布者返回

1
spring.rabbitmq.publisher-returns=true

然后基于刚才的生产者Java配置里实现接口ReturnCallback

1
@Componentpublic class RabbitTemplateConfig implements  RabbitTemplate.ReturnCallback {    @Autowired    private RabbitTemplate rabbitTemplate;     @PostConstruct    public void initRabbitTemplate() {               rabbitTemplate.setReturnCallback(this);    }     /**     * 启动消息失败返回,比如路由不到队列时触发回调     *     * @param message     * @param i     * @param s     * @param s1     * @param s2     */    @Override    public void returnedMessage(Message message, int i, String s, String s1, String s2) {        System.out.println("消息主体 message : " + message);        System.out.println("消息主体 replyCode : " + i);        System.out.println("描述 replyText:" + s);        System.out.println("消息使用的交换器 exchange : " + s1);        System.out.println("消息使用的路由键 routing : " + s2);    }}

以上两段Java配置可以写在一个类里。

到此,我们完成了生产者的异步确认,我们可以在回调函数中对当前失败的消息进行补偿,这样保证了我们没有发送成功的数据也被观察到了,比如某某条数据需要发送到消费者消费,但是没有发送成功,这就需要你在此做一些其他操作喽,根据你具体业务来。

消息消费确认

消费者确认发生在监听队列的消费者处理业务失败,如,发生了异常,不符合要求的数据……,这些场景我们就需要手动处理,比如重新发送或者丢弃。

我们知道ACK是默认是自动的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,加入你用回滚了也只是保证了数据的一致性,但是消息还是丢了,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。

消息确认模式有:

AcknowledgeMode.NONE:自动确认
AcknowledgeMode.AUTO:根据情况确认
AcknowledgeMode.MANUAL:手动确认

需要在消费者的配置里加手动 ack(确认)则需要修改确认模式为 manual,手动确认的方式有很多,可以在RabbitListenerContainerFactory类进行设置。

1
spring.rabbitmq.listener.direct.acknowledge-mode=MANUAL

消费者类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

@Service

public class AsyncConfirmConsumer {

@RabbitListener(queues = "confirm_queue")

@RabbitHandler

public void asyncConfirm(Order order, Message message, Channel channel) throws IOException {



try {

System.out.println("消费消息:" + order.getName());

// int a = 1 / 0;

channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

System.out.println("消费消息确认" + message.getMessageProperties().getConsumerQueue() + ",接收到了回调方法");

} catch (Exception e) {

//重新回到队列

// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

// System.out.println("尝试重发:" + message.getMessageProperties().getConsumerQueue());

//requeue =true 重回队列,false 丢弃

channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

// TODO 该消息已经导致异常,重发无意义,自己实现补偿机制





}
}
}
文章目录
  1. 1. 消息无丢失?
    1. 1.1. 生产者丢失?
    2. 1.2. 消费者丢失?
    3. 1.3. RabbitMQ内部丢失?
  2. 2. 幂等性
    1. 2.1. 生产者幂等
    2. 2.2. 消费者幂等
  3. 3. 消息持久化
  4. 4. ACK确认机制
  5. 5. 设置集群镜像模式
  6. 6. 消息补偿机制
  7. 7. 消息确认
    1. 7.1. 发布者确认
    2. 7.2. 消息消费确认
|