0
点赞
收藏
分享

微信扫一扫

JUC并发编程(java.util .concurrent工具包)详解与实例演示

JUC 并发编程

概述

1.基本概念

1.1 进程和线程

1.1.1 定义
1.1.2 关系
1.1.3 区别
  1. 简而言之,一个程序至少有一个进程,一个进程至少有一个线程

  2. 线程的划分尺度小于进程,使得多线程程序的并发性高

  3. 另外,进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率

  4. 线程在执行过程中与进程还是有区别的。每个独立的进程有一个程序运行的入口、顺序执行序列和程序的出口。**但是线程不能够独立执行,**必须依存在应用程序中,由应用程序提供多个线程执行控制

  5. 从逻辑角度来看,多线程的意义在于一个应用程序中,有多个执行部分可以同时执行。但操作系统并没有将多个线程看做多个独立的应用,来实现进程的调度和管理以及资源分配。这就是进程和线程的重要区别

1.1.4 线程和进程在使用上各有优缺点

1.2 线程的状态

  1. 初始(NEW):新创建了一个线程对象,但还没有调用start()方法。

  2. 运行(RUNNABLE)

    • Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。

    • 就绪状态(RUNNABLE之READY)

      • 就绪状态只是说你资格运行,调度程序没有挑选到你,你就永远是就绪状态。

      • 调用线程的start()方法,此线程进入就绪状态。

      • 当前线程sleep()方法结束,其他线程join()结束,等待用户输入完毕,某个线程拿到对象锁,其他线程也将进入就绪状态。

      • 当前线程时间片用完了,调用当前线程的yield()方法,当前线程进入就绪状态。

      • 锁池里的线程拿到对象锁后,进入就绪状态。

    • 线程调度程序从可运行池中选择一个线程作为当前线程时,线程进入运行态。这也是线程进入运行状态的唯一的一种方式。

  3. 阻塞(BLOCKED)

  4. 等待(WAITING)

  5. 超时等待(TIMED_WAITING)

  6. 终止(TERMINATED)

    • 当线程的run()方法完成时,或者主线程的main()方法完成时,我们就认为它终止了。这个线程对象也许是活的,但是它已经不是一个单独执行的线程。线程一旦终止了,就不能复生。

    • 在一个终止的线程上调用start()方法,会抛出java.lang.IllegalThreadStateException异常。

这6种状态定义在Thread类的State枚举中,可查看源码进行一一对应。

1.2.1 wait和sleep

sleep()
wait()
1.2.1.1 区别

1.3 并发和并行

1.3.1 概述

并发是逻辑上的同时发生(simultaneous),而并行是物理上的同时发生。

1.3.2 并发(一同出发,一起争夺一个资源)

1.3.3 并行(一同出行)

1.4 管程

1.5 用户线程和守护线程

1.5.1 概述

1.5.2 设置

1.5.3 区别

2. 创建线程的方式

Java使用Thread类代表线程,所有的线程对象都必须是Thread类或其子类的实例。

2.1 Java可以用四种方式来创建线程

  • 继承Thread类创建线程
  • 实现Runnable接口创建线程
  • 使用Callable和Future创建线程
  • 使用线程池例如用Executor框架

2.1.1 继承Thread类创建线程

通过在类名上继承Thread并重写其中的run()方法来实现,在创建实例的时候调用xxx.start()方法来启动线程

2.1.2 实现Runnable接口创建线程

实现Runnable接口并重写其中的run()方法,创建实例以后通过放入线程来创建线程

2.1.2.1 实例
public class Main {
  public static void main(String[] args){undefined
    // 创建并启动线程,这里的myThread已经实现了Runnable接口并重写了run()方法
    MyThread2 myThread=new MyThread2();

    Thread thread=new Thread(myThread);

    thread().start();
    // 或者new Thread(new MyThread2()).start();
  }
}

2.1.3 使用Callable和Future创建线程

2.1.4 实现
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
//实现Callable接口
public class CallableTest {
 
    public static void main(String[] args) {
        //执行Callable 方式,需要FutureTask 实现实现,用于接收运算结果
        FutureTask<Integer> futureTask = new FutureTask<Integer>(new MyCallable());
        new Thread(futureTask).start();
        //接收线程运算后的结果
        try {
            Integer sum = futureTask.get();
            System.out.println(sum);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
 
class MyCallable implements Callable<Integer> {
 
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum += i;
        }
        return sum;
    }
}

2.1.4 使用线程池例如用Executor框架

线程池提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提升了响应速度。实现了线程复用。

2.1.4.1 实现
Callable<Singleton4> callable = new Callable<Singleton4>() {
            @Override
            public Singleton4 call() throws Exception {
                return Singleton4.getInstance();
            }
        };
		 // 创建一个线程池,并设定大小
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
		 // 通过future来接收线程启动后返回的结果
        Future<Singleton4> f1 = threadPool.submit(callable);
        Future<Singleton4> f2 = threadPool.submit(callable);
		 // 获取返回值
        Singleton4 s6 = f1.get();
        Singleton4 s7 = f1.get();

