0
点赞
收藏
分享

微信扫一扫

【并发编程】同步容器与并发容器


文章目录

  • ​​1.同步容器类​​
  • ​​2.并发容器类​​

1.同步容器类

(1)为什么会出现同步容器

  • Java集合框架中,主要有四大类别:List、Set、Queue、Map。
  • List、Set、Queue接口分别继承了Collection接口,Map本身是一个接口。
  • 注意Collection和Map是一个顶层接口,而List、Set、Queue则继承了Collection接口,分别代表数组、集合和对列这三大容器。
  • 像ArrayList、LinkedList都是实现List接口,HashSet实现了Set接口,而Deque继承了Queue接口,PriorityQueue实现了Queue接口。另外LinkedList实现了Deque接口。
  • 像ArrayList、LinkedList、HashMap这些容器都是非线程安全的。如果有多个线程并发地访问这些容器时,就会出现问题。
  • 因此,在编写程序时,必须要求程序员手动地在任何访问到这些容器的地方进行同步处理,这样导致在使用这些容器的时候非常地不方便。
  • 所以,Java提供了同步容器供用户使用。

(2)Java中的同步容器类

  • Vector、Stack、HashTable
  • Collections类中提供的静态工厂方法创建的类
  • Vector实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是Vector中的方法都是synchronized方法,即进行了同步措施。
  • Stack也是一个同步容器,它的方法也是用synchronzied进行了同步,继承Vector类。
  • HashTable实现了Map接口,他和HashMap相似,但是HashTable进行了同步处理,而HashMap没有。
  • Collections类是一个工具提供类,注意,它和Collection不同,Collection是一个顶层的接口。在Collections类中提供了大量的方法,比如对集合或者容器进行排序、查找等操作。最重要的是,在它里面提供了几个静态工厂方法来创建同步容器类

【并发编程】同步容器与并发容器_List

(3)同步容器的缺陷

  • 1.性能问题:传统的非同步容器和同步容器的性能差异,我们以ArrayList和Vector为例

public class Demo1 {
public static void main(String[] args) {
ArrayList<Integer> arrayList = new ArrayList<>();
Vector<Integer> vector = new Vector<>();

long startTime = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
vector.add(i);
}
long endTime = System.currentTimeMillis();
System.out.println("vector循环10万次添加元素消耗的时间:"+(endTime-startTime)+"ms");

long startTime1 = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
arrayList.add(i);
}
long endTime1 = System.currentTimeMillis();
System.out.println("ArrayList循环10万次添加元素消耗的时间:"+(endTime1-startTime1)+"ms");
}
}

【并发编程】同步容器与并发容器_List_02

  • 进行同样多的插入操作,Vector的耗时是ArrayList的两倍。
  • 这只是其中的一方面性能问题上的反映。
  • 另外,由于Vector中的add方法和get方法都进行了同步,因此,在有多个线程进行访问时,如果多个线程都只是进行读取操作,那么每个时刻就只能有一个线程进行读取,其他线程便只能等待,这些线程必须竞争同一把锁。
  • 因此为了解决同步容器的性能问题,在Java 1.5中提供了并发容器,位于java.util.concurrent目录下。
  • 2.同步容器的安全性问题
  • Vector中的方法都进行了同步处理,那么一定就是线程安全的,事实上这可不一定。

public class Demo2 {
private static Vector<Integer> vector = new Vector<>();

public static void main(String[] args) {

while (true) {
for (int i = 0; i < 10; i++) {
vector.add(i);
}
Thread thread1 = new Thread(() -> {

for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
});
Thread thread2 = new Thread(() -> {

for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
});
thread1.start();
thread2.start();
}
}
}

【并发编程】同步容器与并发容器_java_03

正如大家所看到的,这段代码报错了:数组下标越界。

也许有朋友会问:Vector是线程安全的,为什么还会报这个错?很简单,对于Vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:

