package com.linghu.controller;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.linghu.model.dto.*;
import com.linghu.model.entity.*;
import com.linghu.service.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.*;
import org.springframework.web.reactive.function.client.WebClient;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.linghu.model.common.ResponseResult;
import com.linghu.utils.JwtUtils;
import io.jsonwebtoken.lang.Collections;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.web.bind.annotation.* ;
import org.springframework.http.HttpStatus;
import com.linghu.model.dto.TaskResultResponse.QuestionResult;
import com.linghu.model.dto.TaskResultResponse.UserResult;
import reactor.core.scheduler.Schedulers;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
 * REST endpoints that proxy the remote collection ("search task") service at
 * {@code linghu.url} and mirror task state into the local database.
 *
 * <p>Remote calls are made with {@link WebClient}; every local database call is
 * blocking and is therefore shifted onto {@link Schedulers#boundedElastic()}.
 *
 * <p>NOTE(review): the generic type arguments in this class (for example the
 * return type of {@link #createSearchTask} and the element types of the local
 * collections) were not present in the reviewed text and have been restored by
 * inference from how each value is used — confirm against the DTO/entity
 * definitions before merging.
 */
@RestController
@RequestMapping("/collect")
@Api(value = "采集接口", tags = "采集管理")
@Slf4j
public class CollectController {

    /** Batch size used when the request carries no usable {@code max_concurrent_users}. */
    private static final int DEFAULT_MAX_CONCURRENT_USERS = 3;

    /** CPU / memory usage (percent) at or above which new collection tasks are refused. */
    private static final double RESOURCE_USAGE_LIMIT_PERCENT = 90.0;

    /** Pause inserted before each batch is submitted to the remote service. */
    private static final Duration BATCH_SUBMIT_DELAY = Duration.ofSeconds(2);

    /** Name of the platform type assigned to domains that have no platform yet. */
    private static final String DEFAULT_TYPE_NAME = "默认";

    /**
     * Format of {@code QuestionResult.timestamp}. ISO_LOCAL_DATE_TIME accepts the
     * six-digit fraction the previous fixed pattern required and also timestamps
     * with a shorter or absent fraction.
     */
    private static final DateTimeFormatter RESULT_TIMESTAMP_FORMAT = DateTimeFormatter.ISO_LOCAL_DATE_TIME;

    @Autowired
    private ReferenceService referenceService;

    @Value("${linghu.url}")
    private String baseUrl;

    @Autowired
    private WebClient webClient;

    @Autowired
    private JwtUtils jwtUtils;

    @Autowired
    private KeywordService keywordService;

    @Autowired
    private QuestionService questionService;

    @Autowired
    private KeywordTaskService keywordTaskService;

    @Autowired
    private PlatformService platformService;

    @Autowired
    private TypeService typeService;

    /**
     * Parses a resource-usage string such as {@code "42.5%"} into a number.
     *
     * @param usageStr usage text, optionally containing a percent sign; may be {@code null}
     * @return the parsed value, or {@code 0.0} when the input is {@code null} or not numeric
     */
    private double parseUsage(String usageStr) {
        if (usageStr == null) {
            return 0.0;
        }
        try {
            return Double.parseDouble(usageStr.replace("%", "").trim());
        } catch (NumberFormatException e) {
            log.error("解析资源使用率失败: {}", e.getMessage());
            return 0.0;
        }
    }

    /**
     * Starts a collection run: checks remote server load, then submits the
     * request's users to the remote service in sequential batches.
     *
     * <p>NOTE(review): {@link #getServerResource()} maps a failed probe to a
     * response built from the error message; if its usage fields are then
     * {@code null} they parse as 0.0 and the load check passes — confirm this
     * fail-open behaviour is intended.
     *
     * @param searchTaskRequest users, questions and configuration to submit
     * @param request           the servlet request (not read by this method)
     * @return the responses of all submitted batches, or an error result
     *         (code 503 when the remote server is overloaded)
     * @throws JsonProcessingException declared for interface compatibility; not thrown here
     */
    @PostMapping("/search")
    @ApiOperation(value = "开始采集")
    public Mono<ResponseResult<List<SearchTaskResponse>>> createSearchTask(
            @RequestBody SearchTaskRequest searchTaskRequest,
            HttpServletRequest request) throws JsonProcessingException {
        return getServerResource()
                .flatMap(resourceResponse -> {
                    double cpuUsage = parseUsage(resourceResponse.getCpu_usage_percent());
                    double memoryUsage = parseUsage(resourceResponse.getMemory_usage_percent());

                    if (cpuUsage >= RESOURCE_USAGE_LIMIT_PERCENT || memoryUsage >= RESOURCE_USAGE_LIMIT_PERCENT) {
                        // %.1f needs numeric arguments; formatting the raw String fields
                        // would throw IllegalFormatConversionException and hide the 503.
                        String errorMsg = String.format("服务器资源不足:CPU使用率 %.1f%%,内存使用率 %.1f%%",
                                cpuUsage, memoryUsage);
                        log.warn(errorMsg);
                        return Mono.just(ResponseResult.error(503, errorMsg));
                    }
                    return submitBatches(searchTaskRequest);
                })
                .onErrorResume(e -> {
                    log.error("检查服务器资源失败: {}", e.getMessage(), e);
                    return Mono.just(ResponseResult.error("检查服务器资源失败: " + e.getMessage()));
                });
    }

    /**
     * Splits the request's users into batches, submits the batches one at a time
     * and records the returned task ids against the keyword.
     */
    private Mono<ResponseResult<List<SearchTaskResponse>>> submitBatches(SearchTaskRequest searchTaskRequest) {
        Integer keywordId = searchTaskRequest.getKeyword_id();
        Integer configured = searchTaskRequest.getConfig() != null
                ? searchTaskRequest.getConfig().getMax_concurrent_users()
                : null;
        int maxConcurrentUsers = (configured != null && configured > 0)
                ? configured
                : DEFAULT_MAX_CONCURRENT_USERS;

        // splitUsersIntoBatches touches the database, so keep it off the caller's thread.
        return Mono.fromCallable(() ->
                        splitUsersIntoBatches(searchTaskRequest.getUsers(), maxConcurrentUsers, keywordId))
                .subscribeOn(Schedulers.boundedElastic())
                .flatMapMany(Flux::fromIterable)
                .flatMap(batch -> {
                    SearchTaskRequest batchRequest = new SearchTaskRequest();
                    batchRequest.setUsers(batch);
                    batchRequest.setQuestions(searchTaskRequest.getQuestions());
                    batchRequest.setConfig(searchTaskRequest.getConfig());
                    batchRequest.setSave_to_database(searchTaskRequest.getSave_to_database());
                    batchRequest.setWebhook_url(searchTaskRequest.getWebhook_url());
                    batchRequest.setKeyword_id(keywordId);
                    return createSingleBatchTask(batchRequest).delaySubscription(BATCH_SUBMIT_DELAY);
                }, 1) // concurrency 1: batches are submitted strictly one after another
                .collectList()
                .flatMap(responses -> saveKeywordTasks(keywordId, responses).thenReturn(responses))
                .map(responses -> ResponseResult.success(responses))
                .onErrorResume(e -> {
                    log.error("创建搜索任务失败: {}", e.getMessage(), e);
                    return Mono.just(ResponseResult.error("创建搜索任务失败: " + e.getMessage()));
                });
    }

    /**
     * Persists one {@link KeywordTask} (status {@code "pending"}) per response
     * that carries a task id.
     *
     * @param keywordId     keyword the tasks belong to
     * @param taskResponses responses returned by the remote service
     * @return a Mono that completes once the rows are saved; completes
     *         immediately when no response has a task id
     */
    private Mono<Void> saveKeywordTasks(Integer keywordId, List<SearchTaskResponse> taskResponses) {
        List<KeywordTask> keywordTasks = taskResponses.stream()
                .filter(response -> response.getTask_id() != null)
                .map(response -> {
                    KeywordTask keywordTask = new KeywordTask();
                    keywordTask.setKeyword_id(keywordId);
                    keywordTask.setTask_id(response.getTask_id());
                    keywordTask.setStatus("pending");
                    return keywordTask;
                })
                .collect(Collectors.toList());

        if (keywordTasks.isEmpty()) {
            // Nothing to persist; a batch save of an empty list reports failure.
            return Mono.empty();
        }

        return Mono.fromRunnable(() -> {
                    if (!keywordTaskService.saveOrUpdateBatch(keywordTasks)) {
                        throw new IllegalStateException("保存关键词任务关联失败");
                    }
                    // Logged only after the save actually succeeded.
                    log.info("成功保存 {} 个关键词任务关联", keywordTasks.size());
                })
                .subscribeOn(Schedulers.boundedElastic())
                .then();
    }

    /**
     * Increments the keyword's run counter ({@code num}) and partitions
     * {@code users} into consecutive batches.
     *
     * @param users     items to partition; {@code null} is treated as empty
     * @param batchSize maximum batch size; values below 1 are treated as 1
     * @param keywordId id of the keyword whose run counter is incremented
     * @param <T>       element type of the user list
     * @return the batches, in input order (sub-list views of {@code users})
     * @throws IllegalArgumentException if no keyword exists for {@code keywordId}
     */
    private <T> List<List<T>> splitUsersIntoBatches(List<T> users, int batchSize, Integer keywordId) {
        Keyword keyword = keywordService.getById(keywordId);
        if (keyword == null) {
            throw new IllegalArgumentException("关键词不存在,keyword_id: " + keywordId);
        }
        keyword.setNum(keyword.getNum() == null ? 1 : keyword.getNum() + 1);
        keywordService.updateById(keyword);

        List<List<T>> batches = new ArrayList<>();
        if (users == null || users.isEmpty()) {
            return batches;
        }
        // A step of zero would never advance the loop; a negative one would walk backwards.
        int step = Math.max(1, batchSize);
        for (int start = 0; start < users.size(); start += step) {
            batches.add(users.subList(start, Math.min(start + step, users.size())));
        }
        return batches;
    }

    /**
     * Submits one batch to the remote service and, when a task id is returned,
     * marks the keyword as {@code "submitted"} with that task id.
     *
     * @param batchRequest the batch to submit
     * @return the remote service's response for this batch
     */
    private Mono<SearchTaskResponse> createSingleBatchTask(SearchTaskRequest batchRequest) {
        return webClient.post()
                .uri(baseUrl + "/api/v1/search")
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(batchRequest)
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, response -> response.bodyToMono(String.class)
                        .flatMap(errorBody -> Mono.error(new RuntimeException(errorBody))))
                .bodyToMono(new ParameterizedTypeReference<SearchTaskResponse>() {})
                .flatMap(taskResponse -> {
                    if (taskResponse.getTask_id() == null) {
                        return Mono.just(taskResponse);
                    }
                    return Mono.fromRunnable(() -> {
                                LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>();
                                updateWrapper.eq(Keyword::getKeyword_id, batchRequest.getKeyword_id());
                                updateWrapper.set(Keyword::getStatus, "submitted");
                                updateWrapper.set(Keyword::getTask_id, taskResponse.getTask_id());
                                keywordService.update(updateWrapper);
                            })
                            .subscribeOn(Schedulers.boundedElastic())
                            .thenReturn(taskResponse);
                });
    }

    /**
     * Fetches a task's status from the remote service and copies the reported
     * per-question statuses into the local question table.
     *
     * @param taskId remote task id
     * @return the remote status; on any failure a response with status
     *         {@code "ERROR"} whose message and detail carry the error text
     */
    @ApiOperation(value = "查询任务状态")
    @GetMapping("/status")
    public Mono<TaskStatusResponse> getTaskStatus(String taskId) {
        return webClient.get()
                // Template variable so the caller-supplied id is URI-encoded.
                .uri(baseUrl + "/api/v1/tasks/{taskId}", taskId)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .onStatus(HttpStatus::isError, response -> response.bodyToMono(TaskStatusResponse.class)
                        .flatMap(errorBody -> Mono.error(new RuntimeException(errorBody.getDetail()))))
                .bodyToMono(TaskStatusResponse.class)
                .flatMap(result -> {
                    if (result.getStatus() == null
                            || result.getQuestions_status() == null
                            || result.getQuestions_status().isEmpty()) {
                        // No per-question statuses to mirror; return the status unchanged.
                        return Mono.just(result);
                    }
                    List<Question> updateQuestions = result.getQuestions_status().stream()
                            .map(qs -> {
                                Question question = new Question();
                                question.setQuestion_id(qs.getQuestion_id());
                                question.setStatus(qs.getStatus());
                                return question;
                            })
                            .collect(Collectors.toList());
                    return Mono.fromCallable(() -> {
                                questionService.updateBatchById(updateQuestions);
                                return result;
                            })
                            .subscribeOn(Schedulers.boundedElastic());
                })
                .onErrorResume(e -> {
                    TaskStatusResponse errorResponse = new TaskStatusResponse();
                    errorResponse.setStatus("ERROR");
                    errorResponse.setMessage(e.getMessage());
                    errorResponse.setDetail(e.getMessage());
                    return Mono.just(errorResponse);
                });
    }

    /**
     * Cancels a remote task and marks the matching keyword and keyword-task rows
     * as {@code "canceled"}.
     *
     * @param taskId remote task id
     * @return the remote cancel response; an error result with code 200 when the
     *         remote service reports the task is missing or cannot be cancelled,
     *         code 500 for any other failure
     */
    @PostMapping("/cancel/{taskId}")
    @ApiOperation(value = "取消任务")
    public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable String taskId) {
        return webClient.post()
                .uri(baseUrl + "/api/v1/tasks/{taskId}/cancel", taskId)
                .contentType(MediaType.APPLICATION_JSON)
                // Empty JSON body. Fully qualified because the file's single-type import
                // binds the simple name Collections to the JWT library's utility class.
                .bodyValue(java.util.Collections.emptyMap())
                .retrieve()
                .onStatus(HttpStatus::isError, response -> response.bodyToMono(TaskCancelResponse.class)
                        .flatMap(errorBody -> Mono.error(new RuntimeException(errorBody.getDetail()))))
                .bodyToMono(TaskCancelResponse.class)
                .flatMap(cancelResponse -> {
                    Mono<Void> updateKeyword = Mono.fromRunnable(() -> {
                                LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>();
                                updateWrapper.eq(Keyword::getTask_id, taskId);
                                updateWrapper.set(Keyword::getStatus, "canceled");
                                keywordService.update(updateWrapper);
                            })
                            .subscribeOn(Schedulers.boundedElastic())
                            .then();

                    Mono<Void> updateKeywordTask = Mono.fromRunnable(() -> {
                                LambdaUpdateWrapper<KeywordTask> updateWrapper = new LambdaUpdateWrapper<>();
                                updateWrapper.eq(KeywordTask::getTask_id, taskId);
                                updateWrapper.set(KeywordTask::getStatus, "canceled");
                                keywordTaskService.update(updateWrapper);
                            })
                            .subscribeOn(Schedulers.boundedElastic())
                            .then();

                    // Both updates run concurrently; the response is emitted once both finish.
                    return Mono.when(updateKeyword, updateKeywordTask).thenReturn(cancelResponse);
                })
                .map(data -> ResponseResult.success(data))
                .onErrorResume(e -> {
                    // getMessage() may be null (e.g. an error body without a detail field).
                    String message = e.getMessage() != null ? e.getMessage() : e.toString();
                    boolean rejectedByRemote = message.contains("任务不存在") || message.contains("无法取消");
                    return Mono.just(ResponseResult.error(rejectedByRemote ? 200 : 500, message));
                });
    }

    /**
     * Fetches a finished task's result from the remote service and stores the
     * question answers and references locally.
     *
     * @param taskId remote task id
     * @return the remote result; on any failure a response whose detail carries
     *         the error text
     */
    @ApiOperation(value = "获取任务结果")
    @GetMapping("/tasks/{taskId}")
    public Mono<TaskResultResponse> getTaskResult(@PathVariable String taskId) {
        return webClient.get()
                .uri(baseUrl + "/api/v1/tasks/{taskId}/result", taskId)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, response -> {
                    if (response.statusCode() == HttpStatus.NOT_FOUND) {
                        return response.bodyToMono(String.class)
                                .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在")));
                    } else if (response.statusCode() == HttpStatus.BAD_REQUEST) {
                        return response.bodyToMono(String.class)
                                .flatMap(errorBody -> Mono.error(new RuntimeException("任务未完成,无法获取结果")));
                    }
                    return response.createException().flatMap(Mono::error);
                })
                .bodyToMono(TaskResultResponse.class)
                .flatMap(responseResult -> {
                    if (responseResult.getResults() == null) {
                        return Mono.just(responseResult);
                    }
                    return updateQuestionAndReference(responseResult).thenReturn(responseResult);
                })
                .onErrorResume(e -> {
                    log.error("获取任务结果失败: {}", e.getMessage(), e);
                    TaskResultResponse result = new TaskResultResponse();
                    result.setDetail("获取任务结果失败: " + e.getMessage());
                    return Mono.just(result);
                });
    }

    /**
     * Stores a task result: marks the keyword {@code "completed"}, updates the
     * matching questions and merges the returned references into the database.
     *
     * @param result remote task result; {@code getResults()} must be non-null
     * @return a Mono that completes when all writes are done and fails with a
     *         {@link RuntimeException} if the result cannot be stored
     */
    private Mono<Void> updateQuestionAndReference(TaskResultResponse result) {
        return Mono.fromRunnable(() -> persistTaskResult(result))
                .subscribeOn(Schedulers.boundedElastic())
                .then();
    }

    /** Blocking implementation of {@link #updateQuestionAndReference}. */
    private void persistTaskResult(TaskResultResponse result) {
        try {
            // 1. Resolve the keyword through the keyword-task link and mark it completed.
            KeywordTask keywordTask = keywordTaskService.getOne(
                    new LambdaQueryWrapper<KeywordTask>().eq(KeywordTask::getTask_id, result.getTask_id()));
            if (keywordTask == null) {
                throw new IllegalStateException("未找到关联的关键词任务,task_id: " + result.getTask_id());
            }
            Keyword keyword = keywordService.getById(keywordTask.getKeyword_id());
            if (keyword == null) {
                throw new IllegalStateException("未找到关联的关键词,task_id: " + result.getTask_id());
            }
            keyword.setStatus("completed");
            keywordService.updateById(keyword);

            // 2. Index the keyword's questions by text. Duplicate texts keep the first
            //    row instead of aborting the whole update.
            Map<String, Question> questionsByText = questionService
                    .list(new LambdaQueryWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id()))
                    .stream()
                    .collect(Collectors.toMap(Question::getQuestion, q -> q, (first, duplicate) -> first));

            // 3. Apply every question result; a failure on one question is logged
            //    and does not stop the others.
            List<Question> questionsToUpdate = new ArrayList<>();
            for (UserResult userResult : result.getResults()) {
                if (userResult.getQuestions_results() == null) {
                    continue;
                }
                for (QuestionResult questionResult : userResult.getQuestions_results()) {
                    try {
                        Question question = questionsByText.get(questionResult.getQuestion());
                        if (question == null) {
                            continue;
                        }
                        applyQuestionResult(question, questionResult, keyword);
                        questionsToUpdate.add(question);
                        mergeAndSaveReferences(
                                buildReferences(questionResult, question, keyword, result), keyword, question);
                    } catch (Exception e) {
                        log.error("处理问题结果失败: {}", e.getMessage(), e);
                    }
                }
            }

            // 4. Write all touched questions in one batch.
            if (!questionsToUpdate.isEmpty()) {
                questionService.updateBatchById(questionsToUpdate);
                log.info("成功批量更新 {} 个问题", questionsToUpdate.size());
            }
        } catch (Exception e) {
            log.error("更新问题和引用数据失败: {}", e.getMessage(), e);
            throw new RuntimeException("更新问题和引用数据失败", e);
        }
    }

    /** Copies status, response, counters and timestamp from a result onto the question entity. */
    private void applyQuestionResult(Question question, QuestionResult questionResult, Keyword keyword) {
        question.setStatus(questionResult.getStatus());
        question.setResponse(questionResult.getResponse());
        question.setExtracted_count(questionResult.getExtracted_count());
        question.setError(questionResult.getError());
        question.setKeyword_id(keyword.getKeyword_id());
        if (questionResult.getTimestamp() != null) {
            question.setTimestamp(LocalDateTime.parse(questionResult.getTimestamp(), RESULT_TIMESTAMP_FORMAT));
        }
    }

    /** Converts the references of one question result into unsaved {@link Reference} entities. */
    private List<Reference> buildReferences(QuestionResult questionResult, Question question,
                                            Keyword keyword, TaskResultResponse result) {
        if (questionResult.getReferences() == null) {
            return new ArrayList<>();
        }
        return questionResult.getReferences().stream()
                .map(ref -> {
                    Reference reference = new Reference();
                    reference.setQuestion_id(question.getQuestion_id());
                    reference.setTitle(ref.getTitle());
                    reference.setUrl(ref.getUrl());
                    reference.setDomain(ref.getDomain());
                    reference.setNum(keyword.getNum());
                    reference.setTask_id(result.getTask_id());
                    reference.setKeyword_id(keyword.getKeyword_id());
                    reference.setCreate_time(LocalDateTime.now());
                    assignPlatformAndType(reference);
                    return reference;
                })
                .collect(Collectors.toList());
    }

    /**
     * Sets the reference's platform and type ids from its domain, creating the
     * platform (and the default type) when the domain is not known yet.
     */
    private void assignPlatformAndType(Reference reference) {
        Platform platform = platformService.getPlatformByDomain(reference.getDomain());
        if (platform != null) {
            reference.setPlatform_id(platform.getPlatform_id());
            Type type = typeService.getById(platform.getType_id());
            if (type != null) {
                reference.setType_id(type.getType_id());
            }
            return;
        }

        Type defaultType = typeService.getOne(
                new LambdaQueryWrapper<Type>().eq(Type::getType_name, DEFAULT_TYPE_NAME));
        if (defaultType == null) {
            defaultType = new Type();
            defaultType.setType_name(DEFAULT_TYPE_NAME);
            typeService.save(defaultType);
        }

        Platform created = new Platform();
        created.setDomain(reference.getDomain());
        created.setPlatform_name(reference.getDomain());
        created.setType_id(defaultType.getType_id());
        platformService.save(created);

        reference.setType_id(defaultType.getType_id());
        reference.setPlatform_id(created.getPlatform_id());
    }

    /**
     * Merges one question's new references with the rows already stored for the
     * same keyword, run number and question, then saves the outcome.
     *
     * <p>References are considered equal when title, URL and domain all match.
     * A stored row absorbs matching new references by adding their count to its
     * {@code repetition_num}; otherwise the first new reference of a group is
     * inserted with {@code repetition_num} set to the number of extra copies.
     *
     * <p>The merge is scoped to this question only, so references of other
     * questions are neither regrouped nor saved again.
     */
    private void mergeAndSaveReferences(List<Reference> newReferences, Keyword keyword, Question question) {
        if (newReferences.isEmpty()) {
            return;
        }

        List<Reference> stored = referenceService.list(new LambdaQueryWrapper<Reference>()
                .eq(Reference::getKeyword_id, keyword.getKeyword_id())
                .eq(Reference::getNum, keyword.getNum())
                .eq(Reference::getQuestion_id, question.getQuestion_id()));

        List<Reference> combined = new ArrayList<>(newReferences);
        combined.addAll(stored);
        Map<String, List<Reference>> groups = combined.stream()
                .collect(Collectors.groupingBy(
                        ref -> ref.getTitle() + "|" + ref.getUrl() + "|" + ref.getDomain()));

        List<Reference> toSave = new ArrayList<>(groups.size());
        for (List<Reference> group : groups.values()) {
            int newCount = (int) group.stream().filter(ref -> ref.getReference_id() == null).count();
            if (newCount == 0) {
                // Stored row with no new occurrence: nothing changed.
                continue;
            }
            Optional<Reference> existing = group.stream()
                    .filter(ref -> ref.getReference_id() != null)
                    .findFirst();
            Reference recordToSave;
            if (existing.isPresent()) {
                recordToSave = existing.get();
                int previous = recordToSave.getRepetition_num() == null ? 0 : recordToSave.getRepetition_num();
                recordToSave.setRepetition_num(previous + newCount);
            } else {
                recordToSave = group.get(0);
                recordToSave.setRepetition_num(newCount - 1);
            }
            toSave.add(recordToSave);
        }

        if (!toSave.isEmpty()) {
            referenceService.saveOrUpdateBatch(toSave);
        }
    }

    /**
     * Lists all tasks known to the remote service.
     *
     * @return the remote task list; on failure a response whose detail carries the error text
     */
    @GetMapping("/tasks/all")
    @ApiOperation(value = "获取所有任务列表")
    public Mono<TaskListResponse> getAllTasks() {
        return webClient.get()
                .uri(baseUrl + "/api/v1/tasks")
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<TaskListResponse>() {})
                .onErrorResume(e -> {
                    TaskListResponse response = new TaskListResponse();
                    response.setDetail("获取任务列表失败: " + e.getMessage());
                    return Mono.just(response);
                });
    }

    /**
     * Calls the remote service's {@code /health} endpoint.
     *
     * @return the remote health response, or an {@code "unhealthy"} response
     *         carrying the error message when the call fails
     */
    @GetMapping("/health")
    @ApiOperation("健康检查")
    public Mono<HealthResponse> checkThirdPartyHealth() {
        return webClient.get()
                .uri(baseUrl + "/health")
                .retrieve()
                .bodyToMono(HealthResponse.class)
                .onErrorResume(e -> Mono.just(
                        new HealthResponse("unhealthy", null, "", e.getMessage())));
    }

    /**
     * Queries the remote server's resource usage.
     *
     * @return the remote resource report; when the call fails, a response
     *         constructed from the error message
     */
    @GetMapping("/server/resource")
    @ApiOperation(value = "查询服务器资源")
    public Mono<ServerResourceResponse> getServerResource() {
        return webClient.get()
                .uri(baseUrl + "/api/v1/system/resources")
                .retrieve()
                .bodyToMono(ServerResourceResponse.class)
                .onErrorResume(e -> Mono.just(new ServerResourceResponse(e.getMessage())));
    }

    // TODO: given an order id, look up all of its keyword ids and the task ids under
    // each keyword, poll every task's status, and for each task whose status is
    // "completed" call the result endpoint and process the result.
}