Skip to content

RPC框架设计与实现原理

RPC框架核心概念

RPC(Remote Procedure Call)远程过程调用,其目标是让远程服务调用像本地方法调用一样简单。一个完整的RPC框架需要解决服务发现、网络传输、序列化、负载均衡等多个技术问题。

整体架构设计

mermaid
graph TD
    subgraph 服务消费方
        A["业务代码"]
        B["动态代理"]
        C["负载均衡"]
        D["网络传输"]
    end
    
    subgraph 注册中心
        E["服务注册表"]
    end
    
    subgraph 服务提供方
        F["网络传输"]
        G["请求处理"]
        H["业务实现"]
    end
    
    A --> B
    B --> C
    C --> D
    D <--> F
    F --> G
    G --> H
    
    D -.->|服务发现| E
    F -.->|服务注册| E
    
    style A fill:#4A90E2,color:#fff,rx:10,ry:10
    style B fill:#9B59B6,color:#fff,rx:10,ry:10
    style E fill:#E67E22,color:#fff,rx:10,ry:10
    style H fill:#27AE60,color:#fff,rx:10,ry:10

核心组件职责

组件职责描述技术选型
注册中心服务地址的注册与发现ZooKeeper/Nacos/Consul
网络传输客户端与服务端的数据通信Netty/Mina
序列化对象与字节流的相互转换Protobuf/Kryo/Hessian
动态代理屏蔽远程调用细节JDK动态代理/CGLIB
负载均衡多服务实例间的请求分发随机/轮询/一致性哈希

注册中心设计

ZooKeeper作为注册中心

mermaid
graph TD
    A["ZooKeeper集群"] --> B["/rpc"]
    B --> C["/services"]
    C --> D["/com.example.UserService"]
    C --> E["/com.example.OrderService"]
    
    D --> F["192.168.1.10:8080"]
    D --> G["192.168.1.11:8080"]
    E --> H["192.168.1.20:9090"]
    
    style A fill:#4A90E2,color:#fff,rx:10,ry:10
    style D fill:#27AE60,color:#fff,rx:10,ry:10
    style E fill:#27AE60,color:#fff,rx:10,ry:10
    style F fill:#E67E22,color:#fff,rx:10,ry:10
    style G fill:#E67E22,color:#fff,rx:10,ry:10

服务注册实现

java
public class ZkServiceRegistry implements ServiceRegistry {
    
    private final CuratorFramework zkClient;
    private static final String BASE_PATH = "/rpc/services";
    
    public ZkServiceRegistry(String zkAddress) {
        this.zkClient = CuratorFrameworkFactory.builder()
            .connectString(zkAddress)
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
        this.zkClient.start();
    }
    
    @Override
    public void register(ServiceMeta serviceMeta) throws Exception {
        String servicePath = BASE_PATH + "/" + serviceMeta.getServiceName();
        String instancePath = servicePath + "/" + 
            serviceMeta.getAddress() + ":" + serviceMeta.getPort();
        
        // 创建服务节点(持久节点)
        if (zkClient.checkExists().forPath(servicePath) == null) {
            zkClient.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(servicePath);
        }
        
        // 创建实例节点(临时节点,服务下线自动删除)
        zkClient.create()
            .withMode(CreateMode.EPHEMERAL)
            .forPath(instancePath, 
                JSON.toJSONBytes(serviceMeta));
    }
    
    @Override
    public List<ServiceMeta> discover(String serviceName) throws Exception {
        String servicePath = BASE_PATH + "/" + serviceName;
        List<String> instances = zkClient.getChildren().forPath(servicePath);
        
        List<ServiceMeta> serviceMetaList = new ArrayList<>();
        for (String instance : instances) {
            byte[] data = zkClient.getData()
                .forPath(servicePath + "/" + instance);
            serviceMetaList.add(JSON.parseObject(data, ServiceMeta.class));
        }
        return serviceMetaList;
    }
}

服务订阅与通知

mermaid
sequenceDiagram
    participant Consumer as 服务消费者
    participant ZK as ZooKeeper
    participant Provider as 服务提供者
    
    Provider->>ZK: 注册服务实例
    Consumer->>ZK: 订阅服务列表
    ZK-->>Consumer: 返回服务实例列表
    Consumer->>Consumer: 缓存服务地址
    
    Note over Provider: 新实例上线
    Provider->>ZK: 注册新实例
    ZK-->>Consumer: 推送变更通知
    Consumer->>Consumer: 更新本地缓存

网络传输层设计

基于Netty的传输实现

Netty是高性能NIO框架,非常适合RPC场景:

