消息中间件——pull和push

常用的消息中间件支持模型

中间件 push模型 pull模型
RabbitMQ 支持 支持
Kafka 支持(只有)
RocketMQ 支持 支持

两种模型优缺点对比

  • 所谓 Push 模型,即当 Producer 发出的消息到达后,服务端马上将这条消息投递给 Consumer;
  • 而 Pull 则是服务端收到这条消息后什么也不做,只是等着 Consumer 主动到自己这里来读,即 Consumer 这里有一个“拉取”的动作。

Push模型优缺点

Push模型优点

实时(因为服务端Broker一旦收到消息,就会发送给消费者,不管消费这准备好没有,消费者是死是活,缓存到消费端的BlockingQueue中)

Push缺点

  1. 消息保存在服务端broker,容易造成消息堆积。
  2. 服务端broker需要维护每次传输状态,遇到问题需要重试。
  3. 服务端broker需要依据订阅者消费能力做流控(流转机制)。

Pull模型优缺点

Pull模型优点
保存在消费端,获取消息方便。(什么保存在消费端)
[2] 传输失败,不需要重试。(如何理解?)
[3] 消费端可以根据自身消费能力决定是否pull(流转机制) (这个好理解)

Pull缺点
默认的短轮询方式的实时性依赖于pull间隔时间,间隔越大,实时性越低,长轮询方式和push一致。(默认的端轮训指的是什么? 指的当长时间没有消息时,消费端实现的间隔时间去服务端轮训消息的过程)

SpringBoot——RabittMQ(二十四)

pom

1
2
3
4
5
<!--mq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100

# mail
spring.mail.host=smtp.163.com
spring.mail.username=18621142249@163.com
spring.mail.password=123456wangzai
spring.mail.from=18621142249@163.com
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true

RabbitConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Configuration
@Slf4j
public class RabbitConfig {

@Autowired
private CachingConnectionFactory connectionFactory;

@Autowired
private MsgLogService msgLogService;

@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(converter());

// 消息是否成功发送到Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息成功发送到Exchange");
String msgId = correlationData.getId();
msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);
} else {
log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
}
});

// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(true);
// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});

return rabbitTemplate;
}

@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
// 发送邮件
public static final String MAIL_QUEUE_NAME = "mail.queue";
public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";

@Bean
public Queue mailQueue() {
return new Queue(MAIL_QUEUE_NAME, true);
}

@Bean
public DirectExchange mailExchange() {
return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);
}

@Bean
public Binding mailBinding() {
return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
}
}

MailUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Component
@Slf4j
public class MailUtil {

@Value("${spring.mail.from}")
private String from;

@Autowired
private JavaMailSender mailSender;

/**
* 发送简单邮件
*
* @param mail
*/
public boolean send(Mail mail) {
String to = mail.getTo();// 目标邮箱
String title = mail.getTitle();// 邮件标题
String content = mail.getContent();// 邮件正文

SimpleMailMessage message = new SimpleMailMessage();
message.setFrom(from);
message.setTo(to);
message.setSubject(title);
message.setText(content);

try {
mailSender.send(message);
log.info("邮件发送成功");
return true;
} catch (MailException e) {
log.error("邮件发送失败, to: {}, title: {}", to, title, e);
return false;
}
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Service
public class TestServiceImpl implements TestService {

@Autowired
private MsgLogMapper msgLogMapper;

@Autowired
private RabbitTemplate rabbitTemplate;

@Override
public ServerResponse send(Mail mail) {
String msgId = RandomUtil.UUID32();
mail.setMsgId(msgId);

MsgLog msgLog = new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);
msgLogMapper.insert(msgLog);// 消息入库

CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, MessageHelper.objToMsg(mail), correlationData);// 发送消息

return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Component
@Slf4j
public class MailConsumer {

@Autowired
private MsgLogService msgLogService;

@Autowired
private MailUtil mailUtil;

@RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)
public void consume(Message message, Channel channel) throws IOException {
Mail mail = MessageHelper.msgToObj(message, Mail.class);
log.info("收到消息: {}", mail.toString());

String msgId = mail.getMsgId();

MsgLog msgLog = msgLogService.selectByMsgId(msgId);
if (null == msgLog || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)) {// 消费幂等性
log.info("重复消费, msgId: {}", msgId);
return;
}

MessageProperties properties = message.getMessageProperties();
long tag = properties.getDeliveryTag();

boolean success = mailUtil.send(mail);
if (success) {
msgLogService.updateStatus(msgId, Constant.MsgLogStatus.CONSUMED_SUCCESS);
channel.basicAck(tag, false);// 消费确认
} else {
channel.basicNack(tag, false, true);
}
}

}

ResendMsg定时任务重新投递发送失败的消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Component
@Slf4j
public class ResendMsg {

@Autowired
private MsgLogService msgLogService;

@Autowired
private RabbitTemplate rabbitTemplate;

// 最大投递次数
private static final int MAX_TRY_COUNT = 3;

/**
* 每30s拉取投递失败的消息, 重新投递
*/
@Scheduled(cron = "0/30 * * * * ?")
public void resend() {
log.info("开始执行定时任务(重新投递消息)");

List<MsgLog> msgLogs = msgLogService.selectTimeoutMsg();
msgLogs.forEach(msgLog -> {
String msgId = msgLog.getMsgId();
if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
log.info("超过最大重试次数, 消息投递失败, msgId: {}", msgId);
} else {
msgLogService.updateTryCount(msgId, msgLog.getNextTryTime());// 投递次数+1

CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 重新投递

log.info("第 " + (msgLog.getTryCount() + 1) + " 次重新投递消息");
}
});

log.info("定时任务执行结束(重新投递消息)");
}
}
|