xuhy
2025-04-30 5802305b3a0bbf23715aa7a8ccbadde86992c864
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
package com.ruoyi.jianguan.pool;
 
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
 
import java.util.concurrent.*;
 
/**
 * 线程池使用示例及实践
 * execute:适用于无返回值的任务执行,如果需要返回值需要自己传入FutureTask,在子线程中抛出异常,在主线程捕捉不到
 * submit:适用于有返回值的任务执行,不会直接抛出异常,会将异常捕获到FutureTask中,需要通过future.get()获取
 *
 * @author Devil
 * @version 1.0
 * @date 2025/4/12 21:42
 */
@Slf4j
public class ThreadPoolDemo {
 
    /**
     * 创建并演示使用单个线程的线程池
     * 特点:
     * 1. 保证所有任务顺序执行
     * 2. 使用无界队列(LinkedBlockingQueue),需注意可能的内存溢出问题
     * 3. 适用于需要保证任务顺序执行的场景
     */
    @SneakyThrows
    public static void createSingleThreadPool() {
        // 创建单线程线程池(实际开发建议使用自定义ThreadPoolExecutor)
        ExecutorService executorService = Executors.newSingleThreadExecutor();
 
        try {
            log.info("\n================ 普通任务执行 ================");
            for (int i = 0; i < 10; i++) {
                executorService.execute(() -> log.info("线程:{},办理业务", Thread.currentThread().getName()));
            }
 
            TimeUnit.MILLISECONDS.sleep(2);
 
            log.info("\n================ 带返回值的任务执行 ================");
            for (int i = 0; i < 10; i++) {
                Future<String> future = executorService.submit(() -> {
                    log.info("线程:{},办理业务", Thread.currentThread().getName());
                    return "业务办理完成";
                });
                log.info(future.get());
            }
        } finally {
            // 优雅关闭线程池
            gracefulShutdown(executorService);
        }
    }
 
    /**
     * 创建固定大小的线程池
     * 特点:
     * 1. 固定核心线程数(=最大线程数)
     * 2. 使用无界队列(LinkedBlockingQueue),需注意系统资源消耗
     * 3. 适用于已知并发需求的稳定负载场景
     */
    @SneakyThrows
    public static void createFixedThreadPool() {
        // 创建固定大小线程池(建议根据CPU核心数设置)
        ExecutorService executorService = Executors.newFixedThreadPool(5);
 
        try {
            log.info("\n================ 普通任务执行 ================");
            for (int i = 0; i < 10; i++) {
                executorService.execute(() -> log.info("线程:{},办理业务", Thread.currentThread().getName()));
            }
 
            TimeUnit.MILLISECONDS.sleep(1);
 
            log.info("\n================ 带返回值的任务执行 ================");
            for (int i = 0; i < 10; i++) {
                Future<String> future = executorService.submit(() -> {
                    log.info("线程:{},办理业务", Thread.currentThread().getName());
                    return "业务办理完成";
                });
                log.info(future.get());
            }
        } finally {
            gracefulShutdown(executorService);
        }
    }
 
    /**
     * 创建可缓存线程池
     * 特点:
     * 1. 自动回收空闲线程(60秒)
     * 2. 理论上可以创建Integer.MAX_VALUE个线程,需注意线程爆炸问题
     * 3. 适用于短期异步任务或低负载场景
     */
    @SneakyThrows
    public static void createCachedThreadPool() {
        // 创建弹性线程池(慎用,可能产生大量线程)
        ExecutorService executorService = Executors.newCachedThreadPool();
 
        try {
            log.info("\n================ 普通任务执行 ================");
            for (int i = 0; i < 10; i++) {
                executorService.execute(() -> log.info("线程:{},办理业务", Thread.currentThread().getName()));
            }
 
            TimeUnit.MILLISECONDS.sleep(1);
 
            log.info("\n================ 带返回值的任务执行 ================");
            for (int i = 0; i < 10; i++) {
                Future<String> future = executorService.submit(() -> {
                    log.info("线程:{},办理业务", Thread.currentThread().getName());
                    return "业务办理完成";
                });
                log.info(future.get());
            }
        } finally {
            gracefulShutdown(executorService);
        }
    }
 
