From 3f95bd04287246a76e113cc8044e1fcbbda2257b Mon Sep 17 00:00:00 2001 From: guyue <1721849008@qq.com> Date: 星期四, 10 七月 2025 20:09:50 +0800 Subject: [PATCH] 定时任务 --- src/main/resources/mapper/KeywordTaskMapper.xml | 3 + src/main/java/com/linghu/controller/CollectController.java | 6 +++ src/main/java/com/linghu/model/entity/KeywordTask.java | 6 ++ src/main/java/com/linghu/LingHuApplication.java | 2 + src/main/java/com/linghu/timeTask/ScheduledTasks.java | 94 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 109 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/linghu/LingHuApplication.java b/src/main/java/com/linghu/LingHuApplication.java index 0612a01..8b72245 100644 --- a/src/main/java/com/linghu/LingHuApplication.java +++ b/src/main/java/com/linghu/LingHuApplication.java @@ -3,8 +3,10 @@ import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication +@EnableScheduling @MapperScan("com.linghu.mapper") public class LingHuApplication { diff --git a/src/main/java/com/linghu/controller/CollectController.java b/src/main/java/com/linghu/controller/CollectController.java index 9a7eac3..e6b7671 100644 --- a/src/main/java/com/linghu/controller/CollectController.java +++ b/src/main/java/com/linghu/controller/CollectController.java @@ -239,6 +239,7 @@ KeywordTask keywordTask = new KeywordTask(); keywordTask.setKeyword_id(keywordId); keywordTask.setTask_id(response.getTask_id()); + keywordTask.setStatus("pending"); return keywordTask; }) .collect(Collectors.toList()); @@ -642,4 +643,9 @@ .onErrorResume(e -> Mono.just( new ServerResourceResponse( e.getMessage()))); } + /** + * 传入orderid查所有关键词id以及关键词下面的所有任务id,轮询所有任务状态,如果状态为completed,则循环调用获取结果接口,处理结果 + */ + + } diff --git a/src/main/java/com/linghu/model/entity/KeywordTask.java b/src/main/java/com/linghu/model/entity/KeywordTask.java index 7a1c5b9..9b8bde5 100644 --- a/src/main/java/com/linghu/model/entity/KeywordTask.java +++ b/src/main/java/com/linghu/model/entity/KeywordTask.java @@ -29,6 +29,7 @@ * */ private String task_id; + private String status; @TableField(exist = false) private static final long serialVersionUID = 1L; @@ -47,7 +48,8 @@ KeywordTask other = (KeywordTask) that; return (this.getId() == null ? other.getId() == null : this.getId().equals(other.getId())) && (this.getKeyword_id() == null ? other.getKeyword_id() == null : this.getKeyword_id().equals(other.getKeyword_id())) - && (this.getTask_id() == null ? other.getTask_id() == null : this.getTask_id().equals(other.getTask_id())); + && (this.getTask_id() == null ? other.getTask_id() == null : this.getTask_id().equals(other.getTask_id())) + && (this.getStatus() == null ? other.getStatus() == null : this.getStatus().equals(other.getStatus())); } @Override @@ -57,6 +59,7 @@ result = prime * result + ((getId() == null) ? 0 : getId().hashCode()); result = prime * result + ((getKeyword_id() == null) ? 0 : getKeyword_id().hashCode()); result = prime * result + ((getTask_id() == null) ? 0 : getTask_id().hashCode()); + result = prime * result + ((getStatus() == null) ? 0 : getStatus().hashCode()); return result; } @@ -70,6 +73,7 @@ sb.append(", keyword_id=").append(keyword_id); sb.append(", task_id=").append(task_id); sb.append(", serialVersionUID=").append(serialVersionUID); + sb.append(", status=").append(status); sb.append("]"); return sb.toString(); } diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java new file mode 100644 index 0000000..95c2a50 --- /dev/null +++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java @@ -0,0 +1,94 @@ +package com.linghu.timeTask; + + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.linghu.controller.CollectController; +import com.linghu.model.dto.TaskResultResponse; +import com.linghu.model.dto.TaskStatusResponse; +import com.linghu.model.entity.KeywordTask; +import com.linghu.service.KeywordTaskService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Component +public class ScheduledTasks { + private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class); + private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); + + @Autowired + private KeywordTaskService keywordTaskService; + + @Autowired + private CollectController collectController; + + @Scheduled(fixedRate = 5000) // 每5秒执行一次 + public void scheduleFixedRateTask() { + // 查询所有状态为pending的任务 + LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(KeywordTask::getStatus, "pending"); + + keywordTaskService.list(queryWrapper) + .stream() + .filter(task -> task.getTask_id() != null) + .forEach(task -> processTaskStatus(task) + .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行 + .subscribe( + updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), + error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) + ) + ); + } + + private Mono<KeywordTask> processTaskStatus(KeywordTask task) { + return collectController.getTaskStatus(task.getTask_id()) + .flatMap(statusResponse -> { + if ("completed".equalsIgnoreCase(statusResponse.getStatus())) { + log.info("任务 {} 已完成,获取结果", task.getTask_id()); + return collectController.getTaskResult(task.getTask_id()) + .doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id())) + .thenReturn(task) + .map(t -> { + t.setStatus("completed"); + return t; + }); + } else if (!"submit".equalsIgnoreCase(statusResponse.getStatus()) + && !"running".equalsIgnoreCase(statusResponse.getStatus())) { + task.setStatus("false"); + return Mono.just(task); + } else { + // 任务仍在进行中,不更新状态 + return Mono.empty(); + } + }) + .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务 + .flatMap(t -> { + if (!"pending".equalsIgnoreCase(t.getStatus())) { + + + // 修改这里:将updateById的结果包装成Mono + return Mono.fromSupplier(() -> keywordTaskService.updateById(t)) + .thenReturn(t); + } + return Mono.just(t); + }) + .onErrorResume(e -> { + log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage()); + task.setStatus("error"); + + // 修改这里:将updateById的结果包装成Mono + return Mono.fromSupplier(() -> keywordTaskService.updateById(task)) + .thenReturn(task); + }); + } +} \ No newline at end of file diff --git a/src/main/resources/mapper/KeywordTaskMapper.xml b/src/main/resources/mapper/KeywordTaskMapper.xml index 39b9d4c..caacffe 100644 --- a/src/main/resources/mapper/KeywordTaskMapper.xml +++ b/src/main/resources/mapper/KeywordTaskMapper.xml @@ -8,9 +8,10 @@ <id property="id" column="id" jdbcType="INTEGER"/> <result property="keyword_id" column="keyword_id" jdbcType="INTEGER"/> <result property="task_id" column="task_id" jdbcType="VARCHAR"/> + <result property="status" column="status" jdbcType="VARCHAR"/> </resultMap> <sql id="Base_Column_List"> - id,keyword_id,task_id + id,keyword_id,task_id, status </sql> </mapper> -- Gitblit v1.7.1