如何实现用MQ解决并发问题原创
金蝶云社区-sharkv
sharkv
19人赞赏了该文章 4,862次浏览 未经作者许可,禁止转载编辑于2022年04月15日 11:25:32
summary-icon摘要由AI智能服务提供

文本描述了通信运营商在推广促销中遇到的技术问题及其解决方案。首先,文本说明了功能性需求是用户通过SIM卡申请单抢号,成功则通知用户。而非功能性需求则指出高并发时可能导致服务器负载过重和数据重复的问题。为解决这些问题,提出了通过MQ(消息队列)异步削峰,并结合队列的FIFO(先进先出)性质来消除重复数据插入。在实现过程中,创建了号卡申请单,增加了号码的唯一性校验,并通过代码示例展示了如何通过MQ处理并发请求和直接并发提交的测试逻辑。同时,模拟了并发请求的场景,展示了如何在代码中利用CountDownLatch来控制线程的同步和异步执行,确保在大量并发情况下,系统能够稳定运行,并有效避免数据重复问题。

关键字:二开MQ、并发问题、数据重复

一、需求

功能性需求:通信运营商新增了一批号卡,进行了一系列促销推广,并在某时间段内允许抢号。抢号流程如下:通过SIM卡申请单,选择号卡,提交申请后,若抢号成功系统会发送消息通知用户“抢号成功”。

非功能性需求:

1.开抢时间段内,大量并发涌入,可能会增加服务器负载,造成服务器崩溃

2.若开抢时间内,大量用户抢同一号卡,则可能造成重复数据

二、思路与方案

1.通过MQ异步削峰,缓解订单系统同时处理大量订单插入的压力。

2.结合队列的数据结构FIFO性质,消除重复插入问题。

 

三、实现过程

1.创建号卡申请单

为了测试方便单据编号取消必录,取消唯一性校验,添加工具栏【直接并发提交】【MQ处理并发】

 

1.png


图 1

增加号码的唯一性校验,并添加必录

2.png


图 2

注册插件kd.demo.sci.formplugin.MulThreadsEdit

 

public void itemClick(ItemClickEvent evt) {
                  if(evt.getItemKey().equals("concurrent")) {
                          Runnable taskTemp = new BizCodeThread(false);
                 LatchTest latchTest = new LatchTest();
                 try {
                                   latchTest.startTaskAllInOnce(200, taskTemp);
                          } catch (InterruptedException e) {
                                   Logger.error(e);
                          }
                  }else if(evt.getItemKey().equals("mq-concurrent")) {
                          Runnable taskTemp = new BizCodeThread(true);
                 LatchTest latchTest = new LatchTest();
                 try {
                                   latchTest.startTaskAllInOnce(200, taskTemp);
                          } catch (InterruptedException e) {
                                   Logger.error(e);
                          }
                 
                  }
         }

3.png


图 3

 

2.   模拟并发请求kd.demo.sci.task.LatchTest

public long startTaskAllInOnce(int threadNums, final Runnable job) throws InterruptedException {
    final CountDownLatch startGate = new CountDownLatch(1);
    final CountDownLatch endGate = new CountDownLatch(threadNums);
    for(int i = 0; i < threadNums; i++) {
             ThreadPools.executeOnce("multhreads", new Runnable() {
                          @Override
                          public void run() {
                                    try {
                        // 使线程在此等待,当开始门打开时,一起涌入门中
                        startGate.await();
                        try {
                               ThreadPools.executeOnceIncludeRequestContext("tt", job);
                        } finally {
                            // 将结束门减1,减到0时,就可以开启结束门了
                            endGate.countDown();
                        }
                    } catch (InterruptedException ie) {
                        ie.printStackTrace();
                    }
                                  
                          }
                  });
    }
    long startTime = System.nanoTime();
    System.out.println(startTime + " [" + Thread.currentThread() + "] All thread is ready, concurrent going...");
    // 因开启门只需一个开关,所以立马就开启开始门
    startGate.countDown();
    // 等等结束门开启
    endGate.await();
    long endTime = System.nanoTime();
    System.out.println(endTime + " [" + Thread.currentThread() + "] All thread is completed.");
    return endTime - startTime;
}

 

4.png


图 4

3.模拟申请单业务

并发业务实现类kd.demo.sci.task.BizCodeThread

public class BizCodeThread implements Runnable {
         private boolean isUseMQ = false;
        
         public BizCodeThread(boolean isUseMQ) {
                  super();
                  this.isUseMQ = isUseMQ;
         }
         @Override
         public void run() {
                  if(isUseMQ) {
                          MessagePublisher mp = MQFactory.get().createSimplePublisher("kded_tc", "erkai_queue");
                          mp.publish("hello");
                  }else {
                          DynamicObject obj = BusinessDataServiceHelper.newDynamicObject("kded_simapply");
                          obj.set("billstatus", "A");
                          obj.set("kded_phone", "17299999999");
                          OperationResult result = OperationServiceHelper.executeOperate("submit", "kded_simapply", new DynamicObject[] {obj}, OperateOption.create());
                 
                          //---------发消息通知用户申请成功或失败
                          //--------MessageCenterServiceHelper.sendMessage(messageInfo)
                  }
         }
}
 
消费者实现类kd.demo.sci.consumer.DemoConsumer
public void onMessage(Object message, String messageId, boolean resend, MessageAcker acker) {
         log.info("自定义DemoConsumer开始消费");
         try {
                  DynamicObject obj = BusinessDataServiceHelper.newDynamicObject("kded_simapply");
                  obj.set("billstatus", "A");
                  obj.set("kded_phone", "17299999999");
                  OperationResult result = OperationServiceHelper.executeOperate("submit", "kded_simapply", new DynamicObject[] {obj},
                 
                  //---------发消息通知用户申请成功或失败
                  //--------MessageCenterServiceHelper.sendMessage(messageInfo)
         } catch (Throwable e) {
                  boolean discard = false; //是否废弃这条消息,根据具体场景判断
                  if (discard){
                          acker.discard(messageId);//废弃
                          // 记录废弃原因,并写业务日志
                  } else{
                          acker.deny(messageId);//告诉mq重发这条消息
                          // 记录异常原因,并写业务日志
                  }
         }
}

分别模拟了直接触发和MQ触发两种方法

业务内容:生成17299999999的号卡申请单

注意事项:如果是后台审核则不能直接在表单插件实现。

四、效果图

1.点击【直接并发提交】

【monitor流量监控】最大QPS172

5.png



图 5

【monitor指标监控】:系统CPU负载峰值50%

6.png



图 6

【单据列表】出现了4条重复数据

7.png



图 7

2.点击【MQ处理并发】

【monitor流量监控】最大QPS27

8.png



图 8

【monitor指标监控】:系统CPU负载峰值13%

9.png



图 9

【单据列表】数据正确,无重复数据

10.png



图 10

五、开发环境版本

V4.0.0.14

六、参考资料

【开发平台】指导手册

学习成长中心 

附件.rar(9.69KB)

赞 19