From a271ed5a232236383bc96c8c1e380a5c91dc1c3c Mon Sep 17 00:00:00 2001 From: guyue <1721849008@qq.com> Date: 星期二, 12 八月 2025 01:07:19 +0800 Subject: [PATCH] 代码规范 --- src/main/java/com/linghu/timeTask/ScheduledTasks.java | 64 ++++++++++---------------------- 1 files changed, 20 insertions(+), 44 deletions(-) diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java index 9312445..ee07098 100644 --- a/src/main/java/com/linghu/timeTask/ScheduledTasks.java +++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java @@ -3,6 +3,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.linghu.config.FinalStatus; import com.linghu.controller.CollectController; import com.linghu.model.dto.TaskResultResponse; import com.linghu.model.dto.TaskStatusResponse; @@ -39,8 +40,6 @@ private KeywordTaskService keywordTaskService; @Autowired private CollectController collectController; - @Autowired - private WebClient webClient; // 假设已配置WebClient @Autowired private QuestionService questionService; @Autowired @@ -160,7 +159,7 @@ try { // 查询所有状态为pending的任务 LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(KeywordTask::getStatus, "pending"); + queryWrapper.eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue()); List<KeywordTask> tasks = keywordTaskService.list(queryWrapper); log.info("查询到 {} 个待处理任务", tasks.size()); @@ -175,7 +174,7 @@ boolean updatedCount = keywordTaskService.update( new LambdaUpdateWrapper<KeywordTask>() .in(KeywordTask::getId, taskIds) // 限定为查询到的任务 - .eq(KeywordTask::getStatus, "pending") // 确保状态未被其他进程修改 + .eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue()) // 确保状态未被其他进程修改 .set(KeywordTask::getStatus, "processing") ); log.info("成功标记 {} 个任务为processing(查询到{}个)", updatedCount, tasks.size()); @@ -197,57 +196,35 @@ } } /** - * 实际的任务处理方法,替代原@Scheduled注解方法 + * 实际的任务处理方法, */ -// public void executeTaskProcessing() { -// if (!isHealthy) { -// log.debug("系统不健康,跳过任务处理"); -// return; -// } -// -// // 查询所有状态为pending的任务 -// LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); -// queryWrapper.eq(KeywordTask::getStatus, "pending"); -// -// keywordTaskService.list(queryWrapper) -// .stream() -// .filter(task -> task.getTask_id() != null) -// .forEach(task -> processTaskStatus(task) -// .subscribeOn(Schedulers.boundedElastic()) -// .subscribe( -// updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), -// error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) -// ) -// ); -// } - private Mono<KeywordTask> processTaskStatus(KeywordTask task) { return collectController.getTaskStatus(task.getTask_id()) .flatMap(statusResponse -> { - if ("completed".equalsIgnoreCase(statusResponse.getStatus())) { + if (FinalStatus.COMPLETED.getValue().equalsIgnoreCase(statusResponse.getStatus())) { log.info("任务 {} 已完成,获取结果", task.getTask_id()); return collectController.getTaskResult(task.getTask_id()) .doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id())) .thenReturn(task) .map(t -> { - t.setStatus("completed"); + t.setStatus(FinalStatus.COMPLETED.getValue()); return t; }); - } else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus()) - && !"running".equalsIgnoreCase(statusResponse.getStatus()) - && !"Error".equalsIgnoreCase(statusResponse.getStatus())) { - task.setStatus("false"); + } else if (!FinalStatus.SUBMITTED.getValue().equalsIgnoreCase(statusResponse.getStatus()) + && !FinalStatus.RUNNING.getValue().equalsIgnoreCase(statusResponse.getStatus()) + && !FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus())) { + task.setStatus(FinalStatus.FALSE.getValue()); // 新增:处理status为false时的关键词状态更新 return updateKeywordStatusWhenTaskFinished(task) .then(Mono.just(task)); - }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) { + }else if ( FinalStatus.RUNNING.getValue().equalsIgnoreCase(statusResponse.getStatus())) { // 改回 pending,进行下一轮查询 - task.setStatus("pending"); + task.setStatus(FinalStatus.PENDING.getValue()); //更新每个提问词的状态 return updateQuestionStatus(task, statusResponse); // 抽取为独立方法 - }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){ - task.setStatus("nonentity"); + }else if(FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){ + task.setStatus(FinalStatus.NONENTITY.getValue()); // 更新关键词状态 return updateKeywordStatusWhenTaskFinished(task) .then(Mono.just(task)); @@ -267,7 +244,7 @@ }) .onErrorResume(e -> { log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage()); - task.setStatus("error"); + task.setStatus(FinalStatus.ERROR.getValue()); // 将updateById的结果包装成Mono return Mono.fromSupplier(() -> keywordTaskService.updateById(task)) @@ -311,16 +288,15 @@ }).flatMap(keywordTasks -> { // 检查所有关联任务是否都已完成(包括各种结束状态) boolean allCompleted = keywordTasks.stream().allMatch(t -> - "completed".equals(t.getStatus()) || - "false".equals(t.getStatus()) || - "cancelled".equals(t.getStatus()) || - "canceled".equals(t.getStatus()) || - "nonentity".equals(t.getStatus()) + FinalStatus.COMPLETED.getValue().equals(t.getStatus()) || + FinalStatus.FALSE.getValue().equals(t.getStatus()) || + FinalStatus.CANCELLED.getValue().equals(t.getStatus()) || + FinalStatus.NONENTITY.getValue().equals(t.getStatus()) ); if (allCompleted) { Keyword keyword = keywordService.getById(task.getKeyword_id()); - keyword.setStatus("completed"); + keyword.setStatus(FinalStatus.COMPLETED.getValue()); // return Mono.fromSupplier(() -> keywordService.updateById(keyword)) .then(); // 转换为Mono<Void> -- Gitblit v1.7.1