Browse Source

直接读取C端数据库数据

qiangxuan 1 tháng trước cách đây
mục cha
commit
ef240f25cd

+ 14 - 36
zgzt-sys-java/jeecg-module-conn/src/main/java/org/jeecg/modules/billetActual/service/impl/BilletBasicInfoExceptionServiceImpl.java

@@ -47,43 +47,21 @@ public class BilletBasicInfoExceptionServiceImpl extends ServiceImpl<BilletBasic
             String classShiftGroupStr = !oConvertUtils.getString(redisTemplate.opsForValue().get(classShiftGroup)).isEmpty() ? oConvertUtils.getString(redisTemplate.opsForValue().get(classShiftGroup)) : "";
             String brandNumStr = !oConvertUtils.getString(redisTemplate.opsForValue().get(brandNum)).isEmpty() ? oConvertUtils.getString(redisTemplate.opsForValue().get(brandNum)) : "";
             log.info("{}{}", "钢坯实绩异常数据接收时,缓存中的班组班别:", classShiftGroupStr + " " + classShiftStr);
-            // 查询钢坯基础是否存在
-            LambdaQueryWrapper<BilletBasicInfoException> queryWrapper = new LambdaQueryWrapper<BilletBasicInfoException>()
-                    .eq(BilletBasicInfoException::getBilletNo, billetBasicInfoException.getBilletNo())
-                    .eq(BilletBasicInfoException::getCcmNo, billetBasicInfoException.getCcmNo())
-                    .eq(BilletBasicInfoException::getHeatNo, billetBasicInfoException.getHeatNo())
-                    .orderByDesc(BilletBasicInfoException::getCreateTime).last("limit 1");
-            BilletBasicInfoException basicInfoExceptionData = baseMapper.selectOne(queryWrapper);
-            if (oConvertUtils.isNotEmpty(basicInfoExceptionData) && oConvertUtils.isNotEmpty(basicInfoExceptionData.getId())) { // 编辑
-                log.info("{}{}", "更新钢坯实绩异常数据最新一条中的班组班别:", basicInfoExceptionData.getShiftGroup() + " " + basicInfoExceptionData.getShift());
-                // 编辑钢坯基础信息
-                billetBasicInfoException.setId(basicInfoExceptionData.getId());
-                billetBasicInfoException.setBilletWeight(basicInfoExceptionData.getBilletWeight());
-                billetBasicInfoException.setWeight(basicInfoExceptionData.getWeight());
-                billetBasicInfoException.setShift(basicInfoExceptionData.getShift());
-                billetBasicInfoException.setActualLength(basicInfoExceptionData.getActualLength());
-                billetBasicInfoException.setThickness(basicInfoExceptionData.getThickness());
-                billetBasicInfoException.setSpec(basicInfoExceptionData.getSpec());
-                billetBasicInfoException.setGrade(basicInfoExceptionData.getGrade());
-                billetBasicInfoException.setShiftGroup(basicInfoExceptionData.getShiftGroup());
-                baseMapper.updateById(billetBasicInfoException);
-            } else {
-                // 查询定尺规则
-                LambdaQueryWrapper<BilletRulerConfig> queryWrapperbilletRulerConfig = new LambdaQueryWrapper<BilletRulerConfig>()
-                        .eq(BilletRulerConfig::getLength, billetBasicInfoException.getLength());
-                BilletRulerConfig billetRulerConfig = billetRulerConfigMapper.selectOne(queryWrapperbilletRulerConfig);
-                Double weight = 0.0;
-                if (oConvertUtils.isNotEmpty(billetRulerConfig)) {
-                    weight = billetRulerConfig.getWeight();
-                }
-                // 新增数据
-                billetBasicInfoException.setBilletWeight(weight);
-                billetBasicInfoException.setShift(classShiftStr);
-                billetBasicInfoException.setBrandNum(brandNumStr);
-                billetBasicInfoException.setShiftGroup(classShiftGroupStr);
-                log.info("{}{}", "新增钢坯实绩异常数据:", billetBasicInfoException.getShiftGroup() + " " + billetBasicInfoException.getShift());
-                baseMapper.insert(billetBasicInfoException);
+            // 查询定尺规则
+            LambdaQueryWrapper<BilletRulerConfig> queryWrapperbilletRulerConfig = new LambdaQueryWrapper<BilletRulerConfig>()
+                    .eq(BilletRulerConfig::getLength, billetBasicInfoException.getLength());
+            BilletRulerConfig billetRulerConfig = billetRulerConfigMapper.selectOne(queryWrapperbilletRulerConfig);
+            Double weight = 0.0;
+            if (oConvertUtils.isNotEmpty(billetRulerConfig)) {
+                weight = billetRulerConfig.getWeight();
             }
+            // 新增数据
+            billetBasicInfoException.setBilletWeight(weight);
+            billetBasicInfoException.setBrandNum(brandNumStr);
+            billetBasicInfoException.setShift(classShiftStr);
+            billetBasicInfoException.setShiftGroup(classShiftGroupStr);
+            log.info("{}{}", "新增钢坯实绩异常数据:", billetBasicInfoException.getShiftGroup() + " " + billetBasicInfoException.getShift());
+            baseMapper.insert(billetBasicInfoException);
             log.info("{}{}", "end钢坯实绩<异常>数据逻辑处理结束:", DateUtils.date2Str(new Date(), DateUtils.datetimeFormat.get()));
         } catch (Exception e) {
             log.error("自动化处理钢坯实绩异常数据时发生异常", e);

+ 66 - 59
zgzt-sys-java/jeecg-module-conn/src/main/java/org/jeecg/modules/watch/TaskWriteBilletWatch.java

@@ -329,7 +329,7 @@ public class TaskWriteBilletWatch {
             }, executorService);
 
             redisTemplate.opsForValue().set(billetDataQueryTimeKey, queryEndTime);
