Browse Source

完善轧钢平台钢坯接收接口在项目重启时停止对定时任务的执行以及抛出相应异常

lingpeng.li 2 months ago
parent
commit
ee197d02b6

+ 0 - 3
zgztBus/jeecg-module-sbm/src/main/java/org/jeecg/modules/actualControl/heatsActuals/controller/BilletSendRecordController.java

@@ -160,8 +160,6 @@ public class BilletSendRecordController extends JeecgController<BilletSendRecord
         return super.importExcel(request, response, BilletSendRecord.class);
     }
 
-
-    @Scheduled(cron = "0 */5 * * * ?") // 每5分钟触发一次
     @ApiOperation(value = "轧钢平台钢坯接收接口", notes = "轧钢平台钢坯接收接口")
     @GetMapping(value = "/sendData")
     public Result<IPage<BilletSendRecord>> sendData() {
@@ -169,5 +167,4 @@ public class BilletSendRecordController extends JeecgController<BilletSendRecord
         return Result.OK("发送成功");
     }
 
-
 }

+ 425 - 0
zgztBus/jeecg-module-sbm/src/main/java/org/jeecg/modules/actualControl/heatsActuals/service/impl/BilletSendProcessor.java

@@ -0,0 +1,425 @@
+package org.jeecg.modules.actualControl.heatsActuals.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.jeecg.modules.actualControl.billetActual.billetActual.entity.BilletBasicInfo;
+import org.jeecg.modules.actualControl.billetActual.billetActual.mapper.BilletBasicInfoMapper;
+import org.jeecg.modules.actualControl.heatsActuals.dto.BilletSendRequest;
+import org.jeecg.modules.actualControl.heatsActuals.dto.BilletSendResponse;
+import org.jeecg.modules.actualControl.heatsActuals.entity.BilletSendRecord;
+import org.jeecg.modules.actualControl.heatsActuals.entity.BilletSendRequestLog;
+import org.jeecg.modules.actualControl.heatsActuals.entity.HeatsActuals;
+import org.jeecg.modules.actualControl.heatsActuals.mapper.BilletSendRecordMapper;
+import org.jeecg.modules.actualControl.heatsActuals.mapper.BilletSendRequestLogMapper;
+import org.jeecg.modules.actualControl.heatsActuals.mapper.HeatsActualsMapper;
+import org.jeecg.modules.billet.rollClubThree.entity.RollClubThreeDetails;
+import org.jeecg.modules.billet.rollClubThree.mapper.RollClubThreeDetailsMapper;
+import org.jeecg.modules.billet.rollClubTwo.entity.RollClubTwoDetails;
+import org.jeecg.modules.billet.rollClubTwo.mapper.RollClubTwoDetailsMapper;
+import org.jeecg.modules.billet.rollOutShipp.entity.RollOutShippDetails;
+import org.jeecg.modules.billet.rollOutShipp.mapper.RollOutShippDetailsMapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.web.client.RestTemplate;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * 专门处理发送逻辑
+ */
+@Service
+@Slf4j
+public class BilletSendProcessor {
+
+    @Autowired
+    private RestTemplate restTemplate;
+    @Autowired
+    private BilletBasicInfoMapper billetBasicInfoMapper;
+    @Autowired
+    private BilletSendRecordMapper billetSendRecordMapper;
+    @Autowired
+    private HeatsActualsMapper heatsActualsMapper;
+    @Autowired
+    private RollClubTwoDetailsMapper rollClubTwoDetailsMapper;
+    @Autowired
+    private RollClubThreeDetailsMapper rollClubThreeDetailsMapper;
+    @Autowired
+    private RollOutShippDetailsMapper rollOutShippDetailsMapper;
+    @Autowired
+    private BilletSendRequestLogMapper billetSendRequestLogMapper;
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+
+    @Transactional(rollbackFor = Exception.class)
+    public List<BilletSendResponse> sendFromHeatsActuals(HeatsActuals heats) {
+        List<BilletSendResponse> allResponses = new ArrayList<>();
+        String heatNo = heats.getHeatsCode();
+
+        // 查询全量钢坯基础信息
+        List<BilletBasicInfo> billetBasicInfos = selectByHeatNo(heatNo);
+        int totalBilletCount = selectByHeatNoAll(heatNo).size();
+
+        // 查询历史已推送数量
+        int historyPushedCount = getHistoryPushedBilletCount(heatNo);
+
+        int newPushedCount = 0; // 本次推送新增数量
+
+        // 根据 belongTable 分组
+        Map<String, List<BilletBasicInfo>> belongTableGroupMap = billetBasicInfos.stream()
+                .collect(Collectors.groupingBy(BilletBasicInfo::getBelongTable));
+
+        for (Map.Entry<String, List<BilletBasicInfo>> belongEntry : belongTableGroupMap.entrySet()) {
+            String belongTable = belongEntry.getKey();
+            List<BilletBasicInfo> infosInBelongTable = belongEntry.getValue();
+
+            BilletSendRequest request = buildBaseRequest(heats, belongTable);
+
+            List<BilletSendRequest.SizeData> sizeDataList = new ArrayList<>();
+
+            Map<String, Map<Integer, List<BilletBasicInfo>>> regrouped = regroupByTransportTypeAndLength(heats, belongTable, infosInBelongTable);
+
+            for (Map.Entry<String, Map<Integer, List<BilletBasicInfo>>> typeEntry : regrouped.entrySet()) {
+                String transportTypeStr = typeEntry.getKey();
+                Map<Integer, List<BilletBasicInfo>> lengthMap = typeEntry.getValue();
+
+                for (Map.Entry<Integer, List<BilletBasicInfo>> lenEntry : lengthMap.entrySet()) {
+                    Integer length = lenEntry.getKey();
+                    List<BilletBasicInfo> billets = lenEntry.getValue();
+
+                    List<String> billetsToPush = filterAlreadyPushedBillets(heatNo, request.getRollNo(), length, transportTypeStr, billets);
+
+                    if (billetsToPush.isEmpty()) {
+//                        log.info("炉次:{},分组:{} 长度:{} 送坯方式:{},无新增钢坯可推送", heatNo, belongTable, length, transportTypeStr);
+                        continue;
+                    }
+
+                    BilletSendRequest.SizeData sizeData = buildSizeData(length, transportTypeStr, billets, billetsToPush);
+                    sizeDataList.add(sizeData);
+
+//                    log.info("准备推送炉次:{},分组:{},长度:{},送坯方式:{},数量:{}",
+//                            heatNo, belongTable, length, transportTypeStr, billetsToPush.size());
+
+
+                    newPushedCount += billetsToPush.size();
+                }
+            }
+
+            if (!sizeDataList.isEmpty()) {
+                request.setDataList(sizeDataList);
+
+                // 远程调用
+                BilletSendResponse response = sendRequestAndLog(request, belongTable);
+                if (response != null) {
+                    // 远程调用成功后,保存或更新推送日志
+                    log.info("远程推送成功,炉次:{},belongTable:{},返回:{}", heatNo, belongTable, JSON.toJSONString(response));
+                    savePushLogAfterSuccess(heatNo, request.getRollNo(), heats.getSpec(), sizeDataList);
+                    allResponses.add(response);
+                } else {
+                    log.error("远程调用失败,炉次:{},belongTable:{}", heatNo, belongTable);
+                    throw new RuntimeException("远程调用失败,heatNo: " + heatNo + ", belongTable: " + belongTable);
+                }
+            }
+        }
+
+        // 校验推送完整性
+        if ((historyPushedCount + newPushedCount) == totalBilletCount) {
+            UpdateWrapper<HeatsActuals> updateWrapper = new UpdateWrapper<>();
+            updateWrapper.eq("heats_code", heatNo).set("complete_status", 0);
+            heatsActualsMapper.update(null, updateWrapper);
+            log.info("炉次 {} 所有钢坯已完成推送,更新 complete_status 为 0", heatNo);
+        } else {
+//            log.info("炉次 {} 当前推送进度:历史 {} + 本次 {} / 总数 {}", heatNo, historyPushedCount, newPushedCount, totalBilletCount);
+        }
+
+        return allResponses;
+    }
+
+
+    public List<BilletBasicInfo> selectByHeatNo(String heatNo) {
+
+        LambdaQueryWrapper<BilletBasicInfo> infoQueryWrapper = new LambdaQueryWrapper<>();
+        infoQueryWrapper
+                .eq(BilletBasicInfo::getHeatNo, heatNo)
+                .isNotNull(BilletBasicInfo::getBelongTable)
+                .ne(BilletBasicInfo::getBelongTable, "")
+                .ne(BilletBasicInfo::getBelongTable, "billet_auto_tmp")
+                .ne(BilletBasicInfo::getBelongTable, "stacking_and_loading_vehicles")
+                .ne(BilletBasicInfo::getBelongTable, "billet_hotsend_abandons")
+                .orderByDesc(BilletBasicInfo::getCreateTime);
+
+        List<BilletBasicInfo> billetBasicInfos = billetBasicInfoMapper.selectList(infoQueryWrapper);
+
+        return billetBasicInfos;
+    }
+
+    public List<BilletBasicInfo> selectByHeatNoAll(String heatNo) {
+
+        LambdaQueryWrapper<BilletBasicInfo> infoQueryWrapper = new LambdaQueryWrapper<>();
+        infoQueryWrapper
+                .eq(BilletBasicInfo::getHeatNo, heatNo)
+                .orderByDesc(BilletBasicInfo::getCreateTime);
+
+        List<BilletBasicInfo> billetBasicInfos = billetBasicInfoMapper.selectList(infoQueryWrapper);
+
+        return billetBasicInfos;
+    }
+
+    private int getHistoryPushedBilletCount(String heatNo) {
+        QueryWrapper<BilletSendRequestLog> query = new QueryWrapper<>();
+        query.eq("heat_no", heatNo);
+        List<BilletSendRequestLog> logs = billetSendRequestLogMapper.selectList(query);
+        return logs.stream()
+                .filter(log -> StringUtils.isNotBlank(log.getBilletList()))
+                .mapToInt(log -> log.getBilletList().split(",").length)
+                .sum();
+    }
+
+    private BilletSendRequest buildBaseRequest(HeatsActuals heats, String belongTable) {
+        BilletSendRequest request = new BilletSendRequest();
+        request.setHeatNo(heats.getHeatsCode());
+        request.setSteelGrade(heats.getGrade());
+        request.setSpecification(heats.getSpec());
+        request.setSendTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
+        request.setRollNo(convertRollNo(belongTable));
+        return request;
+    }
+
+    private Map<String, Map<Integer, List<BilletBasicInfo>>> regroupByTransportTypeAndLength(HeatsActuals heats, String belongTable, List<BilletBasicInfo> billets) {
+        Map<String, Map<Integer, List<BilletBasicInfo>>> regrouped = new HashMap<>();
+        Map<Integer, List<BilletBasicInfo>> byLength = billets.stream()
+                .collect(Collectors.groupingBy(BilletBasicInfo::getLength));
+        for (Map.Entry<Integer, List<BilletBasicInfo>> lenEntry : byLength.entrySet()) {
+            Integer length = lenEntry.getKey();
+            List<BilletBasicInfo> billetsInLength = lenEntry.getValue();
+            for (BilletBasicInfo billet : billetsInLength) {
+                int transportType = determineTransportBilletType(belongTable, heats.getHeatsCode(), billet.getBilletNo());
+                String key = String.valueOf(transportType);
+                regrouped.computeIfAbsent(key, k -> new HashMap<>())
+                        .computeIfAbsent(length, k -> new ArrayList<>())
+                        .add(billet);
+            }
+        }
+        return regrouped;
+    }
+
+    private List<String> filterAlreadyPushedBillets(String heatNo, String rollNo, Integer length, String transportTypeStr, List<BilletBasicInfo> billets) {
+        QueryWrapper<BilletSendRequestLog> logQuery = new QueryWrapper<>();
+        logQuery.eq("heat_no", heatNo)
+                .eq("roll_no", rollNo)
+                .eq("length", length)
+                .eq("transport_billet_type", Integer.parseInt(transportTypeStr));
+        BilletSendRequestLog existingLog = billetSendRequestLogMapper.selectOne(logQuery);
+
+        Set<String> pushedBillets = new HashSet<>();
+        if (existingLog != null && StringUtils.isNotBlank(existingLog.getBilletList())) {
+            pushedBillets.addAll(Arrays.asList(existingLog.getBilletList().split(",")));
+        }
+
+        return billets.stream()
+                .map(BilletBasicInfo::getBilletNo)
+                .filter(billetNo -> !pushedBillets.contains(billetNo))
+                .collect(Collectors.toList());
+    }
+
+    private BilletSendRequest.SizeData buildSizeData(Integer length, String transportTypeStr, List<BilletBasicInfo> allBillets, List<String> billetsToPush) {
+        BilletSendRequest.SizeData sizeData = new BilletSendRequest.SizeData();
+        sizeData.setLength(length);
+        sizeData.setTransportBilletType(Integer.parseInt(transportTypeStr));
+        sizeData.setBilletList(billetsToPush);
+
+        double avgWeightTon = allBillets.stream()
+                .filter(billet -> billetsToPush.contains(billet.getBilletNo()))
+                .map(BilletBasicInfo::getBilletWeight)
+                .filter(Objects::nonNull)
+                .mapToDouble(b -> b.doubleValue())
+                .average()
+                .orElse(0.0);
+
+        // 转换为 kg 并保留 4 位小数
+        BigDecimal perWeightKg = BigDecimal.valueOf(avgWeightTon * 1000).setScale(4, RoundingMode.HALF_UP);
+        sizeData.setPerWeight(perWeightKg.doubleValue());
+
+        return sizeData;
+    }
+
+
+    // 新增方法:远程调用成功后保存或更新推送日志
+    private void savePushLogAfterSuccess(String heatNo, String rollNo, String spec, List<BilletSendRequest.SizeData> sizeDataList) {
+        for (BilletSendRequest.SizeData sizeData : sizeDataList) {
+            QueryWrapper<BilletSendRequestLog> query = new QueryWrapper<>();
+            query.eq("heat_no", heatNo)
+                    .eq("roll_no", rollNo)
+                    .eq("length", sizeData.getLength())
+                    .eq("transport_billet_type", sizeData.getTransportBilletType());
+            BilletSendRequestLog currentLog = billetSendRequestLogMapper.selectOne(query);
+
+            if (currentLog == null) {
+                BilletSendRequestLog newLog = new BilletSendRequestLog();
+                newLog.setHeatNo(heatNo);
+                newLog.setRollNo(rollNo);
+                newLog.setSpecification(spec);
+                newLog.setLength(sizeData.getLength());
+                newLog.setSendTime(new Date());
+                newLog.setTransportBilletType(sizeData.getTransportBilletType());
+                newLog.setPerWeight(sizeData.getPerWeight());
+                newLog.setBilletListCount(sizeData.getBilletList().size());
+                newLog.setBilletList(String.join(",", sizeData.getBilletList()));
+                billetSendRequestLogMapper.insert(newLog);
+                log.info("插入推送日志成功,炉号:{},产线标识:{},规格:{},长度:{},送坯方式:{},钢坯数量:{}",
+                        heatNo, rollNo, spec, sizeData.getLength(), sizeData.getTransportBilletType(), sizeData.getBilletList().size());
+            } else {
+                Set<String> updatedBillets = new HashSet<>(Arrays.asList(currentLog.getBilletList().split(",")));
+                updatedBillets.addAll(sizeData.getBilletList());
+                currentLog.setSendTime(new Date());
+                currentLog.setBilletListCount(updatedBillets.size());
+                currentLog.setBilletList(String.join(",", updatedBillets));
+                billetSendRequestLogMapper.updateById(currentLog);
+                log.info("更新推送日志成功,炉号:{},产线标识:{},规格:{},长度:{},送坯方式:{},累计钢坯数量:{}",
+                        heatNo, rollNo, spec, sizeData.getLength(), sizeData.getTransportBilletType(), updatedBillets.size());
+            }
+        }
+    }
+
+
+    private BilletSendResponse sendRequestAndLog(BilletSendRequest request, String belongTable) {
+        final int maxRetries = 3;
+        final long baseDelayMillis = 3000;
+
+        int attempt = 0;
+        while (attempt < maxRetries) {
+            try {
+                String url = "http://192.168.0.111:8850/sr/exchange/receiveBillet";
+                ResponseEntity<BilletSendResponse> response = restTemplate.postForEntity(url, request, BilletSendResponse.class);
+
+                BilletSendResponse body = response.getBody();
+                saveSendRecord(body, request);
+//                log.info("推送成功,响应信息: {}", body);
+
+                return body;
+            } catch (Exception e) {
+                attempt++;
+                log.error("推送失败,重试第 {} 次,错误信息:{}", attempt, e.getMessage());
+
+                if (attempt >= maxRetries) {
+                    saveSendException(request.getHeatNo(), belongTable, e.getMessage());
+                    return null;
+                }
+
+                try {
+                    Thread.sleep(baseDelayMillis * attempt);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    saveSendException(request.getHeatNo(), belongTable, "重试等待被中断: " + ie.getMessage());
+                    return null;
+                }
+            }
+        }
+        return null;
+    }
+
+    private void saveSendRecord(BilletSendResponse response, BilletSendRequest request) throws JsonProcessingException {
+        BilletSendRecord record = new BilletSendRecord();
+        record.setHeatNo(request.getHeatNo());
+        record.setRollNo(request.getRollNo());
+        record.setCode(response.getCode());
+        record.setMsg(response.getMsg());
+        record.setRequestJson(objectMapper.writeValueAsString(request));
+        billetSendRecordMapper.insert(record);
+    }
+
+    private void saveSendException(String heatNo, String rollNo, String errorMsg) {
+        BilletSendRecord record = new BilletSendRecord();
+        record.setHeatNo(heatNo);
+        record.setRollNo(rollNo);
+        record.setCode(590);
+        record.setMsg(errorMsg);
+        record.setRequestJson("异常: " + errorMsg);
+        billetSendRecordMapper.insert(record);
+    }
+
+    private String convertRollNo(String rawRollNo) {
+        if (!ROLL_NO_MAP.containsKey(rawRollNo)) {
+
+        }
+        return ROLL_NO_MAP.get(rawRollNo);
+    }
+
+    private int determineTransportBilletType(String belongTable, String heatsCode, String billetNo) {
+        if ("roll_club_two".equals(belongTable)) {
+            return queryRollClubTwoDetails(heatsCode, billetNo);
+        } else if ("roll_club_three".equals(belongTable)) {
+            return queryRollClubThreeDetails(heatsCode, billetNo);
+        } else if ("roll_out_shipp".equals(belongTable)) {
+            return queryRollOutShippDetails(heatsCode, billetNo);
+        } else {
+            return 2; // 其他情况
+        }
+    }
+
+    private int queryRollClubTwoDetails(String heatsCode, String billetNo) {
+        LambdaQueryWrapper<RollClubTwoDetails> wrapper = new LambdaQueryWrapper<>();
+        wrapper.eq(RollClubTwoDetails::getHeatNo, heatsCode)
+                .like(RollClubTwoDetails::getBilletNo, billetNo)
+                .last("limit 1");
+
+        RollClubTwoDetails detail = rollClubTwoDetailsMapper.selectOne(wrapper);
+        if (detail == null || StringUtils.isBlank(detail.getStackAddr())) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+    private int queryRollClubThreeDetails(String heatsCode, String billetNo) {
+        LambdaQueryWrapper<RollClubThreeDetails> wrapper = new LambdaQueryWrapper<>();
+        wrapper.eq(RollClubThreeDetails::getHeatNo, heatsCode)
+                .like(RollClubThreeDetails::getBilletNo, billetNo)
+                .last("limit 1");
+
+        RollClubThreeDetails detail = rollClubThreeDetailsMapper.selectOne(wrapper);
+        if (detail == null || StringUtils.isBlank(detail.getStackAddr())) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+
+    private int queryRollOutShippDetails(String heatsCode, String billetNo) {
+        LambdaQueryWrapper<RollOutShippDetails> wrapper = new LambdaQueryWrapper<>();
+        wrapper.eq(RollOutShippDetails::getHeatNo, heatsCode)
+                .like(RollOutShippDetails::getBilletNo, billetNo)
+                .last("limit 1");
+
+        RollOutShippDetails detail = rollOutShippDetailsMapper.selectOne(wrapper);
+        if (detail == null || StringUtils.isBlank(detail.getStackAddr())) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+    private static final Map<String, String> ROLL_NO_MAP = new HashMap<>();
+
+    static {
+        ROLL_NO_MAP.put("roll_club_one", "T");
+        ROLL_NO_MAP.put("roll_club_two", "TA");
+        ROLL_NO_MAP.put("roll_club_three", "TB");
+        ROLL_NO_MAP.put("roll_height", "G");
+        ROLL_NO_MAP.put("板带", "YL");
+        ROLL_NO_MAP.put("roll_out_shipp", "S");
+    }
+}

+ 6 - 403
zgztBus/jeecg-module-sbm/src/main/java/org/jeecg/modules/actualControl/heatsActuals/service/impl/BilletSendRecordServiceImpl.java

@@ -1,42 +1,21 @@
 package org.jeecg.modules.actualControl.heatsActuals.service.impl;
 
-import com.alibaba.fastjson.JSON;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
-import org.jeecg.modules.actualControl.billetActual.billetActual.entity.BilletBasicInfo;
 import org.jeecg.modules.actualControl.billetActual.billetActual.mapper.BilletBasicInfoMapper;
-import org.jeecg.modules.actualControl.heatsActuals.dto.BilletSendRequest;
-import org.jeecg.modules.actualControl.heatsActuals.dto.BilletSendResponse;
 import org.jeecg.modules.actualControl.heatsActuals.entity.BilletSendRecord;
-import org.jeecg.modules.actualControl.heatsActuals.entity.BilletSendRequestLog;
 import org.jeecg.modules.actualControl.heatsActuals.entity.HeatsActuals;
 import org.jeecg.modules.actualControl.heatsActuals.mapper.BilletSendRecordMapper;
-import org.jeecg.modules.actualControl.heatsActuals.mapper.BilletSendRequestLogMapper;
 import org.jeecg.modules.actualControl.heatsActuals.mapper.HeatsActualsMapper;
 import org.jeecg.modules.actualControl.heatsActuals.service.IBilletSendRecordService;
-import org.jeecg.modules.billet.rollClubThree.entity.RollClubThreeDetails;
 import org.jeecg.modules.billet.rollClubThree.mapper.RollClubThreeDetailsMapper;
-import org.jeecg.modules.billet.rollClubTwo.entity.RollClubTwoDetails;
 import org.jeecg.modules.billet.rollClubTwo.mapper.RollClubTwoDetailsMapper;
-import org.jeecg.modules.billet.rollOutShipp.entity.RollOutShippDetails;
 import org.jeecg.modules.billet.rollOutShipp.mapper.RollOutShippDetailsMapper;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.web.client.RestTemplate;
 
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.stream.Collectors;
+import java.util.List;
 
 /**
  * @Description: 钢坯接收记录
@@ -48,24 +27,12 @@ import java.util.stream.Collectors;
 @Slf4j
 public class BilletSendRecordServiceImpl extends ServiceImpl<BilletSendRecordMapper, BilletSendRecord> implements IBilletSendRecordService {
 
-    @Autowired
-    private RestTemplate restTemplate;
-    @Autowired
-    private BilletBasicInfoMapper billetBasicInfoMapper;
-    @Autowired
-    private BilletSendRecordMapper billetSendRecordMapper;
+
     @Autowired
     private HeatsActualsMapper heatsActualsMapper;
-    @Autowired
-    private RollClubTwoDetailsMapper rollClubTwoDetailsMapper;
-    @Autowired
-    private RollClubThreeDetailsMapper rollClubThreeDetailsMapper;
-    @Autowired
-    private RollOutShippDetailsMapper rollOutShippDetailsMapper;
-    @Autowired
-    private BilletSendRequestLogMapper billetSendRequestLogMapper;
 
-    private final ObjectMapper objectMapper = new ObjectMapper();
+    @Autowired
+    private BilletSendProcessor billetSendProcessor;
 
     @Override
     public void sendHeatsActuals() {
@@ -78,372 +45,8 @@ public class BilletSendRecordServiceImpl extends ServiceImpl<BilletSendRecordMap
         List<HeatsActuals> heatsActuals = heatsActualsMapper.selectList(queryHeatsWrapper);
 
         for (HeatsActuals heatsActual : heatsActuals) {
-//            log.info("处理炉次:{}", heatsActual.getHeatsCode());
-            sendFromHeatsActuals(heatsActual);
-        }
-    }
-
-
-    @Transactional(rollbackFor = Exception.class)
-    public List<BilletSendResponse> sendFromHeatsActuals(HeatsActuals heats) {
-        List<BilletSendResponse> allResponses = new ArrayList<>();
-        String heatNo = heats.getHeatsCode();
-
-        // 查询全量钢坯基础信息
-        List<BilletBasicInfo> billetBasicInfos = selectByHeatNo(heatNo);
-        int totalBilletCount = selectByHeatNoAll(heatNo).size();
-
-        // 查询历史已推送数量
-        int historyPushedCount = getHistoryPushedBilletCount(heatNo);
-
-        int newPushedCount = 0; // 本次推送新增数量
-
-        // 根据 belongTable 分组
-        Map<String, List<BilletBasicInfo>> belongTableGroupMap = billetBasicInfos.stream()
-                .collect(Collectors.groupingBy(BilletBasicInfo::getBelongTable));
-
-        for (Map.Entry<String, List<BilletBasicInfo>> belongEntry : belongTableGroupMap.entrySet()) {
-            String belongTable = belongEntry.getKey();
-            List<BilletBasicInfo> infosInBelongTable = belongEntry.getValue();
-
-            BilletSendRequest request = buildBaseRequest(heats, belongTable);
-
-            List<BilletSendRequest.SizeData> sizeDataList = new ArrayList<>();
-
-            Map<String, Map<Integer, List<BilletBasicInfo>>> regrouped = regroupByTransportTypeAndLength(heats, belongTable, infosInBelongTable);
-
-            for (Map.Entry<String, Map<Integer, List<BilletBasicInfo>>> typeEntry : regrouped.entrySet()) {
-                String transportTypeStr = typeEntry.getKey();
-                Map<Integer, List<BilletBasicInfo>> lengthMap = typeEntry.getValue();
-
-                for (Map.Entry<Integer, List<BilletBasicInfo>> lenEntry : lengthMap.entrySet()) {
-                    Integer length = lenEntry.getKey();
-                    List<BilletBasicInfo> billets = lenEntry.getValue();
-
-                    List<String> billetsToPush = filterAlreadyPushedBillets(heatNo, request.getRollNo(), length, transportTypeStr, billets);
-
-                    if (billetsToPush.isEmpty()) {
-//                        log.info("炉次:{},分组:{} 长度:{} 送坯方式:{},无新增钢坯可推送", heatNo, belongTable, length, transportTypeStr);
-                        continue;
-                    }
-
-                    BilletSendRequest.SizeData sizeData = buildSizeData(length, transportTypeStr, billets, billetsToPush);
-                    sizeDataList.add(sizeData);
-
-//                    log.info("准备推送炉次:{},分组:{},长度:{},送坯方式:{},数量:{}",
-//                            heatNo, belongTable, length, transportTypeStr, billetsToPush.size());
-
-
-                    newPushedCount += billetsToPush.size();
-                }
-            }
-
-            if (!sizeDataList.isEmpty()) {
-                request.setDataList(sizeDataList);
-
-                // 远程调用
-                BilletSendResponse response = sendRequestAndLog(request, belongTable);
-                if (response != null) {
-                    // 远程调用成功后,保存或更新推送日志
-                    log.info("远程推送成功,炉次:{},belongTable:{},返回:{}", heatNo, belongTable, JSON.toJSONString(response));
-                    savePushLogAfterSuccess(heatNo, request.getRollNo(), heats.getSpec(), sizeDataList);
-                    allResponses.add(response);
-                } else {
-                    log.error("远程调用失败,炉次:{},belongTable:{}", heatNo, belongTable);
-                }
-            }
-        }
-
-        // 校验推送完整性
-        if ((historyPushedCount + newPushedCount) == totalBilletCount) {
-            UpdateWrapper<HeatsActuals> updateWrapper = new UpdateWrapper<>();
-            updateWrapper.eq("heats_code", heatNo).set("complete_status", 0);
-            heatsActualsMapper.update(null, updateWrapper);
-            log.info("炉次 {} 所有钢坯已完成推送,更新 complete_status 为 0", heatNo);
-        } else {
-//            log.info("炉次 {} 当前推送进度:历史 {} + 本次 {} / 总数 {}", heatNo, historyPushedCount, newPushedCount, totalBilletCount);
-        }
-
-        return allResponses;
-    }
-
-    private int getHistoryPushedBilletCount(String heatNo) {
-        QueryWrapper<BilletSendRequestLog> query = new QueryWrapper<>();
-        query.eq("heat_no", heatNo);
-        List<BilletSendRequestLog> logs = billetSendRequestLogMapper.selectList(query);
-        return logs.stream()
-                .filter(log -> StringUtils.isNotBlank(log.getBilletList()))
-                .mapToInt(log -> log.getBilletList().split(",").length)
-                .sum();
-    }
-
-    private BilletSendRequest buildBaseRequest(HeatsActuals heats, String belongTable) {
-        BilletSendRequest request = new BilletSendRequest();
-        request.setHeatNo(heats.getHeatsCode());
-        request.setSteelGrade(heats.getGrade());
-        request.setSpecification(heats.getSpec());
-        request.setSendTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
-        request.setRollNo(convertRollNo(belongTable));
-        return request;
-    }
-
-    private Map<String, Map<Integer, List<BilletBasicInfo>>> regroupByTransportTypeAndLength(HeatsActuals heats, String belongTable, List<BilletBasicInfo> billets) {
-        Map<String, Map<Integer, List<BilletBasicInfo>>> regrouped = new HashMap<>();
-        Map<Integer, List<BilletBasicInfo>> byLength = billets.stream()
-                .collect(Collectors.groupingBy(BilletBasicInfo::getLength));
-        for (Map.Entry<Integer, List<BilletBasicInfo>> lenEntry : byLength.entrySet()) {
-            Integer length = lenEntry.getKey();
-            List<BilletBasicInfo> billetsInLength = lenEntry.getValue();
-            for (BilletBasicInfo billet : billetsInLength) {
-                int transportType = determineTransportBilletType(belongTable, heats.getHeatsCode(), billet.getBilletNo());
-                String key = String.valueOf(transportType);
-                regrouped.computeIfAbsent(key, k -> new HashMap<>())
-                        .computeIfAbsent(length, k -> new ArrayList<>())
-                        .add(billet);
-            }
-        }
-        return regrouped;
-    }
-
-    private List<String> filterAlreadyPushedBillets(String heatNo, String rollNo, Integer length, String transportTypeStr, List<BilletBasicInfo> billets) {
-        QueryWrapper<BilletSendRequestLog> logQuery = new QueryWrapper<>();
-        logQuery.eq("heat_no", heatNo)
-                .eq("roll_no", rollNo)
-                .eq("length", length)
-                .eq("transport_billet_type", Integer.parseInt(transportTypeStr));
-        BilletSendRequestLog existingLog = billetSendRequestLogMapper.selectOne(logQuery);
-
-        Set<String> pushedBillets = new HashSet<>();
-        if (existingLog != null && StringUtils.isNotBlank(existingLog.getBilletList())) {
-            pushedBillets.addAll(Arrays.asList(existingLog.getBilletList().split(",")));
-        }
-
-        return billets.stream()
-                .map(BilletBasicInfo::getBilletNo)
-                .filter(billetNo -> !pushedBillets.contains(billetNo))
-                .collect(Collectors.toList());
-    }
-
-    private BilletSendRequest.SizeData buildSizeData(Integer length, String transportTypeStr, List<BilletBasicInfo> allBillets, List<String> billetsToPush) {
-        BilletSendRequest.SizeData sizeData = new BilletSendRequest.SizeData();
-        sizeData.setLength(length);
-        sizeData.setTransportBilletType(Integer.parseInt(transportTypeStr));
-        sizeData.setBilletList(billetsToPush);
-
-        double avgWeightTon = allBillets.stream()
-                .filter(billet -> billetsToPush.contains(billet.getBilletNo()))
-                .map(BilletBasicInfo::getBilletWeight)
-                .filter(Objects::nonNull)
-                .mapToDouble(b -> b.doubleValue())
-                .average()
-                .orElse(0.0);
-
-        // 转换为 kg 并保留 4 位小数
-        BigDecimal perWeightKg = BigDecimal.valueOf(avgWeightTon * 1000).setScale(4, RoundingMode.HALF_UP);
-        sizeData.setPerWeight(perWeightKg.doubleValue());
-
-        return sizeData;
-    }
-
-
-    // 新增方法:远程调用成功后保存或更新推送日志
-    private void savePushLogAfterSuccess(String heatNo, String rollNo, String spec, List<BilletSendRequest.SizeData> sizeDataList) {
-        for (BilletSendRequest.SizeData sizeData : sizeDataList) {
-            QueryWrapper<BilletSendRequestLog> query = new QueryWrapper<>();
-            query.eq("heat_no", heatNo)
-                    .eq("roll_no", rollNo)
-                    .eq("length", sizeData.getLength())
-                    .eq("transport_billet_type", sizeData.getTransportBilletType());
-            BilletSendRequestLog currentLog = billetSendRequestLogMapper.selectOne(query);
-
-            if (currentLog == null) {
-                BilletSendRequestLog newLog = new BilletSendRequestLog();
-                newLog.setHeatNo(heatNo);
-                newLog.setRollNo(rollNo);
-                newLog.setSpecification(spec);
-                newLog.setLength(sizeData.getLength());
-                newLog.setSendTime(new Date());
-                newLog.setTransportBilletType(sizeData.getTransportBilletType());
-                newLog.setPerWeight(sizeData.getPerWeight());
-                newLog.setBilletListCount(sizeData.getBilletList().size());
-                newLog.setBilletList(String.join(",", sizeData.getBilletList()));
-                billetSendRequestLogMapper.insert(newLog);
-                log.info("插入推送日志成功,炉号:{},产线标识:{},规格:{},长度:{},送坯方式:{},钢坯数量:{}",
-                        heatNo, rollNo, spec, sizeData.getLength(), sizeData.getTransportBilletType(), sizeData.getBilletList().size());
-            } else {
-                Set<String> updatedBillets = new HashSet<>(Arrays.asList(currentLog.getBilletList().split(",")));
-                updatedBillets.addAll(sizeData.getBilletList());
-                currentLog.setSendTime(new Date());
-                currentLog.setBilletListCount(updatedBillets.size());
-                currentLog.setBilletList(String.join(",", updatedBillets));
-                billetSendRequestLogMapper.updateById(currentLog);
-                log.info("更新推送日志成功,炉号:{},产线标识:{},规格:{},长度:{},送坯方式:{},累计钢坯数量:{}",
-                        heatNo, rollNo, spec, sizeData.getLength(), sizeData.getTransportBilletType(), updatedBillets.size());
-            }
-        }
-    }
-
-
-    private BilletSendResponse sendRequestAndLog(BilletSendRequest request, String belongTable) {
-        final int maxRetries = 3;
-        final long baseDelayMillis = 3000;
-
-        int attempt = 0;
-        while (attempt < maxRetries) {
-            try {
-                String url = "http://192.168.0.111:8850/sr/exchange/receiveBillet";
-                ResponseEntity<BilletSendResponse> response = restTemplate.postForEntity(url, request, BilletSendResponse.class);
-
-                BilletSendResponse body = response.getBody();
-                saveSendRecord(body, request);
-//                log.info("推送成功,响应信息: {}", body);
-
-                return body;
-            } catch (Exception e) {
-                attempt++;
-                log.error("推送失败,重试第 {} 次,错误信息:{}", attempt, e.getMessage());
-
-                if (attempt >= maxRetries) {
-                    saveSendException(request.getHeatNo(), belongTable, e.getMessage());
-                    return null;
-                }
-
-                try {
-                    Thread.sleep(baseDelayMillis * attempt);
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    saveSendException(request.getHeatNo(), belongTable, "重试等待被中断: " + ie.getMessage());
-                    return null;
-                }
-            }
-        }
-        return null;
-    }
-
-
-    private void saveSendRecord(BilletSendResponse response, BilletSendRequest request) throws JsonProcessingException {
-        BilletSendRecord record = new BilletSendRecord();
-        record.setHeatNo(response.getHeatNo());
-        record.setRollNo(response.getRollNo());
-        record.setCode(response.getCode());
-        record.setMsg(response.getMsg());
-        record.setRequestJson(objectMapper.writeValueAsString(request));
-        billetSendRecordMapper.insert(record);
-    }
-
-    private void saveSendException(String heatNo, String rollNo, String errorMsg) {
-        BilletSendRecord record = new BilletSendRecord();
-        record.setHeatNo(heatNo);
-        record.setRollNo(rollNo);
-        record.setCode(590);
-        record.setMsg(errorMsg);
-        record.setRequestJson("异常: " + errorMsg);
-        billetSendRecordMapper.insert(record);
-    }
-
-
-    public List<BilletBasicInfo> selectByHeatNo(String heatNo) {
-
-        LambdaQueryWrapper<BilletBasicInfo> infoQueryWrapper = new LambdaQueryWrapper<>();
-        infoQueryWrapper
-                .eq(BilletBasicInfo::getHeatNo, heatNo)
-                .isNotNull(BilletBasicInfo::getBelongTable)
-                .ne(BilletBasicInfo::getBelongTable, "")
-                .ne(BilletBasicInfo::getBelongTable, "billet_auto_tmp")
-                .ne(BilletBasicInfo::getBelongTable, "stacking_and_loading_vehicles")
-                .ne(BilletBasicInfo::getBelongTable, "billet_hotsend_abandons")
-                .orderByDesc(BilletBasicInfo::getCreateTime);
-
-        List<BilletBasicInfo> billetBasicInfos = billetBasicInfoMapper.selectList(infoQueryWrapper);
-
-        return billetBasicInfos;
-    }
-
-
-    public List<BilletBasicInfo> selectByHeatNoAll(String heatNo) {
-
-        LambdaQueryWrapper<BilletBasicInfo> infoQueryWrapper = new LambdaQueryWrapper<>();
-        infoQueryWrapper
-                .eq(BilletBasicInfo::getHeatNo, heatNo)
-                .orderByDesc(BilletBasicInfo::getCreateTime);
-
-        List<BilletBasicInfo> billetBasicInfos = billetBasicInfoMapper.selectList(infoQueryWrapper);
-
-        return billetBasicInfos;
-    }
-
-
-    private static final Map<String, String> ROLL_NO_MAP = new HashMap<>();
-
-    static {
-        ROLL_NO_MAP.put("roll_club_one", "T");
-        ROLL_NO_MAP.put("roll_club_two", "TA");
-        ROLL_NO_MAP.put("roll_club_three", "TB");
-        ROLL_NO_MAP.put("roll_height", "G");
-        ROLL_NO_MAP.put("板带", "YL");
-        ROLL_NO_MAP.put("roll_out_shipp", "S");
-    }
-
-
-    private String convertRollNo(String rawRollNo) {
-        if (!ROLL_NO_MAP.containsKey(rawRollNo)) {
-
-        }
-        return ROLL_NO_MAP.get(rawRollNo);
-    }
-
-    private int determineTransportBilletType(String belongTable, String heatsCode, String billetNo) {
-        if ("roll_club_two".equals(belongTable)) {
-            return queryRollClubTwoDetails(heatsCode, billetNo);
-        } else if ("roll_club_three".equals(belongTable)) {
-            return queryRollClubThreeDetails(heatsCode, billetNo);
-        } else if ("roll_out_shipp".equals(belongTable)) {
-            return queryRollOutShippDetails(heatsCode, billetNo);
-        } else {
-            return 2; // 其他情况
-        }
-    }
-
-    private int queryRollClubTwoDetails(String heatsCode, String billetNo) {
-        LambdaQueryWrapper<RollClubTwoDetails> wrapper = new LambdaQueryWrapper<>();
-        wrapper.eq(RollClubTwoDetails::getHeatNo, heatsCode)
-                .like(RollClubTwoDetails::getBilletNo, billetNo)
-                .last("limit 1");
-
-        RollClubTwoDetails detail = rollClubTwoDetailsMapper.selectOne(wrapper);
-        if (detail == null || StringUtils.isBlank(detail.getStackAddr())) {
-            return 1;
-        } else {
-            return 0;
-        }
-    }
-
-    private int queryRollClubThreeDetails(String heatsCode, String billetNo) {
-        LambdaQueryWrapper<RollClubThreeDetails> wrapper = new LambdaQueryWrapper<>();
-        wrapper.eq(RollClubThreeDetails::getHeatNo, heatsCode)
-                .like(RollClubThreeDetails::getBilletNo, billetNo)
-                .last("limit 1");
-
-        RollClubThreeDetails detail = rollClubThreeDetailsMapper.selectOne(wrapper);
-        if (detail == null || StringUtils.isBlank(detail.getStackAddr())) {
-            return 1;
-        } else {
-            return 0;
-        }
-    }
-
-
-    private int queryRollOutShippDetails(String heatsCode, String billetNo) {
-        LambdaQueryWrapper<RollOutShippDetails> wrapper = new LambdaQueryWrapper<>();
-        wrapper.eq(RollOutShippDetails::getHeatNo, heatsCode)
-                .like(RollOutShippDetails::getBilletNo, billetNo)
-                .last("limit 1");
-
-        RollOutShippDetails detail = rollOutShippDetailsMapper.selectOne(wrapper);
-        if (detail == null || StringUtils.isBlank(detail.getStackAddr())) {
-            return 1;
-        } else {
-            return 0;
+            log.info("处理炉次:{}", heatsActual.getHeatsCode());
+            billetSendProcessor.sendFromHeatsActuals(heatsActual);
         }
     }
 

+ 39 - 0
zgztBus/jeecg-module-sbm/src/main/java/org/jeecg/modules/actualControl/heatsActuals/task/BilletSendTask.java

@@ -0,0 +1,39 @@
+package org.jeecg.modules.actualControl.heatsActuals.task;
+
+import lombok.extern.slf4j.Slf4j;
+import org.jeecg.modules.actualControl.heatsActuals.service.IBilletSendRecordService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+
+@Component
+@Slf4j
+public class BilletSendTask {
+
+    @Autowired
+    private IBilletSendRecordService billetSendRecordService;
+
+    private volatile boolean running = true;
+
+    @Scheduled(cron = "0 */5 * * * ?")// 每5分钟一次
+    public void autoSendData() {
+        if (!running) {
+            log.info("定时任务已被停止,跳过执行");
+            return;
+        }
+        try {
+            billetSendRecordService.sendHeatsActuals();
+        } catch (Exception e) {
+            log.error("定时任务执行失败:", e);
+        }
+    }
+
+    // 优雅关闭任务
+    @PreDestroy
+    public void onShutdown() {
+        running = false;
+        log.info("项目关闭中,停止发送钢坯接收记录定时任务");
+    }
+}