From 23561fa3cb3b57e64d340a414570fc7c81259937 Mon Sep 17 00:00:00 2001
From: guyue <1721849008@qq.com>
Date: 星期日, 13 七月 2025 12:03:36 +0800
Subject: [PATCH] 取消任务

---
 src/main/java/com/linghu/controller/CollectController.java |  346 +++++++++++++++++++++++++++++++--------------------------
 1 files changed, 186 insertions(+), 160 deletions(-)

diff --git a/src/main/java/com/linghu/controller/CollectController.java b/src/main/java/com/linghu/controller/CollectController.java
index 8882145..4110880 100644
--- a/src/main/java/com/linghu/controller/CollectController.java
+++ b/src/main/java/com/linghu/controller/CollectController.java
@@ -4,8 +4,7 @@
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
@@ -71,10 +70,12 @@
     private UserService userService;
     @Autowired
     private OrderService orderService;
-    // 1. 使用线程安全的队列实现
-    private final BlockingQueue<SearchTaskRequest> taskQueue = new LinkedBlockingQueue<>();
-    // 添加队列访问锁
-    private final ReentrantLock queueLock = new ReentrantLock();
+
+    // 替换为线程安全队列
+    private static final Queue<SearchTaskRequest> taskQueue = new ConcurrentLinkedQueue<>();
+    // 全局映射:关键词ID -> 批次队列
+    private static final ConcurrentMap<Integer, Queue<List<UserDto>>> batchQueues = new ConcurrentHashMap<>();
+
     private static boolean isProcessing = false;
 
     @PostMapping("/search")
@@ -99,13 +100,7 @@
                     }
 
                     // 将新的任务请求加入队列
-//                    taskQueue.add(searchTaskRequest);
-                    queueLock.lock();
-                    try {
-                        taskQueue.add(searchTaskRequest);
-                    } finally {
-                        queueLock.unlock();
-                    }
+                    taskQueue.add(searchTaskRequest);
 
                     // 如果当前没有任务在处理中,则启动任务队列的处理
                     if (!isProcessing) {
@@ -141,8 +136,25 @@
                     .subscribe();
         }
     }
