guyue
16 小时以前 785faf8c5a86e612c6c76b798e253879c0a39827
获取结果状态更新
1个文件已修改
38 ■■■■■ 已修改文件
src/main/java/com/linghu/controller/CollectController.java 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/controller/CollectController.java
@@ -71,10 +71,9 @@
    private UserService userService;
    @Autowired
    private OrderService orderService;
    // 1. 使用线程安全的队列实现
    private final BlockingQueue<SearchTaskRequest> taskQueue = new LinkedBlockingQueue<>();
    // 添加队列访问锁
    private final ReentrantLock queueLock = new ReentrantLock();
    private static final Queue<SearchTaskRequest> taskQueue = new LinkedList<>();
    private static boolean isProcessing = false;
    @PostMapping("/search")
@@ -99,13 +98,7 @@
                    }
                    // 将新的任务请求加入队列
//                    taskQueue.add(searchTaskRequest);
                    queueLock.lock();
                    try {
                        taskQueue.add(searchTaskRequest);
                    } finally {
                        queueLock.unlock();
                    }
                    taskQueue.add(searchTaskRequest);
                    // 如果当前没有任务在处理中,则启动任务队列的处理
                    if (!isProcessing) {
@@ -571,13 +564,10 @@
//                    return Mono.just(ResponseResult.error(500, "取消关键词任务失败: " + e.getMessage()));
//                });
//    }
// 线程安全的队列移除方法
private List<SearchTaskRequest> removeTasksFromQueueByKeywordId(Integer keywordId) {
    List<SearchTaskRequest> removedTasks = new ArrayList<>();
    // 使用锁保证队列操作的原子性
    queueLock.lock();
    try {
        Iterator<SearchTaskRequest> iterator = taskQueue.iterator();
        while (iterator.hasNext()) {
            SearchTaskRequest task = iterator.next();
@@ -586,9 +576,7 @@
                iterator.remove();
            }
        }
    } finally {
        queueLock.unlock();
    }
    log.info("从队列中移除了 {} 个与关键词ID {} 相关的任务", removedTasks.size(), keywordId);
    return removedTasks;
@@ -839,8 +827,8 @@
                LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>();
                keywordTaskWrapper.eq(KeywordTask::getTask_id, result.getTask_id());
                KeywordTask keywordTask = keywordTaskService.getOne(keywordTaskWrapper);
//                keywordTask.setStatus("completed");
//                keywordTaskService.updateById(keywordTask);
                keywordTask.setStatus("completed");
                keywordTaskService.updateById(keywordTask);
                Keyword keyword = keywordService.getById(keywordTask.getKeyword_id());
                if (keyword == null) {
@@ -854,16 +842,16 @@
                List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper2);
                //如果全部为completed  关键词也为completed  ,如果关联关系没有任务id,或者状态为running  ,关键词为submitted,
                if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus())) ) {
                if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus()) || "false".equals(task.getStatus()) || "canceled".equals(task.getStatus())) ) {
                    keyword.setStatus("completed");
                    keywordService.updateById(keyword);
                }
                //如果有一个task为failed设置关键词为false
                else if (keywordTasks.stream().anyMatch(task -> "failed".equals(task.getStatus()))) {
                    keyword.setStatus("false");
                    keywordService.updateById(keyword);
                }
//                else if (keywordTasks.stream().anyMatch(task -> "failed".equals(task.getStatus()))) {
//                    keyword.setStatus("false");
//                    keywordService.updateById(keyword);
//                }