| | |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.stream.Collectors; |
| | | |
| | | @Slf4j |
| | | @Service |
| | | public class CollectionServiceImpl implements CollectionService { |
| | |
| | | |
| | | // 将新的任务请求加入队列 |
| | | taskQueue.add(searchTaskRequest); |
| | | //更新状态为采集中 |
| | | LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>(); |
| | | updateWrapper.eq(Keyword::getKeyword_id, searchTaskRequest.getKeyword_id()); |
| | | updateWrapper.set(Keyword::getStatus, FinalStatus.SUBMITTED.getValue()); |
| | | |
| | | keywordService.update(updateWrapper); |
| | | // 如果当前没有任务在处理中,则启动任务队列的处理 |
| | | if (!isProcessing) { |
| | | processNextTaskInQueue(); |
| | |
| | | return Flux.fromIterable(tasksToCancelRemotely) |
| | | .flatMap(task -> { |
| | | // 创建状态更新和远程取消的组合操作 |
| | | Mono<Void> updateStatus = updateTaskStatus(task.getTask_id(), "cancelled"); |
| | | Mono<Void> updateStatus = updateTaskStatus(task.getTask_id(), FinalStatus.CANCELLED.getValue()); |
| | | Mono<ResponseResult<?>> cancelOp = cancelRemoteTask(task.getTask_id()) |
| | | .onErrorResume(e -> { |
| | | log.error("取消任务 {} 失败: {}", task.getTask_id(), e.getMessage()); |
| | |
| | | // 如果任务状态是"submitted"或"running",继续轮询 |
| | | if (!FinalStatus.COMPLETED.getValue().equalsIgnoreCase(statusResponse.getStatus()) |
| | | && !FinalStatus.FAILED.getValue().equalsIgnoreCase(statusResponse.getStatus()) |
| | | && !FinalStatus.CANCELED.getValue().equalsIgnoreCase(statusResponse.getStatus()) |
| | | && !FinalStatus.CANCELLED.getValue().equalsIgnoreCase(statusResponse.getStatus()) |
| | | && !(FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus()) |
| | | && statusResponse.getMessage().contains("Task not found")) ) { |
| | | return Mono.delay(Duration.ofSeconds(5)) // 延迟 5 秒后再次查询 |
| | |
| | | KeywordTask keywordTask = new KeywordTask(); |
| | | keywordTask.setKeyword_id(keywordId); |
| | | keywordTask.setTask_id(null); // 任务ID为空 |
| | | keywordTask.setCreate_time( LocalDateTime.now()); |
| | | |
| | | keywordTask.setNum(keyword.getNum()); |
| | | keywordTaskService.save(keywordTask); // 保存 KeywordTask |
| | |
| | | //更新关键词状态 |
| | | LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>(); |
| | | updateWrapper.eq(Keyword::getKeyword_id, batchRequest.getKeyword_id()); |
| | | updateWrapper.set(Keyword::getStatus, FinalStatus.COMPLETED.getValue()); |
| | | updateWrapper.set(Keyword::getTask_id, taskResponse.getTask_id()); |
| | | keywordService.update(updateWrapper); |
| | | //设置轮数 |
| | | Keyword keyword = keywordService.getById(batchRequest.getKeyword_id()); |
| | | // 更新关键词任务与任务ID的关联 |
| | | // 获取与关键词相关的任务,task_id 为 null,确保只取一个任务 |
| | | // 获取与关键词相关的任务 |
| | | List<KeywordTask> keywordTasks = keywordTaskService.list(new LambdaQueryWrapper<KeywordTask>() |
| | | .eq(KeywordTask::getKeyword_id, keyword.getKeyword_id()) |
| | | .eq(KeywordTask::getNum, keyword.getNum()) |
| | |
| | | if (keywordTasks.size() > 0) { |
| | | KeywordTask keywordTask = keywordTasks.get(0); |
| | | keywordTask.setTask_id(taskResponse.getTask_id()); |
| | | keywordTask.setStatus("pending"); |
| | | keywordTask.setStatus(FinalStatus.PENDING.getValue()); |
| | | keywordTaskService.updateById(keywordTask); |
| | | } |
| | | //将提问词列表的状态转为pending |
| | | for (String questionName : batchRequest.getQuestions()) { |
| | | questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id()).eq(Question::getQuestion,questionName).set(Question::getStatus, "pending")); |
| | | questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id()).eq(Question::getQuestion,questionName).set(Question::getStatus, FinalStatus.PENDING.getValue())); |
| | | |
| | | } |
| | | //所有关键词都在采集中或者已完成或者错误设置订单进入采集状态 |
| | |
| | | new LambdaQueryWrapper<KeywordTask>() |
| | | .eq(KeywordTask::getKeyword_id, keywordId) |
| | | .isNotNull(KeywordTask::getTask_id) |
| | | .eq(KeywordTask::getStatus, "pending") |
| | | .eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue()) |
| | | ); |
| | | } |
| | | |
| | |
| | | List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper); |
| | | |
| | | // 更新关键词状态 |
| | | keyword.setStatus("completed"); |
| | | keyword.setStatus(FinalStatus.COMPLETED.getValue()); |
| | | keywordService.updateById(keyword); |
| | | |
| | | //更新提问词状态为取消 |
| | |
| | | private Mono<ResponseResult<?>> cancelRemoteTask(String taskId) { |
| | | // 使用Collections.singletonMap或手动创建Map |
| | | Map<String, Object> requestBody = new HashMap<>(); |
| | | requestBody.put("status", "pending"); |
| | | requestBody.put("status", FinalStatus.PENDING.getValue()); |
| | | |
| | | return webClient.post() |
| | | .uri(baseUrl + "/api/v1/tasks/" + taskId + "/cancel") |
| | |
| | | .allMatch(task -> FinalStatus.COMPLETED.getValue().equals(task.getStatus()) |
| | | || FinalStatus.FALSE.getValue().equals(task.getStatus()) |
| | | || FinalStatus.CANCELLED.getValue().equals(task.getStatus()) |
| | | || FinalStatus.CANCELED.getValue().equals(task.getStatus()) |
| | | || FinalStatus.NONENTITY.getValue().equals(task.getStatus())); |
| | | |
| | | if (allCompletedOrFailed) { |
| | |
| | | List<Reference> validRefGroup = refGroup.stream() |
| | | .filter(Objects::nonNull) |
| | | .collect(Collectors.toList()); |
| | | if (validRefGroup.isEmpty()) return; |
| | | if (validRefGroup.isEmpty()){ |
| | | return; |
| | | } |
| | | |
| | | Optional<Reference> existingRef = validRefGroup.stream() |
| | | .filter(ref -> ref.getReference_id() != null) |
| | |
| | | // 批量更新问题状态 |
| | | List<Question> questionsToUpdate = new ArrayList<>(); |
| | | questions.forEach(question -> { |
| | | List<QuestionResultList> results = questionResultsMap.getOrDefault(question.getQuestion(), io.jsonwebtoken.lang.Collections.emptyList()); |
| | | List<QuestionResultList> results = questionResultsMap.getOrDefault(question.getQuestion(), Collections.emptyList()); |
| | | FinalStatus finalStatus = determineFinalStatus(results); |
| | | |
| | | // 使用枚举进行switch判断 |