| | |
| | | } |
| | | } |
| | | //更新提问词和引用数据 |
| | | // private Mono<Void> updateQuestionAndReference(TaskResultResponse result) { |
| | | // return Mono.fromRunnable(() -> { |
| | | // try { |
| | | // //查看每个账号信息的status是否正常 |
| | | // |
| | | // // 1. 根据KeywordTask更新关键词状态 |
| | | // // 查询关键词ID |
| | | // LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>(); |
| | | // keywordTaskWrapper.eq(KeywordTask::getTask_id, result.getTask_id()); |
| | | // KeywordTask keywordTask = keywordTaskService.getOne(keywordTaskWrapper); |
| | | // keywordTask.setStatus("completed"); |
| | | // keywordTaskService.updateById(keywordTask); |
| | | // Keyword keyword = keywordService.getById(keywordTask.getKeyword_id()); |
| | | // |
| | | // if (keyword == null) { |
| | | // System.out.println("未找到关联的关键词,task_id: " + result.getTask_id()); |
| | | // //报错 |
| | | // throw new Exception("未找到关联的关键词,task_id: " + result.getTask_id()); |
| | | // |
| | | // } |
| | | // LambdaQueryWrapper<KeywordTask> keywordTaskWrapper2 = new LambdaQueryWrapper<>(); |
| | | // keywordTaskWrapper2.eq(KeywordTask::getKeyword_id, keyword.getKeyword_id()); |
| | | // List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper2); |
| | | // |
| | | // //如果全部为completed 或者错误、取消、任务不存在 关键词也为completed ,如果关联关系没有任务id,或者状态为running ,关键词为submitted, |
| | | // if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus()) || "false".equals(task.getStatus()) || "cancelled".equals(task.getStatus()) ||"canceled".equals(task.getStatus()) || "nonentity".equals(task.getStatus())) ) { |
| | | // keyword.setStatus("completed"); |
| | | // keywordService.updateById(keyword); |
| | | // |
| | | // } |
| | | // |
| | | // String orderId = keyword.getOrder_id(); |
| | | // if (orderId == null || orderId.isEmpty()) { |
| | | // System.out.println("关键词[" + keyword.getKeyword_id() + "]未关联订单,跳过订单状态更新"); |
| | | // return; |
| | | // } |
| | | // |
| | | // // 2.更新订单状态为待处理 查询该订单下的所有关键词,更新订单状态(有取消) |
| | | // LambdaQueryWrapper<Keyword> orderKeywordsWrapper = new LambdaQueryWrapper<>(); |
| | | // orderKeywordsWrapper.eq(Keyword::getOrder_id, orderId); |
| | | // List<Keyword> orderKeywords = keywordService.list(orderKeywordsWrapper); |
| | | // |
| | | // if (orderKeywords.isEmpty()) { |
| | | // System.out.println("订单[" + orderId + "]下无关键词,跳过状态更新"); |
| | | // return; |
| | | // } |
| | | // boolean allValid2 = orderKeywords.stream() |
| | | // .allMatch(k -> "completed".equals(k.getStatus()) || "false".equals(k.getStatus()) || "cancelled".equals(k.getStatus())); |
| | | // if (allValid2) { |
| | | // Orders orders = orderService.getById(orderId); |
| | | // if (orders != null) { |
| | | // orders.setStatus(1); // 假设Orders有Integer类型的status字段 |
| | | // orderService.updateById(orders); |
| | | // System.out.println("订单[" + orderId + "]所有关键词采集完成或者取消,已更新状态为1"); |
| | | // } else { |
| | | // System.out.println("未找到订单[" + orderId + "],无法更新状态"); |
| | | // } |
| | | // } |
| | | // // 3.更新订单状态为完成 检查所有关键词的状态是否均为 completed 或 false |
| | | // boolean allValid = orderKeywords.stream() |
| | | // .allMatch(k -> "completed".equals(k.getStatus()) || "false".equals(k.getStatus())); |
| | | // |
| | | // // 4. 若所有关键词状态均有效,更新订单状态为3 |
| | | // if (allValid) { |
| | | // Orders orders = orderService.getById(orderId); |
| | | // if (orders != null) { |
| | | // orders.setStatus(3); // 假设Orders有Integer类型的status字段 |
| | | // orderService.updateById(orders); |
| | | // System.out.println("订单[" + orderId + "]所有关键词状态符合条件,已更新状态为3"); |
| | | // } else { |
| | | // System.out.println("未找到订单[" + orderId + "],无法更新状态"); |
| | | // } |
| | | // } |
| | | // |
| | | // |
| | | // Orders orders = orderService.getById(keyword.getOrder_id()); |
| | | // |
| | | // // 2. 批量查询所有问题 |
| | | // LambdaQueryWrapper<Question> queryWrapper = new LambdaQueryWrapper<>(); |
| | | // queryWrapper.eq(Question::getKeyword_id, keyword.getKeyword_id()); |
| | | // List<Question> questions = questionService.list(queryWrapper); |
| | | // |
| | | // // 构建问题映射表,用于快速查找 |
| | | // Map<String, Question> questionMap = questions.stream() |
| | | // .collect(Collectors.toMap(Question::getQuestion, q -> q)); |
| | | // |
| | | // // 3. 收集所有需要更新的问题和引用 |
| | | // List<Question> questionsToUpdate = new ArrayList<>(); |
| | | // List<Reference> allReferences = new ArrayList<>(); |
| | | // List<Reference> resultList = new ArrayList<>(); |
| | | // |
| | | // // 遍历账号 |
| | | // for (UserResult userResult : result.getResults()) { |
| | | // //更新账号状态 |
| | | // if ( "failed".equals(userResult.getStatus())){ |
| | | // if (userResult.getError().contains("登录失败")){ |
| | | // LambdaUpdateWrapper<User> userWrapper = new LambdaUpdateWrapper<>(); |
| | | // userWrapper.eq(User::getUser_email, userResult.getUser_email()); |
| | | // userWrapper.set(User::getStatus, "无法登录"); |
| | | // userService.update(userWrapper); |
| | | // //更新所有提问词的状态 |
| | | // questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id()) |
| | | // .set(Question::getStatus, "failed") |
| | | // .set(Question::getError, "账户登录失败")); |
| | | // |
| | | // }else if (userResult.getError().contains("信息错误")){ |
| | | // LambdaUpdateWrapper<User> userWrapper = new LambdaUpdateWrapper<>(); |
| | | // userWrapper.eq(User::getUser_email, userResult.getUser_email()); |
| | | // userWrapper.set(User::getStatus, "信息错误"); |
| | | // userService.update(userWrapper); |
| | | // } |
| | | // } |
| | | // for (QuestionResult questionResult : userResult.getQuestions_results()) { |
| | | // try { |
| | | // Question question = questionMap.get(questionResult.getQuestion()); |
| | | // if (question != null) { |
| | | // |
| | | // |
| | | // //保存问题结果 |
| | | // QuestionResultList questionResultList = new QuestionResultList(); |
| | | // questionResultList.setKeyword_id(keyword.getKeyword_id()); |
| | | // questionResultList.setQuestion(questionResult.getQuestion()); |
| | | // questionResultList.setResponse(questionResult.getResponse()); |
| | | // questionResultList.setStatus(questionResult.getStatus()); |
| | | // questionResultList.setExtracted_count(questionResult.getExtracted_count()); |
| | | // questionResultList.setKeyword_task_id(result.getTask_id()); |
| | | // questionResultList.setError(questionResult.getError()); |
| | | // questionResultList.setNum(keyword.getNum()); |
| | | // if (questionResult.getTimestamp() != null) { |
| | | // DateTimeFormatter formatter = DateTimeFormatter |
| | | // .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS"); |
| | | // questionResultList.setTimestamp( |
| | | // LocalDateTime.parse(questionResult.getTimestamp(), formatter)); |
| | | // } |
| | | // // 保存问题结果列表(新增保存逻辑) |
| | | // questionResultService.save(questionResultList); |
| | | // // 查询当前轮次下该提问词的所有结果 |
| | | // List<QuestionResultList> allResults = questionResultService.list( |
| | | // new LambdaQueryWrapper<QuestionResultList>() |
| | | // .eq(QuestionResultList::getKeyword_id, keyword.getKeyword_id()) |
| | | // .eq(QuestionResultList::getQuestion, question.getQuestion()) |
| | | // .eq(QuestionResultList::getNum, keyword.getNum()) |
| | | // ); |
| | | // |
| | | // // 判断最终状态 |
| | | // String finalStatus = determineFinalStatus(allResults); |
| | | // if ("success".equals(finalStatus)){ |
| | | // question.setStatus("success"); |
| | | // question.setError(""); |
| | | // }else if ("no_results".equals(finalStatus)){ |
| | | // question.setStatus("success"); |
| | | // question.setError("采集结果无引用数据"); |
| | | // }else if ("busyness".equals(finalStatus)){ |
| | | // question.setStatus("failed"); |
| | | // question.setError("DeepSeek繁忙,请稍后尝试"); |
| | | // } |
| | | // |
| | | // // 更新问题对象 |
| | | // question.setResponse(questionResult.getResponse()); |
| | | // question.setExtracted_count(questionResult.getExtracted_count()); |
| | | //// question.setError(questionResult.getError()); |
| | | // question.setKeyword_id(keyword.getKeyword_id()); |
| | | // |
| | | // if (questionResult.getTimestamp() != null) { |
| | | // DateTimeFormatter formatter = DateTimeFormatter |
| | | // .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS"); |
| | | // question.setTimestamp( |
| | | // LocalDateTime.parse(questionResult.getTimestamp(), formatter)); |
| | | // } |
| | | // |
| | | // questionsToUpdate.add(question); |
| | | // // 初始化引用列表(避免null) |
| | | // List<Reference> references = new ArrayList<>(); |
| | | // List<TaskResultResponse.Reference> originalReferences = questionResult.getReferences(); |
| | | // if (originalReferences == null) { |
| | | // originalReferences = Collections.emptyList(); |
| | | // } |
| | | // |
| | | // // 遍历原始引用列表,转换为Reference对象 |
| | | // for (TaskResultResponse.Reference ref : originalReferences) { |
| | | // Reference reference = new Reference(); |
| | | // // 设置基本字段 |
| | | // reference.setQuestion_id(question.getQuestion_id()); |
| | | // reference.setTitle(ref.getTitle()); |
| | | // reference.setUrl(ref.getUrl()); |
| | | // reference.setDomain(ref.getDomain()); |
| | | // reference.setNum(keyword.getNum()); |
| | | // reference.setTask_id(result.getTask_id()); |
| | | // reference.setKeyword_id(keyword.getKeyword_id()); |
| | | // if (null!=ref.getPublish_time()) { |
| | | // reference.setCreate_time(ref.getPublish_time().atStartOfDay()); |
| | | // } |
| | | // |
| | | // // 关键:使用优化后的方法获取平台,避免重复创建 |
| | | // Platform platform = getOrCreatePlatform(ref.getDomain(),ref.getPlatform_name()); |
| | | // reference.setPlatform_id(platform.getPlatform_id()); |
| | | // reference.setType_id(platform.getType_id()); // 直接从平台获取类型ID,更可靠 |
| | | // // 添加到结果列表 |
| | | // references.add(reference); |
| | | // } |
| | | // // 添加到总引用列表 |
| | | // if (!references.isEmpty()) { |
| | | // allReferences.addAll(references); |
| | | // } |
| | | // |
| | | // //取数据库中当前关键词的当前轮次的当前问题id结果拿出来 |
| | | // List<Reference> dbList = referenceService.list(new LambdaQueryWrapper<Reference>().eq(Reference::getKeyword_id, keyword.getKeyword_id()) |
| | | // .eq(Reference::getNum, keyword.getNum()) |
| | | // .eq(Reference::getQuestion_id, question.getQuestion_id()) |
| | | // ); |
| | | // |
| | | // // 1. 合并两个列表 |
| | | // List<Reference> combinedList = new ArrayList<>(); |
| | | // combinedList.addAll(allReferences); |
| | | // combinedList.addAll(dbList); |
| | | // |
| | | // // 2. 创建复合键的Map,用于统计完全匹配的记录 |
| | | // Map<String, List<Reference>> compositeKeyMap = combinedList.stream() |
| | | // .collect(Collectors.groupingBy( |
| | | // ref -> ref.getTitle() + "|" + ref.getUrl() + "|" + ref.getDomain() |
| | | // )); |
| | | // |
| | | // // 3. 处理每组重复记录 |
| | | // compositeKeyMap.forEach((key, refGroup) -> { |
| | | // // 3.1 找出组内有ID的记录(优先从dbList中获取) |
| | | // Optional<Reference> existingRecord = refGroup.stream() |
| | | // .filter(ref -> ref.getReference_id() != null) |
| | | // .findFirst(); |
| | | // |
| | | // // 3.2 统计该组的重复次数(总数-1) |
| | | // int repetitionCount = refGroup.size() - 1; |
| | | // |
| | | // // 3.3 决定最终保留的记录 |
| | | // Reference recordToSave = new Reference(); |
| | | // if (existingRecord.isPresent()) { |
| | | // // 使用已有ID的记录并更新重复次数 |
| | | // recordToSave = existingRecord.get(); |
| | | // recordToSave.setRepetition_num( |
| | | // (recordToSave.getRepetition_num() == null ? 1 : recordToSave.getRepetition_num()) |
| | | // + repetitionCount |
| | | // ); |
| | | // } else { |
| | | // // 没有ID记录则取第一条并设置重复次数 |
| | | // recordToSave = refGroup.get(0); |
| | | // recordToSave.setRepetition_num(1+repetitionCount); |
| | | // } |
| | | // |
| | | // resultList.add(recordToSave); |
| | | // }); |
| | | // referenceService.saveOrUpdateBatch(resultList); |
| | | // } |
| | | // } catch (Exception e) { |
| | | // log.error(e.getMessage(), e); |
| | | // System.out.println("处理问题结果失败: " + e.getMessage()); |
| | | // } |
| | | // } |
| | | // |
| | | // |
| | | // } |
| | | // |
| | | // // 4. 批量更新问题 |
| | | // System.out.println(questionsToUpdate); |
| | | // if (!questionsToUpdate.isEmpty()) { |
| | | // questionService.updateBatchById(questionsToUpdate); |
| | | // System.out.println("成功批量更新 " + questionsToUpdate.size() + " 个问题"); |
| | | // } |
| | | // |
| | | // } catch (Exception e) { |
| | | // log.error("更新问题和引用数据失败: " ,e.getMessage(), e); |
| | | // throw new RuntimeException("更新问题和引用数据失败", e); |
| | | // } |
| | | // }); |
| | | // } |
| | | // // 根据所有批次的结果判断最终状态 |
| | | // private String determineFinalStatus(List<QuestionResultList> results) { |
| | | // if (results.isEmpty()) { |
| | | // return "no_results"; // 无结果 |
| | | // } |
| | | // |
| | | // // 统计关键指标 |
| | | // int totalCount = results.size(); |
| | | // int emptyResponseCount = 0; |
| | | // int systemBusyCount = 0; |
| | | // |
| | | // for (QuestionResultList result : results) { |
| | | // // 判断回答是否为空 |
| | | // if (result.getExtracted_count() == 0 ) { |
| | | // emptyResponseCount++; |
| | | // } |
| | | // |
| | | // // 判断是否为系统繁忙 |
| | | // if ("success".equals(result.getStatus()) && (result.getResponse().isEmpty()|| result.getResponse().contains("WebDriver连接中断") || result.getResponse().contains("响应超时"))) { |
| | | // systemBusyCount++; |
| | | // } |
| | | // |
| | | // } |
| | | // |
| | | // // 全返回系统繁忙 |
| | | // if (systemBusyCount == totalCount) { |
| | | // return "busyness"; |
| | | // } |
| | | // // 全返回信息为空 |
| | | // if (emptyResponseCount == totalCount) { |
| | | // return "no_results"; |
| | | // } |
| | | // |
| | | // |
| | | // // 系统繁忙比例超过阈值(可配置,这里设为70%) |
| | | //// double busyRate = (double) systemBusyCount / totalCount; |
| | | //// if (busyRate >= 0.7) { |
| | | //// return "系统繁忙,请稍后尝试"; |
| | | //// } |
| | | // |
| | | // // 其他情况返回成功 |
| | | // return "success"; |
| | | // } |
| | | |
| | | private Mono<Void> updateQuestionAndReference(TaskResultResponse result) { |
| | | return Mono.fromRunnable(() -> doUpdateQuestionAndReference(result)) |
| | | .onErrorResume(e -> { |
| | |
| | | log.info("无有效引用数据,跳过批量保存"); |
| | | return; |
| | | } |
| | | // // 在合并到 compositeKeyMap 之前 |
| | | // validReferences = validReferences.stream() |
| | | // .collect(Collectors.toMap( |
| | | // ref -> ref.getQuestion_id() |
| | | // + "|" + ref.getPlatform_id() |
| | | // + "|" + ref.getType_id() |
| | | // + "|" + ref.getUrl() |
| | | // + "|" + ref.getDomain() |
| | | // + "|" + ref.getKeyword_id() |
| | | // + "|" + ref.getNum(), |
| | | // ref -> ref, |
| | | // (r1, r2) -> r1 // 碰到同 key 就保第一个 |
| | | // )) |
| | | // .values() |
| | | // .stream() |
| | | // .collect(Collectors.toList()); |
| | | |
| | | // 2. 查询数据库中已存在的引用并过滤 null |
| | | LambdaQueryWrapper<Reference> dbRefWrapper = new LambdaQueryWrapper<>(); |
| | |
| | | log.warn("引用核心字段为空,跳过映射:{}", ref); |
| | | return; |
| | | } |
| | | String key = ref.getQuestion_id() + "|" + ref.getTitle() + "|" + ref.getUrl() + "|" + ref.getDomain() + "|" + ref.getNum(); |
| | | String key = ref.getQuestion_id() + "|" |
| | | + ref.getPlatform_id() + "|" |
| | | + ref.getType_id() + "|" |
| | | + ref.getTitle() + "|" |
| | | + ref.getUrl() + "|" |
| | | + ref.getDomain() + "|" |
| | | + ref.getNum(); |
| | | map.computeIfAbsent(key, k -> new ArrayList<>()).add(ref); |
| | | } |
| | | |