guyue
2 天以前 3f95bd04287246a76e113cc8044e1fcbbda2257b
定时任务
1个文件已添加
4个文件已修改
111 ■■■■■ 已修改文件
src/main/java/com/linghu/LingHuApplication.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/controller/CollectController.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/model/entity/KeywordTask.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/timeTask/ScheduledTasks.java 94 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/mapper/KeywordTaskMapper.xml 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/LingHuApplication.java
@@ -3,8 +3,10 @@
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
@MapperScan("com.linghu.mapper")
public class LingHuApplication {
src/main/java/com/linghu/controller/CollectController.java
@@ -239,6 +239,7 @@
                    KeywordTask keywordTask = new KeywordTask();
                    keywordTask.setKeyword_id(keywordId);
                    keywordTask.setTask_id(response.getTask_id());
                    keywordTask.setStatus("pending");
                    return keywordTask;
                })
                .collect(Collectors.toList());
@@ -642,4 +643,9 @@
                .onErrorResume(e -> Mono.just(
                        new ServerResourceResponse( e.getMessage())));
    }
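    /**
     * TODO (not implemented yet): given an order id, look up all of its keyword ids and every
     * task id under each keyword, poll the status of all those tasks and, for each task whose
     * status is "completed", call the fetch-result endpoint and process the result.
     */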
}
src/main/java/com/linghu/model/entity/KeywordTask.java
@@ -29,6 +29,7 @@
     * 
     */
    private String task_id;
    private String status;
    @TableField(exist = false)
    private static final long serialVersionUID = 1L;
@@ -47,7 +48,8 @@
        KeywordTask other = (KeywordTask) that;
        return (this.getId() == null ? other.getId() == null : this.getId().equals(other.getId()))
            && (this.getKeyword_id() == null ? other.getKeyword_id() == null : this.getKeyword_id().equals(other.getKeyword_id()))
            && (this.getTask_id() == null ? other.getTask_id() == null : this.getTask_id().equals(other.getTask_id()));
            && (this.getTask_id() == null ? other.getTask_id() == null : this.getTask_id().equals(other.getTask_id()))
                && (this.getStatus() == null ? other.getStatus() == null : this.getStatus().equals(other.getStatus()));
    }
    @Override
@@ -57,6 +59,7 @@
        result = prime * result + ((getId() == null) ? 0 : getId().hashCode());
        result = prime * result + ((getKeyword_id() == null) ? 0 : getKeyword_id().hashCode());
        result = prime * result + ((getTask_id() == null) ? 0 : getTask_id().hashCode());
        result = prime * result + ((getStatus() == null) ? 0 : getStatus().hashCode());
        return result;
    }
@@ -70,6 +73,7 @@
        sb.append(", keyword_id=").append(keyword_id);
        sb.append(", task_id=").append(task_id);
        sb.append(", serialVersionUID=").append(serialVersionUID);
        sb.append(", status=").append(status);
        sb.append("]");
        return sb.toString();
    }
src/main/java/com/linghu/timeTask/ScheduledTasks.java
New file
@@ -0,0 +1,94 @@
package com.linghu.timeTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.linghu.controller.CollectController;
import com.linghu.model.dto.TaskResultResponse;
import com.linghu.model.dto.TaskStatusResponse;
import com.linghu.model.entity.KeywordTask;
import com.linghu.service.KeywordTaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Component
public class ScheduledTasks {
    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
    @Autowired
    private KeywordTaskService keywordTaskService;
    @Autowired
    private CollectController collectController;
    @Scheduled(fixedRate = 5000) // 每5秒执行一次
    public void scheduleFixedRateTask() {
        // 查询所有状态为pending的任务
        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(KeywordTask::getStatus, "pending");
        keywordTaskService.list(queryWrapper)
                .stream()
                .filter(task -> task.getTask_id() != null)
                .forEach(task -> processTaskStatus(task)
                        .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
                        .subscribe(
                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
                        )
                );
    }
    private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
        return collectController.getTaskStatus(task.getTask_id())
                .flatMap(statusResponse -> {
                    if ("completed".equalsIgnoreCase(statusResponse.getStatus())) {
                        log.info("任务 {} 已完成,获取结果", task.getTask_id());
                        return collectController.getTaskResult(task.getTask_id())
                                .doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id()))
                                .thenReturn(task)
                                .map(t -> {
                                    t.setStatus("completed");
                                    return t;
                                });
                    } else if (!"submit".equalsIgnoreCase(statusResponse.getStatus())
                            && !"running".equalsIgnoreCase(statusResponse.getStatus())) {
                        task.setStatus("false");
                        return Mono.just(task);
                    } else {
                        // 任务仍在进行中,不更新状态
                        return Mono.empty();
                    }
                })
                .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务
                .flatMap(t -> {
                    if (!"pending".equalsIgnoreCase(t.getStatus())) {
                        // 修改这里:将updateById的结果包装成Mono
                        return Mono.fromSupplier(() -> keywordTaskService.updateById(t))
                                .thenReturn(t);
                    }
                    return Mono.just(t);
                })
                .onErrorResume(e -> {
                    log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());
                    task.setStatus("error");
                    // 修改这里:将updateById的结果包装成Mono
                    return Mono.fromSupplier(() -> keywordTaskService.updateById(task))
                            .thenReturn(task);
                });
    }
}
src/main/resources/mapper/KeywordTaskMapper.xml
@@ -8,9 +8,10 @@
            <id property="id" column="id" jdbcType="INTEGER"/>
            <result property="keyword_id" column="keyword_id" jdbcType="INTEGER"/>
            <result property="task_id" column="task_id" jdbcType="VARCHAR"/>
        <result property="status" column="status" jdbcType="VARCHAR"/>
    </resultMap>
    <sql id="Base_Column_List">
        id,keyword_id,task_id
        id,keyword_id,task_id, status
    </sql>
</mapper>