多线程基础

操作系统调度的最小任务单位是线程。

进程 vs 线程

多进程的 缺点 在于:

  • 开销更大,尤其是在Windows系统上;
  • 进程间通信比线程间通信要慢,因为线程间通信就是读写同一个变量,速度很快。

多进程的 优点 在于:

  • 稳定性更高,
    • 因为在多进程的情况下,一个进程崩溃不会影响其他进程,
    • 而在多线程的情况下,任何一个线程崩溃会直接导致整个进程崩溃。

多线程

Java语言内置了多线程支持:

一个Java程序实际上是一个JVM进程,JVM进程用一个主线程来执行main()方法,在main()方法内部,我们又可以启动多个线程。此外,JVM还有负责垃圾回收的其他工作线程等。

多线程编程的特点在于:多线程经常需要读写共享数据,并且需要同步。

Java多线程编程的特点又在于:

  • 多线程模型是Java程序最基本的并发模型;
  • 后续读写网络、数据库、Web开发等都依赖Java多线程模型。

创建新线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 方法一
public class Main {
public static void main(String[] args) {
Thread t = new MyThread();
t.start(); // 启动新线程
}
}

class MyThread extends Thread {
@Override
public void run() {
System.out.println("start new thread!");
}
}

// 方法二
public class Main {
public static void main(String[] args) {
Thread t = new Thread(new MyRunnable());
t.start(); // 启动新线程
}
}

class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("start new thread!");
}
}
// 方法三 lamda表达式
public class Main {
public static void main(String[] args) {
Thread t = new Thread(() -> {
System.out.println("start new thread!");
});
t.start(); // 启动新线程
}
}
  • Java用Thread对象表示一个线程,通过调用start()启动一个新线程;

  • 一个线程对象必须且只能调用一次start()方法;

  • 线程的执行代码写在run()方法中;

  • 线程调度由操作系统决定,程序本身无法决定调度顺序;

  • 可以对线程设定优先级,设定优先级的方法是:

    1
    Thread.setPriority(int n) // 1~10, 默认值5

    优先级高的线程被操作系统调度的优先级较高,操作系统对高优先级线程可能调度更频繁,但我们决不能通过设置优先级来确保高优先级的线程一定会先执行。

  • Thread.sleep()可以把当前线程暂停一段时间(单位:毫秒)。

线程的状态

Java线程对象Thread的状态包括:NewRunnableBlockedWaitingTimed WaitingTerminated

  • New:新创建的线程,尚未执行;
  • Runnable:运行中的线程,正在执行run()方法的Java代码;一旦run()方法执行完毕,线程就结束了。
  • Blocked:运行中的线程,因为某些操作被阻塞而挂起;
  • Waiting:运行中的线程,因为某些操作在等待中;
  • Timed Waiting:运行中的线程,因为执行sleep()方法正在计时等待;
  • Terminated:线程已终止,因为run()方法执行完毕。

image-20220303113753841

线程终止的原因有:

  • 线程正常终止:run()方法执行到return语句返回;
  • 线程意外终止:run()方法因为未捕获的异常导致线程终止;
  • 对某个线程的Thread实例调用stop()方法强制终止(强烈不推荐使用)。

join

main线程对线程对象t调用join()方法时,

主线程将等待变量t表示的线程运行结束,即join就是指等待该线程结束,然后才继续往下执行自身线程。

主要作用就是 同步 ,它可以使得线程之间的并行执行变为串行执行。

  • 可以指定等待时间(单位毫秒),超过等待时间线程仍然没有结束就不再等待;(join(0)等价于join(),等待无限时间)

  • 对已经运行结束的线程调用join()方法会立刻返回。

  • 对没有启动的线程调用join()(即在start之前调用join),并不能起到同步作用

join方法实现的原理

join方法是通过调用线程的 wait 方法来达到同步的目的的。

例如,A线程中调用了B线程的join方法,