        System.out.println(s6 == s7);

        threadPool.shutdown();
}

3.多线程编程步骤

  1. 创建资源类,编写属性和操作方法

  2. 在资源类设置操作方法

    • 判断
    • 执行
    • 通知
  3. 创建多个线程,调用资源类的操作方法

  4. 防止虚假唤醒,将在资源类操作方法的判断条件设置在while中

4. Synchronized

4.1 概述

Java自带的关键字,它最大的特征就是在同一时刻只有一个线程能够获得对象的监视器(monitor),从而进入到同步代码块或者同步方法之中,即表现为互斥性(排它性)。在发生异常的时候会自动释放锁。

4.2 实例

public class SaleTicket {
    public static void main(String[] args) {
        Ticket ticket = new Ticket();

        new Thread(() ->{
            for (int i = 0; i < 40; i++) {
                ticket.sale();
            }
        },"售票员1").start();

        new Thread(() ->{
            for (int i = 0; i < 40; i++) {
                ticket.sale();
            }
        },"售票员2").start();

        new Thread(() ->{
            for (int i = 0; i < 40; i++) {
                ticket.sale();
            }
        },"售票员3").start();
    }

}

class Ticket{
    private int num = 30;

    public synchronized void sale(){
        if (num > 0 ){
            // 获取当前执行的线程的名字
            System.out.println(Thread.currentThread().getName()+"售卖第"+(num--)+"票"+"还剩下"+num+"票");
        }
    }
}

5.LOCK

5.1 概述

Lock不是java中的关键字,通过实例化来获取,可以让用户自己手动的上锁和解锁,如果没有设定解锁方式,在发生异常时不能正常解锁,则会发生死锁现象。

5.2 实例

import java.util.concurrent.locks.ReentrantLock;

public class SaleTicketByLockDemo {
    public static void main(String[] args) {
        SaleTicketByLock ticket = new SaleTicketByLock();
        new Thread(() ->{
            for (int i = 0; i < 40; i++) {
                ticket.sale();
            }
        },"售票员1").start();

        new Thread(() ->{
            for (int i = 0; i < 40; i++) {
                ticket.sale();
            }
        },"售票员2").start();

        new Thread(() ->{
            for (int i = 0; i < 40; i++) {
                ticket.sale();
            }
        },"售票员3").start();
    }

}

class SaleTicketByLock{
    // 实现Lock
    ReentrantLock lock =new ReentrantLock();

    private int num = 30;
	 // 这里使用try finally 的方式来确保无论是否发生异常最后都能正确的关闭锁,以确保不会发生死锁的方式
    public synchronized void sale(){
        lock.lock();
        try {
            if (num > 0 ){
                System.out.println(Thread.currentThread().getName()+"售卖第"+(num--)+"票"+"还剩下"+num+"票");
            }
        } finally {
            lock.unlock();
        }

    }
}

6.Lock和Synchronized的区别

6.1 概述

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-blIZGmmo-1643884728905)(F:\StudyNotepad\img\image-20211101221416591.png)]

6.2 区别

  • 异常是否释放锁

  • 是否知道获取锁

  • Lock可以提高多个线程进行读操作的效率

  • 在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择

  • synchronized使用Object对象本身的wait 、notify、notifyAll调度机制,而Lock可以使用Condition进行线程之间的调度

  • lock可以通过condition来达到精确唤醒

6.3 volatile关键字

7.线程通信

7.1 用Synchronized实现

package syn;

/**
 * @Author PoX21s
 * @Date: 2021/11/1 17:01
 * @Version 1.0
 */

