1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| @Configuration @Slf4j public class RabbitConfig {
@Autowired private CachingConnectionFactory connectionFactory;
@Autowired private MsgLogService msgLogService;
@Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(converter());
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息成功发送到Exchange"); String msgId = correlationData.getId(); msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS); } else { log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause); } });
rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message); });
return rabbitTemplate; }
@Bean public Jackson2JsonMessageConverter converter() { return new Jackson2JsonMessageConverter(); } public static final String MAIL_QUEUE_NAME = "mail.queue"; public static final String MAIL_EXCHANGE_NAME = "mail.exchange"; public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";
@Bean public Queue mailQueue() { return new Queue(MAIL_QUEUE_NAME, true); }
@Bean public DirectExchange mailExchange() { return new DirectExchange(MAIL_EXCHANGE_NAME, true, false); }
@Bean public Binding mailBinding() { return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME); } }
|