0
点赞
收藏
分享

微信扫一扫

6、共享模式之无锁(上),CAS,原子类

小a草 2022-03-12 阅读 38

本章内容

  • CAS 与 volatile
  • 原子整数
  • 原子引用
  • 原子累加器
  • Unsafe

1、问题的提出

有如下需求,保证accont.withdraw取款方法的线程安全

public interface Account {

    // 获取余额
    Integer getBalance();

    // 取款
    void withdraw(Integer amount);

    /**
     * 方法内启动1000个线程,那么每个线程做-10的操作
     * 如果初始余额为10000 那么正确结果应该是0
     *
     * @param account
     */
    static void demo(Account account) {
        List<Thread> threads = new ArrayList<>();
        long start = System.nanoTime();
        for (int i = 0; i < 1000; i++) {
            threads.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        threads.forEach(Thread::start);
        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance() + " cost :" + (end - start) / 1000_000 + "ms");
    }

}

原实现并不是线程安全的

public class AccountUnsafe implements Account{

    private Integer balance;

    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }

    @Override
    public Integer getBalance() {
        return this.balance;
    }

    @Override
    public void withdraw(Integer amount) {
        balance -= amount;
    }
}

测试

Account.demo(new AccountUnsafe(10000));

输出

90 cost :185ms

为什么是不安全的

withdraw方法

public void withdraw(Integer amount) {
 	balance -= amount;
}

对应字节码

ALOAD 0 // <- this
ALOAD 0
GETFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer; // <- this.balance
INVOKEVIRTUAL java/lang/Integer.intValue ()I // 拆箱
ALOAD 1 // <- amount
INVOKEVIRTUAL java/lang/Integer.intValue ()I // 拆箱
ISUB // 减法
INVOKESTATIC java/lang/Integer.valueOf (I)Ljava/lang/Integer; // 结果装箱
PUTFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer; // -> this.balance

多线程执行流程

ALOAD 0 // thread-0 <- this 
ALOAD 0 
GETFIELD cn/itcast/AccountUnsafe.balance // thread-0 <- this.balance 
INVOKEVIRTUAL java/lang/Integer.intValue // thread-0 拆箱
ALOAD 1 // thread-0 <- amount 
INVOKEVIRTUAL java/lang/Integer.intValue // thread-0 拆箱
ISUB // thread-0 减法
INVOKESTATIC java/lang/Integer.valueOf // thread-0 结果装箱
PUTFIELD cn/itcast/AccountUnsafe.balance // thread-0 -> this.balance 
 
 
ALOAD 0 // thread-1 <- this 
ALOAD 0 
GETFIELD cn/itcast/AccountUnsafe.balance // thread-1 <- this.balance 
INVOKEVIRTUAL java/lang/Integer.intValue // thread-1 拆箱
ALOAD 1 // thread-1 <- amount 
INVOKEVIRTUAL java/lang/Integer.intValue // thread-1 拆箱
ISUB // thread-1 减法
INVOKESTATIC java/lang/Integer.valueOf // thread-1 结果装箱
PUTFIELD cn/itcast/AccountUnsafe.balance // thread-1 -> this.balance
  • 单核的指令交错
  • 多核的指令交错

解决思路 - 锁

首先想到的是给Account对象加锁

public class AccountUnsafe implements Account{

    private Integer balance;

    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }

    @Override
    public synchronized Integer getBalance() {
        return this.balance;
    }

    @Override
    public synchronized void withdraw(Integer amount) {
        balance -= amount;
    }
}

输出

0 cost :231ms

解决思路 - 无锁

public class AccountSafe implements Account{

    private AtomicInteger balance;

    public AccountSafe(Integer balance) {
        this.balance = new AtomicInteger(balance);
    }

    @Override
    public Integer getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(Integer amount) {
        while (true) {
            int prev = balance.get();
            int next = prev - amount;
            if (balance.compareAndSet(prev, next))
                break;
        }
        // 可以简化为下面的方法
//        balance.addAndGet(-1 * amount);
    }
}

测试

Account.demo(new AccountSafe(10000));

输出

0 cost :177ms

2、CAS与volatile

前面看到的 AtomicInteger 的解决方法,内部并没有用到锁来保护共享变量的线程安全。那么它是如何实现的呢?