public class ThreadDemo1 {
    public static void main(String[] args) {
        share share = new share();

        new Thread(() ->{
            for (int i = 0; i < 10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"线程一").start();

        new Thread(() ->{
            for (int i = 0; i < 10; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"线程二").start();
    }
}

class share{
    private int num = 0;

    public synchronized void incr() throws InterruptedException {
        if (num == 1){
            this.wait();
        }
        num++;
        System.out.println(Thread.currentThread().getName()+"::"+num);
        this.notifyAll();
    }

    public synchronized void decr() throws InterruptedException {
        if (num == 0){
            this.wait();
        }
        num--;
        System.out.println(Thread.currentThread().getName()+"::"+num);
        this.notifyAll();
    }
}

7.2 虚假唤醒问题

7.2.1 解决办法

将判断条件中的if改为while,在醒来以后继续判断。

7.3 用Lock实现

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Author PoX21s
 * @Date: 2021/11/1 17:01
 * @Version 1.0
 */

public class ThreadDemo2 {
    public static void main(String[] args) {
        share1 share = new share1();

        new Thread(() ->{
            for (int i = 0; i < 10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"线程一").start();

        new Thread(() ->{
            for (int i = 0; i < 10; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"线程二").start();

        new Thread(() ->{
            for (int i = 0; i < 10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"线程三").start();

        new Thread(() ->{
            for (int i = 0; i < 10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"线程四").start();
    }
}

class share1 {
    private int num = 0;

    ReentrantLock lock = new ReentrantLock();
	
    // 通过lock中的condition实例来实现类似synchronized中的wait和notify
    private Condition condition = lock.newCondition();

    public void incr() throws InterruptedException {
        lock.lock();

        try {
            while (num == 1){
                condition.await();
            }
            num++;
            System.out.println(Thread.currentThread().getName()+"::"+num);
            condition.signalAll();

        } finally {
            lock.unlock();
        }

    }

    public void decr() throws InterruptedException {
        lock.lock();

        try {
            while (num == 0){
                condition.await();
            }
            num--;
            System.out.println(Thread.currentThread().getName()+"::"+num);
            condition.signalAll();

        } finally {
            lock.unlock();
        }
    }
}

8.线程定制化通信

按照规定的方式执行,执行完以后,更改标志位,唤醒下一个线程。使用Lock实现。

8.1 实现

package syn;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Author PoX21s
 * @Date: 2021/11/1 17:01
 * @Version 1.0
 */

public class ThreadDemo3 {
    public static void main(String[] args) {
        share2 share2 = new share2();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    share2.print5();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"aa").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    share2.print10();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"bb").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    share2.print15();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"cc").start();
    }


}

class share2 {
    // 设置标志位
    private int flag = 1;

    private ReentrantLock lock = new ReentrantLock();

    // 一个Lock对象中可以创建多个Condition实例(即对象监视器)
    private Condition c1 = lock.newCondition();
    private Condition c2 = lock.newCondition();
    private Condition c3 = lock.newCondition();

    public void print5() throws InterruptedException {
        lock.lock();

        try {
            while (flag != 1){
                c1.await();
            }
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName()+" :: "+"打印第"+i);
            }
            flag = 2;
            c2.signal();
        } finally {
            lock.unlock();
        }
    }

    public void print10() throws InterruptedException {
        lock.lock();

        try {
            while (flag != 2){
                c2.await();
            }
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName()+" :: "+"打印第"+i);
            }
            flag = 3;
            c3.signal();
        } finally {
            lock.unlock();
        }
    }

    public void print15() throws InterruptedException {
        lock.lock();

        try {
            while (flag != 3){
                c3.await();
            }
            for (int i = 0; i < 15; i++) {
                System.out.println(Thread.currentThread().getName()+" :: "+"打印第"+i);
            }
            flag = 1;
            c1.signal();
        } finally {
            lock.unlock();
        }
    }
}

9.集合中的线程安全

在多个线程同时读取和写入(并发修改)的时候可能会出现线程并发安全问题,下面通过三种方式来解决,推荐使用第三种。

9.1 ArryayList中的线程安全

9.1.1 通过Vector解决

9.1.1.1 实现
public class ThreadDemo4 {
    public static void main(String[] args) {
//        List<String> list = new ArrayList<>();
        // 通过改变创建list引用的对象来实现在添加时的线程安全
        List<String> list = new Vector<>();
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0,7));
                System.out.println(list);
            },String.valueOf(i)).start();
        }
    }
}

9.1.2 通过Collections中的synchronizedList方法解决

本质也是在add方法中增加了synchronized关键字。

9.1.2.1 实现
public class ThreadDemo4 {
    public static void main(String[] args) {
        // 本质通过在add方法上添加了线程同步关键字实现
        List<String> list = Collections.synchronizedList(new ArrayList<String>());
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0,7));
                System.out.println(list);
            },String.valueOf(i)).start();
        }
    }
}

9.1.3 通过CopyOnWriteArrayList<>()类来实现

