guyue
2 天以前 d3b6555513c6c0e283bd6e891d4e080aefa6003a
src/main/java/com/linghu/controller/CollectController.java
@@ -1,19 +1,23 @@
package com.linghu.controller;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linghu.mapper.PlatformMapper;
import com.linghu.mapper.TypeMapper;
import com.linghu.model.dto.*;
import com.linghu.model.entity.*;
import com.linghu.service.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -26,30 +30,28 @@
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.linghu.model.common.ResponseResult;
import com.linghu.model.entity.Keyword;
import com.linghu.model.entity.Question;
import com.linghu.model.entity.User;
import com.linghu.service.KeywordService;
import com.linghu.service.QuestionService;
import com.linghu.service.ReferenceService;
import com.linghu.utils.JwtUtils;
import io.jsonwebtoken.lang.Collections;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import org.springframework.web.bind.annotation.* ;
import org.springframework.http.HttpStatus;
import com.linghu.model.dto.TaskResultResponse.QuestionResult;
import com.linghu.model.dto.TaskResultResponse.UserResult;
import com.linghu.model.entity.Reference;
import reactor.core.scheduler.Schedulers;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@RestController
@RequestMapping("/collect")
@Api(value = "采集接口", tags = "采集管理")
@Slf4j
public class CollectController {
    @Autowired
@@ -67,26 +69,19 @@
    private KeywordService keywordService;
    @Autowired
    private QuestionService questionService;
    @Autowired
    private KeywordTaskService keywordTaskService;
    @Autowired
    private PlatformService platformService;
    @Autowired
    private TypeService typeService;
    @PostMapping("/search")
 /*   @PostMapping("/search")
    @ApiOperation(value = "开始采集")
    public Mono<SearchTaskResponse> createSearchTask(
            @RequestBody SearchTaskRequest searchTaskRequest,
            HttpServletRequest request) throws JsonProcessingException {
//        String token = request.getHeader("Authorization");
//        User user = jwtUtils.parseToken(token);
//        // 复制到UserDto
//        UserDto userDto = new UserDto();
//        userDto.setName(user.getUser_name());
//        userDto.setEmail(user.getUser_email());
//        userDto.setPassword(user.getPassword());
//
//        List<UserDto> users = new ArrayList<>();
//        users.add(userDto);
//        searchTaskRequest.setUsers(users);
        // json格式
        ObjectMapper objectMapper = new ObjectMapper();
        System.out.println(objectMapper.writeValueAsString(searchTaskRequest));
        return webClient.post()
                .uri(baseUrl + "/api/v1/search")
@@ -104,10 +99,12 @@
                        // 保存任务ID到关键词
                        LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>();
                        updateWrapper.eq(Keyword::getKeyword_id, searchTaskRequest.getKeyword_id());
                        updateWrapper.set(Keyword::getStatus,"Submitted");
                        updateWrapper.set(Keyword::getTask_id, taskResponse.getTask_id());
                        keywordService.update(updateWrapper);
                        // 可选:更新响应中的其他信息
                        // taskResponse.setMessage("任务已提交并保存,ID: " + taskResponse.getTaskId());
                    }
                    return Mono.just(taskResponse);
                })
@@ -117,8 +114,179 @@
                    task.setMessage("调用失败: " + e.getMessage());
                    return Mono.just(task);
                });
    }*/
//    public SearchTaskController(WebClient.Builder webClientBuilder, KeywordService keywordService) {
//        this.webClient = webClientBuilder.build();
//        this.keywordService = keywordService;
//    }
   /* @PostMapping("/search")
    @ApiOperation(value = "开始采集")
    public Mono<SearchTaskResponse> createSearchTask(
            @RequestBody SearchTaskRequest searchTaskRequest,
            HttpServletRequest request) throws JsonProcessingException {
        int maxConcurrentUsers = searchTaskRequest.getConfig() != null ?
                searchTaskRequest.getConfig().getMax_concurrent_users() : 3;
        List<List<UserDto>> userBatches = splitUsersIntoBatches(searchTaskRequest.getUsers(), maxConcurrentUsers);
        return processBatchesSequentially(userBatches, searchTaskRequest)
                .onErrorResume(e -> {
                    SearchTaskResponse task = new SearchTaskResponse();
                    task.setMessage("调用失败: " + e.getMessage());
                    return Mono.just(task);
                });
    }
    private List<List<UserDto>> splitUsersIntoBatches(List<UserDto> users, int batchSize) {
        List<List<UserDto>> batches = new ArrayList<>();
        for (int i = 0; i < users.size(); i += batchSize) {
            batches.add(users.subList(i, Math.min(i + batchSize, users.size())));
        }
        return batches;
    }
    private Mono<SearchTaskResponse> processBatchesSequentially(List<List<UserDto>> userBatches, SearchTaskRequest originalRequest) {
        Mono<SearchTaskResponse> resultMono = Mono.empty();
        for (List<UserDto> batch : userBatches) {
            SearchTaskRequest batchRequest = new SearchTaskRequest();
            batchRequest.setUsers(batch);
            batchRequest.setQuestions(originalRequest.getQuestions());
            batchRequest.setConfig(originalRequest.getConfig());
            batchRequest.setSave_to_database(originalRequest.getSave_to_database());
            batchRequest.setWebhook_url(originalRequest.getWebhook_url());
            batchRequest.setKeyword_id(originalRequest.getKeyword_id());
            resultMono = resultMono.then(createSingleBatchTask(batchRequest));
        }
        return resultMono;
    }
    private Mono<SearchTaskResponse> createSingleBatchTask(SearchTaskRequest batchRequest) {
        return webClient.post()
                .uri(baseUrl + "/api/v1/search")
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(batchRequest)
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, response -> response.bodyToMono(String.class)
                        .flatMap(errorBody -> Mono.error(new RuntimeException(errorBody))))
                .bodyToMono(new ParameterizedTypeReference<SearchTaskResponse>() {
                })
                .flatMap(responseResult -> {
                    SearchTaskResponse taskResponse = responseResult;
                    if (taskResponse != null && taskResponse.getTask_id() != null) {
                        LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>();
                        updateWrapper.eq(Keyword::getKeyword_id, batchRequest.getKeyword_id());
                        updateWrapper.set(Keyword::getStatus, "Submitted");
                        updateWrapper.set(Keyword::getTask_id, taskResponse.getTask_id());
                        keywordService.update(updateWrapper);
                    }
                    return waitForTaskCompletion(taskResponse.getTask_id())
                            .then(Mono.just(taskResponse));
                });
    }
    private Mono<Void> waitForTaskCompletion(String taskId) {
        return Flux.interval(Duration.ofSeconds(5)) // 每5秒执行一次
                .flatMap(tick -> webClient.get()
                        .uri(baseUrl + "/api/v1/tasks/" + taskId)
                        .retrieve()
                        .bodyToMono(TaskStatusResponse.class)
                )
                .filter(response -> "completed".equals(response.getStatus()))
                .next() // 找到第一个完成的响应后结束流
                .then(); // 转换为Mono<Void>
    }*/
   @PostMapping("/search")
   @ApiOperation(value = "开始采集")
   public Mono<List<SearchTaskResponse>> createSearchTask(
           @RequestBody SearchTaskRequest searchTaskRequest,
           HttpServletRequest request) throws JsonProcessingException {
       int maxConcurrentUsers = searchTaskRequest.getConfig() != null ?
               searchTaskRequest.getConfig().getMax_concurrent_users() : 3;
       List<List<UserDto>> userBatches = splitUsersIntoBatches(searchTaskRequest.getUsers(), maxConcurrentUsers);
       // 获取 keywordId
       Integer keywordId = searchTaskRequest.getKeyword_id();
       return Flux.fromIterable(userBatches)
               .flatMap(batch -> {
                   SearchTaskRequest batchRequest = new SearchTaskRequest();
                   batchRequest.setUsers(batch);
                   batchRequest.setQuestions(searchTaskRequest.getQuestions());
                   batchRequest.setConfig(searchTaskRequest.getConfig());
                   batchRequest.setSave_to_database(searchTaskRequest.getSave_to_database());
                   batchRequest.setWebhook_url(searchTaskRequest.getWebhook_url());
                   batchRequest.setKeyword_id(keywordId);
                   return createSingleBatchTask(batchRequest)
                           .delaySubscription(Duration.ofSeconds(2)); // 批次之间添加延迟
               }, 1) // 限制并发数为1,确保顺序执行
               .collectList() // 收集所有批次的响应
               .flatMap(responses ->
                       saveKeywordTasks(keywordId, responses) // 保存关联关系
                               .thenReturn(responses) // 返回原始响应
               );
   }
    private Mono<Void> saveKeywordTasks(Integer keywordId, List<SearchTaskResponse> taskResponses) {
        List<KeywordTask> keywordTasks = taskResponses.stream()
                .filter(response -> response.getTask_id() != null)
                .map(response -> {
                    KeywordTask keywordTask = new KeywordTask();
                    keywordTask.setKeyword_id(keywordId);
                    keywordTask.setTask_id(response.getTask_id());
                    return keywordTask;
                })
                .collect(Collectors.toList());
        // 将 MyBatis-Plus 的同步方法包装为 Mono<Void>
        return Mono.fromRunnable(() -> {
                    boolean success = keywordTaskService.saveOrUpdateBatch(keywordTasks);
                    if (!success) {
                        throw new RuntimeException("保存关键词任务关联失败");
                    }
                })
                .doFinally(signalType -> log.info("成功保存 {} 个关键词任务关联", keywordTasks.size()))
                .then();
    }
    private List<List<UserDto>> splitUsersIntoBatches(List<UserDto> users, int batchSize) {
        List<List<UserDto>> batches = new ArrayList<>();
        for (int i = 0; i < users.size(); i += batchSize) {
            batches.add(users.subList(i, Math.min(i + batchSize, users.size())));
        }
        return batches;
    }
    private Mono<SearchTaskResponse> createSingleBatchTask(SearchTaskRequest batchRequest) {
        return webClient.post()
                .uri(baseUrl + "/api/v1/search")
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(batchRequest)
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, response -> response.bodyToMono(String.class)
                        .flatMap(errorBody -> Mono.error(new RuntimeException(errorBody))))
                .bodyToMono(new ParameterizedTypeReference<SearchTaskResponse>() {})
                .flatMap(taskResponse -> {
                    if (taskResponse != null && taskResponse.getTask_id() != null) {
                        // 使用 Reactor 的方式更新数据库
                        return Mono.fromRunnable(() -> {
                                    LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>();
                                    updateWrapper.eq(Keyword::getKeyword_id, batchRequest.getKeyword_id());
                                    updateWrapper.set(Keyword::getStatus, "Submitted");
                                    updateWrapper.set(Keyword::getTask_id, taskResponse.getTask_id());
                                    keywordService.update(updateWrapper);
                                }).subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
                                .thenReturn(taskResponse);
                    }
                    return Mono.just(taskResponse);
                });
    }
    // 移除原来的waitForTaskCompletion方法,不再需要同步等待
    @ApiOperation(value = "查询任务状态")
    @GetMapping("/status")
    public Mono<TaskStatusResponse> getTaskStatus(String taskId) {
@@ -126,7 +294,7 @@
                .uri(baseUrl + "/api/v1/tasks/" + taskId)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, response -> response.bodyToMono(TaskStatusResponse.class)
                .onStatus(HttpStatus::isError, response -> response.bodyToMono(TaskStatusResponse.class)
                        .flatMap(errorBody -> Mono.error(new RuntimeException(errorBody.getDetail()))))
                .bodyToMono(TaskStatusResponse.class)
                .flatMap(result -> {
@@ -140,7 +308,12 @@
                                    return question;
                                }).collect(Collectors.toList());
                        questionService.updateBatchById(updateQuestions);
                        // 包装成响应式操作
                        return Mono.fromCallable(() -> {
                            questionService.updateBatchById(updateQuestions);
                            return result;
                        });
                    }
                    return Mono.just(result);
                })
