package com.linghu.controller; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.*; import org.springframework.web.client.RestTemplate; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.linghu.model.common.ResponseResult; import com.linghu.model.dto.SearchTaskRequest; import com.linghu.model.entity.Keyword; import com.linghu.model.entity.Question; import com.linghu.model.entity.User; import com.linghu.service.KeywordService; import com.linghu.service.QuestionService; import com.linghu.service.ReferenceService; import com.linghu.utils.JwtUtils; import com.linghu.model.dto.SearchTaskResponse; import com.linghu.model.dto.TaskStatusResponse; import com.linghu.model.dto.TaskCancelResponse; import io.jsonwebtoken.lang.Collections; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; import org.springframework.web.bind.annotation.*; import org.springframework.http.HttpStatus; import com.linghu.model.dto.TaskResultResponse; import com.linghu.model.dto.TaskResultResponse.QuestionResult; import com.linghu.model.dto.TaskResultResponse.UserResult; import com.linghu.model.entity.Reference; import java.util.stream.Collectors; @RestController @RequestMapping("/collect") @Api(value = "采集接口", tags = "采集管理") public class CollectController { @Autowired private ReferenceService referenceService; @Value("${linghu.url}") private String baseUrl; @Autowired private WebClient webClient; @Autowired private JwtUtils jwtUtils; @Autowired private KeywordService keywordService; @Autowired private QuestionService questionService; @PostMapping("/search") @ApiOperation(value = "开始采集") public Mono> createSearchTask( @RequestBody SearchTaskRequest searchTaskRequest, HttpServletRequest request) { String token = request.getHeader("Authorization"); User user = jwtUtils.parseToken(token); List users = new ArrayList<>(); users.add(user); searchTaskRequest.setUsers(users); return webClient.post() .uri(baseUrl + "/search") .contentType(MediaType.APPLICATION_JSON) .bodyValue(searchTaskRequest) .retrieve() .bodyToMono(new ParameterizedTypeReference>() { }) .flatMap(responseResult -> { // 提取任务ID SearchTaskResponse taskResponse = responseResult.getData(); if (taskResponse != null && taskResponse.getTask_id() != null) { // 保存任务ID到关键词 LambdaUpdateWrapper updateWrapper = new LambdaUpdateWrapper<>(); updateWrapper.eq(Keyword::getKeyword_id, searchTaskRequest.getKeyword_id()); updateWrapper.set(Keyword::getTask_id, taskResponse.getTask_id()); keywordService.update(updateWrapper); // 可选:更新响应中的其他信息 // taskResponse.setMessage("任务已提交并保存,ID: " + taskResponse.getTaskId()); } return Mono.just(responseResult); }) .onErrorResume(e -> { return Mono.just(ResponseResult.error("调用失败: " + e.getMessage())); }); } @ApiOperation(value = "查询任务状态") @GetMapping("/status") public Mono getTaskStatus(String taskId) { return webClient.get() .uri(baseUrl + "/tasks/" + taskId) .accept(MediaType.APPLICATION_JSON) .retrieve() .onStatus(HttpStatus::is4xxClientError, response -> response.bodyToMono(String.class) .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在: " + errorBody)))) .bodyToMono(TaskStatusResponse.class) .flatMap(result -> { TaskStatusResponse taskStatusResponse = result; if (taskStatusResponse != null && taskStatusResponse.getStatus() != null) { List updateQuestions = taskStatusResponse.getQuestions_status().stream() .map(qs -> { Question question = new Question(); question.setQuestion_id(qs.getQuestion_id()); question.setStatus(qs.getStatus()); return question; }).collect(Collectors.toList()); questionService.updateBatchById(updateQuestions); } return Mono.just(result); }); } @PostMapping("/cancel/{taskId}") @ApiOperation(value = "取消任务") public Mono> cancelTask(@PathVariable String taskId) { return webClient.post() .uri(baseUrl + "/tasks/" + taskId + "/cancel") .contentType(MediaType.APPLICATION_JSON) .bodyValue(Collections.emptyMap()) // 添加空请求体 .retrieve() .onStatus(HttpStatus::isError, response -> { if (response.statusCode() == HttpStatus.NOT_FOUND) { return response.bodyToMono(String.class) .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在"))); } else if (response.statusCode() == HttpStatus.BAD_REQUEST) { return response.bodyToMono(String.class) .flatMap(errorBody -> Mono.error(new RuntimeException("任务已经完成,无法取消"))); } return response.createException().flatMap(Mono::error); }) .bodyToMono(TaskCancelResponse.class) .map(data -> ResponseResult.success(data)) .onErrorResume(e -> { if (e.getMessage().contains("任务不存在")) { return Mono.just(ResponseResult.error(404, "任务不存在")); } else if (e.getMessage().contains("无法取消")) { return Mono.just(ResponseResult.error(400, "任务已完成,无法取消")); } return Mono.just(ResponseResult.error(500, "取消任务失败: " + e.getMessage())); }); } // @ApiOperation(value = "获取任务结果") // @GetMapping("/tasks/{taskId}/result") // public Mono> getTaskResult(@PathVariable // String taskId) { // return webClient.get() // .uri(baseUrl + "/tasks/" + taskId + "/result") // .retrieve() // .onStatus(HttpStatus::isError, response -> response.bodyToMono(String.class) // .flatMap(errorBody -> Mono.error(new RuntimeException("获取结果失败: " + // errorBody)))) // .bodyToMono(TaskResultResponse.class) // .flatMap(result -> { // // 更新keyword状态 // LambdaUpdateWrapper keywordWrapper = new LambdaUpdateWrapper<>(); // keywordWrapper.eq(Keyword::getTask_id, taskId) // .set(Keyword::getStatus, "completed"); // keywordService.update(keywordWrapper); // // 更新question信息并收集references // List updateQuestions = new ArrayList<>(); // List references = new ArrayList<>(); // result.getResults().forEach(userResult -> { // userResult.getQuestions_results().forEach(qResult -> { // Question question = new Question(); // question.setQuestion_id(qResult.getQuestion_id()); // question.setResponse(qResult.getResponse()); // question.setStatus(qResult.getStatus()); // updateQuestions.add(question); // // 转换references // references.addAll(qResult.getReferences().stream() // .map(ref -> new Reference( // qResult.getQuestion_id(), // ref.getTitle(), // ref.getUrl(), // ref.getDomain(), // result.getTask_id(), // )) // .collect(Collectors.toList())); // }); // }); // // 批量更新和插入 // if (!updateQuestions.isEmpty()) { // questionService.updateBatchById(updateQuestions); // } // if (!references.isEmpty()) { // referenceService.saveBatch(references); // } // return Mono.just(ResponseResult.success(result)); // }) // .onErrorResume(e -> Mono.just(ResponseResult.error(e.getMessage()))); // } // @ApiOperation(value = "获取任务结果") // @GetMapping("/tasks/{taskId}/result") // public Mono> // getTaskResultlMono(@PathVariable String taskId) { // return webClient.get() // .uri(baseUrl + "/tasks/" + taskId + "/result") // .accept(MediaType.APPLICATION_JSON) // .retrieve() // .onStatus(HttpStatus::is4xxClientError, response -> { // if (response.statusCode() == HttpStatus.NOT_FOUND) { // return response.bodyToMono(String.class) // .flatMap(errorBody -> Mono.error(new RuntimeException("任务不存在"))); // } else if (response.statusCode() == HttpStatus.BAD_REQUEST) { // return response.bodyToMono(String.class) // .flatMap(errorBody -> Mono.error(new RuntimeException("任务未完成,无法获取结果"))); // } // return response.createException().flatMap(Mono::error); // }) // .bodyToMono(new // ParameterizedTypeReference>() {}) // .flatMap(responseResult -> { // TaskResultResponse result = responseResult.getData(); // if (result != null && result.getResults() != null) { // // 处理结果并更新数据库 // return updateQuestionAndReference(result) // .thenReturn(responseResult); // } // return Mono.just(responseResult); // }) // .onErrorResume(e -> { // return Mono.just(ResponseResult.error(e.getMessage())); // }); // } // 更新问题和引用数据 // private Mono updateQuestionAndReference(TaskResultResponse result) { // return Mono.fromRunnable(() -> { // // 1. 更新关键词状态 // LambdaUpdateWrapper keywordUpdate = new LambdaUpdateWrapper<>(); // keywordUpdate.eq(Keyword::getTask_id, result.getTask_id()) // .set(Keyword::getStatus, "completed"); // keywordService.update(keywordUpdate); // // 2. 处理每个用户的问题结果 // for (UserResult userResult : result.getResults()) { // for (QuestionResult questionResult : userResult.getQuestions_results()) { // // 2.1 更新问题状态 // LambdaUpdateWrapper questionUpdate = new LambdaUpdateWrapper<>(); // questionUpdate.eq(Question::getTa, result.getTask_id()) // .eq(Question::getContent, questionResult.getQuestion()) // .set(Question::getStatus, questionResult.getStatus()) // .set(Question::getResponse, questionResult.getResponse()) // .set(Question::getProcessTime, // LocalDateTime.parse(questionResult.getTimestamp())); // questionService.update(questionUpdate); // // 2.2 保存引用数据 // List references = questionResult.getReferences().stream() // .map(ref -> { // Reference reference = new Reference(); // reference.setQuestionId(questionService.getOne(questionUpdate).getId()); // reference.setTitle(ref.getTitle()); // reference.setUrl(ref.getUrl()); // reference.setDomain(ref.getDomain()); // reference.setCreateTime(LocalDateTime.now()); // return reference; // }) // .collect(Collectors.toList()); // if (!references.isEmpty()) { // referenceService.saveBatch(references); // } // } // } // }); // } }