一 Result
 
package concurrent.activeobjects;
/**
 * A handle through which a caller reads the outcome of a request.
 */
public interface Result {
    /**
     * Returns the value carried by this result.
     *
     * @return the result value
     */
    Object getResultValue();
}
 
二 RealResult
 
package concurrent.activeobjects;
/**
 * A {@link Result} whose value is already known when the object is created.
 */
public class RealResult implements Result {

    /** The value handed back by {@link #getResultValue()}. */
    private final Object result;

    /**
     * Wraps an already-computed value.
     *
     * @param value the value to expose; stored exactly as given
     */
    public RealResult(Object value) {
        result = value;
    }

    /**
     * Returns the value supplied at construction time.
     *
     * @return the wrapped value
     */
    @Override
    public Object getResultValue() {
        return this.result;
    }
}
 
三 FutureResult
 
package concurrent.activeobjects;
/**
 * A {@link Result} placeholder whose real value is supplied later via
 * {@link #setResult(Result)}.
 *
 * <p>Both methods synchronize on this object; {@link #getResultValue()} waits
 * on this object's monitor until a result has been set.
 */
public class FutureResult implements Result {
    /** The delegate result; only read after {@code ready} has become true. */
    private Result result;
    /** Set to true by {@link #setResult(Result)}; accessed only while holding this object's monitor. */
    private boolean ready = false;

    /**
     * Stores the real result and wakes up every thread waiting in
     * {@link #getResultValue()}.
     *
     * @param result the result to delegate to from now on
     */
    public synchronized void setResult(Result result) {
        this.result = result;
        this.ready = true;
        this.notifyAll();
    }

