涉及技术   序列化、Socket通信、Java动态代理技术,反射机制   角色   1、服务提供者:运行在服务端,是真实的服务实现类   2、服务发布监听者:运行在RPC服务端,1将服务端提供的服务暴露为远程服务并2监听客户端请求3调用真实服务   3、客户端代理:运行在RPC客户端,通过该代理调用远程服务提供者,将结果封装返回本地消费者   4、客户端消费者:委托客户端代理实现透明的RPC调用   代码实现   (1)服务提供者代码实现   接口 public interface IRealService {     public void sayHello(); }   实现类 public class RealServiceImpl implements IRealService {     @Override     public void sayHello() {         System.out.println("Hello,Client!");     } }       (2)服务发布监听者   package com.zerone.rpcdemo;     import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;     import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor;     /** * Created by Andy ye on 2019/4/12. * * @author Andy */ public class RpcServiceListenExecutor {         static ThreadPoolTaskExecutor threadPoolTaskExecutor;         /**      * 开启Socket,监听RPC请求      *      * @param hostName      * @param port      * @throws IOException      */     public static void listenAndExecute(String hostName, int port) throws IOException {             //1、利用springframework创建线程池         RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {             @Override             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                 new ThreadPoolExecutor.AbortPolicy();             }         };         threadPoolTaskExecutor = new ThreadPoolTaskExecutor();         threadPoolTaskExecutor.setCorePoolSize(10);         threadPoolTaskExecutor.setMaxPoolSize(100);         threadPoolTaskExecutor.setQueueCapacity(2000);         threadPoolTaskExecutor.setRejectedExecutionHandler(rejectedExecutionHandler);         threadPoolTaskExecutor.initialize();             //2绑定socket服务,监听socket客户端请求         ServerSocket serverSocket = new ServerSocket();         serverSocket.bind(new InetSocketAddress(hostName, port));             //3当客户端代理请求到达时,接收socket客户端请求,并开启一个执行真实服务调用的线程         while (true) {             threadPoolTaskExecutor.execute(new PublishTask(serverSocket.accept()));         }     }             /**      * 接收客户端代理的RPC请求,反序列化数据对象,实现真实的服务调用      */     public static class PublishTask implements Runnable {             Socket socket = null;             public PublishTask(Socket socket) {             this.socket = socket;         }             @Override         public void run() {                 ObjectInputStream inputStream = null;             ObjectOutputStream outputStream = null;                 try {                     //socket.getInputStream() 是读取和客户端代理发送过来的序列化对象                 //该对象包含了本次服务调用的类名 方法名 参数类型 和 参数值                 inputStream = new ObjectInputStream(socket.getInputStream());                     // 反序列化数据                 // 读取接口名 方法名称 参数类型 参数值                 String interfaceName = inputStream.readUTF();                 String methodName = inputStream.readUTF();                 Class<?>[] parameterTypes = (Class<?>[]) inputStream.readObject();                 Object[] args = (Object[]) inputStream.readObject();                     //通过反射实例化该类对象                 Class<?> service = Class.forName(interfaceName);                     //通过类对象和方法名 参数类型获取本类要执行的具体方法                 Method method = service.getMethod(methodName, parameterTypes);                     //调用该方法(在这里利用反射实现真实类的方法实际调用) 返回结果                 Object result = method.invoke(service.newInstance(), args);                 outputStream = new ObjectOutputStream(socket.getOutputStream());                 outputStream.writeObject(result);                 } catch (Exception e) {                 e.printStackTrace();             } finally {                 if (inputStream != null) {                     try {                         inputStream.close();                     } catch (IOException e) {                         e.printStackTrace();                     }                 }                 if (outputStream != null) {                     try {                         outputStream.close();                     } catch (IOException e) {                         e.printStackTrace();                     }                 }                 if (socket != null) {                     try {                         socket.close();                     } catch (IOException e) {                         e.printStackTrace();                     }                 }             }         }     } }     (3) 客户端代理代码实现   package com.zerone.rpcdemo;     import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket;     /** * Created by Andy ye on 2019/4/12. * S 代表了调用的真实服务类实现的接口 */ public class RpcClientProxy<S> {         public S callRealService(final Class<?> serviceClass, final InetSocketAddress addr) {             //返回的是真实服务类实现的接口         return (S) Proxy.newProxyInstance(                 serviceClass.getClassLoader(),                 new Class<?>[]{serviceClass.getInterfaces()[0]},                 new InvocationHandler() {                     @Override                     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {                         Socket socket = null;                         ObjectOutputStream outputStream = null;                         ObjectInputStream inputStream = null;                             try {                                 //建立socket连接客户端                             socket = new Socket();                             socket.connect(addr);                                 // 构造服务调用数据 接口 方法 参数类型 参数值                             outputStream = new ObjectOutputStream(socket.getOutputStream());                             outputStream.writeUTF(serviceClass.getName());                             outputStream.writeUTF(method.getName());                             outputStream.writeObject(method.getParameterTypes());                             outputStream.writeObject(args);                                 // 序列化数据对象,以便将其发送到RPC监听服务器                             inputStream = new ObjectInputStream(socket.getInputStream());                             return inputStream.readObject();                             } catch (Exception e) {                             e.printStackTrace();                             return null;                         } finally {                             if (socket != null) {                                 socket.close();                             }                             if (outputStream != null) {                                 outputStream.close();                             }                             if (inputStream != null) {                                 inputStream.close();                             }                         }                         }                 }         );     } }       (4)测试消费者调用RPC   package com.zerone.rpcdemo;     import java.net.InetSocketAddress;     /** * Created by Andy ye on 2019/4/12. * * @author Andy */ public class ClientTest {         public static void main(String[] args) {             //开启一个RPC服务监听处理器         new Thread(new Runnable() {             @Override             public void run() {                 try {                     RpcServiceListenExecutor.listenAndExecute("localhost", 8088);                 } catch (Exception e) {                     e.printStackTrace();                 }             }         }).start();             //创建客户端代理,构造RPC请求参数,发起RPC调用         RpcClientProxy<IRealService> proxy = new RpcClientProxy<IRealService>();         IRealService realService = proxy.callRealService(RealServiceImpl.class,                 new InetSocketAddress("localhost", 8088));         realService.sayHello();     } }     结果:   【RPC】手撸一个简单的RPC框架实现 随笔 第1张 【RPC】手撸一个简单的RPC框架实现 随笔 第2张           一个细节:当我使用Executors创建线程池时,提示我通过ThreadPoolExcutor的方式去创建会避免资源消耗殆尽的风险 ThreadPoolExcutor可以指定线程的最小创建数量和最大创建数量,以此来控制线程池占用的内存 【RPC】手撸一个简单的RPC框架实现 随笔 第3张 【RPC】手撸一个简单的RPC框架实现 随笔 第4张                                                                      
扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