1.概述
在这里RPC实现其实就是分三部分, 分别是 协议定义&实现 , Server端实现和Client实现. 三个部分. 下面会分别进行讲述
2.协议实现
2.1.定义协议
其实就是根据业务需要定义一个借口协议.
协议详情可以参看: Hadoop3.2.1 【 HDFS 】源码分析 : RPC实现 [一] proto接口协议
示例如下:
-
/**
-
* 协议接口
-
*/
-
public
interface
ClicentNameNodeProtocol {
-
//1. 定义协议的ID
-
public
static final
long versionID =
1L;
-
/**
-
* 拿到元数据方法,协议通信前会访问元数据
-
*/
-
public String getMetaData(String path);
-
}
2.2.实现协议
实现协议这个也很简单, 就是根据接口创建一个实现类. 用的时候注册到Server服务中即可.
示例如下:
-
-
/**
-
* 实现协议结构
-
*/
-
public
class ClicentNameNodeImpl implements ClicentNameNodeProtocol {
-
public String getMetaData(String path) {
-
// 数据存放的路径,有多少块,块大小,校验和,存储在哪一台机器上
-
return path +
":3 - {BLOCK_1,BLOCK_2,BLOCK_3....";
-
}
-
}
整理的调用逻辑图,参考如下. 后面来具体分析.
3. RPC Server处理流程
3.1.Server架构图
3.2.Server组件讲解
■ Listener:
Listener对象中存在一个Selector对象acceptSelector, 负责监听来自客户端的Socket连接请求。 当acceptSelector监听到连接请求后, Listener对象会初始化这个连接, 之后采用轮询的方式从readers线程池中选出一个Reader线程处理RPC请求的读取操作。■ Reader:
用于读取RPC请求。 Reader线程类中存在一个Selector对象readSelector, 类似于Reactor模式中的readReactor, 这个对象用于监听网络中是否有可以读取的RPC请求。 当readSelector监听到有可读的RPC请求后, 会唤醒Reader线程读取这个请求, 并将请求封装在一个Call对象中, 然后将这个Call对象放入共享队列CallQueue中。■ Handler:
用于处理RPC请求并发回响应。Handler对象会从CallQueue中不停地取出RPC请求, 然后执行RPC请求对应的本地函数, 最后封装响应并将响应发回客户端。 为了能够并发地处理RPC请求,Server中会存在多个Handler对象。■ Responder:
用于向客户端发送RPC响应, 响应很大或者网络条件不佳等情况下, Handler线程很难将完整的响应发回客户端, 这就会造成Handler线程阻塞, 从而影响RPC请求的处理效率。 所以Handler在没能够将完整的RPC响应发回客户端时, 会在Responder内部的respondSelector上注册一个写响应事件, 这里的respondSelector与Reactor模式的respondSelector概念相同, 当respondSelector监听到网络情况具备写响应的条件时, 会通知Responder将剩余响应发回客户端。
3.2.Server组件代码讲解
Server服务是用RPC.Builder类中的build()方法进行构建的. 下面是构建Server的模拟代码.
构建server的模拟代码:
-
public
class Server {
-
public static void main(String[] args) throws IOException {
-
//1. 构建RPC框架
-
RPC.Builder builder =
new RPC.Builder(
new Configuration());
-
//2. 绑定地址
-
builder.setBindAddress(
"localhost");
-
//3. 绑定端口
-
builder.setPort(
7777);
-
//4. 绑定协议
-
builder.setProtocol(ClicentNameNodeProtocol.class);
-
//5. 调用协议实现类
-
builder.setInstance(
new ClicentNameNodeImpl());
-
//6. 创建服务
-
RPC.Server server = builder.build();
-
//7. 启动服务
-
server.start();
-
}
-
}
其实就是通过RPC.Builder 构建一个Server对象.
Builder构建对象中包含构建Server的各种属性. 如 IP, 端口, 协议, 实现类等等. 一个builer 只能绑定一个协议和实现类.
当Builder中的各种属性填充完, 满足构建Server的条件之后, 就会构建Server对象. 并且调用Server的start方法,启动Server.
-
public
static
class Builder {
-
//设置协议
-
private Class<?> protocol =
null;
-
//设置协议的实例
-
private Object instance =
null;
-
//设置绑定地址
-
private String bindAddress =
"0.0.0.0";
-
//设置端口
-
private
int port =
0;
-
//这是处理任务的hadnler数量
-
private
int numHandlers =
1;
-
//设置读取任务的县城数量
-
private
int numReaders = -
1;
-
private
int queueSizePerHandler = -
1;
-
private
boolean verbose =
false;
-
private
final Configuration conf;
-
private SecretManager<? extends TokenIdentifier> secretManager =
null;
-
private String portRangeConfig =
null;
-
private AlignmentContext alignmentContext =
null;
-
-
public Builder(Configuration conf) {
-
this.conf = conf;
-
}
-
-
-
-
-
}
根据上面的代码,其实最核心的是创建Server服务
RPC.Server server = builder.build();
接下来,我们看看, 上面这行代码,干了什么. 通过怎样的方式构建了一个Server服务.
-
/**
-
* Build the RPC Server.
-
* @throws IOException on error
-
* @throws HadoopIllegalArgumentException when mandatory fields are not set
-
*/
-
public Server build() throws IOException, HadoopIllegalArgumentException {
-
if (
this.conf ==
null) {
-
throw new HadoopIllegalArgumentException(
"conf is not set");
-
}
-
if (
this.protocol ==
null) {
-
throw new HadoopIllegalArgumentException(
"protocol is not set");
-
}
-
if (
this.instance ==
null) {
-
throw new HadoopIllegalArgumentException(
"instance is not set");
-
}
-
-
//调用getProtocolEngine()获取当前RPC类配置的RpcEngine对象
-
//在 NameNodeRpcServer的构造方法中已经将
-
// 当前RPC类的RpcEngine对象设置为 ProtobufRpcEngine了。
-
// 获取了ProtobufRpcEngine对象之后,build()方法会在
-
// ProtobufRpcEngine对象上调用getServer()方法获取一个RPC Server对象的引用。
-
-
return getProtocolEngine(
this.protocol,
this.conf).getServer(
-
this.protocol,
this.instance,
this.bindAddress,
this.port,
-
this.numHandlers,
this.numReaders,
this.queueSizePerHandler,
-
this.verbose,
this.conf,
this.secretManager,
this.portRangeConfig,
-
this.alignmentContext);
-
}
看到这, 其实最主要的是getProtocolEngine方法,获取RpcEngine. 这个方法加了synchronized关键字, 所以是个同步方法.
-
// return the RpcEngine configured to handle a protocol
-
static synchronized
RpcEngine getProtocolEngine(
Class<?>
protocol,
-
Configuration
conf) {
-
//从缓存中获取RpcEngine
-
RpcEngine engine =
PROTOCOL_ENGINES.
get(
protocol);
-
if (
engine ==
null) {
-
-
//获取RpcEngine实现
-
Class<?> impl = conf.getClass(
ENGINE_PROP+
"."+
protocol.getName(),
-
WritableRpcEngine.class);
-
-
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
-
PROTOCOL_ENGINES.put(protocol, engine);
-
}
-
return engine;
-
}
RpcEngine有两种ProtobufRpcEngine和WritableRpcEngine, 默认是WritableRpcEngine(已过时). 本文代码会走默认的WritableRpcEngine.
后面会有单独的文章来解释具体的实现.
WritableRpcEngine获取Server.
-
/* Construct a server for a protocol implementation instance listening on a
-
* port and address. */
-
@Override
-
public RPC.
Server getServer(Class<?> protocolClass,
-
Object protocolImpl, String bindAddress, int port,
-
int numHandlers, int numReaders, int queueSizePerHandler,
-
boolean verbose, Configuration conf,
-
SecretManager<? extends TokenIdentifier> secretManager,
-
String portRangeConfig, AlignmentContext alignmentContext)
-
throws IOException {
-
return
new Server(protocolClass, protocolImpl, conf, bindAddress, port,
-
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
-
portRangeConfig, alignmentContext);
-
}
ProtobufRpcEngine获取Server.
-
@Override
-
public RPC.
Server getServer(Class<?> protocol, Object protocolImpl,
-
String bindAddress, int port, int numHandlers, int numReaders,
-
int queueSizePerHandler, boolean verbose, Configuration conf,
-
SecretManager<? extends TokenIdentifier> secretManager,
-
String portRangeConfig, AlignmentContext alignmentContext)
-
throws IOException {
-
return
new Server(protocol, protocolImpl, conf, bindAddress, port,
-
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
-
portRangeConfig, alignmentContext);
-
}
这两个类型的RpcEngine都会调用父类的初始化.
比如初始化 listener , handlers, responder,connectionManager 等等..
-
protected Server(String bindAddress, int port,
-
Class<? extends Writable> rpcRequestClass, int handlerCount,
-
int numReaders, int queueSizePerHandler, Configuration conf,
-
String serverName, SecretManager<? extends TokenIdentifier> secretManager,
-
String portRangeConfig)
-
throws IOException {
-
this.bindAddress = bindAddress;
-
this.conf = conf;
-
this.portRangeConfig = portRangeConfig;
-
this.port = port;
-
this.rpcRequestClass = rpcRequestClass;
-
this.handlerCount = handlerCount;
-
this.socketSendBufferSize =
0;
-
this.serverName = serverName;
-
this.auxiliaryListenerMap =
null;
-
this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
-
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
-
if (queueSizePerHandler !=
-1) {
-
this.maxQueueSize = handlerCount * queueSizePerHandler;
-
}
else {
-
this.maxQueueSize = handlerCount * conf.getInt(
-
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
-
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
-
}
-
this.maxRespSize = conf.getInt(
-
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
-
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
-
if (numReaders !=
-1) {
-
this.readThreads = numReaders;
-
}
else {
-
this.readThreads = conf.getInt(
-
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
-
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
-
}
-
this.readerPendingConnectionQueue = conf.getInt(
-
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
-
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
-
-
// Setup appropriate callqueue
-
final String prefix = getQueueClassPrefix();
-
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
-
getSchedulerClass(prefix, conf),
-
getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
-
-
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
-
this.authorize =
-
conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
-
false);
-
-
// configure supported authentications
-
this.enabledAuthMethods = getAuthMethods(secretManager, conf);
-
this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);
-
-
// Start the listener here and let it bind to the port
-
listener = new Listener(port);
-
// set the server port to the default listener port.
-
this.port = listener.getAddress().getPort();
-
connectionManager = new ConnectionManager();
-
this.rpcMetrics = RpcMetrics.create(
this, conf);
-
this.rpcDetailedMetrics = RpcDetailedMetrics.create(
this.port);
-
this.tcpNoDelay = conf.getBoolean(
-
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
-
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);
-
-
this.setLogSlowRPC(conf.getBoolean(
-
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
-
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));
-
-
// Create the responder here
-
responder = new Responder();
-
-
if (secretManager !=
null || UserGroupInformation.isSecurityEnabled()) {
-
SaslRpcServer.init(conf);
-
saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
-
}
-
-
this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.
class);
-
}
看到这里就不细说了,我后面会单独拎出章节来写这两个引擎.
最后调用start 方法, 开启server服务.
-
/** Starts the service. Must be called before any calls will be handled. */
-
public synchronized void start() {
-
responder.start();
-
listener.start();
-
if (auxiliaryListenerMap !=
null && auxiliaryListenerMap.size() >
0) {
-
for (Listener newListener : auxiliaryListenerMap.values()) {
-
newListener.start();
-
}
-
}
-
-
handlers =
new Handler[handlerCount];
-
-
for (
int i =
0; i < handlerCount; i++) {
-
handlers[i] =
new Handler(i);
-
handlers[i].start();
-
}
-
}
2. Client 实现
先看一下client调用server端的代码样例. 其实就是通过RPC.getProxy 方法获取server端的代理对象, 然后再通过代理对象调用具体的方法,代理对象根据方法,请求server端, 获取数据. 最终将数据返回给客户端.
-
/**
-
* 访问RPC服务
-
*/
-
public
class
Client {
-
public static void main(String[] args) throws IOException {
-
//1. 拿到RPC协议
-
ClicentNameNodeProtocol proxy = RPC.getProxy(ClicentNameNodeProtocol.class,
1L,
-
new InetSocketAddress(
"localhost",
7777),
new Configuration());
-
//2. 发送请求
-
String metaData = proxy.getMetaData(
"/meta");
-
//3. 打印元数据
-
System.
out.println(metaData);
-
}
-
}
先看一下如何获取代理对象.
-
/**
-
* Get a protocol proxy that contains a proxy connection to a remote server
-
* and a set of methods that are supported by the server
-
*
-
* @param protocol protocol
-
* @param clientVersion client's version
-
* @param addr server address
-
* @param ticket security ticket
-
* @param conf configuration
-
* @param factory socket factory
-
* @param rpcTimeout max time for each rpc; 0 means no timeout
-
* @param connectionRetryPolicy retry policy
-
* @param fallbackToSimpleAuth set to true or false during calls to indicate if
-
* a secure client falls back to simple auth
-
* @return the proxy
-
* @throws IOException if any error occurs
-
*/
-
public
static <T>
ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
-
long clientVersion,
-
InetSocketAddress addr,
-
UserGroupInformation ticket,
-
Configuration conf,
-
SocketFactory factory,
-
int rpcTimeout,
-
RetryPolicy connectionRetryPolicy,
-
AtomicBoolean fallbackToSimpleAuth)
-
throws IOException {
-
if (UserGroupInformation.isSecurityEnabled()) {
-
SaslRpcServer.init(conf);
-
}
-
return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
-
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
-
fallbackToSimpleAuth,
null);
-
}
根据上面的方法, 先通过getProtocolEngine(protocol, conf) 这个方法, 获取到RpcEngine, 然后调用getProxy 获取对应的对象.
这个是getProxy 的接口定义, 根据RpcEngine 的不同, 实现方式也不同. 后面会有文章做讲述.
-
/** Construct a client-side proxy object. */
-
<T>
ProtocolProxy<T> getProxy(Class<T> protocol,
-
long clientVersion, InetSocketAddress addr,
-
UserGroupInformation ticket, Configuration conf,
-
SocketFactory factory, int rpcTimeout,
-
RetryPolicy connectionRetryPolicy,
-
AtomicBoolean fallbackToSimpleAuth,
-
AlignmentContext alignmentContext)
throws IOException;
拿到代理对象之后, 就可以像本地一样调用里面的方法了.
参考:
Hadoop 2.X HDFS源码剖析 -- 徐鹏
转载:https://blog.csdn.net/zhanglong_4444/article/details/105607870