RocketMq深入分析讲解两种削峰方式

 

何时需要削峰

当上游调用下游服务速率高于下游服务接口QPS时,那么如果不对调用速率进行控制,那么会发生很多失败请求

 

通过消息队列的削峰方法有两种

控制消费者消费速率和生产者投放延时消息,本质都是控制消费速度

通过消费者参数控制消费速度

先分析那些参数对控制消费速度有作用

1.PullInterval: 设置消费端,拉取mq消息的间隔时间。

注意:该时间算起时间是rocketMq消费者从broker消息后算起。经过PullInterval再次向broker拉去消息

源码分析:

首先需要了解rocketMq的消息拉去过程

拉去消息的类

PullMessageService

public class PullMessageService extends ServiceThread {
  private final InternalLogger log = ClientLogger.getLog();
  private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
  private final MQClientInstance mQClientFactory;
  private final ScheduledExecutorService scheduledExecutorService = Executors
  .newSingleThreadScheduledExecutor(new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
          return new Thread(r, "PullMessageServiceScheduledThread");
      }
  });
  public PullMessageService(MQClientInstance mQClientFactory) {
      this.mQClientFactory = mQClientFactory;
  }
  public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
      if (!isStopped()) {
          this.scheduledExecutorService.schedule(new Runnable() {
              @Override
              public void run() {
                  PullMessageService.this.executePullRequestImmediately(pullRequest);
              }
          }, timeDelay, TimeUnit.MILLISECONDS);
      } else {
          log.warn("PullMessageServiceScheduledThread has shutdown");
      }
  }
  public void executePullRequestImmediately(final PullRequest pullRequest) {
      try {
          this.pullRequestQueue.put(pullRequest);
      } catch (InterruptedException e) {
          log.error("executePullRequestImmediately pullRequestQueue.put", e);
      }
  }
  public void executeTaskLater(final Runnable r, final long timeDelay) {
      if (!isStopped()) {
          this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
      } else {
          log.warn("PullMessageServiceScheduledThread has shutdown");
      }
  }
  public ScheduledExecutorService getScheduledExecutorService() {
      return scheduledExecutorService;
  }
  private void pullMessage(final PullRequest pullRequest) {
      final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
      if (consumer != null) {
          DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
          impl.pullMessage(pullRequest);
      } else {
          log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
      }
  }
  @Override
  public void run() {
      log.info(this.getServiceName() + " service started");
      while (!this.isStopped()) {
          try {
              PullRequest pullRequest = this.pullRequestQueue.take();
              this.pullMessage(pullRequest);
          } catch (InterruptedException ignored) {
          } catch (Exception e) {
              log.error("Pull Message Service Run Method exception", e);
          }
      }
      log.info(this.getServiceName() + " service end");
  }
  @Override
  public void shutdown(boolean interrupt) {
      super.shutdown(interrupt);
                     ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
                     }
                     @Override
                     public String getServiceName() {
                     return PullMessageService.class.getSimpleName();
                     }
                     }

继承自ServiceThread,这是一个单线程执行的service,不断获取阻塞队列中的pullRequest,进行消息拉取。

executePullRequestLater会延时将pullrequest放入到pullRequestQueue,达到延时拉去的目的。

那么PullInterval参数就是根据这个功能发挥的作用,在消费者拉去消息成功的回调

 PullCallback pullCallback = new PullCallback() {
          @Override
          public void onSuccess(PullResult pullResult) {
              if (pullResult != null) {
                  pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                      subscriptionData);
                  switch (pullResult.getPullStatus()) {
                      case FOUND:
                          long prevRequestOffset = pullRequest.getNextOffset();
                          pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                          long pullRT = System.currentTimeMillis() - beginTimestamp;
                          DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                              pullRequest.getMessageQueue().getTopic(), pullRT);
                          long firstMsgOffset = Long.MAX_VALUE;
                          if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                              DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                          } else {
                              firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                              DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                  pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                              boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                              DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                  pullResult.getMsgFoundList(),
                                  processQueue,
                                  pullRequest.getMessageQueue(),
                                  dispatchToConsume);
                              if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                  DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                      DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                              } else {
                                  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                              }
                          }
                          if (pullResult.getNextBeginOffset() < prevRequestOffset
                              || firstMsgOffset < prevRequestOffset) {
                              log.warn(
                                  "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                  pullResult.getNextBeginOffset(),
                                  firstMsgOffset,
                                  prevRequestOffset);
                          }
                          break;
                      case NO_NEW_MSG:
                          pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                          DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                          DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                          break;
                      case NO_MATCHED_MSG:
                          pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                          DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                          DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                          break;
                      case OFFSET_ILLEGAL:
                          log.warn("the pull request offset illegal, {} {}",
                              pullRequest.toString(), pullResult.toString());
                          pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                          pullRequest.getProcessQueue().setDropped(true);
                          DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                              @Override
                              public void run() {
                                  try {
                                      DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                          pullRequest.getNextOffset(), false);
                                      DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                      DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                      log.warn("fix the pull request offset, {}", pullRequest);
                                  } catch (Throwable e) {
                                      log.error("executeTaskLater Exception", e);
                                  }
                              }
                          }, 10000);
                          break;
                      default:
                          break;
                  }
              }
          }
          @Override
          public void onException(Throwable e) {
              if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                  log.warn("execute the pull request exception", e);
              }
              DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
          }
      };