则相当于在A线程中调用了B线程的wait方法,A线程变成waiting状态

当B线程执行完(或者到达等待时间),B线程会自动调用自身的notifyAll方法唤醒A线程,从而达到同步的目的。

中断线程

1
t.interrupt(); // 中断t线程

中断线程就是其他线程给该线程发一个信号,该线程收到信号后结束执行run()方法,使得自身线程能立刻结束运行。

  • 目标线程通过检测isInterrupted()标志获取自身是否已中断。

  • 如果目标线程处于 等待状态 ,该线程会捕获到InterruptedException

目标线程检测到isInterrupted()true或者捕获了InterruptedException都应该立刻结束自身线程;

volatile关键字——共享变量

线程的标志位boolean running是一个线程间共享的变量。线程间共享变量需要使用volatile关键字标记,确保每个线程都能读取到更新后的变量值。

在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间是不确定的!

因此,volatile关键字的目的是告诉虚拟机:

  • 每次访问变量时,总是获取主内存的最新值;
  • 每次修改变量后,立刻回写到主内存。

volatile关键字解决的是 可见性问题 :当一个线程修改了某个共享变量的值,其他线程能够立刻看到修改后的值。

如果我们去掉volatile关键字,运行上述程序,发现效果和带volatile差不多,这是因为在x86的架构下,JVM回写主内存的速度非常快,但是,换成ARM的架构,就会有显著的延迟。

守护线程 Daemon Thread

1
2
3
Thread t = new MyThread();
t.setDaemon(true);
t.start();

守护线程是为其他线程服务的线程;

所有非守护线程都执行完毕后,虚拟机退出;

守护线程不能持有需要关闭的资源(如打开文件等)。

  • 如果注释掉,即都是非守护进程,则只要有一个进程在执行,程序都无法退出
  • 守护线程会默默在后台执行,使得执行完毕的非守护线程可以正常退出
    • 举例: 垃圾回收机制

线程同步

如果多个线程同时读写共享变量,会出现数据不一致的问题,所以需要线程同步。

原子操作 : 原子操作是指不能被中断的一个或一系列操作。对变量进行读取和写入时,结果要正确,必须保证是原子操作。

多线程模型下,要保证逻辑正确,对共享变量进行读写时,必须保证一组指令以原子方式执行:即某一个线程执行时,其他线程必须等待

Java程序使用synchronized关键字对一个对象进行加锁:

  1. 找出修改共享变量的线程代码块;
  2. 选择一个共享实例作为锁;
  3. 使用synchronized(lockObject) { ... }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Counter {
public static final Object lock = new Object();
public static int count = 0;
}

class AddThread extends Thread {
public void run() {
for (int i=0; i<10000; i++) {
synchronized(Counter.lock) { // 加锁
Counter.count += 1;
} // 释放锁
}
}
}

  • 同步的本质就是给指定对象加锁,加锁后才能继续执行后续代码;
  • 注意加锁对象必须是 同一个 实例;
  • 对JVM定义的单个原子操作不需要同步。
    • 基本类型(longdouble除外)赋值,例如:int n = m
      • longdouble是64位数据,JVM没有明确规定64位赋值操作是不是一个原子操作,不过在x64平台的JVM是把longdouble的赋值作为原子操作实现的。
    • 引用类型赋值,例如:List<String> list = anotherList
  • 如果是多行赋值语句,就必须保证是同步操作

有些时候,通过一些巧妙的转换,可以把非原子操作变为原子操作。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 非原子操作
class Pair {
int first;
int last;
public void set(int first, int last) {
synchronized(this) { //加锁
this.first = first;
this.last = last;
}
}
}

// 原子操作
class Pair {
int[] pair;
public void set(int first, int last) {
int[] ps = new int[] { first, last };
this.pair = ps;
}
}