9.1.3.1 实现
public class ThreadDemo4 {
    public static void main(String[] args) {
        // 写时复制技术,先复制出来一份,当前写完以后合并更新到原数组,后面的线程使用新的数组进行操作
        List<String> list = new CopyOnWriteArrayList<>();

        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0,7));
                System.out.println(list);
            },String.valueOf(i)).start();
        }
    }
}

9.2 HashSet中的线程安全

9.2.1 通过Collections.synchronizedSet()方法

通过synchronized关键字实现。

9.2.1.1 实现
public class ThreadDemo5 {
    public static void main(String[] args) {
//        HashSet<String> set = new HashSet<>();
        Set<String> set;
        set = Collections.synchronizedSet(new HashSet<String>());
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                set.add(UUID.randomUUID().toString().substring(0,6));
                System.out.println(set);
            },String.valueOf(i)).start();
        }
    }
}

9.3 HashMap中的线程安全

9.3.1 通过ConcurrentHashMap<>()解决

添加synchronized关键字代码块,实现线程同步。推荐使用,性能和HashMap差距不大。

9.3.1.1 实现
public class ThreadDemo5 {
    public static void main(String[] args) {
        Map<String, String> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 30; i++) {
            String key = String.valueOf(i);
            new Thread(() -> {
                map.put(key,UUID.randomUUID().toString().substring(0,6));
                System.out.println(map);
            },String.valueOf(i)).start();
        }
    }
}

10.多线程锁

10.1 锁的几种情况

10.1.1 实例

class Person{
	public synchronized void test(){
      // ....
   }
   
   public static synchronized void test2(){
      // ....
   }
   
   public void test3(){
      // ....
   }
}

// 当只有单synchronized时锁的是p1实例对象
// 当在synchronized前面加了时,锁的是Person类
public class Test{
   // psvm
   Person p1 = new Person();
}

10.2 公平锁和非公平锁

10.2.1 非公平锁

  • 使用非公平锁,当前线程发现有空就会去使用,不会考虑其他线程

  • 如果占用失败,则会采用类似公平锁的方式,到队列后面

  • 效率高,但可能会造成单个线程做完了所有事,其他线程没事可干,一直陪跑

  • synchronized是非公平锁

  • lock默认是非公平,但是可以设置为公平锁 new reentrant(true)

10.2.2 公平锁

  • 使用公平锁,当前线程发现有空的时候,会询问其他线程是否有线程要使用,如果没有其他线程要使用,就自己用,如果有其他线程用,就排队。

  • 按照申请锁的顺序来执行

  • 效率相对于非公平锁会低,因为会有询问的过程,但是相对非公平锁来说,每一个线程都有执行的机会,不会出现单线程干完所有事的情况(线程饥饿问题)

10.3 可重入锁

  • synchronized(隐式的上锁和解锁交给JVM)和Lock(显式的上锁和解锁交给用户本身)都是可重入锁

  • 可重入锁又称为递归锁,当一把锁内部还有其他锁的时候,此时线程如果获得最外层的锁,那么内部的锁线程也可以随意进入,下面通过synchronized来演示

  • 最大的作用就是避免死锁,还是要注意lock的锁释放

10.3.1 实例

public class ThreadDemo6 {
    public static void main(String[] args) {
        Object o = new Object();

        new Thread(() -> {
            synchronized (o){
                System.out.println(Thread.currentThread().getName()+"外部的锁");
                synchronized (o) {
                    System.out.println(Thread.currentThread().getName()+"中间的锁");
                    synchronized (o){
                        System.out.println(Thread.currentThread().getName()+"内部的锁");
                    }
                }
            }
        },"线程一").start();
        
    }
}
10.3.1.1 注意

10.4 死锁

10.4.1 产生死锁的原因

  1. 系统资源不足
  2. 进程运行推进不顺利
  3. 系统资源分配不当

10.4.1.1 实例演示

public class DeadLock {
    static Object o1 = new Object();
    static Object o2 = new Object();

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (o1){
                System.out.println(Thread.currentThread().getName()+"持有o1锁,试图获取o2锁");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o2){
                    System.out.println(Thread.currentThread().getName()+"试图获取o2锁");
                }
            }
        },"线程一").start();

        new Thread(() -> {
            synchronized (o2){
                System.out.println(Thread.currentThread().getName()+"持有o2锁,试图获取o1锁");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o1){
                    System.out.println(Thread.currentThread().getName()+"试图获取o2锁");
                }
            }
        },"线程二").start();
        
    }
}

// 输出结果
// 线程一持有o1锁,试图获取o2锁
// 线程二持有o2锁,试图获取o1锁
// (一直等待中)

