From 749570394745ae95ee65605eadb42800f42fa20a Mon Sep 17 00:00:00 2001 From: huliguo <2023611923@qq.com> Date: 星期五, 08 八月 2025 12:09:26 +0800 Subject: [PATCH] 规范 --- src/main/java/com/linghu/controller/CollectController.java | 390 +++++-------------------------------------------------- 1 files changed, 39 insertions(+), 351 deletions(-) diff --git a/src/main/java/com/linghu/controller/CollectController.java b/src/main/java/com/linghu/controller/CollectController.java index 434c9d6..a90b4bf 100644 --- a/src/main/java/com/linghu/controller/CollectController.java +++ b/src/main/java/com/linghu/controller/CollectController.java @@ -1,396 +1,84 @@ package com.linghu.controller; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; + import javax.servlet.http.HttpServletRequest; +import javax.validation.Valid; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.linghu.model.dto.*; + +import com.linghu.service.*; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.*; -import org.springframework.web.client.RestTemplate; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; - -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.linghu.model.common.ResponseResult; -import com.linghu.model.dto.HealthResponse; -import com.linghu.model.dto.SearchTaskRequest; -import com.linghu.model.entity.Keyword; -import com.linghu.model.entity.Question; -import com.linghu.model.entity.User; -import com.linghu.service.KeywordService; -import com.linghu.service.QuestionService; -import com.linghu.service.ReferenceService; -import com.linghu.utils.JwtUtils; -import com.linghu.model.dto.SearchTaskResponse; -import com.linghu.model.dto.TaskStatusResponse; -import com.linghu.model.dto.TaskCancelResponse; -import com.linghu.model.dto.TaskListResponse; - -import io.jsonwebtoken.lang.Collections; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; +import org.springframework.web.bind.annotation.* ; -import org.springframework.web.bind.annotation.*; -import org.springframework.http.HttpStatus; -import com.linghu.model.dto.TaskResultResponse; -import com.linghu.model.dto.TaskResultResponse.QuestionResult; -import com.linghu.model.dto.TaskResultResponse.UserResult; -import com.linghu.model.entity.Reference; -import java.util.stream.Collectors; + @RestController @RequestMapping("/collect") @Api(value = "采集接口", tags = "采集管理") +@Slf4j public class CollectController { - @Autowired - private ReferenceService referenceService; - - @Value("${linghu.url}") - private String baseUrl; @Autowired - private WebClient webClient; + private CollectionService collectionService; - @Autowired - private JwtUtils jwtUtils; - @Autowired - private KeywordService keywordService; - @Autowired - private QuestionService questionService; + @PostMapping("/search") @ApiOperation(value = "开始采集") - public Mono<ResponseResult<SearchTaskResponse>> createSearchTask( - @RequestBody SearchTaskRequest searchTaskRequest, - HttpServletRequest request) { - String token = request.getHeader("Authorization"); - User user = jwtUtils.parseToken(token); - List<User> users = new ArrayList<>(); - users.add(user); - searchTaskRequest.setUsers(users); - - return webClient.post() - .uri(baseUrl + "/search") - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(searchTaskRequest) - .retrieve() - .bodyToMono(new ParameterizedTypeReference<ResponseResult<SearchTaskResponse>>() { - }) - .flatMap(responseResult -> { - // 提取任务ID - SearchTaskResponse taskResponse = responseResult.getData(); - if (taskResponse != null && taskResponse.getTask_id() != null) { - // 保存任务ID到关键词 - LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>(); - updateWrapper.eq(Keyword::getKeyword_id, searchTaskRequest.getKeyword_id()); - updateWrapper.set(Keyword::getTask_id, taskResponse.getTask_id()); - keywordService.update(updateWrapper); - // 可选:更新响应中的其他信息 - // taskResponse.setMessage("任务已提交并保存,ID: " + taskResponse.getTaskId()); - } - return Mono.just(responseResult); - }) - .onErrorResume(e -> { - return Mono.just(ResponseResult.error("调用失败: " + e.getMessage())); - }); + public Mono<ResponseResult<?>> createSearchTask( + @Valid @RequestBody SearchTaskRequest searchTaskRequest, + HttpServletRequest request) throws JsonProcessingException { + // 首先检查服务器资源 + return collectionService.getResponseResultMono(searchTaskRequest); } @ApiOperation(value = "查询任务状态") @GetMapping("/status") public Mono<TaskStatusResponse> getTaskStatus(String taskId) { - return webClient.get() - .uri(baseUrl + "/tasks/" + taskId) - .accept(MediaType.APPLICATION_JSON) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, response -> response.bodyToMono(String.class) - .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在: " + errorBody)))) - .bodyToMono(TaskStatusResponse.class) - .flatMap(result -> { - TaskStatusResponse taskStatusResponse = result; - if (taskStatusResponse != null && taskStatusResponse.getStatus() != null) { - List<Question> updateQuestions = taskStatusResponse.getQuestions_status().stream() - .map(qs -> { - Question question = new Question(); - question.setQuestion_id(qs.getQuestion_id()); - question.setStatus(qs.getStatus()); - return question; - }).collect(Collectors.toList()); - - questionService.updateBatchById(updateQuestions); - } - return Mono.just(result); - }); + return collectionService.getError(taskId); } - @PostMapping("/cancel/{taskId}") + @PostMapping("/cancel/{keywordId}") @ApiOperation(value = "取消任务") - public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable String taskId) { - return webClient.post() - .uri(baseUrl + "/tasks/" + taskId + "/cancel") - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(Collections.emptyMap()) // 添加空请求体 - .retrieve() - .onStatus(HttpStatus::isError, response -> { - if (response.statusCode() == HttpStatus.NOT_FOUND) { - return response.bodyToMono(String.class) - .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在"))); - } else if (response.statusCode() == HttpStatus.BAD_REQUEST) { - return response.bodyToMono(String.class) - .flatMap(errorBody -> Mono.error(new RuntimeException("任务已经完成,无法取消"))); - } - return response.createException().flatMap(Mono::error); - }) - .bodyToMono(TaskCancelResponse.class) - .map(data -> ResponseResult.success(data)) - .onErrorResume(e -> { - if (e.getMessage().contains("任务不存在")) { - return Mono.just(ResponseResult.error(404, "任务不存在")); - } else if (e.getMessage().contains("无法取消")) { - return Mono.just(ResponseResult.error(400, "任务已完成,无法取消")); - } - return Mono.just(ResponseResult.error(500, "取消任务失败: " + e.getMessage())); - }); + public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable Integer keywordId) { + return collectionService.getResponseResult(keywordId); } @ApiOperation(value = "获取任务结果") @GetMapping("/tasks/{taskId}") - public Mono<ResponseResult<TaskResultResponse>> getTaskResult(@PathVariable String taskId) { - return webClient.get() - .uri(baseUrl + "/tasks/" + taskId + "/result") - .accept(MediaType.APPLICATION_JSON) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, response -> { - if (response.statusCode() == HttpStatus.NOT_FOUND) { - return response.bodyToMono(String.class) - .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在"))); - } else if (response.statusCode() == HttpStatus.BAD_REQUEST) { - return response.bodyToMono(String.class) - .flatMap(errorBody -> Mono.error(new RuntimeException("任务未完成,无法获取结果"))); - } - return response.createException().flatMap(Mono::error); - }) - .bodyToMono(new ParameterizedTypeReference<ResponseResult<TaskResultResponse>>() { - }) - .flatMap(responseResult -> { - TaskResultResponse result = responseResult.getData(); - if (result != null && result.getResults() != null) { - return updateQuestionAndReference(result) - .thenReturn(responseResult); - } - return Mono.just(responseResult); - }) - .onErrorResume(e -> { - System.out.println("获取任务结果失败"); - return Mono.just(ResponseResult.error(e.getMessage())); - }); - } - - /* - * private Mono<Void> updateQuestionAndReference(TaskResultResponse result) { - * return Mono.fromRunnable(() -> { - * // 1. 更新关键词状态 - * LambdaUpdateWrapper<Keyword> keywordUpdate = new LambdaUpdateWrapper<>(); - * keywordUpdate.eq(Keyword::getTask_id, result.getTask_id()) - * .set(Keyword::getStatus, "completed"); - * keywordService.update(keywordUpdate); - * - * // 查询关键词ID - * LambdaQueryWrapper<Keyword> keywordQuery = new LambdaQueryWrapper<>(); - * keywordQuery.eq(Keyword::getTask_id, result.getTask_id()); - * Keyword keyword = keywordService.getOne(keywordQuery); - * - * if (keyword == null) { - * System.out.println("未找到关联的关键词,task_id: " + result.getTask_id()); - * return; - * } - * - * // 2. 处理每个用户的问题结果 - * for (UserResult userResult : result.getResults()) { - * for (QuestionResult questionResult : userResult.getQuestions_results()) { - * // 2.1 查询问题ID - * LambdaQueryWrapper<Question> queryWrapper = new LambdaQueryWrapper<>(); - * queryWrapper.eq(Question::getQuestion, questionResult.getQuestion()) - * .eq(Question::getKeyword_id, keyword.getKeyword_id()); - * Question question = questionService.getOne(queryWrapper); - * - * if (question != null) { - * // 更新问题状态 - * LambdaUpdateWrapper<Question> updateWrapper = new LambdaUpdateWrapper<>(); - * updateWrapper.eq(Question::getQuestion_id, question.getQuestion_id()) - * .set(Question::getStatus, questionResult.getStatus()) - * .set(Question::getResponse, questionResult.getResponse()) - * .set(Question::getExtracted_count, questionResult.getExtracted_count()) - * .set(Question::getError, questionResult.getError()) - * .set(Question::getTimestamp, LocalDateTime.parse( - * questionResult.getTimestamp(), - * DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS") - * )); - * questionService.update(updateWrapper); - * - * // 2.2 保存引用数据 - * List<Reference> references = questionResult.getReferences().stream() - * .map(ref -> { - * Reference reference = new Reference(); - * reference.setQuestion_id(question.getQuestion_id()); - * reference.setTitle(ref.getTitle()); - * reference.setUrl(ref.getUrl()); - * reference.setDomain(ref.getDomain()); - * reference.setCreate_time(new Date()); - * return reference; - * }) - * .collect(Collectors.toList()); - * - * if (!references.isEmpty()) { - * referenceService.saveBatch(references); - * } - * } else { - * System.out.println("未找到匹配的问题,question " + question.getQuestion()); - * - * } - * } - * } - * }); - * } - */ - - private Mono<Void> updateQuestionAndReference(TaskResultResponse result) { - return Mono.fromRunnable(() -> { - try { - // 1. 更新关键词状态 - LambdaUpdateWrapper<Keyword> keywordUpdate = new LambdaUpdateWrapper<>(); - keywordUpdate.eq(Keyword::getTask_id, result.getTask_id()) - .set(Keyword::getStatus, "completed"); - keywordService.update(keywordUpdate); - - // 查询关键词ID - LambdaQueryWrapper<Keyword> keywordQuery = new LambdaQueryWrapper<>(); - keywordQuery.eq(Keyword::getTask_id, result.getTask_id()); - Keyword keyword = keywordService.getOne(keywordQuery); - - if (keyword == null) { - // log.error("未找到关联的关键词,task_id: {}", result.getTask_id()); - System.out.println("未找到关联的关键词,task_id: " + result.getTask_id()); - return; - } - - // 2. 批量查询所有问题(假设Question有task_id和keyword_id字段) - LambdaQueryWrapper<Question> queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(Question::getKeyword_id, keyword.getKeyword_id()); - List<Question> questions = questionService.list(queryWrapper); - - // 构建问题映射表,用于快速查找 - Map<String, Question> questionMap = questions.stream() - .collect(Collectors.toMap(Question::getQuestion, q -> q)); - - // 3. 收集所有需要更新的问题和引用 - List<Question> questionsToUpdate = new ArrayList<>(); - List<Reference> allReferences = new ArrayList<>(); - - // 遍历结果,只收集数据不执行数据库操作 - for (UserResult userResult : result.getResults()) { - for (QuestionResult questionResult : userResult.getQuestions_results()) { - try { - Question question = questionMap.get(questionResult.getQuestion()); - if (question != null) { - // 更新问题对象 - question.setStatus(questionResult.getStatus()); - question.setResponse(questionResult.getResponse()); - question.setExtracted_count(questionResult.getExtracted_count()); - question.setError(questionResult.getError()); - - // 解析时间戳 - if (questionResult.getTimestamp() != null) { - DateTimeFormatter formatter = DateTimeFormatter - .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS"); - question.setTimestamp( - LocalDateTime.parse(questionResult.getTimestamp(), formatter)); - } - - questionsToUpdate.add(question); - - // 收集引用数据 - List<Reference> references = questionResult.getReferences().stream() - .map(ref -> { - Reference reference = new Reference(); - reference.setQuestion_id(question.getQuestion_id()); - reference.setTitle(ref.getTitle()); - reference.setUrl(ref.getUrl()); - reference.setDomain(ref.getDomain()); - reference.setCreate_time(LocalDateTime.now()); - return reference; - }) - .collect(Collectors.toList()); - - allReferences.addAll(references); - } else { - // log.warn("未找到匹配的问题,question: {}, keyword_id: {}", - // questionResult.getQuestion(), keyword.getKeyword_id()); - } - } catch (Exception e) { - // log.error("处理问题结果失败,question: {}, error: {}", - // questionResult.getQuestion(), e.getMessage(), e); - } - } - } - - // 4. 批量更新问题 - if (!questionsToUpdate.isEmpty()) { - questionService.updateBatchById(questionsToUpdate); - // log.info("成功批量更新 {} 个问题", questionsToUpdate.size()); - } - - // 5. 批量插入引用 - if (!allReferences.isEmpty()) { - // 分批处理,每批1000条记录,避免内存溢出 - int batchSize = 1000; - for (int i = 0; i < allReferences.size(); i += batchSize) { - List<Reference> batch = allReferences.subList( - i, Math.min(i + batchSize, allReferences.size())); - referenceService.saveBatch(batch); - } - // log.info("成功批量插入 {} 条引用数据", allReferences.size()); - } - - } catch (Exception e) { - // log.error("更新问题和引用数据失败,task_id: {}, error: {}", - // result.getTask_id(), e.getMessage(), e); - throw new RuntimeException("更新问题和引用数据失败", e); - } - }); + public Mono<TaskResultResponse> getTaskResult(@PathVariable String taskId) { + return collectionService.getTaskResultResponseMono(taskId); } @GetMapping("/tasks/all") @ApiOperation(value = "获取所有任务列表") - public Mono<ResponseResult<TaskListResponse>> getAllTasks() { - return webClient.get() - .uri(baseUrl + "/tasks") - .accept(MediaType.APPLICATION_JSON) - .retrieve() - .bodyToMono(new ParameterizedTypeReference<ResponseResult<TaskListResponse>>() { - }) - .onErrorResume(e -> { - return Mono.just(ResponseResult.error("获取任务列表失败: " + e.getMessage())); - }); + public Mono<TaskListResponse> getAllTasks() { + return collectionService.getTaskListResponseMono(); } @GetMapping("/health") + @ApiOperation("健康检查") public Mono<HealthResponse> checkThirdPartyHealth() { - return webClient.get() - .uri(baseUrl + "/health") // 假设第三方健康检查接口路径为/health - .retrieve() - .bodyToMono(HealthResponse.class) - .onErrorResume(e -> Mono.just( - new HealthResponse("unhealthy", null, "", e.getMessage()))); + return collectionService.getHealthResponseMono(); } + + /** + * 查询服务器资源 + */ + @GetMapping("/server/resource") + @ApiOperation(value = "查询服务器资源") + public Mono<ServerResourceResponse> getServerResource() { + return collectionService.getServerResourceResponseMono(); + } + + + } -- Gitblit v1.7.1