RabbitMQ的次要感化根本上能够用8个字归纳综合,削峰挖谷同步解耦。可是引进MQ我们也不能不思索引进MQ后带去的一些成绩,如动静丧失。 正在一些营业场景纷歧样,处置方法也便纷歧样,好比收短疑,日记搜集我们次要看吞吐量以是抵消息丧失容忍度较下,那类场景根本上不消花太多工夫正在动静丧失成绩擅埽 别的一种,如我们用MQ去做散布式事件,绝保计较,提秤弈计较,那类营业抵消息丧失容忍度较底,以是我们必然要思索动静丧失的成绩。此次分享的内容实刘帽看最年夜限定的避免动静丧失,逆带提一下动静的重收战反复消耗。 RabbitMQ 模子图
ConfirmCallback战ReturnCallback正在那个里我们次要完成了ConfirmCallback战ReturnCallback两个接心。那两个接心次要是雍么收收动静后回调的。由于rabbit收收动静是尽管收,至于收出收胜利,收收办法不论。 正在那里必然要把那两个开闭翻开, publisher-confirms="true" publisher-returns="true"。 消费者端利用ConfirmCallback战ReturnCallback回调机造,最年夜限度的包管动静没有丧失,对本有CorrelationData类停止扩大,去完成动静的重收,详细请看源码。 动静的日记链路跟踪利用MQ去解耦效劳,同步化处置一些庞大耗时逻辑,可是也带去了一个成绩。因为同步化当前,排查询题便很没有便利了,底子没有明白那个动静甚么时分消耗,消耗的日记也很欠好排查。以是引进了Slf4j MDC机造将主线程的日记链路战动静的日记链路连起去,便利MQ成绩的排查。 RabbitSender
- import com.alibaba.fastjson.JSON;
- import com.wlqq.insurance.common.enums.MetricNameEnum;
- import com.wlqq.insurance.common.enums.SystemTypeEnum;
- import com.wlqq.insurance.common.log.core.FisLoggerFactory;
- import com.wlqq.insurance.common.mq.CorrelationData;
- import com.wlqq.insurance.common.service.AlertService;
- import org.slf4j.Logger;
- import org.slf4j.MDC;
- import org.springframework.amqp.AmqpException;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.util.Assert;
- import org.springframework.util.StringUtils;
-
- import java.util.UUID;
-
- /**
- * Rabbit 收收动静
- *
- * @author yuhao.wang
- */
- public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean {
- private final Logger logger = FisLoggerFactory.getLogger(RabbitSender.class);
-
- @Value("${mq.retry.count}")
- private int mqRetryCount;
-
- /**
- * 告警效劳
- */
- @Autowired
- private AlertService alertService;
-
- /**
- * Rabbit MQ 客户端
- */
- private RabbitTemplate rabbitTemplate;
-
- /**
- * 收收MQ动静,同步
- *
- * @param exchangeName 交流机称号
- * @param routingKey 路由称号
- * @param message 收收动静体
- */
- public void sendMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
- Assert.notNull(message, "message 动静体不克不及为NULL");
- Assert.notNull(exchangeName, "exchangeName 不克不及为NULL");
- Assert.notNull(routingKey, "routingKey 不克不及为NULL");
- // 获得CorrelationData工具
- CorrelationData correlationData = this.correlationData(message, message.getMessageId());
- correlationData.setExchange(exchangeName);
- correlationData.setRoutingKey(routingKey);
- correlationData.setMessage(message);
-
- logger.info("收收MQ动静,动静ID:{},动静体:{}, exchangeName:{}, routingKey:{}",
- correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
- // 收收动静
- this.convertAndSend(exchangeName, routingKey, message, correlationData);
- }
-
- /**
- * RPC方法,收收MQ动静
- *
- * @param exchangeName 交流机称号
- * @param routingKey 路由称号
- * @param message 收收动静体
- */
- public void sendAndReceiveMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
- Assert.notNull(message, "message 动静体不克不及为NULL");
- Assert.notNull(exchangeName, "exchangeName 不克不及为NULL");
- Assert.notNull(routingKey, "routingKey 不克不及为NULL");
- // 获得CorrelationData工具
- CorrelationData correlationData = this.correlationData(message, message.getMessageId());
- correlationData.setExchange(exchangeName);
- correlationData.setRoutingKey(routingKey);
- correlationData.setMessage(message);
-
- logger.info("收收MQ动静,动静ID:{},动静体:{}, exchangeName:{}, routingKey:{}",
- correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
-
- rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message);
- }
-
- /**
- * 用于完成动静收收到RabbitMQ交流器后领受ack回调。
- * 假如动静收收确瘸搂败便停止重试。
- *
- * @param correlationData
- * @param ack
- * @param cause
- */
- @Override
- public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) {
- CorrelationData correlationDataExtends = null;
- if (correlationData instanceof CorrelationData) {
- correlationDataExtends = (CorrelationData) correlationData;
- if (correlationDataExtends.getMdcContainer() != null) {
- // 日记链路跟踪
- MDC.setContextMap(correlationDataExtends.getMdcContainer());
- }
- }
-
- // 动静回调确瘸搂败处置
- if (!ack) {
- if (correlationDataExtends != null) {
- //动静收收失利,便停止重试,重试事后借不克不及胜利便记载到数据库
- if (correlationDataExtends.getRetryCount() < mqRetryCount) {
- logger.info("MQ动静收收失利,动静重收,动静ID:{},重收次数:{},动静体:{}", correlationDataExtends.getId(),
- correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage()));
-
- // 将重试次数减一
- correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1);
-
- // 重收收动静
- this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(),
- correlationDataExtends.getMessage(), correlationDataExtends);
- } else {
- //动静重试收收失利,将动静放到数据库等候补收
- logger.error("MQ动静重收失利,动静ID:{},动静体:{}", correlationData.getId(),
- JSON.toJSONString(correlationDataExtends.getMessage()));
-
- alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(),
- correlationDataExtends.getExchange(), null);
- }
- }
- } else {
- logger.info("动静收收胜利,动静ID:{}", correlationData.getId());
- }
- }
-
- /**
- * 用于完成动静收收到RabbitMQ交流器,但无响应行列取交流器绑按时的回调。
- * 正在脑裂的状况下会呈现这类状况。
- */
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- // 反序量动静
- Object msg = rabbitTemplate.getMessageConverter().fromMessage(message);
- if (msg instanceof com.wlqq.insurance.common.mq.message.Message) {
- // 日记链路跟踪
- MDC.setContextMap(((com.wlqq.insurance.common.mq.message.Message) msg).getMdcContainer());
- }
-
- logger.error("MQ动静收收失利,replyCode:{}, replyText:{},exchange:{},routingKey:{},动静体:{}",
- replyCode, replyText, exchange, routingKey, JSON.toJSONString(msg));
-
- alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
- }
-
- /**
- * 动静相干数据(动静ID)
- *
- * @param message 动静体
- * @param messageId 动静ID
- * @return
- */
- private CorrelationData correlationData(Object message, String messageId) {
- // 动静ID默许利用UUID
- if (StringUtils.isEmpty(messageId)) {
- messageId = UUID.randomUUID().toString();
- }
- return new CorrelationData(messageId, message);
- }
-
- /**
- * 收收动静
- *
- * @param exchange 交流机称号
- * @param routingKey 路由key
- * @param message 动静内容
- * @param correlationData 动静相干数据(动静ID)
- * @throws AmqpException
- */
- private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) {
- try {
- rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
- } catch (Exception e) {
- logger.error("MQ动静收收非常,动静ID:{},动静体:{}, exchangeName:{}, routingKey:{}",
- correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e);
-
- alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
- }
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnCallback(this);
- }
-
- public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
- this.rabbitTemplate = rabbitTemplate;
- }
- }
赶钙代码
CorrelationData- import lombok.Data;
- import org.slf4j.MDC;
-
- import java.util.Map;
-
- /**
- * 收收动静当编闭数据
- *
- * @author yuhao.wang
- */
- @Data
- public class CorrelationData extends org.springframework.amqp.rabbit.support.CorrelationData {
-
-
- /**
- * MDC容器
- * 获得女线程MDC中的内容,做日记链路
- */
- private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();
-
- /**
- * 动静体
- */
- private volatile Object message;
-
- /**
- * 交流机称号
- */
- private String exchange;
-
- /**
- * 路由key
- */
- private String routingKey;
-
- /**
- * 重试次数
- */
- private int retryCount = 0;
-
- public CorrelationData(String id) {
- super(id);
- }
-
- public CorrelationData(String id, Object data) {
- this(id);
- this.message = data;
- }
- }
赶钙代码
Message- /**
- * MQ动静的女类动静体
- *
- * @author yuhao.wang
- */
- @Data
- public class Message implements Serializable {
- private static final long serialVersionUID = -4731326195678504565L;
-
- /**
- * MDC容器
- * 获得女线程MDC中的内容,做日记链路
- */
- private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();
-
- /**
- * 动静ID(动静的独一标示)
- */
- private String messageId;
- }
赶钙代码
AbstractConsumer- /**
- * 默许消耗者
- *
- * @author yuhao.wang3
- */
- public abstract class AbstractConsumer implements MessageListener {
- private static final Logger LOGGER = FisLoggerFactory.getLogger(AbstractConsumer.class);
-
- @Override
- public void onMessage(Message msg) {
- String body = null;
-
- try {
- // 日记链路跟踪逻辑
- body = new String(msg.getBody(), "utf-8");
- DefaultMessage message = JSON.parseObject(body, DefaultMessage.class);
- Map<String, String> container = message.getMdcContainer();
- if (container != null) {
- // 日记链路跟踪
- MDC.setContextMap(message.getMdcContainer());
- }
- } catch (Exception e) {
- LOGGER.warn("出有找到MQ动静日记链路数据,没法做日记链路追辟");
- }
-
- try {
- // 处置动静逻辑
- doMessage(msg);
- LOGGER.info("胜利处置MQ动静, 动静体:{}", body);
- } catch (Exception e) {
- LOGGER.error("处置MQ动静非常 {}, 动静体:{}", JSON.toJSONString(msg), body, e);
- }
- }
-
- /**
- * 处置动静的完成办法
- *
- * @param msg
- */
- public abstract void doMessage(Message msg);
- }
赶钙代码
|