JUC 并发编程
概述
1.基本概念
1.1 进程和线程
1.1.1 定义
1.1.2 关系
1.1.3 区别
-
简而言之,一个程序至少有一个进程,一个进程至少有一个线程
-
线程的划分尺度小于进程,使得多线程程序的并发性高
-
另外,进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率
-
线程在执行过程中与进程还是有区别的。每个独立的进程有一个程序运行的入口、顺序执行序列和程序的出口。**但是线程不能够独立执行,**必须依存在应用程序中,由应用程序提供多个线程执行控制
-
从逻辑角度来看,多线程的意义在于一个应用程序中,有多个执行部分可以同时执行。但操作系统并没有将多个线程看做多个独立的应用,来实现进程的调度和管理以及资源分配。这就是进程和线程的重要区别
1.1.4 线程和进程在使用上各有优缺点
1.2 线程的状态
-
初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
-
运行(RUNNABLE)
-
Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。
-
就绪状态(RUNNABLE之READY)
-
就绪状态只是说你资格运行,调度程序没有挑选到你,你就永远是就绪状态。
-
调用线程的start()方法,此线程进入就绪状态。
-
当前线程sleep()方法结束,其他线程join()结束,等待用户输入完毕,某个线程拿到对象锁,其他线程也将进入就绪状态。
-
当前线程时间片用完了,调用当前线程的yield()方法,当前线程进入就绪状态。
-
锁池里的线程拿到对象锁后,进入就绪状态。
-
-
线程调度程序从可运行池中选择一个线程作为当前线程时,线程进入运行态。这也是线程进入运行状态的唯一的一种方式。
-
-
阻塞(BLOCKED)
-
等待(WAITING)
-
超时等待(TIMED_WAITING)
-
终止(TERMINATED)
-
当线程的run()方法完成时,或者主线程的main()方法完成时,我们就认为它终止了。这个线程对象也许是活的,但是它已经不是一个单独执行的线程。线程一旦终止了,就不能复生。
-
在一个终止的线程上调用start()方法,会抛出java.lang.IllegalThreadStateException异常。
-
这6种状态定义在Thread类的State枚举中,可查看源码进行一一对应。
1.2.1 wait和sleep
sleep()
wait()
1.2.1.1 区别
1.3 并发和并行
1.3.1 概述
并发是逻辑上的同时发生(simultaneous),而并行是物理上的同时发生。
1.3.2 并发(一同出发,一起争夺一个资源)
1.3.3 并行(一同出行)
1.4 管程
1.5 用户线程和守护线程
1.5.1 概述
1.5.2 设置
1.5.3 区别
2. 创建线程的方式
Java使用Thread类代表线程,所有的线程对象都必须是Thread类或其子类的实例。
2.1 Java可以用四种方式来创建线程
- 继承Thread类创建线程
- 实现Runnable接口创建线程
- 使用Callable和Future创建线程
- 使用线程池例如用Executor框架
2.1.1 继承Thread类创建线程
通过在类名上继承Thread并重写其中的run()方法来实现,在创建实例的时候调用xxx.start()方法来启动线程
2.1.2 实现Runnable接口创建线程
实现Runnable接口并重写其中的run()方法,创建实例以后通过放入线程来创建线程
2.1.2.1 实例
public class Main {
public static void main(String[] args){undefined
// 创建并启动线程,这里的myThread已经实现了Runnable接口并重写了run()方法
MyThread2 myThread=new MyThread2();
Thread thread=new Thread(myThread);
thread().start();
// 或者new Thread(new MyThread2()).start();
}
}
2.1.3 使用Callable和Future创建线程
2.1.4 实现
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
//实现Callable接口
public class CallableTest {
public static void main(String[] args) {
//执行Callable 方式,需要FutureTask 实现实现,用于接收运算结果
FutureTask<Integer> futureTask = new FutureTask<Integer>(new MyCallable());
new Thread(futureTask).start();
//接收线程运算后的结果
try {
Integer sum = futureTask.get();
System.out.println(sum);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
}
2.1.4 使用线程池例如用Executor框架
线程池提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提升了响应速度。实现了线程复用。
2.1.4.1 实现
Callable<Singleton4> callable = new Callable<Singleton4>() {
@Override
public Singleton4 call() throws Exception {
return Singleton4.getInstance();
}
};
// 创建一个线程池,并设定大小
ExecutorService threadPool = Executors.newFixedThreadPool(2);
// 通过future来接收线程启动后返回的结果
Future<Singleton4> f1 = threadPool.submit(callable);
Future<Singleton4> f2 = threadPool.submit(callable);
// 获取返回值
Singleton4 s6 = f1.get();
Singleton4 s7 = f1.get();
System.out.println(s6 == s7);
threadPool.shutdown();
}
3.多线程编程步骤
-
创建资源类,编写属性和操作方法
-
在资源类设置操作方法
- 判断
- 执行
- 通知
-
创建多个线程,调用资源类的操作方法
-
防止虚假唤醒,将在资源类操作方法的判断条件设置在while中
4. Synchronized
4.1 概述
Java自带的关键字,它最大的特征就是在同一时刻只有一个线程能够获得对象的监视器(monitor),从而进入到同步代码块或者同步方法之中,即表现为互斥性(排它性)。在发生异常的时候会自动释放锁。
4.2 实例
public class SaleTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(() ->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"售票员1").start();
new Thread(() ->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"售票员2").start();
new Thread(() ->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"售票员3").start();
}
}
class Ticket{
private int num = 30;
public synchronized void sale(){
if (num > 0 ){
// 获取当前执行的线程的名字
System.out.println(Thread.currentThread().getName()+"售卖第"+(num--)+"票"+"还剩下"+num+"票");
}
}
}
5.LOCK
5.1 概述
Lock不是java中的关键字,通过实例化来获取,可以让用户自己手动的上锁和解锁,如果没有设定解锁方式,在发生异常时不能正常解锁,则会发生死锁现象。
5.2 实例
import java.util.concurrent.locks.ReentrantLock;
public class SaleTicketByLockDemo {
public static void main(String[] args) {
SaleTicketByLock ticket = new SaleTicketByLock();
new Thread(() ->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"售票员1").start();
new Thread(() ->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"售票员2").start();
new Thread(() ->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"售票员3").start();
}
}
class SaleTicketByLock{
// 实现Lock
ReentrantLock lock =new ReentrantLock();
private int num = 30;
// 这里使用try finally 的方式来确保无论是否发生异常最后都能正确的关闭锁,以确保不会发生死锁的方式
public synchronized void sale(){
lock.lock();
try {
if (num > 0 ){
System.out.println(Thread.currentThread().getName()+"售卖第"+(num--)+"票"+"还剩下"+num+"票");
}
} finally {
lock.unlock();
}
}
}
6.Lock和Synchronized的区别
6.1 概述
6.2 区别
-
异常是否释放锁
-
是否知道获取锁
-
Lock可以提高多个线程进行读操作的效率
-
在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择
-
synchronized使用Object对象本身的wait 、notify、notifyAll调度机制,而Lock可以使用Condition进行线程之间的调度
-
lock可以通过condition来达到精确唤醒
6.3 volatile关键字
7.线程通信
7.1 用Synchronized实现
package syn;
/**
* @Author PoX21s
* @Date: 2021/11/1 17:01
* @Version 1.0
*/
public class ThreadDemo1 {
public static void main(String[] args) {
share share = new share();
new Thread(() ->{
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程一").start();
new Thread(() ->{
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程二").start();
}
}
class share{
private int num = 0;
public synchronized void incr() throws InterruptedException {
if (num == 1){
this.wait();
}
num++;
System.out.println(Thread.currentThread().getName()+"::"+num);
this.notifyAll();
}
public synchronized void decr() throws InterruptedException {
if (num == 0){
this.wait();
}
num--;
System.out.println(Thread.currentThread().getName()+"::"+num);
this.notifyAll();
}
}
7.2 虚假唤醒问题
7.2.1 解决办法
将判断条件中的if改为while,在醒来以后继续判断。
7.3 用Lock实现
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Author PoX21s
* @Date: 2021/11/1 17:01
* @Version 1.0
*/
public class ThreadDemo2 {
public static void main(String[] args) {
share1 share = new share1();
new Thread(() ->{
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程一").start();
new Thread(() ->{
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程二").start();
new Thread(() ->{
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程三").start();
new Thread(() ->{
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程四").start();
}
}
class share1 {
private int num = 0;
ReentrantLock lock = new ReentrantLock();
// 通过lock中的condition实例来实现类似synchronized中的wait和notify
private Condition condition = lock.newCondition();
public void incr() throws InterruptedException {
lock.lock();
try {
while (num == 1){
condition.await();
}
num++;
System.out.println(Thread.currentThread().getName()+"::"+num);
condition.signalAll();
} finally {
lock.unlock();
}
}
public void decr() throws InterruptedException {
lock.lock();
try {
while (num == 0){
condition.await();
}
num--;
System.out.println(Thread.currentThread().getName()+"::"+num);
condition.signalAll();
} finally {
lock.unlock();
}
}
}
8.线程定制化通信
按照规定的方式执行,执行完以后,更改标志位,唤醒下一个线程。使用Lock实现。
8.1 实现
package syn;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Author PoX21s
* @Date: 2021/11/1 17:01
* @Version 1.0
*/
public class ThreadDemo3 {
public static void main(String[] args) {
share2 share2 = new share2();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share2.print5();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"aa").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share2.print10();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"bb").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share2.print15();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"cc").start();
}
}
class share2 {
// 设置标志位
private int flag = 1;
private ReentrantLock lock = new ReentrantLock();
// 一个Lock对象中可以创建多个Condition实例(即对象监视器)
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();
public void print5() throws InterruptedException {
lock.lock();
try {
while (flag != 1){
c1.await();
}
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+" :: "+"打印第"+i);
}
flag = 2;
c2.signal();
} finally {
lock.unlock();
}
}
public void print10() throws InterruptedException {
lock.lock();
try {
while (flag != 2){
c2.await();
}
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+" :: "+"打印第"+i);
}
flag = 3;
c3.signal();
} finally {
lock.unlock();
}
}
public void print15() throws InterruptedException {
lock.lock();
try {
while (flag != 3){
c3.await();
}
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName()+" :: "+"打印第"+i);
}
flag = 1;
c1.signal();
} finally {
lock.unlock();
}
}
}
9.集合中的线程安全
在多个线程同时读取和写入(并发修改)的时候可能会出现线程并发安全问题,下面通过三种方式来解决,推荐使用第三种。
9.1 ArryayList中的线程安全
9.1.1 通过Vector解决
9.1.1.1 实现
public class ThreadDemo4 {
public static void main(String[] args) {
// List<String> list = new ArrayList<>();
// 通过改变创建list引用的对象来实现在添加时的线程安全
List<String> list = new Vector<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,7));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
9.1.2 通过Collections中的synchronizedList方法解决
本质也是在add方法中增加了synchronized关键字。
9.1.2.1 实现
public class ThreadDemo4 {
public static void main(String[] args) {
// 本质通过在add方法上添加了线程同步关键字实现
List<String> list = Collections.synchronizedList(new ArrayList<String>());
for (int i = 0; i < 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,7));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
9.1.3 通过CopyOnWriteArrayList<>()类来实现
9.1.3.1 实现
public class ThreadDemo4 {
public static void main(String[] args) {
// 写时复制技术,先复制出来一份,当前写完以后合并更新到原数组,后面的线程使用新的数组进行操作
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,7));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
9.2 HashSet中的线程安全
9.2.1 通过Collections.synchronizedSet()方法
通过synchronized关键字实现。
9.2.1.1 实现
public class ThreadDemo5 {
public static void main(String[] args) {
// HashSet<String> set = new HashSet<>();
Set<String> set;
set = Collections.synchronizedSet(new HashSet<String>());
for (int i = 0; i < 30; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0,6));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
9.3 HashMap中的线程安全
9.3.1 通过ConcurrentHashMap<>()解决
添加synchronized关键字代码块,实现线程同步。推荐使用,性能和HashMap差距不大。
9.3.1.1 实现
public class ThreadDemo5 {
public static void main(String[] args) {
Map<String, String> map = new ConcurrentHashMap<>();
for (int i = 0; i < 30; i++) {
String key = String.valueOf(i);
new Thread(() -> {
map.put(key,UUID.randomUUID().toString().substring(0,6));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
10.多线程锁
10.1 锁的几种情况
10.1.1 实例
class Person{
public synchronized void test(){
// ....
}
public static synchronized void test2(){
// ....
}
public void test3(){
// ....
}
}
// 当只有单synchronized时锁的是p1实例对象
// 当在synchronized前面加了时,锁的是Person类
public class Test{
// psvm
Person p1 = new Person();
}
10.2 公平锁和非公平锁
10.2.1 非公平锁
-
使用非公平锁,当前线程发现有空就会去使用,不会考虑其他线程
-
如果占用失败,则会采用类似公平锁的方式,到队列后面
-
效率高,但可能会造成单个线程做完了所有事,其他线程没事可干,一直陪跑
-
synchronized是非公平锁
-
lock默认是非公平,但是可以设置为公平锁 new reentrant(true)
10.2.2 公平锁
-
使用公平锁,当前线程发现有空的时候,会询问其他线程是否有线程要使用,如果没有其他线程要使用,就自己用,如果有其他线程用,就排队。
-
按照申请锁的顺序来执行
-
效率相对于非公平锁会低,因为会有询问的过程,但是相对非公平锁来说,每一个线程都有执行的机会,不会出现单线程干完所有事的情况(线程饥饿问题)
10.3 可重入锁
-
synchronized(隐式的上锁和解锁交给JVM)和Lock(显式的上锁和解锁交给用户本身)都是可重入锁
-
可重入锁又称为递归锁,当一把锁内部还有其他锁的时候,此时线程如果获得最外层的锁,那么内部的锁线程也可以随意进入,下面通过synchronized来演示
-
最大的作用就是避免死锁,还是要注意lock的锁释放
10.3.1 实例
public class ThreadDemo6 {
public static void main(String[] args) {
Object o = new Object();
new Thread(() -> {
synchronized (o){
System.out.println(Thread.currentThread().getName()+"外部的锁");
synchronized (o) {
System.out.println(Thread.currentThread().getName()+"中间的锁");
synchronized (o){
System.out.println(Thread.currentThread().getName()+"内部的锁");
}
}
}
},"线程一").start();
}
}
10.3.1.1 注意
10.4 死锁
10.4.1 产生死锁的原因
- 系统资源不足
- 进程运行推进不顺利
- 系统资源分配不当
10.4.1.1 实例演示
public class DeadLock {
static Object o1 = new Object();
static Object o2 = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (o1){
System.out.println(Thread.currentThread().getName()+"持有o1锁,试图获取o2锁");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o2){
System.out.println(Thread.currentThread().getName()+"试图获取o2锁");
}
}
},"线程一").start();
new Thread(() -> {
synchronized (o2){
System.out.println(Thread.currentThread().getName()+"持有o2锁,试图获取o1锁");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o1){
System.out.println(Thread.currentThread().getName()+"试图获取o2锁");
}
}
},"线程二").start();
}
}
// 输出结果
// 线程一持有o1锁,试图获取o2锁
// 线程二持有o2锁,试图获取o1锁
// (一直等待中)
10.4.2 验证死锁
可以通过java中自带的工具查询:jps和jstack
- jps -l ==> 找到当前运行类的java进程号
- jstack [java进程号] ==》 查询当前的运行情况
10.4.2.1 实例演示
PS F:\Java\JUC> jps -l
17044 sun.tools.jps.Jps
18692 org.jetbrains.jps.cmdline.Launcher
13612
21276 syn.DeadLock
PS F:\Java\JUC> jstack 21276
Found 1 deadlock. #返回当前为死锁状态
11.Callable接口
在jdk1.5以后新增的一种创建线程的方式,和Runnable的区别
- Callable可以有返回值
- Callable当返回值有问题时,会抛出异常,而Runnable不会抛出异常
- 实现方法名称不同,Runnable通过run()方法实现,Callable通过call()方法实现
11.1 创建一个实现Callable接口的线程
不能直接传递给Thread,Thread只能接收Runnable对象,因此为了让Thread能够接收Callable对象,要通过中间类来实现,这就是FutureTask类。
11.1.1 关于FutureTask
在不影响主线程的情况下,可以再开启其他线程来执行其他操作,这些线程执行完了以后,把结果返回给主线程,并且所有结果只会汇总一次
11.2 实例演示
class Demo implements Callable{
@Override
public Object call() throws Exception {
return 1+2+3;
}
}
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Demo c1 = new Demo();
FutureTask<Integer> task1 = new FutureTask<Integer>(c1);
FutureTask<Integer> task2 = new FutureTask<Integer>(() -> {
return 1024;
});
new Thread(task1,"线程一").start();
new Thread(task2,"线程二").start();
// 等待得到返回结果
while (!task1.isDone()){
System.out.println("等待返回结果");
}
System.out.println("结果:");
// 当得到一次结果以后,以后的get()则会直接调取结果,不用再进行运算
// 即只用汇总一次
System.out.println(task1.get());
System.out.println(task1.get());
// 当所有的新开起的线程都结束了,主线程才会结束
System.out.println(task2.get());
System.out.println(Thread.currentThread().getName()+"over");
}
}
12.JUC强大辅助类 – 线程同步工具
12.1 CountDownLatch
可以设定初始值,当这个类中的count属性不为0的时候,会让其他线程进入阻塞状态,调用await()方法,只有减为0才会唤醒其他线程往下执行,也就是解除await()方法只能是count减为0的时候
12.1.1 实例演示
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch count = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+ " 号同学离开教室");
count.countDown();
},String.valueOf(i)).start();
}
count.await();
System.out.println(Thread.currentThread().getName()+" 关闭教室门");
}
}
12.2 CyclicBarrier
设定一组现成的额个数,多个线程开始执行,当开始执行的线程达到设定的一组的个数时,这一组的线程同时开始执行。没有达到数量时,已经开始执行的线程处于阻塞状态,等待其他线程的到来,然后再一起执行。
12.2.1 实例演示
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(6, new Runnable() {
@Override
public void run() {
System.out.println("已经集齐了7颗龙珠,召唤神龙");
}
});
for (int i = 0; i < 7; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+ " 号龙珠被得到");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
12.3 Semaphore
设定许可的数量,当一个线程获取到了许可,则可以执行方法,其他开启的线程没有获得许可,则进入等待状态,当有线程释放许可以后,等待的线程争夺许可,获得了许可执行,没有获得的继续等待。
12.3.1 实例演示
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+" 号获取到车位");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName()+" 离开车位====》");
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
13.一些锁的概念
13.1 悲观锁
每次进行操作都会进行锁上,操作完以后才会解锁,当一个线程在执行的时候,其他线程都处于等待阻塞状态
13.2 乐观锁
线程在执行的时候不会将要处理的资源锁上,会将资源复制一份,然后再复制的资源上进行操作,操作完以后提交到原有的数据上进行更新操作。但是在提交的时候,要进行比对判断,判断复制的原内容和现在的情况是否相同 ==》例如:现在有两个线程要操作同一个资源,它们在操作资源的时候都会先进行复制,然后在复制的资源上进行操作,但是现在线程一先处理完,提交更新了,这时的原资源已经发生了改变,当其他线程提交的时候,发现自己最开始复制的内容,和现在的资源内容不相同,则会提交失败,而是继续复制当前的内容,然后再提交判断。
13.3 表锁
在线程处理资源的时候,会将整个数据表都锁上,不允许其他线程进行操作,不会发生死锁现象。
13.4 行锁
在线程处理资源时,只会将要处理的一行进行上锁,表中的其他行是允许其他线程进行操作的,会出现死锁现象,即两个或两个以上的线程互相等待对方释放锁,而导致的多线程间互相等待的情况。
13.5 读锁
在读的时候,允许多个线程一起读,共享锁,会发生死锁现象 ==》 例如:多个线程都在读取同一行资源,但是互相之间都想要修改,则都在互相等待对方读取完毕然后进行修改的操作,因为都想修改,则会导致互相等待的情况。
13.6 写锁
在写的时候,不允许多个线程共同操作,独享锁,但也会发生死锁现象 == 》 例如:多个线程都在修改不同的资源,但是都又想操作对方的资源,而出现互相等待死锁的情况。
14.读写锁
读的时候可以共享,但是写的时候只能有一个线程进行操作。但是不能同时存在读写操作,读写是互斥的,读读是共享的。
14.1 优点
比其synchronized和lock独占锁的情况,可以提升读的性能
14.2 缺点
- 会造成线程饥饿,也就是说会导致有线程一直在读,进而无法进行写的操作
- 写的时候不能读,一直等待,写的时候可以读
14.3 读写锁的实例
public class ReadWriteLockDemo {
public static void main(String[] args) {
ContextDemo context = new ContextDemo();
for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(() -> {
try {
context.put(num+"",num+"");
} catch (InterruptedException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(() -> {
try {
context.get(num+"");
} catch (InterruptedException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
class ContextDemo {
private volatile Map<String,Object> map = new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void put(String key, Object o) throws InterruptedException {
lock.writeLock().lock();
System.out.println(Thread.currentThread().getName()+" 正在写入~~");
TimeUnit.MICROSECONDS.sleep(300);
map.put(key,o);
System.out.println(Thread.currentThread().getName()+" 写入完毕~~");
lock.writeLock().unlock();
}
public Object get(String key) throws InterruptedException {
Object res = null;
lock.readLock().lock();
System.out.println(Thread.currentThread().getName()+" 正在读取~~");
TimeUnit.MICROSECONDS.sleep(300);
res = map.get(key);
System.out.println(Thread.currentThread().getName()+" 读取完毕~~");
lock.readLock().unlock();
return res;
}
}
// 结果
1 正在写入~~
1 写入完毕~~
2 正在写入~~
2 写入完毕~~
3 正在写入~~
3 写入完毕~~
0 正在写入~~
0 写入完毕~~
4 正在写入~~
4 写入完毕~~
// 写是独享锁,读是共享锁,一起读
0 正在读取~~
1 正在读取~~
2 正在读取~~
4 正在读取~~
3 正在读取~~
1 读取完毕~~
0 读取完毕~~
3 读取完毕~~
2 读取完毕~~
4 读取完毕~~
14.4 读写锁的降级
写锁可以降级为读锁,但是读锁不能升级写锁,只有在读锁释放了以后才能再进行获取写锁的操作,然后进行写的操作。写的时候可以进行读,但是读的时候不能进行写
14.4.1 过程
- 获取到写锁
- 获取到读锁
- 释放写锁
- 释放读锁
15.阻塞队列
-
阻塞队列是一个共享队列,一边进行往队列中进行添加元素,一边进行往队列进行取出元素。
-
对于添加元素的线程来说,当阻塞队列中的空间为满的时候,它则不能再继续添加元素,这时线程的操作就会被阻塞,线程也就被阻塞了。
-
同理对于队列另一边的线程来说,当阻塞队列中的空间为空的时候,它则不能继续进行取出元素,这时线程的操作就会被阻塞,线程也就被阻塞了。
-
当有线程在阻塞状态,等待阻塞队列空间发生变化,当空间发生变化更新,阻塞队列会唤醒相应的线程。阻塞队列的优点就是,阻塞和唤醒线程,都进行了自动化,无需外界干扰。
15.1 阻塞队列的类型
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:由链表结构组成的有界(但是默认值大小为Integer,MAX_VALUE)阻塞队列
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列
- DelayQueue:使用优先级队列支持延迟的无界阻塞队列
- SynchronousQueue:不存储元素的队列,也即单个元素的队列。没有容量,每一个put操作必须等待一个take操作,否则不能添加元素,反之亦然。(定制化,存储单个元素)
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列
15.2 方法
15.3 实例演示
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 下面的演示方法都为一个共享队列
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
// 第一组方法演示,这组会主动抛出异常
// System.out.println(blockingQueue.add("aa"));
// System.out.println(blockingQueue.add("aa"));
// System.out.println(blockingQueue.add("aa"));
// System.out.println(blockingQueue.add("aa"));
// 移除当前第一个元素
// System.out.println(blockingQueue.remove());
// blockingQueue.remove();
// blockingQueue.remove();
// blockingQueue.remove();
// 检查当前队列,并查看队列头部 System.out.println(blockingQueue.element());
/*Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
at BlockingQueueDemo.main(BlockingQueueDemo.java:18)*/
// System.out.println("===============================");
// 第二组方法演示,会有返回时候加入成功
// System.out.println(blockingQueue.offer("bbb"));
// System.out.println(blockingQueue.offer("ccc"));
// System.out.println(blockingQueue.offer("ddd"));
/* true
false
false*/
// 取出当前队列中的值,如果取出的时候队列为空,则会返回null
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// 查看当前队列的第一个值,但是不取出
// System.out.println(blockingQueue.peek());
// System.out.println(blockingQueue.peek());
// System.out.println(blockingQueue.peek());
System.out.println("===============================");
// 演示第三组方法,满足相应的空间条件可以执行,否则线程进入阻塞等待状态
// 添加元素,如果为空则可以添加成功,如果满则无法进行添加,线程阻塞
// blockingQueue.put("tt1");
// blockingQueue.put("tt2");
// blockingQueue.put("tt3");
// 同理
// blockingQueue.take();
// blockingQueue.take();
// blockingQueue.take();
// 演示第四组方法,返回添加情况,设置超时等待,超过时间则退出
System.out.println(blockingQueue.offer("aa", 3, TimeUnit.SECONDS));
}
}
SynchronousQueue<>()
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> bq = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+ "放入第一个元素");
bq.put("1");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+ "放入第二个元素");
bq.put("2");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+ "放入第三个元素");
bq.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"线程一").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+" 等待两秒后取出元素");
TimeUnit.SECONDS.sleep(2);
bq.take();
System.out.println(Thread.currentThread().getName()+" 已取出第一个元素");
System.out.println(Thread.currentThread().getName()+" 等待两秒后取出第二个元素");
TimeUnit.SECONDS.sleep(2);
bq.take();
System.out.println(Thread.currentThread().getName()+" 已取出第二个元素");
System.out.println(Thread.currentThread().getName()+" 等待两秒后取出第三个元素");
TimeUnit.SECONDS.sleep(2);
bq.take();
System.out.println(Thread.currentThread().getName()+" 已取出第三个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"线程二").start();
}
}
16.线程池
16.1 概述
16.2 实现方式
- 第一种通过Executors.newFixedThreadPool来创建大小并设定线程数量
- 第二种通过Executors.newSingleThreadExecutor创建拥有一个线程的线程池
- 第三种通过Executors.newCachedThreadPool创建一个自适应的线程池大小
16.2.1 实现演示
public class ThreadPoolDemo {
public static void main(String[] args) {
// 第一种通过Executors.newFixedThreadPool来创建大小并设定线程数量
ExecutorService threadPool1 = Executors.newFixedThreadPool(3);
// 第二种通过Executors.newSingleThreadExecutor创建拥有一个线程的线程池
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
// 第三种通过Executors.newCachedThreadPool创建一个自适应的线程池大小
ExecutorService threadPool3 = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 20; i++) {
threadPool3.execute(() -> {
try {
System.out.println(Thread.currentThread().getName()+ " 在办理业务");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+ " 业务办理完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
} finally {
// 处理完以后一定要关闭线程池
threadPool3.shutdown();
}
}
}
16.2.2 分析
通过源码分析这三种创建方式,都是通过ThreadPoolExecutor来实现的,现在我们来分析一下这个类
16.3 ThreadPoolExecutor类
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
16.3.1 参数解析
-
int corePoolSize
-
int maximumPoolSize
-
long keepAliveTime
-
TimeUnit unit
-
BlockingQueue workQueue
-
ThreadFactory threadFactory
-
RejectedExecutionHandler handle
16.3.2 工作流程和拒绝策略
16.3.3 工作流程
-
当有任务来的时候才会创建线程池
-
先用常驻线程来进行任务处理,当常驻线程都在进行任务处理,再到来的任务进入阻塞队列
-
当阻塞队列已经排满了,这时启用线程池中除常用线程以外的线程(线程池最大线程数 - 常驻线程)处理到来的任务
-
如果线程池中的已经达到最大线程数量在工作,继续有任务到来,则执行拒绝策略
16.3.4 拒绝策略
- 抛出异常,无法处理。== 》 出门右拐
- 谁安排你让到这个线程池处理的找谁。==》 管我屁事
- 将在阻塞队列中排的最久的任务抛弃,然后将新来 的任务加入阻塞队列中。 ==》 谁让你等
- 将所有在排队的任务中,无法处理的任务不处理也不抛出异常。 == 》 不理不睬
16.4 自定义线程池
一般不使用上面的方式创建线程池,而是自定义。
16.4.1 实例
public class NewThreadByDIY {
public static void main(String[] args) {
ExecutorService diyThread = new ThreadPoolExecutor(
2,
7,
3L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
try {
for (int i = 0; i < 20; i++) {
diyThread.execute(() -> {
try {
System.out.println(Thread.currentThread().getName()+ " 在办理业务");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+ " 业务办理完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
} finally {
// 处理完以后一定要关闭线程池
diyThread.shutdown();
}
}
}
17.分支合并框架
17.1 概述
将大问题化为小问题,然后合并
17.2 实例
public class Task {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建对象
MyTask myTask = new MyTask(1, 100);
// 创建分支合并对象池
ForkJoinPool pool = new ForkJoinPool();
// 执行
ForkJoinTask<Integer> submit = pool.submit(myTask);
// 获取结果
Integer integer = submit.get();
System.out.println(integer);
// 关闭线程池
pool.shutdown();
}
}
class MyTask extends RecursiveTask<Integer> {
private static final int VALUE = 10;
private int begin;
private int end;
private int res;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if ((end - begin) <= VALUE){
for (int i = begin; i <= end; i++) {
res = res + i;
}
} else {
int mid = (begin + end) / 2;
// 拆分左边
MyTask task1 = new MyTask(begin, mid);
// 拆分右边
MyTask task2 = new MyTask(mid+1, end);
task1.fork();
task2.fork();
// 合并
res = task1.join() + task2.join();
}
return res;
}
}
18.异步回调
18.1 同步回调
我们常用的一些请求都是同步回调的,同步回调是阻塞的,单个的线程需要等待结果的返回才能继续往下执行。
18.2 异步回调
有的时候,我们不希望程序在某个执行方法上一直阻塞,需要先执行后续的方法,那就是这里的异步回调。我们在调用一个方法时,如果执行时间比较长,我们可以传入一个回调的方法,当方法执行完时,让被调用者执行给定的回调方法。
18.3 两种创建方式
-
CompletableFuture.runAsync
-
CompletableFuture.supplyAsync
18.4 实例
public class ThreadDemo7 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 没有返回值
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "CompletableFuture1");
});
runAsync.get();
// 有返回值
CompletableFuture<Object> supplyAsync = CompletableFuture.supplyAsync(() -> {
return 77;
});
supplyAsync.whenComplete((u,t) -> {
// t打印返回值
// u打印异常信息
System.out.println(Thread.currentThread().getName() + t);
System.out.println(Thread.currentThread().getName() + u);
}).get();
}
}