@@ -287,13 +460,20 @@
                keywordService.update(keywordUpdate);
                // 查询关键词ID
                LambdaQueryWrapper<Keyword> keywordQuery = new LambdaQueryWrapper<>();
                keywordQuery.eq(Keyword::getTask_id, result.getTask_id());
                Keyword keyword = keywordService.getOne(keywordQuery);
                LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>();
                keywordTaskWrapper.eq(KeywordTask::getTask_id, result.getTask_id());
                KeywordTask keywordTask = keywordTaskService.getOne(keywordTaskWrapper);
//                LambdaQueryWrapper<Keyword> keywordQuery = new LambdaQueryWrapper<>();
//                keywordQuery.eq(Keyword::getTask_id, keywordTask.getTask_id());
                Keyword keyword = keywordService.getById(keywordTask.getKeyword_id());
                if (keyword == null) {
                    System.out.println("未找到关联的关键词,task_id: " + result.getTask_id());
                    return;
                    //报错
                    throw new Exception("未找到关联的关键词,task_id: " + result.getTask_id());
//                    return;
                }
                // 2. 批量查询所有问题
@@ -320,6 +500,7 @@
                                question.setResponse(questionResult.getResponse());
                                question.setExtracted_count(questionResult.getExtracted_count());
                                question.setError(questionResult.getError());
                                question.setKeyword_id(keyword.getKeyword_id());
                                // 解析时间戳
                                if (questionResult.getTimestamp() != null) {
@@ -333,7 +514,17 @@
                                questionsToUpdate.add(question);
                                //如果查询结果不为空查询num
                                Integer maxNumByKeywordId = referenceService.getMaxNumByKeywordId(keyword.getKeyword_id());
                               if (maxNumByKeywordId != null){
                                   maxNumByKeywordId++;
                               }else {
                                   maxNumByKeywordId = 1;
                               }
                                // 收集引用数据,处理空集合情况
                                Integer finalMaxNumByKeywordId = maxNumByKeywordId;
                                List<Reference> references =
                                        Optional.ofNullable(questionResult.getReferences())
                                                .orElse(Collections.emptyList())
@@ -344,7 +535,28 @@
                                                    reference.setTitle(ref.getTitle());
                                                    reference.setUrl(ref.getUrl());
                                                    reference.setDomain(ref.getDomain());
                                                    reference.setNum(finalMaxNumByKeywordId);
                                                    //域名和平台id映射
                                                    reference.setCreate_time(LocalDateTime.now());
                                                    Platform platform = platformService.getPlatformByDomain(reference.getDomain());
//                                                    if (platform == null) {
//                                                        throw new RuntimeException("未找到对应的平台: " + reference.getDomain());
//                                                    }
                                                    if (platform != null){
                                                        reference.setPlatform_id(platform.getPlatform_id());
                                                        Type type = typeService.getById(platform.getType_id());
//                                                    if (type == null) {
//                                                        throw new RuntimeException("未找到对应的类型: " + reference.getDomain());
//                                                    }
                                                        if (type != null){
                                                            reference.setType_id(type.getType_id());
                                                        }
                                                    }
                                                    // 根据 domain 查询类型
                                                    return reference;
                                                })
                                                .collect(Collectors.toList());