10.4.2 验证死锁

可以通过java中自带的工具查询:jps和jstack

  1. jps -l ==> 找到当前运行类的java进程号
  2. jstack [java进程号] ==》 查询当前的运行情况
10.4.2.1 实例演示
PS F:\Java\JUC> jps -l
17044 sun.tools.jps.Jps
18692 org.jetbrains.jps.cmdline.Launcher
13612
21276 syn.DeadLock
PS F:\Java\JUC> jstack 21276
Found 1 deadlock. #返回当前为死锁状态

11.Callable接口

在jdk1.5以后新增的一种创建线程的方式,和Runnable的区别

  1. Callable可以有返回值
  2. Callable当返回值有问题时,会抛出异常,而Runnable不会抛出异常
  3. 实现方法名称不同,Runnable通过run()方法实现,Callable通过call()方法实现

11.1 创建一个实现Callable接口的线程

不能直接传递给Thread,Thread只能接收Runnable对象,因此为了让Thread能够接收Callable对象,要通过中间类来实现,这就是FutureTask类。

11.1.1 关于FutureTask

在不影响主线程的情况下,可以再开启其他线程来执行其他操作,这些线程执行完了以后,把结果返回给主线程,并且所有结果只会汇总一次

11.2 实例演示

class Demo implements Callable{
    @Override
    public Object call() throws Exception {
        return 1+2+3;
    }
}

public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Demo c1 = new Demo();

        FutureTask<Integer> task1 = new FutureTask<Integer>(c1);
        FutureTask<Integer> task2 = new FutureTask<Integer>(() -> {
            return 1024;
        });

        new Thread(task1,"线程一").start();
        new Thread(task2,"线程二").start();

        // 等待得到返回结果
        while (!task1.isDone()){
            System.out.println("等待返回结果");
        }

        System.out.println("结果:");
        // 当得到一次结果以后,以后的get()则会直接调取结果,不用再进行运算
        // 即只用汇总一次
        System.out.println(task1.get());
        System.out.println(task1.get());
        // 当所有的新开起的线程都结束了,主线程才会结束
        System.out.println(task2.get());

        System.out.println(Thread.currentThread().getName()+"over");

    }

}

12.JUC强大辅助类 – 线程同步工具

12.1 CountDownLatch

可以设定初始值,当这个类中的count属性不为0的时候,会让其他线程进入阻塞状态,调用await()方法,只有减为0才会唤醒其他线程往下执行,也就是解除await()方法只能是count减为0的时候

12.1.1 实例演示

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch count = new CountDownLatch(6);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName()+ " 号同学离开教室");
                count.countDown();
            },String.valueOf(i)).start();
        }
        count.await();
        System.out.println(Thread.currentThread().getName()+" 关闭教室门");
    }
}

12.2 CyclicBarrier

设定一组现成的额个数,多个线程开始执行,当开始执行的线程达到设定的一组的个数时,这一组的线程同时开始执行。没有达到数量时,已经开始执行的线程处于阻塞状态,等待其他线程的到来,然后再一起执行。

12.2.1 实例演示

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(6, new Runnable() {
            @Override
            public void run() {
                System.out.println("已经集齐了7颗龙珠,召唤神龙");
            }
        });

        for (int i = 0; i < 7; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName()+ " 号龙珠被得到");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }

}

12.3 Semaphore

设定许可的数量,当一个线程获取到了许可,则可以执行方法,其他开启的线程没有获得许可,则进入等待状态,当有线程释放许可以后,等待的线程争夺许可,获得了许可执行,没有获得的继续等待。

12.3.1 实例演示

public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+" 号获取到车位");
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(Thread.currentThread().getName()+" 离开车位====》");
                    semaphore.release();
                }
            },String.valueOf(i)).start();
        }
    }
}

13.一些锁的概念

13.1 悲观锁

每次进行操作都会进行锁上,操作完以后才会解锁,当一个线程在执行的时候,其他线程都处于等待阻塞状态

13.2 乐观锁

线程在执行的时候不会将要处理的资源锁上,会将资源复制一份,然后再复制的资源上进行操作,操作完以后提交到原有的数据上进行更新操作。但是在提交的时候,要进行比对判断,判断复制的原内容和现在的情况是否相同 ==》例如:现在有两个线程要操作同一个资源,它们在操作资源的时候都会先进行复制,然后在复制的资源上进行操作,但是现在线程一先处理完,提交更新了,这时的原资源已经发生了改变,当其他线程提交的时候,发现自己最开始复制的内容,和现在的资源内容不相同,则会提交失败,而是继续复制当前的内容,然后再提交判断。