当某个线程在某个时刻执行这句时:

for(int i= 0;i < vector.size();i++) vector.get(i);

假若此时vector的size方法返回的是10,i的值为9

然后另外一个线程执行了这句:

for(int i= 0;i < vector.size();i++) vector.remove(i);

将下标为9的元素删除了。

那么通过get方法访问下标为9的元素肯定就会出问题了。

因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:

public class Demo2 {
private static Vector<Integer> vector = new Vector<>();
private static Object object = new Object();

public static void main(String[] args) {

while (true) {
for (int i = 0; i < 10; i++) {
vector.add(i);
}
Thread thread1 = new Thread(() -> {

synchronized (object) {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
});
Thread thread2 = new Thread(() -> {

synchronized (object) {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
});
thread1.start();
thread2.start();
}
}
}

  • 3.ConcurrentModificationException异常
  • 在对Vector等容器并发地进行迭代修改时,会报ConcurrentModificationException异常,关于这个异常将会在后续文章中讲述。但是在并发容器中不会出现这个问题。

2.并发容器类

(1)ConcurrectHashMap

【并发编程】同步容器与并发容器_List_04

  • HashMap、HashTable与ConcurrentHashMap都是实现的哈希表数据结构,在随机读取的时候效率很高。HashTable实现同步是利用synchronzied关键字进行锁定的,其实针对整张Hash表进行锁定,及每次锁住整张表让线程独占,在线程安全的背后是巨大的浪费。
  • ConcurrentHashMap和HashTable主要的区别就在于围绕这锁的粒度进行区别以及如何区锁定。
  • 上图中,左边是HashTable的实现方式,可以看到锁住的是整张哈希表,而右边是ConcurrectHashMap的实现方式,单独锁住每一个桶(segment),ConcurrentHashMap将哈希表分为16个桶(默认值),诸如get()、put()、remove()等常用操作之锁定当前需要的桶,而size()才锁定整张表。原来只能一个线程进入,现在却能同时接受16个写线程并发进入(写线程需要锁定,而读线程几乎不受限制),并发性的提升是显而易见的。
  • 而在迭代时,ConcurrentHashMap使用了不同于传统集合的快速失败迭代器(fast-fail iterator)的另外一种迭代方式,称为弱一致迭代器。在这种迭代方式中,当iterator被创建后集合在发生改变就不再是抛出ConcurrentModificationException,取而代之的是在改变时实例化出新的数据从而不影响原有的数据,iterator完成后在将头指针替换成为新的数据,这样iterator线程可以使用原来老的数据,而写线程也可以并发的完成改变,更重要的,这保证了多个线程并发执行的连续性和扩展性,是性能提升的关键。

public class Demo3 {
public static void main(String[] args) {
//Map<String,String> map = new ConcurrentHashMap<>(); //84ms
//Map<String,String> map = new ConcurrentSkipListMap<>(); //108ms
//Map<String,String> map = new Hashtable<>(); //92ms
Map<String,String> map = new HashMap<>(); //139ms

Random random = new Random();

Thread[] threads = new Thread[100];

CountDownLatch latch = new CountDownLatch(threads.length);
long startTime = System.currentTimeMillis();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(()->{
for (int j = 0; j < 10000; j++) {
map.put(random.nextInt(1000000)+"",random.nextInt()+"");
latch.countDown();
}
});
}

Arrays.asList(threads).forEach(thread -> thread.start());

try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println(endTime-startTime+"ms");
}
}

  • 运行结果:

ConcurrentHashMap:84ms
ConcurrentSkipListMap:108ms
Hashtable:92ms
HashMap:139ms

  • 启动100个线程,向每个容器中添加100000个元素,最终发现,ConcurrentHashMap要比HashMap效率高,ConcurrentHashMap是将大锁分成若干小锁,实现多个线程共同运行,锁以效率有很大差距。ConcurrentSkipListMap较ConcurrentHashMap除了实现高并发外还能对元素进行排序。

(2)ConcurrentQueue