synchronized修饰方法可以把整个方法变为同步代码块,synchronized方法加锁对象是this

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Counter {
private int count = 0;

public void add(int n) {
synchronized(this) {
count += n;
}
}

// 等价于下面的写法
public synchronized void add(int n) { // 锁住this
count += n;
} // 解锁


public void dec(int n) {
synchronized(this) {
count -= n;
}
}

public int get() {
return count;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var c1 = Counter();
var c2 = Counter();

// 对c1进行操作的线程:
new Thread(() -> {
c1.add();
}).start();
new Thread(() -> {
c1.dec();
}).start();

// 对c2进行操作的线程:
new Thread(() -> {
c2.add();
}).start();
new Thread(() -> {
c2.dec();
}).start();

通过合理的设计和数据封装可以让一个类变为“线程安全”;

  • 如果一个类被设计为允许多线程正确访问,我们就说这个类就是“线程安全”的(thread-safe),上面的Counter类就是线程安全的。Java标准库的java.lang.StringBuffer也是线程安全的。

  • 不变类 ,例如StringIntegerLocalDate,它们的所有成员变量都是final,多线程同时访问时只能读不能写,这些不变类也是线程安全的。

  • 最后,类似Math这些只提供静态方法, 没有成员变量的类,也是线程安全的

一个类没有特殊说明,默认不是thread-safe;

  • 大部分类,例如ArrayList,都是非线程安全的类,我们不能在多线程中修改它们。但是,如果所有线程都只读取,不写入,那么ArrayList是可以安全地在线程间共享的。

对一个静态方法添加synchronized修饰符,锁住的是该类的Class实例。

死锁

可重入锁:

JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁。

Java的synchronized锁是可重入锁;

死锁:

死锁产生的条件是多线程各自持有不同的锁,并互相试图获取对方已持有的锁,导致无限等待;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void add(int m) {
synchronized(lockA) { // 获得lockA的锁
this.value += m;
synchronized(lockB) { // 获得lockB的锁
this.another += m;
} // 释放lockB的锁
} // 释放lockA的锁
}

public void dec(int m) {
synchronized(lockB) { // 获得lockB的锁
this.another -= m;
synchronized(lockA) { // 获得lockA的锁
this.value -= m;
} // 释放lockA的锁
} // 释放lockB的锁
}

如何避免死锁呢?答案是:线程获取锁的顺序要一致。即严格按照先获取lockA,再获取lockB的顺序

使用wait和notify

必须在synchronized块中才能调用wait()方法,因为wait()方法调用时,会释放线程获得的锁,wait()方法返回后,线程又会重新试图获得锁。

waitnotify用于多线程协调运行:

  • synchronized内部可以调用wait()使线程进入等待状态;
  • 必须在已获得的锁对象上调用wait()方法;
  • synchronized内部可以调用notify()notifyAll()唤醒其他等待线程;
  • 必须在已获得的锁对象上调用notify()notifyAll()方法;
  • 已唤醒的线程还需要重新获得锁后才能继续执行。

使用ReentrantLock

java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized进行同步;

ReentrantLock获取锁更安全;

必须先获取到锁,再进入try {...}代码块,最后使用finally保证释放锁;

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Counter {
private final Lock lock = new ReentrantLock();
private int count;

public void add(int n) {
lock.lock();
try {
count += n;
} finally {
lock.unlock();
}
}
}

可以使用tryLock()尝试获取锁。

1
2
3
4
5
6
7
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
...
} finally {
lock.unlock();
}
}

使用Condition

Condition可以替代waitnotify

Condition对象必须从Lock对象获取。

1
private final Condition condition = lock.newCondition();

Condition提供的await()signal()signalAll()原理和synchronized锁对象的wait()notify()notifyAll()是一致的,并且其行为也是一样的:

  • await()会释放当前锁,进入等待状态;
  • signal()会唤醒某个等待线程;
  • signalAll()会唤醒所有等待线程;
  • 唤醒线程从await()返回后需要重新获得锁。

此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()signalAll()唤醒,可以自己醒来:

