| | |
| | | package com.linghu.controller; |
| | | |
| | | import java.time.LocalDateTime; |
| | | import java.util.ArrayList; |
| | | import java.util.Date; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.stream.Collectors; |
| | | |
| | | |
| | | import javax.servlet.http.HttpServletRequest; |
| | | import javax.validation.Valid; |
| | | |
| | | import com.fasterxml.jackson.core.JsonProcessingException; |
| | | import com.linghu.model.dto.*; |
| | | |
| | | import com.linghu.service.*; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.core.ParameterizedTypeReference; |
| | | import org.springframework.http.*; |
| | | import org.springframework.web.client.RestTemplate; |
| | | import org.springframework.web.reactive.function.client.WebClient; |
| | | import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; |
| | | |
| | | import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; |
| | | import com.linghu.model.common.ResponseResult; |
| | | import com.linghu.model.dto.SearchTaskRequest; |
| | | import com.linghu.model.entity.Keyword; |
| | | import com.linghu.model.entity.Question; |
| | | import com.linghu.model.entity.User; |
| | | import com.linghu.service.KeywordService; |
| | | import com.linghu.service.QuestionService; |
| | | import com.linghu.service.ReferenceService; |
| | | import com.linghu.utils.JwtUtils; |
| | | import com.linghu.model.dto.SearchTaskResponse; |
| | | import com.linghu.model.dto.TaskStatusResponse; |
| | | import com.linghu.model.dto.TaskCancelResponse; |
| | | |
| | | import io.jsonwebtoken.lang.Collections; |
| | | import io.swagger.annotations.Api; |
| | | import io.swagger.annotations.ApiOperation; |
| | | import reactor.core.publisher.Mono; |
| | | import org.springframework.web.bind.annotation.* ; |
| | | |
| | | import org.springframework.web.bind.annotation.*; |
| | | import org.springframework.http.HttpStatus; |
| | | import com.linghu.model.dto.TaskResultResponse; |
| | | import com.linghu.model.dto.TaskResultResponse.QuestionResult; |
| | | import com.linghu.model.dto.TaskResultResponse.UserResult; |
| | | import com.linghu.model.entity.Reference; |
| | | import java.util.stream.Collectors; |
| | | |
| | | |
| | | @RestController |
| | | @RequestMapping("/collect") |
| | | @Api(value = "采集接口", tags = "采集管理") |
| | | @Slf4j |
| | | public class CollectController { |
| | | |
| | | @Autowired |
| | | private ReferenceService referenceService; |
| | | |
| | | @Value("${linghu.url}") |
| | | private String baseUrl; |
| | | |
| | | @Autowired |
| | | private WebClient webClient; |
| | | private CollectionService collectionService; |
| | | |
| | | @Autowired |
| | | private JwtUtils jwtUtils; |
| | | @Autowired |
| | | private KeywordService keywordService; |
| | | @Autowired |
| | | private QuestionService questionService; |
| | | |
| | | |
| | | @PostMapping("/search") |
| | | @ApiOperation(value = "开始采集") |
| | | public Mono<ResponseResult<SearchTaskResponse>> createSearchTask( |
| | | @RequestBody SearchTaskRequest searchTaskRequest, |
| | | HttpServletRequest request) { |
| | | String token = request.getHeader("Authorization"); |
| | | User user = jwtUtils.parseToken(token); |
| | | List<User> users = new ArrayList<>(); |
| | | users.add(user); |
| | | searchTaskRequest.setUsers(users); |
| | | |
| | | return webClient.post() |
| | | .uri(baseUrl + "/search") |
| | | .contentType(MediaType.APPLICATION_JSON) |
| | | .bodyValue(searchTaskRequest) |
| | | .retrieve() |
| | | .bodyToMono(new ParameterizedTypeReference<ResponseResult<SearchTaskResponse>>() { |
| | | }) |
| | | .flatMap(responseResult -> { |
| | | // 提取任务ID |
| | | SearchTaskResponse taskResponse = responseResult.getData(); |
| | | if (taskResponse != null && taskResponse.getTask_id() != null) { |
| | | // 保存任务ID到关键词 |
| | | LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>(); |
| | | updateWrapper.eq(Keyword::getKeyword_id, searchTaskRequest.getKeyword_id()); |
| | | updateWrapper.set(Keyword::getTask_id, taskResponse.getTask_id()); |
| | | keywordService.update(updateWrapper); |
| | | // 可选:更新响应中的其他信息 |
| | | // taskResponse.setMessage("任务已提交并保存,ID: " + taskResponse.getTaskId()); |
| | | } |
| | | return Mono.just(responseResult); |
| | | }) |
| | | .onErrorResume(e -> { |
| | | return Mono.just(ResponseResult.error("调用失败: " + e.getMessage())); |
| | | }); |
| | | public Mono<ResponseResult<?>> createSearchTask( |
| | | @Valid @RequestBody SearchTaskRequest searchTaskRequest, |
| | | HttpServletRequest request) throws JsonProcessingException { |
| | | // 首先检查服务器资源 |
| | | return collectionService.getResponseResultMono(searchTaskRequest); |
| | | } |
| | | |
| | | @ApiOperation(value = "查询任务状态") |
| | | @GetMapping("/status") |
| | | public Mono<TaskStatusResponse> getTaskStatus(String taskId) { |
| | | return webClient.get() |
| | | .uri(baseUrl + "/tasks/" + taskId) |
| | | .accept(MediaType.APPLICATION_JSON) |
| | | .retrieve() |
| | | .onStatus(HttpStatus::is4xxClientError, response -> response.bodyToMono(String.class) |
| | | .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在: " + errorBody)))) |
| | | .bodyToMono(TaskStatusResponse.class) |
| | | .flatMap(result -> { |
| | | TaskStatusResponse taskStatusResponse = result; |
| | | if (taskStatusResponse != null && taskStatusResponse.getStatus() != null) { |
| | | List<Question> updateQuestions = taskStatusResponse.getQuestions_status().stream() |
| | | .map(qs -> { |
| | | Question question = new Question(); |
| | | question.setQuestion_id(qs.getQuestion_id()); |
| | | question.setStatus(qs.getStatus()); |
| | | return question; |
| | | }).collect(Collectors.toList()); |
| | | |
| | | questionService.updateBatchById(updateQuestions); |
| | | } |
| | | return Mono.just(result); |
| | | }); |
| | | public Mono<TaskStatusResponse> getTaskStatus(@RequestParam(value = "taskId" )String taskId) { |
| | | return collectionService.getError(taskId); |
| | | } |
| | | |
| | | @PostMapping("/cancel/{taskId}") |
| | | @PostMapping("/cancel/{keywordId}") |
| | | @ApiOperation(value = "取消任务") |
| | | public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable String taskId) { |
| | | return webClient.post() |
| | | .uri(baseUrl + "/tasks/" + taskId + "/cancel") |
| | | .contentType(MediaType.APPLICATION_JSON) |
| | | .bodyValue(Collections.emptyMap()) // 添加空请求体 |
| | | .retrieve() |
| | | .onStatus(HttpStatus::isError, response -> { |
| | | if (response.statusCode() == HttpStatus.NOT_FOUND) { |
| | | return response.bodyToMono(String.class) |
| | | .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在"))); |
| | | } else if (response.statusCode() == HttpStatus.BAD_REQUEST) { |
| | | return response.bodyToMono(String.class) |
| | | .flatMap(errorBody -> Mono.error(new RuntimeException("任务已经完成,无法取消"))); |
| | | } |
| | | return response.createException().flatMap(Mono::error); |
| | | }) |
| | | .bodyToMono(TaskCancelResponse.class) |
| | | .map(data -> ResponseResult.success(data)) |
| | | .onErrorResume(e -> { |
| | | if (e.getMessage().contains("任务不存在")) { |
| | | return Mono.just(ResponseResult.error(404, "任务不存在")); |
| | | } else if (e.getMessage().contains("无法取消")) { |
| | | return Mono.just(ResponseResult.error(400, "任务已完成,无法取消")); |
| | | } |
| | | return Mono.just(ResponseResult.error(500, "取消任务失败: " + e.getMessage())); |
| | | }); |
| | | public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable Integer keywordId) { |
| | | return collectionService.getResponseResult(keywordId); |
| | | } |
| | | // @ApiOperation(value = "获取任务结果") |
| | | // @GetMapping("/tasks/{taskId}/result") |
| | | // public Mono<ResponseResult<TaskResultResponse>> getTaskResult(@PathVariable |
| | | // String taskId) { |
| | | // return webClient.get() |
| | | // .uri(baseUrl + "/tasks/" + taskId + "/result") |
| | | // .retrieve() |
| | | // .onStatus(HttpStatus::isError, response -> response.bodyToMono(String.class) |
| | | // .flatMap(errorBody -> Mono.error(new RuntimeException("获取结果失败: " + |
| | | // errorBody)))) |
| | | // .bodyToMono(TaskResultResponse.class) |
| | | // .flatMap(result -> { |
| | | // // 更新keyword状态 |
| | | // LambdaUpdateWrapper<Keyword> keywordWrapper = new LambdaUpdateWrapper<>(); |
| | | // keywordWrapper.eq(Keyword::getTask_id, taskId) |
| | | // .set(Keyword::getStatus, "completed"); |
| | | // keywordService.update(keywordWrapper); |
| | | |
| | | // // 更新question信息并收集references |
| | | // List<Question> updateQuestions = new ArrayList<>(); |
| | | // List<Reference> references = new ArrayList<>(); |
| | | @ApiOperation(value = "获取任务结果") |
| | | @GetMapping("/tasks/{taskId}") |
| | | public Mono<TaskResultResponse> getTaskResult(@PathVariable String taskId) { |
| | | return collectionService.getTaskResultResponseMono(taskId); |
| | | } |
| | | |
| | | // result.getResults().forEach(userResult -> { |
| | | // userResult.getQuestions_results().forEach(qResult -> { |
| | | // Question question = new Question(); |
| | | // question.setQuestion_id(qResult.getQuestion_id()); |
| | | // question.setResponse(qResult.getResponse()); |
| | | // question.setStatus(qResult.getStatus()); |
| | | // updateQuestions.add(question); |
| | | @GetMapping("/tasks/all") |
| | | @ApiOperation(value = "获取所有任务列表") |
| | | public Mono<TaskListResponse> getAllTasks() { |
| | | return collectionService.getTaskListResponseMono(); |
| | | } |
| | | |
| | | // // 转换references |
| | | // references.addAll(qResult.getReferences().stream() |
| | | // .map(ref -> new Reference( |
| | | // qResult.getQuestion_id(), |
| | | // ref.getTitle(), |
| | | // ref.getUrl(), |
| | | // ref.getDomain(), |
| | | // result.getTask_id(), |
| | | // )) |
| | | // .collect(Collectors.toList())); |
| | | // }); |
| | | // }); |
| | | @GetMapping("/health") |
| | | @ApiOperation("健康检查") |
| | | public Mono<HealthResponse> checkThirdPartyHealth() { |
| | | return collectionService.getHealthResponseMono(); |
| | | } |
| | | |
| | | // // 批量更新和插入 |
| | | // if (!updateQuestions.isEmpty()) { |
| | | // questionService.updateBatchById(updateQuestions); |
| | | // } |
| | | // if (!references.isEmpty()) { |
| | | // referenceService.saveBatch(references); |
| | | // } |
| | | /** |
| | | * 查询服务器资源 |
| | | */ |
| | | @GetMapping("/server/resource") |
| | | @ApiOperation(value = "查询服务器资源") |
| | | public Mono<ServerResourceResponse> getServerResource() { |
| | | return collectionService.getServerResourceResponseMono(); |
| | | } |
| | | |
| | | // return Mono.just(ResponseResult.success(result)); |
| | | // }) |
| | | // .onErrorResume(e -> Mono.just(ResponseResult.error(e.getMessage()))); |
| | | // } |
| | | |
| | | // @ApiOperation(value = "获取任务结果") |
| | | // @GetMapping("/tasks/{taskId}/result") |
| | | // public Mono<ResponseResult<TaskResultResponse>> |
| | | // getTaskResultlMono(@PathVariable String taskId) { |
| | | // return webClient.get() |
| | | // .uri(baseUrl + "/tasks/" + taskId + "/result") |
| | | // .accept(MediaType.APPLICATION_JSON) |
| | | // .retrieve() |
| | | // .onStatus(HttpStatus::is4xxClientError, response -> { |
| | | // if (response.statusCode() == HttpStatus.NOT_FOUND) { |
| | | // return response.bodyToMono(String.class) |
| | | // .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在"))); |
| | | // } else if (response.statusCode() == HttpStatus.BAD_REQUEST) { |
| | | // return response.bodyToMono(String.class) |
| | | // .flatMap(errorBody -> Mono.error(new RuntimeException("任务未完成,无法获取结果"))); |
| | | // } |
| | | // return response.createException().flatMap(Mono::error); |
| | | // }) |
| | | // .bodyToMono(new |
| | | // ParameterizedTypeReference<ResponseResult<TaskResultResponse>>() {}) |
| | | // .flatMap(responseResult -> { |
| | | // TaskResultResponse result = responseResult.getData(); |
| | | // if (result != null && result.getResults() != null) { |
| | | // // 处理结果并更新数据库 |
| | | // return updateQuestionAndReference(result) |
| | | // .thenReturn(responseResult); |
| | | // } |
| | | // return Mono.just(responseResult); |
| | | // }) |
| | | // .onErrorResume(e -> { |
| | | |
| | | // return Mono.just(ResponseResult.error(e.getMessage())); |
| | | // }); |
| | | // } |
| | | |
| | | // 更新问题和引用数据 |
| | | // private Mono<Void> updateQuestionAndReference(TaskResultResponse result) { |
| | | // return Mono.fromRunnable(() -> { |
| | | // // 1. 更新关键词状态 |
| | | // LambdaUpdateWrapper<Keyword> keywordUpdate = new LambdaUpdateWrapper<>(); |
| | | // keywordUpdate.eq(Keyword::getTask_id, result.getTask_id()) |
| | | // .set(Keyword::getStatus, "completed"); |
| | | // keywordService.update(keywordUpdate); |
| | | |
| | | // // 2. 处理每个用户的问题结果 |
| | | // for (UserResult userResult : result.getResults()) { |
| | | // for (QuestionResult questionResult : userResult.getQuestions_results()) { |
| | | // // 2.1 更新问题状态 |
| | | // LambdaUpdateWrapper<Question> questionUpdate = new LambdaUpdateWrapper<>(); |
| | | // questionUpdate.eq(Question::getTa, result.getTask_id()) |
| | | // .eq(Question::getContent, questionResult.getQuestion()) |
| | | // .set(Question::getStatus, questionResult.getStatus()) |
| | | // .set(Question::getResponse, questionResult.getResponse()) |
| | | // .set(Question::getProcessTime, |
| | | // LocalDateTime.parse(questionResult.getTimestamp())); |
| | | // questionService.update(questionUpdate); |
| | | |
| | | // // 2.2 保存引用数据 |
| | | // List<Reference> references = questionResult.getReferences().stream() |
| | | // .map(ref -> { |
| | | // Reference reference = new Reference(); |
| | | // reference.setQuestionId(questionService.getOne(questionUpdate).getId()); |
| | | // reference.setTitle(ref.getTitle()); |
| | | // reference.setUrl(ref.getUrl()); |
| | | // reference.setDomain(ref.getDomain()); |
| | | // reference.setCreateTime(LocalDateTime.now()); |
| | | // return reference; |
| | | // }) |
| | | // .collect(Collectors.toList()); |
| | | |
| | | // if (!references.isEmpty()) { |
| | | // referenceService.saveBatch(references); |
| | | // } |
| | | // } |
| | | // } |
| | | // }); |
| | | // } |
| | | |
| | | } |