xuhy
2025-05-08 4fa226a851baab95b44083208ff791b6160cda54
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
package com.ruoyi.jianguan.governmentCloud;
 
 
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.integration.api.model.*;
import com.ruoyi.jianguan.mongodb.service.*;
import com.ruoyi.jianguan.mqtt.config.TopicConstants;
import com.ruoyi.jianguan.mqtt.util.MqttPushUtil;
import com.ruoyi.jianguan.rocket.model.ErrorMessageMessage;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
 
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.*;
 
/**
 * 定时上传政务云数据
 */
@Slf4j
@Component
public class UploadDataTaskUtil {
 
    @Autowired
    private AcquisitionBillingModeService acquisitionBillingModeService;
    @Autowired
    private BillingModeVerifyService billingModeVerifyService;
    @Autowired
    private BmsAbortService bmsAbortService;
    @Autowired
    private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService;
    @Autowired
    private OnlineService onlineService;
    @Autowired
    private EndChargeService endChargeService;
    @Autowired
    private ErrorMessageMessageService errorMessageMessageService;
    @Autowired
    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
    @Autowired
    private ChargingHandshakeService chargingHandshakeService;
    @Autowired
    private ParameterSettingService parameterSettingService;
    @Autowired
    private MotorAbortService motorAbortService;
    @Autowired
    private BmsInformationService bmsInformationService;
    @Autowired
    private ChargingPileStartsChargingService chargingPileStartsChargingService;
    @Autowired
    private PlatformStartChargingReplyService platformStartChargingReplyService;
    @Autowired
    private PlatformStopChargingReplyService platformStopChargingReplyService;
    @Autowired
    private TransactionRecordService transactionRecordService;
    @Autowired
    private UpdateBalanceReplyService updateBalanceReplyService;
    @Autowired
    private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService;
    @Autowired
    private ClearOfflineCardReplyService clearOfflineCardReplyService;
    @Autowired
    private WorkingParameterSettingReplyService workingParameterSettingReplyService;
    @Autowired
    private TimingSettingService timingSettingService;
    @Autowired
    private SetupBillingModelReplyService setupBillingModelReplyService;
    @Autowired
    private GroundLockRealTimeDataService groundLockRealTimeDataService;
    @Autowired
    private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService;
    @Autowired
    private PlatformRestartReplyService platformRestartReplyService;
    @Autowired
    private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService;
    @Autowired
    private QrCodeDeliveryReplyService qrCodeDeliveryReplyService;
    @Autowired
    private SecurityDetectionService securityDetectionService;
    @Autowired
    private MqttPushUtil mqttPushUtil;
 
