src/main/java/com/linghu/controller/CollectController.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/linghu/mapper/PlatformMapper.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/linghu/service/PlatformService.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/linghu/service/impl/PlatformServiceImpl.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/linghu/timeTask/ScheduledTasks.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/resources/mapper/PlatformMapper.xml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/main/java/com/linghu/controller/CollectController.java
@@ -589,7 +589,8 @@ */ private Platform getOrCreatePlatform(String domain,String platformName) { // 1. 先尝试查询已存在的平台 Platform platform = platformService.getPlatformByDomain(domain,platformName); Platform platform = platformService.getPlatformByDomain(domain); if (platform != null) { return platform; } @@ -624,7 +625,7 @@ } catch (DuplicateKeyException e) { // 3. 若捕获到重复键异常,说明并发创建了,重新查询即可(此时数据库中已存在该平台) log.warn("平台domain={}已存在,无需重复创建", domain, e); return platformService.getPlatformByDomain(domain,platformName); // 重新查询,一定能获取到 return platformService.getPlatformByDomain(domain); // 重新查询,一定能获取到 } catch (Exception e) { // 处理其他异常(如数据库连接失败等) log.error("创建平台失败,domain={}", domain, e); @@ -724,11 +725,27 @@ List<Reference> allReferences = new ArrayList<>(); List<Reference> resultList = new ArrayList<>(); // 4. 新增:统计所有提问词的结果状态 boolean allEmptyReferences = true; // 所有提问词引用数据为空 boolean allSystemBusy = true; // 所有提问词系统繁忙 // 遍历结果 for (UserResult userResult : result.getResults()) { //更新账号状态 if ( "failed".equals(userResult.getStatus())){ if (userResult.getError().contains("登录失败")){ LambdaUpdateWrapper<User> userWrapper = new LambdaUpdateWrapper<>(); userWrapper.eq(User::getUser_email, userResult.getUser_email()); userWrapper.set(User::getStatus, "无法登录"); userService.update(userWrapper); //更新所有提问词的状态 questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id()) .set(Question::getStatus, "failed") .set(Question::getError, "账户登录失败")); }else if (userResult.getError().contains("信息错误")){ LambdaUpdateWrapper<User> userWrapper = new LambdaUpdateWrapper<>(); userWrapper.eq(User::getUser_email, userResult.getUser_email()); userWrapper.set(User::getStatus, "信息错误"); userService.update(userWrapper); } } for (QuestionResult questionResult : userResult.getQuestions_results()) { try { Question question = questionMap.get(questionResult.getQuestion()); @@ -765,15 +782,13 @@ String finalStatus = determineFinalStatus(allResults); if ("success".equals(finalStatus)){ question.setStatus("success"); question.setError(""); }else if ("no_results".equals(finalStatus)){ question.setStatus("failed"); question.setStatus("success"); question.setError("采集结果无引用数据"); }else if ("busyness".equals(finalStatus)){ question.setStatus("failed"); question.setError("DeepSeek繁忙,请稍后尝试"); }else if ("failed".equals(finalStatus)){ question.setStatus("failed"); question.setError("所有账号登录失败"); } // 更新问题对象 @@ -876,25 +891,7 @@ System.out.println("处理问题结果失败: " + e.getMessage()); } } //更新账号状态 if ( "failed".equals(userResult.getStatus())){ if (userResult.getError().contains("登录失败")){ LambdaUpdateWrapper<User> userWrapper = new LambdaUpdateWrapper<>(); userWrapper.eq(User::getUser_email, userResult.getUser_email()); userWrapper.set(User::getStatus, "无法登录"); userService.update(userWrapper); //更新所有提问词的状态 questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id()) .set(Question::getStatus, "failed") .set(Question::getError, "账户登录失败")); }else if (userResult.getError().contains("信息错误")){ LambdaUpdateWrapper<User> userWrapper = new LambdaUpdateWrapper<>(); userWrapper.eq(User::getUser_email, userResult.getUser_email()); userWrapper.set(User::getStatus, "信息错误"); userService.update(userWrapper); } } } @@ -921,7 +918,6 @@ int totalCount = results.size(); int emptyResponseCount = 0; int systemBusyCount = 0; int failedCount = 0; for (QuestionResultList result : results) { // 判断回答是否为空 @@ -933,9 +929,7 @@ if ("success".equals(result.getStatus()) && (result.getResponse().isEmpty()|| result.getResponse().contains("WebDriver连接中断") || result.getResponse().contains("响应超时"))) { systemBusyCount++; } if ("failed".equals(result.getStatus()) && result.getError().contains("登录失败")){ failedCount++; } } // 全返回系统繁忙 @@ -946,9 +940,7 @@ if (emptyResponseCount == totalCount) { return "no_results"; } if (failedCount == totalCount) { return "failed"; } // 系统繁忙比例超过阈值(可配置,这里设为70%) // double busyRate = (double) systemBusyCount / totalCount; src/main/java/com/linghu/mapper/PlatformMapper.java
@@ -13,4 +13,5 @@ public interface PlatformMapper extends BaseMapper<Platform> { public Platform getPlatformByDomain(@Param("domain") String domain, @Param("platformName") String platformName); public Platform getPlatformByDomainWeiYi(@Param("domain") String domain); } src/main/java/com/linghu/service/PlatformService.java
@@ -10,4 +10,6 @@ */ public interface PlatformService extends IService<Platform> { public Platform getPlatformByDomain(String domain,String platformName); public Platform getPlatformByDomain(String domain); } src/main/java/com/linghu/service/impl/PlatformServiceImpl.java
@@ -24,4 +24,9 @@ return platformMapper.getPlatformByDomain(domain,platformName); } @Override public Platform getPlatformByDomain(String domain) { return platformMapper.getPlatformByDomainWeiYi(domain); } } src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -22,14 +22,10 @@ import reactor.core.scheduler.Schedulers; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.time.LocalDateTime; import java.time.Duration; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @Component @Slf4j @@ -45,124 +41,163 @@ @Autowired private QuestionService questionService; private ScheduledFuture<?> scheduledTask; // 定时任务引用 // 健康检查专用调度器(单线程足够) private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(); // 任务处理专用调度器(可根据任务量调整线程数) private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 个线程 private volatile boolean isHealthy = true; // 健康状态标识 private volatile boolean initialCheckComplete = false; // 初始检查完成标志 private volatile boolean taskEnabled = true; // 任务启用开关 @PostConstruct public void init() { // 启动健康检查定时任务,每10分钟执行一次 healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES); checkInitialHealth(); // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务) } /** * 执行初始健康检查,确保系统启动时任务状态正确 */ private void checkInitialHealth() { log.info("执行系统启动初始健康检查..."); try { // 同步执行健康检查,最多等待30秒 Boolean healthCheckResult = collectController.checkThirdPartyHealth() .map(response -> "healthy".equalsIgnoreCase(response.getStatus())) .block(Duration.ofSeconds(30)); isHealthy = Boolean.TRUE.equals(healthCheckResult); taskEnabled = isHealthy; if (isHealthy) { startTaskProcessing(); log.info("系统启动时健康检查通过,任务处理将正常执行"); } else { log.warn("系统启动时健康检查失败,任务处理将暂停"); } } catch (Exception e) { log.error("初始健康检查失败: {}", e.getMessage()); isHealthy = false; taskEnabled = false; } finally { initialCheckComplete = true; } } /** * 健康检查方法,调用第三方服务健康检查接口 * 健康检查定时任务,每10分钟执行一次 */ private void checkHealth() { log.info("开始执行健康检查..."); collectController.checkThirdPartyHealth() .subscribe( response -> { boolean previousHealthStatus = isHealthy; isHealthy = "healthy".equalsIgnoreCase(response.getStatus()); @Scheduled(initialDelay = 600000, fixedRate = 600000) // 10分钟 = 600000毫秒 public void checkHealth() { // 等待初始检查完成 if (!initialCheckComplete) { log.debug("初始健康检查未完成,跳过本次健康检查"); return; } // 状态变化时记录日志并控制任务执行 log.info("开始执行健康检查..."); try { collectController.checkThirdPartyHealth() .map(response -> "healthy".equalsIgnoreCase(response.getStatus())) .subscribe( healthStatus -> { boolean previousHealthStatus = isHealthy; isHealthy = healthStatus; // 状态变化时更新任务开关 if (previousHealthStatus != isHealthy) { taskEnabled = isHealthy; if (isHealthy) { log.info("健康检查通过,恢复任务处理"); startTaskProcessing(); } else { log.warn("健康检查失败,暂停任务处理"); stopTaskProcessing(); } } else { log.info("健康状态未变化,当前状态: {}", isHealthy ? "健康" : "不健康"); } }, error -> { log.error("健康检查请求失败: {}", error.getMessage()); if (isHealthy) { // 仅在健康状态变化时记录并停止任务 if (isHealthy) { isHealthy = false; taskEnabled = false; log.warn("因健康检查请求失败,暂停任务处理"); stopTaskProcessing(); } } ); } catch (Exception e) { log.error("健康检查执行异常: {}", e.getMessage()); if (isHealthy) { isHealthy = false; taskEnabled = false; } /** * 启动任务处理定时执行 */ private synchronized void startTaskProcessing() { if (scheduledTask == null || scheduledTask.isCancelled()) { scheduledTask = taskScheduler.scheduleAtFixedRate( this::executeTaskProcessing, 0, 10, TimeUnit.SECONDS ); System.out.println("nima"); } } /** * 停止任务处理定时执行 * 任务处理定时任务,每10秒执行一次 */ private synchronized void stopTaskProcessing() { if (scheduledTask != null && !scheduledTask.isCancelled()) { scheduledTask.cancel(false); } } @PreDestroy public void destroy() { // 关闭健康检查调度器 healthCheckScheduler.shutdown(); try { if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) { healthCheckScheduler.shutdownNow(); } } catch (InterruptedException e) { healthCheckScheduler.shutdownNow(); } // 关闭任务处理调度器 taskScheduler.shutdown(); try { if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) { taskScheduler.shutdownNow(); } } catch (InterruptedException e) { taskScheduler.shutdownNow(); } } /** * 实际的任务处理方法,替代原@Scheduled注解方法 */ @Scheduled(initialDelay = 0, fixedRate = 10000) // 10秒 = 10000毫秒 public void executeTaskProcessing() { // 检查初始检查是否完成和任务是否启用 if (!initialCheckComplete) { log.debug("初始检查未完成,跳过本次任务处理"); return; } if (!taskEnabled) { log.debug("任务已被禁用,跳过本次任务处理"); return; } if (!isHealthy) { log.debug("系统不健康,跳过任务处理"); return; } try { // 查询所有状态为pending的任务 LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(KeywordTask::getStatus, "pending"); keywordTaskService.list(queryWrapper) .stream() .filter(task -> task.getTask_id() != null) .forEach(task -> processTaskStatus(task) List<KeywordTask> tasks = keywordTaskService.list(queryWrapper); log.info("查询到 {} 个待处理任务", tasks.size()); for (KeywordTask task : tasks) { if (task.getTask_id() != null) { processTaskStatus(task) .subscribeOn(Schedulers.boundedElastic()) .subscribe( updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) ) ); } } } catch (Exception e) { log.error("任务处理执行异常: {}", e.getMessage()); } } /** * 实际的任务处理方法,替代原@Scheduled注解方法 */ // public void executeTaskProcessing() { // if (!isHealthy) { // log.debug("系统不健康,跳过任务处理"); // return; // } // // // 查询所有状态为pending的任务 // LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); // queryWrapper.eq(KeywordTask::getStatus, "pending"); // // keywordTaskService.list(queryWrapper) // .stream() // .filter(task -> task.getTask_id() != null) // .forEach(task -> processTaskStatus(task) // .subscribeOn(Schedulers.boundedElastic()) // .subscribe( // updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), // error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) // ) // ); // } private Mono<KeywordTask> processTaskStatus(KeywordTask task) { src/main/resources/mapper/PlatformMapper.xml
@@ -26,8 +26,17 @@ <include refid="Base_Column_List"/> from platform where domain = #{domain} <if test="null != platformName and '' != platformName"> and platform_name =#{platformName} </if> </select> <select id="getPlatformByDomainWeiYi" resultType="com.linghu.model.entity.Platform"> select <include refid="Base_Column_List"/> from platform where domain = #{domain} </select> </mapper>