飞道的博客

Hadoop Rpc简单实现

308人阅读  评论(0)

一、Hadoop RPC总体架构


  • 序列化层
    序列化作用主要还是将结构化数据对象转换成字节流用于网络传输或写入持久存储。
    在RPC中,主要是将用户请求的参数或者服务器应答转化成字节流跨机器传输。
  • 函数调用层
    作用:定位所需调用的函数并执行函数。
    依赖:Java的反射与Java的动态代理模式来实现
  • 网络传输层
    作用:用于描述Client与Server之间的消息格式。
    依赖:Hadoop RPC 依赖基于TCP/IP协议中的Socket机制
  • 服务器端处理框架
    作用:抽象为网络IO模型,用于述client与server间信息交互方式。
    网络IO模型:阻塞式IO、NIO、事件驱动IO等
    Hadoop RPC:基于Reactor设计模式的事件驱动IO模型。

二、Hadoop Rpc特点


  • 透明性
    这是所有RPC框架最根本的特点,即当用户在一台计算机的程序调用另外一台计算机上的子程序时,用户自身不应感觉到其间涉及跨机器间的通信,而是感觉像是在执行一个本地调用。
  • 高性能
    Hadoop各个系统(如HDFS、YARN、MapReduce等)均采用了Master/Slave结构,其中,Master实际上是一个RPC Server,它负责处理集群中所有Slave发送的服务请求,为了保证Master的并发处理能力,RPCServer应是一个高性能服务器,能够高效地处理来自多个Client的并发RPC请求。
  • 可控性
    相对于RMI(remote method invocation)而言,Hadoop RPC具有轻量级、可控性等优点,即用户程序调用RPC接口可控的地方比较RMI多些。

三、实现步骤


  • 创建Maven项目,添加maven依赖(maven有问题,可以直接导入hadoop相关依赖包)

    	<dependencies>
    	        <dependency>
    	            <groupId>org.apache.hadoop</groupId>
    	            <artifactId>hadoop-common</artifactId>
    	            <version>2.7.3</version>
    	        </dependency>
    	        <dependency>
    	            <groupId>org.apache.hadoop</groupId>
    	            <artifactId>hadoop-hdfs</artifactId>
    	            <version>2.7.3</version>
    	        </dependency>
    	        <dependency>
    	            <groupId>org.apache.hadoop</groupId>
    	            <artifactId>hadoop-mapreduce-client-common</artifactId>
    	            <version>2.7.3</version>
    	        </dependency>
    	        <dependency>
    	            <groupId>org.apache.hadoop</groupId>
    	            <artifactId>hadoop-mapreduce-client-core</artifactId>
    	            <version>2.7.3</version>
    	        </dependency>
    
  • 定义接口协议

    /**
     * 1、定义RPC接口协议,添加自定义方法
     * 必须继承Hadoop提供的接口VersionedProtocol
     */
    public interface RPCProtocol {
         
        //RPC client 和 server 之间必须使用相同的版本的协议才能进行正常通信
        public static final long versionID = 1L;
        //自定义方法
        public String echo(String value) throws Exception;
        public int add(int v1, int v2) throws Exception;
    }
    
  • 定义接口协议实现类

    import org.apache.hadoop.ipc.ProtocolSignature;
    import java.io.IOException;
    
    /**
     * 2、定义一个Java类,实现自定义的RCP接口
     */
    public class RPCProtocolImplement implements RPCProtocol{
         
        public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
         
            System.out.println("===getProtocolVersion被调用===protocol=" + protocol + "\t clientVersion=" + clientVersion);
            return RPCProtocol.versionID;
        }
        public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
         
            System.out.println("===getProtocolSignature被调用===protocol=" + protocol + ",clientVersion=" + clientVersion + ",clientMethodsHash=" + clientMethodsHash);
            return new ProtocolSignature(RPCProtocol.versionID,null);
        }
    
        public String echo(String value) throws Exception {
         
            System.out.println("好的,我已收到你的信息");
            return value;
        }
    
        public int add(int v1, int v2) throws Exception {
         
            System.out.println("正在帮您计算,稍等片刻……");
            int sum = v1 + v2;
            System.out.println("计算完毕,结果为:" + sum);
            return sum;
        }
    }
    
  • 定义服务端(Server)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    
    import java.io.IOException;
    
    /**
     * 3、构造RPC服务启动类
     */
    public class RPCServer {
         
        public static void main(String[] args) throws IOException {
         
            RPC.Server server = new RPC.Builder(new Configuration())
                    .setProtocol(RPCProtocol.class)
                    .setInstance(new RPCProtocolImplement())
                    .setBindAddress("localhost")
                    .setPort(8001).setNumHandlers(1).build();
            server.start();
        }
    }
    
  • 定义客户端Client

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    
    import java.net.InetSocketAddress;
    
    /**
     * 4、定义RPC客户端
     */
    public class RPCClient {
         
        public static void main(String[] args) throws Exception {
         
            RPCProtocol client = RPC.getProxy(RPCProtocol.class,
                    RPCProtocol.versionID,
                    new InetSocketAddress("localhost",8001),
                    new Configuration());
            String echo = client.echo("rpc服务,请帮我计算一下:");
            System.out.println(echo);
            int sum = client.add(400, 300);
            System.out.println("收到rpc服务的计算结果:" + sum);
            // 停止客户端
            RPC.stopProxy(client);
        }
    }
    

四、运行及实验结果


  • 启动RPCServer服务端服务(先启动)
  • 启动RPCClient客户端调用服务
  • 运行结果如下
    • 客户端Client控制台会看到如下日志:

    • 服务端Server控制台会看到如下日志:


转载:https://blog.csdn.net/sujiangming/article/details/115985509
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场