    /**
     * 每天的9点执行的任务
     */
    @Scheduled(cron = "0 0 9 * * *")
//    @Scheduled(fixedRate = 60000)
    public void taskDay(){
        try {
            // 传输mongodb的硬件数据
            createCustomThreadPool();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
 
 
    /**
     * 创建自定义线程池
     * 特点:
     * 1. 支持定时及周期性任务
     * 2. 核心线程数固定,但可以不断创建新线程执行后续任务
     * 3. 适用于需要定时执行或周期性执行的场景
     */
    @SneakyThrows
    public void createCustomThreadPool() {
        /*
          创建自定义线程池
          字段:
          1. corePoolSize:核心线程池数量
          2. maximumPoolSize: 最大线程池数量
          3. keepAliveTime: 线程空闲时间
          4. unit: 时间单位
          5. workQueue: 阻塞队列
          5. threadFactory: 线程工厂
          5. handler: 拒绝策略
         */
        ThreadPoolExecutor customthreadPoolExecutor = new ThreadPoolExecutor(
                5,                                              // 根据CPU核心数设置
                10,                                                         // 最大应急线程数
                30, TimeUnit.SECONDS,                                       // 空闲线程存活时间
                new ArrayBlockingQueue<>(100),                      // 有界队列防止内存溢出
                new DefaultThreadFactory("custom-thread-pool"),   // 自定义线程命名
                new ThreadPoolExecutor.CallerRunsPolicy()                   // 拒绝策略
        );
 
        try {
            LocalDate localDate = LocalDate.now().minusDays(1);
            String startTime = localDate + " 00:00:00";
            String endTime = localDate + " 23:59:59";
            // 查询所有的mango数据
            List<AcquisitionBillingMode> acquisitionBillingModeList = acquisitionBillingModeService.getRangeTimeData(startTime, endTime);
            List<BillingModeVerify> billingModeVerifyList = billingModeVerifyService.getRangeTimeData(startTime, endTime);
            List<BmsAbort> bmsAbortList = bmsAbortService.getRangeTimeData(startTime, endTime);
            List<BmsDemandAndChargerExportation> bmsDemandAndChargerExportationList = bmsDemandAndChargerExportationService.getRangeTimeData(startTime, endTime);
            List<Online> onlineList = onlineService.getRangeTimeData(startTime, endTime);
            List<EndCharge> endChargeList = endChargeService.getRangeTimeData(startTime, endTime);
            List<ErrorMessageMessage> errorMessageMessageList = errorMessageMessageService.getRangeTimeData(startTime, endTime);
            List<UploadRealTimeMonitoringData> uploadRealTimeMonitoringDataList = uploadRealTimeMonitoringDataService.getRangeTimeData(startTime, endTime);
            List<ChargingHandshake> chargingHandshakeList = chargingHandshakeService.getRangeTimeData(startTime, endTime);
            List<ParameterSetting> parameterSettingList = parameterSettingService.getRangeTimeData(startTime, endTime);
            List<MotorAbort> motorAbortList = motorAbortService.getRangeTimeData(startTime, endTime);
            List<BmsInformation> bmsInformationList = bmsInformationService.getRangeTimeData(startTime, endTime);
            List<ChargingPileStartsCharging> chargingPileStartsChargingList = chargingPileStartsChargingService.getRangeTimeData(startTime, endTime);
            List<PlatformStartChargingReply> platformStartChargingReplyList = platformStartChargingReplyService.getRangeTimeData(startTime, endTime);
            List<PlatformStopChargingReply> platformStopChargingReplyList = platformStopChargingReplyService.getRangeTimeData(startTime, endTime);
            List<TransactionRecord> transactionRecordList = transactionRecordService.getRangeTimeData(startTime, endTime);
            List<UpdateBalanceReply> updateBalanceReplyList = updateBalanceReplyService.getRangeTimeData(startTime, endTime);
            List<SynchronizeOfflineCardReply> synchronizeOfflineCardReplyList = synchronizeOfflineCardReplyService.getRangeTimeData(startTime, endTime);
            List<ClearOfflineCardReply> clearOfflineCardReplyList = clearOfflineCardReplyService.getRangeTimeData(startTime, endTime);
            List<WorkingParameterSettingReply> workingParameterSettingReplyList = workingParameterSettingReplyService.getRangeTimeData(startTime, endTime);
            List<TimingSetting> timingSettingList = timingSettingService.getRangeTimeData(startTime, endTime);
            List<SetupBillingModelReply> setupBillingModelReplyList = setupBillingModelReplyService.getRangeTimeData(startTime, endTime);
            List<GroundLockRealTimeData> groundLockRealTimeDataList = groundLockRealTimeDataService.getRangeTimeData(startTime, endTime);
            List<ChargingPileReturnsGroundLockData> chargingPileReturnsGroundLockDataList = chargingPileReturnsGroundLockDataService.getRangeTimeData(startTime, endTime);
            List<PlatformRestartReply> platformRestartReplyList = platformRestartReplyService.getRangeTimeData(startTime, endTime);
            List<PlatformRemoteUpdateReply> platformRemoteUpdateReplyList = platformRemoteUpdateReplyService.getRangeTimeData(startTime, endTime);
            List<QrCodeDeliveryReply> qrCodeDeliveryReplyList = qrCodeDeliveryReplyService.getRangeTimeData(startTime, endTime);
            List<SecurityDetection> securityDetectionList = securityDetectionService.getRangeTimeData(startTime, endTime);
            customthreadPoolExecutor.execute(() -> {
                if (acquisitionBillingModeList != null && acquisitionBillingModeList.size() > 0) {
                    for (AcquisitionBillingMode acquisitionBillingMode : acquisitionBillingModeList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", acquisitionBillingMode.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", acquisitionBillingMode.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (billingModeVerifyList != null && billingModeVerifyList.size() > 0) {
                    for (BillingModeVerify billingModeVerify : billingModeVerifyList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", billingModeVerify.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", billingModeVerify.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (bmsAbortList != null && bmsAbortList.size() > 0) {
                    for (BmsAbort bmsAbort : bmsAbortList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", bmsAbort.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", bmsAbort.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (bmsDemandAndChargerExportationList != null && bmsDemandAndChargerExportationList.size() > 0) {
                    for (BmsDemandAndChargerExportation bmsDemandAndChargerExportation : bmsDemandAndChargerExportationList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", bmsDemandAndChargerExportation.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE",  bmsDemandAndChargerExportation.getCharging_pile_code()),  jsonObject.toJSONString());
                    }
                }
                if (onlineList != null && onlineList.size() > 0) {
                    for (Online online : onlineList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", online.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", online.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (endChargeList != null && endChargeList.size() > 0) {
                    for (EndCharge endCharge : endChargeList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", endCharge.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", endCharge.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (errorMessageMessageList != null && errorMessageMessageList.size() > 0) {
                    for (ErrorMessageMessage errorMessageMessage : errorMessageMessageList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", errorMessageMessage.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", errorMessageMessage.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (uploadRealTimeMonitoringDataList != null && uploadRealTimeMonitoringDataList.size() > 0) {
                    for (UploadRealTimeMonitoringData uploadRealTimeMonitoringData : uploadRealTimeMonitoringDataList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", uploadRealTimeMonitoringData.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", uploadRealTimeMonitoringData.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (chargingHandshakeList != null && chargingHandshakeList.size() > 0) {
                    for (ChargingHandshake chargingHandshake : chargingHandshakeList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", chargingHandshake.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", chargingHandshake.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (parameterSettingList != null && parameterSettingList.size() > 0) {
                    for (ParameterSetting parameterSetting : parameterSettingList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", parameterSetting.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", parameterSetting.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (motorAbortList != null && motorAbortList.size() > 0) {
                    for (MotorAbort motorAbort : motorAbortList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", motorAbort.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", motorAbort.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (bmsInformationList != null && bmsInformationList.size() > 0) {
                    for (BmsInformation bmsInformation : bmsInformationList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", bmsInformation.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", bmsInformation.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (chargingPileStartsChargingList != null && chargingPileStartsChargingList.size() > 0) {
                    for (ChargingPileStartsCharging chargingPileStartsCharging : chargingPileStartsChargingList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", chargingPileStartsCharging.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", chargingPileStartsCharging.getCharging_pile_code()),jsonObject.toJSONString());
                    }
                }
                if (platformStartChargingReplyList != null && platformStartChargingReplyList.size() > 0) {
                    for (PlatformStartChargingReply platformStartChargingReply : platformStartChargingReplyList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", platformStartChargingReply.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", platformStartChargingReply.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (platformStopChargingReplyList != null && platformStopChargingReplyList.size() > 0) {
                    for (PlatformStopChargingReply platformStopChargingReply : platformStopChargingReplyList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", platformStopChargingReply.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", platformStopChargingReply.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (transactionRecordList != null && transactionRecordList.size() > 0) {
                    for (TransactionRecord transactionRecord : transactionRecordList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", transactionRecord.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", transactionRecord.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
//                if (updateBalanceReplyList != null && updateBalanceReplyList.size() > 0) {
//                    for (UpdateBalanceReply updateBalanceReply : updateBalanceReplyList) {
//                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", updateBalanceReply.getCharging_pile_code()), JSONObject.toJSONString(updateBalanceReply));
//                    }
//                }
//                if (synchronizeOfflineCardReplyList != null && synchronizeOfflineCardReplyList.size() > 0) {
//                    for (SynchronizeOfflineCardReply synchronizeOfflineCardReply : synchronizeOfflineCardReplyList) {
//                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", synchronizeOfflineCardReply.getCharging_pile_code()), JSONObject.toJSONString(synchronizeOfflineCardReply));
//                    }
//                }
//                if (clearOfflineCardReplyList != null && clearOfflineCardReplyList.size() > 0) {
//                    for (ClearOfflineCardReply clearOfflineCardReply : clearOfflineCardReplyList) {
//                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", clearOfflineCardReply.getCharging_pile_code()), JSONObject.toJSONString(clearOfflineCardReply));
//                   }
//                }
                if (workingParameterSettingReplyList != null && workingParameterSettingReplyList.size() > 0) {
                    for (WorkingParameterSettingReply workingParameterSettingReply : workingParameterSettingReplyList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", workingParameterSettingReply.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", workingParameterSettingReply.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (timingSettingList != null && timingSettingList.size() > 0) {
                    for (TimingSetting timingSetting : timingSettingList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", timingSetting.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", timingSetting.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
//                if (setupBillingModelReplyList != null && setupBillingModelReplyList.size() > 0) {
//                    for (SetupBillingModelReply setupBillingModelReply : setupBillingModelReplyList) {
//                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", setupBillingModelReply.getCharging_pile_code()), JSONObject.toJSONString(setupBillingModelReply));
//                    }
//                }
                if (groundLockRealTimeDataList != null && groundLockRealTimeDataList.size() > 0) {
                    for (GroundLockRealTimeData groundLockRealTimeData : groundLockRealTimeDataList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", groundLockRealTimeData.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", groundLockRealTimeData.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (chargingPileReturnsGroundLockDataList != null && chargingPileReturnsGroundLockDataList.size() > 0) {
                    for (ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData : chargingPileReturnsGroundLockDataList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", chargingPileReturnsGroundLockData.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", chargingPileReturnsGroundLockData.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (platformRestartReplyList != null && platformRestartReplyList.size() > 0) {
                    for (PlatformRestartReply platformRestartReply : platformRestartReplyList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", platformRestartReply.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", platformRestartReply.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (platformRemoteUpdateReplyList != null && platformRemoteUpdateReplyList.size() > 0) {
                    for (PlatformRemoteUpdateReply platformRemoteUpdateReply : platformRemoteUpdateReplyList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", platformRemoteUpdateReply.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", platformRemoteUpdateReply.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (qrCodeDeliveryReplyList != null && qrCodeDeliveryReplyList.size() > 0) {
                    for (QrCodeDeliveryReply qrCodeDeliveryReply : qrCodeDeliveryReplyList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", qrCodeDeliveryReply.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", qrCodeDeliveryReply.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
                if (securityDetectionList != null && securityDetectionList.size() > 0) {
                    for (SecurityDetection securityDetection : securityDetectionList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", securityDetection.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", securityDetection.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
 
            });
 
            TimeUnit.MILLISECONDS.sleep(1);
 
//            Future<String> future = customthreadPoolExecutor.submit(() -> {
//                log.info("线程:{},办理业务", Thread.currentThread().getName());
//                return "业务办理完成";
//            });
//            log.info(future.get());
        } finally {
            gracefulShutdown(customthreadPoolExecutor);
        }
    }
 
 
    /**
     * 优雅关闭线程池通用方法
     *
     * @param pool 需要关闭的线程池
     */
    private static void gracefulShutdown(ExecutorService pool) {
        pool.shutdown(); // 拒绝新任务提交
        try {
            // 等待现有任务完成
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // 取消等待中的任务 只等待运行中的任务
                // 再次等待任务响应中断
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                    log.error("线程池未完全关闭");
                }
            }
        } catch (InterruptedException e) {
            // 重新尝试关闭
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            log.info("线程池是否执行完成:{}", pool.isTerminated());
        }
    }
 
    public static void main(String[] args) {
        System.err.println(System.currentTimeMillis());
    }
 
}