guyue
2025-07-23 1d22a73ebd5cb8bd420e8ab55e18d5bd19bfdc1e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
package com.linghu.timeTask;
 
 
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.linghu.controller.CollectController;
import com.linghu.model.dto.TaskResultResponse;
import com.linghu.model.dto.TaskStatusResponse;
import com.linghu.model.entity.KeywordTask;
import com.linghu.model.entity.Question;
import com.linghu.service.KeywordTaskService;
import com.linghu.service.QuestionService;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
 
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.format.DateTimeFormatter;
import java.util.List;
 
 
@Component
@Slf4j
public class ScheduledTasks {
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
 
    @Autowired
    private KeywordTaskService keywordTaskService;
    @Autowired
    private CollectController collectController;
    @Autowired
    private WebClient webClient; // 假设已配置WebClient
    @Autowired
    private QuestionService questionService;
 
    private volatile boolean isHealthy = true; // 健康状态标识
    private volatile boolean initialCheckComplete = false; // 初始检查完成标志
    private volatile boolean taskEnabled = true; // 任务启用开关
    @PostConstruct
    public void init() {
        // 启动健康检查定时任务,每10分钟执行一次
        checkInitialHealth();
        // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务)
 
    }
 
    /**
     * 执行初始健康检查,确保系统启动时任务状态正确
     */
    private void checkInitialHealth() {
        log.info("执行系统启动初始健康检查...");
 
        try {
            // 同步执行健康检查,最多等待30秒
            Boolean healthCheckResult = collectController.checkThirdPartyHealth()
                    .map(response -> "healthy".equalsIgnoreCase(response.getStatus()))
                    .block(Duration.ofSeconds(30));
 
            isHealthy = Boolean.TRUE.equals(healthCheckResult);
            taskEnabled = isHealthy;
 
            if (isHealthy) {
                log.info("系统启动时健康检查通过,任务处理将正常执行");
            } else {
                log.warn("系统启动时健康检查失败,任务处理将暂停");
            }
        } catch (Exception e) {
            log.error("初始健康检查失败: {}", e.getMessage());
            isHealthy = false;
            taskEnabled = false;
        } finally {
            initialCheckComplete = true;
        }
    }
 
    /**
     * 健康检查定时任务,每10分钟执行一次
     */
    @Scheduled(initialDelay = 600000, fixedRate = 600000) // 10分钟 = 600000毫秒
    public void checkHealth() {
        // 等待初始检查完成
        if (!initialCheckComplete) {
            log.debug("初始健康检查未完成,跳过本次健康检查");
            return;
        }
 
        log.info("开始执行健康检查...");
        try {
            collectController.checkThirdPartyHealth()
                    .map(response -> "healthy".equalsIgnoreCase(response.getStatus()))
                    .subscribe(
                            healthStatus -> {
                                boolean previousHealthStatus = isHealthy;
                                isHealthy = healthStatus;
 
                                // 状态变化时更新任务开关
                                if (previousHealthStatus != isHealthy) {
                                    taskEnabled = isHealthy;
                                    if (isHealthy) {
                                        log.info("健康检查通过,恢复任务处理");
                                    } else {
                                        log.warn("健康检查失败,暂停任务处理");
                                    }
                                } else {
                                    log.info("健康状态未变化,当前状态: {}", isHealthy ? "健康" : "不健康");
                                }
                            },
                            error -> {
                                log.error("健康检查请求失败: {}", error.getMessage());
                                if (isHealthy) {
                                    isHealthy = false;
                                    taskEnabled = false;
                                    log.warn("因健康检查请求失败,暂停任务处理");
                                }
                            }
                    );
        } catch (Exception e) {
            log.error("健康检查执行异常: {}", e.getMessage());
            if (isHealthy) {
                isHealthy = false;
                taskEnabled = false;
            }
        }
    }
 