-
     private Mono<ResponseResult<String>> executeBatchTask(SearchTaskRequest searchTaskRequest) {
+        Integer keywordId = searchTaskRequest.getKeyword_id();
+        // ... 原有逻辑 ...
+        int maxConcurrentUsers = searchTaskRequest.getConfig() != null ?
+                searchTaskRequest.getConfig().getMax_concurrent_users() : 3;
+        List<List<UserDto>> userBatches = splitUsersIntoBatches(searchTaskRequest.getUsers(), maxConcurrentUsers, keywordId);
+
+
+        // 创建批次队列并存入全局映射
+        Queue<List<UserDto>> batchQueue = new ConcurrentLinkedQueue<>(userBatches);
+        batchQueues.put(keywordId, batchQueue); // 存储到全局映射
+
+        return Mono.just(ResponseResult.success("第一个批次已开始"))
+                .doOnTerminate(() -> {
+                    executeBatchTask(batchQueue, searchTaskRequest, keywordId)
+                            .subscribe();
+                });
+    }
+/*    private Mono<ResponseResult<String>> executeBatchTask(SearchTaskRequest searchTaskRequest) {
         log.info("开始处理任务:{}", searchTaskRequest);
         log.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
         Integer keywordId = searchTaskRequest.getKeyword_id();
@@ -158,11 +170,16 @@
                     executeBatchTask(batchQueue, searchTaskRequest, keywordId)
                             .subscribe();  // 使用subscribe()启动后台任务
                 });
-    }
+    }*/
 
     private Mono<ResponseResult<?>> executeBatchTask(Queue<List<UserDto>> batchQueue, SearchTaskRequest searchTaskRequest, Integer keywordId) {
         // 如果队列为空,说明所有批次已经完成
-        if (batchQueue.isEmpty()) {
+//        if (batchQueue.isEmpty()) {
+//            return Mono.just(ResponseResult.success("所有批次已完成"));
+//        }
+        if (batchQueue == null || batchQueue.isEmpty()) {
+            // 清理资源
+            batchQueues.remove(keywordId);
             return Mono.just(ResponseResult.success("所有批次已完成"));
         }
 
@@ -177,18 +194,17 @@
 
         return createSingleBatchTask(batchRequest)
                 .flatMap(taskResponse -> {
-//                    if (taskResponse != null && taskResponse.getTask_id() != null) {
-//                        // 保存任务关联到数据库
-//                        return saveKeywordTasks(keywordId, taskResponse)
-//                                .then(waitForTaskCompletion(taskResponse.getTask_id(), batchQueue, searchTaskRequest, keywordId));
-//                    } else {
-//                        return Mono.just(ResponseResult.error("创建批次任务失败"));
-//                    }
                     if (taskResponse != null && taskResponse.getTask_id() != null) {
                         // 直接等待任务完成,不再保存任务关联信息
                         return waitForTaskCompletion(taskResponse.getTask_id(), batchQueue, searchTaskRequest, keywordId);
                     } else {
                         return Mono.just(ResponseResult.error("创建批次任务失败"));
+                    }
+                })
+                .doFinally(signal -> {
+                    // 任务完成时清理资源
+                    if (batchQueue.isEmpty()) {
+                        batchQueues.remove(keywordId);
                     }
                 });
     }
@@ -217,6 +233,11 @@
         // 查询任务状态
         return getTaskStatus(taskId)
                 .flatMap(statusResponse -> {
+                    // 检查任务是否被取消
+                    if ("cancelled".equalsIgnoreCase(statusResponse.getStatus())) {
+                        batchQueues.remove(keywordId); // 清理资源
+                        return Mono.just(ResponseResult.success("任务已被取消"));
+                    }
                     // 如果任务状态是"submitted"或"running",继续轮询
                     if (!"completed".equalsIgnoreCase(statusResponse.getStatus()) && !"failed".equalsIgnoreCase(statusResponse.getStatus()) && !"cancelled".equalsIgnoreCase(statusResponse.getStatus()) ) {
                         return Mono.delay(Duration.ofSeconds(5))  // 延迟 5 秒后再次查询
@@ -478,6 +499,73 @@
     @PostMapping("/cancel/{keywordId}")
     @ApiOperation(value = "取消任务")
     public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable Integer keywordId) {
+        // 1. 从主队列移除任务
+        List<SearchTaskRequest> removedMainQueueTasks = removeTasksFromQueueByKeywordId(keywordId);
+        int removedMainQueueCount = removedMainQueueTasks.size(); // 获取移除的任务数量
+
+        // 2. 从批次队列移除任务 (新增逻辑)
+        int removedBatchQueue = removeBatchTasksByKeywordId(keywordId);
+
+        // 3. 查询所有与关键词相关的任务
+        List<KeywordTask> tasks = keywordTaskService.list(
+                new LambdaQueryWrapper<KeywordTask>().eq(KeywordTask::getKeyword_id, keywordId)
+        );
+
+        // 4. 筛选出需要远程取消的任务
+        List<KeywordTask> tasksToCancelRemotely = tasks.stream()
+                .filter(task -> task.getTask_id() != null && "pending".equalsIgnoreCase(task.getStatus()))
+                .collect(Collectors.toList());
+
+        return Flux.fromIterable(tasksToCancelRemotely)
+                .flatMap(task -> {
+                    // 创建状态更新和远程取消的组合操作
+                    Mono<Void> updateStatus = updateTaskStatus(task.getTask_id(), "canceled");
+                    Mono<ResponseResult<?>> cancelOp = cancelRemoteTask(task.getTask_id())
+                            .onErrorResume(e -> {
+                                log.error("取消任务 {} 失败: {}", task.getTask_id(), e.getMessage());
+                                return Mono.just(ResponseResult.error("取消任务失败: " + e.getMessage()));
+                            });
+
+                    return Mono.zip(cancelOp, updateStatus)
+                            .thenReturn(true);
+                }, 10)
+                .collectList()
+                .flatMap(canceledTasks -> {
+                    return updateKeywordAndOrderStatus(keywordId)
+                            .thenReturn(ResponseResult.success(
+                                    new TaskCancelResponse(
+                                            String.format("任务已取消: 主队列移除%d, 批次队列移除%d, 远程取消%d",
+                                                    removedMainQueueCount ,
+                                                    removedBatchQueue,
+                                                    tasksToCancelRemotely.size())
+                                    )
+                            ));
+                });
+    }
+
+    // 新增方法:移除批次队列
+    private int removeBatchTasksByKeywordId(Integer keywordId) {
+        Queue<List<UserDto>> batchQueue = batchQueues.remove(keywordId);
+        if (batchQueue != null) {
+            int count = batchQueue.size();
+            batchQueue.clear();
+            log.info("从批次队列中移除关键词 {} 的 {} 个批次任务", keywordId, count);
+            return count;
+        }
+        return 0;
+    }
+    // 辅助方法:获取待取消任务
+    private List<KeywordTask> getTasksToCancel(Integer keywordId) {
+        return keywordTaskService.list(
+                new LambdaQueryWrapper<KeywordTask>()
+                        .eq(KeywordTask::getKeyword_id, keywordId)
+                        .isNotNull(KeywordTask::getTask_id)
+                        .eq(KeywordTask::getStatus, "pending")
+        );
+    }
+    /* @PostMapping("/cancel/{keywordId}")
+    @ApiOperation(value = "取消任务")
+    public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable Integer keywordId) {
         // 1. 查询所有与关键词相关的任务
         List<KeywordTask> tasks = keywordTaskService.list(
                 new LambdaQueryWrapper<KeywordTask>().eq(KeywordTask::getKeyword_id, keywordId)
@@ -507,77 +595,80 @@
                             .thenReturn(true);
                 }, 10) // 设置10的并发度
                 .collectList()
-                .thenReturn(ResponseResult.success(
-                        new TaskCancelResponse(
-                                String.format("关键词任务已取消,队列中移除 %d 个任务,远程取消 %d 个任务",
-                                        removedQueueTasks.size(),
-                                        tasksToCancelRemotely.size())
-                        )
-                ))
+                .flatMap(canceledTasks -> {
+                    // 5. 更新关键词和订单状态
+                    return updateKeywordAndOrderStatus(keywordId)
+                            .thenReturn(ResponseResult.success(
+                                    new TaskCancelResponse(
+                                            String.format("关键词任务已取消,队列中移除 %d 个任务,远程取消 %d 个任务",
+                                                    removedQueueTasks.size(),
+                                                    tasksToCancelRemotely.size())
+                                    )
+                            ));
+                })
                 .onErrorResume(e -> {
                     log.error("取消关键词任务失败: {}", e.getMessage());
                     return Mono.just(ResponseResult.error(500, "取消关键词任务失败: " + e.getMessage()));
                 });
-    }
+    }*/
+    // 提取关键词和订单状态更新的逻辑为单独方法
+    private Mono<Void> updateKeywordAndOrderStatus(Integer keywordId) {
+        return Mono.fromRunnable(() -> {
+            try {
+                // 查询关键词
+                Keyword keyword = keywordService.getById(keywordId);
+                if (keyword == null) {
+                    log.warn("未找到关键词,keywordId: {}", keywordId);
+                    return;
+                }
 
-//    @PostMapping("/cancel/{keywordId}")
-//    @ApiOperation(value = "取消任务")
-//    public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable Integer keywordId) {
-//        // 1. 查询所有与关键词相关的任务
-//        List<KeywordTask> tasks = keywordTaskService.list(
-//                new LambdaQueryWrapper<KeywordTask>().eq(KeywordTask::getKeyword_id, keywordId)
-//        );
-//
-//        // 2. 从队列中移除所有相关任务
-//        List<SearchTaskRequest> removedQueueTasks = removeTasksFromQueueByKeywordId(keywordId);
-//
-//        // 3. 筛选出需要远程取消的任务(任务ID不为空且状态为pending)
-//        List<KeywordTask> tasksToCancelRemotely = tasks.stream()
-//                .filter(task -> task.getTask_id() != null && "pending".equalsIgnoreCase(task.getStatus()))
-//                .collect(Collectors.toList());
-//        // 检查是否有任务与关键词相关
-////        if (tasks.isEmpty()) {
-////            return Mono.just(ResponseResult.error("没有找到相关任务"));
-////        }
-//
-//        // 4. 对筛选出的任务发送远程取消请求
-//        List<Mono<ResponseResult<?>>> cancelRequests = tasksToCancelRemotely.stream()
-//                .map(task -> cancelRemoteTask(task.getTask_id())
-//                        .doOnSuccess(response -> {
-//                            // 更新任务状态为canceled
-//                            updateTaskStatus(task.getTask_id(), "canceled").subscribe();
-//                        })
-//                        .onErrorResume(e -> {
-//                            log.error("取消任务 {} 失败: {}", task.getTask_id(), e.getMessage());
-//                            // 即使取消失败,也尝试更新状态
-//                            updateTaskStatus(task.getTask_id(), "canceled").subscribe();
-//                            return Mono.just(ResponseResult.error("取消任务失败: " + e.getMessage()));
-//                        }))
-//                .collect(Collectors.toList());
-//
-//        // 5. 并行执行所有取消请求
-//        return Flux.fromIterable(cancelRequests)
-//                .concatMap(request -> request) // 顺序执行,而非并行
-//                .collectList()
-//                .thenReturn(ResponseResult.success(
-//                        new TaskCancelResponse(
-//                                String.format("关键词任务已取消,队列中移除 %d 个任务,远程取消 %d 个任务",
-//                                        removedQueueTasks.size(),
-//                                        tasksToCancelRemotely.size())
-//                        )
-//                ))
-//                .onErrorResume(e -> {
-//                    log.error("取消关键词任务失败: {}", e.getMessage());
-//                    return Mono.just(ResponseResult.error(500, "取消关键词任务失败: " + e.getMessage()));
-//                });
-//    }
-// 线程安全的队列移除方法
+                //把任务id为空的删除
+                LambdaUpdateWrapper<KeywordTask> updateWrapper = new LambdaUpdateWrapper<>();
+                updateWrapper.isNull(KeywordTask::getTask_id);
+                keywordTaskService.remove(updateWrapper);
+                // 查询该关键词下的所有任务
+                LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>();
+                keywordTaskWrapper.eq(KeywordTask::getKeyword_id, keywordId);
+
+                List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper);
+
+                // 更新关键词状态
+//                if (keywordTasks.stream().allMatch(task ->
+//                        "completed".equals(task.getStatus()) || "false".equals(task.getStatus()) || "canceled".equals(task.getStatus())
+//                )) {
+                    keyword.setStatus("canceled");
+                    keywordService.updateById(keyword);
+//                    log.info("关键词 {} 所有任务已完成,更新状态为 completed", keywordId);
+//                }
+
+                // 更新订单状态
+                String orderId = keyword.getOrder_id();
+                if (orderId != null && !orderId.isEmpty()) {
+                    // 查询订单下所有关键词
+                    LambdaQueryWrapper<Keyword> orderKeywordsWrapper = new LambdaQueryWrapper<>();
+                    orderKeywordsWrapper.eq(Keyword::getOrder_id, orderId);
+                    List<Keyword> orderKeywords = keywordService.list(orderKeywordsWrapper);
+
+                    // 所有关键词均已完成,则更新订单状态为3
+                    if (!orderKeywords.isEmpty() && orderKeywords.stream().allMatch(k ->
+                            "completed".equals(k.getStatus()) || "false".equals(k.getStatus()) || "canceled".equals(k.getStatus())
+                    )) {
+                        Orders orders = orderService.getById(orderId);
+                        if (orders != null) {
+                            orders.setStatus(3);
+                            orderService.updateById(orders);
+                            log.info("订单 {} 所有关键词已完成,更新状态为3", orderId);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                log.error("更新关键词和订单状态失败: {}", e.getMessage(), e);
+            }
+        });
+    }
 private List<SearchTaskRequest> removeTasksFromQueueByKeywordId(Integer keywordId) {
     List<SearchTaskRequest> removedTasks = new ArrayList<>();
 
-    // 使用锁保证队列操作的原子性
-    queueLock.lock();
-    try {
         Iterator<SearchTaskRequest> iterator = taskQueue.iterator();
         while (iterator.hasNext()) {
             SearchTaskRequest task = iterator.next();
@@ -586,31 +677,13 @@
                 iterator.remove();
             }
         }
-    } finally {
-        queueLock.unlock();
-    }
+
 
     log.info("从队列中移除了 {} 个与关键词ID {} 相关的任务", removedTasks.size(), keywordId);
     return removedTasks;
 }
 
-    // 从队列中移除所有关键词ID匹配的任务
-//    private List<SearchTaskRequest> removeTasksFromQueueByKeywordId(Integer keywordId) {
-//        List<SearchTaskRequest> removedTasks = new ArrayList<>();
-//
-//        // 使用迭代器安全地移除元素
-//        Iterator<SearchTaskRequest> iterator = taskQueue.iterator();
-//        while (iterator.hasNext()) {
-//            SearchTaskRequest task = iterator.next();
-//            if (task.getKeyword_id() != null && task.getKeyword_id().equals(keywordId)) {
-//                removedTasks.add(task);
-//                iterator.remove();
-//            }
-//        }
-//
-//        log.info("从队列中移除了 {} 个与关键词ID {} 相关的任务", removedTasks.size(), keywordId);
-//        return removedTasks;
-//    }
+
     // 发送远程取消请求
 // 发送远程取消请求(使用Java 8兼容的Map创建方式)
     private Mono<ResponseResult<?>> cancelRemoteTask(String taskId) {
@@ -638,53 +711,6 @@
             keywordTaskService.update(updateWrapper);
         }).subscribeOn(Schedulers.boundedElastic()).then();
     }
- /*   @PostMapping("/cancel/{taskId}")
-    @ApiOperation(value = "取消任务")
-    public Mono<ResponseResult<TaskCancelResponse>> cancelTask(@PathVariable String taskId) {
-        return webClient.post()
-                .uri(baseUrl + "/api/v1/tasks/" + taskId + "/cancel")
-                .contentType(MediaType.APPLICATION_JSON)
-                .bodyValue(Collections.emptyMap()) // 添加空请求体
-                .retrieve()
-                .onStatus(HttpStatus::isError, response -> response.bodyToMono(TaskCancelResponse.class)
-                        .flatMap(errorBody -> Mono.error(new RuntimeException(errorBody.getDetail()))))
-                .bodyToMono(TaskCancelResponse.class)
-                .flatMap(cancelResponse -> {
-                    // 更新关键词状态
-                    Mono<Void> updateKeyword = Mono.fromRunnable(() -> {
-                                LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>();
-                                updateWrapper.eq(Keyword::getTask_id, taskId);
-                                updateWrapper.set(Keyword::getStatus, "canceled"); // 统一使用"canceled"
-                                keywordService.update(updateWrapper);
-                            })
-                            .subscribeOn(Schedulers.boundedElastic())
-                            .then();
-
-                    // 更新关键词任务状态
-                    Mono<Void> updateKeywordTask = Mono.fromRunnable(() -> {
-                                LambdaUpdateWrapper<KeywordTask> updateWrapper = new LambdaUpdateWrapper<>();
-                                updateWrapper.eq(KeywordTask::getTask_id, taskId);
-                                updateWrapper.set(KeywordTask::getStatus, "canceled"); // 统一使用"canceled"
-                                keywordTaskService.update(updateWrapper);
-                            })
-                            .subscribeOn(Schedulers.boundedElastic())
-                            .then();
-
-                    // 并行执行两个更新操作,并在完成后返回cancelResponse
-                    return Mono.when(updateKeyword, updateKeywordTask)
-                            .thenReturn(cancelResponse);
-                })
-                .map(data -> ResponseResult.success(data))
-                .onErrorResume(e -> {
-                    if (e.getMessage().contains("任务不存在")) {
-                        return Mono.just(ResponseResult.error(200, e.getMessage()));
-                    } else if (e.getMessage().contains("无法取消")) {
-                        return Mono.just(ResponseResult.error(200, e.getMessage()));
-                    }
-                    return Mono.just(ResponseResult.error(500,  e.getMessage()));
-                });
-    }*/
-
     @ApiOperation(value = "获取任务结果")
     @GetMapping("/tasks/{taskId}")
     public Mono<TaskResultResponse> getTaskResult(@PathVariable String taskId) {
@@ -839,8 +865,8 @@
                 LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>();
                 keywordTaskWrapper.eq(KeywordTask::getTask_id, result.getTask_id());
                 KeywordTask keywordTask = keywordTaskService.getOne(keywordTaskWrapper);
-//                keywordTask.setStatus("completed");
-//                keywordTaskService.updateById(keywordTask);
+                keywordTask.setStatus("completed");
+                keywordTaskService.updateById(keywordTask);
                 Keyword keyword = keywordService.getById(keywordTask.getKeyword_id());
 
                 if (keyword == null) {
@@ -854,16 +880,16 @@
                 List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper2);
 
                 //如果全部为completed  关键词也为completed  ,如果关联关系没有任务id,或者状态为running  ,关键词为submitted,
-                if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus())) ) {
+                if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus()) || "false".equals(task.getStatus()) || "canceled".equals(task.getStatus())) ) {
                     keyword.setStatus("completed");
                     keywordService.updateById(keyword);
 
                 }
                 //如果有一个task为failed设置关键词为false
-                else if (keywordTasks.stream().anyMatch(task -> "failed".equals(task.getStatus()))) {
-                    keyword.setStatus("false");
-                    keywordService.updateById(keyword);
-                }
+//                else if (keywordTasks.stream().anyMatch(task -> "failed".equals(task.getStatus()))) {
+//                    keyword.setStatus("false");
+//                    keywordService.updateById(keyword);
+//                }
 
 
 

--
Gitblit v1.7.1