13.3 表锁

在线程处理资源的时候,会将整个数据表都锁上,不允许其他线程进行操作,不会发生死锁现象。

13.4 行锁

在线程处理资源时,只会将要处理的一行进行上锁,表中的其他行是允许其他线程进行操作的,会出现死锁现象,即两个或两个以上的线程互相等待对方释放锁,而导致的多线程间互相等待的情况。

13.5 读锁

在读的时候,允许多个线程一起读,共享锁,会发生死锁现象 ==》 例如:多个线程都在读取同一行资源,但是互相之间都想要修改,则都在互相等待对方读取完毕然后进行修改的操作,因为都想修改,则会导致互相等待的情况。

13.6 写锁

在写的时候,不允许多个线程共同操作,独享锁,但也会发生死锁现象 == 》 例如:多个线程都在修改不同的资源,但是都又想操作对方的资源,而出现互相等待死锁的情况。

14.读写锁

读的时候可以共享,但是写的时候只能有一个线程进行操作。但是不能同时存在读写操作,读写是互斥的,读读是共享的。

14.1 优点

比其synchronized和lock独占锁的情况,可以提升读的性能

14.2 缺点

  1. 会造成线程饥饿,也就是说会导致有线程一直在读,进而无法进行写的操作
  2. 写的时候不能读,一直等待,写的时候可以读

14.3 读写锁的实例

public class ReadWriteLockDemo {
    public static void main(String[] args) {
        ContextDemo context = new ContextDemo();

        for (int i = 0; i < 5; i++) {
            final int num = i;
            new Thread(() -> {
                try {
                    context.put(num+"",num+"");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }

        for (int i = 0; i < 5; i++) {
            final int num = i;
            new Thread(() -> {
                try {
                    context.get(num+"");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }

    }
}

class ContextDemo {
    private volatile Map<String,Object> map = new HashMap<>();
    private ReadWriteLock lock = new ReentrantReadWriteLock();

    public void put(String key, Object o) throws InterruptedException {
        lock.writeLock().lock();
        System.out.println(Thread.currentThread().getName()+" 正在写入~~");
       
        TimeUnit.MICROSECONDS.sleep(300);
       
        map.put(key,o);
        System.out.println(Thread.currentThread().getName()+" 写入完毕~~");
        lock.writeLock().unlock();
    }

    public Object get(String key) throws InterruptedException {
        Object res = null;
        lock.readLock().lock();
        System.out.println(Thread.currentThread().getName()+" 正在读取~~");
       
        TimeUnit.MICROSECONDS.sleep(300);
       
        res = map.get(key);
        System.out.println(Thread.currentThread().getName()+" 读取完毕~~");
        lock.readLock().unlock();
       
        return res;
    }
}

// 结果
1 正在写入~~
1 写入完毕~~
2 正在写入~~
2 写入完毕~~
3 正在写入~~
3 写入完毕~~
0 正在写入~~
0 写入完毕~~
4 正在写入~~
4 写入完毕~~
// 写是独享锁,读是共享锁,一起读
0 正在读取~~
1 正在读取~~
2 正在读取~~
4 正在读取~~
3 正在读取~~
1 读取完毕~~
0 读取完毕~~
3 读取完毕~~
2 读取完毕~~
4 读取完毕~~

14.4 读写锁的降级

写锁可以降级为读锁,但是读锁不能升级写锁,只有在读锁释放了以后才能再进行获取写锁的操作,然后进行写的操作。写的时候可以进行读,但是读的时候不能进行写

14.4.1 过程

  1. 获取到写锁
  2. 获取到读锁
  3. 释放写锁
  4. 释放读锁

15.阻塞队列

  • 阻塞队列是一个共享队列,一边进行往队列中进行添加元素,一边进行往队列进行取出元素。

  • 对于添加元素的线程来说,当阻塞队列中的空间为满的时候,它则不能再继续添加元素,这时线程的操作就会被阻塞,线程也就被阻塞了。

  • 同理对于队列另一边的线程来说,当阻塞队列中的空间为空的时候,它则不能继续进行取出元素,这时线程的操作就会被阻塞,线程也就被阻塞了。

  • 当有线程在阻塞状态,等待阻塞队列空间发生变化,当空间发生变化更新,阻塞队列会唤醒相应的线程。阻塞队列的优点就是,阻塞和唤醒线程,都进行了自动化,无需外界干扰。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EfxmopPK-1643884728908)(F:\StudyNotepad\img\image-20211103110017441.png)]

15.1 阻塞队列的类型

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界(但是默认值大小为Integer,MAX_VALUE)阻塞队列
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列
  • DelayQueue:使用优先级队列支持延迟的无界阻塞队列
  • SynchronousQueue:不存储元素的队列,也即单个元素的队列。没有容量,每一个put操作必须等待一个take操作,否则不能添加元素,反之亦然。(定制化,存储单个元素)
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

15.2 方法

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-edGuBiOu-1643884728909)(F:\StudyNotepad\img\image-20211103112315084.png)]

15.3 实例演示

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 下面的演示方法都为一个共享队列
        BlockingQueue blockingQueue = new ArrayBlockingQueue(3);

        // 第一组方法演示,这组会主动抛出异常
//        System.out.println(blockingQueue.add("aa"));
//        System.out.println(blockingQueue.add("aa"));
//        System.out.println(blockingQueue.add("aa"));
//        System.out.println(blockingQueue.add("aa"));
        // 移除当前第一个元素
//        System.out.println(blockingQueue.remove());
//        blockingQueue.remove();
//        blockingQueue.remove();
//        blockingQueue.remove();
        // 检查当前队列,并查看队列头部      System.out.println(blockingQueue.element());
       
        /*Exception in thread "main" java.lang.IllegalStateException: Queue full
        at java.util.AbstractQueue.add(AbstractQueue.java:98)
        at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
        at BlockingQueueDemo.main(BlockingQueueDemo.java:18)*/
//        System.out.println("===============================");

        // 第二组方法演示,会有返回时候加入成功
//        System.out.println(blockingQueue.offer("bbb"));
//        System.out.println(blockingQueue.offer("ccc"));
//        System.out.println(blockingQueue.offer("ddd"));
        /* true
        false
        false*/
        // 取出当前队列中的值,如果取出的时候队列为空,则会返回null
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());
        // 查看当前队列的第一个值,但是不取出
//        System.out.println(blockingQueue.peek());
//        System.out.println(blockingQueue.peek());
//        System.out.println(blockingQueue.peek());

