| | |
| | | import org.springframework.core.ParameterizedTypeReference; |
| | | import org.springframework.dao.DuplicateKeyException; |
| | | import org.springframework.http.*; |
| | | import org.springframework.web.reactive.function.client.ExchangeStrategies; |
| | | import org.springframework.web.reactive.function.client.WebClient; |
| | | |
| | | import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
| | |
| | | return Mono.just(ResponseResult.success("任务已被取消")); |
| | | } |
| | | // 如果任务状态是"submitted"或"running",继续轮询 |
| | | if (!"completed".equalsIgnoreCase(statusResponse.getStatus()) && !"failed".equalsIgnoreCase(statusResponse.getStatus()) && !"cancelled".equalsIgnoreCase(statusResponse.getStatus()) ) { |
| | | if (!"completed".equalsIgnoreCase(statusResponse.getStatus()) && !"failed".equalsIgnoreCase(statusResponse.getStatus()) && !"cancelled".equalsIgnoreCase(statusResponse.getStatus()) && !("ERROR".equalsIgnoreCase(statusResponse.getStatus()) && statusResponse.getMessage().contains("Task not found")) ) { |
| | | return Mono.delay(Duration.ofSeconds(5)) // 延迟 5 秒后再次查询 |
| | | .flatMap(aLong -> waitForTaskCompletion(taskId, batchQueue, searchTaskRequest, keywordId)); // 递归调用继续等待 |
| | | } else { |
| | |
| | | @ApiOperation(value = "获取任务结果") |
| | | @GetMapping("/tasks/{taskId}") |
| | | public Mono<TaskResultResponse> getTaskResult(@PathVariable String taskId) { |
| | | return webClient.get() |
| | | WebClient webClient2 = WebClient.builder() |
| | | .exchangeStrategies(ExchangeStrategies.builder() |
| | | .codecs(configurer -> configurer.defaultCodecs() |
| | | .maxInMemorySize(10 * 1024 * 1024)) // 10MB |
| | | .build()) |
| | | .build(); |
| | | return webClient2.get() |
| | | .uri(baseUrl + "/api/v1/tasks/" + taskId + "/result") |
| | | .accept(MediaType.APPLICATION_JSON) |
| | | .retrieve() |
| | |
| | | }) |
| | | .onErrorResume(e -> { |
| | | System.out.println("获取任务结果失败"); |
| | | log.error("获取任务结果失败: {}", e.getMessage(), e); |
| | | TaskResultResponse result = new TaskResultResponse(); |
| | | result.setDetail("获取任务结果失败: " + e.getMessage()); |
| | | return Mono.just(result); |
| | |
| | | System.out.println("未找到关联的关键词,task_id: " + result.getTask_id()); |
| | | //报错 |
| | | throw new Exception("未找到关联的关键词,task_id: " + result.getTask_id()); |
| | | // return; |
| | | |
| | | } |
| | | LambdaQueryWrapper<KeywordTask> keywordTaskWrapper2 = new LambdaQueryWrapper<>(); |
| | | keywordTaskWrapper2.eq(KeywordTask::getKeyword_id, keyword.getKeyword_id()); |
| | | List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper2); |
| | | |
| | | //如果全部为completed 关键词也为completed ,如果关联关系没有任务id,或者状态为running ,关键词为submitted, |
| | | if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus()) || "false".equals(task.getStatus()) || "cancelled".equals(task.getStatus()) ||"canceled".equals(task.getStatus())) ) { |
| | | //如果全部为completed 或者错误、取消、任务不存在 关键词也为completed ,如果关联关系没有任务id,或者状态为running ,关键词为submitted, |
| | | if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus()) || "false".equals(task.getStatus()) || "cancelled".equals(task.getStatus()) ||"canceled".equals(task.getStatus()) || "nonentity".equals(task.getStatus())) ) { |
| | | keyword.setStatus("completed"); |
| | | keywordService.updateById(keyword); |
| | | |