guyue
7 天以前 a9287c6b562da327587e2a4bac92df14eb7e2b01
src/main/java/com/linghu/controller/CollectController.java
@@ -20,6 +20,7 @@
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.http.*;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -220,7 +221,7 @@
                        return Mono.just(ResponseResult.success("任务已被取消"));
                    }
                    // 如果任务状态是"submitted"或"running",继续轮询
                    if (!"completed".equalsIgnoreCase(statusResponse.getStatus()) && !"failed".equalsIgnoreCase(statusResponse.getStatus()) && !"cancelled".equalsIgnoreCase(statusResponse.getStatus()) ) {
                    if (!"completed".equalsIgnoreCase(statusResponse.getStatus()) && !"failed".equalsIgnoreCase(statusResponse.getStatus()) && !"cancelled".equalsIgnoreCase(statusResponse.getStatus()) && !("ERROR".equalsIgnoreCase(statusResponse.getStatus()) && statusResponse.getMessage().contains("Task not found")) ) {
                        return Mono.delay(Duration.ofSeconds(5))  // 延迟 5 秒后再次查询
                                .flatMap(aLong -> waitForTaskCompletion(taskId, batchQueue, searchTaskRequest, keywordId));  // 递归调用继续等待
                    } else {
@@ -550,7 +551,13 @@
    @ApiOperation(value = "获取任务结果")
    @GetMapping("/tasks/{taskId}")
    public Mono<TaskResultResponse> getTaskResult(@PathVariable String taskId) {
        return webClient.get()
        WebClient webClient2 = WebClient.builder()
                .exchangeStrategies(ExchangeStrategies.builder()
                        .codecs(configurer -> configurer.defaultCodecs()
                                .maxInMemorySize(10 * 1024 * 1024)) // 10MB
                        .build())
                .build();
        return webClient2.get()
                .uri(baseUrl + "/api/v1/tasks/" + taskId + "/result")
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
@@ -576,6 +583,7 @@
                })
                .onErrorResume(e -> {
                    System.out.println("获取任务结果失败");
                    log.error("获取任务结果失败: {}", e.getMessage(), e);
                        TaskResultResponse result = new TaskResultResponse();
                    result.setDetail("获取任务结果失败: " + e.getMessage());
                    return Mono.just(result);
@@ -651,14 +659,14 @@
                    System.out.println("未找到关联的关键词,task_id: " + result.getTask_id());
                    //报错
                    throw new Exception("未找到关联的关键词,task_id: " + result.getTask_id());
//                    return;
                }
                LambdaQueryWrapper<KeywordTask> keywordTaskWrapper2 = new LambdaQueryWrapper<>();
                keywordTaskWrapper2.eq(KeywordTask::getKeyword_id, keyword.getKeyword_id());
                List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper2);
                //如果全部为completed  关键词也为completed  ,如果关联关系没有任务id,或者状态为running  ,关键词为submitted,
                if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus()) || "false".equals(task.getStatus()) || "cancelled".equals(task.getStatus()) ||"canceled".equals(task.getStatus())) ) {
                //如果全部为completed 或者错误、取消、任务不存在 关键词也为completed  ,如果关联关系没有任务id,或者状态为running  ,关键词为submitted,
                if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus()) || "false".equals(task.getStatus()) || "cancelled".equals(task.getStatus()) ||"canceled".equals(task.getStatus()) || "nonentity".equals(task.getStatus())) ) {
                    keyword.setStatus("completed");
                    keywordService.updateById(keyword);