From 3f95bd04287246a76e113cc8044e1fcbbda2257b Mon Sep 17 00:00:00 2001
From: guyue <1721849008@qq.com>
Date: 星期四, 10 七月 2025 20:09:50 +0800
Subject: [PATCH] 定时任务

---
 src/main/resources/mapper/KeywordTaskMapper.xml            |    3 +
 src/main/java/com/linghu/controller/CollectController.java |    6 +++
 src/main/java/com/linghu/model/entity/KeywordTask.java     |    6 ++
 src/main/java/com/linghu/LingHuApplication.java            |    2 +
 src/main/java/com/linghu/timeTask/ScheduledTasks.java      |   94 +++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 109 insertions(+), 2 deletions(-)

diff --git a/src/main/java/com/linghu/LingHuApplication.java b/src/main/java/com/linghu/LingHuApplication.java
index 0612a01..8b72245 100644
--- a/src/main/java/com/linghu/LingHuApplication.java
+++ b/src/main/java/com/linghu/LingHuApplication.java
@@ -3,8 +3,10 @@
 import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
 
 @SpringBootApplication
+@EnableScheduling
 @MapperScan("com.linghu.mapper")
 public class LingHuApplication {
 
diff --git a/src/main/java/com/linghu/controller/CollectController.java b/src/main/java/com/linghu/controller/CollectController.java
index 9a7eac3..e6b7671 100644
--- a/src/main/java/com/linghu/controller/CollectController.java
+++ b/src/main/java/com/linghu/controller/CollectController.java
@@ -239,6 +239,7 @@
                     KeywordTask keywordTask = new KeywordTask();
                     keywordTask.setKeyword_id(keywordId);
                     keywordTask.setTask_id(response.getTask_id());
+                    keywordTask.setStatus("pending");
                     return keywordTask;
                 })
                 .collect(Collectors.toList());
@@ -642,4 +643,9 @@
                 .onErrorResume(e -> Mono.just(
                         new ServerResourceResponse( e.getMessage())));
     }
+    /**
+     * 传入orderid查所有关键词id以及关键词下面的所有任务id,轮询所有任务状态,如果状态为completed,则循环调用获取结果接口,处理结果
+     */
+
+
 }
diff --git a/src/main/java/com/linghu/model/entity/KeywordTask.java b/src/main/java/com/linghu/model/entity/KeywordTask.java
index 7a1c5b9..9b8bde5 100644
--- a/src/main/java/com/linghu/model/entity/KeywordTask.java
+++ b/src/main/java/com/linghu/model/entity/KeywordTask.java
@@ -29,6 +29,7 @@
      * 
      */
     private String task_id;
+    private String status;
 
     @TableField(exist = false)
     private static final long serialVersionUID = 1L;
@@ -47,7 +48,8 @@
         KeywordTask other = (KeywordTask) that;
         return (this.getId() == null ? other.getId() == null : this.getId().equals(other.getId()))
             && (this.getKeyword_id() == null ? other.getKeyword_id() == null : this.getKeyword_id().equals(other.getKeyword_id()))
-            && (this.getTask_id() == null ? other.getTask_id() == null : this.getTask_id().equals(other.getTask_id()));
+            && (this.getTask_id() == null ? other.getTask_id() == null : this.getTask_id().equals(other.getTask_id()))
+                && (this.getStatus() == null ? other.getStatus() == null : this.getStatus().equals(other.getStatus()));
     }
 
     @Override
@@ -57,6 +59,7 @@
         result = prime * result + ((getId() == null) ? 0 : getId().hashCode());
         result = prime * result + ((getKeyword_id() == null) ? 0 : getKeyword_id().hashCode());
         result = prime * result + ((getTask_id() == null) ? 0 : getTask_id().hashCode());
+        result = prime * result + ((getStatus() == null) ? 0 : getStatus().hashCode());
         return result;
     }
 
@@ -70,6 +73,7 @@
         sb.append(", keyword_id=").append(keyword_id);
         sb.append(", task_id=").append(task_id);
         sb.append(", serialVersionUID=").append(serialVersionUID);
+        sb.append(", status=").append(status);
         sb.append("]");
         return sb.toString();
     }
diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
new file mode 100644
index 0000000..95c2a50
--- /dev/null
+++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -0,0 +1,94 @@
+package com.linghu.timeTask;
+
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.linghu.controller.CollectController;
+import com.linghu.model.dto.TaskResultResponse;
+import com.linghu.model.dto.TaskStatusResponse;
+import com.linghu.model.entity.KeywordTask;
+import com.linghu.service.KeywordTaskService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class ScheduledTasks {
+    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
+    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
+
+    @Autowired
+    private KeywordTaskService keywordTaskService;
+
+    @Autowired
+    private CollectController collectController;
+
+    @Scheduled(fixedRate = 5000) // 每5秒执行一次
+    public void scheduleFixedRateTask() {
+        // 查询所有状态为pending的任务
+        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
+        queryWrapper.eq(KeywordTask::getStatus, "pending");
+
+        keywordTaskService.list(queryWrapper)
+                .stream()
+                .filter(task -> task.getTask_id() != null)
+                .forEach(task -> processTaskStatus(task)
+                        .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
+                        .subscribe(
+                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
+                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
+                        )
+                );
+    }
+
+    private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
+        return collectController.getTaskStatus(task.getTask_id())
+                .flatMap(statusResponse -> {
+                    if ("completed".equalsIgnoreCase(statusResponse.getStatus())) {
+                        log.info("任务 {} 已完成,获取结果", task.getTask_id());
+                        return collectController.getTaskResult(task.getTask_id())
+                                .doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id()))
+                                .thenReturn(task)
+                                .map(t -> {
+                                    t.setStatus("completed");
+                                    return t;
+                                });
+                    } else if (!"submit".equalsIgnoreCase(statusResponse.getStatus())
+                            && !"running".equalsIgnoreCase(statusResponse.getStatus())) {
+                        task.setStatus("false");
+                        return Mono.just(task);
+                    } else {
+                        // 任务仍在进行中,不更新状态
+                        return Mono.empty();
+                    }
+                })
+                .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务
+                .flatMap(t -> {
+                    if (!"pending".equalsIgnoreCase(t.getStatus())) {
+
+
+                        // 修改这里:将updateById的结果包装成Mono
+                        return Mono.fromSupplier(() -> keywordTaskService.updateById(t))
+                                .thenReturn(t);
+                    }
+                    return Mono.just(t);
+                })
+                .onErrorResume(e -> {
+                    log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());
+                    task.setStatus("error");
+
+                    // 修改这里:将updateById的结果包装成Mono
+                    return Mono.fromSupplier(() -> keywordTaskService.updateById(task))
+                            .thenReturn(task);
+                });
+    }
+}
\ No newline at end of file
diff --git a/src/main/resources/mapper/KeywordTaskMapper.xml b/src/main/resources/mapper/KeywordTaskMapper.xml
index 39b9d4c..caacffe 100644
--- a/src/main/resources/mapper/KeywordTaskMapper.xml
+++ b/src/main/resources/mapper/KeywordTaskMapper.xml
@@ -8,9 +8,10 @@
             <id property="id" column="id" jdbcType="INTEGER"/>
             <result property="keyword_id" column="keyword_id" jdbcType="INTEGER"/>
             <result property="task_id" column="task_id" jdbcType="VARCHAR"/>
+        <result property="status" column="status" jdbcType="VARCHAR"/>
     </resultMap>
 
     <sql id="Base_Column_List">
-        id,keyword_id,task_id
+        id,keyword_id,task_id, status
     </sql>
 </mapper>

--
Gitblit v1.7.1