  • 与ConcurrentHashMap相同,ConcurrentQueue也是通过同样的方式来提高并发性能的。
  • 同步容器中提到过火车票问题:
  • 有N张火车票,每张车票都有一个编号,同时有10个窗口对外售票。
  • 使用ConcurrentQueue进一步提高并发性:

public class Demo4 {
private static Queue<String> queues = new ConcurrentLinkedDeque<>();

static {
for (int i = 0; i < 10000; i++) {
queues.add("票编号:"+i);
}
}

public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
new Thread(()->{
while(true){
String s = queues.poll();
if (s ==null) break;
else System.out.println("销售了---"+s);
}
}).start();
}
long end = System.currentTimeMillis();
Thread.sleep(3000L);
System.out.println("总耗时:"+(end-start)+"ms");
}

}

【并发编程】同步容器与并发容器_后端_05

  • 常用的API

Queue<String> strings = new ConcurrentLinkedQueue<String>();

strings.offer(元素) //相当于add,放进队列

strings.size() //获取当前队列的元素个数

strings.poll() //取出并移除

strings.peek() //取出不会移除,相当于get();

(3)CopyOnWriteArrayList

  • 写时复制容器,即copy-on-write,多线程环境下,写时效率低,读时效率高,适合写少读多的环境。

public class Demo5 implements Runnable{
private static List<String> lists = new ArrayList<>();
//private static List<String> lists = new Vector<>();
//private static List<String> lists = new CopyOnWriteArrayList<>();
private Random random = new Random();

@Override
public void run() {
for (int i = 0; i < 10000; i++) {
lists.add(random.nextInt()+"");
}
}

public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
System.out.println("线程开始操作");
for (int i = 0; i < 10; i++) {
new Thread(new Demo5()).start();
}
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(()->{
for (int j = 0; j < lists.size(); j++) {
lists.get(finalI);
}
}).start();
}
long end = System.currentTimeMillis();
Thread.sleep(6000L);
System.out.println("耗时:"+(end-start)+"ms");
}
}

  • 运行结果:

ArrayList:报错:Exception in thread "Thread-1" java.lang.ArrayIndexOutOfBoundsException: 244
Vector:117ms
CopyOnWriteArrayList:222ms

  • 从JDK5开始Java并发包里面提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayList和CopyOnWriteArraySet。
  • 当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后向新的容器添加元素,添加完成元素后,再将原来的容器的引用指向新的容器。这样做的好处就是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为在当前读的容器不会添加任何元素。所以CopyOnWrite容器是一种读写分离的思想,读和写写对应不同的容器。

(4)BlockingQueue

  • 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
  • 阻塞对列,顾名思义,首先他是一个队列,一个队列在数据结构当中起到的作用大致如下:

【并发编程】同步容器与并发容器_后端_06

  • 队列可以使得数据由队列的一端输入,从另一端输出。
  • 先进先出(FIFO):先插入的队列的元素也是最先出队列,这种队列体现了一种公平性。
  • 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。

【并发编程】同步容器与并发容器_List_07

(5)LinkedBlockingQueue

  • 这中并发容器,会自动实现阻塞式的生产者/消费者模式。使用队列解耦合,在实现异步事物的时候很有用。
  • 案例

public class Demo6 {

//实例化时指定容器容量
private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1000);

public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
try {
//向对列中添加元素,如果对列满了 就等待1s在进行添加
boolean b = linkedBlockingQueue.offer(finalI + "", 1, TimeUnit.SECONDS);
if (b) {
System.out.println(finalI + "队列添加成功");
} else {
System.out.println(finalI + "队列添加失败,进入等待");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}

for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
try {
//消费队列,如果为空就等待消费
String take = linkedBlockingQueue.take();
System.out.println("消费队列元素:" + take);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}).start();
}

}

}