public void withdraw(Integer amount) {
        // 不断尝试
        while (true) {
            // 拿到了旧值1000
            int prev = balance.get();
            // 在这个基础上1000-10=990
            int next = prev - amount;
            /**
             * compareAndSet正式做这个检查,在set前,先比较prev与当前值
             * - 不一致了,next作废,返回false表示失败
             *      比如,别的线程已经做了减法,当前值已经被减成了990
             *      那么本线程的这次990就作废,进入while下次循环重试
             * - 一致,以next为新值,返回true表示成功
             */
            if (balance.compareAndSet(prev, next))
                break;
        }
        // 可以简化为下面的方法
//        balance.addAndGet(-1 * amount);
    }

其中关键是compareAndSet, 它就是简称的CAS (也有Compare And Swap的说法),它必须是原子操作。
在这里插入图片描述

慢动作分析

@Slf4j(topic = "c.SlowMotion")
public class SlowMotion {

    public static void main(String[] args) {
        AtomicInteger balance = new AtomicInteger(10000);
        int mainPrev = balance.get();
        log.debug("try get : {}", mainPrev);
        new Thread(() -> {
            sleep(1000);
            int prev = balance.get();
            balance.compareAndSet(prev, 9000);
            log.debug(balance.toString());
        }, "t1").start();
        sleep(2000);
        log.debug("try set 8000");
        boolean isSuccess = balance.compareAndSet(mainPrev, 8000);
        log.debug("is Success? {}" , isSuccess);
        if (!isSuccess) {
            mainPrev = balance.get();
            log.debug("try set 8000");
             isSuccess = balance.compareAndSet(mainPrev, 8000);
            log.debug("is Success? {}" , isSuccess);
        }
    }
    
    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

输出

2022/03/09-02:14:37.774 [main] c.SlowMotion - try get : 10000
2022/03/09-02:14:38.842 [t1] c.SlowMotion - 9000
2022/03/09-02:14:39.843 [main] c.SlowMotion - try set 8000
2022/03/09-02:14:39.843 [main] c.SlowMotion - is Success? false
2022/03/09-02:14:39.843 [main] c.SlowMotion - try set 8000
2022/03/09-02:14:39.843 [main] c.SlowMotion - is Success? true

volatile

获取共享变量时,为了保证该变量的可见性,需要使用volatile修饰。

它可以用来修饰成员变量和静态成员变量,它可以避免线程从自己的工作缓存总查找变量的值,必须从主存中获取它的值,线程操作volatile变量都是直接操作主存。即一个线程对volatile变量的修改,对另一个线程可见。

CAS必须借助volatile才能读取到共享变量的最新值来实现【比较并交换的效果】

为什么无锁效率高

  • 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而synchronized会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻
  • 线程就好像高速跑道上的赛车,高速运行,速度超快,一旦发生上下文切换,就好比火车要减速、熄火,等被唤醒又要重新打火、启动、加速,恢复到高速运行,代价比较大
  • 但无锁情况下,因为线程要保持运行,需要额外CPU的支持,CPU在这里就好比高速跑道,没有额外的跑道,线程想要高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。
    在这里插入图片描述

CAS的特点

结合CAS和volatile可以实现无所并发,适用于线程数少,多核CPU的场景下。

  • CAS是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,及时改了也没关系,我吃点亏再重试呗。
  • synchronized是基于悲观锁的思想:最悲观的估计,得防着其他线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会
  • CAS体现的是无锁并发、无阻塞并发,请仔细思考这两句话的意思
    • 因为没有使用synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    • 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受到影响

3、原子整数

JUC并发包提供了

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

以AtomicInteger为例

public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger(0);

        // 类似i++
        System.out.println(i.getAndIncrement());

        // 类似++i
        System.out.println(i.incrementAndGet());

        // 类似--i
        System.out.println(i.decrementAndGet());

        // 类似i--
        System.out.println(i.getAndDecrement());

        // 获取结果并加值
        System.out.println(i.getAndAdd(5));

        // 加值并获取结果
        System.out.println(i.addAndGet(-5));

        // 获取并更新值,p为i的主值
        System.out.println(i.getAndUpdate(p -> p - 2));

        // 更新并获取
        System.out.println(i.updateAndGet(p -> p + 2));

        // 获取并计算
        System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));

        // 计算并获取
        System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
        
    }

4、原子引用

为什么要使用原子引用类型?

  • AtomicReference
  • AtomICMarkableReference
  • AtomicStampedReference

有如下方法

