| | |
| | | |
| | | |
| | | import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
| | | import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; |
| | | import com.linghu.controller.CollectController; |
| | | import com.linghu.model.dto.TaskResultResponse; |
| | | import com.linghu.model.dto.TaskStatusResponse; |
| | | import com.linghu.model.entity.KeywordTask; |
| | | import com.linghu.model.entity.Question; |
| | | import com.linghu.service.KeywordTaskService; |
| | | import com.linghu.service.QuestionService; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | |
| | | private CollectController collectController; |
| | | @Autowired |
| | | private WebClient webClient; // assumes a WebClient bean has been configured |
| | | @Autowired |
| | | private QuestionService questionService; |
| | | |
| | | private String baseUrl = "http://thirdparty-service"; // base URL of the third-party service |
| | | private ScheduledFuture<?> scheduledTask; // reference to the scheduled task |
| | |
| | | ) |
| | | ); |
| | | } |
| | | // @Scheduled(fixedRate = 10000) // runs every 10 seconds (fixedRate is in milliseconds) |
| | | // public void scheduleFixedRateTask() { |
| | | // // 查询所有状态为pending的任务 |
| | | // LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); |
| | | // queryWrapper.eq(KeywordTask::getStatus, "pending"); |
| | | // |
| | | // keywordTaskService.list(queryWrapper) |
| | | // .stream() |
| | | // .filter(task -> task.getTask_id() != null) |
| | | // .forEach(task -> processTaskStatus(task) |
| | | // .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行 |
| | | // .subscribe( |
| | | // updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), |
| | | // error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) |
| | | // ) |
| | | // ); |
| | | // } |
| | | |
| | | |
| | | private Mono<KeywordTask> processTaskStatus(KeywordTask task) { |
| | | return collectController.getTaskStatus(task.getTask_id()) |
| | |
| | | && !"Error".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | task.setStatus("false"); |
| | | return Mono.just(task); |
| | | }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | //更新每个提问词的状态 |
| | | return updateQuestionStatus(task, statusResponse); // 抽取为独立方法 |
| | | }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){ |
| | | task.setStatus("false"); |
| | | task.setStatus("nonentity"); |
| | | return Mono.just(task); |
| | | } |
| | | else { |
| | |
| | | .thenReturn(task); |
| | | }); |
| | | } |
| | | |
| | | private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) { |
| | | // 1. 先执行同步查询,获取 List<Question> |
| | | List<Question> questions = questionService.lambdaQuery() |
| | | .eq(Question::getKeyword_id, task.getKeyword_id()) |
| | | .list(); |
| | | |
| | | // 2. 将 List 转为 Flux,再进行响应式处理 |
| | | return Flux.fromIterable(questions) |
| | | .flatMap(question -> { |
| | | // 更新逻辑... |
| | | String newStatus = statusResponse.getQuestions_status().stream() |
| | | .filter(qs -> qs.getQuestion().equals(question.getQuestion())) |
| | | .findFirst() |
| | | .map(qs -> qs.getStatus()) |
| | | .orElse(question.getStatus()); |
| | | question.setStatus(newStatus); |
| | | |
| | | return Mono.fromSupplier(() -> questionService.updateById(question)) |
| | | // 此时可以调用 doOnError 处理异常 |
| | | .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage())) |
| | | .onErrorReturn(false); // 异常时返回 false |
| | | }) |
| | | .then(Mono.just(task)); |
| | | } |
| | | } |