SpringBoot——RabbitMQ(二十四)
pom
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
spring.mail.host=smtp.163.com
spring.mail.username=18621142249@163.com
spring.mail.password=123456wangzai
spring.mail.from=18621142249@163.com
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
RabbitConfig 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 @Configuration @Slf 4jpublic class RabbitConfig { @Autowired private CachingConnectionFactory connectionFactory; @Autowired private MsgLogService msgLogService; @Bean public RabbitTemplate rabbitTemplate () { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(converter()); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息成功发送到Exchange" ); String msgId = correlationData.getId(); msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS); } else { log.info("消息发送到Exchange失败, {}, cause: {}" , correlationData, cause); } }); rabbitTemplate.setMandatory(true ); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}" , exchange, routingKey, replyCode, replyText, message); }); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter converter () { return new Jackson2JsonMessageConverter(); } public static final String MAIL_QUEUE_NAME = "mail.queue" ; public static final String MAIL_EXCHANGE_NAME = "mail.exchange" ; public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key" ; @Bean public Queue mailQueue () { return new Queue(MAIL_QUEUE_NAME, true ); } @Bean public DirectExchange mailExchange () { return new DirectExchange(MAIL_EXCHANGE_NAME, true , false ); } @Bean public Binding mailBinding () { return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME); } }
MailUtil 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Component @Slf 4jpublic class MailUtil { @Value ("${spring.mail.from}" ) private String from; @Autowired private JavaMailSender mailSender; public boolean send (Mail mail) { String to = mail.getTo(); String title = mail.getTitle(); String content = mail.getContent(); SimpleMailMessage message = new SimpleMailMessage(); message.setFrom(from); message.setTo(to); message.setSubject(title); message.setText(content); try { mailSender.send(message); log.info("邮件发送成功" ); return true ; } catch (MailException e) { log.error("邮件发送失败, to: {}, title: {}" , to, title, e); return false ; } } }
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Service public class TestServiceImpl implements TestService { @Autowired private MsgLogMapper msgLogMapper; @Autowired private RabbitTemplate rabbitTemplate; @Override public ServerResponse send (Mail mail) { String msgId = RandomUtil.UUID32(); mail.setMsgId(msgId); MsgLog msgLog = new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME); msgLogMapper.insert(msgLog); CorrelationData correlationData = new CorrelationData(msgId); rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, MessageHelper.objToMsg(mail), correlationData); return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg()); } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Component @Slf 4jpublic class MailConsumer { @Autowired private MsgLogService msgLogService; @Autowired private MailUtil mailUtil; @RabbitListener (queues = RabbitConfig.MAIL_QUEUE_NAME) public void consume (Message message, Channel channel) throws IOException { Mail mail = MessageHelper.msgToObj(message, Mail.class ) ; log.info("收到消息: {}" , mail.toString()); String msgId = mail.getMsgId(); MsgLog msgLog = msgLogService.selectByMsgId(msgId); if (null == msgLog || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)) { log.info("重复消费, msgId: {}" , msgId); return ; } MessageProperties properties = message.getMessageProperties(); long tag = properties.getDeliveryTag(); boolean success = mailUtil.send(mail); if (success) { msgLogService.updateStatus(msgId, Constant.MsgLogStatus.CONSUMED_SUCCESS); channel.basicAck(tag, false ); } else { channel.basicNack(tag, false , true ); } } }
ResendMsg定时任务重新投递发送失败的消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Component @Slf 4jpublic class ResendMsg { @Autowired private MsgLogService msgLogService; @Autowired private RabbitTemplate rabbitTemplate; private static final int MAX_TRY_COUNT = 3 ; @Scheduled (cron = "0/30 * * * * ?" ) public void resend () { log.info("开始执行定时任务(重新投递消息)" ); List<MsgLog> msgLogs = msgLogService.selectTimeoutMsg(); msgLogs.forEach(msgLog -> { String msgId = msgLog.getMsgId(); if (msgLog.getTryCount() >= MAX_TRY_COUNT) { msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL); log.info("超过最大重试次数, 消息投递失败, msgId: {}" , msgId); } else { msgLogService.updateTryCount(msgId, msgLog.getNextTryTime()); CorrelationData correlationData = new CorrelationData(msgId); rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData); log.info("第 " + (msgLog.getTryCount() + 1 ) + " 次重新投递消息" ); } }); log.info("定时任务执行结束(重新投递消息)" ); } }