【并发编程】同步容器与并发容器_List_08

  • 常用API

//实例化时指定容器容量
private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1000);

linkedBlockingQueue.add(元素) //如果队列满了,再次添加就会抛出异常:java.lang.IllegalStateException: Queue full

linkedBlockingQueue.offer(元素,时间,时间单位) //队列满了,等待时间后,再次添加,失败返回false

linkedBlockingQueue.offer(元素) //队列满了,添加失败返回false,成功返回true

linkedBlockingQueue.put(元素) //加入队列,如果满了就等待阻塞

linkedBlockingQueue.take() //取出队列中的元素,如果空了,就会等待阻塞

(6)ArrayBlockingQueue

  • ArrayBlockingQueue和LinkedBlockingQueue对象的方法都是一样的,用法是一样的。
  • 二者的区别:
  • LinkedBlockingQueue是一个单向链表实现的阻塞队列,在链表一头加入元素,如果队列满了,就会阻塞,另一头取出元素,如果队列为空,就会阻塞。
  • LinkedBlockingQueue内部使用ReetrantLock实现插入锁(putLock)和取出锁(takeLock)。
  • ArrayBlockingQueue基于数组实现,成为有界队列,LinkedBlockingQueue认为是无界队列。当然LinkedBlockingQueue也可以指定队列容量。

【并发编程】同步容器与并发容器_List_09

(7)DelayQueue

  • DelayQueue也是一个BlockingQueue,用于放置实现了Delayed接口的对象,只能是实现了Delayed接口的对象,其中对象只能在其到期时才能从队列中取走。
  • Delayed扩展了Comparable接口,比较的基准为延时的时间,Delayed接口实现类getDelay()返回值为固定值(final),DelayedQueue内部是使用PriorityQueue实现的;即 (DelayQueue = BlockingQueue + PriorityQueue + Delayed
  • 可以说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准是时间。是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时能从队列中取走。这种队列是有序的,及队头对象的延迟到期时间最长。但是要注意不能将null元素放置到队列中。
  • Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。此接口的实现类必须重写一个compareTo()方法,该方法提供于此接口的getDelay()方法一致的排序。
  • DelayQueue存储的对象是实现了Delayed接口的对象,在这个对象中,需要重写compareTo()和getDelay()方法。
  • 自定义MyTask类实现Delayed

public class MyTask implements Delayed {

private long time;
private String name;
private long start = System.currentTimeMillis();

public MyTask(String name,long time) {
this.time = time;
this.name = name;
}

@Override
public long getDelay(TimeUnit unit) {
return (start+time) - System.currentTimeMillis();
}

@Override
public int compareTo(Delayed o) {
return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}

@Override
public String toString() {
return "MyTask{" +
"time=" + time +
", name='" + name + '\'' +
'}';
}
}

  • 测试main

public class Main {
public static void main(String[] args) throws InterruptedException {

DelayQueue <MyTask> myTasks = new DelayQueue<>();

new Thread(()->{
myTasks.offer(new MyTask("task1",10000));
myTasks.offer(new MyTask("task2",4000));
myTasks.offer(new MyTask("task3",4200));
myTasks.offer(new MyTask("task4",6200));
myTasks.offer(new MyTask("task5",9800));
}).start();
long start = System.currentTimeMillis();
Thread.sleep(2000);
System.out.println("队列中存放数据:");
for (MyTask myTask : myTasks) {
System.out.println(myTask);
}

System.out.println();
System.out.println("队列中取出数据:");
while(true){
MyTask myTask = myTasks.take();
System.out.println(myTask+":取出耗时:"+(System.currentTimeMillis()-start)+"ms");
}
}
}

【并发编程】同步容器与并发容器_后端_10

  • DelayQueue能做什么
  • 淘宝订单业务:下单之后如果30分钟之内没有付款就自动取消订单。
  • 饿了么定餐通知:下单成功后60s后给用户发短信。
  • 关闭空闲连接:服务器中,很多客户端的连接,空闲一段时间之后需要关闭。
  • 缓存:缓存中的对象,超过了空闲时间,需要从缓存中移出。
  • 任务超时处理:在网络协议滑动窗口请求应答交互时,处理超时未响应的请求等。

(8)LinkedTransferQueue

  • TransferQueue是一个继承了BlockingQueue的接口,并且增加了若干新方法。
  • LinkedTransferQueue是TransferQueue接口的实现类,其定义一个无界的队列,具有先进先出(FIFO)的特性。
  • TransferQueue接口含有下面几个重要方法:
  • transfer(E e)
  • 若当前存在一个正在等待获取的消费者线程,即立刻移交之,否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。
  • tryTransfer(E e)
  • 若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移\传输对象元素e;如不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。
  • tryTransfer(E e,long timeout,TimeUnit nuit)
  • 若当前存在一个正在等待的消费者线程,会立即传输给它,否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉,若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。
  • hasWaitingConsumer()
  • 判断是否由消费者线程。
  • getWaitingConsumerCount()
  • 获取所有等待获取元素的消费者线程数量。
  • size()
  • 因为队列的异步特性,检测当前队列的元素个数需要逐一迭代,无法保证原子性,可能会得到一个不太准确的结果,尤其是在遍历时有可能队列发生更改。
  • 消费者生产者案例
  • Producer

public class Producer implements Runnable {

private final TransferQueue<String> queue;

//构造传入LinkedTransferQueue队列
public Producer(TransferQueue<String> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
//生产者循环
while (true){
//判断当前队列是否还有消费者,有的话就生产产品交由消费者线程
if(queue.hasWaitingConsumer()) queue.transfer(produce());
//休眠1s
TimeUnit.SECONDS.sleep(1);
}
}catch (Exception e){
e.printStackTrace();
}
}
//生产产品方法
private String produce(){
return "Your lucky number:"+(new Random().nextInt(100));
}
}

  • Consumer

