【RPC】手撸一个简单的RPC框架实现
涉及技术 序列化、Socket通信、Java动态代理技术,反射机制
角色 1、服务提供者:运行在服务端,是真实的服务实现类 2、服务发布监听者:运行在RPC服务端,1将服务端提供的服务暴露为远程服务并2监听客户端请求3调用真实服务 3、客户端代理:运行在RPC客户端,通过该代理调用远程服务提供者,将结果封装返回本地消费者 4、客户端消费者:委托客户端代理实现透明的RPC调用
代码实现
(1)服务提供者代码实现 接口 public interface IRealService { public void sayHello(); } 实现类 public class RealServiceImpl implements IRealService { @Override public void sayHello() { System.out.println("Hello,Client!"); } }
(2)服务发布监听者
package com.zerone.rpcdemo; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** * Created by Andy ye on 2019/4/12. * * @author Andy */ public class RpcServiceListenExecutor { static ThreadPoolTaskExecutor threadPoolTaskExecutor; /** * 开启Socket,监听RPC请求 * * @param hostName * @param port * @throws IOException */ public static void listenAndExecute(String hostName, int port) throws IOException { //1、利用springframework创建线程池 RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { new ThreadPoolExecutor.AbortPolicy(); } }; threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(10); threadPoolTaskExecutor.setMaxPoolSize(100); threadPoolTaskExecutor.setQueueCapacity(2000); threadPoolTaskExecutor.setRejectedExecutionHandler(rejectedExecutionHandler); threadPoolTaskExecutor.initialize(); //2绑定socket服务,监听socket客户端请求 ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(hostName, port)); //3当客户端代理请求到达时,接收socket客户端请求,并开启一个执行真实服务调用的线程 while (true) { threadPoolTaskExecutor.execute(new PublishTask(serverSocket.accept())); } } /** * 接收客户端代理的RPC请求,反序列化数据对象,实现真实的服务调用 */ public static class PublishTask implements Runnable { Socket socket = null; public PublishTask(Socket socket) { this.socket = socket; } @Override public void run() { ObjectInputStream inputStream = null; ObjectOutputStream outputStream = null; try { //socket.getInputStream() 是读取和客户端代理发送过来的序列化对象 //该对象包含了本次服务调用的类名 方法名 参数类型 和 参数值 inputStream = new ObjectInputStream(socket.getInputStream()); // 反序列化数据 // 读取接口名 方法名称 参数类型 参数值 String interfaceName = inputStream.readUTF(); String methodName = inputStream.readUTF(); Class<?>[] parameterTypes = (Class<?>[]) inputStream.readObject(); Object[] args = (Object[]) inputStream.readObject(); //通过反射实例化该类对象 Class<?> service = Class.forName(interfaceName); //通过类对象和方法名 参数类型获取本类要执行的具体方法 Method method = service.getMethod(methodName, parameterTypes); //调用该方法(在这里利用反射实现真实类的方法实际调用) 返回结果 Object result = method.invoke(service.newInstance(), args); outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeObject(result); } catch (Exception e) { e.printStackTrace(); } finally { if (inputStream != null) { try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (outputStream != null) { try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } } (3)
客户端代理代码实现 package com.zerone.rpcdemo; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket; /** * Created by Andy ye on 2019/4/12. * S 代表了调用的真实服务类实现的接口 */ public class RpcClientProxy<S> { public S callRealService(final Class<?> serviceClass, final InetSocketAddress addr) { //返回的是真实服务类实现的接口 return (S) Proxy.newProxyInstance( serviceClass.getClassLoader(), new Class<?>[]{serviceClass.getInterfaces()[0]}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectOutputStream outputStream = null; ObjectInputStream inputStream = null; try { //建立socket连接客户端 socket = new Socket(); socket.connect(addr); // 构造服务调用数据 接口 方法 参数类型 参数值 outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeUTF(serviceClass.getName()); outputStream.writeUTF(method.getName()); outputStream.writeObject(method.getParameterTypes()); outputStream.writeObject(args); // 序列化数据对象,以便将其发送到RPC监听服务器 inputStream = new ObjectInputStream(socket.getInputStream()); return inputStream.readObject(); } catch (Exception e) { e.printStackTrace(); return null; } finally { if (socket != null) { socket.close(); } if (outputStream != null) { outputStream.close(); } if (inputStream != null) { inputStream.close(); } } } } ); } } (4)测试消费者调用RPC package com.zerone.rpcdemo; import java.net.InetSocketAddress; /** * Created by Andy ye on 2019/4/12. * * @author Andy */ public class ClientTest { public static void main(String[] args) { //开启一个RPC服务监听处理器 new Thread(new Runnable() { @Override public void run() { try { RpcServiceListenExecutor.listenAndExecute("localhost", 8088); } catch (Exception e) { e.printStackTrace(); } } }).start(); //创建客户端代理,构造RPC请求参数,发起RPC调用 RpcClientProxy<IRealService> proxy = new RpcClientProxy<IRealService>(); IRealService realService = proxy.callRealService(RealServiceImpl.class, new InetSocketAddress("localhost", 8088)); realService.sayHello(); } } 结果:
一个细节:当我使用Executors创建线程池时,提示我通过ThreadPoolExcutor的方式去创建会避免资源消耗殆尽的风险 ThreadPoolExcutor可以指定线程的最小创建数量和最大创建数量,以此来控制线程池占用的内存
扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄



更多精彩