public interface DecimalAccount {

    BigDecimal getBalance();

    // 取款
    void withdraw(BigDecimal amount);

    /**
     * 方法内启动1000个线程,那么每个线程做-10的操作
     * 如果初始余额为10000 那么正确结果应该是0
     *
     * @param account
     */
    static void demo(DecimalAccount account) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            threads.add(new Thread(() -> {
                account.withdraw(BigDecimal.TEN);
            }));
        }
        threads.forEach(Thread::start);
        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(account.getBalance());
    }

}

试着提供不同的DecimalAccount实现,实现安全的取款操作

不安全实现

class DecimalAccountUnsafe implements DecimalAccount {
    BigDecimal balance;

    public DecimalAccountUnsafe(BigDecimal balance) {
        this.balance = balance;
    }

    @Override
    public BigDecimal getBalance() {
        return balance;
    }

    @Override
    public void withdraw(BigDecimal amount) {
        BigDecimal balance = this.getBalance();
        this.balance = balance.subtract(amount);
    }
}

安全实现 - 使用锁

class DecimalAccountSafeLock implements DecimalAccount {
    private final Object lock = new Object();
    BigDecimal balance;

    public DecimalAccountSafeLock(BigDecimal balance) {
        this.balance = balance;
    }

    @Override
    public BigDecimal getBalance() {
        return balance;
    }

    @Override
    public void withdraw(BigDecimal amount) {
        synchronized (lock) {
            BigDecimal balance = this.getBalance();
            this.balance = balance.subtract(amount);
        }
    }
}

安全实现 - 使用CAS

public class DecimalAccountSafeCas implements DecimalAccount {

    AtomicReference<BigDecimal> balance;

    public DecimalAccountSafeCas(BigDecimal balance) {
        this.balance = new AtomicReference<>(balance);
    }

    @Override
    public BigDecimal getBalance() {
        return this.balance.get();
    }

    @Override
    public void withdraw(BigDecimal amount) {
        while (true) {
            BigDecimal prev = this.balance.get();
            BigDecimal next = prev.subtract(prev);
            if (balance.compareAndSet(prev, next))
                break;
        }
    }
}

测试

DecimalAccount.demo(new DecimalAccountUnsafe(BigDecimal.valueOf(10000)));
        DecimalAccount.demo(new DecimalAccountSafeLock(BigDecimal.valueOf(10000)));
        DecimalAccount.demo(new DecimalAccountSafeCas(BigDecimal.valueOf(10000)));

输出

340
0
0

ABA 问题及解决

	static AtomicReference<String> ref = new AtomicReference<>("A");

    public static void main(String[] args) throws InterruptedException {
        log.debug("main start...");
        // 获取值 A
        // 这个共享变量被它线程修改过?
        String prev = ref.get();
        other();
        Thread.sleep(1000);
        // 尝试改为 C
        log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
    }

    private static void other() throws InterruptedException {
        new Thread(() -> {
            log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
        }, "t1").start();
        Thread.sleep(500);
        new Thread(() -> {
            log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
        }, "t2").start();
    }

输出

2022/03/09-23:27:35.350 [main] c.Test4 - main start...
2022/03/09-23:27:35.384 [t1] c.Test4 - change A->B true
2022/03/09-23:27:35.889 [t2] c.Test4 - change B->A true
2022/03/09-23:27:36.903 [main] c.Test4 - change A->C true

主线程仅能判断出共享变量的值和最初值A相同,不能感知到这种A改变为B又改回A的情况,如果主线程希望:

只要有其他线程【动过了】共享变量,那么自己的cas就算失败,这时,仅比较值是不够的,需要再加一个版本号

AtomicStampedReference

static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

    public static void main(String[] args) throws InterruptedException {
        log.debug("main start...");
        // 获取值 A
        // 这个共享变量被它线程修改过?
        String prev = ref.getReference();
        Integer stamp = ref.getStamp();
        other();
        Thread.sleep(1000);
        // 尝试改为 C
        log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
    }

    private static void other() throws InterruptedException {
        new Thread(() -> {
            log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1));
        }, "t1").start();
        Thread.sleep(500);
        new Thread(() -> {
            log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1));
        }, "t2").start();
    }

输出

2022/03/09-23:32:21.802 [main] c.Test4 - main start...
2022/03/09-23:32:21.842 [t1] c.Test4 - change A->B true
2022/03/09-23:32:22.345 [t2] c.Test4 - change B->A true
2022/03/09-23:32:23.348 [main] c.Test4 - change A->C false

