消息中间件——Kafka—无消息丢失和幂等性(三)

无消息丢失

首先要先清楚大部分情况下消息丢失是在什么情况下发生的。

生产者丢失

producer客户端有一个acks的配置:

  • 这个配置为0的时候,producer是发送之后不管的,这个时候就很有可能因为网络等原因造成数据丢失,所以应该尽量避免。
  • 但是将ack设置为1就没问题了吗,那也不一定,因为有可能在leader副本接收到数据,但还没同步给其他副本的时候就挂掉了,这时候数据也是丢失了。
  • 要达到最严格的无消息丢失配置,应该是要将acks的参数设置为-1(也就是all),并且将 min.insync.replicas 配置项调高到大于1(最小ISR个数)。

当然生产者还需要以异步获取发送结果的方式发送数据,min.insync.replicas,而当数据发送丢失的时候,就可以进行手动重发或其他操作,从而确保生产者发送成功。

kafka内部丢失

  1. 副本数量

    replication.factor配置参数代表副本数量,默认为1。实际上我们需要将其设置为大于1,防止磁盘损坏,数据丢失。

  2. leader选举

    unclean.leader.election.enable 参数,这个参数是在主副本挂掉,然后在ISR集合中没有副本可以成为leader的时候,要不要让进度比较慢的副本成为leader的。

    不用多说,让进度比较慢的副本成为leader,肯定是要丢数据的。虽然可能会提高一些可用性,但如果你的业务场景丢失数据更加不能忍受,那还是将unclean.leader.election.enable设置为false吧。

消费者丢失

消费者丢失的问题主要跟 offset 处理不当有关。

消费者位移提交有一个参数,enable.auto.commit,默认是true,决定是否要让消费者自动提交位移。如果开启,那么consumer每次都是先提交位移,再进行消费,这样处理的话,好处是简单,坏处就是漏消费数据。

如果设置为false,改为手动提交位移,在每次消费完之后再手动提交位移信息。又会造成重复消费的问题。

无消息丢失配置总结

  • producer的acks设置位-1,同时min.insync.replicas设置大于1。并且使用带有回调的producer api发生消息。
  • 默认副本数replication.factor设置为大于1,或者创建topic的时候指定大于1的副本数。
  • unclean.leader.election.enable 设置为false,防止定期副本leader重选举
  • 消费者端,自动提交位移enable.auto.commit设置为false。在消费完后手动提交位移。

幂等

幂等这个词最早起源于函数式编程,意思是一个函数无论执行多少次都会返回一样的结果。比如说让一个数加1就不是幂等的,而让一个数取整就是幂等的。因为这个特性所以幂等的函数适用于并发的场景下。

在kafka中,幂等性意味着一个消息无论重复多少次,都会被当作一个消息来持久化处理。及 精确一次(exactly once)。要实现 exactly once,就不得不提到至多一次(at most once)和至少一次(at least once)。

最多一次就是保证一条消息只发送一次,这个其实最简单,异步发送一次然后不管就可以,缺点是容易丢数据,所以一般不采用。至少一次语义是kafka默认提供的语义,它保证每条消息都能至少接收并处理一次,缺点是可能有重复数据。

前面有介绍过acks机制,当设置producer客户端的acks是1的时候,broker接收到消息就会跟producer确认。但producer发送一条消息后,可能因为网络原因消息超时未达,这时候producer客户端会选择重发,broker回应接收到消息,但很可能最开始发送的消息延迟到达,就会造成消息重复接收。

幂等的生产者

kafka的producer默认是支持最少一次语义,也就是说不是幂等的,这样在一些比如支付等要求精确数据的场景会出现问题。

在0.11.0后,kafka提供了让producer支持幂等的配置操作。即:

props.put(“enable.idempotence”, ture)

在创建producer客户端的时候,添加这一行配置,producer就变成幂等的了。注意开启幂等性的时候,acks就自动是“all”了,如果这时候手动将ackss设置为0,那么会报错。其底层实现其实也很简单,就是对每条消息生成一个id值,broker会根据这个id值进行去重,从而实现幂等,这样一来就能够实现精确一次的语义了。

注意:单纯的设置将acks设置为all是没用的,重试次数 retries>0(为防止消息丢失)的话还是会有重复消息出现。

但是!幂等的producery也并非万能。有两个主要是缺陷:

  • 幂等性的producer仅做到单分区上的幂等性,即单分区消息不重复,多分区无法保证幂等性。
  • 只能保持单会话的幂等性,无法实现跨会话的幂等性,也就是说如果producer挂掉再重启,无法保证两个会话间的幂等(新会话可能会重发)。因为broker端无法获取之前的状态信息,所以无法实现跨会话的幂等。

所以需要事务producer

1
2
3
4
5
6
7
8
9
10
11
12
13
//初始化事务
producer.initTransactions();
try {
//开启一个事务
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
//提交
producer.commitTransaction();
} catch (KafkaException e) {
//出现异常的时候,终止事务
producer.abortTransaction();
}

但无论开启幂等还是事务的特性,都会对性能有一定影响,这是必然的。

消费者的幂等性

如果确实需要保证consumer的幂等,可以对每条消息维持一个全局的id,每次消费进行去重,当然耗费这么多的资源来实现exactly once的消费到底值不值,那就得看具体业务了。

文章目录
  1. 1. 无消息丢失
    1. 1.1. 生产者丢失
    2. 1.2. kafka内部丢失
    3. 1.3. 消费者丢失
    4. 1.4. 无消息丢失配置总结
    5. 1.5. 幂等
    6. 1.6. 幂等的生产者
    7. 1.7. 消费者的幂等性
|