From e229a5d7c7ca4c3d3353d8ab6e34df2d6ef642ef Mon Sep 17 00:00:00 2001 From: guyue <1721849008@qq.com> Date: 星期五, 11 七月 2025 12:24:52 +0800 Subject: [PATCH] 更新 --- src/main/java/com/linghu/controller/CollectController.java | 161 ++++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 122 insertions(+), 39 deletions(-) diff --git a/src/main/java/com/linghu/controller/CollectController.java b/src/main/java/com/linghu/controller/CollectController.java index bbd9f8e..40aead8 100644 --- a/src/main/java/com/linghu/controller/CollectController.java +++ b/src/main/java/com/linghu/controller/CollectController.java @@ -6,26 +6,18 @@ import java.util.*; import java.util.stream.Collectors; -import javax.annotation.Resource; import javax.servlet.http.HttpServletRequest; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.linghu.mapper.PlatformMapper; -import com.linghu.mapper.TypeMapper; import com.linghu.model.dto.*; import com.linghu.model.entity.*; import com.linghu.service.*; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.*; -import org.springframework.web.client.RestTemplate; import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; @@ -37,7 +29,6 @@ import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; import org.springframework.web.bind.annotation.* ; import org.springframework.http.HttpStatus; @@ -76,6 +67,8 @@ private PlatformService platformService; @Autowired private TypeService typeService; + @Autowired + private UserService userService; /* @PostMapping("/search") @@ -199,39 +192,78 @@ .next() // 找到第一个完成的响应后结束流 .then(); // 转换为Mono<Void> }*/ + // 添加一个辅助方法来安全地将字符串转换为double + private double parseUsage(String usageStr) { + try { + if (usageStr != null) { + // 移除可能存在的百分号 + usageStr = usageStr.replace("%", "").trim(); + return Double.parseDouble(usageStr); + } + return 0.0; + } catch (NumberFormatException e) { + log.error("解析资源使用率失败: {}", e.getMessage()); + return 0.0; + } + } @PostMapping("/search") @ApiOperation(value = "开始采集") - public Mono<List<SearchTaskResponse>> createSearchTask( + public Mono<ResponseResult<?>> createSearchTask( @RequestBody SearchTaskRequest searchTaskRequest, HttpServletRequest request) throws JsonProcessingException { - int maxConcurrentUsers = searchTaskRequest.getConfig() != null ? - searchTaskRequest.getConfig().getMax_concurrent_users() : 3; + // 首先检查服务器资源 + return getServerResource() + .flatMap(resourceResponse -> { + // 将字符串类型的使用率转换为double类型 + double cpuUsage = parseUsage(resourceResponse.getCpu_usage_percent()); + double memoryUsage = parseUsage(resourceResponse.getMemory_usage_percent()); + // 检查CPU和内存使用率 + if (cpuUsage >= 90.0 || memoryUsage >= 90.0) { + String errorMsg = String.format("服务器资源不足:CPU使用率 %.1f%%,内存使用率 %.1f%%", + resourceResponse.getCpu_usage_percent(), resourceResponse.getMemory_usage_percent()); + log.warn(errorMsg); + return Mono.just(ResponseResult.error(503, errorMsg)); + } + Integer keywordId = searchTaskRequest.getKeyword_id(); - // 获取 keywordId - Integer keywordId = searchTaskRequest.getKeyword_id(); - //分割 - List<List<UserDto>> userBatches = splitUsersIntoBatches(searchTaskRequest.getUsers(), maxConcurrentUsers,keywordId); + int maxConcurrentUsers = searchTaskRequest.getConfig() != null ? + searchTaskRequest.getConfig().getMax_concurrent_users() : 3; + List<List<UserDto>> userBatches = splitUsersIntoBatches(searchTaskRequest.getUsers(), maxConcurrentUsers,keywordId); - return Flux.fromIterable(userBatches) - .flatMap(batch -> { - SearchTaskRequest batchRequest = new SearchTaskRequest(); - batchRequest.setUsers(batch); - batchRequest.setQuestions(searchTaskRequest.getQuestions()); - batchRequest.setConfig(searchTaskRequest.getConfig()); - batchRequest.setSave_to_database(searchTaskRequest.getSave_to_database()); - batchRequest.setWebhook_url(searchTaskRequest.getWebhook_url()); - batchRequest.setKeyword_id(keywordId); + // 获取 keywordId - return createSingleBatchTask(batchRequest) - .delaySubscription(Duration.ofSeconds(2)); // 批次之间添加延迟 - }, 1) // 限制并发数为1,确保顺序执行 - .collectList() // 收集所有批次的响应 - .flatMap(responses -> + //分割 - saveKeywordTasks(keywordId, responses) // 保存关联关系 - .thenReturn(responses) // 返回原始响应 - ); + + return Flux.fromIterable(userBatches) + .flatMap(batch -> { + SearchTaskRequest batchRequest = new SearchTaskRequest(); + batchRequest.setUsers(batch); + batchRequest.setQuestions(searchTaskRequest.getQuestions()); + batchRequest.setConfig(searchTaskRequest.getConfig()); + batchRequest.setSave_to_database(searchTaskRequest.getSave_to_database()); + batchRequest.setWebhook_url(searchTaskRequest.getWebhook_url()); + batchRequest.setKeyword_id(keywordId); + + return createSingleBatchTask(batchRequest) + .delaySubscription(Duration.ofSeconds(2)); // 批次之间添加延迟 + }, 1) // 限制并发数为1,确保顺序执行 + .collectList() // 收集所有批次的响应 + .flatMap(responses -> + saveKeywordTasks(keywordId, responses) // 保存关联关系 + .thenReturn(responses) // 返回原始响应 + ) + .map(responses -> ResponseResult.success(responses)) // 使用ResponseResult包装结果 + .onErrorResume(e -> { + log.error("创建搜索任务失败: {}", e.getMessage(), e); + return Mono.just(ResponseResult.error("创建搜索任务失败: " + e.getMessage())); + }); + }) + .onErrorResume(e -> { + log.error("检查服务器资源失败: {}", e.getMessage(), e); + return Mono.just(ResponseResult.error("检查服务器资源失败: " + e.getMessage())); + }); } private Mono<Void> saveKeywordTasks(Integer keywordId, List<SearchTaskResponse> taskResponses) { @@ -352,14 +384,39 @@ .onStatus(HttpStatus::isError, response -> response.bodyToMono(TaskCancelResponse.class) .flatMap(errorBody -> Mono.error(new RuntimeException(errorBody.getDetail())))) .bodyToMono(TaskCancelResponse.class) + .flatMap(cancelResponse -> { + // 更新关键词状态 + Mono<Void> updateKeyword = Mono.fromRunnable(() -> { + LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>(); + updateWrapper.eq(Keyword::getTask_id, taskId); + updateWrapper.set(Keyword::getStatus, "canceled"); // 统一使用"canceled" + keywordService.update(updateWrapper); + }) + .subscribeOn(Schedulers.boundedElastic()) + .then(); + + // 更新关键词任务状态 + Mono<Void> updateKeywordTask = Mono.fromRunnable(() -> { + LambdaUpdateWrapper<KeywordTask> updateWrapper = new LambdaUpdateWrapper<>(); + updateWrapper.eq(KeywordTask::getTask_id, taskId); + updateWrapper.set(KeywordTask::getStatus, "canceled"); // 统一使用"canceled" + keywordTaskService.update(updateWrapper); + }) + .subscribeOn(Schedulers.boundedElastic()) + .then(); + + // 并行执行两个更新操作,并在完成后返回cancelResponse + return Mono.when(updateKeyword, updateKeywordTask) + .thenReturn(cancelResponse); + }) .map(data -> ResponseResult.success(data)) .onErrorResume(e -> { if (e.getMessage().contains("任务不存在")) { - return Mono.just(ResponseResult.error(200, "任务不存在")); + return Mono.just(ResponseResult.error(200, e.getMessage())); } else if (e.getMessage().contains("无法取消")) { - return Mono.just(ResponseResult.error(200, "任务已完成,无法取消")); + return Mono.just(ResponseResult.error(200, e.getMessage())); } - return Mono.just(ResponseResult.error(500, "取消任务失败: " + e.getMessage())); + return Mono.just(ResponseResult.error(500, e.getMessage())); }); } @@ -391,7 +448,10 @@ }) .onErrorResume(e -> { System.out.println("获取任务结果失败"); - TaskResultResponse result = new TaskResultResponse(); + if (e.getMessage().contains("登陆失败")){ + + } + TaskResultResponse result = new TaskResultResponse(); result.setDetail("获取任务结果失败: " + e.getMessage()); return Mono.just(result); }); @@ -466,6 +526,8 @@ private Mono<Void> updateQuestionAndReference(TaskResultResponse result) { return Mono.fromRunnable(() -> { try { + //查看每个账号信息的status是否正常 + // 1. 根据KeywordTask更新关键词状态 // 查询关键词ID LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>(); @@ -480,8 +542,29 @@ throw new Exception("未找到关联的关键词,task_id: " + result.getTask_id()); // return; } - keyword.setStatus("completed"); - keywordService.updateById(keyword); + LambdaQueryWrapper<KeywordTask> keywordTaskWrapper2 = new LambdaQueryWrapper<>(); + keywordTaskWrapper2.eq(KeywordTask::getKeyword_id, keyword.getKeyword_id()); + List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper2); + +// 定义状态优先级:canceled > false > completed + String finalStatus = "completed"; // 默认状态为 completed + + for (KeywordTask task : keywordTasks) { + String status = task.getStatus(); + if ("canceled".equals(status)) { + finalStatus = "canceled"; + break; // 遇到 canceled 直接跳出循环,因为优先级最高 + } else if ("false".equals(status)) { + finalStatus = "false"; + // 不跳出循环,继续检查是否存在 canceled + } + } + +// 更新关键词状态 + if (!finalStatus.equals(keyword.getStatus())) { + keyword.setStatus(finalStatus); + keywordService.updateById(keyword); + } // 2. 批量查询所有问题 LambdaQueryWrapper<Question> queryWrapper = new LambdaQueryWrapper<>(); -- Gitblit v1.7.1