From dd028e18a12ad9ae7c43ed09b15ddd6bde1a43e9 Mon Sep 17 00:00:00 2001 From: guyue <1721849008@qq.com> Date: 星期三, 03 九月 2025 11:27:50 +0800 Subject: [PATCH] 采集中状态修改提前,统计数据合并 --- src/main/java/com/linghu/controller/CollectController.java | 403 +++++---------------------------------------------------- 1 files changed, 35 insertions(+), 368 deletions(-) diff --git a/src/main/java/com/linghu/controller/CollectController.java b/src/main/java/com/linghu/controller/CollectController.java index 4df7b80..5ccdb76 100644 --- a/src/main/java/com/linghu/controller/CollectController.java +++ b/src/main/java/com/linghu/controller/CollectController.java @@ -1,417 +1,84 @@ package com.linghu.controller; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; + import javax.servlet.http.HttpServletRequest; +import javax.validation.Valid; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.linghu.model.dto.*; -import org.springframework.beans.BeanUtils; + +import com.linghu.service.*; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.*; -import org.springframework.web.client.RestTemplate; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; - -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.linghu.model.common.ResponseResult; -import com.linghu.model.entity.Keyword; -import com.linghu.model.entity.Question; -import com.linghu.model.entity.User; -import com.linghu.service.KeywordService; -import com.linghu.service.QuestionService; -import com.linghu.service.ReferenceService; -import com.linghu.utils.JwtUtils; - -import io.jsonwebtoken.lang.Collections; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; - import org.springframework.web.bind.annotation.* ; -import org.springframework.http.HttpStatus; -import com.linghu.model.dto.TaskResultResponse.QuestionResult; -import com.linghu.model.dto.TaskResultResponse.UserResult; -import com.linghu.model.entity.Reference; -import java.util.stream.Collectors; -import java.util.stream.IntStream; + + @RestController @RequestMapping("/collect") @Api(value = "采集接口", tags = "采集管理") +@Slf4j public class CollectController { - @Autowired - private ReferenceService referenceService; - - @Value("${linghu.url}") - private String baseUrl; @Autowired - private WebClient webClient; + private CollectionService collectionService; - @Autowired - private JwtUtils jwtUtils; - @Autowired - private KeywordService keywordService; - @Autowired - private QuestionService questionService; + @PostMapping("/search") @ApiOperation(value = "开始采集") - public Mono<SearchTaskResponse> createSearchTask( - @RequestBody SearchTaskRequest searchTaskRequest, + public Mono<ResponseResult<?>> createSearchTask( + @Valid @RequestBody SearchTaskRequest searchTaskRequest, HttpServletRequest request) throws JsonProcessingException { -// String token = request.getHeader("Authorization"); -// User user = jwtUtils.parseToken(token); -// // 复制到UserDto -// UserDto userDto = new UserDto(); -// userDto.setName(user.getUser_name()); -// userDto.setEmail(user.getUser_email()); -// userDto.setPassword(user.getPassword()); -// -// List<UserDto> users = new ArrayList<>(); -// users.add(userDto); -// searchTaskRequest.setUsers(users); - // json格式 - ObjectMapper objectMapper = new ObjectMapper(); - System.out.println(objectMapper.writeValueAsString(searchTaskRequest)); - - return webClient.post() - .uri(baseUrl + "/api/v1/search") - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(searchTaskRequest) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, response -> response.bodyToMono(String.class) - .flatMap(errorBody -> Mono.error(new RuntimeException(errorBody)))) - .bodyToMono(new ParameterizedTypeReference<SearchTaskResponse>() { - }) - .flatMap(responseResult -> { - // 提取任务ID - SearchTaskResponse taskResponse = responseResult; - if (taskResponse != null && taskResponse.getTask_id() != null) { - // 保存任务ID到关键词 - LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>(); - updateWrapper.eq(Keyword::getKeyword_id, searchTaskRequest.getKeyword_id()); - updateWrapper.set(Keyword::getTask_id, taskResponse.getTask_id()); - keywordService.update(updateWrapper); - // 可选:更新响应中的其他信息 - // taskResponse.setMessage("任务已提交并保存,ID: " + taskResponse.getTaskId()); - } - return Mono.just(taskResponse); - }) - .onErrorResume(e -> { - // return Mono.just(ResponseResult.error("调用失败: " + e.getMessage())); - SearchTaskResponse task = new SearchTaskResponse(); - task.setMessage("调用失败: " + e.getMessage()); - return Mono.just(task); - }); + // 首先检查服务器资源 + return collectionService.getResponseResultMono(searchTaskRequest); } @ApiOperation(value = "查询任务状态") @GetMapping("/status") - public Mono<TaskStatusResponse> getTaskStatus(String taskId) { - return webClient.get() - .uri(baseUrl + "/api/v1/tasks/" + taskId) - .accept(MediaType.APPLICATION_JSON) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, response -> response.bodyToMono(TaskStatusResponse.class) - .flatMap(errorBody -> Mono.error(new RuntimeException(errorBody.getDetail())))) - .bodyToMono(TaskStatusResponse.class) - .flatMap(result -> { - TaskStatusResponse taskStatusResponse = result; - if (taskStatusResponse != null && taskStatusResponse.getStatus() != null) { - List<Question> updateQuestions = taskStatusResponse.getQuestions_status().stream() - .map(qs -> { - Question question = new Question(); - question.setQuestion_id(qs.getQuestion_id()); - question.setStatus(qs.getStatus()); - return question; - }).collect(Collectors.toList()); - - questionService.updateBatchById(updateQuestions); - } - return Mono.just(result); - }) - .onErrorResume(e -> { - // 创建一个自定义的错误响应对象 - TaskStatusResponse errorResponse = new TaskStatusResponse(); - errorResponse.setStatus("ERROR"); - errorResponse.setMessage(e.getMessage()); - errorResponse.setDetail(e.getMessage()); - - return Mono.just(errorResponse); - }); + public Mono<TaskStatusResponse> getTaskStatus(@RequestParam(value = "taskId" )String taskId) { + return collectionService.getError(taskId); } - @PostMapping("/cancel/{taskId}") + @PostMapping("/cancel/{keywordId}") @ApiOperation(value = "取消任务") - public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable String taskId) { - return webClient.post() - .uri(baseUrl + "/api/v1/tasks/" + taskId + "/cancel") - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(Collections.emptyMap()) // 添加空请求体 - .retrieve() - .onStatus(HttpStatus::isError, response -> response.bodyToMono(TaskCancelResponse.class) - .flatMap(errorBody -> Mono.error(new RuntimeException(errorBody.getDetail())))) - .bodyToMono(TaskCancelResponse.class) - .map(data -> ResponseResult.success(data)) - .onErrorResume(e -> { - if (e.getMessage().contains("任务不存在")) { - return Mono.just(ResponseResult.error(404, "任务不存在")); - } else if (e.getMessage().contains("无法取消")) { - return Mono.just(ResponseResult.error(400, "任务已完成,无法取消")); - } - return Mono.just(ResponseResult.error(500, "取消任务失败: " + e.getMessage())); - }); + public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable Integer keywordId) { + return collectionService.getResponseResult(keywordId); } @ApiOperation(value = "获取任务结果") @GetMapping("/tasks/{taskId}") public Mono<TaskResultResponse> getTaskResult(@PathVariable String taskId) { - return webClient.get() - .uri(baseUrl + "/api/v1/tasks/" + taskId + "/result") - .accept(MediaType.APPLICATION_JSON) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, response -> { - if (response.statusCode() == HttpStatus.NOT_FOUND) { - return response.bodyToMono(String.class) - .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在"))); - } else if (response.statusCode() == HttpStatus.BAD_REQUEST) { - return response.bodyToMono(String.class) - .flatMap(errorBody -> Mono.error(new RuntimeException("任务未完成,无法获取结果"))); - } - return response.createException().flatMap(Mono::error); - }) - .bodyToMono(TaskResultResponse.class) - .flatMap(responseResult -> { - TaskResultResponse result = responseResult; - if (result != null && result.getResults() != null) { - return updateQuestionAndReference(result) - .thenReturn(responseResult); - } - return Mono.just(responseResult); - }) - .onErrorResume(e -> { - System.out.println("获取任务结果失败"); - TaskResultResponse result = new TaskResultResponse(); - result.setDetail("获取任务结果失败: " + e.getMessage()); - return Mono.just(result); - }); - } - -// private Mono<Void> updateQuestionAndReference(TaskResultResponse result) { -// return Mono.fromRunnable(() -> { -// // 1. 更新关键词状态 -// LambdaUpdateWrapper<Keyword> keywordUpdate = new LambdaUpdateWrapper<>(); -// keywordUpdate.eq(Keyword::getTask_id, result.getTask_id()) -// .set(Keyword::getStatus, "completed"); -// keywordService.update(keywordUpdate); -// -// // 查询关键词ID -// LambdaQueryWrapper<Keyword> keywordQuery = new LambdaQueryWrapper<>(); -// keywordQuery.eq(Keyword::getTask_id, result.getTask_id()); -// Keyword keyword = keywordService.getOne(keywordQuery); -// -// if (keyword == null) { -// System.out.println("未找到关联的关键词,task_id: " + result.getTask_id()); -// return; -// } -// -// // 2. 处理每个用户的问题结果 -// for (UserResult userResult : result.getResults()) { -// for (QuestionResult questionResult : userResult.getQuestions_results()) { -// // 2.1 查询问题ID -// LambdaQueryWrapper<Question> queryWrapper = new LambdaQueryWrapper<>(); -// queryWrapper.eq(Question::getQuestion, questionResult.getQuestion()) -// .eq(Question::getKeyword_id, keyword.getKeyword_id()); -// Question question = questionService.getOne(queryWrapper); -// -// if (question != null) { -// // 更新问题状态 -// LambdaUpdateWrapper<Question> updateWrapper = new LambdaUpdateWrapper<>(); -// updateWrapper.eq(Question::getQuestion_id, question.getQuestion_id()) -// .set(Question::getStatus, questionResult.getStatus()) -// .set(Question::getResponse, questionResult.getResponse()) -// .set(Question::getExtracted_count, questionResult.getExtracted_count()) -// .set(Question::getError, questionResult.getError()) -// .set(Question::getTimestamp, LocalDateTime.parse( -// questionResult.getTimestamp(), -// DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS") -// )); -// questionService.update(updateWrapper); -// -// // 2.2 保存引用数据 -// List<Reference> references = questionResult.getReferences().stream() -// .map(ref -> { -// Reference reference = new Reference(); -// reference.setQuestion_id(question.getQuestion_id()); -// reference.setTitle(ref.getTitle()); -// reference.setUrl(ref.getUrl()); -// reference.setDomain(ref.getDomain()); -// reference.setCreate_time(LocalDateTime.now()); -// return reference; -// }) -// .collect(Collectors.toList()); -// -// if (!references.isEmpty()) { -// referenceService.saveBatch(references); -// } -// } else { -// System.out.println("未找到匹配的问题,question " + question.getQuestion()); -// -// } -// } -// } -// }); -// } - - private Mono<Void> updateQuestionAndReference(TaskResultResponse result) { - return Mono.fromRunnable(() -> { - try { - // 1. 更新关键词状态 - LambdaUpdateWrapper<Keyword> keywordUpdate = new LambdaUpdateWrapper<>(); - keywordUpdate.eq(Keyword::getTask_id, result.getTask_id()) - .set(Keyword::getStatus, "completed"); - keywordService.update(keywordUpdate); - - // 查询关键词ID - LambdaQueryWrapper<Keyword> keywordQuery = new LambdaQueryWrapper<>(); - keywordQuery.eq(Keyword::getTask_id, result.getTask_id()); - Keyword keyword = keywordService.getOne(keywordQuery); - - if (keyword == null) { - System.out.println("未找到关联的关键词,task_id: " + result.getTask_id()); - return; - } - - // 2. 批量查询所有问题 - LambdaQueryWrapper<Question> queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(Question::getKeyword_id, keyword.getKeyword_id()); - List<Question> questions = questionService.list(queryWrapper); - - // 构建问题映射表,用于快速查找 - Map<String, Question> questionMap = questions.stream() - .collect(Collectors.toMap(Question::getQuestion, q -> q)); - - // 3. 收集所有需要更新的问题和引用 - List<Question> questionsToUpdate = new ArrayList<>(); - List<Reference> allReferences = new ArrayList<>(); - - // 遍历结果 - for (UserResult userResult : result.getResults()) { - for (QuestionResult questionResult : userResult.getQuestions_results()) { - try { - Question question = questionMap.get(questionResult.getQuestion()); - if (question != null) { - // 更新问题对象 - question.setStatus(questionResult.getStatus()); - question.setResponse(questionResult.getResponse()); - question.setExtracted_count(questionResult.getExtracted_count()); - question.setError(questionResult.getError()); - - // 解析时间戳 - if (questionResult.getTimestamp() != null) { - DateTimeFormatter formatter = DateTimeFormatter - .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS"); - question.setTimestamp( - LocalDateTime.parse(questionResult.getTimestamp(), formatter)); - } - //更新 -// questionService.updateById(question); - - questionsToUpdate.add(question); - - // 收集引用数据,处理空集合情况 - List<Reference> references = - Optional.ofNullable(questionResult.getReferences()) - .orElse(Collections.emptyList()) - .stream() - .map(ref -> { - Reference reference = new Reference(); - reference.setQuestion_id(question.getQuestion_id()); - reference.setTitle(ref.getTitle()); - reference.setUrl(ref.getUrl()); - reference.setDomain(ref.getDomain()); - reference.setCreate_time(LocalDateTime.now()); - return reference; - }) - .collect(Collectors.toList()); - - // 添加到总引用列表 - if (!references.isEmpty()) { - allReferences.addAll(references); - } - } - } catch (Exception e) { - System.out.println("处理问题结果失败: " + e.getMessage()); - } - } - } - - // 4. 批量更新问题 - System.out.println(questionsToUpdate); - if (!questionsToUpdate.isEmpty()) { - questionService.updateBatchById(questionsToUpdate); - System.out.println("成功批量更新 " + questionsToUpdate.size() + " 个问题"); - } - - // 5. 批量插入引用,使用流式分批处理 - if (!allReferences.isEmpty()) { - int batchSize = 1000; - IntStream.iterate(0, i -> i + batchSize) - .limit((allReferences.size() + batchSize - 1) / batchSize) - .forEach(i -> { - List<Reference> batch = allReferences.subList( - i, Math.min(i + batchSize, allReferences.size())); - referenceService.saveBatch(batch); - }); - System.out.println("成功批量插入 " + allReferences.size() + " 条引用数据"); - } - - } catch (Exception e) { - System.out.println("更新问题和引用数据失败: " + e.getMessage()); - throw new RuntimeException("更新问题和引用数据失败", e); - } - }); + return collectionService.getTaskResultResponseMono(taskId); } @GetMapping("/tasks/all") @ApiOperation(value = "获取所有任务列表") public Mono<TaskListResponse> getAllTasks() { - return webClient.get() - .uri(baseUrl + "/api/v1/tasks") - .accept(MediaType.APPLICATION_JSON) - .retrieve() - .bodyToMono(new ParameterizedTypeReference<TaskListResponse>() { - }) - .onErrorResume(e -> { - TaskListResponse response = new TaskListResponse(); - response.setDetail("获取任务列表失败: " + e.getMessage()); - return Mono.just(response); - - // return Mono.just(ResponseResult.error("获取任务列表失败: " + e.getMessage())); - }); + return collectionService.getTaskListResponseMono(); } @GetMapping("/health") + @ApiOperation("健康检查") public Mono<HealthResponse> checkThirdPartyHealth() { - return webClient.get() - .uri(baseUrl + "/health") // 假设第三方健康检查接口路径为/health - .retrieve() - .bodyToMono(HealthResponse.class) - .onErrorResume(e -> Mono.just( - new HealthResponse("unhealthy", null, "", e.getMessage()))); + return collectionService.getHealthResponseMono(); } + + /** + * 查询服务器资源 + */ + @GetMapping("/server/resource") + @ApiOperation(value = "查询服务器资源") + public Mono<ServerResourceResponse> getServerResource() { + return collectionService.getServerResourceResponseMono(); + } + + + } -- Gitblit v1.7.1