• 注册
  • 架构博客 架构博客 关注:0 内容:188

    【轻松上手 RocketMQ 专栏】事务消息与幂等性

  • 查看作者
  • 打赏作者
  • 当前位置: 职业司 > 架构 > 架构博客 > 正文
    • 架构博客
    • 这一讲,我们结合业务场景来讲讲事务消息以及消息的幂等性。

      1、业务场景

      在电商场景中,一般付款成功后,会给用户发放优惠券,一来给用户优惠,二来激励用户继续消费。

      上面的场景:在电商系统中,会出现付款成功后、准备发优惠券的时候,服务器宕机了。这个时候会造成用户成功付款,却没收到优惠券的情况。这种情况下,我们很容易想到用事务来保证付款和发优惠券的原子性即可:要么付款和发优惠券同时成功,要么同时失败,是不允许其他一个成功,另一个失败的。

      但上面,存在一种情况:付款和发优惠券高度耦合,这样子容易出现:发优惠券一直失败,会导致付款一直失败的场景。

      对于这种场景的解决方案:引入消息中间件MQ来解耦。

      • 1、支付订单
      • 2、发送支付消息
      • 3、消费支付消息
      • 4、发放优惠券

      上面这个是发放优惠券的流程图。

      但是上述流程中,存在MQ不可用、消息重复的异常情况,进而导致:

      • 产生付款成功,发优惠券失败
      • 优惠券重复发放

      怎样才能确保付款成功后进行优惠券发放并且不会重复呢?这需要引入事务消息和幂等性处理。

      2、事务消息

      首先我们来了解下事务消息。分布式事务是一种抽象的概念。

      那具体的实现呢?

      是有很多种实现的。

      在这里,主要介绍:RocketMQ的事务消息。

      事务消息的流程图

      流程步骤:
      • 1、生产者发送half消息
      • 2、MQ回复ACK确认消息
      • 3、执行本地事务:订单付款。如果订单付款成功,那么就给MQ发送commit消息。如果订单付款失败,就发送rollback消息
      • 4、如果步骤3发送消息失败,这个时候MQ的定时器会检查half消息。MQ回调方法,去检查本地事务的执行情况。如果执行成功,就返回commit消息。如果执行失败,就返回rollback消息。
      • 5、如果MQ收到的是commit消息,此时会把half消息复制到真正的topic中
      • 6、消费者对消息进行消费,下发优惠券

      3、如何使用事务消息

      上面,大概知道了事务消息的流程。

      接下来,要知道如何使用。

      还是以付款下发优惠券为例。

      3.1 发送half消息-MQ回复ACK确认消息


       @Override
          public void finishedOrder(String orderNo, String phoneNumber) {
              
              try {
                // 退房事务消息,topic:完成订单
              Message msg = new Message(orderFinishedTopic, JSON.toJSONString(orderInfo).getBytes(StandardCharsets.UTF_8));
              
                  // 发送half消息
                  TransactionSendResult transactionSendResult = orderFinishedTransactionMqProducer.sendMessageInTransaction(msg, null);
                  
              } catch (MQClientException e) {
                 
              }

          }

      3.2 执行本地事务:付款

      @Override
          public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            
              try {
                  // 修改订单的状态
                  orderService.payOrder();
                
                  // 成功 提交prepare消息
                  return LocalTransactionState.COMMIT_MESSAGE;
              } catch (Exception e) {
                  // 执行本地事务失败 回滚prepare消息
                  return LocalTransactionState.ROLLBACK_MESSAGE;
              }
          }

      3.3 MQ定时器回调查询half消息状态

      @Override
          public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            
              try {
                  //查询订单状态
                  Integer orderStatus = orderService.getOrderStatus();
                  if (Objects.equals(orderStatus, OrderStatusEnum.FINISHED.getStatus())) {          //返回commit消息
                      return LocalTransactionState.COMMIT_MESSAGE;
                  } else {
                      //返回rollback消息
                      return LocalTransactionState.ROLLBACK_MESSAGE;
                  }
              } catch (Exception e) {
                  // 查询订单状态失败
                  return LocalTransactionState.ROLLBACK_MESSAGE;
              }
          }

      3.4 消费者进行消费,下发优惠券

       @Bean(value = "orderFinishedConsumer")
          public DefaultMQPushConsumer finishedConsumer(@Qualifier(value = "orderFinishedMessageListener") OrderFinishedMessageListener orderFinishedMessageListener) throws MQClientException {
              DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(orderFinishedConsumerGroup);
              consumer.setNamesrvAddr(namesrvAddress);
              //topic:完成订单
              consumer.subscribe(orderFinishedTopic, "*");
              consumer.setMessageListener(orderFinishedMessageListener);
              consumer.start();
              return consumer;
          }
          
      监听器:OrderFinishedMessageListener
      @Override
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {            
                      //下发优惠券
                      couponService.distributeCoupon();

              }
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }

      4、知其然知其所以然

      你看完上面,已经知道如何使用事务消息。

      接下来,你需要了解其底层原理:看看源码(面试常问)

      step1:首先看发送half消息的代码:

      step2:进入代码里面:

      step3:其实就是默认调用了DefaultMQProducer#sendMessageInTransaction。

      public TransactionSendResult sendMessageInTransaction(final Message msg,
              ...省略一堆代码

              SendResult sendResult = null;
              // 给待发送消息添加属性,表名是一个事务消息,即半消息,这里设置为true。(这个属性后面会用到)
              MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
              MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
              try {
                  //发送消息--重点0
                  sendResult = this.send(msg);
              } catch (Exception e) {
                  throw new MQClientException("send message Exception", e);
              }

              LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
              Throwable localException = null;
              switch (sendResult.getSendStatus()) {
                  //消息发送成功
                  case SEND_OK: {
                      try {
                          if (sendResult.getTransactionId() != null) {
                              msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                          }
                          String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                          if (null != transactionId && !"".equals(transactionId)) {
                              msg.setTransactionId(transactionId);
                          }
                          if (null != localTransactionExecuter) {

                              localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                          } else if (transactionListener != null) {
                              log.debug("Used new transaction API");
                              //执行本地事务,executeLocalTransaction需要子类去具体实现
                              localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                          }
                          if (null == localTransactionState) {
                              localTransactionState = LocalTransactionState.UNKNOW;
                          }

                          if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                              log.info("executeLocalTransactionBranch return {}", localTransactionState);
                              log.info(msg.toString());
                          }
                      } catch (Throwable e) {
                          log.info("executeLocalTransactionBranch exception", e);
                          log.info(msg.toString());
                          localException = e;
                      }
                  }
                  break;
                  case FLUSH_DISK_TIMEOUT:
                  case FLUSH_SLAVE_TIMEOUT:
                  case SLAVE_NOT_AVAILABLE:
                      localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                      break;
                  default:
                      break;
              }

              try {
                  // 最后,给broker发送提交或者回滚事务的RPC请求
                  this.endTransaction(sendResult, localTransactionState, localException);
              } catch (Exception e) {
                  log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
              }
              // 组装结果返回
              TransactionSendResult transactionSendResult = new TransactionSendResult();
              transactionSendResult.setSendStatus(sendResult.getSendStatus());
              transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
              transactionSendResult.setMsgId(sendResult.getMsgId());
              transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
              transactionSendResult.setTransactionId(sendResult.getTransactionId());
              transactionSendResult.setLocalTransactionState(localTransactionState);
              return transactionSendResult;
          }

      上面的DefaultMQProducerImpl#sendMessageInTransaction方法主要流程:

      • 简单的数据校验
      • 给消息添加属性,表明这个事务消息
      • 发送消息,且返回消息的结果–重点0
      • 根据消息不同结果,进行不同的处理
      • 如果消息发送成功,那么就执行本地事务(付款),返回本地事务的结果–重点1
      • 最后,根据本地事务的结果,给broker发送Commit或rollback的消息–重点2

      上面我们简述了一个大概的流程。未涉及到太多细节,是对一个整体流程的了解。

      接下来,我们深入了解一些细节:

      我们先研究一下重点0:sendResult = this.send(msg); 我们点进去会发现,send的底层其实就是调用了DefaultMQProducerImpl#sendKernelImpl方法。

      step4:接着到SendMessageProcessor#sendMessage

      step5:事务消息,继续进入TransactionalMessageServiceImpl#prepareMessage–>TransactionalMessageBridge#putHalfMessage–>TransactionalMessageBridge#parseHalfMessageInner

      step6:接着,我们坐着研究一下重点1,即transactionListener.executeLocalTransaction(msg, arg);

      public interface TransactionListener {
          /**
           * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
           *
           * @param msg Half(prepare) message
           * @param arg Custom business parameter
           * @return Transaction state
           */
          LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

          /**
           * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
           * method will be invoked to get local transaction status.
           *
           * @param msg Check message
           * @return Transaction state
           */
          LocalTransactionState checkLocalTransaction(final MessageExt msg);
      }

      你会发现,这是一个接口,有2个方法,一个是执行本地事务executeLocalTransaction。另一个是检查本地事务checkLocalTransaction。这两个方法需要实现类去实现。

      比如:执行本地事务:付款

      step7:接着我们来看重点2:this.endTransaction(sendResult, localTransactionState, localException);

      public void endTransaction(
              // 省略一堆代码
              //事务id
              String transactionId = sendResult.getTransactionId();
              // broker地址
              final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
              EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
              requestHeader.setTransactionId(transactionId);
              requestHeader.setCommitLogOffset(id.getOffset());
              // 根据事务消息和本地事务的执行结果,发送不同的结果给broker
              switch (localTransactionState) {
                  case COMMIT_MESSAGE:
                      requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                      break;
                  case ROLLBACK_MESSAGE:
                      requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                      break;
                  case UNKNOW:
                      requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                      break;
                  default:
                      break;
              }

              requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
              requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
              requestHeader.setMsgId(sendResult.getMsgId());
              String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
              //发送给broker
              this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
                  this.defaultMQProducer.getSendMsgTimeout());
          }

      到这个时候,我们已经把消息从生产者发送到了broker里面。

      那接下来,我们就需要了解broker是如何处理事务消息的。

      step8: 事务消息如何回查

      直接看代码注解即可
      TransactionalMessageCheckService#onWaitEnd

      @Override
          protected void onWaitEnd() {
              //timeout是从broker配置文件中获取transactionTimeOut值,代表事务的过期时间,(一个消息的存储时间 + timeout) > 系统当前时间,才会对该消息执行事务状态会查
              long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
              //checkMax是从broker配置文件中获取transactionCheckMax值,代表事务的最大检测次数,如果超过检测次数,消息会默认为丢弃,即rollback消息
              int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
              long begin = System.currentTimeMillis();
              log.info("Begin to check prepare message, begin time:{}", begin);
              //回查:核心点org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl.check
              this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
              log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
          }

      step9:进入check方法:TransactionalMessageServiceImpl#check。

      直接看注解即可

      @Override
          public void check(long transactionTimeout, int transactionCheckMax,
              AbstractTransactionalMessageCheckListener listener) {
              try {
                  //RMQ_SYS_TRANS_HALF_TOPIC主题
                  String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
                  //获取RMQ_SYS_TRANS_HALF_TOPIC主题下的所有队列
                  Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
                  //数据校验
                  if (msgQueues == null || msgQueues.size() == 0) {
                      log.warn("The queue of topic is empty :" + topic);
                      return;
                  }
                  log.debug("Check topic={}, queues={}", topic, msgQueues);
                  //遍历队列
                  for (MessageQueue messageQueue : msgQueues) {
                      long startTime = System.currentTimeMillis();
                      //根据队列获取对应topic:RMQ_SYS_TRANS_OP_HALF_TOPIC下的opQueue
                      //RMQ_SYS_TRANS_HALF_TOPIC:prepare消息的主题,事务消息首先先进入到该主题。
                      //RMQ_SYS_TRANS_OP_HALF_TOPIC:当消息服务器收到事务消息的提交或回滚请求后,会将消息存储在该主题下
                      MessageQueue opQueue = getOpQueue(messageQueue);
                      //messageQueue队列的偏移量
                      long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                      //opQueue队列的偏移量
                      long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);

                      log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                      //如果其中一个队列的偏移量小于0,就跳过
                      if (halfOffset < 0 || opOffset < 0) {
                          log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                              halfOffset, opOffset);
                          continue;
                      }
                      //doneOpOffset和removeMap主要的目的是避免重复调用事务回查接口
                      List<Long> doneOpOffset = new ArrayList<>();
                      HashMap<Long, Long> removeMap = new HashMap<>();
                      PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                      if (null == pullResult) {
                          log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                              messageQueue, halfOffset, opOffset);
                          continue;
                      }
                      // single thread
                      //空消息的次数
                      int getMessageNullCount = 1;
                      //RMQ_SYS_TRANS_HALF_TOPIC#queueId的最新偏移量
                      long newOffset = halfOffset;
                      //RMQ_SYS_TRANS_HALF_TOPIC的偏移量
                      long i = halfOffset;
                      while (true) {
                          //限制每次最多处理的时间是60s
                          if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                              log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                              break;
                          }
                          //removeMap包含当前信息,则跳过,处理下一条信息
                          //removeMap的信息填充是在上面的fillOpRemoveMap
                          //fillOpRemoveMap具体逻辑是:具体实现逻辑是从RMQ_SYS_TRANS_OP_HALF_TOPIC主题中拉取32条,
                          //如果拉取的消息队列偏移量大于等于RMQ_SYS_TRANS_HALF_TOPIC#queueId当前的处理进度时
                          //会添加到removeMap中,表示已处理过
                          if (removeMap.containsKey(i)) {
                              log.info("Half offset {} has been committed/rolled back", i);
                              Long removedOpOffset = removeMap.remove(i);
                              doneOpOffset.add(removedOpOffset);
                          } else {
                              //根据消息队列偏移量i从RMQ_SYS_TRANS_HALF_TOPIC队列中获取消息
                              GetResult getResult = getHalfMsg(messageQueue, i);
                              MessageExt msgExt = getResult.getMsg();
                              //如果消息为空
                              if (msgExt == null) {
                                  //则根据允许重复次数进行操作,默认重试一次  MAX_RETRY_COUNT_WHEN_HALF_NULL=1
                                  //如果超过重试次数,直接跳出while循环,结束该消息队列的事务状态回查
                                  if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                      break;
                                  }
                                  //如果是由于没有新的消息而返回为空(拉取状态为:PullStatus.NO_NEW_MSG),则结束该消息队列的事务状态回查。
                                  if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                      log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                          messageQueue, getMessageNullCount, getResult.getPullResult());
                                      break;
                                  } else {
                                      log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                          i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                      //其他原因,则将偏移量i设置为:getResult.getPullResult().getNextBeginOffset(),重新拉取
                                      i = getResult.getPullResult().getNextBeginOffset();
                                      newOffset = i;
                                      continue;
                                  }
                              }
                              //判断该消息是否需要discard(吞没,丢弃,不处理)、或skip(跳过)
                              //needDiscard 依据:如果该消息回查的次数超过允许的最大回查次数,
                              // 则该消息将被丢弃,即事务消息提交失败,不能被消费者消费,其做法,
                              // 主要是每回查一次,在消息属性TRANSACTION_CHECK_TIMES中增1,默认最大回查次数为5次。

                              //needSkip依据:如果事务消息超过文件的过期时间,
                              // 默认72小时(具体请查看RocketMQ过期文件相关内容),则跳过该消息。
                              if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                                  listener.resolveDiscardMsg(msgExt);
                                  newOffset = i + 1;
                                  i++;
                                  continue;
                              }
                              //消息的存储时间大于开始时间,中断while循环
                              if (msgExt.getStoreTimestamp() >= startTime) {
                                  log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                                      new Date(msgExt.getStoreTimestamp()));
                                  break;
                              }
                              //该消息已存储的时间=系统当前时间-消息存储的时间戳
                              long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                              //checkImmunityTime:检测事务的时间
                              //transactionTimeout:事务消息的超时时间
                              long checkImmunityTime = transactionTimeout;
                              //用户设定的checkImmunityTimeStr
                              String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                              if (null != checkImmunityTimeStr) {
                                  //checkImmunityTime=Long.valueOf(checkImmunityTimeStr)
                                  checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                                  if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                      if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                          //最近进度=当前消息进度+1
                                          newOffset = i + 1;
                                          i++;
                                          continue;
                                      }
                                  }
                              } else {//如果当前时间小于事务超时时间,则结束while循环
                                  if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                                      log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                          checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                      break;
                                  }
                              }
                              List<MessageExt> opMsg = pullResult.getMsgFoundList();
                              //是否需要回查,判断依据如下:
                              //消息已存储的时间大于事务超时时间
                              boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                                  || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                                  || (valueOfCurrentMinusBorn <= -1);

                              if (isNeedCheck) {
                                  if (!putBackHalfMsgQueue(msgExt, i)) {//11
                                      continue;
                                  }
                                  //重点:进行事务回查(异步)
                                  listener.resolveHalfMsg(msgExt);
                              } else {
                                  //加载已处理的消息进行筛选
                                  pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                                  log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                      messageQueue, pullResult);
                                  continue;
                              }
                          }
                          newOffset = i + 1;
                          i++;
                      }
                      //保存half消息队列的回查进度
                      if (newOffset != halfOffset) {
                          transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                      }
                      long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                      //保存处理队列opQueue的处理今夕
                      if (newOpOffset != opOffset) {
                          transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                      }
                  }
              } catch (Throwable e) {
                  log.error("Check error", e);
              }

          }

      step10:继续深入研究一下:resolveHalfMsg

      public void resolveHalfMsg(final MessageExt msgExt) {
              executorService.execute(new Runnable() {
                  @Override
                  public void run() {
                      try {
                          //针对每个待反查的half消息,进行回查本地事务结果
                          sendCheckMessage(msgExt);
                      } catch (Exception e) {
                          LOGGER.error("Send check message error!", e);
                      }
                  }
              });
          }

      step11:继续追进sendCheckMessage(msgExt)方法

      /**
           * 发送回查消息
           * @param msgExt
           * @throws Exception
           */
          public void sendCheckMessage(MessageExt msgExt) throws Exception {
              CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
              checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
              checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
              checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
              checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
              checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
              //原主题
              msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
              //原队列id
              msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));

              msgExt.setStoreSize(0);
              String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
              Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
              if (channel != null) {
                  //回调查询本地事务状态
                  brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
              } else {
                  LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
              }
          }

      到这里,基本上把事务消息的流程和实现细节走了一遍。

      利用事务消息,我们可以确保消息在事务提交后一定能发送成功到RocketMQ,接下来,我们继续处理消息可能重复的问题。

      5、消息重复的原因分析

      1、目的:解决优惠券重复发放的问题
      2、解决方案:待定
      3、原因:未知

      抽象原因

      • 原因1:生产者多次发送同一条消息,导致消费者多次消息同一条消息,因此重复发放优惠券
      • 原因2:生产者发送一次消息,但消费者多次消费同一条消息,因此重复发送优惠券

      具象原因

      • 比如原因1,用户支付订单后,订单系统处理有点慢,这会让支付系统以为请求超时,这时支付系统会再次调用订单系统。这就会导致订单系统多次发送同一条支付消息。

      • 原因1,消息重试,网络异常,都会导致生产者多次发送同一条消息,这里不细说,想要细究,欢迎留言。

      • 比如原因2,用户支付成功,订单系统和支付系统交互也没超时,顺利发送一条支付消息,这个时候优惠券系统也成功消费支付消息,这时发放了一张优惠券。但意外来了,这个时候优惠券系统崩了,但还没来得及提交消费进度offset到RocketMQ。因而重启优惠券系统后,又会重新消费一次支付消息,从而重复发放优惠券。

      我们知道问题原因后,就可以开始设计解决方案了

      • 针对原因1:生产者多次发送同一条消息,导致消费者多次消息同一条消息,因此重复发放优惠券
        • 确保那就是要确保生产者能成功发送有且只有一次消息,但需要确保消费者只消费一次。
      • 对阵原因2:生产者发送一次消息,但消费者多次消费同一条消息,因此重复发送优惠券
        • 那就是要确保消费者只消费一次。但奈何这个不太现实,为什么呢?因为服务器重启,上线升级版本,这是非常常见的现象。因此,只消费一次,不太现实。

      上面这2种方案,都涉及到消费者,都不能完美解决重复消费消息的问题

      其实我们想想,我们可以用事务消息的方法,来确保消息一定能发送成功到RocketMQ,这个时候我们只需要解决消费者的消费问题即可。

      6、幂等性

      想要解决重复消息问题,我们需要引入幂等性机制。

      什么是幂等性

      就是无论别人对你的接口请求多少次,你都需要保证接口调用一次和多次的结果是相同的。

      天上飞的理论,得有落地实现。

      幂等性,就是理论。那具体的落地实现一般有:

      业务判断法

      举个例子,在电商系统中,有订单id,这个时候在优惠券系统每消费一条支付消息,同步插入一条订单数据,能插入成功,证明之前这个订单没被消费过,发送优惠券。插入失败,则证明这个订单之前已经被消费过了。不做任何操作即可。

      Redis缓存法

      在并发量特别高的订单系统中,支付消息会特别多,这个时候,如果用业务判断法,插入数据库,容易存在瓶颈。这个时候如果想要提高并发量,可以考虑使用Redis。

      Redis缓存订单id,如果这个订单id已经被消费过后,会存在Redis中。当这个订单id再次被消费时,就会被过滤,不操作。

      这也是一种幂等性的实现方法。但Redis容易丢数据,这也是需要考虑的。

      7、总结

      今天我们通过电商优惠券发放的场景,介绍基于RocketMQ事务消息实现分布式事务和消息幂等。

      用什么幂等性的具体方案,得看你的使用场景。

      • 如果你的并发量不高,直接用数据库即可解决。

      • 如果你允许有误差,允许重复发放优惠券,只追求高并发量,直接用Redis即可。

      • 如果你既要并发量,也要准确性,可以结合数据库+Redis的方案,但这种方案实现复杂度比较高。

      所以,选择什么方案,真的得看你的使用场景。

      你也可以看看自己公司的分布式事务、幂等性方案是如何实现的?

      好了,今天的分享就到这结束了,欢迎交流。

      后续文章

      • RocketMQ-入门(已更新)

      • RocketMQ-发送消息(已更新)

      • RocketMQ-集群模式和广播模式(已更新)
      • RocketMQ-顺序与批量消息源码解析与场景分析(已更新)
      • RocketMQ-延迟消息源码解析与场景分析(已更新)
      • Rock
        e
        tMQ
        -事务


        幂等性
        (已更新)
      • RocketMQ-高可用与
        主从复制
      • RocketMQ-高性能、存储机制与消息过滤

      本文分享自微信公众号 – RocketMQ官微(ApacheRocketMQ)。
      如有侵权,请联系 support@oschina.cn 删除。
      本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

      请登录之后再进行评论

      登录

      手机阅读天地(APP)

      • 微信公众号
      • 微信小程序
      • 安卓APP
      手机浏览,惊喜多多
      匿名树洞,说我想说!
      问答悬赏,VIP可见!
      密码可见,回复可见!
      即时聊天、群聊互动!
      宠物孵化,赠送礼物!
      动态像框,专属头衔!
      挑战/抽奖,金币送不停!
      赶紧体会下,不会让你失望!
    • 实时动态
    • 签到
    • 做任务
    • 发表内容
    • 偏好设置
    • 到底部
    • 帖子间隔 侧栏位置:
    • 还没有账号?点这里立即注册