在 case found的情况下,也就是拉取到消息的q情况,在PullInterval>0的情况下,会延时投递到pullRequestQueue中,实现拉取消息的间隔

if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                  DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                      DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                              } else {
                                  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                              }

2.PullBatchSize: 设置每次pull消息的数量,该参数设置是针对逻辑消息队列,并不是每次pull消息拉到的总消息数

消费端分配了两个消费队列来监听。那么PullBatchSize 设置为32,那么该消费端每次pull到 64个消息。

消费端每次pull到消息总数=PullBatchSize*监听队列数

源码分析

消费者拉取消息时

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage中

会执行

 this.pullAPIWrapper.pullKernelImpl(
              pullRequest.getMessageQueue(),
              subExpression,
              subscriptionData.getExpressionType(),
              subscriptionData.getSubVersion(),
              pullRequest.getNextOffset(),
              this.defaultMQPushConsumer.getPullBatchSize(),
              sysFlag,
              commitOffsetValue,
              BROKER_SUSPEND_MAX_TIME_MILLIS,
              CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
              CommunicationMode.ASYNC,
              pullCallback
          );

其中 this.defaultMQPushConsumer.getPullBatchSize(),就是配置的PullBatchSize,代表的是每次从broker的一个队列上拉取的最大消息数。

3.ThreadMin和ThreadMax: 消费端消费pull到的消息需要的线程数量。

源码分析:

还是在消费者拉取消息成功时

  boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                  pullResult.getMsgFoundList(),
                                  processQueue,
                                  pullRequest.getMessageQueue(),
                                  dispatchToConsume);

通过consumeMessageService执行

默认情况下是并发消费

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest

  @Override
  public void submitConsumeRequest(
      final List<MessageExt> msgs,
      final ProcessQueue processQueue,
      final MessageQueue messageQueue,
      final boolean dispatchToConsume) {
      final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
      if (msgs.size() <= consumeBatchSize) {
          ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
          try {
              this.consumeExecutor.submit(consumeRequest);
          } catch (RejectedExecutionException e) {
              this.submitConsumeRequestLater(consumeRequest);
          }
      } else {
          for (int total = 0; total < msgs.size(); ) {
              List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
              for (int i = 0; i < consumeBatchSize; i++, total++) {
                  if (total < msgs.size()) {
                      msgThis.add(msgs.get(total));
                  } else {
                      break;
                  }
              }
              ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
              try {
                  this.consumeExecutor.submit(consumeRequest);
              } catch (RejectedExecutionException e) {
                  for (; total < msgs.size(); total++) {
                      msgThis.add(msgs.get(total));
                  }
                  this.submitConsumeRequestLater(consumeRequest);
              }
          }
      }
  }

其中consumeExecutor初始化

this.consumeExecutor = new ThreadPoolExecutor(
          this.defaultMQPushConsumer.getConsumeThreadMin(),
          this.defaultMQPushConsumer.getConsumeThreadMax(),
          1000 * 60,
          TimeUnit.MILLISECONDS,
          this.consumeRequestQueue,
          new ThreadFactoryImpl("ConsumeMessageThread_"));

对象线程池最大和核心线程数。对于顺序消费ConsumeMessageOrderlyService也会使用最大和最小线程数这两个参数,只是消费时会锁定队列。