1
2
3
4
5
if (condition.await(1, TimeUnit.SECOND)) {
// 被其他线程唤醒
} else {
// 指定时间内没有被其他线程唤醒
}

使用ReadWriteLock

使用ReadWriteLock可以提高读取效率:

  • ReadWriteLock只允许一个线程写入;
  • ReadWriteLock允许多个线程在没有写入时同时读取;
  • ReadWriteLock适合读多写少的场景。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Counter {
private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
private final Lock rlock = rwlock.readLock();
private final Lock wlock = rwlock.writeLock();
private int[] counts = new int[10];

public void inc(int index) {
wlock.lock(); // 加写锁
try {
counts[index] += 1;
} finally {
wlock.unlock(); // 释放写锁
}
}

public int[] get() {
rlock.lock(); // 加读锁
try {
return Arrays.copyOf(counts, counts.length);
} finally {
rlock.unlock(); // 释放读锁
}
}
}

局限: 读的过程中不允许写

使用StampedLock

StampedLockReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。

  • 乐观锁: 乐观地估计读的过程中大概率不会有写入
  • 悲观锁: 读的过程中拒绝有写入,也就是写入必须等待。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Point {
private final StampedLock stampedLock = new StampedLock();

private double x;
private double y;

public void move(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp); // 释放写锁
}
}

public double distanceFromOrigin() {
long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁,返回版本号
// 注意下面两行代码不是原子操作
// 假设x,y = (100,200)
double currentX = x;
// 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
double currentY = y;
// 此处已读取到y,如果没有写入,读取是正确的(100,200)
// 如果有写入,读取是错误的(100,400)
if (!stampedLock.validate(stamp)) { // 通过validate()去验证版本号,检查乐观读锁后是否有其他写锁发生
stamp = stampedLock.readLock(); // 获取一个悲观读锁
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp); // 释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}

StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。

使用Concurrent集合

使用java.util.concurrent包提供的线程安全的并发集合可以大大简化多线程编程:

多线程同时读写并发集合是安全的;

尽量使用Java标准库提供的并发集合,避免自己编写同步代码。

interface non-thread-safe thread-safe
List ArrayList CopyOnWriteArrayList
Map HashMap ConcurrentHashMap
Set HashSet / TreeSet CopyOnWriteArraySet
Queue ArrayDeque / LinkedList ArrayBlockingQueue / LinkedBlockingQueue
Deque ArrayDeque / LinkedList LinkedBlockingDeque

使用Atomic

使用java.util.concurrent.atomic提供的原子操作可以简化多线程编程:

  • 原子操作实现了无锁的线程安全;
  • 适用于计数器,累加器等。

AtomicInteger为例,它提供的主要操作有:

  • 增加值并返回新值:int addAndGet(int delta)
  • 加1后返回新值:int incrementAndGet()
  • 获取当前值:int get()
  • 用CAS方式设置:int compareAndSet(int expect, int update)

Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set。

CAS是指,在这个操作中,如果AtomicInteger的当前值是prev,那么就更新为next,返回true。如果AtomicInteger的当前值不是prev,就什么也不干,返回false。通过CAS操作并配合do ... while循环,即使其他线程修改了AtomicInteger的值,最终的结果也是正确的。

使用线程池

线程池: 能接收大量小任务并进行分发处理的就是线程池。线程池内部维护了若干个线程:

  • 没有任务的时候,这些线程都处于等待状态。
  • 如果有新任务,就分配一个空闲线程执行。
  • 如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。

Java标准库提供了ExecutorService接口表示线程池,它的典型用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) {
// 创建一个固定大小的线程池:
ExecutorService es = Executors.newFixedThreadPool(4);
for (int i = 0; i < 6; i++) {
// 提交任务
es.submit(new Task("" + i));
}
// 关闭线程池:
es.shutdown();
}
}

因为ExecutorService只是接口,Java标准库提供的几个常用实现类有:

  • FixedThreadPool:线程数固定的线程池;
  • CachedThreadPool:线程数根据任务动态调整的线程池;
  • SingleThreadExecutor:仅单线程执行的线程池。

