From dd028e18a12ad9ae7c43ed09b15ddd6bde1a43e9 Mon Sep 17 00:00:00 2001 From: guyue <1721849008@qq.com> Date: 星期三, 03 九月 2025 11:27:50 +0800 Subject: [PATCH] 采集中状态修改提前,统计数据合并 --- src/main/java/com/linghu/service/impl/CollectionServiceImpl.java | 31 ++++++++++++++++++------------- 1 files changed, 18 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/linghu/service/impl/CollectionServiceImpl.java b/src/main/java/com/linghu/service/impl/CollectionServiceImpl.java index 60953c6..772a0c1 100644 --- a/src/main/java/com/linghu/service/impl/CollectionServiceImpl.java +++ b/src/main/java/com/linghu/service/impl/CollectionServiceImpl.java @@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; - @Slf4j @Service public class CollectionServiceImpl implements CollectionService { @@ -85,7 +84,12 @@ // 将新的任务请求加入队列 taskQueue.add(searchTaskRequest); + //更新状态为采集中 + LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>(); + updateWrapper.eq(Keyword::getKeyword_id, searchTaskRequest.getKeyword_id()); + updateWrapper.set(Keyword::getStatus, FinalStatus.SUBMITTED.getValue()); + keywordService.update(updateWrapper); // 如果当前没有任务在处理中,则启动任务队列的处理 if (!isProcessing) { processNextTaskInQueue(); @@ -181,7 +185,7 @@ return Flux.fromIterable(tasksToCancelRemotely) .flatMap(task -> { // 创建状态更新和远程取消的组合操作 - Mono<Void> updateStatus = updateTaskStatus(task.getTask_id(), "cancelled"); + Mono<Void> updateStatus = updateTaskStatus(task.getTask_id(), FinalStatus.CANCELLED.getValue()); Mono<ResponseResult<?>> cancelOp = cancelRemoteTask(task.getTask_id()) .onErrorResume(e -> { log.error("取消任务 {} 失败: {}", task.getTask_id(), e.getMessage()); @@ -347,7 +351,7 @@ // 如果任务状态是"submitted"或"running",继续轮询 if (!FinalStatus.COMPLETED.getValue().equalsIgnoreCase(statusResponse.getStatus()) && !FinalStatus.FAILED.getValue().equalsIgnoreCase(statusResponse.getStatus()) - && !FinalStatus.CANCELED.getValue().equalsIgnoreCase(statusResponse.getStatus()) + && !FinalStatus.CANCELLED.getValue().equalsIgnoreCase(statusResponse.getStatus()) && !(FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus()) && statusResponse.getMessage().contains("Task not found")) ) { return Mono.delay(Duration.ofSeconds(5)) // 延迟 5 秒后再次查询 @@ -397,6 +401,7 @@ KeywordTask keywordTask = new KeywordTask(); keywordTask.setKeyword_id(keywordId); keywordTask.setTask_id(null); // 任务ID为空 + keywordTask.setCreate_time( LocalDateTime.now()); keywordTask.setNum(keyword.getNum()); keywordTaskService.save(keywordTask); // 保存 KeywordTask @@ -439,13 +444,12 @@ //更新关键词状态 LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>(); updateWrapper.eq(Keyword::getKeyword_id, batchRequest.getKeyword_id()); - updateWrapper.set(Keyword::getStatus, FinalStatus.COMPLETED.getValue()); updateWrapper.set(Keyword::getTask_id, taskResponse.getTask_id()); keywordService.update(updateWrapper); //设置轮数 Keyword keyword = keywordService.getById(batchRequest.getKeyword_id()); // 更新关键词任务与任务ID的关联 - // 获取与关键词相关的任务,task_id 为 null,确保只取一个任务 + // 获取与关键词相关的任务 List<KeywordTask> keywordTasks = keywordTaskService.list(new LambdaQueryWrapper<KeywordTask>() .eq(KeywordTask::getKeyword_id, keyword.getKeyword_id()) .eq(KeywordTask::getNum, keyword.getNum()) @@ -453,12 +457,12 @@ if (keywordTasks.size() > 0) { KeywordTask keywordTask = keywordTasks.get(0); keywordTask.setTask_id(taskResponse.getTask_id()); - keywordTask.setStatus("pending"); + keywordTask.setStatus(FinalStatus.PENDING.getValue()); keywordTaskService.updateById(keywordTask); } //将提问词列表的状态转为pending for (String questionName : batchRequest.getQuestions()) { - questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id()).eq(Question::getQuestion,questionName).set(Question::getStatus, "pending")); + questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id()).eq(Question::getQuestion,questionName).set(Question::getStatus, FinalStatus.PENDING.getValue())); } //所有关键词都在采集中或者已完成或者错误设置订单进入采集状态 @@ -497,7 +501,7 @@ new LambdaQueryWrapper<KeywordTask>() .eq(KeywordTask::getKeyword_id, keywordId) .isNotNull(KeywordTask::getTask_id) - .eq(KeywordTask::getStatus, "pending") + .eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue()) ); } @@ -524,7 +528,7 @@ List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper); // 更新关键词状态 - keyword.setStatus("completed"); + keyword.setStatus(FinalStatus.COMPLETED.getValue()); keywordService.updateById(keyword); //更新提问词状态为取消 @@ -588,7 +592,7 @@ private Mono<ResponseResult<?>> cancelRemoteTask(String taskId) { // 使用Collections.singletonMap或手动创建Map Map<String, Object> requestBody = new HashMap<>(); - requestBody.put("status", "pending"); + requestBody.put("status", FinalStatus.PENDING.getValue()); return webClient.post() .uri(baseUrl + "/api/v1/tasks/" + taskId + "/cancel") @@ -762,7 +766,6 @@ .allMatch(task -> FinalStatus.COMPLETED.getValue().equals(task.getStatus()) || FinalStatus.FALSE.getValue().equals(task.getStatus()) || FinalStatus.CANCELLED.getValue().equals(task.getStatus()) - || FinalStatus.CANCELED.getValue().equals(task.getStatus()) || FinalStatus.NONENTITY.getValue().equals(task.getStatus())); if (allCompletedOrFailed) { @@ -908,7 +911,9 @@ List<Reference> validRefGroup = refGroup.stream() .filter(Objects::nonNull) .collect(Collectors.toList()); - if (validRefGroup.isEmpty()) return; + if (validRefGroup.isEmpty()){ + return; + } Optional<Reference> existingRef = validRefGroup.stream() .filter(ref -> ref.getReference_id() != null) @@ -983,7 +988,7 @@ // 批量更新问题状态 List<Question> questionsToUpdate = new ArrayList<>(); questions.forEach(question -> { - List<QuestionResultList> results = questionResultsMap.getOrDefault(question.getQuestion(), io.jsonwebtoken.lang.Collections.emptyList()); + List<QuestionResultList> results = questionResultsMap.getOrDefault(question.getQuestion(), Collections.emptyList()); FinalStatus finalStatus = determineFinalStatus(results); // 使用枚举进行switch判断 -- Gitblit v1.7.1