| | |
| | | private UserService userService; |
| | | @Autowired |
| | | private OrderService orderService; |
| | | // 1. 使用线程安全的队列实现 |
| | | private final BlockingQueue<SearchTaskRequest> taskQueue = new LinkedBlockingQueue<>(); |
| | | // 添加队列访问锁 |
| | | private final ReentrantLock queueLock = new ReentrantLock(); |
| | | |
| | | private static final Queue<SearchTaskRequest> taskQueue = new LinkedList<>(); |
| | | |
| | | private static boolean isProcessing = false; |
| | | |
| | | @PostMapping("/search") |
| | |
| | | } |
| | | |
| | | // 将新的任务请求加入队列 |
| | | // taskQueue.add(searchTaskRequest); |
| | | queueLock.lock(); |
| | | try { |
| | | taskQueue.add(searchTaskRequest); |
| | | } finally { |
| | | queueLock.unlock(); |
| | | } |
| | | taskQueue.add(searchTaskRequest); |
| | | |
| | | // 如果当前没有任务在处理中,则启动任务队列的处理 |
| | | if (!isProcessing) { |
| | |
| | | // return Mono.just(ResponseResult.error(500, "取消关键词任务失败: " + e.getMessage())); |
| | | // }); |
| | | // } |
| | | // 线程安全的队列移除方法 |
| | | |
| | | private List<SearchTaskRequest> removeTasksFromQueueByKeywordId(Integer keywordId) { |
| | | List<SearchTaskRequest> removedTasks = new ArrayList<>(); |
| | | |
| | | // 使用锁保证队列操作的原子性 |
| | | queueLock.lock(); |
| | | try { |
| | | Iterator<SearchTaskRequest> iterator = taskQueue.iterator(); |
| | | while (iterator.hasNext()) { |
| | | SearchTaskRequest task = iterator.next(); |
| | |
| | | iterator.remove(); |
| | | } |
| | | } |
| | | } finally { |
| | | queueLock.unlock(); |
| | | } |
| | | |
| | | |
| | | log.info("从队列中移除了 {} 个与关键词ID {} 相关的任务", removedTasks.size(), keywordId); |
| | | return removedTasks; |
| | |
| | | LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>(); |
| | | keywordTaskWrapper.eq(KeywordTask::getTask_id, result.getTask_id()); |
| | | KeywordTask keywordTask = keywordTaskService.getOne(keywordTaskWrapper); |
| | | // keywordTask.setStatus("completed"); |
| | | // keywordTaskService.updateById(keywordTask); |
| | | keywordTask.setStatus("completed"); |
| | | keywordTaskService.updateById(keywordTask); |
| | | Keyword keyword = keywordService.getById(keywordTask.getKeyword_id()); |
| | | |
| | | if (keyword == null) { |
| | |
| | | List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper2); |
| | | |
| | | //如果全部为completed 关键词也为completed ,如果关联关系没有任务id,或者状态为running ,关键词为submitted, |
| | | if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus())) ) { |
| | | if (keywordTasks.stream().allMatch(task -> "completed".equals(task.getStatus()) || "false".equals(task.getStatus()) || "canceled".equals(task.getStatus())) ) { |
| | | keyword.setStatus("completed"); |
| | | keywordService.updateById(keyword); |
| | | |
| | | } |
| | | //如果有一个task为failed设置关键词为false |
| | | else if (keywordTasks.stream().anyMatch(task -> "failed".equals(task.getStatus()))) { |
| | | keyword.setStatus("false"); |
| | | keywordService.updateById(keyword); |
| | | } |
| | | // else if (keywordTasks.stream().anyMatch(task -> "failed".equals(task.getStatus()))) { |
| | | // keyword.setStatus("false"); |
| | | // keywordService.updateById(keyword); |
| | | // } |
| | | |
| | | |
| | | |