    /**
     * Waits until a result has been set and then returns its value.
     *
     * <p>The wait is not cancelled by interruption: if the calling thread is
     * interrupted while waiting, the method keeps waiting and re-asserts the
     * thread's interrupt status before returning, so the caller can still
     * observe the interruption.
     *
     * @return the value of the result passed to {@link #setResult(Result)}
     */
    @Override
    public synchronized Object getResultValue() {
        // Remember the interruption instead of re-interrupting immediately:
        // calling wait() with the flag set would throw again straight away.
        boolean interrupted = false;
        try {
            while (!ready) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            return this.result.getResultValue();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
 
四 MethodRequest
 
package concurrent.activeobjects;
/**
 * Base class for request objects; there is one concrete subclass per
 * ActiveObject method.
 *
 * @className: MethodRequest
 * @description: corresponds to each method of ActiveObject
 * @date: 2022/5/3
 * @author: cakin
 */
public abstract class MethodRequest {
    /** The servant that subclasses call from {@link #execute()}. */
    protected final Servant servant;
    /** Where a subclass publishes its result; subclasses without a return value pass {@code null}. */
    protected final FutureResult futureResult;
    /**
     * Stores the collaborators used by {@link #execute()}.
     *
     * @param servant      the servant to call
     * @param futureResult the future to complete, or {@code null} when there is no result
     */
    public MethodRequest(Servant servant, FutureResult futureResult) {
        this.servant = servant;
        this.futureResult = futureResult;
    }
    /** Performs the request. */
    public abstract void execute();
}
 
五 MakeStringRequest
 
package concurrent.activeobjects;
/**
 * Request object for {@code makeString}: captures the arguments at creation
 * time and, when executed, stores the servant's result in the future.
 */
public class MakeStringRequest extends MethodRequest {

    /** Number of characters requested. */
    private final int count;

    /** Character passed to the servant as the fill character. */
    private final char fillChar;

    /**
     * @param servant      the servant that will build the string
     * @param futureResult the future that receives the servant's result
     * @param count        length argument forwarded to the servant
     * @param fillChar     fill character forwarded to the servant
     */
    public MakeStringRequest(Servant servant, FutureResult futureResult, int count, char fillChar) {
        super(servant, futureResult);
        this.fillChar = fillChar;
        this.count = count;
    }

    /** Calls the servant and hands its result to the future. */
    @Override
    public void execute() {
        futureResult.setResult(servant.makeString(count, fillChar));
    }
}
 
六 DisplayStringRequest
 
package concurrent.activeobjects;
/**
 * Request object for {@code displayString}. It has no result, so it passes
 * {@code null} as the future to its superclass.
 */
public class DisplayStringRequest extends MethodRequest {

    /** The text forwarded to the servant when the request is executed. */
    private final String text;

    /**
     * @param servant the servant that will display the text
     * @param text    the text to display
     */
    public DisplayStringRequest(Servant servant, String text) {
        super(servant, null);
        this.text = text;
    }

    /** Forwards the captured text to the servant. */
    @Override
    public void execute() {
        servant.displayString(text);
    }
}
 
七 ActivationQueue
 
package concurrent.activeobjects;
import java.util.LinkedList;
/**
 * A bounded FIFO queue of {@link MethodRequest}s.
 *
 * <p>{@link #put(MethodRequest)} blocks while the queue holds
 * {@code MAX_METHOD_REQUEST_QUEUE_SIZE} entries and {@link #take()} blocks
 * while it is empty. Both methods synchronize on this object.
 *
 * <p>Neither method is cancelled by interruption. If the calling thread is
 * interrupted while waiting, the operation still completes and the thread's
 * interrupt status is re-asserted before the method returns.
 */
public class ActivationQueue {
    /** Maximum number of pending requests before {@link #put(MethodRequest)} blocks. */
    private static final int MAX_METHOD_REQUEST_QUEUE_SIZE = 100;
    /** Pending requests, oldest first; accessed only while holding this object's monitor. */
    private final LinkedList<MethodRequest> methodQueue;

    /** Creates an empty queue. */
    public ActivationQueue() {
        methodQueue = new LinkedList<>();
    }

    /**
     * Appends a request, waiting for free space if the queue is full.
     *
     * @param request the request to enqueue
     */
    public synchronized void put(MethodRequest request) {
        // Record the interruption and restore it at the end; re-interrupting
        // inside the loop would make the next wait() throw immediately.
        boolean interrupted = false;
        try {
            while (methodQueue.size() >= MAX_METHOD_REQUEST_QUEUE_SIZE) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            this.methodQueue.addLast(request);
            // Wake any thread blocked in take().
            this.notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the oldest request, waiting if the queue is empty.
     *
     * @return the request at the head of the queue
     */
    public synchronized MethodRequest take() {
        boolean interrupted = false;
        try {
            while (methodQueue.isEmpty()) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            MethodRequest methodRequest = methodQueue.removeFirst();
            // Wake any thread blocked in put().
            this.notifyAll();
            return methodRequest;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
 
八 SchedulerThread
 
package concurrent.activeobjects;
/**
 * Thread that repeatedly takes requests from an {@link ActivationQueue} and
 * executes them one after another.
 */
public class SchedulerThread extends Thread {

    /** Queue shared between callers of {@link #invoke(MethodRequest)} and {@link #run()}. */
    private final ActivationQueue activationQueue;

    /**
     * @param activationQueue the queue this thread consumes
     */
    public SchedulerThread(ActivationQueue activationQueue) {
        this.activationQueue = activationQueue;
    }

    /**
     * Enqueues a request for later execution by this thread.
     *
     * @param request the request to enqueue
     */
    public void invoke(MethodRequest request) {
        activationQueue.put(request);
    }

    /** Takes and executes requests forever; the loop has no exit condition. */
    @Override
    public void run() {
        for (;;) {
            MethodRequest next = activationQueue.take();
            next.execute();
        }
    }
}
 
九 ActiveObject
 
package concurrent.activeobjects;
/**
 * An active object that accepts asynchronous messages.
 *
 * @className: ActiveObject
 * @description: active object that accepts asynchronous messages
 * @date: 2022/5/3
 * @author: cakin
 */
public interface ActiveObject {
    /**
     * Requests a string made of {@code count} copies of {@code fillChar}.
     *
     * @param count    number of characters in the string
     * @param fillChar the character to repeat
     * @return a {@link Result} from which the string can be read
     */
    Result makeString(int count, char fillChar);
    /**
     * Requests that the given text be displayed.
     *
     * @param text the text to display
     */
    void displayString(String text);
}
 
十 Servant
 
package concurrent.activeobjects;
import java.util.concurrent.TimeUnit;
/**
 * The object that does the actual work behind the {@link ActiveObject}
 * interface. Both methods contain deliberate sleeps.
 *
 * <p>Neither method throws on interruption. An interrupt cuts the remaining
 * sleep short, the work still completes, and the calling thread's interrupt
 * status is re-asserted so that callers can observe it.
 */
class Servant implements ActiveObject {
    /**
     * Builds a string of {@code count} copies of {@code fillChar}, sleeping
     * 10 ms after each character.
     *
     * @param count    length of the string to build
     * @param fillChar the character to repeat
     * @return a {@link RealResult} wrapping the finished string
     */
    @Override
    public Result makeString(int count, char fillChar) {
        char[] buf = new char[count];
        // Once interrupted, skip the remaining delays but still fill the
        // whole buffer so the caller receives a complete string.
        boolean interrupted = false;
        for (int i = 0; i < count; i++) {
            buf[i] = fillChar;
            if (!interrupted) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            // Restore the status that the thrown InterruptedException cleared.
            Thread.currentThread().interrupt();
        }
        return new RealResult(new String(buf));
    }

    /**
     * Prints the text to standard output and then sleeps for one second.
     *
     * @param text the text to print
     */
    @Override
    public void displayString(String text) {
        System.out.println("Display is " + text);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the status that the thrown InterruptedException cleared.
            Thread.currentThread().interrupt();
        }
    }
}
 
十一 ActiveObjectProxy
 
package concurrent.activeobjects;
/**
 * {@link ActiveObject} implementation that turns each call into a request
 * object and hands it to the scheduler thread instead of doing the work on
 * the calling thread.
 */
class ActiveObjectProxy implements ActiveObject {

    /** Receives the request objects created by this proxy. */
    private final SchedulerThread schedulerThread;

    /** The servant each request object is bound to. */
    private final Servant servant;

    /**
     * @param schedulerThread the scheduler that receives the requests
     * @param servant         the servant the requests will call
     */
    public ActiveObjectProxy(SchedulerThread schedulerThread, Servant servant) {
        this.servant = servant;
        this.schedulerThread = schedulerThread;
    }

    /**
     * Submits a make-string request and returns a future for its result.
     *
     * @return a {@link FutureResult} that the request completes when executed
     */
    @Override
    public Result makeString(int count, char fillChar) {
        FutureResult pending = new FutureResult();
        MethodRequest request = new MakeStringRequest(servant, pending, count, fillChar);
        schedulerThread.invoke(request);
        return pending;
    }

    /** Submits a display request; there is no result to wait for. */
    @Override
    public void displayString(String text) {
        MethodRequest request = new DisplayStringRequest(servant, text);
        schedulerThread.invoke(request);
    }
}
 
十二 ActiveObjectFactory
 
package concurrent.activeobjects;
/**
 * Factory that wires together a servant, an activation queue, a scheduler
 * thread and a proxy, and returns the proxy as an {@link ActiveObject}.
 */
public final class ActiveObjectFactory {
    /** Kept public for compatibility; the class only offers static methods. */
    public ActiveObjectFactory() {
    }

    /**
     * Creates a new active object and starts its scheduler thread.
     *
     * @return a proxy whose calls are queued for the newly started scheduler thread
     */
    public static ActiveObject createActiveObject() {
        Servant servant = new Servant();
        ActivationQueue queue = new ActivationQueue();
        SchedulerThread schedulerThread = new SchedulerThread(queue);
        ActiveObjectProxy proxy = new ActiveObjectProxy(schedulerThread, servant);
        schedulerThread.start();
        return proxy;
    }

    /**
     * Misspelled original name, retained so existing callers keep working.
     * Prefer {@link #createActiveObject()}.
     *
     * @return the same kind of object as {@link #createActiveObject()}
     */
    public static ActiveObject createAcitveObject() {
        return createActiveObject();
    }
}
 
十三 DisplayClientThread
 
package concurrent.activeobjects;
/**
 * Client thread that keeps asking the active object to display a numbered
 * message, pausing 200 ms between requests.
 */
public class DisplayClientThread extends Thread {

    /** The active object that receives the display requests. */
    private final ActiveObject activeObject;

    /**
     * @param name         the thread name, also used as the message prefix
     * @param activeObject the active object to send requests to
     */
    public DisplayClientThread(String name, ActiveObject activeObject) {
        super(name);
        this.activeObject = activeObject;
    }

    /**
     * Sends "&lt;thread name&gt;=&gt;&lt;n&gt;" for n = 0, 1, 2, ... until
     * the sleep between requests is interrupted.
     */
    @Override
    public void run() {
        int sequence = 0;
        try {
            while (true) {
                activeObject.displayString(Thread.currentThread().getName() + "=>" + sequence);
                Thread.sleep(200);
                sequence++;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
 
十四 MakeClientThread
 
package concurrent.activeobjects;
/**
 * Client thread that keeps requesting ever longer strings from the active
 * object and prints each one once it is available.
 */
public class MakeClientThread extends Thread {

    /** The active object that receives the make-string requests. */
    private final ActiveObject activeObject;

    /** Fill character for every request: the first character of the thread name. */
    private final char fillChar;

    /**
     * @param activeObject the active object to send requests to
     * @param name         the thread name; its first character becomes the fill character
     */
    public MakeClientThread(ActiveObject activeObject, String name) {
        super(name);
        this.fillChar = name.charAt(0);
        this.activeObject = activeObject;
    }

    /**
     * Requests strings of length 1, 2, 3, ... and prints each value, until
     * the sleep between request and read is interrupted.
     */
    @Override
    public void run() {
        int length = 1;
        try {
            while (true) {
                Result pending = activeObject.makeString(length, fillChar);
                // Pause between issuing the request and reading the result.
                Thread.sleep(20);
                String value = (String) pending.getResultValue();
                System.out.println(Thread.currentThread().getName() + ": value " + value);
                length++;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
 
十五 Test
 
package concurrent.activeobjects;
/**
 * Demo entry point: creates one active object and starts two make-string
 * clients and one display client against it.
 */
public class Test {
    /**
     * Starts the client threads "Alice", "Bobby" and "Chris", in that order.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ActiveObject shared = ActiveObjectFactory.createAcitveObject();
        for (String makerName : new String[] {"Alice", "Bobby"}) {
            new MakeClientThread(shared, makerName).start();
        }
        Thread displayer = new DisplayClientThread("Chris", shared);
        displayer.start();
    }
}
 
十六 测试结果