Earth Guardian

You are not LATE!You are not EARLY!

0%

Java 多线程并发

线程常用 API

  • thread.start() :启动
  • thread.isAlive()
    判断当前线程是否处于活动状态
  • thread.sleep()
    在指定毫秒数内让当前正在执行的线程休眠
  • thread.getId():获取线程 ID
  • yield
    放弃当前的 CPU 资源,但放弃多长时间不确定
  • thread.interrupt()
    中断,仅仅是在该线程中打了一个停止的标记,并不是真的停止线程
  • thread.setPriority()
    设置优先级,并不会马上优先执行。线程优先级的代码影响较小,但是优先级高的确实会获取更多的 CPU 资源和执行速度
  • suspend()/resume
    暂停和恢复, 作废
  • stop
    停止线程,作废。暴力停止进程,会导致内存无法清理干净,锁的释放也会导致数据不一致
  • thread.wait()
    释放当前锁并等待,执行前必须确保已经获取该对象锁
  • thread.notify()/.notifyAll()
    也必须在同步代码块呢,通知该对象处于锁等待的线程/通知所有。注意:一次 notify 只能被一个处于 wait 的线程接受,即使该对象锁一直空闲,其他 wait 线程也收不到通知,除非使用 notifyAllnotify 后代码块并不会马上释放锁,而是需要将这个 synchronized 代码块执行完毕才会释放锁
  • thread.join
    等待指定线程对象销毁,但是当前线程会进入阻塞状态
  • Thread.currentThread()
    获得当前线程的引用

线程状态机

线程有六种状态:

1
2
3
4
5
6
7
8
9
// java.lang.Thread.java
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}

状态说明

  • NEW
    这种情况指的是,通过 New 关键字创建了 Thread 类(或其子类)的对象,该对象此时就处于 NEW 的状态
  • RUNNABLE
    线程正在正常运行中,当然可能会有某种耗时计算 I/O 等待的操作 CPU 时间片切换等,这个状态下发生的等待一般是其他系统资源, 而不是锁, Sleep
  • BLOCKED
    多个线程有同步操作的场景,等待获取同步锁。比如正在等待另一个线程的 synchronized 块的执行释放,也就是线程在等待进入临界区
  • WAITING
    线程拥有了某个锁之后调用了 wait 方法,等待 notify/notifyAll 后才能执行下一步操作。这里要区分 BLOCKEDWATING 的区别:一个是在临界点外面等待获取锁进入;一个是 wait 后等待 notify
  • TIMED_WAITING
    有时间限制的 WAITING
  • TERMINATED
    线程已经执行完毕了

状态转换图

0005-java-concurrent-api-status.png

线程几个特性

进程和线程

线程也被称为轻量级进程。CPU 的基本调度单位是线程而不是进程。同一个进程中所有线程共享进程的内存地址空间,因此这些线程都能访问相同的变量并在同一个堆上分配对象

并发执行顺序

在多线程中,线程的调用具有随机性。也就是 Thread.start() 的执行顺序并不是按照代码顺序来执行的,而是随机的。参考示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TestThreadStart {

private static final int THREAD_COUNT = 20;

public static class MyThread extends Thread{
private int i = 0;

public MyThread(int i) {
super();
this.i = i;
}

@Override
public void run() {
System.out.println(i);
}
}

public static void main(String[] agrs) {
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++){
threads[i] = new MyThread(i);
threads[i].start();
}
}
}

执行结果是随机的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
xmt@server005:~/test/java$ java TestThreadStart
0
10
9
8
7
6
5
4
3
2
1
19
18
15
17
16
14
13
12
11

并发对共享变量的影响

根据 Java 内存模型,共享变量必须同时满足:原子性,可见性,有序性,或者满足先行发生原则,在并发情况下才是安全的,否则会出现数据异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class TestThreadConcurrent {

private static final int THREAD_COUNT = 5;

public static class RunnableNotSafe implements Runnable{
private int count = 5;

@Override
public void run() {
count--;
System.out.println("Thread name: " + Thread.currentThread().getName() + ", count = " + count);
}
}

public static class RunnableSafe implements Runnable{
private int count = 5;

// 使用了同步,确保数据一致
@Override
public synchronized void run() {
count--;
System.out.println("Thread name: " + Thread.currentThread().getName() + ", count = " + count);
}
}

public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[THREAD_COUNT];
Runnable notSafe = new RunnableNotSafe();
Runnable safe = new RunnableSafe();
// 并发时共享变量的数据异常
for (int i = 0; i < THREAD_COUNT; i++){
threads[i] = new Thread(notSafe, String.valueOf(i));
threads[i].start();
}

Thread.sleep(1000);

// 使用同步后,能正常输出
for (int i = 0; i < THREAD_COUNT; i++){
threads[i] = new Thread(safe, String.valueOf((char)(i + 'A')));
threads[i].start();
}
}
}

测试结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
xmt@server005:~/test/java$ javac TestThreadConcurrent.java
xmt@server005:~/test/java$ java TestThreadConcurrent
// 并发时读取脏数据
Thread name: 0, count = 3
Thread name: 4, count = 0
Thread name: 3, count = 1
Thread name: 2, count = 2
Thread name: 1, count = 3
// 同步块输出的正确结果
Thread name: A, count = 4
Thread name: C, count = 3
Thread name: B, count = 2
Thread name: D, count = 1
Thread name: E, count = 0