AtomicStampedReference可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:A -> B -> A -> C,通过AtomicStampedReference,我们可以知道,引用变量中途被修改了几次。

但是有时候,并不关心变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference

在这里插入图片描述
AtomicMarkableReference

public class GarbageBag {
    String desc;

    public GarbageBag(String desc) {
        this.desc = desc;
    }


    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        return "GarbageBag{" +
                "desc='" + desc + '\'' +
                '}';
    }
}
public static void main(String[] args) throws InterruptedException {
        GarbageBag bag = new GarbageBag("装满垃圾");
        // 参数2 mark 可以看做一个标记,表示垃圾袋装满
        AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);

        log.debug("主线程 start");
        GarbageBag prev = ref.getReference();
        log.debug(prev.toString());

        new Thread(() -> {
            log.debug("打扫卫生线程...");
            bag.setDesc("倒空垃圾袋");
            while (!ref.compareAndSet(bag, bag, true, false)) {
            }
            log.debug(bag.toString());
        }).start();

        Thread.sleep(1000);
        log.debug("主线程想换一只新的垃圾袋");
        boolean isSuccess = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
        log.debug("换了么?{}", isSuccess);

        log.debug(ref.getReference().toString());

    }

输出

2022/03/09-23:47:54.873 [main] c.Test5 - 主线程 start
2022/03/09-23:47:54.874 [main] c.Test5 - GarbageBag{desc='装满垃圾'}
2022/03/09-23:47:54.916 [Thread-0] c.Test5 - 打扫卫生线程...
2022/03/09-23:47:54.916 [Thread-0] c.Test5 - GarbageBag{desc='倒空垃圾袋'}
2022/03/09-23:47:55.922 [main] c.Test5 - 主线程想换一只新的垃圾袋
2022/03/09-23:47:55.922 [main] c.Test5 - 换了么?false
2022/03/09-23:47:55.923 [main] c.Test5 - GarbageBag{desc='倒空垃圾袋'}

Process finished with exit code 0

5、原子数组

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

有如下方法

/**
     * @param arraySupplier 提供数组,可以是线程不安全的数组或线程安全的数组
     * @param lengthFun     获取数组长度的方法
     * @param putConsumer   自增方法,回传array index
     * @param printConsumer 打印数组的方法
     * @param <T>
     */
    private static <T> void demo(Supplier<T> arraySupplier,
                                 Function<T, Integer> lengthFun,
                                 BiConsumer<T, Integer> putConsumer,
                                 Consumer<T> printConsumer) {
        List<Thread> threads = new ArrayList<>();
        T array = arraySupplier.get();
        int length = lengthFun.apply(array);
        for (int i = 0; i < 1000; i++) {
            threads.add(new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    putConsumer.accept(array, j % length);
                }
            }));
        }
        threads.forEach(Thread::start);
        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        printConsumer.accept(array);
    }

不安全的数组

demo(() -> new int[10],
                array -> array.length,
                (array, index) -> array[index]++,
                array -> System.out.println(Arrays.toString(array)));

输出

[979287, 979029, 978970, 978921, 978919, 978832, 979217, 979376, 979381, 979636]

安全的数组

demo(() -> new AtomicIntegerArray(10),
                array -> array.length(),
                (array, index) -> array.getAndIncrement(index),
                array -> System.out.println(array));

输出

[1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000]

6、字段更新器

  • AtomicReferenceFieldUpdater //域,字段
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater

利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合volatile修饰的关键字使用,否则会出现异常

Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
	at java.util.concurrent.atomic.AtomicIntegerFieldUpdater$AtomicIntegerFieldUpdaterImpl.<init>(AtomicIntegerFieldUpdater.java:412)
	at java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdater.java:88)
	at nolock.Test7.main(Test7.java:16)
private volatile int field;

    public static void main(String[] args) {
        AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Test7.class, "field");
        Test7 test7 = new Test7();
        fieldUpdater.compareAndSet(test7, 0, 10);
        System.out.println(test7.field);
        fieldUpdater.compareAndSet(test7, 10, 20);
        System.out.println(test7.field);
        fieldUpdater.compareAndSet(test7, 10, 30);
        System.out.println(test7.field);
    }

输出

10
20
20
举报

相关推荐

0 条评论