        System.out.println("===============================");

        // 演示第三组方法,满足相应的空间条件可以执行,否则线程进入阻塞等待状态
        // 添加元素,如果为空则可以添加成功,如果满则无法进行添加,线程阻塞
//        blockingQueue.put("tt1");
//        blockingQueue.put("tt2");
//        blockingQueue.put("tt3");
        // 同理
//        blockingQueue.take();
//        blockingQueue.take();
//        blockingQueue.take();

        // 演示第四组方法,返回添加情况,设置超时等待,超过时间则退出
        System.out.println(blockingQueue.offer("aa", 3, TimeUnit.SECONDS));
    }
}

SynchronousQueue<>()

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> bq = new SynchronousQueue<>();

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+ "放入第一个元素");
                bq.put("1");
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName()+ "放入第二个元素");
                bq.put("2");
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName()+ "放入第三个元素");
                bq.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"线程一").start();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName()+" 等待两秒后取出元素");
                TimeUnit.SECONDS.sleep(2);
                bq.take();
                System.out.println(Thread.currentThread().getName()+" 已取出第一个元素");
                System.out.println(Thread.currentThread().getName()+" 等待两秒后取出第二个元素");
                TimeUnit.SECONDS.sleep(2);
                bq.take();
                System.out.println(Thread.currentThread().getName()+" 已取出第二个元素");
                System.out.println(Thread.currentThread().getName()+" 等待两秒后取出第三个元素");
                TimeUnit.SECONDS.sleep(2);
                bq.take();
                System.out.println(Thread.currentThread().getName()+" 已取出第三个元素");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"线程二").start();

    }
}

16.线程池

16.1 概述

16.2 实现方式

  1. 第一种通过Executors.newFixedThreadPool来创建大小并设定线程数量
  2. 第二种通过Executors.newSingleThreadExecutor创建拥有一个线程的线程池
  3. 第三种通过Executors.newCachedThreadPool创建一个自适应的线程池大小

16.2.1 实现演示

public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 第一种通过Executors.newFixedThreadPool来创建大小并设定线程数量
        ExecutorService threadPool1 = Executors.newFixedThreadPool(3);

        // 第二种通过Executors.newSingleThreadExecutor创建拥有一个线程的线程池
        ExecutorService threadPool2 = Executors.newSingleThreadExecutor();

        // 第三种通过Executors.newCachedThreadPool创建一个自适应的线程池大小
        ExecutorService threadPool3 = Executors.newCachedThreadPool();

        try {
            for (int i = 0; i < 20; i++) {
                threadPool3.execute(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName()+ " 在办理业务");
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println(Thread.currentThread().getName()+ " 业务办理完成");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        } finally {
            // 处理完以后一定要关闭线程池
            threadPool3.shutdown();
        }
        
    }
}