-            
+
             // 处理异步结果(阻塞获取,需处理异常)
             List<Map<String, Object>> fiveChangeShiftData = fiveChangeShiftFuture.get(30, TimeUnit.SECONDS); // 设置超时时间
             List<Map<String, Object>> sixChangeShiftData = sixChangeShiftFuture.get(30, TimeUnit.SECONDS); // 设置超时时间
@@ -358,69 +358,77 @@ public class TaskWriteBilletWatch {
             if (oConvertUtils.listIsEmpty(mergedData)){
                 return;
             }
+            log.info("{}{}", ">>>>>>>>获取C端时间范围:", queryCacheStartTime + "<|>" + queryEndTime);
+            log.info("{}{}", ">>>>>>>>获取C端所有数据结果:", JSON.toJSON(mergedData));
+            // 使用 CompletionStage 链式处理,确保按顺序执行
+            CompletableFuture<Void> processingChain = CompletableFuture.completedFuture(null);
 
-            // 用于跟踪所有异步任务
-            List<CompletableFuture<Void>> futures = new ArrayList<>(mergedData.size());
             for (Map<String, Object> map : mergedData) {
-                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                // 为每个数据项创建一个新的 CompletableFuture,并将其链接到前一个
+                processingChain = processingChain.thenComposeAsync(ignored -> {
                     String dataSource = (String) map.get("dataSource");
-                    if (dataSource == null) {
-                        log.warn("未识别的C端Mysql数据来源: {}", map);
-                        return;
-                    }
-                    try {
-                        switch (dataSource) {
-                            case "fiveChangeShift":
-                                processFiveChangeShift(map);
-                                break;
-                            case "sixChangeShift":
-                                processSixChangeShift(map);
-                                break;
-                            case "billet":
-                                processBillet(map);
-                                break;
-                            case "heats":
-                                processHeats(map);
-                                break;
-                            case "assembly":
-                                processAssembly(map);
-                                break;
-                            case "storageBill":
-                                processStorageBill(map);
-                                break;
-                            case "updatePlate":
-                                processUpdatePlate(map);
-                                break;
-                            case "rollClubOne":
-                                processRollClubOne(map);
-                                break;
-                            case "rollHeignt":
-                                processRollHeight(map);
-                                break;
-                            case "hotCharge":
-                                processHotCharge(map);
-                                break;
-                            case "stackingUp":
-                                processStackingUp(map);
-                                break;
-                            case "stackingLoading":
-                                processStackingLoading(map);
-                                break;
-                            case "parkLotCarGo":
-                                processParkLotCarGo(map);
-                                break;
-                            default:
-                                log.warn("未知的数据来源: {}", dataSource);
+                    LocalDateTime createdAt = (LocalDateTime) map.get("created_at");
+
+                    log.info("开始处理数据 [数据源: {}, created_at: {}]", dataSource, createdAt);
+
+                    return CompletableFuture.runAsync(() -> {
+                        if (dataSource == null) {
+                            log.warn("未识别的C端Mysql数据来源: {}", map);
+                            return;
                         }
-                    } catch (Exception e) {
-                        log.error("处理数据失败 [数据源: {}, 数据: {}]", dataSource, map, e);
-                    }
+                        try {
+                            switch (dataSource) {
+                                case "fiveChangeShift":
+                                    processFiveChangeShift(map);
+                                    break;
+                                case "sixChangeShift":
+                                    processSixChangeShift(map);
+                                    break;
+                                case "billet":
+                                    processBillet(map);
+                                    break;
+                                case "heats":
+                                    processHeats(map);
+                                    break;
+                                case "assembly":
+                                    processAssembly(map);
+                                    break;
+                                case "storageBill":
+                                    processStorageBill(map);
+                                    break;
+                                case "updatePlate":
+                                    processUpdatePlate(map);
+                                    break;
+                                case "rollClubOne":
+                                    processRollClubOne(map);
+                                    break;
+                                case "rollHeignt":
+                                    processRollHeight(map);
+                                    break;
+                                case "hotCharge":
+                                    processHotCharge(map);
+                                    break;
+                                case "stackingUp":
+                                    processStackingUp(map);
+                                    break;
+                                case "stackingLoading":
+                                    processStackingLoading(map);
+                                    break;
+                                case "parkLotCarGo":
+                                    processParkLotCarGo(map);
+                                    break;
+                                default:
+                                    log.warn("未知的数据来源: {}", dataSource);
+                            }
+                        } catch (Exception e) {
+                            log.error("处理数据失败 [数据源: {}, created_at: {}]", dataSource, createdAt, e);
+                        }
+                    }, executorService);
                 }, executorService);
-                futures.add(future);
             }
-
-            // 等待所有数据处理任务完成
-            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+            // 等待所有处理完成
+            processingChain.get();
+            log.info("所有数据按顺序处理完成");
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.warn("主线程被中断: " + e.getMessage());
@@ -436,7 +444,6 @@ public class TaskWriteBilletWatch {
     }
 
 
-
     private void processFiveChangeShift(Map<String, Object> map) {
         JSONObject jsonObject = new JSONObject(map);
         billetHotsendChangeShiftService.autoChangeShift(jsonObject);