guyue
2025-07-21 ad835011afaf88624e5b5f27b248c6b1089b7d8a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package com.linghu.timeTask;
 
 
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.linghu.controller.CollectController;
import com.linghu.model.dto.TaskResultResponse;
import com.linghu.model.dto.TaskStatusResponse;
import com.linghu.model.entity.KeywordTask;
import com.linghu.model.entity.Question;
import com.linghu.service.KeywordTaskService;
import com.linghu.service.QuestionService;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
 
@Component
@Slf4j
public class ScheduledTasks {
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
 
    @Autowired
    private KeywordTaskService keywordTaskService;
    @Autowired
    private CollectController collectController;
    @Autowired
    private WebClient webClient; // 假设已配置WebClient
    @Autowired
    private QuestionService questionService;
 
    private ScheduledFuture<?> scheduledTask; // 定时任务引用
    // 健康检查专用调度器(单线程足够)
    private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor();
    // 任务处理专用调度器(可根据任务量调整线程数)
    private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 个线程
    private volatile boolean isHealthy = true; // 健康状态标识
    @PostConstruct
    public void init() {
        // 启动健康检查定时任务,每10分钟执行一次
        healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES);
        // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务)
        if (isHealthy) {
            startTaskProcessing();
        }
    }
 
    /**
     * 健康检查方法,调用第三方服务健康检查接口
     */
    private void checkHealth() {
        log.info("开始执行健康检查...");
        collectController.checkThirdPartyHealth()
                .subscribe(
                        response -> {
                            boolean previousHealthStatus = isHealthy;
                            isHealthy = "healthy".equalsIgnoreCase(response.getStatus());
 
                            // 状态变化时记录日志并控制任务执行
                            if (previousHealthStatus != isHealthy) {
                                if (isHealthy) {
                                    log.info("健康检查通过,恢复任务处理");
                                    startTaskProcessing();
                                } else {
                                    log.warn("健康检查失败,暂停任务处理");
                                    stopTaskProcessing();
                                }
                            }
                        },
                        error -> {
                            log.error("健康检查请求失败: {}", error.getMessage());
                            if (isHealthy) { // 仅在健康状态变化时记录并停止任务
                                isHealthy = false;
                                log.warn("因健康检查请求失败,暂停任务处理");
                                stopTaskProcessing();
                            }
                        }
                );
    }
 
    /**
     * 启动任务处理定时执行
     */
    private synchronized void startTaskProcessing() {
        if (scheduledTask == null || scheduledTask.isCancelled()) {
            scheduledTask = taskScheduler.scheduleAtFixedRate(
                    this::executeTaskProcessing,
                    0,
                    10,
                    TimeUnit.SECONDS
            );
            System.out.println("nima");
        }
    }
 
    /**
     * 停止任务处理定时执行
     */
    private synchronized void stopTaskProcessing() {
        if (scheduledTask != null && !scheduledTask.isCancelled()) {
            scheduledTask.cancel(false);
        }
    }
    @PreDestroy
    public void destroy() {
        // 关闭健康检查调度器
        healthCheckScheduler.shutdown();
        try {
            if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                healthCheckScheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            healthCheckScheduler.shutdownNow();
        }
 
        // 关闭任务处理调度器
        taskScheduler.shutdown();
        try {
            if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                taskScheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            taskScheduler.shutdownNow();
        }
    }
    /**
     * 实际的任务处理方法,替代原@Scheduled注解方法
     */
    public void executeTaskProcessing() {
        if (!isHealthy) {
            log.debug("系统不健康,跳过任务处理");
            return;
        }
 
        // 查询所有状态为pending的任务
        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(KeywordTask::getStatus, "pending");
 
        keywordTaskService.list(queryWrapper)
                .stream()
                .filter(task -> task.getTask_id() != null)
                .forEach(task -> processTaskStatus(task)
                        .subscribeOn(Schedulers.boundedElastic())
                        .subscribe(
                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
                        )
                );
    }
 
 
    private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
        return collectController.getTaskStatus(task.getTask_id())
                .flatMap(statusResponse -> {
                    if ("completed".equalsIgnoreCase(statusResponse.getStatus())) {
                        log.info("任务 {} 已完成,获取结果", task.getTask_id());
                        return collectController.getTaskResult(task.getTask_id())
                                .doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id()))
                                .thenReturn(task)
                                .map(t -> {
                                    t.setStatus("completed");
                                    return t;
                                });
                    } else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus())
                            && !"running".equalsIgnoreCase(statusResponse.getStatus())
                            && !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
                        task.setStatus("false");
                        return Mono.just(task);
                    }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
                        //更新每个提问词的状态
                        return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
                    }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){
                        task.setStatus("nonentity");
                        return Mono.just(task);
                    }
                    else {
                        // 任务仍在进行中,不更新状态
                        return Mono.empty();
                    }
                })
                .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务
                .flatMap(t -> {
                    if (!"pending".equalsIgnoreCase(t.getStatus())) {
 
                        // 修改这里:将updateById的结果包装成Mono
                        return Mono.fromSupplier(() -> keywordTaskService.updateById(t))
                                .thenReturn(t);
                    }
                    return Mono.just(t);
                })
                .onErrorResume(e -> {
                    log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());
                    task.setStatus("error");
 
                    // 将updateById的结果包装成Mono
                    return Mono.fromSupplier(() -> keywordTaskService.updateById(task))
                            .thenReturn(task);
                });
    }
 
    private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) {
        // 1. 先执行同步查询,获取 List<Question>
        List<Question> questions = questionService.lambdaQuery()
                .eq(Question::getKeyword_id, task.getKeyword_id())
                .list();
 
        // 2. 将 List 转为 Flux,再进行响应式处理
        return Flux.fromIterable(questions)
                .flatMap(question -> {
                    // 更新逻辑...
                    String newStatus = statusResponse.getQuestions_status().stream()
                            .filter(qs -> qs.getQuestion().equals(question.getQuestion()))
                            .findFirst()
                            .map(qs -> qs.getStatus())
                            .orElse(question.getStatus());
                    question.setStatus(newStatus);
 
                    return Mono.fromSupplier(() -> questionService.updateById(question))
                            // 此时可以调用 doOnError 处理异常
                            .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage()))
                            .onErrorReturn(false); // 异常时返回 false
                })
                .then(Mono.just(task));
    }
}