线程池在程序结束的时候要关闭。

  • shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。
  • shutdownNow()会立刻停止正在执行的任务
  • awaitTermination()则会等待指定的时间让线程池关闭。

ScheduledThreadPool

放入ScheduledThreadPool的任务可以定期反复执行。

创建一个ScheduledThreadPool仍然是通过Executors类:

1
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);

Java标准库还提供了一个java.util.Timer类,这个类也可以定期执行任务,但是,一个Timer会对应一个Thread,所以,一个Timer只能定期执行一个任务,多个定时任务必须启动多个Timer,而一个ScheduledThreadPool就可以调度多个定时任务.

使用Future

对线程池提交一个Callable任务,可以获得一个Future对象;

1
2
3
4
5
6
7
ExecutorService executor = Executors.newFixedThreadPool(4); 
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞

可以用Future在将来某个时刻获取结果。一个Future<V>接口表示一个未来可能会返回的结果,它定义的方法有:

  • get():获取结果(可能会等待)
  • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
  • cancel(boolean mayInterruptIfRunning):取消当前任务;
  • isDone():判断任务是否已完成。

使用CompletableFuture

CompletableFuture可以指定异步处理流程:

  • thenAccept()处理正常结果;
  • exceptional()处理异常结果;
  • thenApplyAsync()用于串行化另一个CompletableFuture
  • anyOf()allOf()用于并行化多个CompletableFuture

使用ForkJoin

Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。任务类必须继承自RecursiveTaskRecursiveAction

举例:对大数据进行并行求和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import java.util.Random;
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws Exception {
// 创建2000个随机数组成的数组:
long[] array = new long[2000];
long expectedSum = 0;
for (int i = 0; i < array.length; i++) {
array[i] = random(); //构造数组
expectedSum += array[i];
}
System.out.println("Expected sum: " + expectedSum);
// fork/join:
ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
long startTime = System.currentTimeMillis();
//
Long result = ForkJoinPool.commonPool().invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}

static Random random = new Random(0);

static long random() {
return random.nextInt(10000);
}
}

class SumTask extends RecursiveTask<Long> {
static final int THRESHOLD = 500; // 判断任务大小的阈值
long[] array;
int start;
int end;

SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 如果任务足够小,直接计算:
long sum = 0;
for (int i = start; i < end; i++) {
sum += this.array[i];
// 故意放慢计算速度:
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
return sum;
}
// 任务太大,一分为二:
int middle = (end + start) / 2;
System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
SumTask subtask1 = new SumTask(this.array, start, middle);
SumTask subtask2 = new SumTask(this.array, middle, end);
// invokeAll会并行运行两个子任务:
invokeAll(subtask1, subtask2);
// 获得子任务的结果:
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
Long result = subresult1 + subresult2;
System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
return result;
}
}

使用ThreadLocal

在代码中调用Thread.currentThread()获取当前线程。例如,打印日志时,可以同时打印出当前线程的名字。

上下文(Context): 在一个线程中,横跨若干方法调用,需要传递的对象,是一种状态,可以是用户身份、任务信息等。

ThreadLocal,它可以在一个线程中传递同一个对象。适合在一个线程的处理流程中保持上下文(避免了同一参数在所有方法中传递);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();

void processUser(user) {
try {
threadLocalUser.set(user);
step1();
step2();
} finally {
threadLocalUser.remove(); // 清除
}
}

void step1() {
User u = threadLocalUser.get();
log();
printUser();
}

void log() {
User u = threadLocalUser.get();
println(u.name);
}

void step2() {
User u = threadLocalUser.get();
checkUser(u.id);
}

ThreadLocal表示线程的“局部变量”,它确保每个线程的ThreadLocal变量都是各自独立的;

ThreadLocal

使用ThreadLocal要用try ... finally结构,并在finally中清除。