guyue
2025-07-23 1d22a73ebd5cb8bd420e8ab55e18d5bd19bfdc1e
更新判断新平台
6个文件已修改
306 ■■■■■ 已修改文件
src/main/java/com/linghu/controller/CollectController.java 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/mapper/PlatformMapper.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/service/PlatformService.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/service/impl/PlatformServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/timeTask/ScheduledTasks.java 229 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/mapper/PlatformMapper.xml 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/controller/CollectController.java
@@ -589,7 +589,8 @@
     */
    private Platform getOrCreatePlatform(String domain,String platformName) {
        // 1. 先尝试查询已存在的平台
        Platform platform = platformService.getPlatformByDomain(domain,platformName);
        Platform platform = platformService.getPlatformByDomain(domain);
        if (platform != null) {
            return platform;
        }
@@ -624,7 +625,7 @@
        } catch (DuplicateKeyException e) {
            // 3. 若捕获到重复键异常,说明并发创建了,重新查询即可(此时数据库中已存在该平台)
            log.warn("平台domain={}已存在,无需重复创建", domain, e);
            return platformService.getPlatformByDomain(domain,platformName); // 重新查询,一定能获取到
            return platformService.getPlatformByDomain(domain); // 重新查询,一定能获取到
        } catch (Exception e) {
            // 处理其他异常(如数据库连接失败等)
            log.error("创建平台失败,domain={}", domain, e);
@@ -724,11 +725,27 @@
                List<Reference> allReferences = new ArrayList<>();
                List<Reference> resultList = new ArrayList<>();
                // 4. 新增:统计所有提问词的结果状态
                boolean allEmptyReferences = true; // 所有提问词引用数据为空
                boolean allSystemBusy = true;      // 所有提问词系统繁忙
                // 遍历结果
                for (UserResult userResult : result.getResults()) {
                    //更新账号状态
                    if ( "failed".equals(userResult.getStatus())){
                        if (userResult.getError().contains("登录失败")){
                            LambdaUpdateWrapper<User> userWrapper =  new LambdaUpdateWrapper<>();
                            userWrapper.eq(User::getUser_email, userResult.getUser_email());
                            userWrapper.set(User::getStatus, "无法登录");
                            userService.update(userWrapper);
                            //更新所有提问词的状态
                            questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id())
                                    .set(Question::getStatus, "failed")
                                    .set(Question::getError, "账户登录失败"));
                        }else if (userResult.getError().contains("信息错误")){
                            LambdaUpdateWrapper<User> userWrapper =  new LambdaUpdateWrapper<>();
                            userWrapper.eq(User::getUser_email, userResult.getUser_email());
                            userWrapper.set(User::getStatus, "信息错误");
                            userService.update(userWrapper);
                        }
                    }
                    for (QuestionResult questionResult : userResult.getQuestions_results()) {
                        try {
                            Question question = questionMap.get(questionResult.getQuestion());
@@ -765,15 +782,13 @@
                                String finalStatus = determineFinalStatus(allResults);
                                if ("success".equals(finalStatus)){
                                    question.setStatus("success");
                                    question.setError("");
                                }else if ("no_results".equals(finalStatus)){
                                    question.setStatus("failed");
                                    question.setStatus("success");
                                    question.setError("采集结果无引用数据");
                                }else if ("busyness".equals(finalStatus)){
                                    question.setStatus("failed");
                                    question.setError("DeepSeek繁忙,请稍后尝试");
                                }else if ("failed".equals(finalStatus)){
                                    question.setStatus("failed");
                                    question.setError("所有账号登录失败");
                                }
                                // 更新问题对象
@@ -876,25 +891,7 @@
                            System.out.println("处理问题结果失败: " + e.getMessage());
                        }
                    }
                    //更新账号状态
                    if ( "failed".equals(userResult.getStatus())){
                        if (userResult.getError().contains("登录失败")){
                            LambdaUpdateWrapper<User> userWrapper =  new LambdaUpdateWrapper<>();
                            userWrapper.eq(User::getUser_email, userResult.getUser_email());
                            userWrapper.set(User::getStatus, "无法登录");
                            userService.update(userWrapper);
                            //更新所有提问词的状态
                            questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id())
                                    .set(Question::getStatus, "failed")
                                    .set(Question::getError, "账户登录失败"));
                        }else if (userResult.getError().contains("信息错误")){
                            LambdaUpdateWrapper<User> userWrapper =  new LambdaUpdateWrapper<>();
                            userWrapper.eq(User::getUser_email, userResult.getUser_email());
                            userWrapper.set(User::getStatus, "信息错误");
                            userService.update(userWrapper);
                        }
                    }
                }
