From dd028e18a12ad9ae7c43ed09b15ddd6bde1a43e9 Mon Sep 17 00:00:00 2001 From: guyue <1721849008@qq.com> Date: 星期三, 03 九月 2025 11:27:50 +0800 Subject: [PATCH] 采集中状态修改提前,统计数据合并 --- src/main/java/com/linghu/controller/CollectController.java | 301 +++++++------------------------------------------ 1 files changed, 45 insertions(+), 256 deletions(-) diff --git a/src/main/java/com/linghu/controller/CollectController.java b/src/main/java/com/linghu/controller/CollectController.java index 323f987..5ccdb76 100644 --- a/src/main/java/com/linghu/controller/CollectController.java +++ b/src/main/java/com/linghu/controller/CollectController.java @@ -1,295 +1,84 @@ package com.linghu.controller; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; + import javax.servlet.http.HttpServletRequest; +import javax.validation.Valid; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.linghu.model.dto.*; + +import com.linghu.service.*; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.*; -import org.springframework.web.client.RestTemplate; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; - -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.linghu.model.common.ResponseResult; -import com.linghu.model.dto.SearchTaskRequest; -import com.linghu.model.entity.Keyword; -import com.linghu.model.entity.Question; -import com.linghu.model.entity.User; -import com.linghu.service.KeywordService; -import com.linghu.service.QuestionService; -import com.linghu.service.ReferenceService; -import com.linghu.utils.JwtUtils; -import com.linghu.model.dto.SearchTaskResponse; -import com.linghu.model.dto.TaskStatusResponse; -import com.linghu.model.dto.TaskCancelResponse; - -import io.jsonwebtoken.lang.Collections; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; +import org.springframework.web.bind.annotation.* ; -import org.springframework.web.bind.annotation.*; -import org.springframework.http.HttpStatus; -import com.linghu.model.dto.TaskResultResponse; -import com.linghu.model.dto.TaskResultResponse.QuestionResult; -import com.linghu.model.dto.TaskResultResponse.UserResult; -import com.linghu.model.entity.Reference; -import java.util.stream.Collectors; + @RestController @RequestMapping("/collect") @Api(value = "采集接口", tags = "采集管理") +@Slf4j public class CollectController { - @Autowired - private ReferenceService referenceService; - - @Value("${linghu.url}") - private String baseUrl; @Autowired - private WebClient webClient; + private CollectionService collectionService; - @Autowired - private JwtUtils jwtUtils; - @Autowired - private KeywordService keywordService; - @Autowired - private QuestionService questionService; + @PostMapping("/search") @ApiOperation(value = "开始采集") - public Mono<ResponseResult<SearchTaskResponse>> createSearchTask( - @RequestBody SearchTaskRequest searchTaskRequest, - HttpServletRequest request) { - String token = request.getHeader("Authorization"); - User user = jwtUtils.parseToken(token); - List<User> users = new ArrayList<>(); - users.add(user); - searchTaskRequest.setUsers(users); - - return webClient.post() - .uri(baseUrl + "/search") - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(searchTaskRequest) - .retrieve() - .bodyToMono(new ParameterizedTypeReference<ResponseResult<SearchTaskResponse>>() { - }) - .flatMap(responseResult -> { - // 提取任务ID - SearchTaskResponse taskResponse = responseResult.getData(); - if (taskResponse != null && taskResponse.getTask_id() != null) { - // 保存任务ID到关键词 - LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>(); - updateWrapper.eq(Keyword::getKeyword_id, searchTaskRequest.getKeyword_id()); - updateWrapper.set(Keyword::getTask_id, taskResponse.getTask_id()); - keywordService.update(updateWrapper); - // 可选:更新响应中的其他信息 - // taskResponse.setMessage("任务已提交并保存,ID: " + taskResponse.getTaskId()); - } - return Mono.just(responseResult); - }) - .onErrorResume(e -> { - return Mono.just(ResponseResult.error("调用失败: " + e.getMessage())); - }); + public Mono<ResponseResult<?>> createSearchTask( + @Valid @RequestBody SearchTaskRequest searchTaskRequest, + HttpServletRequest request) throws JsonProcessingException { + // 首先检查服务器资源 + return collectionService.getResponseResultMono(searchTaskRequest); } @ApiOperation(value = "查询任务状态") @GetMapping("/status") - public Mono<TaskStatusResponse> getTaskStatus(String taskId) { - return webClient.get() - .uri(baseUrl + "/tasks/" + taskId) - .accept(MediaType.APPLICATION_JSON) - .retrieve() - .onStatus(HttpStatus::is4xxClientError, response -> response.bodyToMono(String.class) - .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在: " + errorBody)))) - .bodyToMono(TaskStatusResponse.class) - .flatMap(result -> { - TaskStatusResponse taskStatusResponse = result; - if (taskStatusResponse != null && taskStatusResponse.getStatus() != null) { - List<Question> updateQuestions = taskStatusResponse.getQuestions_status().stream() - .map(qs -> { - Question question = new Question(); - question.setQuestion_id(qs.getQuestion_id()); - question.setStatus(qs.getStatus()); - return question; - }).collect(Collectors.toList()); - - questionService.updateBatchById(updateQuestions); - } - return Mono.just(result); - }); + public Mono<TaskStatusResponse> getTaskStatus(@RequestParam(value = "taskId" )String taskId) { + return collectionService.getError(taskId); } - @PostMapping("/cancel/{taskId}") + @PostMapping("/cancel/{keywordId}") @ApiOperation(value = "取消任务") - public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable String taskId) { - return webClient.post() - .uri(baseUrl + "/tasks/" + taskId + "/cancel") - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(Collections.emptyMap()) // 添加空请求体 - .retrieve() - .onStatus(HttpStatus::isError, response -> { - if (response.statusCode() == HttpStatus.NOT_FOUND) { - return response.bodyToMono(String.class) - .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在"))); - } else if (response.statusCode() == HttpStatus.BAD_REQUEST) { - return response.bodyToMono(String.class) - .flatMap(errorBody -> Mono.error(new RuntimeException("任务已经完成,无法取消"))); - } - return response.createException().flatMap(Mono::error); - }) - .bodyToMono(TaskCancelResponse.class) - .map(data -> ResponseResult.success(data)) - .onErrorResume(e -> { - if (e.getMessage().contains("任务不存在")) { - return Mono.just(ResponseResult.error(404, "任务不存在")); - } else if (e.getMessage().contains("无法取消")) { - return Mono.just(ResponseResult.error(400, "任务已完成,无法取消")); - } - return Mono.just(ResponseResult.error(500, "取消任务失败: " + e.getMessage())); - }); + public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable Integer keywordId) { + return collectionService.getResponseResult(keywordId); } - // @ApiOperation(value = "获取任务结果") - // @GetMapping("/tasks/{taskId}/result") - // public Mono<ResponseResult<TaskResultResponse>> getTaskResult(@PathVariable - // String taskId) { - // return webClient.get() - // .uri(baseUrl + "/tasks/" + taskId + "/result") - // .retrieve() - // .onStatus(HttpStatus::isError, response -> response.bodyToMono(String.class) - // .flatMap(errorBody -> Mono.error(new RuntimeException("获取结果失败: " + - // errorBody)))) - // .bodyToMono(TaskResultResponse.class) - // .flatMap(result -> { - // // 更新keyword状态 - // LambdaUpdateWrapper<Keyword> keywordWrapper = new LambdaUpdateWrapper<>(); - // keywordWrapper.eq(Keyword::getTask_id, taskId) - // .set(Keyword::getStatus, "completed"); - // keywordService.update(keywordWrapper); - // // 更新question信息并收集references - // List<Question> updateQuestions = new ArrayList<>(); - // List<Reference> references = new ArrayList<>(); + @ApiOperation(value = "获取任务结果") + @GetMapping("/tasks/{taskId}") + public Mono<TaskResultResponse> getTaskResult(@PathVariable String taskId) { + return collectionService.getTaskResultResponseMono(taskId); + } - // result.getResults().forEach(userResult -> { - // userResult.getQuestions_results().forEach(qResult -> { - // Question question = new Question(); - // question.setQuestion_id(qResult.getQuestion_id()); - // question.setResponse(qResult.getResponse()); - // question.setStatus(qResult.getStatus()); - // updateQuestions.add(question); + @GetMapping("/tasks/all") + @ApiOperation(value = "获取所有任务列表") + public Mono<TaskListResponse> getAllTasks() { + return collectionService.getTaskListResponseMono(); + } - // // 转换references - // references.addAll(qResult.getReferences().stream() - // .map(ref -> new Reference( - // qResult.getQuestion_id(), - // ref.getTitle(), - // ref.getUrl(), - // ref.getDomain(), - // result.getTask_id(), - // )) - // .collect(Collectors.toList())); - // }); - // }); + @GetMapping("/health") + @ApiOperation("健康检查") + public Mono<HealthResponse> checkThirdPartyHealth() { + return collectionService.getHealthResponseMono(); + } - // // 批量更新和插入 - // if (!updateQuestions.isEmpty()) { - // questionService.updateBatchById(updateQuestions); - // } - // if (!references.isEmpty()) { - // referenceService.saveBatch(references); - // } + /** + * 查询服务器资源 + */ + @GetMapping("/server/resource") + @ApiOperation(value = "查询服务器资源") + public Mono<ServerResourceResponse> getServerResource() { + return collectionService.getServerResourceResponseMono(); + } - // return Mono.just(ResponseResult.success(result)); - // }) - // .onErrorResume(e -> Mono.just(ResponseResult.error(e.getMessage()))); - // } - // @ApiOperation(value = "获取任务结果") - // @GetMapping("/tasks/{taskId}/result") - // public Mono<ResponseResult<TaskResultResponse>> - // getTaskResultlMono(@PathVariable String taskId) { - // return webClient.get() - // .uri(baseUrl + "/tasks/" + taskId + "/result") - // .accept(MediaType.APPLICATION_JSON) - // .retrieve() - // .onStatus(HttpStatus::is4xxClientError, response -> { - // if (response.statusCode() == HttpStatus.NOT_FOUND) { - // return response.bodyToMono(String.class) - // .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在"))); - // } else if (response.statusCode() == HttpStatus.BAD_REQUEST) { - // return response.bodyToMono(String.class) - // .flatMap(errorBody -> Mono.error(new RuntimeException("任务未完成,无法获取结果"))); - // } - // return response.createException().flatMap(Mono::error); - // }) - // .bodyToMono(new - // ParameterizedTypeReference<ResponseResult<TaskResultResponse>>() {}) - // .flatMap(responseResult -> { - // TaskResultResponse result = responseResult.getData(); - // if (result != null && result.getResults() != null) { - // // 处理结果并更新数据库 - // return updateQuestionAndReference(result) - // .thenReturn(responseResult); - // } - // return Mono.just(responseResult); - // }) - // .onErrorResume(e -> { - - // return Mono.just(ResponseResult.error(e.getMessage())); - // }); - // } - - // 更新问题和引用数据 - // private Mono<Void> updateQuestionAndReference(TaskResultResponse result) { - // return Mono.fromRunnable(() -> { - // // 1. 更新关键词状态 - // LambdaUpdateWrapper<Keyword> keywordUpdate = new LambdaUpdateWrapper<>(); - // keywordUpdate.eq(Keyword::getTask_id, result.getTask_id()) - // .set(Keyword::getStatus, "completed"); - // keywordService.update(keywordUpdate); - - // // 2. 处理每个用户的问题结果 - // for (UserResult userResult : result.getResults()) { - // for (QuestionResult questionResult : userResult.getQuestions_results()) { - // // 2.1 更新问题状态 - // LambdaUpdateWrapper<Question> questionUpdate = new LambdaUpdateWrapper<>(); - // questionUpdate.eq(Question::getTa, result.getTask_id()) - // .eq(Question::getContent, questionResult.getQuestion()) - // .set(Question::getStatus, questionResult.getStatus()) - // .set(Question::getResponse, questionResult.getResponse()) - // .set(Question::getProcessTime, - // LocalDateTime.parse(questionResult.getTimestamp())); - // questionService.update(questionUpdate); - - // // 2.2 保存引用数据 - // List<Reference> references = questionResult.getReferences().stream() - // .map(ref -> { - // Reference reference = new Reference(); - // reference.setQuestionId(questionService.getOne(questionUpdate).getId()); - // reference.setTitle(ref.getTitle()); - // reference.setUrl(ref.getUrl()); - // reference.setDomain(ref.getDomain()); - // reference.setCreateTime(LocalDateTime.now()); - // return reference; - // }) - // .collect(Collectors.toList()); - - // if (!references.isEmpty()) { - // referenceService.saveBatch(references); - // } - // } - // } - // }); - // } } -- Gitblit v1.7.1