    /**
     * 创建定时任务线程池
     * 特点:
     * 1. 支持定时及周期性任务
     * 2. 核心线程数固定,但可以不断创建新线程执行后续任务
     * 3. 适用于需要定时执行或周期性执行的场景
     */
    @SneakyThrows
    public static void createScheduledThreadPool() {
        // 创建定时任务线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
 
        try {
            log.info("\n================ 延迟任务执行 ================");
            for (int i = 0; i < 10; i++) {
                scheduledExecutorService.schedule(() ->
                                log.info("线程:{},办理延迟业务", Thread.currentThread().getName()),
                        1, TimeUnit.SECONDS);
            }
 
            TimeUnit.SECONDS.sleep(2);
 
            log.info("\n================ 带返回值的延迟任务执行 ================");
            for (int i = 0; i < 10; i++) {
                ScheduledFuture<String> future = scheduledExecutorService.schedule(() -> {
                    log.info("线程:{},办理延迟业务", Thread.currentThread().getName());
                    return "延迟业务办理完成";
                }, 1, TimeUnit.SECONDS);
                log.info(future.get());
            }
        } finally {
            gracefulShutdown(scheduledExecutorService);
        }
    }
 
    /**
     * 创建自定义线程池
     * 特点:
     * 1. 支持定时及周期性任务
     * 2. 核心线程数固定,但可以不断创建新线程执行后续任务
     * 3. 适用于需要定时执行或周期性执行的场景
     */
    @SneakyThrows
    public static void createCustomThreadPool() {
        /*
          创建自定义线程池
          字段:
          1. corePoolSize:核心线程池数量
          2. maximumPoolSize: 最大线程池数量
          3. keepAliveTime: 线程空闲时间
          4. unit: 时间单位
          5. workQueue: 阻塞队列
          5. threadFactory: 线程工厂
          5. handler: 拒绝策略
         */
        ThreadPoolExecutor customthreadPoolExecutor = new ThreadPoolExecutor(
                5,                                              // 根据CPU核心数设置
                10,                                                         // 最大应急线程数
                30, TimeUnit.SECONDS,                                       // 空闲线程存活时间
                new ArrayBlockingQueue<>(100),                      // 有界队列防止内存溢出
                new DefaultThreadFactory("custom-thread-pool"),   // 自定义线程命名
                new ThreadPoolExecutor.CallerRunsPolicy()                   // 拒绝策略
        );
 
        try {
            log.info("\n================ 普通任务执行 ================");
            for (int i = 0; i < 20; i++) {
                customthreadPoolExecutor.execute(() -> log.info("线程:{},办理业务", Thread.currentThread().getName()));
            }
 
            TimeUnit.MILLISECONDS.sleep(1);
 
            log.info("\n================ 带返回值的任务执行 ================");
            for (int i = 0; i < 10; i++) {
                Future<String> future = customthreadPoolExecutor.submit(() -> {
                    log.info("线程:{},办理业务", Thread.currentThread().getName());
                    return "业务办理完成";
                });
                log.info(future.get());
            }
        } finally {
            gracefulShutdown(customthreadPoolExecutor);
        }
    }
 
    /**
     * 优雅关闭线程池通用方法
     *
     * @param pool 需要关闭的线程池
     */
    private static void gracefulShutdown(ExecutorService pool) {
        pool.shutdown(); // 拒绝新任务提交
        try {
            // 等待现有任务完成
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // 取消等待中的任务 只等待运行中的任务
                // 再次等待任务响应中断
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                    log.error("线程池未完全关闭");
                }
            }
        } catch (InterruptedException e) {
            // 重新尝试关闭
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            log.info("线程池是否执行完成:{}", pool.isTerminated());
        }
    }
 
 
    public static void main(String[] args) {
        // 测试不同线程池(选择其中一个执行)
//        createSingleThreadPool();
//        createFixedThreadPool();
//        createCachedThreadPool();
//        createScheduledThreadPool();
        createCustomThreadPool();
    }
}