@@ -921,7 +918,6 @@
        int totalCount = results.size();
        int emptyResponseCount = 0;
        int systemBusyCount = 0;
        int failedCount = 0;
        for (QuestionResultList result : results) {
            // 判断回答是否为空
@@ -933,9 +929,7 @@
            if ("success".equals(result.getStatus())  && (result.getResponse().isEmpty()|| result.getResponse().contains("WebDriver连接中断") || result.getResponse().contains("响应超时"))) {
                systemBusyCount++;
            }
            if ("failed".equals(result.getStatus()) && result.getError().contains("登录失败")){
                failedCount++;
            }
        }
        // 全返回系统繁忙
@@ -946,9 +940,7 @@
        if (emptyResponseCount == totalCount) {
            return "no_results";
        }
        if (failedCount == totalCount) {
            return "failed";
        }
        // 系统繁忙比例超过阈值(可配置,这里设为70%)
//        double busyRate = (double) systemBusyCount / totalCount;
src/main/java/com/linghu/mapper/PlatformMapper.java
@@ -13,4 +13,5 @@
public interface PlatformMapper extends BaseMapper<Platform> {
    public Platform getPlatformByDomain(@Param("domain") String domain, @Param("platformName") String platformName);
    public Platform getPlatformByDomainWeiYi(@Param("domain") String domain);
}
src/main/java/com/linghu/service/PlatformService.java
@@ -10,4 +10,6 @@
 */
