guyue
2025-07-16 693f70da11701e777203e263d7da41abb648dd0f
对比采集,实时状态
3个文件已修改
65 ■■■■■ 已修改文件
src/main/java/com/linghu/controller/CollectController.java 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/model/dto/SearchTaskRequest.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/timeTask/ScheduledTasks.java 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/controller/CollectController.java
@@ -141,7 +141,7 @@
        // ... 原有逻辑 ...
        int maxConcurrentUsers = searchTaskRequest.getConfig() != null ?
                searchTaskRequest.getConfig().getMax_concurrent_users() : 3;
        List<List<UserDto>> userBatches = splitUsersIntoBatches(searchTaskRequest.getUsers(), maxConcurrentUsers, keywordId);
        List<List<UserDto>> userBatches = splitUsersIntoBatches(searchTaskRequest.getUsers(), maxConcurrentUsers, keywordId,searchTaskRequest.getIs_first());
        // 创建批次队列并存入全局映射
@@ -368,13 +368,14 @@
//                .then();
//    }
    private List<List<UserDto>> splitUsersIntoBatches(List<UserDto> users, int batchSize,Integer keywordId) {
    private List<List<UserDto>> splitUsersIntoBatches(List<UserDto> users, int batchSize,Integer keywordId,Boolean isFirst) {
        Keyword keyword = keywordService.getById(keywordId);
        if (null==keyword.getNum()){
            keyword.setNum(0);
        if (isFirst){
            keyword.setNum(1);
        }else {
            keyword.setNum(keyword.getNum()+1);
        }
        keyword.setNum(keyword.getNum()+1);
        keywordService.updateById(keyword);
        List<List<UserDto>> batches = new ArrayList<>();
src/main/java/com/linghu/model/dto/SearchTaskRequest.java
@@ -23,6 +23,7 @@
    private Boolean save_to_database = false;
    private String webhook_url;
    private Integer keyword_id;
    private Boolean is_first;
    @Data
    public static class ConfigDTO {
src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -2,11 +2,14 @@
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.linghu.controller.CollectController;
import com.linghu.model.dto.TaskResultResponse;
import com.linghu.model.dto.TaskStatusResponse;
import com.linghu.model.entity.KeywordTask;
import com.linghu.model.entity.Question;
import com.linghu.service.KeywordTaskService;
import com.linghu.service.QuestionService;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +44,8 @@
    private CollectController collectController;
    @Autowired
    private WebClient webClient; // 假设已配置WebClient
    @Autowired
    private QuestionService questionService;
    private String baseUrl = "http://thirdparty-service"; // 第三方服务基础URL
    private ScheduledFuture<?> scheduledTask; // 定时任务引用
@@ -161,23 +166,7 @@
                        )
                );
    }
//    @Scheduled(fixedRate = 10000) // 每5秒执行一次
//    public void scheduleFixedRateTask() {
//        // 查询所有状态为pending的任务
//        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
//        queryWrapper.eq(KeywordTask::getStatus, "pending");
//
//        keywordTaskService.list(queryWrapper)
//                .stream()
//                .filter(task -> task.getTask_id() != null)
//                .forEach(task -> processTaskStatus(task)
//                        .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
//                        .subscribe(
//                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
//                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
//                        )
//                );
//    }
    private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
        return collectController.getTaskStatus(task.getTask_id())
@@ -196,8 +185,11 @@
                            && !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
                        task.setStatus("false");
                        return Mono.just(task);
                    }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
                        //更新每个提问词的状态
                        return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
                    }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){
                        task.setStatus("false");
                        task.setStatus("nonentity");
                        return Mono.just(task);
                    }
                    else {
@@ -224,4 +216,29 @@
                            .thenReturn(task);
                });
    }
    private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) {
        // 1. 先执行同步查询,获取 List<Question>
        List<Question> questions = questionService.lambdaQuery()
                .eq(Question::getKeyword_id, task.getKeyword_id())
                .list();
        // 2. 将 List 转为 Flux,再进行响应式处理
        return Flux.fromIterable(questions)
                .flatMap(question -> {
                    // 更新逻辑...
                    String newStatus = statusResponse.getQuestions_status().stream()
                            .filter(qs -> qs.getQuestion().equals(question.getQuestion()))
                            .findFirst()
                            .map(qs -> qs.getStatus())
                            .orElse(question.getStatus());
                    question.setStatus(newStatus);
                    return Mono.fromSupplier(() -> questionService.updateById(question))
                            // 此时可以调用 doOnError 处理异常
                            .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage()))
                            .onErrorReturn(false); // 异常时返回 false
                })
                .then(Mono.just(task));
    }
}