guyue
2025-09-03 dd028e18a12ad9ae7c43ed09b15ddd6bde1a43e9
src/main/java/com/linghu/service/impl/CollectionServiceImpl.java
@@ -30,7 +30,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
@Slf4j
@Service
public class CollectionServiceImpl implements CollectionService {
@@ -85,7 +84,12 @@
                    // 将新的任务请求加入队列
                    taskQueue.add(searchTaskRequest);
                    //更新状态为采集中
                    LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>();
                    updateWrapper.eq(Keyword::getKeyword_id, searchTaskRequest.getKeyword_id());
                    updateWrapper.set(Keyword::getStatus, FinalStatus.SUBMITTED.getValue());
                    keywordService.update(updateWrapper);
                    // 如果当前没有任务在处理中,则启动任务队列的处理
                    if (!isProcessing) {
                        processNextTaskInQueue();
@@ -181,7 +185,7 @@
        return Flux.fromIterable(tasksToCancelRemotely)
                .flatMap(task -> {
                    // 创建状态更新和远程取消的组合操作
                    Mono<Void> updateStatus = updateTaskStatus(task.getTask_id(), "cancelled");
                    Mono<Void> updateStatus = updateTaskStatus(task.getTask_id(), FinalStatus.CANCELLED.getValue());
                    Mono<ResponseResult<?>> cancelOp = cancelRemoteTask(task.getTask_id())
                            .onErrorResume(e -> {
                                log.error("取消任务 {} 失败: {}", task.getTask_id(), e.getMessage());
@@ -347,7 +351,7 @@
                    // 如果任务状态是"submitted"或"running",继续轮询
                    if (!FinalStatus.COMPLETED.getValue().equalsIgnoreCase(statusResponse.getStatus())
                            && !FinalStatus.FAILED.getValue().equalsIgnoreCase(statusResponse.getStatus())
                            && !FinalStatus.CANCELED.getValue().equalsIgnoreCase(statusResponse.getStatus())
                            && !FinalStatus.CANCELLED.getValue().equalsIgnoreCase(statusResponse.getStatus())
                            && !(FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus())
                            && statusResponse.getMessage().contains("Task not found")) ) {
                        return Mono.delay(Duration.ofSeconds(5))  // 延迟 5 秒后再次查询
@@ -397,6 +401,7 @@
            KeywordTask keywordTask = new KeywordTask();
            keywordTask.setKeyword_id(keywordId);
            keywordTask.setTask_id(null);  // 任务ID为空
            keywordTask.setCreate_time( LocalDateTime.now());
            keywordTask.setNum(keyword.getNum());
            keywordTaskService.save(keywordTask);  // 保存 KeywordTask
@@ -439,13 +444,12 @@
                                    //更新关键词状态
                                    LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>();
                                    updateWrapper.eq(Keyword::getKeyword_id, batchRequest.getKeyword_id());
                                    updateWrapper.set(Keyword::getStatus, FinalStatus.COMPLETED.getValue());
                                    updateWrapper.set(Keyword::getTask_id, taskResponse.getTask_id());
                                    keywordService.update(updateWrapper);
                                    //设置轮数
                                    Keyword keyword = keywordService.getById(batchRequest.getKeyword_id());
                                    // 更新关键词任务与任务ID的关联
                                    // 获取与关键词相关的任务,task_id 为 null,确保只取一个任务
                                    // 获取与关键词相关的任务
                                    List<KeywordTask> keywordTasks = keywordTaskService.list(new LambdaQueryWrapper<KeywordTask>()
                                            .eq(KeywordTask::getKeyword_id, keyword.getKeyword_id())
                                            .eq(KeywordTask::getNum, keyword.getNum())
@@ -453,12 +457,12 @@
                                    if (keywordTasks.size() > 0) {
                                        KeywordTask keywordTask = keywordTasks.get(0);
                                        keywordTask.setTask_id(taskResponse.getTask_id());
                                        keywordTask.setStatus("pending");
                                        keywordTask.setStatus(FinalStatus.PENDING.getValue());
                                        keywordTaskService.updateById(keywordTask);
                                    }
                                    //将提问词列表的状态转为pending
                                    for (String questionName : batchRequest.getQuestions()) {
                                        questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id()).eq(Question::getQuestion,questionName).set(Question::getStatus, "pending"));
                                        questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id()).eq(Question::getQuestion,questionName).set(Question::getStatus, FinalStatus.PENDING.getValue()));
                                    }
                                    //所有关键词都在采集中或者已完成或者错误设置订单进入采集状态
@@ -497,7 +501,7 @@
                new LambdaQueryWrapper<KeywordTask>()
                        .eq(KeywordTask::getKeyword_id, keywordId)
                        .isNotNull(KeywordTask::getTask_id)
                        .eq(KeywordTask::getStatus, "pending")
                        .eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue())
        );
    }
@@ -524,7 +528,7 @@
                List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper);
                // 更新关键词状态
                keyword.setStatus("completed");
                keyword.setStatus(FinalStatus.COMPLETED.getValue());
                keywordService.updateById(keyword);
                //更新提问词状态为取消
@@ -588,7 +592,7 @@
    private Mono<ResponseResult<?>> cancelRemoteTask(String taskId) {
        // 使用Collections.singletonMap或手动创建Map
        Map<String, Object> requestBody = new HashMap<>();
        requestBody.put("status", "pending");
        requestBody.put("status", FinalStatus.PENDING.getValue());
        return webClient.post()
                .uri(baseUrl + "/api/v1/tasks/" + taskId + "/cancel")
@@ -762,7 +766,6 @@
                .allMatch(task -> FinalStatus.COMPLETED.getValue().equals(task.getStatus())
                        || FinalStatus.FALSE.getValue().equals(task.getStatus())
                        || FinalStatus.CANCELLED.getValue().equals(task.getStatus())
                        || FinalStatus.CANCELED.getValue().equals(task.getStatus())
                        || FinalStatus.NONENTITY.getValue().equals(task.getStatus()));
        if (allCompletedOrFailed) {
@@ -908,7 +911,9 @@
            List<Reference> validRefGroup = refGroup.stream()
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
            if (validRefGroup.isEmpty()) return;
            if (validRefGroup.isEmpty()){
                return;
            }
            Optional<Reference> existingRef = validRefGroup.stream()
                    .filter(ref -> ref.getReference_id() != null)
@@ -983,7 +988,7 @@
        // 批量更新问题状态
        List<Question> questionsToUpdate = new ArrayList<>();
        questions.forEach(question -> {
            List<QuestionResultList> results = questionResultsMap.getOrDefault(question.getQuestion(), io.jsonwebtoken.lang.Collections.emptyList());
            List<QuestionResultList> results = questionResultsMap.getOrDefault(question.getQuestion(), Collections.emptyList());
            FinalStatus finalStatus = determineFinalStatus(results);
            // 使用枚举进行switch判断