java
public class RpcServer {
    
    private final int port;
    private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
    
    public void start() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        // 解决粘包拆包
                        pipeline.addLast(new LengthFieldBasedFrameDecoder(
                            65535, 0, 4, 0, 4));
                        pipeline.addLast(new LengthFieldPrepender(4));
                        // 序列化处理
                        pipeline.addLast(new RpcDecoder());
                        pipeline.addLast(new RpcEncoder());
                        // 业务处理
                        pipeline.addLast(new RpcServerHandler(serviceMap));
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
            
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    public void registerService(String serviceName, Object serviceImpl) {
        serviceMap.put(serviceName, serviceImpl);
    }
}

自定义协议设计

RPC协议需要定义清晰的消息格式:

mermaid
graph LR
    A["魔数<br/>4字节"] --> B["版本号<br/>1字节"]
    B --> C["序列化类型<br/>1字节"]
    C --> D["消息类型<br/>1字节"]
    D --> E["请求ID<br/>8字节"]
    E --> F["消息体长度<br/>4字节"]
    F --> G["消息体<br/>N字节"]
    
    style A fill:#E74C3C,color:#fff,rx:10,ry:10
    style B fill:#E67E22,color:#fff,rx:10,ry:10
    style C fill:#4A90E2,color:#fff,rx:10,ry:10
    style D fill:#9B59B6,color:#fff,rx:10,ry:10
    style E fill:#27AE60,color:#fff,rx:10,ry:10
    style F fill:#3498DB,color:#fff,rx:10,ry:10
    style G fill:#1ABC9C,color:#fff,rx:10,ry:10

协议定义

java
@Data
public class RpcProtocol {
    
    // 魔数,用于识别协议
    public static final int MAGIC_NUMBER = 0xCAFEBABE;
    
    // 协议版本
    private byte version = 1;
    
    // 序列化类型:1-JSON, 2-Protobuf, 3-Kryo
    private byte serializerType;
    
    // 消息类型:1-请求, 2-响应, 3-心跳
    private byte messageType;
    
    // 请求ID,用于异步响应匹配
    private long requestId;
    
    // 消息体
    private byte[] body;
}

@Data
public class RpcRequest {
    private String serviceName;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] parameters;
}

@Data
public class RpcResponse {
    private long requestId;
    private Object result;
    private String errorMessage;
    private boolean success;
}

序列化方案选型

常用序列化方案对比

方案性能空间跨语言可读性
JDK原生
JSON
Protobuf
Kryo
Hessian

Kryo序列化实现

java
public class KryoSerializer implements RpcSerializer {
    
    private static final ThreadLocal<Kryo> KRYO_LOCAL = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        kryo.setRegistrationRequired(false);
        kryo.setReferences(true);
        return kryo;
    });
    
    @Override
    public byte[] serialize(Object obj) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Output output = new Output(baos);
        KRYO_LOCAL.get().writeClassAndObject(output, obj);
        output.close();
        return baos.toByteArray();
    }
    
    @Override
    public <T> T deserialize(byte[] data, Class<T> clazz) {
        Input input = new Input(new ByteArrayInputStream(data));
        Object obj = KRYO_LOCAL.get().readClassAndObject(input);
        input.close();
        return clazz.cast(obj);
    }
}

动态代理实现

动态代理是RPC框架的核心,使远程调用对业务代码透明:

mermaid
sequenceDiagram
    participant Biz as 业务代码
    participant Proxy as 代理对象
    participant LB as 负载均衡
    participant Net as 网络层
    participant Server as 远程服务
    
    Biz->>Proxy: 调用接口方法
    Proxy->>Proxy: 封装RpcRequest
    Proxy->>LB: 选择服务实例
    LB-->>Proxy: 返回服务地址
    Proxy->>Net: 发送网络请求
    Net->>Server: TCP传输
    Server-->>Net: 返回结果
    Net-->>Proxy: 解析响应
    Proxy-->>Biz: 返回方法结果

JDK动态代理实现

java
public class RpcClientProxy implements InvocationHandler {
    