    /**
     * 任务处理定时任务,每10秒执行一次
     */
    @Scheduled(initialDelay = 0, fixedRate = 10000) // 10秒 = 10000毫秒
    public void executeTaskProcessing() {
        // 检查初始检查是否完成和任务是否启用
        if (!initialCheckComplete) {
            log.debug("初始检查未完成,跳过本次任务处理");
            return;
        }
 
        if (!taskEnabled) {
            log.debug("任务已被禁用,跳过本次任务处理");
            return;
        }
 
        if (!isHealthy) {
            log.debug("系统不健康,跳过任务处理");
            return;
        }
 
        try {
            // 查询所有状态为pending的任务
            LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.eq(KeywordTask::getStatus, "pending");
 
            List<KeywordTask> tasks = keywordTaskService.list(queryWrapper);
            log.info("查询到 {} 个待处理任务", tasks.size());
 
            for (KeywordTask task : tasks) {
                if (task.getTask_id() != null) {
                    processTaskStatus(task)
                            .subscribeOn(Schedulers.boundedElastic())
                            .subscribe(
                                    updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
                                    error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
                            );
                }
            }
        } catch (Exception e) {
            log.error("任务处理执行异常: {}", e.getMessage());
        }
    }
    /**
     * 实际的任务处理方法,替代原@Scheduled注解方法
     */
//    public void executeTaskProcessing() {
//        if (!isHealthy) {
//            log.debug("系统不健康,跳过任务处理");
//            return;
//        }
//
//        // 查询所有状态为pending的任务
//        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
//        queryWrapper.eq(KeywordTask::getStatus, "pending");
//
//        keywordTaskService.list(queryWrapper)
//                .stream()
//                .filter(task -> task.getTask_id() != null)
//                .forEach(task -> processTaskStatus(task)
//                        .subscribeOn(Schedulers.boundedElastic())
//                        .subscribe(
//                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
//                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
//                        )
//                );
//    }
 
 
    private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
        return collectController.getTaskStatus(task.getTask_id())
                .flatMap(statusResponse -> {
                    if ("completed".equalsIgnoreCase(statusResponse.getStatus())) {
                        log.info("任务 {} 已完成,获取结果", task.getTask_id());
                        return collectController.getTaskResult(task.getTask_id())
                                .doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id()))
                                .thenReturn(task)
                                .map(t -> {
                                    t.setStatus("completed");
                                    return t;
                                });
                    } else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus())
                            && !"running".equalsIgnoreCase(statusResponse.getStatus())
                            && !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
                        task.setStatus("false");
                        return Mono.just(task);
                    }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
                        //更新每个提问词的状态
                        return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
                    }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){
                        task.setStatus("nonentity");
                        return Mono.just(task);
                    }
                    else {
                        // 任务仍在进行中,不更新状态
                        return Mono.empty();
                    }
                })
                .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务
                .flatMap(t -> {
                    if (!"pending".equalsIgnoreCase(t.getStatus())) {
 
                        // 修改这里:将updateById的结果包装成Mono
                        return Mono.fromSupplier(() -> keywordTaskService.updateById(t))
                                .thenReturn(t);
                    }
                    return Mono.just(t);
                })
                .onErrorResume(e -> {
                    log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());
                    task.setStatus("error");
 
                    // 将updateById的结果包装成Mono
                    return Mono.fromSupplier(() -> keywordTaskService.updateById(task))
                            .thenReturn(task);
                });
    }
 
    private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) {
        // 1. 先执行同步查询,获取 List<Question>
        List<Question> questions = questionService.lambdaQuery()
                .eq(Question::getKeyword_id, task.getKeyword_id())
                .list();
 
        // 2. 将 List 转为 Flux,再进行响应式处理
        return Flux.fromIterable(questions)
                .flatMap(question -> {
                    // 更新逻辑...
                    String newStatus = statusResponse.getQuestions_status().stream()
                            .filter(qs -> qs.getQuestion().equals(question.getQuestion()))
                            .findFirst()
                            .map(qs -> qs.getStatus())
                            .orElse(question.getStatus());
                    question.setStatus(newStatus);
 
                    return Mono.fromSupplier(() -> questionService.updateById(question))
                            // 此时可以调用 doOnError 处理异常
                            .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage()))
                            .onErrorReturn(false); // 异常时返回 false
                })
                .then(Mono.just(task));
    }
}