liujie
2025-07-22 0156b6db1248403e43f58abc314e2bc5abccb267
ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/governmentCloud/UploadDataTaskUtil.java
@@ -1,14 +1,26 @@
package com.ruoyi.jianguan.governmentCloud;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.redis.service.RedisService;
import com.ruoyi.integration.api.model.*;
import com.ruoyi.jianguan.mongodb.service.*;
import com.ruoyi.jianguan.mqtt.config.TopicConstants;
import com.ruoyi.jianguan.mqtt.util.MqttPushUtil;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
/**
@@ -19,70 +31,28 @@
public class UploadDataTaskUtil {
    @Autowired
    private AcquisitionBillingModeService acquisitionBillingModeService;
    @Autowired
    private BillingModeVerifyService billingModeVerifyService;
    @Autowired
    private BmsAbortService bmsAbortService;
    @Autowired
    private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService;
    @Autowired
    private OnlineService onlineService;
    @Autowired
    private EndChargeService endChargeService;
    @Autowired
    private ErrorMessageMessageService errorMessageMessageService;
    @Autowired
    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
    @Autowired
    private ChargingHandshakeService chargingHandshakeService;
    private MqttPushUtil mqttPushUtil;
    @Autowired
    private ParameterSettingService parameterSettingService;
    @Autowired
    private MotorAbortService motorAbortService;
    @Autowired
    private BmsInformationService bmsInformationService;
    @Autowired
    private ChargingPileStartsChargingService chargingPileStartsChargingService;
    @Autowired
    private PlatformStartChargingReplyService platformStartChargingReplyService;
    @Autowired
    private PlatformStopChargingReplyService platformStopChargingReplyService;
    @Autowired
    private TransactionRecordService transactionRecordService;
    @Autowired
    private UpdateBalanceReplyService updateBalanceReplyService;
    @Autowired
    private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService;
    @Autowired
    private ClearOfflineCardReplyService clearOfflineCardReplyService;
    @Autowired
    private WorkingParameterSettingReplyService workingParameterSettingReplyService;
    @Autowired
    private TimingSettingService timingSettingService;
    @Autowired
    private SetupBillingModelReplyService setupBillingModelReplyService;
    @Autowired
    private GroundLockRealTimeDataService groundLockRealTimeDataService;
    @Autowired
    private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService;
    @Autowired
    private PlatformRestartReplyService platformRestartReplyService;
    @Autowired
    private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService;
    @Autowired
    private QrCodeDeliveryReplyService qrCodeDeliveryReplyService;
    @Autowired
    private SecurityDetectionService securityDetectionService;
    private RedisService redisService;
    /**
     * 每天的9点执行的任务
     */
    @Scheduled(cron = "0 0 9 * * *")
    @Scheduled(fixedRate = 86400000)
    public void taskDay(){
        try {
            // 传输mongodb的硬件数据
            createCustomThreadPool();
            // 判断是否存在标识
            Thread.sleep(1000);
            String flag = redisService.getCacheObject("cloud_integration");
            System.out.println("市政云硬件数据传输标识:" + flag);
            if (Objects.isNull(flag)) {
                log.info("市政云硬件数据传输无标识,开始传输硬件数据");
                redisService.setCacheObject("cloud_integration", "1",24L,TimeUnit.HOURS);
                // 传输mongodb的硬件数据
                createCustomThreadPool();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
@@ -97,7 +67,7 @@
     * 3. 适用于需要定时执行或周期性执行的场景
     */
    @SneakyThrows
    public static void createCustomThreadPool() {
    public void createCustomThreadPool() {
        /*
          创建自定义线程池
          字段:
@@ -119,19 +89,30 @@
        );
        try {
            LocalDate localDate = LocalDate.now().minusDays(1);
            String start = localDate + " 00:00:00";
            String end = localDate + " 23:59:59";
            Date startTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(start);
            Date endTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(end);
            // 查询所有的mango数据
            List<UploadRealTimeMonitoringData> uploadRealTimeMonitoringDataList = uploadRealTimeMonitoringDataService.getRangeTimeData(startTime, endTime);
            System.err.println("查询所有的mango数据:"+uploadRealTimeMonitoringDataList.size());
            customthreadPoolExecutor.execute(() -> {
                if (uploadRealTimeMonitoringDataList != null && uploadRealTimeMonitoringDataList.size() > 0) {
                    for (UploadRealTimeMonitoringData uploadRealTimeMonitoringData : uploadRealTimeMonitoringDataList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", uploadRealTimeMonitoringData.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        jsonObject.put("properties", uploadRealTimeMonitoringData);
                        R<String> chargePileCode = mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", uploadRealTimeMonitoringData.getCharging_pile_code()), jsonObject.toJSONString());
                        log.info("返回结果:{}", chargePileCode.getData());
                        log.info("数据:{}", uploadRealTimeMonitoringData.getCharging_pile_code());
                        log.info("数据:{}", jsonObject.toJSONString());
                    }
                }
            });
            TimeUnit.MILLISECONDS.sleep(1);
//            Future<String> future = customthreadPoolExecutor.submit(() -> {
//                log.info("线程:{},办理业务", Thread.currentThread().getName());
//                return "业务办理完成";
//            });
//            log.info(future.get());
        } finally {
            gracefulShutdown(customthreadPoolExecutor);
        }
@@ -163,4 +144,8 @@
        }
    }
    public static void main(String[] args) {
        System.err.println(System.currentTimeMillis());
    }
}