简单 RPC 实现(一)
项目结构
[root@localhost ppr-parent]# tree -L 1
├── pom.xml
├── ppr-consumer  # 消费者
├── ppr-provider  # 服务提供者
├── ppr-sdk       # rpc sdk
└── ppr-service   # 服务接口定义
ppr-consumer 依赖 ppr-sdk,ppr-service
ppr-provider 依赖 ppr-sdk,ppr-service
 
展开
├── pom.xml
├── ppr-consumer
│   ├── pom.xml
│   └── src
│       ├── main
│       │   └── java
│       │       └── com.ddup.consumer
│       │                   └── service
│       │                       └── HelloServicePpr.java
├── ppr-provider
│   ├── pom.xml
│   └── src
│       ├── main
│       │   └── java
│       │       └── com.ddup.provider
│       │                   ├── server
│       │                   │   └── Server.java
│       │                   └── service
│       │                       └── HelloServiceImpl.java
├── ppr-sdk
│   ├── pom.xml
│   └── src
│       ├── main
│       │   └── java
│       │       └── com.ddup.sdk
│       │                   ├── exception
│       │                   │   └── RpcException.java
│       │                   ├── handler
│       │                   │   ├── NetClient.java
│       │                   │   └── PprProxyInvocationHandler.java # 客户端代理
│       │                   ├── PprRequest.java
│       │                   └── server
│       │                       └── ProviderServer.java
└── ppr-service
    ├── pom.xml
    └── src
        ├── main
        │   └── java
        │       └── com.ddup.service
        │                   └── HelloService.java
 
ppr-service 定义接口
/**
 * Service contract declared in the ppr-service module, which both the
 * consumer and the provider modules depend on.
 */
public interface HelloService {
    /**
     * Produces a greeting for the given name.
     *
     * @param name the name supplied by the caller
     * @return the greeting text built by the implementation
     */
    String sayHi(String name);
}
 
ppr-consumer 消费者
- 定义 PprRequest【必须实现序列化,其内部的属性也必须实现序列化】
- 创建代理类
- 使用 socket 进行数据传输
 
PprRequest
/**
 * Serializable description of a single remote call: which interface, which
 * method (by name and parameter types) and which arguments.
 *
 * <p>Instances are written with Java object serialization, so every element
 * of {@link #args} must itself be serializable at runtime.
 */
@Data
public class PprRequest implements Serializable {
    /**
     * Explicit stream version so that the consumer and the provider agree on
     * the wire format even when they are compiled separately.
     */
    private static final long serialVersionUID = 1L;

    /** Arguments to pass to the target method; may be null for no-arg methods. */
    private Object[] args;
    /** Interface class the call is addressed to. */
    private Class<?> clz;
    /**
     * Name of the method to invoke. A {@code java.lang.reflect.Method} cannot
     * be used here because {@code Method} is not serializable.
     */
    private String method;
    /** Parameter types used together with {@link #method} to resolve overloads. */
    private Class<?>[] parameterTypes;
}
 
请求代理类 PprProxyInvocationHandler
所有的接口都通过代理类调用 socket 通信
/**
 * Client-side invocation handler that turns every call on a service
 * interface into a {@link PprRequest} and sends it through {@link NetClient}.
 *
 * @param <T> the service interface being proxied
 */
public class PprProxyInvocationHandler<T> implements InvocationHandler {
    private final Class<T> service;
    private final NetClient client = new NetClient();

    /**
     * @param service the service interface to proxy (an interface, not an implementation class)
     */
    public PprProxyInvocationHandler(Class<T> service) {
        this.service = service;
    }

    /**
     * Creates a dynamic proxy implementing the service interface.
     *
     * <p>{@code service} is the interface itself, so it is passed directly as
     * the proxied interface list. ({@code service.getInterfaces()} would only
     * be appropriate if {@code service} were an implementation class.)
     *
     * @return a proxy whose interface methods are executed remotely
     */
    @SuppressWarnings("unchecked") // safe: the proxy implements exactly the interface T
    public T getProxy() {
        return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[]{service}, this);
    }

    /**
     * Dispatches a call made on the proxy.
     *
     * <p>{@code equals}, {@code hashCode} and {@code toString} are answered
     * locally. Forwarding them would fail on the provider side (they are not
     * resolvable on the interface there), which would surface as a
     * {@code null} result and a NullPointerException when unboxed here.
     *
     * @param proxy  the proxy instance the method was invoked on
     * @param method the interface method being called
     * @param args   the call arguments, or null when the method takes none
     * @return the value returned by the remote implementation
     * @throws Throwable whatever {@link NetClient#execute} throws
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return invokeObjectMethod(proxy, method, args);
        }
        PprRequest request = new PprRequest();
        request.setArgs(args);
        request.setMethod(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setClz(service);
        return client.execute(request);
    }

    /** Handles the java.lang.Object methods that a dynamic proxy routes to its handler. */
    private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
        switch (method.getName()) {
            case "equals":
                return proxy == args[0];
            case "hashCode":
                return System.identityHashCode(proxy);
            case "toString":
                return "PprProxy(" + service.getName() + ")";
            default:
                throw new IllegalStateException("Unexpected Object method: " + method);
        }
    }
}
 
Rpc 通信 NetClient
/**
 * Minimal blocking transport: opens one socket per call, writes the request
 * with Java object serialization and reads back a single response object.
 */
