Active Objects设计模式

Active Objects设计模式

Active是主动的意思,Active Object是主动对象的意思。主动对象就是拥有自己的独立线程。 Active Object模式不仅有自己的独立线程,还可以接受异步消息,并能返回处理结果。从标准的Active Objects设计入手,将一个接口的方法调用转换成可接收异步消息的主动对象,也就是说方法的执行和方法的调用是在不同的线程中进行的,接口方法的参数以及具体的实现封装成特定的Message告诉执行线程,接口方法需要返回值,必须以Future形式返回。

第一种方法:当某个线程调用OrderService接口的findOrderDetails方法时,是会发送一个包含findOrderDetails方法参数以及OrderService具体实现的Message到Message队列,执行线程通过从队列中获取Message来调用具体的实现,接口的方法的调用和接口方法的执行分别处于不同的线程中,因此称该接口为Active Objects(可接受异步消息的主动对象)。 具体样例代码如下:

java

复制代码

public interface OrderService {

Future findOrderDetails(long orderId);

void order(String account,long orderId);

}

java

复制代码

public class OrderServiceImpl implements OrderService{

@Override

public Future findOrderDetails(long orderId) {

return FutureService.newService().submit(input->{

try {

System.out.println("process the orderId->"+orderId);

TimeUnit.SECONDS.sleep(10);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return "The order details Information";

},orderId);

}

@Override

public void order(String account, long orderId) {

try {

System.out.println("process the orderId->"+orderId+" , account->"+account);

TimeUnit.SECONDS.sleep(10);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

java

复制代码

public class OrderServiceProxy implements OrderService{

private final OrderService orderService;

private final ActiveMessageQueue activeMessageQueue;

public OrderServiceProxy(OrderService orderService,ActiveMessageQueue activeMessageQueue) {

this.orderService=orderService;

this.activeMessageQueue=activeMessageQueue;

}

@Override

public Future findOrderDetails(long orderId) {

final ActiveFuture activeFuture=new ActiveFuture<>();

Map params=new HashMap<>();

params.put("orderId", orderId);

params.put("activeFuture", activeFuture);

MethodMessage message=new FindOrderDetailsMessage(params,orderService);

activeMessageQueue.offer(message);

return activeFuture;

}

@Override

public void order(String account, long orderId) {

Map params=new HashMap<>();

params.put("account", account);

params.put("orderId", orderId);

MethodMessage message=new OrderMessage(params,orderService);

System.out.println("processing in OrderServicePoxy.order method");

activeMessageQueue.offer(message);

}

}

java

复制代码

public class ActiveFuture extends FutureTask{

@Override

public void finish(T result) {

super.finish(result);

}

}

java

复制代码

import java.util.Map;

public abstract class MethodMessage {

protected final Map params;

protected final OrderService orderService;

public MethodMessage(Map params,OrderService orderService) {

this.params=params;

this.orderService=orderService;

}

public abstract void execute();

}

java

复制代码

public class FindOrderDetailsMessage extends MethodMessage{

public FindOrderDetailsMessage(Map params, OrderService orderService) {

super(params, orderService);

}

@Override

public void execute() {

Future realFuture=orderService.findOrderDetails((Long) params.get("orderId"));

ActiveFuture activeFuture=(ActiveFuture)params.get("activeFuture");

try {

String result=realFuture.get();

activeFuture.finish(result);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

java

复制代码

import java.util.Map;

public class OrderMessage extends MethodMessage{

public OrderMessage(Map params, OrderService orderService) {

super(params, orderService);

}

@Override

public void execute() {

String account=(String)params.get("account");

long orderId=(long)params.get("orderId");

orderService.order(account, orderId);

}

}

java

复制代码

import java.util.LinkedList;

public class ActiveMessageQueue {

private final LinkedList message=new LinkedList<>();

public ActiveMessageQueue() {

System.out.println("active Object Thread is build");

new ActiveDaemonThread(this).start();

}

public void offer(MethodMessage methodMessage) {

synchronized(this) {

message.add(methodMessage);

System.out.println("processing in ActiveMessageQueue.offer method");

this.notify();

}

}

protected MethodMessage take() {

synchronized(this) {

while(message.isEmpty()) {

try {

this.wait();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

System.out.println("processing in ActiveMessageQueue.take method");

return message.removeFirst();

}

}

}

java

复制代码

public class ActiveDaemonThread extends Thread{

private final ActiveMessageQueue queue;

public ActiveDaemonThread(ActiveMessageQueue queue) {

super("ActiveDaemonThread");

this.queue=queue;

this.setDaemon(true);

}

@Override

public void run() {

for(;;) {

System.out.println(" active daemon thread is running");

MethodMessage methodMessage=this.queue.take();

methodMessage.execute();

}

}

}

java

复制代码

public class OrderServiceFactory {

private final static ActiveMessageQueue activeMessageQueue=new ActiveMessageQueue();

private OrderServiceFactory() {}

public static OrderService toActiveObject(OrderService orderService) {

return new OrderServiceProxy(orderService,activeMessageQueue);

}

}

java

复制代码

public class AOtest {

public static void main(String[] args) {

OrderService orderService=OrderServiceFactory.toActiveObject(new OrderServiceImpl());

orderService.order("aACC", 5);

Future f=orderService.findOrderDetails(50);

try {

System.out.println("future result is "+f.get());

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

System.out.println("Return immedately");

try {

Thread.currentThread().join();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

第二种方法:第一种方法在接口方法非常多的情况下,会需要封装成很多的Message类。基于JDK动态代理的方式,可以实现一种更加通用的Active Objects。这种方式下,可以将任意接口方法转换w Active Objects,如果接口方法有返回值,必须返回Future类型才可以,否则会抛出IllegalActiveMethod异常。示例代码如下:

java

复制代码

public class IllegalActivedException extends Exception{

public IllegalActivedException(String message) {

super(message);

}

}

java

复制代码

public interface OrderService {

Future findOrderDetails(long orderId);

void order(String account,long orderId);

}

java

复制代码

public class OrderServiceImpl implements OrderService{

@ActiveMethod

@Override

public Future findOrderDetails(long orderId) {

return FutureService.newService().submit(input->{

try {

System.out.println("process the orderId->"+orderId);

TimeUnit.SECONDS.sleep(10);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return "The order details Information";

},orderId);

}

@ActiveMethod

@Override

public void order(String account, long orderId) {

try {

System.out.println("process the orderId->"+orderId+" , account->"+account);

TimeUnit.SECONDS.sleep(10);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

java

复制代码

import static java.lang.annotation.ElementType.METHOD;

import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;

import java.lang.annotation.Target;

@Retention(RUNTIME)

@Target(METHOD)

public @interface ActiveMethod {

}

java

复制代码

public class ActiveMessage {

private final Object[] objects;

private final Method method;

private final ActiveFuture future;

private final Object service;

private ActiveMessage(Builder builder) {

this.objects=builder.objects;

this.method=builder.method;

this.future=builder.future;

this.service=builder.service;

}

public void execute() {

Object result;

try {

result = method.invoke(service, objects);

if(future!=null) {

Future realFuture= (Future)result;

Object realResult=realFuture.get();

future.finish(realResult);

}

} catch (Exception e) {

if(future!=null) {

future.finish(null);

}

e.printStackTrace();

}

}

static class Builder{

private Object[] objects;

private Method method;

private ActiveFuture future;

private Object service;

public Builder useMethod(Method method) {

this.method=method;

return this;

}

public Builder returnFuture(ActiveFuture future) {

this.future=future;

return this;

}

public Builder withObjects(Object[] objects) {

this.objects=objects;

return this;

}

public Builder forService(Object service) {

this.service=service;

return this;

}

public ActiveMessage build() {

return new ActiveMessage(this);

}

}

}

java

复制代码

import java.lang.reflect.InvocationHandler;

import java.lang.reflect.Method;

import java.lang.reflect.Proxy;

import org.multithread.future.Future;

public class ActiveServiceFactory {

private final static ActiveMessageQueue queue=new ActiveMessageQueue();

public static T active(T instance) {

Object proxy=Proxy.newProxyInstance(instance.getClass().getClassLoader(),

instance.getClass().getInterfaces(),

new ActiveInvocationHandler<>(instance));

return (T)proxy;

}

private static class ActiveInvocationHandler implements InvocationHandler{

private final T instance;

ActiveInvocationHandler(T instance){

this.instance=instance;

}

private void checkMethod(Method method) throws IllegalActivedException{

if(!isReturnVoidType(method)&&!isReturnFutureType(method)) {

throw new IllegalActivedException("the method ["+method.getName()+"] return type must be void/Future");

}

}

private boolean isReturnVoidType(Method method) {

return method.getReturnType().equals(Void.TYPE);

}

private boolean isReturnFutureType(Method method) {

return method.getReturnType().isAssignableFrom(Future.class);

}

@Override

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

if(method.isAnnotationPresent(ActiveMethod.class)) {

this.checkMethod(method);

ActiveMessage.Builder builder=new ActiveMessage.Builder();

builder.useMethod(method).withObjects(args).forService(instance);

Object result=null;

if(this.isReturnFutureType(method)) {

result=new ActiveFuture<>();

builder.returnFuture((ActiveFuture) result);

}

queue.offer(builder.build());

return result;

}else {

return method.invoke(instance, args);

}

}

}

}

java

复制代码

import java.util.LinkedList;

public class ActiveMessageQueue {

private final LinkedList activeMessages=new LinkedList<>();

public ActiveMessageQueue() {

System.out.println("active Object Thread is build");

new ActiveDaemonThread(this).start();

}

public void offer(ActiveMessage activeMessage) {

synchronized(this) {

this.activeMessages.add(activeMessage);

System.out.println("processing in ActiveMessageQueue.offer method");

this.notify();

}

}

public ActiveMessage takeActive() {

synchronized(this) {

while(this.activeMessages.isEmpty()) {

try {

this.wait();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

return this.activeMessages.removeFirst();

}

}

}

java

复制代码

public class ActiveDaemonThread extends Thread{

private final ActiveMessageQueue queue;

public ActiveDaemonThread(ActiveMessageQueue queue) {

super("ActiveDaemonThread");

this.queue=queue;

this.setDaemon(true);

}

@Override

public void run() {

for(;;) {

System.out.println(" active daemon thread is running");

ActiveMessage activeMessage=this.queue.takeActive();

activeMessage.execute();

}

}

}

java

复制代码

public class AOtest {

public static void main(String[] args) {

ActiveServiceFactory activeInstance=new ActiveServiceFactory();

OrderService orderService=activeInstance.active(new OrderServiceImpl());

orderService.order("aACC", 5);

Future f=orderService.findOrderDetails(150);

try {

System.out.println("future result is "+f.get());

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

System.out.println("Return immedately");

try {

Thread.currentThread().join();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

相关文章

种子种下去后怎么浇水
365彩票app下载不了

种子种下去后怎么浇水

10-09 阅读: 9237
关于印发《上海市企业工资支付办法》的通知
365彩票app下载不了

关于印发《上海市企业工资支付办法》的通知

08-07 阅读: 2590
拼多多店铺如何设置关注?拼多多店铺注册入口
365彩票网3d专家预测

拼多多店铺如何设置关注?拼多多店铺注册入口

10-15 阅读: 8431
金色木头有哪些
365彩票网3d专家预测

金色木头有哪些

08-17 阅读: 5742
张三丰是哪个朝代的
365彩票网3d专家预测

张三丰是哪个朝代的

09-18 阅读: 1060
优酷会员怎么支付,优酷会员支付详解:从选择方式到成功支