New file |
| | |
| | | package com.ruoyi.jianguan.governmentCloud; |
| | | |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.ruoyi.common.core.utils.DateUtils; |
| | | import com.ruoyi.integration.api.model.*; |
| | | import com.ruoyi.jianguan.mongodb.service.*; |
| | | import com.ruoyi.jianguan.mqtt.config.TopicConstants; |
| | | import com.ruoyi.jianguan.mqtt.util.MqttPushUtil; |
| | | import com.ruoyi.jianguan.rocket.model.ErrorMessageMessage; |
| | | import io.netty.util.concurrent.DefaultThreadFactory; |
| | | import lombok.SneakyThrows; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.scheduling.annotation.Scheduled; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.text.ParseException; |
| | | import java.text.SimpleDateFormat; |
| | | import java.time.LocalDate; |
| | | import java.time.LocalDateTime; |
| | | import java.util.Date; |
| | | import java.util.List; |
| | | import java.util.concurrent.*; |
| | | |
| | | /** |
| | | * 定时上传政务云数据 |
| | | */ |
| | | @Slf4j |
| | | @Component |
| | | public class UploadDataTaskUtil { |
| | | |
| | | @Autowired |
| | | private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; |
| | | @Autowired |
| | | private MqttPushUtil mqttPushUtil; |
| | | |
| | | /** |
| | | * 每天的9点执行的任务 |
| | | */ |
| | | // @Scheduled(cron = "0 0 9 * * *") |
| | | @Scheduled(fixedRate = 60000) |
| | | public void taskDay(){ |
| | | try { |
| | | // 传输mongodb的硬件数据 |
| | | createCustomThreadPool(); |
| | | }catch (Exception e){ |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 创建自定义线程池 |
| | | * 特点: |
| | | * 1. 支持定时及周期性任务 |
| | | * 2. 核心线程数固定,但可以不断创建新线程执行后续任务 |
| | | * 3. 适用于需要定时执行或周期性执行的场景 |
| | | */ |
| | | @SneakyThrows |
| | | public void createCustomThreadPool() { |
| | | /* |
| | | 创建自定义线程池 |
| | | 字段: |
| | | 1. corePoolSize:核心线程池数量 |
| | | 2. maximumPoolSize: 最大线程池数量 |
| | | 3. keepAliveTime: 线程空闲时间 |
| | | 4. unit: 时间单位 |
| | | 5. workQueue: 阻塞队列 |
| | | 5. threadFactory: 线程工厂 |
| | | 5. handler: 拒绝策略 |
| | | */ |
| | | ThreadPoolExecutor customthreadPoolExecutor = new ThreadPoolExecutor( |
| | | 5, // 根据CPU核心数设置 |
| | | 10, // 最大应急线程数 |
| | | 30, TimeUnit.SECONDS, // 空闲线程存活时间 |
| | | new ArrayBlockingQueue<>(100), // 有界队列防止内存溢出 |
| | | new DefaultThreadFactory("custom-thread-pool"), // 自定义线程命名 |
| | | new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略 |
| | | ); |
| | | |
| | | try { |
| | | LocalDate localDate = LocalDate.now().minusDays(1); |
| | | String start = localDate + " 00:00:00"; |
| | | String end = localDate + " 23:59:59"; |
| | | Date startTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(start); |
| | | Date endTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(end); |
| | | // 查询所有的mango数据 |
| | | List<UploadRealTimeMonitoringData> uploadRealTimeMonitoringDataList = uploadRealTimeMonitoringDataService.getRangeTimeData(startTime, endTime); |
| | | customthreadPoolExecutor.execute(() -> { |
| | | if (uploadRealTimeMonitoringDataList != null && uploadRealTimeMonitoringDataList.size() > 0) { |
| | | for (UploadRealTimeMonitoringData uploadRealTimeMonitoringData : uploadRealTimeMonitoringDataList) { |
| | | JSONObject jsonObject = new JSONObject(); |
| | | jsonObject.put("device_code", uploadRealTimeMonitoringData.getCharging_pile_code()); |
| | | jsonObject.put("report_time", System.currentTimeMillis()); |
| | | jsonObject.put("properties", uploadRealTimeMonitoringData); |
| | | mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", uploadRealTimeMonitoringData.getCharging_pile_code()), jsonObject.toJSONString()); |
| | | } |
| | | } |
| | | }); |
| | | |
| | | TimeUnit.MILLISECONDS.sleep(1); |
| | | |
| | | // Future<String> future = customthreadPoolExecutor.submit(() -> { |
| | | // log.info("线程:{},办理业务", Thread.currentThread().getName()); |
| | | // return "业务办理完成"; |
| | | // }); |
| | | // log.info(future.get()); |
| | | } finally { |
| | | gracefulShutdown(customthreadPoolExecutor); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 优雅关闭线程池通用方法 |
| | | * |
| | | * @param pool 需要关闭的线程池 |
| | | */ |
| | | private static void gracefulShutdown(ExecutorService pool) { |
| | | pool.shutdown(); // 拒绝新任务提交 |
| | | try { |
| | | // 等待现有任务完成 |
| | | if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { |
| | | pool.shutdownNow(); // 取消等待中的任务 只等待运行中的任务 |
| | | // 再次等待任务响应中断 |
| | | if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { |
| | | log.error("线程池未完全关闭"); |
| | | } |
| | | } |
| | | } catch (InterruptedException e) { |
| | | // 重新尝试关闭 |
| | | pool.shutdownNow(); |
| | | Thread.currentThread().interrupt(); |
| | | } finally { |
| | | log.info("线程池是否执行完成:{}", pool.isTerminated()); |
| | | } |
| | | } |
| | | |
| | | public static void main(String[] args) { |
| | | System.err.println(System.currentTimeMillis()); |
| | | } |
| | | |
| | | } |