从示例中,可以看到共享变量 RunnableNotSafe 实例中 count 的自减操作是复合运算,这段代码不具有原子性,而且也不满足先行发生原则,所以是并发不安全的。通过增加 synchronized 关键字修饰后,run() 这段代码满足了:原子性,可见性,有序性,所以并发时安全

线程名称 API

  • this.getName()
    当前线程的名称,即 new 出来的线程实例对应的名称
  • Thread.currentThread().getName()
    当前执行线程的名称
  • thread.setName()
    设置当前执行线程的名称

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TestThreadName {

public static class ThreadName extends Thread{

public ThreadName(){
System.out.println("Constructor begin!");
System.out.println("Thread.currentThread().getName(): "
+ Thread.currentThread().getName());
System.out.println("this.getName(): " + this.getName());
System.out.println("Constructor end!");
}

@Override
public void run() {
System.out.println("Run begin!");
System.out.println("Thread.currentThread().getName(): "
+ Thread.currentThread().getName());
System.out.println("this.getName(): " + this.getName());
System.out.println("Run end!");
}
}

public static void main(String[] args){
ThreadName threadName = new ThreadName();
System.out.println("#####################");
Thread thread = new Thread(threadName);
thread.setName("Test");
thread.start();
}
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
xmt@server005:~/test/java$ javac TestThreadName.java
xmt@server005:~/test/java$ java TestThreadName
Constructor begin!
Thread.currentThread().getName(): main
this.getName(): Thread-0
Constructor end!
#####################
Run begin!
Thread.currentThread().getName(): Test
this.getName(): Thread-0
Run end!

程序解读:

  • ThreadName threadName = new ThreadName();
    new 的第一个线程:ThreadName 对象,该操作是在主线程中执行的,所以构造函数中 Thread.currentThread.getName 打印的是 main 主线程。而 threadName 对象也就是 this.getName 的值为默认初始值 Thread-0。输出:
1
2
3
4
Constructor begin!
Thread.currentThread().getName(): main
this.getName(): Thread-0
Constructor end!
  • Thread thread = new Thread(threadName);
    这里 new 了第二个线程,本示例比较特殊,仅仅是把 ThreadName 对象当做有名字的 Runnable 在使用。thread 对象名字默认值为 Thread-1
  • thread.setName("Test");
    修改 thread 对象的名字为 Test,所以该线程在执行时(Thread.currentThread().getName())对应的名字被修改为 Test
  • thread.start();
    thread 对象开始执行,上一步已经将名称修改为 Test,所以输出:
    Thread.currentThread().getName(): Test (如果没有上一步修改名称,此处输出 Thread-1
    因为是调用 threadName 对象的 run() 方法,所以 this.getName 表示 threadName 对象的名字,所以输出:
    this.getName(): Thread-0

yield

public static native void yield(); 放弃当前的 CPU 资源,但放弃多长时间不确定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static class RunnableYield implements Runnable{

@Override
public void run() {
long begin = System.currentTimeMillis();
int count = 0;
for (int i = 0; i < 100000; i++){
//Thread.yield();
count += (i + 1);
}
long end = System.currentTimeMillis();
System.out.println("Take time: " + (end - begin));
}
}

结果分析:如果注释掉 Thread.yield,整个程序执行时间只有 1 毫秒;但是执行 yield 后,执行时间并不固定,有时 15 毫秒,有时 17 毫秒等

线程优先级

Java 中线程的优先级有 10 个等级,优先级越高的线程得到的资源越多,CPU 优先执行优先级较高的线程对象中的任务。线程先级有如下几个特点:

  • 继承性
    如果 A 线程启动 B 线程,那么 AB 拥有相同的线程优先级
  • 规则性
    线程的优先级和代码执行顺序无关,并不表示优先级高先执行完了再执行优先级低的线程,他们是并发执行的,只是优先级高的可能会先执行完
  • 随机性
    优先级高的并不是每次都会先执行完,具有随机性,特别是优先级差别不大的时候

总结: 整体来说,线程优先级的代码影响较小,但是优先级高的确实会获取更多的 CPU 资源和执行速度

守护线程 Daemon

Java 线程有两种:用户线程和守护线程
守护线程是一种特殊的线程,它的特性有“陪伴”的意义,陪伴用户线程。当该用户线程不存在了,对应的守护线程也自动销毁。典型的守护线程就是垃圾回收线程(GC
定义:public final void setDaemon(boolean on),注意事项:

  • thread.setDaemon(true) 必须在 thread.start() 之前设置,否则会跑出一个 IllegalThreadStateException 异常。即:不能把正在运行的常规线程设置为守护线程
  • Daemon 线程中产生的新线程也是 Daemon 的,具有传递性
  • 守护线程做耗时操作时,一定要注意用户线程必须等待它结束后才推出。否则用户线程先退出,守护进程来不及执行完毕就会跟随销毁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static class ThreadDaemon extends Thread{
@Override
public void run() {
try {
int i = 0;
while (true) {
i++;
System.out.println(i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

ThreadDaemon threadDaemon = new ThreadDaemon();
// 先设置守护线程,再开启线程
threadDaemon.setDaemon(true);
threadDaemon.start();

// 主线程等待守护线程执行一段时间
Thread.sleep(5000);
System.out.println("Main end, then thread daemon will be destroyed.");

线程停止

interrupt 方法

  • 功能
    调用 interrupt() 方法,并不能马上就将线程停止,仅仅是在该线程中打了一个停止的标记,并不是真的停止线程。
  • 有什么用
    既然不能直接停止线程,它的作用是什么呢?中断并不能直接终止另一个线程,而需要被中断的线程自己处理中断。

判断线程是否是中断状态

两种方法:

  • Thread.interrupted()

    1
    2
    3
    public static boolean interrupted() {
    return currentThread().isInterrupted(true);
    }

    它是 static 的,表示测试当前执行线程是否已经中断。同时,执行后当前执行线程的中断状态标志会被清除,也就是说连续调用两次,如果第一次返回 true,第二次因为前面被清除会返回 fasle

  • thread.isInterrupted()

    1
    2
    3
    public boolean isInterrupted() {
    return isInterrupted(false);
    }

    表示某个线程对象是否已经中断,不会清除中断状态标志。

interruptsleep 的影响

不管是线程先休眠后被打上停止标志,还是先被打上停止标志后执行休眠,都会触发异常 InterruptedException

stop() 方法

该方法在 API 中已经被废弃,属于暴力停止进程,同时会抛出 java.lang.ThreadDeath 异常。stop 会导致内存清理工作无法完成,并且对锁定的对象进行了解锁,导致数据无法同步处理,出现数据不一致的问题。

join 等待

等待指定线程对象销毁,但是当前线程会进入阻塞状态

原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final void join() throws InterruptedException {
join(0);
}
public final synchronized void join(long millis){
...
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
...
}

从源代码可以看出 join 是一个同步方法,需要获取当前对象的锁,并且通过 wait 方法来实现延时,并释放锁。
线程对象 threadA.join 执行后,threadA 内的所有 synchronized 方法将都需要等待。或者说 threadA 中的同步方法执行后,join 将不会马上执行,需要等待获取同步锁,参考示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static class MyThread extends Thread{
public synchronized void syncMethod(){
try {
System.out.println("MyThread begin");
Thread.sleep(3000);
System.out.println("MyThread exit");
} catch (InterruptedException e){
e.printStackTrace();
}
}

@Override
public void run() {
syncMethod();
}
}

public static void main(String[] args) throws InterruptedException {
System.out.println("main begin");
MyThread myThread = new MyThread();
myThread.start();
myThread.join(1000);
System.out.println("main exit");
}

输出结果为:

1
2
3
4
main begin
MyThread begin
MyThread exit
main exit

也就是主线程虽然设置了,只等待 myThread 线程 1 秒,但是因为 myThread 中执行的是同步方法,所以 join 因为拿不到对象锁,只能直到同步方法执行完后才能执行 join 等待。但是 join 等待时会判断当前线程是否还存活,因为线程已经执行完退出,所以 join 并没有调用 wait 来等待。

join/sleep/yield 的区别

  • join(long) 使用了 synchronized 来同步,并通过 wait 实现延时,具有释放锁的特点
  • sleep(long) 使用的是 native 的方法实现的,并不会释放锁,而是一直阻塞到指定时间
  • yield() 仅仅是放弃当前的 CPU 资源,但放弃多长时间不确定,并不涉及到锁,也不会释放锁。执行后线程状态仍然是 RUNNABLE 状态

synchronized 关键字

可以在任意对象及方法上加锁,而加锁的这段代码被称为“互斥区”或“临界区”。在执行这段代码时上锁,执行完毕后释放锁;其他线程访问时需要先检查代码是否上锁,如果上锁则等待直到锁被释放。在 synchronized 方法或者代码块中的代码,是同步执行的;否则就是异步执行

锁对象

锁是针对这个对象的,比如对象有两部分代码都有 synchonized 修饰,只有当第一部分代码执行完释放锁后,第二部分代码才能执行。也就是同一个对象的同步代码,只能串行执行;同一时间只有一个线程能执行。注意:指的是同一个对象

修饰代码块

synchronized (this) 修饰代码块,即获取当前对象锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static class MyObject{
public void doLongTimeTask(){
for (int i = 0; i < 50; i++){
System.out.println("Thread Name: " + Thread.currentThread().getName()
+ ", i = " + i);
}

synchronized (this){
for (int j = 0; j < 50; j++){
System.out.println("Thread Name: " + Thread.currentThread().getName()
+ ", j = " + j);
}
}
}
}

MyObject 的同一个实例,在执行 doLongTimeTask 时, i 的输出是线程并发执行输出的,而 j 的输出这部分代码被修饰为同步,所以 j 只能同步打印。输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// i 的输出是并发的
Thread Name: A, i = 14
Thread Name: A, i = 15
Thread Name: B, i = 0
Thread Name: A, i = 16
Thread Name: B, i = 1
Thread Name: A, i = 17
Thread Name: B, i = 2
Thread Name: A, i = 18
Thread Name: B, i = 3

// j 的输出是串行输出,顺序执行
Thread Name: A, j = 47
Thread Name: A, j = 48
Thread Name: A, j = 49
Thread Name: B, j = 0
Thread Name: B, j = 1
Thread Name: B, j = 2
Thread Name: B, j = 3
  • synchronized (this)
    使用当前实例对象锁定
  • synchronized (otherObject)
    使用其他对象锁定,这是因为如果一个类中有多个 synchronized 代码段,在大量并发时,不管是否有前后逻辑关系,都需要等待当前对象释放锁才能执行下一个同步代码,所以可以通过锁其他对象来实现并行处理
  • synchronized(class)
    使用当前类锁定,锁定的是当前类,所以所有的 static 同步方法都共用同一个锁
  • synchronized(String)
    String 比较特殊,涉及到常量池特性,当锁定对象为常量池字符串时,实际上是同一个对象。见示例:
1
2
3
4
5
6
7
8
9
10
11
public void method(String value){
synchronized (value) {...}
}

// "AA" 为常量池字符串,所以是同一个对象,同步执行
method("AA");
method("AA");

// 显示 new 了两个不同的 String 对象,会并发执行
method(new String(AA));
method(new String(AA));
  • synchronized(wrapperclass)
    包装类需要特别注意自动装箱时:数字类型和字符型的包装类在特定范围内共用的是相同对象。char[\u0000, \u007f]:即 ASCII 码表中的 128 个字符;byte/short/int/long[-128, 128):即 -128 <= x < 128 范围内,共用相同的对象(缓存),示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void method(Object value){
synchronized (value) {...}
}

// 范围内自动装箱
Integer integer1 = 1;
Integer integer2 = 1;
// integer1 == integer2 is true, 即同一个对象,所以同步执行
method(integer1);
method(integer2);

// 范围外自动装箱
Long long1 = 128L;
Long long2 = 128L;
// long1 == long2 is false, 超出范围即不同对象,并发执行
method(long1);
method(long2);

修饰普通方法

synchronized 直接修饰方法,等同于 synchronized(this) 修饰方法的整段代码,是给当前实例对象上锁:

1
2
3
4
private static class MyObject{
public synchronized void methodA(){...}
public synchronized void methodB(){...}
}

MyObject 的同一个实例中,并发调用了 methodAmethodB,这两个方法只能串行执行。比如先执行完 methodA 再执行 methodB

修饰静态方法

synchronized 修饰静态方法,是给当前 Class 类上锁,等同于 synchronized(class),和给当前对象实例上锁是不一样的

1
2
3
4
5
6
7
8
private static class MyObject{
public synchronized static void methodA(){...}
public synchronized static void methodB(){...}
public synchronized void methodC(){...}
public void methodD(){
synchronized (MyObject.class){...}
}
}

其中:methodA, methodB, methodD 都是 MyObject 类锁,所以只能同步执行;但是 methodC 实例对象锁,是可以和他们并发执行的

锁重入

当一个线程获取对象的锁后,再没有释放的情况下可以再次获取这个对象锁,但是其他线程在锁没有释放的前提下是无法获取的。比如:

  • synchronized 修饰方法时的 methodA 中可以直接调用 methodB
  • synchronized(Object)
  • lock.lock()

同时,锁重入支持类继承

锁对象的改变

  • 锁对象属性的改变
    仅仅是属性的变化,锁对象并没有变,所以同步执行
  • 锁对象改变
    因为对象已经改变,所以锁的是不同对象,并发执行

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 同一个锁对象,仅仅属性 name/age 的改变,不会影响锁,同步执行
private static class CombinationObject{
private String name;
private int age;
public synchronized static void methodA(){...}
public synchronized static void methodB(){...}
}

// 锁对象已经改变,并发执行
private static class SimpleObject{
private String lock = "aaa";
public void method(){
synchronized (lock){
lock = "bbb";
...
}
}
}

volatile 关键字

关键字 volatile 可以理解为 Java 虚拟机提供的最轻量级同步机制。根据 Java 内存模型,每个线程都有自己的工作线程,会从共享内存中读取出共享变量到自己的线程中,这个时候对共享变量的修改涉及到可见性,否则会出现数据同步异常。但是 volatilte 无法保证操作的原子性,所以需要满足特定的规则才能保证线程安全

特性

  • 保证此变量对所有线程的可见性
  • 禁止指令重排序优化

规则

使用 volatile 保证并发安全需要符合如下两条规则:

  • 运算结果并不依赖变量的当前值,或者确保只有单一线程修改变量的值
  • 变量不需要与其他状态变量共同参与不变约束

示例

使用 volatile 修饰后,可以保证变量的可见性,在不同线程中修改,可以立即可见

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static class VolatileThread extends Thread{
private volatile boolean isRunning = true;
public boolean isRunning(){
return isRunning;
}
public void setRunning(boolean running){
isRunning = running;
}
@Override
public void run() {
while (isRunning){}
}
}

// 主线程中修改 isRunning 的值,子线程中立即可见
public static void main(String[] args) throws InterruptedException {
VolatileThread volatileThread = new VolatileThread();
volatileThread.start();
Thread.sleep(500);
volatileThread.setRunning(false);
}

线程间通信

线程之间的通信机制有两种:共享内存和消息传递。
在共享内存的并发模型里,线程之间共享程序的公共状态,线程之间通过写-读内存中的公共状态来隐式进行通信。在消息传递的并发模型里,线程之间没有公共状态,线程之间必须通过明确的发送消息来显式进行通信。
Java 的并发采用的是共享内存模型,线程之间的通信总是隐式进行,整个通信过程对程序员完全透明。如果编写多线程程序不理解隐式进行的线程之间通信的工作机制,很可能会遇到各种奇怪的内存可见性问题。

等待/通知机制

方法:

  • thread.wait()
  • thread.notify()
  • thread.notifyAll()

注意事项:

  • 执行前必须已经获取该对象锁,否则会抛出 IllegalMonitorStateException,也就是需要在 synchronized 同步代码块中才能执行
  • wait 释放当前锁并等待
  • wait 处于该状态下的对象锁,如果主动 interrupt 后会抛出异常。所以 wait 语句需要 catch InterruptedException
  • wait(time) 操作 time 后自动唤醒继续执行
  • notify 通知该对象处于锁等待的线程。一次 notify 只能随机通知一个 wait 的对象,即使该对象锁一直空闲,其他 wait 对象也收不到通知,除非使用 notifyAll
  • notify 后代码块并不会马上释放锁,而是需要将这个 synchronized 代码块执行完毕才会释放锁
  • 代码编写时,确保 notifywait 之后执行,否则会陷于持续等待的过程

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class TestCommWaitNotify {
private static class ThreadA extends Thread{
private Object lock;
public ThreadA(Object lock) {
this.lock = lock;
}

@Override
public void run() {
synchronized (lock){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

private static class ThreadB extends Thread{
private Object lock;
public ThreadB(Object lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
lock.notify();
for (int i = 0; i < 5; i++){...}
}
}
}

public static void main(String[] args) throws InterruptedException {
Object object = new Object();
ThreadA threadA = new ThreadA(object);
threadA.setName("A");
threadA.start();

// 线程 A 先执行并处于等待过程
Thread.sleep(200);
// 线程 B 开始运行并通知
ThreadB threadB = new ThreadB(object);
threadB.setName("B");
threadB.start();
}
}

管道

Java 中提供了各种各样的输入/输出流 Stream,其中管道流 pipeStream 是一种特殊的柳,用于不同线程间直接传送数据。一个线程发送数据到输出管道,另一个线程从输入管道读取数据。通过管道实现线程间通信,不用借助任何临时文件。常见类:

  • PipedInputSteam/PipedOutputStream
    管道字节流
  • PipedReader/PipedWriter
    管道字符流

基于等待/通知机制的读写

读阻塞直到有数据写入才唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public synchronized int read()  throws IOException {
...
while (in < 0) {
...
/* might be a writer waiting */
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
...
}

连接 connect

write/read 管道需要连接后,才能确保两个线程使用了相同管道

1
2
3
4
5
6
7
PipedOutputStream pipedOutputStream = new PipedOutputStream();
PipedInputStream pipedInputStream = new PipedInputStream();
pipedOutputStream.connect(pipedInputStream);

PipedWriter pipedWriter = new PipedWriter();
PipedReader pipedReader = new PipedReader();
pipedWriter.connect(pipedReader);

管道字节流通信示例

  • 初始化 PipedOutputStream/PipedInputStream 并连接
  • 开读线程进入读等待,开写线程写入数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 读阻塞
byte[] bytes = new byte[20];
int readLength = inputStream.read(bytes);
while (readLength != -1){
String newData = new String(bytes, 0, readLength);
readLength = inputStream.read(bytes);
}
inputStream.close();

// 写数据
for (int i = 0; i < 300; i++){
String outData = "" + (i + 1);
outputStream.write(outData.getBytes());
}
outputStream.close();

// 读线程
ThreadRead threadRead = new ThreadRead(readData, pipedInputStream);
threadRead.start();

// 写线程
ThreadWrite threadWrite = new ThreadWrite(writeData, pipedOutputStream);
threadWrite.start();

管道字符流通信示例

流程和字节流通信一样,部分参考代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 写数据
for (int i = 0; i < 300; i++) {
String data = "" + (i + 1);
pipedWriter.write(data);
}
pipedWriter.close();

// 读等待
char[] chars = new char[20];
int readLength = pipedReader.read(chars);
while (readLength != -1){
String data = new String(chars, 0, readLength);
readLength = pipedReader.read(chars);
}
pipedReader.close();

// 读线程
ThreadRead threadRead = new ThreadRead(readDataCharacter, pipedReader);
threadRead.start();

// 写线程
ThreadWrite threadWrite = new ThreadWrite(writeDataCharacter, pipedWriter);
threadWrite.start();

ThreadLocalInheritableThreadLocal

基本方法

1
2
threadLocal.set(value);
threadLocal.get();

特点

  • ThreadLocal 共享变量,具有线程隔离性,每个线程存储的值都是私有的
  • InheritableThreadLocal 共享变量,会继承父线程设置的值

实现原理

  • 线程隔离性
    从源码可以看出,线程隔离是因为值存储通过 ThreadLocalMap 来实现的,key 就是当前执行线程,所以不同的线程对应的 value 将会不一样
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
  • 值继承
    主线程在 set 时,创建 t.inheritableThreadLocals,子线程在初始化时会拷贝一份主线程设置的值,所以在 get 时能看到值继承效果。但是子线程修改 InheritableThreadLocal 变量的值,主线程中并不会跟着改变
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}

void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}

private void init(...) {
...
if (parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
...
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
private static ThreadLocal sThreadLocal = new ThreadLocal();
private static InheritableThreadLocal sInheritableThreadLocal = new InheritableThreadLocal();

private static class ThreadA extends Thread{
@Override
public void run() {
System.out.println("ThreadA begin: " + sThreadLocal.get());
sThreadLocal.set("It is ThreadA running");
System.out.println("ThreadA exit: " + sThreadLocal.get());
}
}

private static class ThreadB extends Thread{
@Override
public void run() {
System.out.println("ThreadB begin: " + sThreadLocal.get());
sThreadLocal.set("It is ThreadB running");
System.out.println("ThreadB exit: " + sThreadLocal.get());
}
}

private static class ThreadC extends Thread{
@Override
public void run() {
System.out.println("ThreadC begin: " + sInheritableThreadLocal.get() + ", inheritable from main");
sInheritableThreadLocal.set("It is ThreadC running");
System.out.println("ThreadC exit: " + sInheritableThreadLocal.get());
}
}

public static void main(String[] args) throws InterruptedException {
ThreadA threadA = new ThreadA();
threadA.start();
Thread.sleep(300);

ThreadB threadB = new ThreadB();
threadB.start();
Thread.sleep(300);

System.out.println("Main thread: begin set value");
sInheritableThreadLocal.set("It is main thread.");
System.out.println(sInheritableThreadLocal.get());
ThreadC threadC = new ThreadC();
threadC.start();
Thread.sleep(300);

System.out.println("Main thread: exit, get value");
System.out.println(sInheritableThreadLocal.get());
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ThreadA begin: null
ThreadA exit: It is ThreadA running
// 线程 A 和 B 对共享变量的修改隔离,不会相互影响
ThreadB begin: null
ThreadB exit: It is ThreadB running
// 主线程修改 InheritableThreadLocal 变量
Main thread: begin set value
It is main thread.
// 子线程继承值
ThreadC begin: It is main thread., inheritable from main
ThreadC exit: It is ThreadC running
Main thread: exit, get value
// 子线程对 InheritableThreadLocal 变量的修改,主线程并不会跟着改变
It is main thread.

lock

Lockjava.util.concurrent.locks 包下的接口,实现提供了比 synchronized 方法和语句可获得的更广泛的锁定操作,它能以更灵活的方式处理线程同步问题。它允许把锁定的实现作为 Java 类,而不是作为语言的特性来实现。

ReentrantLock

ReentrantLock 类实现了 Lock ,它拥有与 synchronized 相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性;它是一种独占锁。
初始化: private Lock lock = new ReentrantLock(); ,常见方法:

  • public void lock() {...}
    获取锁对象,类似 synchronized(object) ,必须用 try 包起来
  • public void unlock() {...}
    释放锁对象,必须在 finally 中执行,如果 try 后面有 catch 捕捉异常,则必须在 unlock 前增加是否 lock 的判断。否则释放锁时因为并没有持有锁抛出异常 IllegalMonitorStateException
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    try {
    lock.lock()
    Thread.sleep(200);
    ...
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    // 释放前先判断是否持有
    if (lock.isHeldByCurrentThread()) {
    lock.unlock();
    }
    }
  • public int getHoldCount() {...}
    当前线程持有锁的个数,也就是调用 lock 的次数
  • public final int getQueueLength() {...}
    获取等待锁释放的线程估计数,是一个估计值。即 5 个线程并发 lock 方法,1 个正在执行 lock 后的操作,那么返回值为 4 个在等待 lock 释放
  • public int getWaitQueueLength(Condition condition) {...}
    获取等待 condition.await 的线程估计数,也是一个估计值,必须在 lock.lock() 中执行,否则会抛出 IllegalMonitorStateException。即 5 个线程并发都执行了 condition.await ,返回值为 5
  • public final boolean hasQueuedThreads(Thread thread) {...}
    查询指定线程是否正在等待获取此锁定
  • public final boolean hasQueuedThreads() {...}
    查询是否有线程正在等待获取此锁定
  • public boolean hasWaiters(Condition condition) {...}
    查询是否有线程在的等待与此锁定有关的 condition
  • public final boolean isFair() {...}
    判断是不是公平锁
  • public boolean isHeldByCurrentThread() {...}
    查询当前线程是否持有此锁定
  • public boolean isLocked() {...}
    查询此锁定是否被任意线程持有
  • public void lockInterruptibly() throws InterruptedException{...}
    当前线程未被中断则获取此锁定,如果已经被中断则抛出异常
  • public boolean tryLock() {...}
    如果锁定没有被其他线程持有,才获取该锁定
  • public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException
    如果锁定在给定等待时间内没有被其他线程持有,且线程未被中断,则获取该锁定

Condition

Condition 替代了 Object 监视器方法的使用(wait/notify/notifyAll),同样代码必须在 lock.lock/lock.unlock 之间,类似 wait/notify 必须要在synchronized 代码段中执行。在 wait/notify/notifyAll 方法进行通知时,通知线程是有 JVM 随机选择的。但是 Condition 拥有更好的灵活性,可以实现多路通知,可以选择性进行线程通知,在调度线程上更加灵活。

功能类比:

  • condition.await --> object.wait
  • condition.signal --> object.notify
  • condition.signalAll --> object.notifyAll

常见方法:

  • void awaitUninterruptibly();
    不能被打断的等待,直到 signal 信号出现
  • boolean awaitUntil(Date deadline) throws InterruptedException;
    await 基础上加了个等待的时间限制

ReentrantLockCondition 结合示例

生产者消费者模型:

0005-java-concurrent-produce-consume.jpg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
private static final int NUM = 5;
private static class MyObject {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean hasValue = false;

public void produce(){
try {
lock.lock();
while (hasValue){
condition.await();
}
System.out.println("produce: **");
hasValue = true;
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void consume(){
try {
lock.lock();
while (!hasValue){
condition.await();
}
System.out.println("consume: ##");
hasValue = false;
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

private static class ThreadProduce extends Thread{
private MyObject myObject;

public ThreadProduce(MyObject myObject) {
this.myObject = myObject;
}

@Override
public void run() {
for (int i = 0; i < NUM; i++) {
myObject.produce();
}
}
}

private static class ThreadConsume extends Thread{
private MyObject myObject;

public ThreadConsume(MyObject myObject) {
this.myObject = myObject;
}

@Override
public void run() {
for (int i = 0; i < NUM; i++) {
myObject.consume();
}
}
}

public static void main(String[] args) {
MyObject myObject = new MyObject();
ThreadProduce threadProduce = new ThreadProduce(myObject);
threadProduce.start();
ThreadConsume threadConsume = new ThreadConsume(myObject);
threadConsume.start();
}

实现效果:生产者和消费者交替执行,每生产一个就消费掉

1
2
3
4
5
6
produce: **
consume: ##
produce: **
consume: ##
produce: **
consume: ##

公平锁和非公平锁

  • 公平锁
    表示线程获取锁的顺序是按照线程加锁的顺序来分配的,即先来先得 FIFO 的先进先出顺序
  • 非公平锁
    一种获取锁的抢占机制,是随机获取锁。Java 中默认都是非公平锁

示例:

1
2
3
// true 表示公平锁,false 表示非公平锁  
private Lock lock = new ReentrantLock(true);
private Lock lock = new ReentrantLock(false);

ReentrantReadWriteLock 读写锁

ReentrantLock 具有完全互斥排他的效果,即同一个时间只有一个线程能执行 lock 后的任务,虽然保证了共享变量的线程安全性,但是效率较低。而ReentrantReadWriteLock 具有读写锁特性(有两个锁),可以提高效率。基本概念及特性:

  • 读锁
    读相关的锁(`reentrantReadWriteLock.readLock()),共享锁,读锁不互斥
  • 写锁
    写相关的锁(`reentrantReadWriteLock.writeLock()),排他锁。读锁与写锁互斥,写锁与写锁互斥;只要持有了写锁,不管是先写后读还是先读后写,都会互斥

单例模式多线程

立即加载/饿汉模式

类加载的时候就直接将对象创建并立即初始化(static),因为对象创建的比较急,即使不需要使用也会被初始化,即所谓的“饿汉”模式。特点:

  • 优点:线程安全
  • 缺点:即使不需要也会被实例化,大量使用会造成资源浪费

示例:

1
2
3
4
5
6
7
8
9
10
11
12
// 饿汉式,static,定义时直接初始化
private static MyObject myObject = new MyObject();

// 或者使用静态加载
//static {
// myObject = new MyObject();
//}

// 使用时,因为在类加载过程中就已经实例化,并发调用安全
public static MyObject getInstance(){
return myObject;
}

延迟加载/懒汉模式

在需要实例时才初始化,延迟加载即所谓的“懒汉”模式。特点:

  • 缺点:线程不安全,需要加锁确保并发安全
  • 优点:需要使用时才实例化,提高资源利用率

基本用法,存在线程不安全问题:

1
2
3
4
5
6
7
private static MyObject myObject;
public static MyObject getInstance(){
if (myObject == null){
myObject = new MyObject();
}
return myObject;
}

懒汉模式:synchronized 修饰方法

懒汉模式中使用 synchronized 修饰方法,可以解决并发安全问题。
缺点:如果单例已经被初始化完毕,每次使用时却还需要同步获取,效率并不高。

1
2
3
4
5
6
7
private static MyObject myObject;
public synchronized static MyObject getInstance(){
if (myObject == null){
myObject = new MyObject();
}
return myObject;
}

懒汉模式:synchronized 同步代码块,双检锁

懒汉模式中使用 synchronized 同步代码块,必须使用双检锁的模式,才能确保线程并发安全。双检锁:DCL- Dounble Check Lock,正确示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
// volatile 禁止指令重排序
private volatile static MyObject myObject;

public static MyObject getInstance(){
if (myObject == null){ //step1
synchronized (MyObject.class) { //step2
if (myObject == null) { //step3
myObject = new MyObject(); //step4
}
}
}
return myObject;
}

代码解析:

  • step1/step3 两次检测非空,即双检锁
  • step2 只有在初始化时加锁,提高效率
  • step4JMM 内存模型中,存在编译器对指令的优化。这一步并非原子操作,实际上被分解为三步:给实例分配存储空间;new 初始化存储空间;myObject 指向存储空间(此时 myObject 为非空了)。在指令优化时第二步和第三步是乱序的,所以当正在执行 step4 的线程在乱序的情况下,先执行了指向存储空间,导致 myObject 非空,其他线程在读取时认为非空就直接使用了会出错。所以针对这个问题,需要使用 volatile 关键字来修饰 myObject 共享变量,禁止指令重排序。

懒汉模式:静态内置类实现

饿汉模式中使用静态内置类来初始化实例,静态内置类只有在第一次引用时才会被加载到内存,间接实现了懒汉模式。示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static class MyObject{
// 静态内置类,只有在第一次引用时才加载
private static class MyObjectHandler{
private static MyObject myObject = new MyObject();
}

private MyObject(){

}

public static MyObject getInstance(){
return MyObjectHandler.myObject;
}
}

序列化与反序列化对静态内置类的影响

使用静态内置类实现懒汉模式,提供了资源利用率。但是在序列化和反序列化中,反序列化会克隆这个实例,不满足单例模式的特性了。
解决方案:在类中实现 readResolve 方法。该方法是 ObjectInputStream.readObject() 读取时,会判断类中是否定义了,如果定义则会通过反射调用 readResolve 返回的对象替换反序列化中创建的对象,这样就做到了确保单例模式的唯一性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static class MyObject implements Serializable{
private static final long serialVersionUID = -6683402613942762249L;
private static class MyObjectHandler{
private static MyObject myObject = new MyObject();
}

private MyObject(){...}

public static MyObject getInstance(){
return MyObjectHandler.myObject;
}

// ObjectInputStream.readObject() --> readOrdinaryObject
private Object readResolve() throws ObjectStreamException {
return MyObjectHandler.myObject;
}
}

懒汉模式:枚举类的实现

枚举中我们明确了构造方法限制为私有,在我们访问枚举实例时会执行构造方法,同时每个枚举实例都是 static final 类型的,也就表明只能被实例化一次。枚举也提供了序列化机制,所以不存在内部静态类在读取时需要重写 readResolve 的问题。在《Effective Java》 中提到:单元素的枚举类型已经成为实现 Singleton 的最佳方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 实现方法一:枚举类作为帮助类实现单例
private static class MyObject{
private enum Helper{
INSTANCE;

private MyObject myObject;
Helper(){
myObject = new MyObject();
}

public MyObject getInstance(){
return myObject;
}
}

private MyObject(){...}

public static MyObject getInstance(){
return Helper.INSTANCE.getInstance();
}
}

// 方法二:直接设计一个枚举类,并且支持序列化和反序列化
private enum MyObjectSerializable {
INSTANCE;

public void method() {

}
}

线程组

概念

  • 作用
    可以批量管理线程或线程组对象,有效的对线程或线程组对象进行组织。
  • 结构图
    0005-threadgroup-tree.png

线程对象关联线程组

  • 1 级关联
    父对象中有子对象,但不创建孙对象。非常常用,我们在线程池技术中,线程工厂创建线程时基本都是 1 级关联,可以对零散的线程进行有效的组织和规划。
  • 多级关联
    父对象中有子对象,子对象中还有子对象,也就是出现了子孙对象。多级关联就是结构图中的树结构,但是这种级联方式并不常见,层次过多会导致管理复杂。

1 级关联示例:

1
2
3
4
5
6
7
8
9
10
11
12
RunnableA runnableA = new RunnableA();
RunnableB runnableB = new RunnableB();
ThreadGroup threadGroup = new ThreadGroup("TestThreadGroup");
Thread threadA = new Thread(threadGroup, runnableA);
Thread threadB = new Thread(threadGroup, runnableB);
threadA.start();
threadB.start();
System.out.println("ThreadGroup Active Count: " + threadGroup.activeCount());
System.out.println("ThreadGruop Name: " + threadGroup.getName());

Thread.sleep(3000);
threadGroup.interrupt();

常用 API

  • threadGroup.getName():获取线程组名称
  • threadGroup.activeCount():获取线程组中活动线程估计数
  • threadGroup.interrupt():中断线程组中所有线程
  • threadGroup.getParent():获取线程组的父线程组

自动归属特性

线程组在构造时,如果不显示指定父线程组,默认为自动归属到当前线程组。
ThreadGroup threadGroup = new ThreadGroup("TestThreadGroup");:自动归属到当前 main 线程组中。

参考文档

  1. Java多线程编程核心技术–高洪岩
  2. 图文并茂简单易懂的博客1
  3. 图文并茂简单易懂的博客2