Java 多线程实战案例之消费者,生产者
0. 背景
学习多线程编程时,肯定会接触到生产者,消费者 这一案例,操作系统中会有银行家就餐
,排队看电影
等问题,但是这些案例如何在java中展示呢?这里笔者使用生产者,消费者 来演示基本的多线程案例。
1. 基本思想
这里先实现一个简单的多线程案例—— 一个生产者,一个消费者。实现生成者生成数据,消费者消费数据。
2. 代码
2.1 单生产者 + 单消费者
因为消费者和生成者均不止一个,所以抽象出其消费、生产类,分别如下:
-
Consume
package multi.thread.consume;
import multi.thread.result.ResultList;
/**
* 01.代表抽象消费者
*/
public abstract class Consumer {
private String name ;//the name of Consumer
private ResultList resultList;// the shared resultList to read/write
public Consumer(String name, ResultList resultList) {
this.name = name;
this.resultList = resultList;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public ResultList getResultList() {
return resultList;
}
public void setResultList(ResultList resultList) {
this.resultList = resultList;
}
public void comsum(){}
}
-
Producer
package multi.thread.produce;
import multi.thread.result.ResultList;
/**
* 01.抽象类,代表生产者
*/
public abstract class Producer {
private String name;//the name of producer
private ResultList resultList;// the share resultList to write
public Producer(String name, ResultList resultList) {
this.name = name;
this.resultList = resultList;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public ResultList getResultList() {
return resultList;
}
public void setResultList(ResultList resultList) {
this.resultList = resultList;
}
//生产方法
public void produce(){
}
}
-
ConsumerOne
这个类是具体的消费者类。用于消费ResultList类实例的 List
中存放值。
package multi.thread.consume;
import multi.thread.result.ResultList;
/**
* 01.factual consumer
*/
public class ConsumerOne extends Consumer {
public ConsumerOne(String name, ResultList resultList) {
super(name, resultList);
}
@Override
public void comsum() {
while(true) {
synchronized (this.getResultList()) {//lock the list
if (this.getResultList().getLists().size() == 0) {
try {
System.out.println("consumer wait...");
this.getResultList().wait();
System.out.println("consumer is waking...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//if have else,consume
// the size is more than index [size = peek + 1]
int index = this.getResultList().getLists().size() - 1;
System.out.println("start consume:" + this.getResultList().getLists().get(index));
System.out.println(" before consume: " + this.getResultList().getLists().size());
this.getResultList().getLists().remove(index);// remove one element
System.out.println(" after consume: " + this.getResultList().getLists().size());
this.getResultList().notify();//notify the producer
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
01. wait 和 notify的顺序写反了,导致出现错误。
**/
-
ProducerOne
这个类是具体的生产者类。用于往ResultList类实例的 List
中存放值。
package multi.thread.produce;
import multi.thread.produce.Producer;
import multi.thread.result.ResultList;
public class ProducerOne extends Producer {
public ProducerOne(String name, ResultList resultList) {
super(name, resultList);
}
@Override
public void produce() {
while(true) {
synchronized (this.getResultList()) {//the lock of result
//System.out.println("produce's size: " + this.getResultList().getLists().size());
if (this.getResultList().getLists().size() > 0) {
try {
System.out.println("producer wait...");
this.getResultList().wait();//wait the consumer to consume
System.out.println("producer is waking...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (this.getResultList().getLists().size() == 0) {
System.out.println("start produce...");
System.out.println(" before produce: "+this.getResultList().getLists().size());
this.getResultList().getLists().add("1");// add the string to resultList
System.out.println(" after produce: "+this.getResultList().getLists().size());
this.getResultList().notify();//notify the cosumer
}
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
-
ConsumeThread
package multi.thread.threadpool;
import multi.thread.consume.Consumer;
public class ConsumeThread implements Runnable{
private Consumer consumer ;
public ConsumeThread(Consumer consumer){
this.consumer = consumer;
}
public void run() {
consumer.comsum();
}
}
-
ProduceThread
package multi.thread.threadpool;
import multi.thread.produce.Producer;
public class ProduceThread implements Runnable {
private Producer producer;
public ProduceThread(Producer producer) {
this.producer = producer;
}
public void run() {
producer.produce();
}
}
3. 执行结果
2.3 主要问题
当然,并不是写完代码就可以跑成功的,在本案例中需要注意如下几个点:
-
ConsumeThread
类实现Runnable
接口之后,并不能直接使用其实例运行run方法。而是需要将这个实现了Runnable的实例作为Thread的构造参数,然后运行run方法。主要如下图所示: - 因为要模拟一个不停的消费、生产过程,所以需要使用到while(true)循环,但是这个while(true)循环的使用是有很大讲究的,我们希望这个生产者线程不停地生产;这个消费者线程不停的消耗。但是我之前的写法却是下面这个样子:
这么写的结果就会导致IllegalThreadStateException
,原因就是本来就处于running
的线程,再次被执行start
方法。
- 同样,还有的问题是,执行
wait(), notify()
方法时,需要注意保证其在持有对象锁的时候执行。如下标注: - 同时,还有一个问题是,执行wait()方法虽然是当前运行这个代码的线程,但是调用wait()的对象是锁对象。而不是一个单
wait()
方法。即如下的写法将会出错:
- 同时,需要注意使用
wait
,notify
方法的顺序,否则不会得到正确的结果
4. 其它
完整项目可见我的 github ,如果有帮助,别忘了star
哦。