public interface PlatformService extends IService<Platform> {
    public Platform getPlatformByDomain(String domain,String platformName);
    public Platform getPlatformByDomain(String domain);
}
src/main/java/com/linghu/service/impl/PlatformServiceImpl.java
@@ -24,4 +24,9 @@
        return platformMapper.getPlatformByDomain(domain,platformName);
    }
    @Override
    public Platform getPlatformByDomain(String domain) {
        return platformMapper.getPlatformByDomainWeiYi(domain);
    }
}
src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -22,14 +22,10 @@
import reactor.core.scheduler.Schedulers;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.LocalDateTime;
import java.time.Duration;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
@@ -45,124 +41,163 @@
    @Autowired
    private QuestionService questionService;
    private ScheduledFuture<?> scheduledTask; // 定时任务引用
    // 健康检查专用调度器(单线程足够)
    private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor();
    // 任务处理专用调度器(可根据任务量调整线程数)
    private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 个线程
    private volatile boolean isHealthy = true; // 健康状态标识
    private volatile boolean initialCheckComplete = false; // 初始检查完成标志
    private volatile boolean taskEnabled = true; // 任务启用开关
    @PostConstruct
    public void init() {
        // 启动健康检查定时任务,每10分钟执行一次
        healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES);
        checkInitialHealth();
        // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务)
        if (isHealthy) {
            startTaskProcessing();
    }
    /**
     * 执行初始健康检查,确保系统启动时任务状态正确
     */
    private void checkInitialHealth() {
        log.info("执行系统启动初始健康检查...");
        try {
            // 同步执行健康检查,最多等待30秒
            Boolean healthCheckResult = collectController.checkThirdPartyHealth()
                    .map(response -> "healthy".equalsIgnoreCase(response.getStatus()))
                    .block(Duration.ofSeconds(30));
            isHealthy = Boolean.TRUE.equals(healthCheckResult);
            taskEnabled = isHealthy;
            if (isHealthy) {
                log.info("系统启动时健康检查通过,任务处理将正常执行");
            } else {
                log.warn("系统启动时健康检查失败,任务处理将暂停");
            }
        } catch (Exception e) {
            log.error("初始健康检查失败: {}", e.getMessage());
            isHealthy = false;
            taskEnabled = false;
        } finally {
            initialCheckComplete = true;
        }
    }
    /**
     * 健康检查方法,调用第三方服务健康检查接口
     * 健康检查定时任务,每10分钟执行一次
     */
    private void checkHealth() {
        log.info("开始执行健康检查...");
        collectController.checkThirdPartyHealth()
                .subscribe(
                        response -> {
                            boolean previousHealthStatus = isHealthy;
                            isHealthy = "healthy".equalsIgnoreCase(response.getStatus());
    @Scheduled(initialDelay = 600000, fixedRate = 600000) // 10分钟 = 600000毫秒
    public void checkHealth() {
        // 等待初始检查完成
        if (!initialCheckComplete) {
            log.debug("初始健康检查未完成,跳过本次健康检查");
            return;
        }
                            // 状态变化时记录日志并控制任务执行
                            if (previousHealthStatus != isHealthy) {
                                if (isHealthy) {
                                    log.info("健康检查通过,恢复任务处理");
                                    startTaskProcessing();
        log.info("开始执行健康检查...");
        try {
            collectController.checkThirdPartyHealth()
                    .map(response -> "healthy".equalsIgnoreCase(response.getStatus()))
                    .subscribe(
                            healthStatus -> {
                                boolean previousHealthStatus = isHealthy;
                                isHealthy = healthStatus;
                                // 状态变化时更新任务开关
                                if (previousHealthStatus != isHealthy) {
                                    taskEnabled = isHealthy;
                                    if (isHealthy) {
                                        log.info("健康检查通过,恢复任务处理");
                                    } else {
                                        log.warn("健康检查失败,暂停任务处理");
                                    }
                                } else {
                                    log.warn("健康检查失败,暂停任务处理");
                                    stopTaskProcessing();
                                    log.info("健康状态未变化,当前状态: {}", isHealthy ? "健康" : "不健康");
                                }
                            },
                            error -> {
                                log.error("健康检查请求失败: {}", error.getMessage());
                                if (isHealthy) {
                                    isHealthy = false;
                                    taskEnabled = false;
                                    log.warn("因健康检查请求失败,暂停任务处理");
                                }
                            }
                        },
                        error -> {
                            log.error("健康检查请求失败: {}", error.getMessage());
                            if (isHealthy) { // 仅在健康状态变化时记录并停止任务
                                isHealthy = false;
                                log.warn("因健康检查请求失败,暂停任务处理");
                                stopTaskProcessing();
                            }
                        }
                );
    }
    /**
     * 启动任务处理定时执行
     */
    private synchronized void startTaskProcessing() {
        if (scheduledTask == null || scheduledTask.isCancelled()) {
            scheduledTask = taskScheduler.scheduleAtFixedRate(
                    this::executeTaskProcessing,
                    0,
                    10,
                    TimeUnit.SECONDS
            );
            System.out.println("nima");
        }
    }
    /**
     * 停止任务处理定时执行
     */
    private synchronized void stopTaskProcessing() {
        if (scheduledTask != null && !scheduledTask.isCancelled()) {
            scheduledTask.cancel(false);
        }
    }
    @PreDestroy
    public void destroy() {
        // 关闭健康检查调度器
        healthCheckScheduler.shutdown();
        try {
            if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                healthCheckScheduler.shutdownNow();
                    );
        } catch (Exception e) {
            log.error("健康检查执行异常: {}", e.getMessage());
            if (isHealthy) {
                isHealthy = false;
                taskEnabled = false;
            }
        } catch (InterruptedException e) {
            healthCheckScheduler.shutdownNow();
        }
        // 关闭任务处理调度器
        taskScheduler.shutdown();
        try {
            if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                taskScheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            taskScheduler.shutdownNow();
        }
    }
    /**
     * 实际的任务处理方法,替代原@Scheduled注解方法
     * 任务处理定时任务,每10秒执行一次
     */
    @Scheduled(initialDelay = 0, fixedRate = 10000) // 10秒 = 10000毫秒
    public void executeTaskProcessing() {
        // 检查初始检查是否完成和任务是否启用
        if (!initialCheckComplete) {
            log.debug("初始检查未完成,跳过本次任务处理");
            return;
        }
        if (!taskEnabled) {
            log.debug("任务已被禁用,跳过本次任务处理");
            return;
        }
        if (!isHealthy) {
            log.debug("系统不健康,跳过任务处理");
            return;
        }
        // 查询所有状态为pending的任务
        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(KeywordTask::getStatus, "pending");
        try {
            // 查询所有状态为pending的任务
            LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.eq(KeywordTask::getStatus, "pending");
        keywordTaskService.list(queryWrapper)
                .stream()
                .filter(task -> task.getTask_id() != null)
                .forEach(task -> processTaskStatus(task)
                        .subscribeOn(Schedulers.boundedElastic())
                        .subscribe(
                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
                        )
                );
            List<KeywordTask> tasks = keywordTaskService.list(queryWrapper);
            log.info("查询到 {} 个待处理任务", tasks.size());
            for (KeywordTask task : tasks) {
                if (task.getTask_id() != null) {
                    processTaskStatus(task)
                            .subscribeOn(Schedulers.boundedElastic())
                            .subscribe(
                                    updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
                                    error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
                            );
                }
            }
        } catch (Exception e) {
            log.error("任务处理执行异常: {}", e.getMessage());
        }
    }
    /**
     * 实际的任务处理方法,替代原@Scheduled注解方法
     */
//    public void executeTaskProcessing() {
//        if (!isHealthy) {
//            log.debug("系统不健康,跳过任务处理");
//            return;
//        }
//
//        // 查询所有状态为pending的任务
//        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
//        queryWrapper.eq(KeywordTask::getStatus, "pending");
//
//        keywordTaskService.list(queryWrapper)
//                .stream()
//                .filter(task -> task.getTask_id() != null)
//                .forEach(task -> processTaskStatus(task)
//                        .subscribeOn(Schedulers.boundedElastic())
//                        .subscribe(
//                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
//                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
//                        )
//                );
//    }
    private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
src/main/resources/mapper/PlatformMapper.xml
@@ -26,8 +26,17 @@
            <include refid="Base_Column_List"/>
            from platform
            where domain = #{domain}
            <if test="null != platformName and '' != platformName">
                and platform_name =#{platformName}
            </if>
    </select>
    <select id="getPlatformByDomainWeiYi" resultType="com.linghu.model.entity.Platform">
        select
        <include refid="Base_Column_List"/>
            from platform
        where domain = #{domain}
    </select>
</mapper>