用最简单的方式实现高并发插入数万条数据(可同步,也可异步)

获取数据 | 整理数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

// Shared counter: incremented by add() and read by CurrencyAdd().
// It is static and nothing in the code shown here resets it, so its value
// carries over from one CurrencyAdd() call to the next.
public static int count = 0;

// Sequence number shared by every row written in this run.
String uniqueTimeId = IDUtils.getInstance().getUniqueTimeId();

// Left-hand side of the MINUS: the vc_scode column of the caller-supplied expression.
String sqlof1 = "select vc_scode from ( " + sqlExpr + " )";

// Right-hand side of the MINUS: the pool query with its template placeholder filled in.
String sqlof2 = SqlExprConstant.pool_sql.replace(SqlExprConstant.POOL_TEMPLATE, poolId);

String joiner = " minus ";

// Run the filter query.
String minusQuery = sqlof1 + joiner + sqlof2;
List<Map<String, Object>> reduce1 = riskruleService.executeSql(minusQuery);

// Insert into the database: 121_security filter execution results.
// Keep the distinct, non-null VC_SCODE values of the result rows.
List<String> keys1 = reduce1.stream()
        .map(row -> row.get("VC_SCODE"))
        .filter(Objects::nonNull)
        .map(String.class::cast)
        .distinct()
        .collect(Collectors.toList());

// Insert the codes concurrently.
CurrencyAdd(keys1.size(), 100, keys1, uniqueTimeId, "1");

高并发插入上万条数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
 * Inserts one {@code CompSecuFilterRes} row per entry of {@code keys}, running the
 * inserts concurrently on a bounded thread pool.
 *
 * <p>Each task is bound to its own list index when it is submitted, so every index in
 * {@code [0, min(totalCount, keys.size()))} is attempted exactly once, no matter how the
 * worker threads interleave and no matter what value the shared static {@code count}
 * had when this method was entered.
 *
 * <p>The method returns as soon as all tasks have been queued. Un-comment the
 * {@code countDownLatch.await()} line to block until every insert has finished.
 *
 * @param totalCount   number of rows to insert; capped at {@code keys.size()}
 * @param threadTotal  maximum number of inserts running at the same time; must be at least 1
 *                     whenever there is something to insert
 * @param keys         values written through {@code setVcScode}, one per row
 * @param uniqueTimeId value written through {@code setVcFilterSeqno} on every row
 * @param cCompType    value written through {@code setCCompType} on every row
 * @throws IllegalArgumentException if there is work to do and {@code threadTotal} is less than 1
 * @throws InterruptedException if the calling thread is interrupted while waiting
 *         (only possible once the {@code await()} call is enabled)
 * @author Bruce
 */
public void CurrencyAdd(int totalCount ,int threadTotal,List<String> keys,String uniqueTimeId,String cCompType) throws InterruptedException {
    // Never index past the end of the list, even if the caller passes a larger totalCount.
    final int taskCount = Math.min(totalCount, keys.size());
    if (taskCount <= 0) {
        log.info("count{}", count);
        return;
    }
    if (threadTotal < 1) {
        throw new IllegalArgumentException("threadTotal must be >= 1 but was " + threadTotal);
    }

    // A fixed pool caps the number of simultaneous inserts at threadTotal without
    // creating one (mostly parked) thread per row, which a cached pool would do.
    ExecutorService executorService = Executors.newFixedThreadPool(Math.min(threadTotal, taskCount));
    final CountDownLatch countDownLatch = new CountDownLatch(taskCount);

    for (int i = 0; i < taskCount; i++) {
        // Capture the index per task. Reading the shared counter inside the task is racy:
        // two tasks can observe the same value and insert the same key twice.
        final int index = i;
        executorService.execute(() -> {
            try {
                // Still bump the shared counter so code that reads it keeps working.
                add(cCompType);
                log.info("current count no is --->{}", index + 1);

                CompSecuFilterRes res = new CompSecuFilterRes();
                res.setVcScode(keys.get(index));
                res.setVcFilterSeqno(uniqueTimeId);
                res.setCCompType(cCompType);
                res.setDMdftime(new Date());
                riskruleService.addCompSecuFilterRes(res);
            } catch (RuntimeException e) {
                // One failed row must not take the worker down silently.
                log.error("failed to insert filter result at index {}", index, e);
            } finally {
                // Always count down so a waiting caller cannot hang on a failed insert.
                countDownLatch.countDown();
            }
        });
    }

    // Asynchronous by default. Un-comment the next line to wait for all inserts to finish.
    //countDownLatch.await();
    // shutdown() lets already-queued tasks complete; it only rejects new submissions.
    executorService.shutdown();
    log.info("count{}", count);
}

/**
 * Increases the shared static {@code count} by one while holding the class-level lock.
 *
 * @param type not used by this method
 */
private static synchronized void add(String type) {
    count += 1;
}

日志信息

1
2
3
4
5
6
7
8
9
10
11
12
13
...

2019-03-27 at 18:38:37 CST INFO com.xx.xxx.controller.xxxxController 442 lambda$4 - current count no is --->97
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - ==> Preparing: insert into T_COMP_SECU_FILTER_RES (VC_FILTER_SEQNO, C_REC_TYPE, VC_SCODE, C_COMP_TYPE, D_MDFTIME) values (?, ?, ?, ?, ?)
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - ==> Parameters: 1553683109754193(String), null, 011801593YH(String), 2(String), 2019-03-27(Date)
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - <== Updates: 1
2019-03-27 at 18:38:37 CST INFO com.xx.xxx.controller.xxxxController 442 lambda$4 - current count no is --->98
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - ==> Preparing: insert into T_COMP_SECU_FILTER_RES (VC_FILTER_SEQNO, C_REC_TYPE, VC_SCODE, C_COMP_TYPE, D_MDFTIME) values (?, ?, ?, ?, ?)
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - ==> Parameters: 1553683109754193(String), null, 041758030YH(String), 1(String), 2019-03-27(Date)
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - <== Updates: 1
2019-03-27 at 18:38:37 CST INFO com.xx.xxx.controller.xxxxController 442 lambda$4 - current count no is --->99

...
生活不止苟且,还有我喜爱的海岸.