并发同步工具实战指南
并发同步工具概述
在多线程编程中,经常需要协调多个线程之间的执行顺序和资源访问。JUC包提供了三个强大的同步辅助工具:CountDownLatch、CyclicBarrier和Semaphore,它们各有特点,适用于不同的并发场景。
graph TB
A[并发同步工具] --> B[CountDownLatch]
A --> C[CyclicBarrier]
A --> D[Semaphore]
B --> B1[一次性倒计时]
B --> B2[等待多任务完成]
B --> B3[主线程等待模式]
C --> C1[可重用屏障]
C --> C2[多线程互等]
C --> C3[分阶段执行]
D --> D1[许可证管理]
D --> D2[资源数量控制]
D --> D3[限流场景]
style A fill:#e1f5fe
style B fill:#fff9c4
style C fill:#f0f4c3
style D fill:#ffe0b2CountDownLatch倒计时门闩
核心机制
CountDownLatch(倒计时门闩)是一个同步计数器,它允许一个或多个线程等待其他线程完成一组操作后再继续执行。其工作原理类似于倒计时:
- 创建时指定一个计数值
- 每当一个任务完成,计数器减1(调用
countDown()) - 等待的线程会阻塞直到计数器归零(调用
await())
关键特点:
- 一次性使用:计数器降为0后不能重置,对象就废弃了
- 主从模式:通常用于主线程等待多个工作线程完成
- 非阻塞释放:调用
countDown()的线程不会阻塞
graph LR
A[CountDownLatch<br/>count=3] --> B[任务1完成<br/>countDown]
B --> C[count=2]
C --> D[任务2完成<br/>countDown]
D --> E[count=1]
E --> F[任务3完成<br/>countDown]
F --> G[count=0]
G --> H[等待线程<br/>被唤醒]
style A fill:#ffccbc
style G fill:#c8e6c9
style H fill:#81c784实战案例:应用启动检查
在应用启动时,通常需要初始化多个组件,所有组件都准备就绪后才能对外提供服务:
public class ApplicationBootstrap {
public static void main(String[] args) throws InterruptedException {
int componentCount = 5;
CountDownLatch startupLatch = new CountDownLatch(componentCount);
// 初始化数据库连接池
new Thread(() -> {
System.out.println("正在初始化数据库连接池...");
try {
Thread.sleep(2000);
System.out.println("数据库连接池初始化完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
startupLatch.countDown();
}
}, "DB-Init").start();
// 初始化缓存系统
new Thread(() -> {
System.out.println("正在初始化缓存系统...");
try {
Thread.sleep(1500);
System.out.println("缓存系统初始化完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
startupLatch.countDown();
}
}, "Cache-Init").start();
// 加载配置文件
new Thread(() -> {
System.out.println("正在加载配置文件...");
try {
Thread.sleep(1000);
System.out.println("配置文件加载完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
startupLatch.countDown();
}
}, "Config-Init").start();
// 初始化消息队列
new Thread(() -> {
System.out.println("正在初始化消息队列...");
try {
Thread.sleep(1800);
System.out.println("消息队列初始化完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
startupLatch.countDown();
}
}, "MQ-Init").start();
// 初始化线程池
new Thread(() -> {
System.out.println("正在初始化线程池...");
try {
Thread.sleep(800);
System.out.println("线程池初始化完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
startupLatch.countDown();
}
}, "ThreadPool-Init").start();
// 主线程等待所有组件初始化完成
System.out.println("等待所有组件初始化完成...");
startupLatch.await();
System.out.println("=================================");
System.out.println("所有组件初始化完成,应用启动成功!");
System.out.println("=================================");
}
}CyclicBarrier循环屏障
核心机制
CyclicBarrier(循环屏障)让一组线程相互等待,直到所有线程都到达屏障点后再一起继续执行。它的特点是:
- 所有线程相互等待,而不是一个等多个
- 到达屏障点的线程会阻塞,直到最后一个线程到达
- 可以重复使用(Cyclic的含义)
- 可以指定屏障动作,在所有线程到达后优先执行
关键区别:
CountDownLatch:主从模式,一个等多个,一次性使用CyclicBarrier:平等模式,多个相互等,可重复使用
graph TB
subgraph 第一轮
A1[线程1到达] --> D1[等待]
A2[线程2到达] --> D1
A3[线程3到达] --> D1
D1 --> E1[全部到达]
E1 --> F1[执行屏障动作]
F1 --> G1[一起继续]
end
subgraph 第二轮
G1 --> A4[线程1到达]
G1 --> A5[线程2到达]
G1 --> A6[线程3到达]
A4 --> D2[等待]
A5 --> D2
A6 --> D2
D2 --> E2[全部到达]
E2 --> F2[执行屏障动作]
end
style E1 fill:#c8e6c9
style F1 fill:#81c784
style E2 fill:#c8e6c9
style F2 fill:#81c784实战案例:并行计算任务
模拟MapReduce场景,多个线程并行计算,在屏障点汇总结果:
public class ParallelComputeDemo {
public static void main(String[] args) {
int workerCount = 4;
ConcurrentHashMap<String, Integer> resultMap = new ConcurrentHashMap<>();
// 创建循环屏障,指定屏障动作
CyclicBarrier barrier = new CyclicBarrier(workerCount, () -> {
// 所有线程到达后,汇总结果
int totalSum = resultMap.values().stream()
.mapToInt(Integer::intValue)
.sum();
System.out.println("========== 第" + resultMap.size() + "轮计算完成 ==========");
System.out.println("本轮总和:" + totalSum);
System.out.println("====================================\n");
});
// 创建多个计算线程
for (int i = 0; i < workerCount; i++) {
final int workerId = i;
new Thread(() -> {
try {
// 模拟两轮计算
for (int round = 1; round <= 2; round++) {
// 执行计算任务
int result = compute(workerId, round);
resultMap.put("Worker-" + workerId + "-Round-" + round, result);
System.out.println(Thread.currentThread().getName() +
" 完成第" + round + "轮计算,结果:" + result);
// 到达屏障点,等待其他线程
barrier.await();
// 短暂休息后进行下一轮
Thread.sleep(500);
}
} catch (Exception e) {
e.printStackTrace();
}
}, "计算线程-" + i).start();
}
}
private static int compute(int workerId, int round) {
try {
// 模拟计算耗时
Thread.sleep((long) (Math.random() * 1000) + 500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return workerId * 10 + round;
}
}Semaphore信号量
核心机制
Semaphore(信号量)用于控制同时访问特定资源的线程数量,通过许可证(permit)机制实现:
- 创建时指定许可证数量
- 线程通过
acquire()获取许可证,如果没有可用许可证则阻塞 - 线程通过
release()释放许可证,供其他线程使用
应用场景:
- 限流控制:限制同时访问某个资源的线程数
- 连接池管理:控制数据库连接、HTTP连接等资源
- 并发度控制:限制并发执行的任务数量
graph TB
A[Semaphore<br/>permits=3] --> B[线程1获取许可]
B --> C[剩余2个许可]
C --> D[线程2获取许可]
D --> E[剩余1个许可]
E --> F[线程3获取许可]
F --> G[剩余0个许可]
G --> H[线程4等待]
G --> I[线程1释放许可]
I --> J[剩余1个许可]
J --> K[线程4获取许可]
style G fill:#ffcdd2
style H fill:#ff8a80
style J fill:#c8e6c9
style K fill:#81c784实战案例:接口限流
模拟限制同时访问某个接口的请求数量:
public class ApiRateLimiter {
// 限制同时只能有5个请求访问
private static final Semaphore semaphore = new Semaphore(5);
public static void main(String[] args) {
// 模拟20个并发请求
for (int i = 1; i <= 20; i++) {
final int requestId = i;
new Thread(() -> {
try {
// 尝试获取许可证
System.out.println("请求-" + requestId + " 尝试获取访问许可");
semaphore.acquire();
System.out.println(">>> 请求-" + requestId + " 获得许可,开始处理");
// 模拟接口处理
handleRequest(requestId);
System.out.println("<<< 请求-" + requestId + " 处理完成,释放许可");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放许可证
semaphore.release();
}
}, "请求线程-" + i).start();
// 控制请求速度
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static void handleRequest(int requestId) {
try {
// 模拟接口处理耗时
long processTime = (long) (Math.random() * 2000) + 1000;
System.out.println(" 请求-" + requestId + " 处理中,预计耗时 " +
processTime + "ms");
Thread.sleep(processTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}三者对比总结
| 特性 | CountDownLatch | CyclicBarrier | Semaphore |
|---|---|---|---|
| 核心用途 | 一个或多个线程等待其他线程完成 | 多个线程相互等待到达屏障点 | 控制同时访问资源的线程数 |
| 计数方向 | 减法计数(count down) | 加法计数(count up) | 加减计数(acquire/release) |
| 是否可重用 | 不可重用 | 可重用 | 可重用 |
| 阻塞特点 | await()阻塞,countDown()不阻塞 | await()阻塞所有线程 | acquire()可能阻塞 |
| 典型场景 | 应用启动、批量任务等待 | 并行计算、分阶段任务 | 限流、资源池管理 |
| 线程关系 | 主从关系(1对N) | 平等关系(N对N) | 竞争关系 |
graph TB
A{选择同步工具} --> B{是否需要等待多个任务完成?}
B -->|是| C{任务完成后需要重复执行?}
B -->|否| D[可能不需要这些工具]
C -->|否| E[CountDownLatch<br/>主线程等待工作线程]
C -->|是| F{是否需要所有线程相互等待?}
F -->|是| G[CyclicBarrier<br/>多线程互等可重用]
F -->|否| H{是否需要控制并发数?}
H -->|是| I[Semaphore<br/>许可证控制并发]
H -->|否| D
style E fill:#fff9c4
style G fill:#f0f4c3
style I fill:#ffe0b2线程调度技巧:Thread.sleep(0)
sleep(0)的作用
Thread.sleep(0)是一个特殊的用法,它并不是让线程休眠0毫秒,而是主动触发一次线程调度。
调用Thread.sleep(0)后:
- 当前线程主动释放CPU时间片
- 线程状态从RUNNING变为TIMED_WAITING(极短暂)
- 立即重新参与CPU时间片的竞争
- 给其他线程一个获得CPU的机会
应用场景
虽然这种用法比较少见,但在某些底层框架中会用到:
避免线程饥饿:当某个线程长时间占用CPU时,通过sleep(0)主动让出时间片,让其他线程有机会执行。
降低CPU占用:在循环等待某个条件时,避免空转消耗CPU。
// 不推荐:CPU空转
while (!condition) {
// 持续检查条件,CPU使用率100%
}
// 改进:主动让出CPU
while (!condition) {
Thread.sleep(0); // 给其他线程执行机会
}
// 最佳实践:使用wait/notify或并发工具线程调度优化:在一些对响应时间敏感但又需要公平性的场景,可以用来平衡线程调度。
需要注意的是,在现代JVM中,这种优化技巧的必要性已经大大降低,应该优先使用JUC提供的高级并发工具。
总结
并发同步工具为多线程协作提供了优雅的解决方案:
- CountDownLatch:适合"等待所有任务完成"的一次性场景
- CyclicBarrier:适合"分阶段并行执行"的可重复场景
- Semaphore:适合"资源数量控制"的限流场景
正确选择和使用这些工具,可以大大简化多线程编程的复杂度,提高代码的可读性和可维护性。在实际开发中,应该根据具体的业务需求选择合适的同步工具,而不是一味使用低级的wait/notify机制。
更新: 2025-12-04 17:36:24
原文: https://www.yuque.com/u22210564/zoxfmt/doc-07-14-29