RPC框架设计与实现原理
RPC框架核心概念
RPC(Remote Procedure Call)远程过程调用,其目标是让远程服务调用像本地方法调用一样简单。一个完整的RPC框架需要解决服务发现、网络传输、序列化、负载均衡等多个技术问题。
整体架构设计
mermaid
graph TD
subgraph 服务消费方
A["业务代码"]
B["动态代理"]
C["负载均衡"]
D["网络传输"]
end
subgraph 注册中心
E["服务注册表"]
end
subgraph 服务提供方
F["网络传输"]
G["请求处理"]
H["业务实现"]
end
A --> B
B --> C
C --> D
D <--> F
F --> G
G --> H
D -.->|服务发现| E
F -.->|服务注册| E
style A fill:#4A90E2,color:#fff,rx:10,ry:10
style B fill:#9B59B6,color:#fff,rx:10,ry:10
style E fill:#E67E22,color:#fff,rx:10,ry:10
style H fill:#27AE60,color:#fff,rx:10,ry:10核心组件职责
| 组件 | 职责描述 | 技术选型 |
|---|---|---|
| 注册中心 | 服务地址的注册与发现 | ZooKeeper/Nacos/Consul |
| 网络传输 | 客户端与服务端的数据通信 | Netty/Mina |
| 序列化 | 对象与字节流的相互转换 | Protobuf/Kryo/Hessian |
| 动态代理 | 屏蔽远程调用细节 | JDK动态代理/CGLIB |
| 负载均衡 | 多服务实例间的请求分发 | 随机/轮询/一致性哈希 |
注册中心设计
ZooKeeper作为注册中心
mermaid
graph TD
A["ZooKeeper集群"] --> B["/rpc"]
B --> C["/services"]
C --> D["/com.example.UserService"]
C --> E["/com.example.OrderService"]
D --> F["192.168.1.10:8080"]
D --> G["192.168.1.11:8080"]
E --> H["192.168.1.20:9090"]
style A fill:#4A90E2,color:#fff,rx:10,ry:10
style D fill:#27AE60,color:#fff,rx:10,ry:10
style E fill:#27AE60,color:#fff,rx:10,ry:10
style F fill:#E67E22,color:#fff,rx:10,ry:10
style G fill:#E67E22,color:#fff,rx:10,ry:10服务注册实现:
java
public class ZkServiceRegistry implements ServiceRegistry {
private final CuratorFramework zkClient;
private static final String BASE_PATH = "/rpc/services";
public ZkServiceRegistry(String zkAddress) {
this.zkClient = CuratorFrameworkFactory.builder()
.connectString(zkAddress)
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
this.zkClient.start();
}
@Override
public void register(ServiceMeta serviceMeta) throws Exception {
String servicePath = BASE_PATH + "/" + serviceMeta.getServiceName();
String instancePath = servicePath + "/" +
serviceMeta.getAddress() + ":" + serviceMeta.getPort();
// 创建服务节点(持久节点)
if (zkClient.checkExists().forPath(servicePath) == null) {
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(servicePath);
}
// 创建实例节点(临时节点,服务下线自动删除)
zkClient.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(instancePath,
JSON.toJSONBytes(serviceMeta));
}
@Override
public List<ServiceMeta> discover(String serviceName) throws Exception {
String servicePath = BASE_PATH + "/" + serviceName;
List<String> instances = zkClient.getChildren().forPath(servicePath);
List<ServiceMeta> serviceMetaList = new ArrayList<>();
for (String instance : instances) {
byte[] data = zkClient.getData()
.forPath(servicePath + "/" + instance);
serviceMetaList.add(JSON.parseObject(data, ServiceMeta.class));
}
return serviceMetaList;
}
}服务订阅与通知
mermaid
sequenceDiagram
participant Consumer as 服务消费者
participant ZK as ZooKeeper
participant Provider as 服务提供者
Provider->>ZK: 注册服务实例
Consumer->>ZK: 订阅服务列表
ZK-->>Consumer: 返回服务实例列表
Consumer->>Consumer: 缓存服务地址
Note over Provider: 新实例上线
Provider->>ZK: 注册新实例
ZK-->>Consumer: 推送变更通知
Consumer->>Consumer: 更新本地缓存网络传输层设计
基于Netty的传输实现
Netty是高性能NIO框架,非常适合RPC场景:
java
public class RpcServer {
private final int port;
private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
public void start() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 解决粘包拆包
pipeline.addLast(new LengthFieldBasedFrameDecoder(
65535, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
// 序列化处理
pipeline.addLast(new RpcDecoder());
pipeline.addLast(new RpcEncoder());
// 业务处理
pipeline.addLast(new RpcServerHandler(serviceMap));
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public void registerService(String serviceName, Object serviceImpl) {
serviceMap.put(serviceName, serviceImpl);
}
}自定义协议设计
RPC协议需要定义清晰的消息格式:
mermaid
graph LR
A["魔数<br/>4字节"] --> B["版本号<br/>1字节"]
B --> C["序列化类型<br/>1字节"]
C --> D["消息类型<br/>1字节"]
D --> E["请求ID<br/>8字节"]
E --> F["消息体长度<br/>4字节"]
F --> G["消息体<br/>N字节"]
style A fill:#E74C3C,color:#fff,rx:10,ry:10
style B fill:#E67E22,color:#fff,rx:10,ry:10
style C fill:#4A90E2,color:#fff,rx:10,ry:10
style D fill:#9B59B6,color:#fff,rx:10,ry:10
style E fill:#27AE60,color:#fff,rx:10,ry:10
style F fill:#3498DB,color:#fff,rx:10,ry:10
style G fill:#1ABC9C,color:#fff,rx:10,ry:10协议定义:
java
@Data
public class RpcProtocol {
// 魔数,用于识别协议
public static final int MAGIC_NUMBER = 0xCAFEBABE;
// 协议版本
private byte version = 1;
// 序列化类型:1-JSON, 2-Protobuf, 3-Kryo
private byte serializerType;
// 消息类型:1-请求, 2-响应, 3-心跳
private byte messageType;
// 请求ID,用于异步响应匹配
private long requestId;
// 消息体
private byte[] body;
}
@Data
public class RpcRequest {
private String serviceName;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] parameters;
}
@Data
public class RpcResponse {
private long requestId;
private Object result;
private String errorMessage;
private boolean success;
}序列化方案选型
常用序列化方案对比
| 方案 | 性能 | 空间 | 跨语言 | 可读性 |
|---|---|---|---|---|
| JDK原生 | 差 | 大 | 否 | 差 |
| JSON | 中 | 大 | 是 | 好 |
| Protobuf | 优 | 小 | 是 | 差 |
| Kryo | 优 | 小 | 否 | 差 |
| Hessian | 良 | 中 | 是 | 差 |
Kryo序列化实现
java
public class KryoSerializer implements RpcSerializer {
private static final ThreadLocal<Kryo> KRYO_LOCAL = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(false);
kryo.setReferences(true);
return kryo;
});
@Override
public byte[] serialize(Object obj) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Output output = new Output(baos);
KRYO_LOCAL.get().writeClassAndObject(output, obj);
output.close();
return baos.toByteArray();
}
@Override
public <T> T deserialize(byte[] data, Class<T> clazz) {
Input input = new Input(new ByteArrayInputStream(data));
Object obj = KRYO_LOCAL.get().readClassAndObject(input);
input.close();
return clazz.cast(obj);
}
}动态代理实现
动态代理是RPC框架的核心,使远程调用对业务代码透明:
mermaid
sequenceDiagram
participant Biz as 业务代码
participant Proxy as 代理对象
participant LB as 负载均衡
participant Net as 网络层
participant Server as 远程服务
Biz->>Proxy: 调用接口方法
Proxy->>Proxy: 封装RpcRequest
Proxy->>LB: 选择服务实例
LB-->>Proxy: 返回服务地址
Proxy->>Net: 发送网络请求
Net->>Server: TCP传输
Server-->>Net: 返回结果
Net-->>Proxy: 解析响应
Proxy-->>Biz: 返回方法结果JDK动态代理实现:
java
public class RpcClientProxy implements InvocationHandler {
private final ServiceDiscovery serviceDiscovery;
private final LoadBalancer loadBalancer;
private final RpcTransport transport;
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
// 构建RPC请求
RpcRequest request = new RpcRequest();
request.setServiceName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
// 服务发现
String serviceName = method.getDeclaringClass().getName();
List<ServiceMeta> instances = serviceDiscovery.discover(serviceName);
if (instances.isEmpty()) {
throw new RpcException("No available service: " + serviceName);
}
// 负载均衡
ServiceMeta selected = loadBalancer.select(instances);
// 发送请求
RpcResponse response = transport.send(request, selected);
if (!response.isSuccess()) {
throw new RpcException(response.getErrorMessage());
}
return response.getResult();
}
@SuppressWarnings("unchecked")
public <T> T createProxy(Class<T> interfaceClass) {
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
this
);
}
}负载均衡策略
常用负载均衡算法
mermaid
graph TD
A["负载均衡策略"] --> B["随机"]
A --> C["轮询"]
A --> D["加权轮询"]
A --> E["一致性哈希"]
A --> F["最小连接数"]
B --> B1["简单随机分配"]
C --> C1["依次轮流分配"]
D --> D1["按权重比例分配"]
E --> E1["相同key路由到相同节点"]
F --> F1["选择连接数最少的节点"]
style A fill:#4A90E2,color:#fff,rx:10,ry:10
style B fill:#27AE60,color:#fff,rx:10,ry:10
style C fill:#E67E22,color:#fff,rx:10,ry:10
style D fill:#9B59B6,color:#fff,rx:10,ry:10
style E fill:#E74C3C,color:#fff,rx:10,ry:10
style F fill:#3498DB,color:#fff,rx:10,ry:10一致性哈希实现:
java
public class ConsistentHashLoadBalancer implements LoadBalancer {
private static final int VIRTUAL_NODES = 160;
@Override
public ServiceMeta select(List<ServiceMeta> instances, String routeKey) {
TreeMap<Long, ServiceMeta> hashRing = new TreeMap<>();
// 构建哈希环
for (ServiceMeta instance : instances) {
for (int i = 0; i < VIRTUAL_NODES; i++) {
String nodeKey = instance.getAddress() + ":" +
instance.getPort() + "#" + i;
long hash = hash(nodeKey);
hashRing.put(hash, instance);
}
}
// 查找节点
long requestHash = hash(routeKey);
SortedMap<Long, ServiceMeta> tailMap = hashRing.tailMap(requestHash);
if (tailMap.isEmpty()) {
return hashRing.firstEntry().getValue();
}
return tailMap.get(tailMap.firstKey());
}
private long hash(String key) {
// 使用MurmurHash
return Hashing.murmur3_128().hashString(key,
StandardCharsets.UTF_8).asLong();
}
}容错与重试机制
容错策略
mermaid
graph TD
A["调用失败"] --> B{"容错策略"}
B --> C["Failover"]
B --> D["Failfast"]
B --> E["Failsafe"]
B --> F["Failback"]
C --> C1["切换节点重试"]
D --> D1["直接报错"]
E --> E1["忽略错误"]
F --> F1["异步补偿"]
style A fill:#E74C3C,color:#fff,rx:10,ry:10
style C fill:#27AE60,color:#fff,rx:10,ry:10
style D fill:#E67E22,color:#fff,rx:10,ry:10
style E fill:#9B59B6,color:#fff,rx:10,ry:10
style F fill:#4A90E2,color:#fff,rx:10,ry:10Failover重试实现:
java
public class FailoverInvoker implements Invoker {
private static final int MAX_RETRIES = 3;
@Override
public RpcResponse invoke(RpcRequest request, List<ServiceMeta> instances) {
int retries = 0;
RpcException lastException = null;
Set<String> failedNodes = new HashSet<>();
while (retries < MAX_RETRIES) {
// 排除失败节点
List<ServiceMeta> availableInstances = instances.stream()
.filter(i -> !failedNodes.contains(i.getAddress()))
.collect(Collectors.toList());
if (availableInstances.isEmpty()) {
break;
}
ServiceMeta selected = loadBalancer.select(availableInstances);
try {
return transport.send(request, selected);
} catch (RpcException e) {
lastException = e;
failedNodes.add(selected.getAddress());
retries++;
}
}
throw new RpcException("All retries failed", lastException);
}
}更新: 2025-12-06 17:32:08
原文: https://www.yuque.com/u22210564/zoxfmt/ylug88ozmo4wsa0f