以上三种情况:是针对参数配置,来调整消费速度。

除了这三种情况外还有两种服务部署情况,可以调整消费速度:

4.rocketMq 逻辑消费队列配置数量 有消费端每次pull到消息总数=PullBatchSize*监听队列数

可知rocketMq 逻辑消费队列配置数量即上图中的 queue1 ,queue2,配置数量越多每次pull到的消息总数也就越多。如果下边配置读队列数量:修改tocpic的逻辑队列数量

5.消费端节点部署数量 :

部署数量无论一个节点监听所有队列,还是多个节点按照分配策略分配监听队列数量,理论上每秒pull到的数量都一样的,但是多节点消费端消费线程数量要比单节点消费线程数量多,也就是多节点消费速度大于单节点。

 

消费延时控流

针对消息订阅者的消费延时流控的基本原理是,每次消费时在客户端增加一个延时来控制消费速度,此时理论上消费并发最快速度为:

单节点部署:

ConsumInterval :延时时间单位毫秒

ConcurrentThreadNumber:消费端线程数量

MaxRate :理论每秒处理数量

MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber

如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得

如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得

200 = 1 / 0.1 * 20

由上可知,理论上可以将并发消费控制在 200 以下

如果是多个节点部署如两个节点,理论消费速度最高为每秒处理400个消息。

如下延时流控代码:

 /**
   * 测试mq 并发 接受
   */
  @Component
  @RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
  class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{
      @SneakyThrows
      @Override
      public void onMessage(LikeWritingParams params) {
          System.out.println("睡上0.1秒");
          Thread.sleep(100);
          long begin = System.currentTimeMillis();
          System.out.println("mq消费速度"+Thread.currentThread().getName()+"  "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
          //writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
          long end = System.currentTimeMillis();
        //  System.out.println("消费:: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
      }
      @Override
      public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
          defaultMQPushConsumer.setConsumeThreadMin(20); //消费端拉去到消息以后分配线索去消费
          defaultMQPushConsumer.setConsumeThreadMax(50);//最大消费线程,一般情况下,默认队列没有塞满,是不会启用新的线程的
          defaultMQPushConsumer.setPullInterval(0);//消费端多久一次去rocketMq 拉去消息
          defaultMQPushConsumer.setPullBatchSize(32);     //消费端每个队列一次拉去多少个消息,若该消费端分赔了N个监控队列,那么消费端每次去rocketMq拉去消息说为N*1
          defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
          defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
          defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
      }
  }

注释:如上消费端,单节点每秒处理速度也就是最高200个消息,实际上要小于200,业务代码执行也是需要时间。

但是要注意实际操作中并发流控实际是默认存在的,

spring boot 消费端默认配置

this.consumeThreadMin = 20;

this.consumeThreadMax = 20;

this.pullInterval = 0L;

this.pullBatchSize = 32;

若业务逻辑执行需要20ms,那么单节点处理速度就是:1/0.02*20=1000

这里默认拉去的速度1s内远大于1000

注意: 这里虽然pullInterval 等于0 当时受限于每次拉去64个,处理完也是需要一端时间才能回复ack,才能再次拉取,所以消费速度应该小于1000

所以并发流控要消费速度大于消费延时流控 ,那么消费延时流控才有意义

使用rokcetMq支持的延时消息也可以实现消息的延时消费,通过对delayLevel对应的时间进行配置为我们的需求。为不同的消息设置不同delayLevel,达到延时消费的目的。

 

总结

rocketMq 肖锋流控两种方式:

并发流控:就是根据业务流控速率要求,来调整topic 消费队列数量(read queue),消费端部署节点,消费端拉去间隔时间,消费端消费线程数量等,来达到要求的速率内

延时消费流控:就是在消费端延时消费消息(sleep),具体延时多少要根据业务要求速率,和消费端线程数量,和节点部署数量来控制

关于RocketMq深入分析讲解两种削峰方式的文章就介绍至此,更多相关RocketMq削峰内容请搜索编程宝库以前的文章,希望以后支持编程宝库

 前言使用mq自带的死信去实现延时消息要注意一个坑点,就是mq只会检测队首的消息的过期时间,假设先放入队列10s过期消息,再放入2s过期。mq会检测头部10s是否过期,10s不过期的 ...