public class Consumer implements Runnable{
private final TransferQueue<String> queue;

public Consumer(TransferQueue<String> queue) {
this.queue = queue;
}

@Override
public void run() {
try{
//消费者线程取出队列元素
System.out.println("Consumer--"+Thread.currentThread().getName()+"--"+queue.take());
}catch (Exception e){
e.printStackTrace();
}
}
}

  • main测试

public class Main {
public static void main(String[] args) {
TransferQueue<String> queue = new LinkedTransferQueue<>();

Thread producer = new Thread(new Producer(queue));
producer.setDaemon(true);
producer.start();

for (int i = 0; i < 20; i++) {
Thread consumer = new Thread(new Consumer(queue));
consumer.setDaemon(true);
consumer.start();
try{
Thread.sleep(1000L);
}catch (Exception e){
e.printStackTrace();
}
}
}
}

【并发编程】同步容器与并发容器_System_11

(9)SynchronousQueue

  • SynchronousQueue也是一种BlockingQueue,是一种无缓冲的等待队列。所以在某次添加元素后必须等待其他线程取走后才能继续添加,可以认为SynchronousQueue是一个缓存值为0的阻塞队列(也可以是1),它的isEmpty()方法永远返回时true,remainingCapacity()方法永远返回时0。
  • remove和removeAll方法返回永远是false,iterator()方法永远返回空,peek()方法永远返回null。
  • 使用put()方法时,会一直阻塞在这里,等待被消费。
  • 案例代码

public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> strings = new SynchronousQueue<>();

for (int i = 0; i < 2; i++) {
new Thread(()->{
try{
System.out.println("取出数据:"+strings.take());
}catch (Exception e){
e.printStackTrace();
}
}).start();
}
strings.put("aaa");
strings.put("bbb");

}
}

【并发编程】同步容器与并发容器_List_12

【并发编程】同步容器与并发容器_i++_13


举报

相关推荐

0 条评论