| | |
| | | import java.time.LocalDateTime; |
| | | import java.time.format.DateTimeFormatter; |
| | | import java.util.*; |
| | | import java.util.concurrent.BlockingQueue; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.*; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.stream.Collectors; |
| | | |
| | |
| | | private UserService userService; |
| | | @Autowired |
| | | private OrderService orderService; |
| | | // 1. Task queue backed by a thread-safe queue implementation. NOTE(review): taskQueue is declared a second time a few lines below in this view - looks like two revisions interleaved; confirm which declaration is current. |
| | | private final BlockingQueue<SearchTaskRequest> taskQueue = new LinkedBlockingQueue<>(); |
| | | // Lock guarding access to the task queue |
| | | private final ReentrantLock queueLock = new ReentrantLock(); |
| | | |
| | | // Replaced with a thread-safe queue |
| | | private static final Queue<SearchTaskRequest> taskQueue = new ConcurrentLinkedQueue<>(); |
| | | // Global map: keyword id -> queue of user batches still waiting to be processed |
| | | private static final ConcurrentMap<Integer, Queue<List<UserDto>>> batchQueues = new ConcurrentHashMap<>(); |
| | | |
| | | private static boolean isProcessing = false; |
| | | |
| | | @PostMapping("/search") |
| | |
| | | } |
| | | |
| | | // 将新的任务请求加入队列 |
| | | // taskQueue.add(searchTaskRequest); |
| | | queueLock.lock(); |
| | | try { |
| | | taskQueue.add(searchTaskRequest); |
| | | } finally { |
| | | queueLock.unlock(); |
| | | } |
| | | taskQueue.add(searchTaskRequest); |
| | | |
| | | // 如果当前没有任务在处理中,则启动任务队列的处理 |
| | | if (!isProcessing) { |
| | |
| | | .subscribe(); |
| | | } |
| | | } |
| | | |
| | | private Mono<ResponseResult<String>> executeBatchTask(SearchTaskRequest searchTaskRequest) { |
| | | Integer keywordId = searchTaskRequest.getKeyword_id(); |
| | | // ... 原有逻辑 ... |
| | | int maxConcurrentUsers = searchTaskRequest.getConfig() != null ? |
| | | searchTaskRequest.getConfig().getMax_concurrent_users() : 3; |
| | | List<List<UserDto>> userBatches = splitUsersIntoBatches(searchTaskRequest.getUsers(), maxConcurrentUsers, keywordId); |
| | | |
| | | |
| | | // 创建批次队列并存入全局映射 |
| | | Queue<List<UserDto>> batchQueue = new ConcurrentLinkedQueue<>(userBatches); |
| | | batchQueues.put(keywordId, batchQueue); // 存储到全局映射 |
| | | |
| | | return Mono.just(ResponseResult.success("第一个批次已开始")) |
| | | .doOnTerminate(() -> { |
| | | executeBatchTask(batchQueue, searchTaskRequest, keywordId) |
| | | .subscribe(); |
| | | }); |
| | | } |
| | | /* private Mono<ResponseResult<String>> executeBatchTask(SearchTaskRequest searchTaskRequest) { |
| | | log.info("开始处理任务:{}", searchTaskRequest); |
| | | log.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"); |
| | | Integer keywordId = searchTaskRequest.getKeyword_id(); |
| | |
| | | executeBatchTask(batchQueue, searchTaskRequest, keywordId) |
| | | .subscribe(); // 使用subscribe()启动后台任务 |
| | | }); |
| | | } |
| | | }*/ |
| | | |
| | | private Mono<ResponseResult<?>> executeBatchTask(Queue<List<UserDto>> batchQueue, SearchTaskRequest searchTaskRequest, Integer keywordId) { |
| | | // 如果队列为空,说明所有批次已经完成 |
| | | if (batchQueue.isEmpty()) { |
| | | // if (batchQueue.isEmpty()) { |
| | | // return Mono.just(ResponseResult.success("所有批次已完成")); |
| | | // } |
| | | if (batchQueue == null || batchQueue.isEmpty()) { |
| | | // 清理资源 |
| | | batchQueues.remove(keywordId); |
| | | return Mono.just(ResponseResult.success("所有批次已完成")); |
| | | } |
| | | |
| | |
| | | |
| | | return createSingleBatchTask(batchRequest) |
| | | .flatMap(taskResponse -> { |
| | | // if (taskResponse != null && taskResponse.getTask_id() != null) { |
| | | // // 保存任务关联到数据库 |
| | | // return saveKeywordTasks(keywordId, taskResponse) |
| | | // .then(waitForTaskCompletion(taskResponse.getTask_id(), batchQueue, searchTaskRequest, keywordId)); |
| | | // } else { |
| | | // return Mono.just(ResponseResult.error("创建批次任务失败")); |
| | | // } |
| | | if (taskResponse != null && taskResponse.getTask_id() != null) { |
| | | // 直接等待任务完成,不再保存任务关联信息 |
| | | return waitForTaskCompletion(taskResponse.getTask_id(), batchQueue, searchTaskRequest, keywordId); |
| | | } else { |
| | | return Mono.just(ResponseResult.error("创建批次任务失败")); |
| | | } |
| | | }) |
| | | .doFinally(signal -> { |
| | | // 任务完成时清理资源 |
| | | if (batchQueue.isEmpty()) { |
| | | batchQueues.remove(keywordId); |
| | | } |
| | | }); |
| | | } |
| | |
| | | // 查询任务状态 |
| | | return getTaskStatus(taskId) |
| | | .flatMap(statusResponse -> { |
| | | // 检查任务是否被取消 |
| | | if ("cancelled".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | batchQueues.remove(keywordId); // 清理资源 |
| | | return Mono.just(ResponseResult.success("任务已被取消")); |
| | | } |
| | | // 如果任务状态是"submitted"或"running",继续轮询 |
| | | if (!"completed".equalsIgnoreCase(statusResponse.getStatus()) && !"failed".equalsIgnoreCase(statusResponse.getStatus()) && !"cancelled".equalsIgnoreCase(statusResponse.getStatus()) ) { |
| | | return Mono.delay(Duration.ofSeconds(5)) // 延迟 5 秒后再次查询 |
| | |
| | | // keywordTaskService.updateById(taskToUpdate); |
| | | |
| | | |
| | | //设置订单进入采集状态 |
| | | LambdaUpdateWrapper<Orders> updateOrderWrapper = new LambdaUpdateWrapper<>(); |
| | | updateOrderWrapper.eq(Orders::getOrder_id, keyword.getOrder_id()) // 确保字段名正确 |
| | | .set(Orders::getStatus, 2); // 直接设置状态值 |
| | | //所有关键词都在采集中或者已完成或者错误设置订单进入采集状态 |
| | | List<Keyword> orderKeywords = keywordService.list(new LambdaQueryWrapper<Keyword>() |
| | | .eq(Keyword::getOrder_id, keyword.getOrder_id())); |
| | | if (!orderKeywords.isEmpty() && orderKeywords.stream().allMatch(k -> |
| | | "completed".equals(k.getStatus()) || "submitted".equals(k.getStatus()) |
| | | )) { |
| | | Orders orders = orderService.getById(keyword.getOrder_id()); |
| | | if (orders != null) { |
| | | orders.setStatus(2); |
| | | orderService.updateById(orders); |
| | | |
| | | boolean success = orderService.update(updateOrderWrapper); |
| | | log.info("订单状态更新结果: {}", success ? "成功" : "失败"); |
| | | |
| | | } |
| | | } |
| | | }).subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行 |
| | | .thenReturn(taskResponse); |
| | | } |
| | |
| | | @PostMapping("/cancel/{keywordId}") |
| | | @ApiOperation(value = "取消任务") |
| | | public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable Integer keywordId) { |
| | | // 1. 从主队列移除任务 |
| | | List<SearchTaskRequest> removedMainQueueTasks = removeTasksFromQueueByKeywordId(keywordId); |
| | | int removedMainQueueCount = removedMainQueueTasks.size(); // 获取移除的任务数量 |
| | | |
| | | // 2. 从批次队列移除任务 (新增逻辑) |
| | | int removedBatchQueue = removeBatchTasksByKeywordId(keywordId); |
| | | |
| | | // 3. 查询所有与关键词相关的任务 |
| | | List<KeywordTask> tasks = keywordTaskService.list( |
| | | new LambdaQueryWrapper<KeywordTask>().eq(KeywordTask::getKeyword_id, keywordId) |
| | | ); |
| | | |
| | | // 4. 筛选出需要远程取消的任务 |
| | | List<KeywordTask> tasksToCancelRemotely = tasks.stream() |
| | | .filter(task -> task.getTask_id() != null && "pending".equalsIgnoreCase(task.getStatus())) |
| | | .collect(Collectors.toList()); |
| | | |
| | | return Flux.fromIterable(tasksToCancelRemotely) |
| | | .flatMap(task -> { |
| | | // 创建状态更新和远程取消的组合操作 |
| | | Mono<Void> updateStatus = updateTaskStatus(task.getTask_id(), "canceled"); |
| | | Mono<ResponseResult<?>> cancelOp = cancelRemoteTask(task.getTask_id()) |
| | | .onErrorResume(e -> { |
| | | log.error("取消任务 {} 失败: {}", task.getTask_id(), e.getMessage()); |
| | | return Mono.just(ResponseResult.error("取消任务失败: " + e.getMessage())); |
| | | }); |
| | | |
| | | return Mono.zip(cancelOp, updateStatus) |
| | | .thenReturn(true); |
| | | }, 10) |
| | | .collectList() |
| | | .flatMap(canceledTasks -> { |
| | | return updateKeywordAndOrderStatus(keywordId) |
| | | .thenReturn(ResponseResult.success( |
| | | new TaskCancelResponse( |
| | | String.format("任务已取消: 主队列移除%d, 批次队列移除%d, 远程取消%d", |
| | | removedMainQueueCount , |
| | | removedBatchQueue, |
| | | tasksToCancelRemotely.size()) |
| | | ) |
| | | )); |
| | | }); |
| | | } |
| | | |
| | | /** |
| | | * Unregisters the batch queue stored for a keyword and discards its remaining batches. |
| | | * |
| | | * @param keywordId key into {@code batchQueues} |
| | | * @return how many batches were still queued, or 0 when no queue was registered |
| | | */ |
| | | private int removeBatchTasksByKeywordId(Integer keywordId) { |
| | | Queue<List<UserDto>> pendingBatches = batchQueues.remove(keywordId); |
| | | if (pendingBatches == null) { |
| | | return 0; |
| | | } |
| | | // Capture the size before clearing so the log and return value reflect what was dropped. |
| | | int removed = pendingBatches.size(); |
| | | pendingBatches.clear(); |
| | | log.info("从批次队列中移除关键词 {} 的 {} 个批次任务", keywordId, removed); |
| | | return removed; |
| | | } |
| | | /** |
| | | * Helper: lists the keyword's tasks that are candidates for cancellation. |
| | | * |
| | | * @param keywordId keyword whose tasks are queried |
| | | * @return tasks of that keyword that have a non-null task id and status "pending" |
| | | */ |
| | | private List<KeywordTask> getTasksToCancel(Integer keywordId) { |
| | | LambdaQueryWrapper<KeywordTask> pendingWithTaskId = new LambdaQueryWrapper<KeywordTask>() |
| | | .eq(KeywordTask::getKeyword_id, keywordId) |
| | | .isNotNull(KeywordTask::getTask_id) |
| | | .eq(KeywordTask::getStatus, "pending"); |
| | | return keywordTaskService.list(pendingWithTaskId); |
| | | } |
| | | /* @PostMapping("/cancel/{keywordId}") |
| | | @ApiOperation(value = "取消任务") |
| | | public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable Integer keywordId) { |
| | | // 1. 查询所有与关键词相关的任务 |
| | | List<KeywordTask> tasks = keywordTaskService.list( |
| | | new LambdaQueryWrapper<KeywordTask>().eq(KeywordTask::getKeyword_id, keywordId) |
| | |
| | | .thenReturn(true); |
| | | }, 10) // 设置10的并发度 |
| | | .collectList() |
| | | .thenReturn(ResponseResult.success( |
| | | new TaskCancelResponse( |
| | | String.format("关键词任务已取消,队列中移除 %d 个任务,远程取消 %d 个任务", |
| | | removedQueueTasks.size(), |
| | | tasksToCancelRemotely.size()) |
| | | ) |
| | | )) |
| | | .flatMap(canceledTasks -> { |
| | | // 5. 更新关键词和订单状态 |
| | | return updateKeywordAndOrderStatus(keywordId) |
| | | .thenReturn(ResponseResult.success( |
| | | new TaskCancelResponse( |
| | | String.format("关键词任务已取消,队列中移除 %d 个任务,远程取消 %d 个任务", |
| | | removedQueueTasks.size(), |
| | | tasksToCancelRemotely.size()) |
| | | ) |
| | | )); |
| | | }) |
| | | .onErrorResume(e -> { |
| | | log.error("取消关键词任务失败: {}", e.getMessage()); |
| | | return Mono.just(ResponseResult.error(500, "取消关键词任务失败: " + e.getMessage())); |
| | | }); |
| | | } |
| | | }*/ |
| | | // Marks the keyword and its questions as "canceled" and then re-evaluates the status of the owning order; returns a Mono that runs this work when subscribed and logs (rather than propagates) any exception. |
| | | private Mono<Void> updateKeywordAndOrderStatus(Integer keywordId) { |
| | | return Mono.fromRunnable(() -> { |
| | | try { |
| | | // Load the keyword; if it cannot be found, only a warning is logged and nothing is updated |
| | | Keyword keyword = keywordService.getById(keywordId); |
| | | if (keyword == null) { |
| | | log.warn("未找到关键词,keywordId: {}", keywordId); |
| | | return; |
| | | } |
| | | |
| | | // @PostMapping("/cancel/{keywordId}") |
| | | // @ApiOperation(value = "取消任务") |
| | | // public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable Integer keywordId) { |
| | | // // 1. Query all tasks related to the keyword |
| | | // List<KeywordTask> tasks = keywordTaskService.list( |
| | | // new LambdaQueryWrapper<KeywordTask>().eq(KeywordTask::getKeyword_id, keywordId) |
| | | // ); |
| | | // |
| | | // // 2. Remove all related tasks from the queue |
| | | // List<SearchTaskRequest> removedQueueTasks = removeTasksFromQueueByKeywordId(keywordId); |
| | | // |
| | | // // 3. Select the tasks that need remote cancellation (task id not null and status "pending") |
| | | // List<KeywordTask> tasksToCancelRemotely = tasks.stream() |
| | | // .filter(task -> task.getTask_id() != null && "pending".equalsIgnoreCase(task.getStatus())) |
| | | // .collect(Collectors.toList()); |
| | | // // Check whether any task is related to the keyword |
| | | //// if (tasks.isEmpty()) { |
| | | //// return Mono.just(ResponseResult.error("没有找到相关任务")); |
| | | //// } |
| | | // |
| | | // // 4. Send a remote cancel request for each selected task |
| | | // List<Mono<ResponseResult<?>>> cancelRequests = tasksToCancelRemotely.stream() |
| | | // .map(task -> cancelRemoteTask(task.getTask_id()) |
| | | // .doOnSuccess(response -> { |
| | | // // Update the task status to canceled |
| | | // updateTaskStatus(task.getTask_id(), "canceled").subscribe(); |
| | | // }) |
| | | // .onErrorResume(e -> { |
| | | // log.error("取消任务 {} 失败: {}", task.getTask_id(), e.getMessage()); |
| | | // // Even if cancellation fails, still try to update the status |
| | | // updateTaskStatus(task.getTask_id(), "canceled").subscribe(); |
| | | // return Mono.just(ResponseResult.error("取消任务失败: " + e.getMessage())); |
| | | // })) |
| | | // .collect(Collectors.toList()); |
| | | // |
| | | // // 5. Run all cancel requests in parallel |
| | | // return Flux.fromIterable(cancelRequests) |
| | | // .concatMap(request -> request) // executed sequentially, not in parallel |
| | | // .collectList() |
| | | // .thenReturn(ResponseResult.success( |
| | | // new TaskCancelResponse( |
| | | // String.format("关键词任务已取消,队列中移除 %d 个任务,远程取消 %d 个任务", |
| | | // removedQueueTasks.size(), |
| | | // tasksToCancelRemotely.size()) |
| | | // ) |
| | | // )) |
| | | // .onErrorResume(e -> { |
| | | // log.error("取消关键词任务失败: {}", e.getMessage()); |
| | | // return Mono.just(ResponseResult.error(500, "取消关键词任务失败: " + e.getMessage())); |
| | | // }); |
| | | // } |
| | | // Thread-safe queue removal method (this heading appears to be a leftover; the statements below still belong to updateKeywordAndOrderStatus) |
| | | // Delete this keyword's task rows whose task id is null |
| | | LambdaUpdateWrapper<KeywordTask> updateWrapper = new LambdaUpdateWrapper<>(); |
| | | updateWrapper.eq(KeywordTask::getKeyword_id, keywordId); |
| | | updateWrapper.isNull(KeywordTask::getTask_id); |
| | | keywordTaskService.remove(updateWrapper); |
| | | // Query all tasks under this keyword (the result is only referenced by the commented-out check below) |
| | | LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>(); |
| | | keywordTaskWrapper.eq(KeywordTask::getKeyword_id, keywordId); |
| | | |
| | | List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper); |
| | | |
| | | // Update the keyword status: it is set to "canceled" unconditionally (the former all-tasks-finished condition is commented out) |
| | | // if (keywordTasks.stream().allMatch(task -> |
| | | // "completed".equals(task.getStatus()) || "false".equals(task.getStatus()) || "canceled".equals(task.getStatus()) |
| | | // )) { |
| | | keyword.setStatus("canceled"); |
| | | keywordService.updateById(keyword); |
| | | // log.info("关键词 {} 所有任务已完成,更新状态为 completed", keywordId); |
| | | // } |
| | | // Set the status of every question under this keyword to "canceled" |
| | | |
| | | questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keywordId).set(Question::getStatus, "canceled")); |
| | | |
| | | // Update the order status (skipped when the keyword has no order id) |
| | | String orderId = keyword.getOrder_id(); |
| | | if (orderId != null && !orderId.isEmpty()) { |
| | | // Query all keywords under the order |
| | | LambdaQueryWrapper<Keyword> orderKeywordsWrapper = new LambdaQueryWrapper<>(); |
| | | orderKeywordsWrapper.eq(Keyword::getOrder_id, orderId); |
| | | List<Keyword> orderKeywords = keywordService.list(orderKeywordsWrapper); |
| | | |
| | | // If every keyword is "completed" or "false", set the order status to 3. NOTE(review): the second check below tests !"completed" (negated) although its log message says completed-or-canceled, and when both checks match it overwrites the 3 written here with 1 - confirm the intended condition. |
| | | if (!orderKeywords.isEmpty() && orderKeywords.stream().allMatch(k -> |
| | | "completed".equals(k.getStatus()) || "false".equals(k.getStatus()) |
| | | )) { |
| | | Orders orders = orderService.getById(orderId); |
| | | if (orders != null) { |
| | | orders.setStatus(3); |
| | | orderService.updateById(orders); |
| | | log.info("订单 {} 所有关键词已完成,更新状态为3", orderId); |
| | | } |
| | | } |
| | | if (!orderKeywords.isEmpty() && orderKeywords.stream().allMatch(k -> |
| | | !"completed".equals(k.getStatus()) || "false".equals(k.getStatus()) || "canceled".equals(k.getStatus()) |
| | | )) { |
| | | Orders orders = orderService.getById(orderId); |
| | | if (orders != null) { |
| | | orders.setStatus(1); |
| | | orderService.updateById(orders); |
| | | log.info("订单 {} 所有关键词已完成或者取消,更新状态为1", orderId); |
| | | } |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("更新关键词和订单状态失败: {}", e.getMessage(), e); |
| | | } |
| | | }); |
| | | } |
| | | private List<SearchTaskRequest> removeTasksFromQueueByKeywordId(Integer keywordId) { |
| | | List<SearchTaskRequest> removedTasks = new ArrayList<>(); |
| | | |
| | | // 使用锁保证队列操作的原子性 |
| | | queueLock.lock(); |
| | | try { |
| | | Iterator<SearchTaskRequest> iterator = taskQueue.iterator(); |
| | | while (iterator.hasNext()) { |
| | | SearchTaskRequest task = iterator.next(); |
| | |
| | | iterator.remove(); |
| | | } |
| | | } |
| | | } finally { |
| | | queueLock.unlock(); |
| | | } |
| | | |
| | | |
| | | log.info("从队列中移除了 {} 个与关键词ID {} 相关的任务", removedTasks.size(), keywordId); |
| | | return removedTasks; |
| | | } |
| | | |
| | | // 从队列中移除所有关键词ID匹配的任务 |
| | | // private List<SearchTaskRequest> removeTasksFromQueueByKeywordId(Integer keywordId) { |
| | | // List<SearchTaskRequest> removedTasks = new ArrayList<>(); |
| | | // |
| | | // // 使用迭代器安全地移除元素 |
| | | // Iterator<SearchTaskRequest> iterator = taskQueue.iterator(); |
| | | // while (iterator.hasNext()) { |
| | | // SearchTaskRequest task = iterator.next(); |
| | | // if (task.getKeyword_id() != null && task.getKeyword_id().equals(keywordId)) { |
| | | // removedTasks.add(task); |
| | | // iterator.remove(); |
| | | // } |
| | | // } |
| | | // |
| | | // log.info("从队列中移除了 {} 个与关键词ID {} 相关的任务", removedTasks.size(), keywordId); |
| | | // return removedTasks; |
| | | // } |
| | | |
| | | // 发送远程取消请求 |
| | | // 发送远程取消请求(使用Java 8兼容的Map创建方式) |
| | | private Mono<ResponseResult<?>> cancelRemoteTask(String taskId) { |
| | |
| | | keywordTaskService.update(updateWrapper); |
| | | }).subscribeOn(Schedulers.boundedElastic()).then(); |
| | | } |
| | | /* @PostMapping("/cancel/{taskId}") |
| | | @ApiOperation(value = "取消任务") |
| | | public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable String taskId) { |
| | | return webClient.post() |
| | | .uri(baseUrl + "/api/v1/tasks/" + taskId + "/cancel") |
| | | .contentType(MediaType.APPLICATION_JSON) |
| | | .bodyValue(Collections.emptyMap()) // 添加空请求体 |
| | | .retrieve() |
| | | .onStatus(HttpStatus::isError, response -> response.bodyToMono(TaskCancelResponse.class) |
| | | .flatMap(errorBody -> Mono.error(new RuntimeException(errorBody.getDetail())))) |
| | | .bodyToMono(TaskCancelResponse.class) |
| | | .flatMap(cancelResponse -> { |
| | | // 更新关键词状态 |
| | | Mono<Void> updateKeyword = Mono.fromRunnable(() -> { |
| | | LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>(); |
| | | updateWrapper.eq(Keyword::getTask_id, taskId); |
| | | updateWrapper.set(Keyword::getStatus, "canceled"); // 统一使用"canceled" |
| | | keywordService.update(updateWrapper); |
| | | }) |
| | | .subscribeOn(Schedulers.boundedElastic()) |
| | | .then(); |
| | | |
| | | // 更新关键词任务状态 |
| | | Mono<Void> updateKeywordTask = Mono.fromRunnable(() -> { |
| | | LambdaUpdateWrapper<KeywordTask> updateWrapper = new LambdaUpdateWrapper<>(); |
| | | updateWrapper.eq(KeywordTask::getTask_id, taskId); |
| | | updateWrapper.set(KeywordTask::getStatus, "canceled"); // 统一使用"canceled" |
| | | keywordTaskService.update(updateWrapper); |
| | | }) |
| | | .subscribeOn(Schedulers.boundedElastic()) |
| | | .then(); |
| | | |
| | | // 并行执行两个更新操作,并在完成后返回cancelResponse |
| | | return Mono.when(updateKeyword, updateKeywordTask) |
| | | .thenReturn(cancelResponse); |
| | | }) |
| | | .map(data -> ResponseResult.success(data)) |
| | | .onErrorResume(e -> { |
| | | if (e.getMessage().contains("任务不存在")) { |
| | | return Mono.just(ResponseResult.error(200, e.getMessage())); |
| | | } else if (e.getMessage().contains("无法取消")) { |
| | | return Mono.just(ResponseResult.error(200, e.getMessage())); |
| | | } |
| | | return Mono.just(ResponseResult.error(500, e.getMessage())); |
| | | }); |
| | | }*/ |
| | | |
| | | @ApiOperation(value = "获取任务结果") |
| | | @GetMapping("/tasks/{taskId}") |
| | | public Mono<TaskResultResponse> getTaskResult(@PathVariable String taskId) { |
| | |
| | | LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>(); |
| | | keywordTaskWrapper.eq(KeywordTask::getTask_id, result.getTask_id()); |
| | | KeywordTask keywordTask = keywordTaskService.getOne(keywordTaskWrapper); |
| | | // keywordTask.setStatus("completed"); |
| | | // keywordTaskService.updateById(keywordTask); |
| | | keywordTask.setStatus("completed"); |
| | | keywordTaskService.updateById(keywordTask); |
| | | Keyword keyword = keywordService.getById(keywordTask.getKeyword_id()); |
| | | |
| | | if (keyword == null) { |
| | |
| | | List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper2); |
| | | |
| | | //如果全部为completed 关键词也为completed ,如果关联关系没有任务id,或者状态为running ,关键词为submitted, |
| | | if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus())) ) { |
| | | if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus()) || "false".equals(task.getStatus()) || "canceled".equals(task.getStatus())) ) { |
| | | keyword.setStatus("completed"); |
| | | keywordService.updateById(keyword); |
| | | |
| | | } |
| | | //如果有一个task为failed设置关键词为false |
| | | else if (keywordTasks.stream().anyMatch(task -> "failed".equals(task.getStatus()))) { |
| | | keyword.setStatus("false"); |
| | | keywordService.updateById(keyword); |
| | | } |
| | | // else if (keywordTasks.stream().anyMatch(task -> "failed".equals(task.getStatus()))) { |
| | | // keyword.setStatus("false"); |
| | | // keywordService.updateById(keyword); |
| | | // } |
| | | |
| | | |
| | | |
| | |
| | | System.out.println("订单[" + orderId + "]下无关键词,跳过状态更新"); |
| | | return; |
| | | } |
| | | boolean allValid2 = orderKeywords.stream() |
| | | .allMatch(k -> "completed".equals(k.getStatus()) || "false".equals(k.getStatus()) || "canceled".equals(k.getStatus())); |
| | | if (allValid2) { |
| | | Orders orders = orderService.getById(orderId); |
| | | if (orders != null) { |
| | | orders.setStatus(1); // 假设Orders有Integer类型的status字段 |
| | | orderService.updateById(orders); |
| | | System.out.println("订单[" + orderId + "]所有关键词采集完成或者取消,已更新状态为1"); |
| | | } else { |
| | | System.out.println("未找到订单[" + orderId + "],无法更新状态"); |
| | | } |
| | | } |
| | | // 3. 检查所有关键词的状态是否均为 completed 或 false |
| | | boolean allValid = orderKeywords.stream() |
| | | .allMatch(k -> "completed".equals(k.getStatus()) || "false".equals(k.getStatus()) || "canceled".equals(k.getStatus())); |
| | | .allMatch(k -> "completed".equals(k.getStatus()) || "false".equals(k.getStatus())); |
| | | |
| | | // 4. 若所有关键词状态均有效,更新订单状态为3 |
| | | if (allValid) { |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | Orders orders = orderService.getById(keyword.getOrder_id()); |
| | | |
| | | // 2. 批量查询所有问题 |