From 693f70da11701e777203e263d7da41abb648dd0f Mon Sep 17 00:00:00 2001 From: guyue <1721849008@qq.com> Date: 星期三, 16 七月 2025 20:16:29 +0800 Subject: [PATCH] 对比采集,实时状态 --- src/main/java/com/linghu/timeTask/ScheduledTasks.java | 53 +++++++++++++++++++++++++++++++++++------------------ 1 files changed, 35 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java index 7ec4f3c..701c2fd 100644 --- a/src/main/java/com/linghu/timeTask/ScheduledTasks.java +++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java @@ -2,11 +2,14 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.linghu.controller.CollectController; import com.linghu.model.dto.TaskResultResponse; import com.linghu.model.dto.TaskStatusResponse; import com.linghu.model.entity.KeywordTask; +import com.linghu.model.entity.Question; import com.linghu.service.KeywordTaskService; +import com.linghu.service.QuestionService; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +44,8 @@ private CollectController collectController; @Autowired private WebClient webClient; // 假设已配置WebClient + @Autowired + private QuestionService questionService; private String baseUrl = "http://thirdparty-service"; // 第三方服务基础URL private ScheduledFuture<?> scheduledTask; // 定时任务引用 @@ -161,23 +166,7 @@ ) ); } -// @Scheduled(fixedRate = 10000) // 每5秒执行一次 -// public void scheduleFixedRateTask() { -// // 查询所有状态为pending的任务 -// LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); -// queryWrapper.eq(KeywordTask::getStatus, "pending"); -// -// keywordTaskService.list(queryWrapper) -// .stream() -// .filter(task -> task.getTask_id() != null) -// .forEach(task -> processTaskStatus(task) -// .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行 -// .subscribe( -// updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), -// error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) -// ) -// ); -// } + private Mono<KeywordTask> processTaskStatus(KeywordTask task) { return collectController.getTaskStatus(task.getTask_id()) @@ -196,8 +185,11 @@ && !"Error".equalsIgnoreCase(statusResponse.getStatus())) { task.setStatus("false"); return Mono.just(task); + }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) { + //更新每个提问词的状态 + return updateQuestionStatus(task, statusResponse); // 抽取为独立方法 }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){ - task.setStatus("false"); + task.setStatus("nonentity"); return Mono.just(task); } else { @@ -224,4 +216,29 @@ .thenReturn(task); }); } + + private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) { + // 1. 先执行同步查询,获取 List<Question> + List<Question> questions = questionService.lambdaQuery() + .eq(Question::getKeyword_id, task.getKeyword_id()) + .list(); + + // 2. 将 List 转为 Flux,再进行响应式处理 + return Flux.fromIterable(questions) + .flatMap(question -> { + // 更新逻辑... + String newStatus = statusResponse.getQuestions_status().stream() + .filter(qs -> qs.getQuestion().equals(question.getQuestion())) + .findFirst() + .map(qs -> qs.getStatus()) + .orElse(question.getStatus()); + question.setStatus(newStatus); + + return Mono.fromSupplier(() -> questionService.updateById(question)) + // 此时可以调用 doOnError 处理异常 + .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage())) + .onErrorReturn(false); // 异常时返回 false + }) + .then(Mono.just(task)); + } } \ No newline at end of file -- Gitblit v1.7.1