16.2.2 分析

通过源码分析这三种创建方式,都是通过ThreadPoolExecutor来实现的,现在我们来分析一下这个类

16.3 ThreadPoolExecutor类

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

16.3.1 参数解析

  • int corePoolSize

  • int maximumPoolSize

  • long keepAliveTime

  • TimeUnit unit

  • BlockingQueue workQueue

  • ThreadFactory threadFactory

  • RejectedExecutionHandler handle

16.3.2 工作流程和拒绝策略

16.3.3 工作流程

  1. 当有任务来的时候才会创建线程池

  2. 先用常驻线程来进行任务处理,当常驻线程都在进行任务处理,再到来的任务进入阻塞队列

  3. 当阻塞队列已经排满了,这时启用线程池中除常用线程以外的线程(线程池最大线程数 - 常驻线程)处理到来的任务

  4. 如果线程池中的已经达到最大线程数量在工作,继续有任务到来,则执行拒绝策略

16.3.4 拒绝策略

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jjQV3tJn-1643884728910)(F:\StudyNotepad\img\image-20211103123819897.png)]

  1. 抛出异常,无法处理。== 》 出门右拐
  2. 谁安排你让到这个线程池处理的找谁。==》 管我屁事
  3. 将在阻塞队列中排的最久的任务抛弃,然后将新来 的任务加入阻塞队列中。 ==》 谁让你等
  4. 将所有在排队的任务中,无法处理的任务不处理也不抛出异常。 == 》 不理不睬

16.4 自定义线程池

一般不使用上面的方式创建线程池,而是自定义。

16.4.1 实例

public class NewThreadByDIY {
    public static void main(String[] args) {
        ExecutorService diyThread = new ThreadPoolExecutor(
                2,
                7,
                3L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        try {
            for (int i = 0; i < 20; i++) {
                diyThread.execute(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName()+ " 在办理业务");
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println(Thread.currentThread().getName()+ " 业务办理完成");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        } finally {
            // 处理完以后一定要关闭线程池
            diyThread.shutdown();
        }
    }
}

17.分支合并框架

17.1 概述

将大问题化为小问题,然后合并

17.2 实例

public class Task {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建对象
        MyTask myTask = new MyTask(1, 100);
        // 创建分支合并对象池
        ForkJoinPool pool = new ForkJoinPool();
        // 执行
        ForkJoinTask<Integer> submit = pool.submit(myTask);
        // 获取结果
        Integer integer = submit.get();
        System.out.println(integer);
        // 关闭线程池
        pool.shutdown();
    }
}

class MyTask extends RecursiveTask<Integer> {
    private static final int VALUE = 10;
    private int begin;
    private int end;
    private int res;

    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if ((end - begin) <= VALUE){
            for (int i = begin; i <= end; i++) {
                res = res + i;
            }
        } else {
            int mid = (begin + end) / 2;
            // 拆分左边
            MyTask task1 = new MyTask(begin, mid);
            // 拆分右边
            MyTask task2 = new MyTask(mid+1, end);
            task1.fork();
            task2.fork();
            // 合并
            res = task1.join() + task2.join();
        }

        return res;
    }
}

18.异步回调

18.1 同步回调

我们常用的一些请求都是同步回调的,同步回调是阻塞的,单个的线程需要等待结果的返回才能继续往下执行。

18.2 异步回调

有的时候,我们不希望程序在某个执行方法上一直阻塞,需要先执行后续的方法,那就是这里的异步回调。我们在调用一个方法时,如果执行时间比较长,我们可以传入一个回调的方法,当方法执行完时,让被调用者执行给定的回调方法。

18.3 两种创建方式

  1. CompletableFuture.runAsync

  2. CompletableFuture.supplyAsync

18.4 实例

public class ThreadDemo7 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 没有返回值
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "CompletableFuture1");
        });

        runAsync.get();

        // 有返回值
        CompletableFuture<Object> supplyAsync = CompletableFuture.supplyAsync(() -> {
            return 77;
        });

        supplyAsync.whenComplete((u,t) -> {
            // t打印返回值
            // u打印异常信息
            System.out.println(Thread.currentThread().getName() + t);
            System.out.println(Thread.currentThread().getName() + u);
        }).get();
    }
}
举报

相关推荐

java.util包

0 条评论