    private final ServiceDiscovery serviceDiscovery;
    private final LoadBalancer loadBalancer;
    private final RpcTransport transport;
    
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) 
            throws Throwable {
        
        // 构建RPC请求
        RpcRequest request = new RpcRequest();
        request.setServiceName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParameters(args);
        
        // 服务发现
        String serviceName = method.getDeclaringClass().getName();
        List<ServiceMeta> instances = serviceDiscovery.discover(serviceName);
        
        if (instances.isEmpty()) {
            throw new RpcException("No available service: " + serviceName);
        }
        
        // 负载均衡
        ServiceMeta selected = loadBalancer.select(instances);
        
        // 发送请求
        RpcResponse response = transport.send(request, selected);
        
        if (!response.isSuccess()) {
            throw new RpcException(response.getErrorMessage());
        }
        
        return response.getResult();
    }
    
    @SuppressWarnings("unchecked")
    public <T> T createProxy(Class<T> interfaceClass) {
        return (T) Proxy.newProxyInstance(
            interfaceClass.getClassLoader(),
            new Class<?>[]{interfaceClass},
            this
        );
    }
}

负载均衡策略

常用负载均衡算法

mermaid
graph TD
    A["负载均衡策略"] --> B["随机"]
    A --> C["轮询"]
    A --> D["加权轮询"]
    A --> E["一致性哈希"]
    A --> F["最小连接数"]
    
    B --> B1["简单随机分配"]
    C --> C1["依次轮流分配"]
    D --> D1["按权重比例分配"]
    E --> E1["相同key路由到相同节点"]
    F --> F1["选择连接数最少的节点"]
    
    style A fill:#4A90E2,color:#fff,rx:10,ry:10
    style B fill:#27AE60,color:#fff,rx:10,ry:10
    style C fill:#E67E22,color:#fff,rx:10,ry:10
    style D fill:#9B59B6,color:#fff,rx:10,ry:10
    style E fill:#E74C3C,color:#fff,rx:10,ry:10
    style F fill:#3498DB,color:#fff,rx:10,ry:10

一致性哈希实现

java
public class ConsistentHashLoadBalancer implements LoadBalancer {
    
    private static final int VIRTUAL_NODES = 160;
    
    @Override
    public ServiceMeta select(List<ServiceMeta> instances, String routeKey) {
        TreeMap<Long, ServiceMeta> hashRing = new TreeMap<>();
        
        // 构建哈希环
        for (ServiceMeta instance : instances) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                String nodeKey = instance.getAddress() + ":" + 
                    instance.getPort() + "#" + i;
                long hash = hash(nodeKey);
                hashRing.put(hash, instance);
            }
        }
        
        // 查找节点
        long requestHash = hash(routeKey);
        SortedMap<Long, ServiceMeta> tailMap = hashRing.tailMap(requestHash);
        
        if (tailMap.isEmpty()) {
            return hashRing.firstEntry().getValue();
        }
        return tailMap.get(tailMap.firstKey());
    }
    
    private long hash(String key) {
        // 使用MurmurHash
        return Hashing.murmur3_128().hashString(key, 
            StandardCharsets.UTF_8).asLong();
    }
}

容错与重试机制

容错策略

mermaid
graph TD
    A["调用失败"] --> B{"容错策略"}
    
    B --> C["Failover"]
    B --> D["Failfast"]
    B --> E["Failsafe"]
    B --> F["Failback"]
    
    C --> C1["切换节点重试"]
    D --> D1["直接报错"]
    E --> E1["忽略错误"]
    F --> F1["异步补偿"]
    
    style A fill:#E74C3C,color:#fff,rx:10,ry:10
    style C fill:#27AE60,color:#fff,rx:10,ry:10
    style D fill:#E67E22,color:#fff,rx:10,ry:10
    style E fill:#9B59B6,color:#fff,rx:10,ry:10
    style F fill:#4A90E2,color:#fff,rx:10,ry:10

Failover重试实现

java
public class FailoverInvoker implements Invoker {
    
    private static final int MAX_RETRIES = 3;
    
    @Override
    public RpcResponse invoke(RpcRequest request, List<ServiceMeta> instances) {
        int retries = 0;
        RpcException lastException = null;
        Set<String> failedNodes = new HashSet<>();
        
        while (retries < MAX_RETRIES) {
            // 排除失败节点
            List<ServiceMeta> availableInstances = instances.stream()
                .filter(i -> !failedNodes.contains(i.getAddress()))
                .collect(Collectors.toList());
            
            if (availableInstances.isEmpty()) {
                break;
            }
            
            ServiceMeta selected = loadBalancer.select(availableInstances);
            
            try {
                return transport.send(request, selected);
            } catch (RpcException e) {
                lastException = e;
                failedNodes.add(selected.getAddress());
                retries++;
            }
        }
        
        throw new RpcException("All retries failed", lastException);
    }
}

更新: 2025-12-06 17:32:08
原文: https://www.yuque.com/u22210564/zoxfmt/ylug88ozmo4wsa0f

Java 后端面试知识库