@@ -355,6 +567,7 @@
                                }
                            }
                        } catch (Exception e) {
                            log.error(e.getMessage(), e);
                            System.out.println("处理问题结果失败: " + e.getMessage());
                        }
                    }
@@ -366,22 +579,24 @@
                    questionService.updateBatchById(questionsToUpdate);
                    System.out.println("成功批量更新 " + questionsToUpdate.size() + " 个问题");
                }
                referenceService.saveBatch(allReferences);
                // 5. 批量插入引用,使用流式分批处理
                if (!allReferences.isEmpty()) {
                    int batchSize = 1000;
                    IntStream.iterate(0, i -> i + batchSize)
                            .limit((allReferences.size() + batchSize - 1) / batchSize)
                            .forEach(i -> {
                                List<Reference> batch = allReferences.subList(
                                        i, Math.min(i + batchSize, allReferences.size()));
                                referenceService.saveBatch(batch);
                            });
                    System.out.println("成功批量插入 " + allReferences.size() + " 条引用数据");
                }
//                if (!allReferences.isEmpty()) {
//                    int batchSize = 1000;
//                    IntStream.iterate(0, i -> i + batchSize)
//                            .limit((allReferences.size() + batchSize - 1) / batchSize)
//                            .forEach(i -> {
//                                List<Reference> batch = allReferences.subList(
//                                        i, Math.min(i + batchSize, allReferences.size()));
//                                referenceService.saveBatch(batch);
//                            });
//                    System.out.println("成功批量插入 " + allReferences.size() + " 条引用数据");
//                }
            } catch (Exception e) {
                System.out.println("更新问题和引用数据失败: " + e.getMessage());
                log.error("更新问题和引用数据失败: " ,e.getMessage(), e);
//                System.out.println("更新问题和引用数据失败: " + e.getMessage());
                throw new RuntimeException("更新问题和引用数据失败", e);
            }
        });
@@ -414,4 +629,17 @@
                .onErrorResume(e -> Mono.just(
                        new HealthResponse("unhealthy", null, "", e.getMessage())));
    }
    /**
     * 查询服务器资源
     */
    @GetMapping("/server/resource")
    public Mono<ServerResourceResponse> getServerResource() {
        return webClient.get()
                .uri(baseUrl + "/api/v1/system/resources")
                .retrieve()
                .bodyToMono(ServerResourceResponse.class)
                .onErrorResume(e -> Mono.just(
                        new ServerResourceResponse( e.getMessage())));
    }
}