From a9287c6b562da327587e2a4bac92df14eb7e2b01 Mon Sep 17 00:00:00 2001 From: guyue <1721849008@qq.com> Date: 星期六, 26 七月 2025 19:16:14 +0800 Subject: [PATCH] 增加获取结果缓冲区的上限 --- src/main/java/com/linghu/controller/CollectController.java | 18 +++++++++++++----- 1 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/linghu/controller/CollectController.java b/src/main/java/com/linghu/controller/CollectController.java index 7b5bc54..23cf78b 100644 --- a/src/main/java/com/linghu/controller/CollectController.java +++ b/src/main/java/com/linghu/controller/CollectController.java @@ -20,6 +20,7 @@ import org.springframework.core.ParameterizedTypeReference; import org.springframework.dao.DuplicateKeyException; import org.springframework.http.*; +import org.springframework.web.reactive.function.client.ExchangeStrategies; import org.springframework.web.reactive.function.client.WebClient; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -220,7 +221,7 @@ return Mono.just(ResponseResult.success("任务已被取消")); } // 如果任务状态是"submitted"或"running",继续轮询 - if (!"completed".equalsIgnoreCase(statusResponse.getStatus()) && !"failed".equalsIgnoreCase(statusResponse.getStatus()) && !"cancelled".equalsIgnoreCase(statusResponse.getStatus()) ) { + if (!"completed".equalsIgnoreCase(statusResponse.getStatus()) && !"failed".equalsIgnoreCase(statusResponse.getStatus()) && !"cancelled".equalsIgnoreCase(statusResponse.getStatus()) && !("ERROR".equalsIgnoreCase(statusResponse.getStatus()) && statusResponse.getMessage().contains("Task not found")) ) { return Mono.delay(Duration.ofSeconds(5)) // 延迟 5 秒后再次查询 .flatMap(aLong -> waitForTaskCompletion(taskId, batchQueue, searchTaskRequest, keywordId)); // 递归调用继续等待 } else { @@ -550,7 +551,13 @@ @ApiOperation(value = "获取任务结果") @GetMapping("/tasks/{taskId}") public Mono<TaskResultResponse> getTaskResult(@PathVariable String taskId) { - return webClient.get() + WebClient webClient2 = WebClient.builder() + .exchangeStrategies(ExchangeStrategies.builder() + .codecs(configurer -> configurer.defaultCodecs() + .maxInMemorySize(10 * 1024 * 1024)) // 10MB + .build()) + .build(); + return webClient2.get() .uri(baseUrl + "/api/v1/tasks/" + taskId + "/result") .accept(MediaType.APPLICATION_JSON) .retrieve() @@ -576,6 +583,7 @@ }) .onErrorResume(e -> { System.out.println("获取任务结果失败"); + log.error("获取任务结果失败: {}", e.getMessage(), e); TaskResultResponse result = new TaskResultResponse(); result.setDetail("获取任务结果失败: " + e.getMessage()); return Mono.just(result); @@ -651,14 +659,14 @@ System.out.println("未找到关联的关键词,task_id: " + result.getTask_id()); //报错 throw new Exception("未找到关联的关键词,task_id: " + result.getTask_id()); -// return; + } LambdaQueryWrapper<KeywordTask> keywordTaskWrapper2 = new LambdaQueryWrapper<>(); keywordTaskWrapper2.eq(KeywordTask::getKeyword_id, keyword.getKeyword_id()); List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper2); - //如果全部为completed 关键词也为completed ,如果关联关系没有任务id,或者状态为running ,关键词为submitted, - if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus()) || "false".equals(task.getStatus()) || "cancelled".equals(task.getStatus()) ||"canceled".equals(task.getStatus())) ) { + //如果全部为completed 或者错误、取消、任务不存在 关键词也为completed ,如果关联关系没有任务id,或者状态为running ,关键词为submitted, + if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus()) || "false".equals(task.getStatus()) || "cancelled".equals(task.getStatus()) ||"canceled".equals(task.getStatus()) || "nonentity".equals(task.getStatus())) ) { keyword.setStatus("completed"); keywordService.updateById(keyword); -- Gitblit v1.7.1