public class NetClient {
    /**
     * Sends the request to the provider and returns its response.
     *
     * @param request the call description to transmit
     * @return the object sent back by the provider (may be null)
     * @throws RpcException if connecting, writing, reading or deserializing fails;
     *                      the underlying failure is attached as a suppressed exception
     */
    public Object execute(PprRequest request) {
        String host = "localhost";
        int port = 8888;
        // Closing the socket also closes both of its streams, so the connection
        // is released on success as well as on failure.
        try (Socket socket = new Socket(host, port)) {
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
            oos.writeObject(request);
            oos.flush();
            // The input stream is created only after the request is flushed: its
            // constructor blocks until the peer has written a stream header, and
            // the provider writes nothing before it has read the request.
            ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
            return ois.readObject();
        } catch (Exception e) {
            RpcException failure = new RpcException("ppr调用异常");
            // Only the (String) constructor of RpcException is known here, so the
            // root failure is attached this way instead of being dropped.
            failure.addSuppressed(e);
            throw failure;
        }
    }
}
 
客户端调用:
/**
 * Consumer-side demo: obtains a proxy for {@code HelloService} and calls it.
 */
public class HelloServicePpr {
    /**
     * Calls {@code sayHi} through the RPC proxy and prints the result.
     */
    public void helloTest() {
        String name = "小明";
        // Parameterizing the handler lets getProxy() return HelloService directly,
        // so no raw type or cast is needed.
        PprProxyInvocationHandler<HelloService> invocationHandler =
                new PprProxyInvocationHandler<>(HelloService.class);
        HelloService proxy = invocationHandler.getProxy();
        String s = proxy.sayHi(name);
        System.out.println(s);
    }

    public static void main(String[] args) {
        new HelloServicePpr().helloTest();
    }
}
 
服务提供者 ppr-provider
- 通过 socket 读取数据并反序列化,生成 PprRequest
- 根据 PprRequest 中的 clz(接口类)找到具体实现类
- 使用反射执行方法
- 将结果序列化后返回给消费者
 
服务端通信模板
/**
 * Template for the provider side: accepts connections on port 8888, reads one
 * {@link PprRequest} per connection, invokes the target method reflectively
 * and writes the result back. Subclasses supply the implementation objects.
 *
 * <p>Connections are handled one at a time on the calling thread.
 */
public abstract class ProviderServer {
    /**
     * Starts the accept loop. This method does not return while the server
     * socket is usable; it returns only if the server socket cannot be opened.
     */
    public void start() {
        int port = 8888;
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            System.out.println("ppr server start at 8888...");
            while (true) {
                // Each connection gets its own socket and streams, all released
                // when the request is done, whether it succeeded or not.
                try (Socket socket = serverSocket.accept()) {
                    handleConnection(socket);
                } catch (Exception e) {
                    // One bad request must not stop the server, but it should be visible.
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Reads one request from the socket, executes it and writes the response. */
    private void handleConnection(Socket socket) throws IOException, ClassNotFoundException {
        // NOTE(review): this deserializes objects received from the network with
        // Java native serialization. That is unsafe for untrusted peers; consider
        // an ObjectInputFilter or a data-only format before exposing this port.
        ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
        PprRequest request = (PprRequest) ois.readObject();
        Object result = invoke(request);
        ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
        oos.writeObject(result);
        oos.flush();
    }

    /**
     * Resolves the requested method on the request's class and invokes it on
     * the object returned by {@link #getProvider(PprRequest)}.
     *
     * @param request the call description received from the consumer
     * @return the method's return value, or null when no provider is registered
     *         or when lookup/invocation fails (the failure is printed, not propagated)
     */
    protected Object invoke(PprRequest request) {
        try {
            Method method = request.getClz().getMethod(request.getMethod(), request.getParameterTypes());
            Object o = getProvider(request);
            if (o != null) {
                return method.invoke(o, request.getArgs());
            } else {
                System.out.println("bean 不存在");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Looks up the implementation object that should serve the request.
     *
     * @param request the call description received from the consumer
     * @return the implementation instance, or null if none is registered
     */
    protected abstract Object getProvider(PprRequest request);
}
 
- 在 ppr-provider 中 启动服务:
 
/**
 * Provider entry point: registers the service implementations by interface
 * name and runs the {@link ProviderServer} accept loop.
 */
public class Server extends ProviderServer {
    /** Implementation objects keyed by the fully qualified interface name. */
    private final ConcurrentHashMap<String, Object> factory = new ConcurrentHashMap<>();

    /** Registers the HelloService implementation under its interface name. */
    public Server() {
        HelloServiceImpl helloService = new HelloServiceImpl();
        factory.put("com.ddup.service.HelloService", helloService);
    }

    /**
     * Returns the implementation registered for the request's interface.
     *
     * @param request the incoming call description
     * @return the registered implementation, or null if there is none
     */
    @Override
    protected Object getProvider(PprRequest request) {
        String interfaceName = request.getClz().getName();
        return factory.get(interfaceName);
    }

    public static void main(String[] args) throws IOException {
        Server server = new Server();
        server.start();
        // Reached only if start() returns; then waits for input on stdin.
        System.in.read();
    }
}
 
- 在 ppr-provider 中实现 HelloService 业务接口:
/**
 * Provider-side implementation of {@code HelloService}.
 */
public class HelloServiceImpl implements HelloService {
    /** Fixed text placed in front of the caller-supplied name. */
    private static final String GREETING_PREFIX = "echo from server, hi: ";

    /**
     * Builds the greeting by appending the name to a fixed prefix.
     *
     * @param name the name supplied by the caller
     * @return the prefix followed by {@code name}
     */
    @Override
    public String sayHi(String name) {
        return GREETING_PREFIX + name;
    }
}
 
启动 ppr-provider 服务
ppr server start at 8888...
 
ppr-consumer 调用
echo from server, hi: 小明
 
good luck!










