飞道的博客

Dubbo源码分析

577人阅读  评论(0)

一,为什么要用Dubbo

1.为什么要用现成的框架呢?

如果我们自己去开发一个网络通信,需要考虑到

  • 底层网络通信协议的处理
  • 序列化和反序列化的处理工作

这些工作本身应该是通用的,应该是一个中间件服务。为整个公司提供远程通信的服务。而不应该由业务开发人员来自己去实现,所以才有了这样的 rpc 框架,使得我们调用远程方法时就像调用本地方法那么简单,不需要关心底层的通信逻辑。

2.大规模服务化对于服务治理的要求

当企业开始大规模的服务化以后,远程通信带来的弊端就越来越明显了。

  • 服务链路变长了,如何实现对服务链路的跟踪和监控
  • 服务的大规模集群使得服务之间需要依赖第三方注册中心来解决服务的发现和服务的感知问题
  • 服务通信之间的异常,需要有一种保护机制防止一个节点故障引发大规模的系统故障,所以要有容错机制
  • 服务大规模集群会是的客户端需要引入负载均衡机制实现请求分发

dubbo 主要是一个分布式服务治理解决方案,那么什么是服务治理?服务治理主要是针对大规模服务化以后,服务之间的路由、负载均衡、容错机制、服务降级这些问题的解决方案,而 Dubbo 实现的不仅仅是远程服务通信,并且还解决了服务路由、负载、降级、容错等功能。

二,Dubbo 的基本使用

1.dubbo-common

1)service

/**
 * @author yhd
 * @email yinhuidong1@xiaomi.com
 * @description TODO
 * @since 2021/4/2 0:32
 */
public interface LoginService {
   

    String login(String username,String password);
}

2.dubbo-provider

1)pom

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.dubbo</groupId>
			<artifactId>dubbo</artifactId>
			<version>2.7.8</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
		</dependency>
		<dependency>
			<groupId>com.yhd</groupId>
			<artifactId>dubbo-common</artifactId>
			<version>0.0.1-SNAPSHOT</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.6.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-framework</artifactId>
			<version>5.1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>5.1.0</version>
		</dependency>
		<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.10</version>
		</dependency>

2)配置文件

server.port=8081
dubbo.application.name=dubbo-provider
dubbo.registry.address=zookeeper://localhost:2181
dubbo.registry.protocal=zookeeper
dubbo.protocol.name=dubbo
dubbo.protocol.port=20880

3)主启动类

@EnableDubbo
@ComponentScan("com.yhd")
@SpringBootApplication
public class DubboProviderApplication {
   

	public static void main(String[] args) {
   
		SpringApplication.run(DubboProviderApplication.class, args);
	}

}

4)service

@DubboService
public class LoginServiceImpl implements LoginService {
   
    @Override
    public String login(String username, String password) {
   
        return "login success!";
    }
}

3.dubbo-consumer

1)Controller

@RestController
public class LoginController {
   

    @DubboReference
    private LoginService loginService;

    @GetMapping("login/{aaa}/{bbb}")
    public String login(@PathVariable("aaa")String aaa,@PathVariable("bbb") String bbb){
   
        return loginService.login(aaa,bbb);
    }
}

三,dubbo启动原理

Dubbo 提供了几种容器让我们去启动和发布服务

1.容器类型

Spring Container

自动加载 META-INF/spring 目录下的所有 Spring 配置。

logback Container

自动装配 logback 日志

Log4j Container

自动配置 log4j 的配置

Dubbo 提供了一个 Main.main 快速启动相应的容器,默认情况下,只会启动 spring 容器

2.原理分析

默认情况下,spring 容器,本质上,就是加在 spring ioc 容器,然后启动一个 netty 服务实现服务的发布,所以并没有特别多的黑科技,下面是spring 容器启动的代码

public void start() {
   
    String configPath =ConfigUtils.getProperty( "dubbo.spring.config");
    if (StringUtils.isEmpty(configPath)) {
   
        configPath =  "classpath*:META- - INF/spring/*.xml";
    }
    context =  new
	ClassPathXmlApplicationContext(configPath.split( "[,\ \\ \ s]+"),false);
    context.refresh();
    context.start();
}

四,Dubbo对注册中心的支持

Dubbo 能够支持的注册中心有:consul、etcd、nacos、sofa、zookeeper、redis、multicast

1.Dubbo 集成 Zookeeper 的实现原理

2.dubbo 每次都要连 zookeeper ?

不是每次发起一个请求的时候,都需要访问注册中心,是通过缓存实现。

其他注册中心的实现,核心本质是一样的,都是为了管理服务地址。

3.多注册中心支持

Dubbo 中可以支持多注册中心,有的时候,客户端需要用调用的远程服务不在同一个注册中心上,那么客户端就需要配置多个注册中心来访问。

五,Dubbo仅仅是一个RPC框架?

Dubbo 的核心功能,提供服务注册和服务发现以及基于 Dubbo 协议的远程通信。Dubbo 从另一个方面来看也可以认为是一个服务治理生态。

  • Dubbo 可以支持市面上主流的注册中心
  • Dubbo 提供了 Container 的支持,默认提供了 3 种 container。
  • Dubbo 对于 RPC 通信协议的支持,不仅仅是原生的 Dubbo 协议,它还围绕着 rmi、hessian、http、webservice、thrift、rest

有了多协议的支持,使得其他 rpc 框架的应用程序可以快速的切入到 dubbo生态中。 同时,对于多协议的支持,使得不同应用场景的服务,可以选择合适的协议来发布服务,并不一定要使用 dubbo 提供的长连接方式。

1.Dubbo 监控平台安装

Dubbo-Admin

  • 修 改 dubbo-admin-server/src/main/resources/application.properties中的配置信息
  • mvn clean package 进行构建
  • mvn clean package 进行构建
  • 访问 localhost:8080

2.Dubbo 的终端操作

Dubbo 里面提供了一种基于终端操作的方法来实现服务治理。

使用 telnet localhost 20880 连接到服务对应的端口。

1)常见命令

ls
ls: 显示服务列表
ls -l: 显示服务详细信息列表
ls XxxService: 显示服务的方法列表
ls -l XxxService: 显示服务的方法详细信息列表

ps
ps: 显示服务端口列表
ps -l: 显示服务地址列表
ps 20880: 显示端口上的连接信息
ps -l 20880: 显示端口上的连接详细信息

cd
cd XxxService: 改变缺省服务,当设置了缺省服务,凡是需要输入服务名作
为参数的命令,都可以省略服务参数
cd /: 取消缺省服务

pwd
pwd: 显示当前缺省服务

count
count XxxService: 统计 1 次服务任意方法的调用情况
count XxxService 10: 统计 10 次服务任意方法的调用情况
count XxxService xxxMethod: 统计 1 次服务方法的调用情况
count XxxService xxxMethod 10: 统计 10 次服务方法的调用情况

六,负载均衡

1.负载均衡的背景

当服务端存在多个节点的集群时,zookeeper 上会维护不同集群节点,对于客户端而言,他需要一种负载均衡机制来实现目标服务的请求负载。通过负载均衡,可以让每个服务器节点获得适合自己处理能力的负载。

Dubbo 里面默认就集成了负载均衡的算法和实现,默认提供了 4 种负载均衡实现。

2.Dubbo 中负载均衡的应用

1)启动两台一样的服务

修改配置文件

dubbo.protocol.port=20881

2)代码

@DubboService(loadbalance = "random")

3.Dubbo负载均衡算法

1)RandomLoadBalance

权重随机算法,根据权重值进行随机负载。

它的算法思想很简单。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为weights = [5, 3, 2],权重总和为 10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字 3 会落到服务器 A 对应的区间上,此时返回服务器 A 即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。

2)LeastActiveLoadBalance

最少活跃调用数算法,活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求这个是比较科学的负载均衡算法。

每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为 0。每收到一个请求,活跃数加 1,完成请求后则将活跃数减 1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求。

3)ConsistentHashLoadBalance

hash 一致性算法,相同参数的请求总是发到同一提供者。

当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

4)RoundRobinLoadBalance

加权轮询算法

所谓轮询是指将请求轮流分配给每台服务器。举个例子,我们有三台服务器 A、B、C。我们将第一个请求分配给服务器 A,第二个请求分配给服务器 B,第三个请求分配给服务器 C,第四个请求再次分配给服务器 A。这个过程就叫做轮询。轮询是一种无状态负载均衡算法,实现简单,适用于每台服务器性能相近的场景下。但现实情况下,我们并不能保证每台服务器性能均相近。如果我们将等量的请求分配给性能较差的服务器,这显然是不合理的。因此,这个时候我们需要对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在 8 次请求中,服务器 A 将收到其中的 5 次请求,服务器 B 会收到其中的 2 次请求,服务器 C 则收到其中的 1次请求。

5)一致性 hash 算法原理

七,集群容错

网络通信会有很多不确定因素,比如网络延迟、网络中断、服务异常等,会造成当前这次请求出现失败。 当服务通信出现这个问题时,需要采取一定的措施应对。而 dubbo 中提供了容错机制来优雅处理这种错误。

在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。

@DubboService(loadbalance = "random",cluster = "failsafe")

1.Failover Cluster

失败自动切换,当出现失败,重试其它服务器。(缺省)。

通常用于读操作,但重试会带来更长延迟。

可通过 retries=“2” 来设置重试次数(不含第一次)。

2.Failfast Cluster

快速失败,只发起一次调用,失败立即报错。

通常用于非幂等性的写操作,比如新增记录。

3.Failsafe Cluster

失败安全,出现异常时,直接忽略。

通常用于写入审计日志等操作。

4.Failback Cluster

失败自动恢复,后台记录失败请求,定时重发。

通常用于消息通知操作。

5.Forking Cluster

并行调用多个服务器,只要一个成功即返回。

通常用于实时性要求较高的读操作,但需要浪费更多服务资源。

可通过 forks=“2” 来设置最大并行数。

6.Broadcast Cluster

广播调用所有提供者,逐个调用,任意一台报错则报错。(2.1.0 开始支持)

通常用于通知所有提供者更新缓存或日志等本地资源信息。

在实际应用中 查询语句容错策略建议使用默认 Failover Cluster ,而增删改 建议使用Failfast Cluster 或者 使用 Failover Cluster(retries=”0”) 策略 防止出现数据 重复添加等等其它问题!建议在设计接口时候把查询接口方法单独做一个接口提供查询。

八,服务降级

1.降级的概念

当某个非关键服务出现错误时,可以通过降级功能来临时屏蔽这个服务。降级可以有几个层面的分类: 自动降级和人工降级; 按照功能可以分为:读服务降级和写服务降级;

  • 对一些非核心服务进行人工降级,在大促之前通过降级开关关闭哪些推荐内容、评价等对主流程没有影响的功能。
  • 故障降级,比如调用的远程服务挂了,网络故障、或者 RPC 服务返回异常。 那么可以直接降级,降级的方案比如设置默认值、采用兜底数据(系统推荐的行为广告挂了,可以提前准备静态页面做返回)等等。
  • 限流降级,在秒杀这种流量比较集中并且流量特别大的情况下,因为突发访问量特别大可能会导致系统支撑不了。这个时候可以采用限流来限制访问量。当达到阀值时,后续的请求被降级,比如进入排队页面,比如跳转到错误页(活动太火爆,稍后重试等)。

那么,Dubbo 中如何实现服务降级呢?Dubbo 中提供了一个 mock 的配置,可以通过mock 来实现当服务提供方出现网络异常或者挂掉以后,客户端不抛出异常,而是通过Mock 数据返回自定义的数据。

2.Dubbo 实现服务降级

dubbo-client 端创建一个 mock 类,当出现服务降级时,会被调用

/**
 * @author yhd
 * @email yinhuidong1@xiaomi.com
 * @description 服务降级兜底类
 * @since 2021/4/2 15:28
 */
public class MockSayHelloService implements LoginService {
   
    @Override
    public String login(String username, String password) {
   
        return "Sorry,  服务端发生异常,被降级啦!";
    }
}

在消费方的主街上配置:

    @DubboReference(mock = "com.yhd.dubboconsumer.mock.MockSayHelloService",
            timeout = 1000, loadbalance = "random", cluster = "failfast",check = false)
    private LoginService loginService;

3.启动时检查

Dubbo 缺省会在启动时检查依赖的服务是否可用,不可用时会抛出异常,阻止 Spring初始化完成,以便上线时,能及早发现问题,默认 check=“true”。

可以通过 check=“false” 关闭检查,比如,测试时,有些服务不关心,或者出现了循环依赖,必须有一方先启动。

registry、reference、consumer 都可以配置 check 这个属性.

    @DubboReference(mock = "com.yhd.dubboconsumer.mock.MockSayHelloService",
            timeout = 1000, loadbalance = "random", cluster = "failfast",check = false)
    private LoginService loginService;

4.多版本支持

当一个接口实现,出现不兼容升级时,可以用版本号过渡,版本号不同的服务相互间不引用。

可以按照以下的步骤进行版本迁移:

  • 在低压力时间段,先升级一半提供者为新版本
  • 再将所有消费者升级为新版本
  • 然后将剩下的一半提供者升级为新版本

5.主机绑定

1)默认的主机绑定方式

  • 通过 LocalHost.getLocalHost()获取本机地址。
  • 如果是 127.*等 loopback(环路地址)地址,则扫描各网卡,获取网卡 IP。
    • 如果是 springboot,修改配置:dubbo.protocol.host=””
    • 如果注册地址获取不正确,可以通过在 dubbo.xml 中加入主机地址的配置。
<dubbo:protocol host="205.182.23.201">

2)缺省主机端口

dubbo: 20880
rmi: 1099
http: 80
hessian: 80
webservice: 80
memcached: 11211
redis: 6379

九,Dubbo 新的功能

1.动态配置规则

动态配置是 Dubbo2.7 版本引入的一个新的功能,简单来说,就是把 dubbo.properties中的属性进行集中式存储,存储在其他的服务器上。

那么如果需要用到集中式存储,那么还需要一些配置中心的组件来支撑。目前 Dubbo 能支持的配置中心有:apollo、nacos、zookeeper

从另外一个角度来看,我们之前用 zookeeper 实现服务注册和发现,本质上就是使用 zookeeper 实现了配置中心,这个配置中心只是维护了服务注册和服务感知的功能。在 2.7 版本中,dubbo 对配置中心做了延展,除了服务注册之外,还可以把其他的数据存储在 zookeeper 上,从而更好的进行维护。

1)在 dubboadmin 添加配置

应用名称可以是 global,或者对应当前服务的应用名,如果是 global 表示全局配置,针对所有应用可见。

配置的内容,实际就是 dubbo.properties 中配置的基本信息。只是同意存储在了zookeeper 上。

2)本地的配置文件添加配置中心

application.properties 中添加配置中心的配置项,app-name对应的是上一步创建的配置项中的应用名.

dubbo.config-center.address= zookeeper://192.168.13.106 6 :2181
dubbo.config-center.app- - name= spring-boot-provider
#需要注意的是,存在于配置中心上的配置项,本地仍然需要配置一份。所以下面这些配置一定要加上。否则启动不了。这样做的目的是保证可靠性
dubbo.application.name= spring-boot-provider
dubbo.protocol.port= 20880
dubbo.protocol.name= dubbo
dubbo.registry.address= zookeeper://192.168.13.102:2181?backup=192.168.13.103:2181,192.168.13.104:2181

3)配置的优先级

引入配置中心后,配置的优先级就需要关注了,默认情况下,外部配置的优先级最高,也就是意味着配置中心上的配置会覆盖本地的配置。当然我们也可以调整优先级。

dubbo.config-center.highest-priority=false

4)配置中心的原理

默认所有的配置都存储在/dubbo/config 节点。

namespace,用于不同配置的环境隔离。

config,Dubbo 约定的固定节点,不可更改,所有配置和服务治理规则都存储在此节点下。

dubbo/application,分别用来隔离全局配置、应用级别配置:dubbo 是默认 group 值,

application 对应应用名。

dubbo.properties,此节点的 node value 存储具体配置内容。

2.元数据中心

Dubbo2.7 的另外一个新的功能,就是增加了元数据的配置。

在 Dubbo2.7 之前,所有的配置信息,比如服务接口名称、重试次数、版本号、负载策略、容错策略等等,所有参数都是基于 url 形式配置在 zookeeper 上的。这种方式会造成一些问题:

  • url 内容过多,导致数据存储空间增大
  • url 需要涉及到网络传输,数据量过大会造成网络传输过慢
  • 网络传输慢,会造成服务地址感知的延迟变大,影响服务的正常响应

服务提供者这边的配置参数有 30 多个,有一半是不需要作为注册中心进行存储和传输地的。而消费者这边可配置的参数有 25 个以上,只有个别是需要传递到注册中心的。所以,在 Dubbo2.7 中对元数据进行了改造,简单来说,就是把属于服务治理的数据发布到注册中心,其他的配置数据统一发布到元数据中心。这样一来大大降低了注册中心的负载。

1)元数据中心配置

元数据中心目前支持 redis 和 zookeeper。官方推荐是采用 redis。毕竟 redis 本身对于非结构化存储的数据读写性能比较高。当然,也可以使用 zookeeper 来实现。将注册中心地址、元数据中心地址等配置集中管理,可以做到统一环境、减少开发侧感知。官网可查询外部化配置,不过描述过于简略。

dubbo.metadata-report.address= zookeeper://192.168.13.106:2181
dubbo.registry.simplified= true 
#注册到注册中心的 URL 是否采用精简模式的(与低版本兼容)

十,Dubbo 中的 SPI 机制

dubbo版本2.7.2。

1.Java SPI

SPI 是 JDK 内置的一种服务提供发现机制。目前市面上有很多框架都是用它来做服务的扩展发现。简单来说,它是一种动态替换发现的机制。举个简单的例子,我们想在运行时动态给它添加实现,你只需要添加一个实现,然后把新的实现描述给 JDK 知道就行了。如 JDBC、日志框架都有用到。

1)实现SPI需要遵循的标准

  • 需要在 classpath 下创建一个目录,该目录命名必须是:META-INF/service
  • 在该目录下创建一个 properties 文件,该文件需要满足以下几个条件
    • 文件名必须是扩展的接口的全路径名称
    • 文件内部描述的是该扩展接口的所有实现类
    • 文件的编码格式是 UTF-8
  • 通过 java.util.ServiceLoader 的加载机制来发现

2)SPI的实际应用

JDK 本身提供了数据访问的 api。在 java.sql 这个包里面。

java.sql.Driver 的源码,Driver 并没有实现,而是提供了一套标准的 api 接口。

通过 SPI 机制把 java.sql.Driver 和 mysql 的驱动做了集成,达到了各个数据库厂商自己去实现数据库连接,jdk 本身不关心你怎么实现。

门面模式?适配器模式?

3)SPI的缺点

  • JDK 标准的 SPI 会一次性加载实例化扩展点的所有实现,什么意思呢?就是如果你在 META-INF/service 下的文件里面加了 N个实现类,那么 JDK 启动的时候都会一次性全部加载。那么如果有的扩展点实现初始化很耗时或者如果有些实现类并没有用到,那么会很浪费资源
  • 如果扩展点加载失败,会导致调用方报错,而且这个错误很难定位到是这个原因。

2.Dubbo 优化后的 SPI

1)基于 Dubbo SPI 的实现自己的扩展

Dubbo 的 SPI 扩展机制,有两个规则

  • 需要在 resource 目录下配置 META-INF/dubbo 或者 META-INF/dubbo/internal 或者 META-INF/services,并基于 SPI 接口去创建一个文件。
  • 文件名称和接口名称保持一致,文件内容和 SPI 有差异,内容是 KEY 对应 Value

Dubbo 针对的扩展点非常多,可以针对协议、拦截、集群、路由、负载均衡、序列化、容器… 几乎里面用到的所有功能,都可以实现自己的扩展,这个是 dubbo 比较强大的一点。

2)扩展协议扩展点

  • 创建如下结构,添加 META-INF.dubbo 文件。类名和 Dubbo 提供的协议扩展点接口保持一致。

myProtocol=com.yhd.dubboprovider.diy.MyProtocol
  • 创建 MyProtocol 协议类
    • 可以实现自己的协议,我们为了模拟协议产生了作用,修改一个端口
public class MyProtocol implements Protocol {
   
    @Override
    public int getDefaultPort() {
   
        return 8888;
    }

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
   
        return null;
    }

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
   
        return null;
    }

    @Override
    public void destroy() {
   

    }
}
  • 在调用处执行如下代码
Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("myProtocol"); 
System.out.print(protocol.getDefaultPort)
  • 输出结果,可以看到运行结果,是执行的自定义的协议扩展点。
  • 总结:总的来说,思路和 SPI 是差不多。都是基于约定的路径下制定配置文件。目的,通过配置的方式轻松实现功能的扩展。

一定有一个地方通过读取指定路径下的所有文件进行 load。然后讲对应的结果保存到一个 map 中,key 对应为名称,value 对应为实现类。那么这个实现,一定就在 ExtensionLoader 中了。

3.Dubbo 的扩展点原理实现

Dubbo SPI和JDK SPI配置的不同,在Dubbo SPI中可以通过键值对的方式进行配置,这样就可以按需加载指定的实现类。

Dubbo SPI的相关逻辑都被封装到ExtensionLoader类中,通过ExtensionLoader我们可以加载指定的实现类,一个扩展接口就对应一个ExtensionLoader对象,在这里我们把它称为:扩展点加载器。

1)属性

public class ExtensionLoader<T> {
   
    
    //扩展点配置文件的路径,可以从3个地方加载到扩展点配置文件
    private static final String SERVICES_DIRECTORY = "META-INF/services/";
    private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
    private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";   
    //扩展点加载器的集合
    private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
    //扩展点实现的集合
    private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
    //扩展点名称和实现的映射缓存
    private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
    //拓展点实现类集合缓存
    private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
    //扩展点名称和@Activate的映射缓存
    private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();
    //扩展点实现的缓存
    private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();
}

ExtensionLoader会把不同的扩展点配置和实现都缓存起来。同时,Dubbo在官网上也给了我们提醒:扩展点使用单一实例加载(请确保扩展实现的线程安全性),缓存在 ExtensionLoader中。下面我们看几个重点方法。

2)获取扩展点加载器

我们首先通过ExtensionLoader.getExtensionLoader() 方法获取一个 ExtensionLoader 实例,它就是扩展点加载器。然后再通过 ExtensionLoader 的 getExtension 方法获取拓展类对象。这其中,getExtensionLoader 方法用于从缓存中获取与拓展类对应的 ExtensionLoader,若缓存未命中,则创建一个新的实例。

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
   
    if (type == null)
        throw new IllegalArgumentException("Extension type == null");
    if (!type.isInterface()) {
   
        throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
    }
    if (!withExtensionAnnotation(type)) {
   
        throw new IllegalArgumentException("Extension type(" + type +
                ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
    }
    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
   
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}

比如你可以通过下面这样,来获取Protocol接口的ExtensionLoader实例:

ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class);

就可以拿到扩展点加载器的对象实例:

com.alibaba.dubbo.common.extension.ExtensionLoader[com.alibaba.dubbo.rpc.Protocol]

3)获取扩展类对象

上一步我们已经拿到加载器,然后可以根据加载器实例,通过扩展点的名称获取扩展类对象。

public T getExtension(String name) {
   
    //校验扩展点名称的合法性
    if (name == null || name.length() == 0)
        throw new IllegalArgumentException("Extension name == null");
    // 获取默认的拓展实现类
    if ("true".equals(name)) {
   
        return getDefaultExtension();
    }
    //用于持有目标对象
    Holder<Object> holder = cachedInstances.get(name);
    if (holder == null) {
   
        cachedInstances.putIfAbsent(name, new Holder<Object>());
        holder = cachedInstances.get(name);
    }
    Object instance = holder.get();
    if (instance == null) {
   
        synchronized (holder) {
   
            instance = holder.get();
            if (instance == null) {
   
                instance = createExtension(name);
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}

它先尝试从缓存中获取,未命中则创建扩展对象。那么它的创建过程是怎样的呢?

private T createExtension(String name) {
   
    //从配置文件中获取所有的扩展类,Map数据结构
    //然后根据名称获取对应的扩展类
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
   
        throw findException(name);
    }
    try {
   
        //通过反射创建实例,然后放入缓存
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) {
   
            EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
        //注入依赖
        injectExtension(instance);
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
   
            // 包装为Wrapper实例
            for (Class<?> wrapperClass : wrapperClasses) {
   
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
   
        throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                type + ")  could not be instantiated: " + t.getMessage(), t);
    }
}

这里的重点有两个,依赖注入和Wrapper包装类,它们是Dubbo中IOC 与 AOP 的具体实现。

①依赖注入

向拓展对象中注入依赖,它会获取类的所有方法。判断方法是否以 set 开头,且方法仅有一个参数,且方法访问级别为 public,就通过反射设置属性值。所以说,Dubbo中的IOC仅支持以setter方式注入。

private T injectExtension(T instance) {
   
    try {
   
        if (objectFactory != null) {
   
            for (Method method : instance.getClass().getMethods()) {
   
                if (method.getName().startsWith("set")
                        && method.getParameterTypes().length == 1
                        && Modifier.isPublic(method.getModifiers())) {
   
                    Class<?> pt = method.getParameterTypes()[0];
                    try {
   
                        String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                        Object object = objectFactory.getExtension(pt, property);
                        if (object != null) {
   
                            method.invoke(instance, object);
                        }
                    } catch (Exception e) {
   
                        logger.error("fail to inject via method " + method.getName()
                                + " of interface " + type.getName() + ": " + e.getMessage(), e);
                    }
                }
            }
        }
    } catch (Exception e) {
   
        logger.error(e.getMessage(), e);
    }
    return instance;
}

②Wrapper

它会将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例。 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量。说起来可能比较绕,我们直接看下它最后生成的对象就明白了。

我们以DubboProtocol为例,它包装后的对象为:

综上所述,如果我们获取一个扩展类对象,最后拿到的就是这个Wrapper类的实例。

就像这样:

ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class);
Protocol extension = extensionLoader.getExtension("dubbo");
System.out.println(extension);

输出为:com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper@4cdf35a9

4)获取所有的扩展类

在我们通过名称获取扩展类对象之前,首先需要根据配置文件解析出所有的扩展类。

它是一个扩展点名称和扩展类的映射表Map<String, Class<?>>

首先,还是从缓存中cachedClasses获取,如果没有就调用loadExtensionClasses从配置文件中加载。配置文件有三个路径:

  • META-INF/services/
  • META-INF/dubbo/
  • META-INF/dubbo/internal/

先尝试从缓存中获取。

private Map<String, Class<?>> getExtensionClasses() {
   
    //从缓存中获取
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
   
        synchronized (cachedClasses) {
   
            classes = cachedClasses.get();
            if (classes == null) {
   
                //加载扩展类
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}   

如果没有,就调用loadExtensionClasses从配置文件中读取。

private Map<String, Class<?>> loadExtensionClasses() {
   
    //获取 SPI 注解,这里的 type 变量是在调用 getExtensionLoader 方法时传入的
    final SPI defaultAnnotation = type.getAnnotation(SPI.class);
    if (defaultAnnotation != null) {
   
        String value = defaultAnnotation.value();
        if ((value = value.trim()).length() > 0) {
   
            String[] names = NAME_SEPARATOR.split(value);
            if (names.length > 1) {
   
                throw new IllegalStateException("more than 1 default extension 
                    name on extension " + type.getName()+ ": " + Arrays.toString(names));
            }
            //设置默认的扩展名称,参考getDefaultExtension 方法
            //如果名称为true,就是调用默认扩赞类
            if (names.length == 1) cachedDefaultName = names[0];
        }
    }
    //加载指定路径的配置文件
    Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
    loadDirectory(extensionClasses, DUBBO_DIRECTORY);
    loadDirectory(extensionClasses, SERVICES_DIRECTORY);
    return extensionClasses;
}

以Protocol接口为例,获取到的实现类集合如下,我们就可以根据名称加载具体的扩展类对象。

{
   
    registry=class com.alibaba.dubbo.registry.integration.RegistryProtocol
    injvm=class com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
    thrift=class com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
    mock=class com.alibaba.dubbo.rpc.support.MockProtocol
    dubbo=class com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
    http=class com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
    redis=class com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
    rmi=class com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
}

4.自适应扩展机制

在Dubbo中,很多拓展都是通过 SPI 机制进行加载的,比如 Protocol、Cluster、LoadBalance 等。这些扩展并非在框架启动阶段就被加载,而是在扩展方法被调用的时候,根据URL对象参数进行加载。

那么,Dubbo就是通过自适应扩展机制来解决这个问题。

自适应拓展机制的实现逻辑是这样的:

首先 Dubbo 会为拓展接口生成具有代理功能的代码。然后通过 javassist 或 jdk 编译这段代码,得到 Class 类。最后再通过反射创建代理类,在代理类中,就可以通过URL对象的参数来确定到底调用哪个实现类。

1)Adaptive注解

在开始之前,我们有必要先看一下与自适应拓展息息相关的一个注解,即 Adaptive 注解。

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({
   ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
   
    String[] value() default {
   };
}

从上面的代码中可知,Adaptive 可注解在类或方法上。

  • 标注在类上
    Dubbo 不会为该类生成代理类。
  • 标注在方法上
    Dubbo 则会为该方法生成代理逻辑,表示当前方法需要根据 参数URL 调用对应的扩展点实现。

2)获取自适应拓展类

getAdaptiveExtension 方法是获取自适应拓展的入口方法。

public T getAdaptiveExtension() {
   
    // 从缓存中获取自适应拓展
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
   
        if (createAdaptiveInstanceError == null) {
   
            synchronized (cachedAdaptiveInstance) {
   
                instance = cachedAdaptiveInstance.get();
                //未命中缓存,则创建自适应拓展,然后放入缓存
                if (instance == null) {
   
                    try {
   
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
   
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("fail to create 
                                                  adaptive instance: " + t.toString(), t);
                    }
                }
            }
        }
    }
    return (T) instance;
}

getAdaptiveExtension方法首先会检查缓存,缓存未命中,则调用 createAdaptiveExtension方法创建自适应拓展。

private T createAdaptiveExtension() {
   
    try {
   
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
   
        throw new IllegalStateException("
            Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}

这里的代码较少,调用 getAdaptiveExtensionClass方法获取自适应拓展 Class 对象,然后通过反射实例化,最后调用injectExtension方法向拓展实例中注入依赖。

获取自适应扩展类过程如下:

private Class<?> getAdaptiveExtensionClass() {
   
    //获取当前接口的所有实现类
    //如果某个实现类标注了@Adaptive,此时cachedAdaptiveClass不为空
    getExtensionClasses();
    if (cachedAdaptiveClass != null) {
   
        return cachedAdaptiveClass;
    }
    //以上条件不成立,就创建自适应拓展类
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

在上面方法中,它会先获取当前接口的所有实现类,如果某个实现类标注了@Adaptive,那么该类就被赋值给cachedAdaptiveClass变量并返回。如果没有,就调用createAdaptiveExtensionClass创建自适应拓展类。

private Class<?> createAdaptiveExtensionClass() {
   
    //构建自适应拓展代码
    String code = createAdaptiveExtensionClassCode();
    ClassLoader classLoader = findClassLoader();
    // 获取编译器实现类 这个Dubbo默认是采用javassist 
    Compiler compiler =ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension();
    //编译代码,返回类实例的对象
    return compiler.compile(code, classLoader);
}

在生成自适应扩展类之前,Dubbo会检查接口方法是否包含@Adaptive。如果方法上都没有此注解,就要抛出异常。

if (!hasAdaptiveAnnotation){
   
    throw new IllegalStateException(
        "No adaptive method on extension " + type.getName() + ", 
          refuse to create the adaptive class!");
}

我们还是以Protocol接口为例,它的export()refer()方法,都标注为@AdaptivedestroygetDefaultPort未标注 @Adaptive注解。Dubbo 不会为没有标注 Adaptive 注解的方法生成代理逻辑,对于该种类型的方法,仅会生成一句抛出异常的代码。

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;

@SPI("dubbo")
public interface Protocol {
   
    int getDefaultPort();
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    void destroy();
}

所以说当我们调用这两个方法的时候,会先拿到URL对象中的协议名称,再根据名称找到具体的扩展点实现类,然后去调用。下面是生成自适应扩展类实例的源代码:

package com.viewscenes.netsupervisor.adaptive;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.RpcException;

public class Protocol$Adaptive implements Protocol {
   
    public void destroy() {
   
        throw new UnsupportedOperationException(
                "method public abstract void Protocol.destroy() of interface Protocol is not adaptive method!");
    }
    public int getDefaultPort() {
   
        throw new UnsupportedOperationException(
                "method public abstract int Protocol.getDefaultPort() of interface Protocol is not adaptive method!");
    }
    public Exporter export(Invoker invoker)throws RpcException {
   
        if (invoker == null) {
   
            throw new IllegalArgumentException("Invoker argument == null");
        }
        if (invoker.getUrl() == null) {
   
            throw new IllegalArgumentException("Invoker argument getUrl() == null");
        }
            
        URL url = invoker.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null) {
   
            throw new IllegalStateException("Fail to get extension(Protocol) name from url("
                    + url.toString() + ") use keys([protocol])");
        }
            
        Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
        return extension.export(invoker);
    }
    public Invoker refer(Class clazz,URL ur)throws RpcException {
   
        if (ur == null) {
   
            throw new IllegalArgumentException("url == null");
        }
        URL url = ur;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null) {
   
            throw new IllegalStateException("Fail to get extension(Protocol) name from url("+ url.toString() + ") use keys([protocol])");
        }
        Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
        return extension.refer(clazz, url);
    }
}

综上所述,当我们获取某个接口的自适应扩展类,实际就是一个Adaptive类实例。

ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class);            
Protocol adaptiveExtension = extensionLoader.getAdaptiveExtension();
System.out.println(adaptiveExtension);

输出为:

com.alibaba.dubbo.rpc.Protocol$Adaptive@47f6473

5.自动激活扩展点机制

自动激活扩展点,有点类似springboot 用到的 conditional,根据条件进行自动激活。但是这里设计的初衷是,对于一个类会加载多个扩展点的实现,这个时候可以通过自动激活扩展点进行动态加载, 从而简化配置我们的配置。

@Activate 提供了一些配置来允许我们配置加载条件,比如 group 过滤,比如 key 过滤。

我们可以看看 org.apache.dubbo.Filter 这个类,它有非常多的实现,比如说 CacheFilter,这个缓存过滤器,配置信息如下:

group 表示客户端和和服务端都会加载,value 表示 url 中有 cache_key 的时候

@Activate(group = {
   CONSUMER, PROVIDER}, value = CACHE_KEY)
public class CacheFilter implements Filter {
   

通过下面这段代码,演示关于 Filter 的自动激活扩展点的效果。没有添加“注释代码”时,list 的结果是 10,添加之后 list激活扩展点的效果。没有添加“注释代码”时,list 的结果是 10,添加之后 list。会自动把 cacheFilter 加载进来。

	public static void main(String[] args) {
   
		SpringApplication.run(DubboProviderApplication.class, args);


		/*Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("myProtocol");
		System.out.println(protocol.getDefaultPort());*/

		/*Compiler compiler = ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension();
		//org.apache.dubbo.common.compiler.support.AdaptiveCompiler
		System.out.println("compiler.getClass() = " + compiler.getClass());*/

		ExtensionLoader<Filter> extensionLoader = ExtensionLoader.getExtensionLoader(Filter.class);
		URL url = new URL("", "", 0);
		List<Filter> filters = extensionLoader.getActivateExtension(//url.addParameter("cache","cache"), "cache");
		System.out.println("filters.size() = " + filters.size());
	}

这个方法的底层逻辑其实就是先获取到所有对应的激活扩展类,在拿到URL,根据 @Activate 获取到对应的扩展类组合在一起返回。

十一,Dubbo原理-框架设计

config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类

proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory

registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService

cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance

monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService

protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter

exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer

transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec

serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool

十二,服务暴露

分析:如果需要完成服务发布预注册,需要实现哪些事情?

  • 解析配置文件或注解
  • 服务注册
  • 启动netty服务实现远程监听

1.dubbo对于spring的扩展

1)spring的标签扩展

在 spring 中定义了两个接口

  • NamespaceHandler: 注册一堆 BeanDefinitionParser,利用他们来进行解析
  • BeanDefinitionParser:用于解析每个 element 的内容

Spring 默认会加载 jar 包下的 META-INF/spring.handlers 文件寻找对应的 NamespaceHandler。 Dubbo-config 模块下的 dubbo-config-spring就含有这个文件。

2)dubbo的接入实现

Dubbo 中 spring 扩展就是使用 spring 的自定义类型,所以同样也有 NamespaceHandler、BeanDefinitionParser。而NamespaceHandler 是 DubboNamespaceHandler。

public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {
   

    static {
   
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    @Override
    public void init() {
   
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
        registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
        registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
}

BeanDefinitionParser 全部都使用了 DubboBeanDefinitionParser,如果我们想看 dubbo:service 的配置,就直接看 DubboBeanDefinitionParser(ServiceBean.class,true)

这个里面主要做了一件事,把不同的配置分别转化成 spring 容器中的 bean 对象

application ApplicationConfig
registry RegistryConfig
monitor MonitorConfig
provider ProviderConfig
consumer ConsumerConfig

涉及到服务发布和服务调用的两个配置的解析,用的是 ServiceBean 和 referenceBean。并不是 config 结尾的,这两个类稍微特殊些,当然他同时也继承了 ServiceConfig 和 ReferenceConfig。

3)DubboBeanDefinitionParser

这里面是实现具体配置文件解析的入口,它重写了 parse 方法,对 spring 的配置进行解析。我们关注一下 ServiceBean 的解析.实际就是解析 dubbo:service 这个标签中对应的属性。

else if (ServiceBean.class.equals(beanClass)) {
   
            String className = resolveAttribute(element, "class", parserContext);
            if (StringUtils.isNotEmpty(className)) {
   
                RootBeanDefinition classDefinition = new RootBeanDefinition();
                classDefinition.setBeanClass(ReflectUtils.forName(className));
                classDefinition.setLazyInit(false);
                parseProperties(element.getChildNodes(), classDefinition, parserContext);
                beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
            }
        }

4)ServiceBean的实现

ServiceBean 这个类,分别实现了 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener,BeanNameAware, ApplicationEventPublisherAware

①InitializingBean

接口为 bean 提供了初始化方法的方式,它只包括 afterPropertiesSet 方法,凡是继承该接口的类,在初始化 bean 的时候会执行该方法。被重写的方法为 afterPropertiesSet。

②DisposableBean

被重写的方法为 destroy ,bean 被销毁的时候,spring 容器会自动执行 destory 方法,比如释放资源。

③ApplicationContextAware

实现了这个接口的 bean,当 spring 容器初始化的时候,会自动的将 ApplicationContext 注入进来。

④ApplicationListener

ApplicationEvent 事件监听,spring 容器启动后会发一个事件通知。被重写的方法为: onApplicationEvent ,onApplicationEvent方法传入的对象是 ContextRefreshedEvent。这个对象是当 Spring 的上下文被刷新或者加载完毕的时候触发的。因此服务就是在Spring 的上下文刷新后进行导出操作的。

⑤BeanNameAware

获得自身初始化时,本身的 bean 的 id 属性,被重写的方法为 setBeanName。

⑥ApplicationEventPublisherAware

这个是一个异步事件发送器。被重写的方法为 setApplicationEventPublisher ,简单来说,在 spring 里面提供了类似于消息队列的异步事件解耦功能。(典型的观察者模式的应用)。

⑦spring 事件发送监听由 3 个部分组成

  • ApplicationEvent:表示事件本身,自定义事件需要继承该类
  • ApplicationEventPublisherAware:事件发送器,需要实现该接口
  • ApplicationListener:事件监听器接口

5)总结-dubbo启动解析,加载配置文件

在dubbo的META-INF目录下有一个 spring.handlers 配置文件。容器启动就会加载这个配置文件。

这个配置文件里面注册了一个 Bean 叫做 DubboNamespaceHandler ,这个 bean 就是dubbo 配置文件处理器。

在这个 bean里面有一个init(),这个方法里面会加载很多的 DubboBeanDefinitionParser ,DubboBeanDefinitionParser就是配置文件解析器。(除了 service 和 reference 对应的类叫 ServiceBean 和 ReferenceBean ,其他对应标签对应的类都是xxxConfig)。

DubboBeanDefinitionParser 的parse() 就是对标签的解析,他会判断这个标签所属的类型,然后根据标签的配置进行属性填充。最终将所有的bean注册到配置中心。

2.ServiceBean 中服务暴露过程

在 ServiceBean 中,我们暂且只需要关注两个方法,分别是:

  • 在初始化 bean 的时候会执行该方法 afterPropertiesSet
  • spring 容器启动后会发一个事件通知 onApplicationEvent

1)afterPropertiesSet

这个方法里面,就是把 dubbo 中配置的 application 、 registry 、 service 、 protocol 等信息,加载到对应的 config实体中,便于后续的使用。

    @Override
    public void afterPropertiesSet() throws Exception {
   
        if (StringUtils.isEmpty(getPath())) {
   
            if (StringUtils.isNotEmpty(getInterface())) {
   
                setPath(getInterface());
            }
        }
    }

2)onApplicationEvent

spring 容器启动之后,会收到一个这样的事件通知,这里面做了两个事情

  • 判断服务是否已经发布过
  • 如果没有发布,则调用调用 export 进行服务发布的流程 入口
public void onApplicationEvent(ContextRefreshedEvent event) {
   
    if (!isExported() && !isUnexported()) {
   
        if (logger.isInfoEnabled()) {
   
        	logger.info("The service ready on spring started. service: " + getInterface());
        }
        export();
    }
}

3)export

serviceBean 中,重写了 export 方法,实现了 一个事件的发布。并且调用了 super.export() ,也就是会调用父类的 export 方法。

    @Override
    public void exported() {
   
        super.exported();
        // Publish ServiceBeanExportedEvent
        publishExportEvent();
    }

3.ServiceConfig配置类

所有的配置它都放在了一个 AbstractServiceConfig 的抽象类,自己实现了很多对于服务发布之前要做的操作逻辑。

1)export

    public synchronized void export() {
   
        // 当前的服务是否需要发布 ,  通过配置实现: @Service(export = false)
        if (!shouldExport()) {
   
            return;
        }
		//2.7.5版本新增,多了一个Dubbo的引导类
        if (bootstrap == null) {
   
            //获取引导类实例
            bootstrap = DubboBootstrap.getInstance();
            //调用引导类的初始化方法
            bootstrap.initialize();
        }
		//检查并更新配置
        checkAndUpdateSubConfigs();

        //初始化元数据中心 , 2.75 版本新增 ,用来存储配置信息,减轻配置中心压力
        serviceMetadata.setVersion(getVersion());
        serviceMetadata.setGroup(getGroup());
        serviceMetadata.setDefaultGroup(getGroup());
        serviceMetadata.setServiceType(getInterfaceClass());
        serviceMetadata.setServiceInterfaceName(getInterface());
        serviceMetadata.setTarget(getRef());
		// 检查是否需要延时发布,通过配置 @Service(delay = 1000) 实现,单位毫秒
        if (shouldDelay()) {
   
            // 这里的延时是通过定时器来实现
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
   
            // 如果没有配置 delay ,则直接调用 export 进行发布
            doExport();
        }
		//发布一个事件  TODO
        exported();
    }

2)doExport

这里仍然还是在实现发布前的各种判断

    protected synchronized void doExport() {
   
        if (unexported) {
   
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        // 服务是否已经发布过了
        if (exported) {
   
            return;
        }
        exported = true;// 设置发布状态

        if (StringUtils.isEmpty(path)) {
   
            path = interfaceName;//path 表示服务路径,默认使用 interfaceName
        }
        doExportUrls();
    }

3)doExportUrls

  • 加载所有配置的注册中心地址
  • 遍历所有配置的协议,protocols
  • 针对每种协议发布一个对应协议的服务
    private void doExportUrls() {
   
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        repository.registerProvider(
                getUniqueServiceName(),
                ref,
                serviceDescriptor,
                this,
                serviceMetadata
        );
		// 加载所有配置的注册中心的地址,组装成一个 URL集合
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
		
        for (ProtocolConfig protocolConfig : protocols) {
   
            //group 跟 version 组成一个 pathKey(serviceName)
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
            // 服务注册   TODO TODO TODO 重要的事情标记三遍
            repository.registerService(pathKey, interfaceClass);
            serviceMetadata.setServiceKey(pathKey);
            //发布指定协议的服务
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

4) doExportUrlsFor1Protocol

发布指定协议的服务,以 Dubbo 服务为例。

// export service  发布服务
//获取当前要发布服务的IP和端口
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
//组装URL
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

// You can customize Configurator to append extra parameters
//通过 ConfiguratorFactory 去实现动态改变配置的功能 ,动态配置 TODO
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
    .hasExtension(url.getProtocol())) {
   
    url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
        .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}

String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
//如果 scope!="none"则发布服务,默认 scope 为 null。
//如果 scope 不为 none,判断是否为 local 或 remote,
//从而发布 Local 服务或 Remote 服务,默认两个都会发布
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
   

    // export to local if the config is not remote (export to remote only when config is remote)
    if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
   
        //发布本地服务  injvm
        //服务只是 injvm 的服务,提供一种消费者和提供者都在一个 jvm 内的调用方式。使用了 Injvm 协议,是一个伪协议,它不开启端口,不发起远程调用,只在 JVM 内直接关联,(通过集合的方式保存了发布的服务信息),但执行 Dubbo 的 Filter 链。简单来说,就是你本地的 dubbo 服务调用,都依托于 dubbo 的标准来进行。这样可以享受到 dubbo 的一些配置服务。
        exportLocal(url);
    }
    // export to remote if the config is not local (export to local only when config is local)
    if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
   
        if (CollectionUtils.isNotEmpty(registryURLs)) {
   
            for (URL registryURL : registryURLs) {
   
                //if protocol is only injvm ,not register
                if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
   
                    continue;
                }
                url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                if (monitorUrl != null) {
   
                    url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                }
                if (logger.isInfoEnabled()) {
   
                    if (url.getParameter(REGISTER_KEY, true)) {
   
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    } else {
   
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                }

                // For providers, this is used to enable custom proxy to generate invoker
                String proxy = url.getParameter(PROXY_KEY);
                if (StringUtils.isNotEmpty(proxy)) {
   
                    registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                }
			//Invoker 是一个代理类,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
                Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                //因为 2.7 引入了元数据,所以这里对 invoker 做了委托,把 invoker 交给DelegateProviderMetaDataInvoker 来处理。
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
			//发布代理
                Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                //添加到 exporters 集合
                exporters.add(exporter);
            }
        } else {
   
            if (logger.isInfoEnabled()) {
   
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

            Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
            exporters.add(exporter);
        }

5)PROTOCOL.export

protocol.export,这个 protocol 是什么呢?找到定义处发现它是一个自适应扩展点,打开 Protocol 这个扩展点,又可以看到它是一个在方法层面上的自适应扩展,意味着它实现了对于 export 这个方法的适配。也就意味着这个 Protocol 是一个动态代理类,Protocol$Adaptive

这个动态代理类,会根据 url 中配置的 protocol name 来实现对应协议的适配。

private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

6)Protocol$Adaptive

在当前的场景中,protocol 会是调用谁呢?目前发布的 invoker(URL),实际上是一个 registry://协议,所以Protocol$Adaptive,会通过 getExtension(extName)得到一个 RegistryProtocol。

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
   
    public void destroy() {
   
        throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy()of interface org.apache.dubbo.rpc.Protocol is not adaptive method !");
    }

    public int getDefaultPort() {
   
        throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort()of interface org.apache.dubbo.rpc.Protocol is not adaptive method !");
    }

    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
   
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys ([protocol])");
        //获取扩展类
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        //扩展类执行服务暴露方法
        return extension.export(arg0);
    }

    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
   
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys ([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

4.RegistryProtocol.export

很明显,这个 RegistryProtocol 是用来实现服务注册的,这里面会有很多处理逻辑。

  • 实现对应协议的服务发布
  • 实现服务注册
  • 订阅服务重写
    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
   
        // 这里获得的是 zookeeper 注册中心的 url: zookeeper://ip:port
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        // 这里是获得服务提供者的 url, dubbo://ip:port...
        URL providerUrl = getProviderUrl(originInvoker);

        // 订阅 override 数据。在 admin 控制台可以针对服务进行治理,比如修改权重,修改路由机制等,当注册中心有此服务的覆盖配置注册进来时,推送消息给提供者,重新暴露服务
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //========================================================================//
        // 这里就交给了具体的协议去暴露服务,本质上这里就是启动一个netty
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        //  根据 invoker 中的 url 获取 Registry 实例 : zookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        // 获取要注册到注册中心的 URL: dubbo://ip:port
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        // 是否配置了注册中心,如果是, 则需要注册
        if (register) {
   
            register(registryUrl, registeredProviderUrl);
        }

        // 设置注册中心的订阅
        registerStatedUrl(registryUrl, registeredProviderUrl, register);


        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        notifyExport(exporter);
        // 保证每次 export 都返回一个新的 exporter 实例
        return new DestroyableExporter<>(exporter);
    }

1)doLocalExport

先通过 doLocalExport 来暴露一个服务,本质上应该是启动一个通信服务,主要的步骤是将本地 ip 和 20880 端口打开,进行监听originInvoker: 应该是 registry://ip:port/com.alibaba.dubbo.registry.RegistryService

key: 从 originInvoker 中获得发布协议的 url: dubbo://ip:port/...

bounds: 一个 prviderUrl 服务 export 之后,缓存到 bounds 中,所以一个 providerUrl 只会对应一个 exporter

//computeIfAbsent 就相当于, java8 的语法
if(bounds.get(key)==null){
   
	bounds.put(key,s->{
   })
}
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
   
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
   
            // 对原有的 invoker, 委托给了 InvokerDelegate
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            // 将 invoker 转换为 exporter 并启动 netty 服务 ----》DubboProtocol
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

InvokerDelegete: 是 RegistryProtocol 的一个静态内部类,该类是一个 originInvoker 的委托类,该类存储了 originInvoker,其父类 InvokerWrapper 还会存储 providerUrl,InvokerWrapper 会调用 originInvoker 的 invoke 方法,也会销毁 invoker。可以管理 invoker 的生命周期。

5.DubboProtocol.export

基于动态代理的适配,很自然的就过渡到了 DubboProtocol 这个协议类中,但是实际上是 DubboProtocol 吗?

这里并不是获得一个单纯的 DubboProtocol 扩展点,而是会通过 Wrapper 对 Protocol 进行装饰,装饰器分别为:

QosProtocolWrapper/ProtocolListenerWrapper/ProtocolFilterWrapper/DubboProtocol

为什么会这样?回头看SPI机制。

1)Wrapper包装

在 ExtensionLoader.loadClass 这个方法中,有一段这样的判断,如果当前这个类是一个 wrapper 包装类,也就是这个 wrapper中有构造方法,参数是当前被加载的扩展点的类型,则把这个 wrapper 类加入到 cacheWrapperClass 缓存中。

    private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
                           boolean overridden) throws NoSuchMethodException {
   
        if (!type.isAssignableFrom(clazz)) {
   
            throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                    type + ", class line: " + clazz.getName() + "), class "
                    + clazz.getName() + " is not subtype of interface.");
        }
        if (clazz.isAnnotationPresent(Adaptive.class)) {
   
            cacheAdaptiveClass(clazz, overridden);
        } else if (isWrapperClass(clazz)) {
   
            //如果当前这个类是一个 wrapper 包装类,也就是这个 wrapper中有构造方法   参数是当前被加载的扩展点的类型
            //则把这个 wrapper 类加入到 cacheWrapperClass 缓存中。
            cacheWrapperClass(clazz);
        } else {
   
            clazz.getConstructor();
            if (StringUtils.isEmpty(name)) {
   
                name = findAnnotationName(clazz);
                if (name.length() == 0) {
   
                    throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
                }
            }

            String[] names = NAME_SEPARATOR.split(name);
            if (ArrayUtils.isNotEmpty(names)) {
   
                cacheActivateClass(clazz, names[0]);
                for (String n : names) {
   
                    cacheName(clazz, n);
                    saveInExtensionClass(extensionClasses, clazz, n, overridden);
                }
            }
        }
    }
    private boolean isWrapperClass(Class<?> clazz) {
   
        try {
   
            clazz.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
   
            return false;
        }
    }

我们可以在 dubbo 的配置文件中找到三个 Wrapper

QosprotocolWrapper:如果当前配置了注册中心,则会启动一个 Qos 。与在线运维相关。

ProtocolFilterWrapper:对 invoker 进行 filter 的包装,实现请求的过滤。

ProtocolListenerWrapper:用于服务 export 时候插入监听机制,暂未实现

接着,在 getExtension->createExtension 方法中,会对 cacheWrapperClass 集合进行判断,如果集合不为空,则进行包装

            if (wrap) {
   

                List<Class<?>> wrapperClassesList = new ArrayList<>();
                if (cachedWrapperClasses != null) {
   
                    wrapperClassesList.addAll(cachedWrapperClasses);
                    wrapperClassesList.sort(WrapperComparator.COMPARATOR);
                    Collections.reverse(wrapperClassesList);
                }

                if (CollectionUtils.isNotEmpty(wrapperClassesList)) {
   
                    for (Class<?> wrapperClass : wrapperClassesList) {
   
                        Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);
                        if (wrapper == null
                                || (ArrayUtils.contains(wrapper.matches(), name) && !ArrayUtils.contains(wrapper.mismatches(), name))) {
   
                            instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                        }
                    }
                }
            }

2)ProtocolFilterWrapper

这个是一个过滤器的包装,使用责任链模式,对 invoker 进行了包装。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
   
        if (UrlUtils.isRegistry(invoker.getUrl())) {
   
            return protocol.export(invoker);
        }
        // 构建责任链,基于激活扩展点
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    }
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
   
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
         //下面代码省略
    }

我们看如下文件:/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter

默认提供了非常多的过滤器。 然后基于条件激活扩展点,来对 invoker 进行包装,从而在实现远程调用的时候,会经过这些filter 进行过滤。

3)DubboProtocol.export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
   
        URL url = invoker.getUrl();

        // 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如
    	//${group}/copm.yhd.practice.dubbo.ISayHelloService:${version}:20880
        String key = serviceKey(url);
    	// 创建 DubboExporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    	// 将 <key, exporter>  键值对放入缓存中
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
   
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
   
                if (logger.isWarnEnabled()) {
   
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            }
        }
		//启动服务
        openServer(url);
    	//优化序列化
        optimizeSerialization(url);

        return exporter;
    }

4)openServer

去开启一个服务,并且放入到缓存中->在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。

    private void openServer(URL url) {
   
        // find server.
        //  获取 host:port ,并将其作为服务器实例的 key ,用于标识当前的服务器实例
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        //client  也可以暴露一个只有 server 可以调用的服务
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
   
            //是否在 serverMap 中缓存了
            ProtocolServer server = serverMap.get(key);
            if (server == null) {
   
                synchronized (this) {
   
                    server = serverMap.get(key);
                    if (server == null) {
   
                        //创建服务器实例
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
   
                //服务器已创建,则根据 url  中的配置重置服务器
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

5)createServer

创建服务,开启心跳检测,默认使用 netty。组装 url。

    private ProtocolServer createServer(URL url) {
   
        // 组装 url ,在 url 中添加心跳时间、编解码参数
        url = URLBuilder.from(url)
            //  当服务关闭以后,发送一个只读的事件,默认是开启状态
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
            //  启动心跳配置
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
		// 通过 SPI  检测是否存在 server  参数所代表的 Transporter  拓展,不存在则抛出异常
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
   
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }
		// 创建 ExchangeServer
        ExchangeServer server;
        try {
   
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
   
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
   
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
   
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return new DubboProtocolServer(server);
    }

6)Exchangers.bind

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
   
        if (url == null) {
   
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
   
            throw new IllegalArgumentException("handler == null");
        }
        // 获取 Exchanger ,默认为 HeaderExchanger 
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // 调用 HeaderExchanger  的 bind  方法创建 ExchangeServer  实例
        return getExchanger(url).bind(url, handler);
    }

7)headerExchanger.bind

这里面包含多个逻辑

  • new DecodeHandler(new HeaderExchangeHandler(handler))
  • Transporters.bind
  • new HeaderExchangeServer

目前只需要关心 transporters.bind 方法即可

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
   
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

8)Transporters.bind

    public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
   
        if (url == null) {
   
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
   
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
   
            handler = handlers[0];
        } else {
   
            //  如果 handlers  元素数量大于 1 ,则创建 ChannelHandler  分发器
            handler = new ChannelHandlerDispatcher(handlers);
        }
        //  获取自适应 Transporter  实例,并调用实例方法
        return getTransporter().bind(url, handler);
    }

9)getTransporter

getTransporter 是一个自适应扩展点,它针对 bind 方法添加了自适应注解,意味着,bing 方法的具体实现,会基于Transporter$Adaptive 方法进行适配,那么在这里面默认的通信协议是 netty,所以它会采用 netty4 的实现,也就是org.apache.dubbo.remoting.transport.netty4.NettyTransporter

10)NettyTransporter.bind

public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
   
    return new NettyServer(url, handler);
}

6.NettyServer

初始化一个 nettyserver,并且从 url 中获得相应的 ip/ port。然后调用 doOpen();

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
   
        // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
        // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
        super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
    }
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
   
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();
	//获取IP
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    //获取端口
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
   
        bindIp = ANYHOST_VALUE;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
    try {
   
        //  调用模板方法 doOpen  启动服务器
        doOpen();
        if (logger.isInfoEnabled()) {
   
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
   
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    executor = executorRepository.createExecutorIfAbsent(url);
}

1)doOpen

开启 netty 服务

    protected void doOpen() throws Throwable {
   
        bootstrap = new ServerBootstrap();

        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
   
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
   
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
   
                            ch.pipeline().addLast("negotiation",
                                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                        }
                        ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

这里用到了一个 handler 来处理客户端传递过来的请求NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);

这个 handler 是一个链路,后续接收到的请求,会一层一层的处理。

7.Invoker是什么

从前面的分析来看,服务的发布分三个阶段

  • 创造一个 invoker
  • 把经历过一系列处理的 invoker(各种包装),在 DubboProtocol 中保存到 exporterMap 中
  • 把 dubbo 协议的 url 地址注册到注册中心上

Invoker 是 Dubbo 领域模型中非常重要的一个概念, 和 ExtensionLoader 的重要性是一样的。回到 ServiceConfig 中 export 的代码,这段代码是还没有分析过的。以这个作为入口来分析export 出去的 invoker 到底是啥东西。

Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);

1)ProxyFacotory.getInvoker

这个是一个代理工程,用来生成 invoker,从它的定义来看,它是一个自适应扩展点,看到这样的扩展点,我们几乎可以不假思索的想到它会存在一个动态适配器类。

private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

2)ProxyFactory

这个方法的简单解读为: 它是一个 spi 扩展点,并且默认的扩展实现是 javassit, 这个接口中有三个方法,并且都是加了@Adaptive 的自适应扩展点。所以如果调用 getInvoker 方法,应该会返回一个 ProxyFactory$Adaptiv。

@SPI("javassist")
public interface ProxyFactory {
   

    @Adaptive({
   PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    @Adaptive({
   PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

    @Adaptive({
   PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

3)ProxyFactory$Adaptive

这个自适应扩展点,做了两件事情

  • 通过 ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(extName)获取了一个指定名称的扩展点
    • dubbo-rpc-api/resources/META-INF/com.alibaba.dubbo.rpc.ProxyFactory 中,定义了 javassis=JavassisProxyFactory
  • 调用 JavassisProxyFactorygetInvoker 方法

javassist 是一个动态类库,用来实现动态代理的。构建好了代理类之后,返回一个 AbstractproxyInvoker,并且它实现了 doInvoke 方法,这个地方似乎看到了 dubbo 消费者调用过来的时候触发的影子,因为 wrapper.invokeMethod 本质上就是触发上面动态代理类的方法 invokeMethod。

总结一下 Invoke 本质上应该是一个代理,经过层层包装最终进行了发布。当消费者发起请求的时候,会获得这个invoker 进行调用。最终发布出去的 invoker, 也不是单纯的一个代理,也是经过多层包装。

十三,服务注册

1.服务注册的核心逻辑

服务发布这一条线分析完成之后,再来了解一下服务注册的过程。

在看服务发布的源码的时候,在RegistryProtocol这个类中,看到了服务发布的流程。

从export方法中抽离出来的部分代码,就是服务注册的流程。

	   //  根据 invoker 中的 url 获取 Registry 实例 : zookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        // 获取要注册到注册中心的 URL: dubbo://ip:port
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        // 是否配置了注册中心,如果是, 则需要注册
        if (register) {
   
            register(registryUrl, registeredProviderUrl);
        }

2.getRegistry

  • 把url转化为对应配置的注册中心的具体协议
  • 根据具体协议,从registryFactory中获得指定的注册中心实现

这个registryFactory具体是怎么赋值的呢?

    protected Registry getRegistry(final Invoker<?> originInvoker) {
   
        //把url转化为配置的具体协议,比如zookeeper://ip:port. 这样后续获得的注册中心就会是基于zk的实现
        URL registryUrl = getRegistryUrl(originInvoker);
        return registryFactory.getRegistry(registryUrl);
    }

在RegistryProtocol中存在一段这样的代码,很明显这是通过依赖注入来实现的扩展点。

private RegistryFactory registryFactory;
public void setRegistryFactory(RegistryFactory registryFactory) {
   
        this.registryFactory = registryFactory;
    }

按照扩展点的加载规则,我们可以先看看/META-INF/dubbo/internal路径下找到RegistryFactory的配置文件.这个factory有多个扩展点的实现。

dubbo=org.apache.dubbo.registry.dubbo.DubboRegistryFactory
multicast=org.apache.dubbo.registry.multicast.MulticastRegistryFactory
zookeeper=org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory
redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
consul=org.apache.dubbo.registry.consul.ConsulRegistryFactory
etcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory

接着,找到RegistryFactory的实现, 发现它里面有一个自适应的方法,根据url中protocol传入的值进行适配。

@SPI("dubbo")
public interface RegistryFactory {
   
}

3.RegistryFactory$Adaptive

由于在前面的代码中,url中的protocol已经改成了zookeeper,那么这个时候根据zookeeper获得的spi扩展点应该是ZookeeperRegistryFactory。

import org.apache.dubbo.common.extension.ExtensionLoader;

public class RegistryFactory$Adaptive implements org.apache.dubbo.registry.RegistryFactory {
   
    
  public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0) {
   
    
    if (arg0 == null) throw new IllegalArgumentException("url == null");
    org.apache.dubbo.common.URL url = arg0;
    String extName = ( url.getProtocol() == null ? "dubbo" :url.getProtocol() );
    if(extName == null) throw new IllegalStateException("Failed to getextension (org.apache.dubbo.registry.RegistryFactory) name from url (" +url.toString() + ") use keys([protocol])");
    org.apache.dubbo.registry.RegistryFactory extension =(org.apache.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName);
    return extension.getRegistry(arg0);
 }
}

4.ZookeeperRegistryFactory

这个方法中并没有getRegistry方法,而是在父类AbstractRegistryFactory

  • 从缓存REGISTRIES中,根据key获得对应的Registry
  • 如果不存在,则创建Registry
    @Override
    public Registry getRegistry(URL url) {
   
        if (destroyed.get()) {
   
            
            return DEFAULT_NOP_REGISTRY;
        }

        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = createRegistryCacheKey(url);
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
   
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
   
                return registry;
            }
            //create registry by spi/ioc
            //创建注册中心
            registry = createRegistry(url);
            if (registry == null) {
   
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
   
            // Release the lock
            LOCK.unlock();
        }
    }

5.createRegistry

创建一个zookeeperRegistry,把urlzookeepertransporter作为参数传入。

zookeeperTransporter 这个属性也是基于依赖注入来赋值的,具体的流程就不再分析了,这个的值应该是CuratorZookeeperTransporter 表示具体使用什么框架来和zk产生连接。

    @Override
    public Registry createRegistry(URL url) {
   
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

6.ZookeeperRegistry

这个方法中使用了CuratorZookeeperTransport来实现zk的连接。

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
   
        super(url);
        if (url.isAnyHost()) {
   
            throw new IllegalStateException("registry address == null");
        }
        //获得group名称
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
   
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        //产生一个zookeeper连接
        zkClient = zookeeperTransporter.connect(url);
        //添加zookeeper状态变化事件
        zkClient.addStateListener((state) -> {
   
            if (state == StateListener.RECONNECTED) {
   
                
                ZookeeperRegistry.this.fetchLatestAddresses();
            } else if (state == StateListener.NEW_SESSION_CREATED) {
   
               
                try {
   
                    ZookeeperRegistry.this.recover();
                } catch (Exception e) {
   
                    logger.error(e.getMessage(), e);
                }
            } else if (state == StateListener.SESSION_LOST) {
   
                
            } else if (state == StateListener.SUSPENDED) {
   

            } else if (state == StateListener.CONNECTED) {
   

            }
        });
    }

7.register

获取到了注册中心之后,回到服务注册的核心逻辑,开始调用register方法,去将dubbo://的协议地址注册到zookeeper上

    private void register(URL registryUrl, URL registeredProviderUrl) {
   
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }

这个方法会调用FailbackRegistry类中的register. 为什么呢?因为ZookeeperRegistry这个类中并没有register这个方法,但是他的父类FailbackRegistry中存在register方法,而这个类又重写了AbstractRegistry类中的register方法。所以我们可以直接定位大FailbackRegistry这个类中的register方法中

8.FailbackRegistry.register

    @Override
    public void register(URL url) {
   
        if (!acceptable(url)) {
   
            
            return;
        }
        super.register(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
   
            // 调用子类实现真正的服务注册,把url注册到zk上
            doRegister(url);
        } catch (Exception e) {
   
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            // 如果开启了启动时检测,则直接抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
   
                if (skipFailback) {
   
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
   
                
            }

            // Record a failed registration request to a failed list, retry regularly
            // 将失败的注册请求记录到失败列表,定时重试
            addFailedRegistered(url);
        }
    }

9.ZookeeperRegistry.doRegister

最终调用curator的客户端把服务地址注册到zk。

    @Override
    public void doRegister(URL url) {
   
        try {
   
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
   
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

十四,服务消费

1.服务消费应该具备的逻辑

  • 生成一个代理对象–实现网络通信的细节
  • 建立通信连接–netty
  • 从zk获取目标地址–订阅节点变化
  • 实现负载均衡
  • 实现集群容错
  • mock
  • 序列化

2.服务消费的入口

ReferenceAnnotationBeanPostProcessor->ReferenceBeanInvocationHandler.init->ReferenceConfig.get() //获得一个远程代理类

1)ReferenceConfig.get()

    public synchronized T get() {
   
        if (destroyed) {
   
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        if (ref == null) {
   //如果当前接口的远程代理引用为空,则进行初始化
            init();
        }
        return ref;
    }

2)init

初始化的过程,和服务发布的过程类似,会有特别多的判断以及参数的组装. 我们只需要关注createProxy,创建代理类的方法。

    public synchronized void init() {
   
        if (initialized) {
   
            return;
        }

        if (bootstrap == null) {
   
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }

        checkAndUpdateSubConfigs();

        checkStubAndLocal(interfaceClass);
        ConfigValidationUtils.checkMock(interfaceClass, this);

        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, CONSUMER_SIDE);

        ReferenceConfigBase.appendRuntimeParameters(map);
        if (!ProtocolUtils.isGeneric(generic)) {
   
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
   
                map.put(REVISION_KEY, revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
   
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
   
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
            }
        }
        map.put(INTERFACE_KEY, interfaceName);
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ConsumerConfig
        // appendParameters(map, consumer, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, consumer);
        AbstractConfig.appendParameters(map, this);
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
   
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
        }
        Map<String, AsyncMethodInfo> attributes = null;
        if (CollectionUtils.isNotEmpty(getMethods())) {
   
            attributes = new HashMap<>();
            for (MethodConfig methodConfig : getMethods()) {
   
                AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
                String retryKey = methodConfig.getName() + ".retry";
                if (map.containsKey(retryKey)) {
   
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
   
                        map.put(methodConfig.getName() + ".retries", "0");
                    }
                }
                AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                if (asyncMethodInfo != null) {
   
//                    consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
                    attributes.put(methodConfig.getName(), asyncMethodInfo);
                }
            }
        }

        String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
        if (StringUtils.isEmpty(hostToRegistry)) {
   
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
   
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(REGISTER_IP_KEY, hostToRegistry);

        serviceMetadata.getAttachments().putAll(map);
		//创建代理
        ref = createProxy(map);

        serviceMetadata.setTarget(ref);
        serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
        ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
        consumerModel.setProxyObject(ref);
        consumerModel.init(attributes);

        initialized = true;

        checkInvokerAvailable();

        // dispatch a ReferenceConfigInitializedEvent since 2.7.4
        dispatch(new ReferenceConfigInitializedEvent(this, invoker));
    }

3)createProxy

  • 判断是否为本地调用,如果是则使用injvm协议进行调用
  • 判断是否为点对点调用,如果是则把url保存到urls集合中,如果url为1,进入步骤4,如果urls>1,则执行5
  • 如果是配置了注册中心,遍历注册中心,把url添加到urls集合,url为1,进入步骤4,如果urls>1,则执行5
  • 直连构建invoker
  • 构建invokers集合,通过cluster合并多个invoker
  • 最后调用 ProxyFactory 生成代理类
    private T createProxy(Map<String, String> map) {
   
        if (shouldJvmRefer(map)) {
   //判断是否是在同一个jvm进程中调用
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
   
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
   
            urls.clear();
            //url 如果不为空,说明是点对点通信
            if (url != null && url.length() > 0) {
    
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
   
                    for (String u : us) {
   
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
   
                            url = url.setPath(interfaceName);
                        }
                        // 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心
                        if (UrlUtils.isRegistry(url)) {
   
                            // 将 map 转换为查询字符串,并作为 refer 参数的值添加到url 中
                            urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
   
                            // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等,最后将合并后的配置设置为 url 查询字符串中。
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else {
    // assemble URL from register center's configuration
              
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
   
                    checkRegistry(); //校验注册中心的配置以及是否有必要从配置中心组装url
                    //这里的代码实现和服务端类似,也是根据注册中心配置进行解析得到URL
                    //这里的URL肯定也是:registry://ip:port/org.apache.dubbo.service.RegsitryService
                    List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                    if (CollectionUtils.isNotEmpty(us)) {
   
                        for (URL u : us) {
   
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                            if (monitorUrl != null) {
   
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
   //如果没有配置注册中心,则报错
                        throw new IllegalStateException();
                    }
                }
            }
			//如果值配置了一个注册中心或者一个服务提供者,直接使用refprotocol.refer
            if (urls.size() == 1) {
   
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            } else {
   
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
   //遍历urls生成多个invoker
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (UrlUtils.isRegistry(url)) {
   
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) {
    //如果registryUrl不为空,构建静态directory
                    
                    String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    // 通过Cluster将多个invoker合并RegistryAwareClusterInvoker(StaticDirectory) ->FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                    invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
                } else {
    // not a registry url, must be direct invoke.
                    String cluster = CollectionUtils.isNotEmpty(invokers)
                            ? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
                            : Cluster.DEFAULT;
                    invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
                }
            }
        }

        if (logger.isInfoEnabled()) {
   
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        /**
         * @since 2.7.0
         * ServiceData Store
         */
        String metadata = map.get(METADATA_KEY);
        WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
        if (metadataService != null) {
   
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            metadataService.publishServiceDefinition(consumerURL);
        }
        // create service proxy
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }

4)protocol.refer

这里通过指定的协议来调用refer生成一个invoker对象,invoker前面看过,它是一个代理对象。那么在当前的消费端而言,invoker主要用于执行远程调用。

这个protocol,又是一个自适应扩展点,它得到的是一个Protocol$Adaptive.

根据当前的协议url,得到一个指定的扩展点,传递进来的参数中,协议地址为registry://,所以,我们可以直接定位到RegistryProtocol.refer代码

5)RegistryProtocol.refer

  • 组装注册中心协议的url
  • 判断是否配置了group,如果有,则cluster=getMergeableCluster(),构建invoker
  • doRefer构建invoker
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
   
        //根据配置的协议,生成注册中心的url: zookeeper://
        url = getRegistryUrl(url);
        //获取注册中心
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
   
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        // 解析group参数,根据group决定cluster的类型
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
   
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
   
                return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
            }
        }

        Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
        return doRefer(cluster, registry, type, url);
    }

6)doRefer

  • 构建一个RegistryDirectory
  • 构建一个consumer://协议的地址注册到注册中心
  • 订阅zookeeper中节点的变化
  • 调用cluster.join方法
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
   
        //RegistryDirectory初始化
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        //注册consumer://协议的url
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
   
            directory.setRegisteredConsumerUrl(subscribeUrl);
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        //订阅事件监听
        directory.subscribe(toSubscribeUrl(subscribeUrl));
	   //构建invoker
        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
   
            return invoker;
        }

        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
        for (RegistryProtocolListener listener : listeners) {
   
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }

3.Cluster是什么

我们只关注一下Invoker这个代理类的创建过程,其他的暂且不关心.

Invoker<T> invoker = cluster.join(directory);

在方法refer中:

Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
    static Cluster getCluster(String name) {
   
        return getCluster(name, true);
    }

    static Cluster getCluster(String name, boolean wrap) {
   
        if (StringUtils.isEmpty(name)) {
   
            name = Cluster.DEFAULT;
        }
        return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap);
    }

1)Cluster$Adaptive

在动态适配的类中会基于extName,选择一个合适的扩展点进行适配,由于默认情况下cluster:failover,所以getExtension(“failover”)理论上应该返回FailOverCluster。但实际上,这里做了包装 MockClusterWrapper(FailOverCluster)

public class Cluster$Adaptive implements org.apache.dubbo.rpc.cluster.Cluster {
   
  public org.apache.dubbo.rpc.Invoker join(org.apache.dubbo.rpc.cluster.Directory arg0) throws org.apache.dubbo.rpc.RpcException {
   
    if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument ==null");
    if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argumentgetUrl() == null");
    org.apache.dubbo.common.URL url = arg0.getUrl();
    String extName = url.getParameter("cluster", "failover");
    if(extName == null) 
        throw new IllegalStateException("Failed to getextension (org.apache.dubbo.rpc.cluster.Cluster) name from url (" +url.toString() + ") use keys([cluster])");
    org.apache.dubbo.rpc.cluster.Cluster extension =(org.apache.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(org.apa
che.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
    return extension.join(arg0);
 }
}

2)cluster.join

再回到doRefer方法,下面这段代码, 实际是调用MockClusterWrapper(FailOverCluster.join)

Invoker invoker = cluster.join(directory);

所以这里返回的invoker,应该是MockClusterWrapper(FailOverCluster(directory))

接着回到ReferenceConfig.createProxy方法中的最后一行.

3)proxyFactory.getProxy

拿到invoker之后,会调用获得一个动态代理类

return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));

而,这个PROXY_FACTORY 又是一个自适应扩展点。

    /**
     * A {@link ProxyFactory} implementation that will generate a reference service's proxy,the JavassistProxyFactory is
     * its default implementation
     */
    private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

4)JavassistProxyFactory.getProxy

通过这个方法生成了一个动态代理类,并且对invoker再做了一层处理,InvokerInvocationHandler。意味着后续发起服务调用的时候,会由InvokerInvocationHandler来进行处理。

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
   
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

5)proxy.getProxy

在proxy.getProxy这个方法中会生成一个动态代理类,通过debug的形式可以看到动态代理类的原貌,在getProxy这个方法位置加一个断点。

public static Proxy getProxy(Class<?>... ics) {
   
    return getProxy(ClassUtils.getClassLoader(Proxy.class), ics);
}

@Reference注入的一个对象实例本质上就是一个动态代理类,通过调用这个类中的方法,会触发handler.invoke(), 而这个handler就是InvokerInvocationHandler。

4.网络连接的建立

接下来看 目标服务地址信息以及网络通信的建立。

1)RegistryProtocol.doRefer

关注directory.subscribe这个方法,它是实现服务目标服务订阅的。

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
   
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
   
            directory.setRegisteredConsumerUrl(subscribeUrl);
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        directory.subscribe(toSubscribeUrl(subscribeUrl));

        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
   
            return invoker;
        }

        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
        for (RegistryProtocolListener listener : listeners) {
   
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }

2)RegistryDirectory.subscribe

订阅注册中心指定节点的变化,如果发生变化,则通知到RegistryDirectory。Directory其实和服务的注册以及服务的发现有非常大的关联。

    public void subscribe(URL url) {
   
        setConsumerUrl(url); //设置consumerUrl
        //把当前RegistryDirectory作为listener,去监听zk上节点的变化
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
        //订阅 -> 这里的registry是zookeeperRegsitry
        registry.subscribe(url, this);
    }

这里的registry 是ZookeeperRegistry ,会去监听并获取路径下面的节点。监听的路径是:
/dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo/org.apache.dubbo.demo.DemoService/configurators/dubbo/org.apache.dubbo.demo.DemoService/routers 节点下面的子节点变动。

3)FailbackRegistry.subscribe

listener为RegistryDirectory,后续要用到,移除失效的listener,调用doSubscribe进行订阅。

    public void subscribe(URL url, NotifyListener listener) {
   
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
   
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
   
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (CollectionUtils.isNotEmpty(urls)) {
   
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
   
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
   
                    if (skipFailback) {
   
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
   
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedSubscribed(url, listener);
        }
    }

4)ZookeeperRegistry.doSubscribe

这个方法是订阅,逻辑实现比较多,可以分两段来看,这里的实现把所有Service层发起的订阅以及指定的Service层发起的订阅分开处理。所有Service层类似于监控中心发起的订阅。指定的Service层发起的订阅可以看作是服务消费者的订阅。我们只需要关心指定service层发起的订阅即可。

    public void doSubscribe(final URL url, final NotifyListener listener) {
   
        try {
   
            if (ANY_VALUE.equals(url.getServiceInterface())) {
   
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
   
                    for (String child : currentChilds) {
   
                        child = URL.decode(child);
                        if (!anyServices.contains(child)) {
   
                            anyServices.add(child);
                            subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                    Constants.CHECK_KEY, String.valueOf(false)), k);
                        }
                    }
                });
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {
   
                    for (String service : services) {
   
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
   
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
   
                    // 如果之前该路径没有添加过listener,则创建一个map来放置listener
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    // 如果没有添加过对于子节点的listener,则创建,通知服务变化 回调NotifyListener
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkClient.create(path, false);
                    //添加path节点的当前节点及子节点监听,并且获取子节点信息
                    //也就是dubbo://ip:port/...
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
   
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                //调用notify进行通知,对已经可用的列表进行通知
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
   
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

5)FailbackRegistry.notify

调用FailbackRegistry.notify, 对参数进行判断。 然后调用AbstractRegistry.notify方法。

    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
   
        if (url == null) {
   
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
   
            throw new IllegalArgumentException("notify listener == null");
        }
        try {
   
            doNotify(url, listener, urls);
        } catch (Exception t) {
   
            // Record a failed registration request to a failed list, retry regularly
            addFailedNotified(url, listener, urls);
            logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
    }

6)AbstractRegistry.notify

这里面会针对每一个category,调用listener.notify进行通知,然后更新本地的缓存文件

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
   
    if (url == null) {
   
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
   
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((CollectionUtils.isEmpty(urls))
            && !ANY_VALUE.equals(url.getServiceInterface())) {
   
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
   
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    // keep every provider's category.
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
   
        if (UrlUtils.isMatch(url, u)) {
   
            String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
   
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
   
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        listener.notify(categoryList);
        // We will update our cache file after each notification.
        // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
        saveProperties(url);
    }
}

消费端的listener是最开始传递过来的RegistryDirectory,所以这里会触发RegistryDirectory.notify.

7)RegistryDirectory.notify

Invoker的网络连接以及后续的配置变更,都会调用这个notify方法

urls: zk的path数据,这里表示的是dubbo://

public synchronized void notify(List<URL> urls) {
   
    //对url列表进行校验、过滤,然后分成 config、router、provider 3个分组map
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(this::judgeCategory));

    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
	// 如果router 路由节点有变化,则从新将router 下的数据生成router
    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);

    // 获得provider URL,然后调用refreshOverrideAndInvoker进行刷新
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    /**
     * 3.x added for extend URL address
     */
    ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
    List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
    if (supportedListeners != null && !supportedListeners.isEmpty()) {
   
        for (AddressListener addressListener : supportedListeners) {
   
            providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
        }
    }
    refreshOverrideAndInvoker(providerURLs);
}

8)refreshOverrideAndInvoker

  • 逐个调用注册中心里面的配置,覆盖原来的url,组成最新的url 放入overrideDirectoryUrl 存储
  • 根据 provider urls,重新刷新Invoker
    private void refreshOverrideAndInvoker(List<URL> urls) {
   
        // mock zookeeper://xxx?mock=return null
        overrideDirectoryUrl();
        refreshInvoker(urls);
    }

9)refreshInvoker

    private void refreshInvoker(List<URL> invokerUrls) {
   
        Assert.notNull(invokerUrls, "invokerUrls should not be null");

        if (invokerUrls.size() == 1
               //...
            //如果是空协议,则直接返回不允许访问
        } else {
   
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls == Collections.<URL>emptyList()) {
   //如果url为空
                invokerUrls = new ArrayList<>();
            }
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
   
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
   
                this.cachedInvokerUrls = new HashSet<>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
   
                return;
            }
            //根据provider url,生成新的invoker
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

            /**
             * If the calculation is wrong, it is not processed.
             *
             * 1. The protocol configured by the client is inconsistent with the protocol of the server.
             *    eg: consumer protocol = dubbo, provider only has other protocol services(rest).
             * 2. The registration center is not robust and pushes illegal specification data.
             *
             */
            if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
   
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                        .toString()));
                return;
            }

            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
           
            routerChain.setInvokers(newInvokers);
             //如果服务配置了分组,则把分组下的provider包装成StaticDirectory,组成一个invoker
            //实际上就是按照group进行合并
            this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
            this.urlInvokerMap = newUrlInvokerMap;

            try {
   
                //旧的url 是否在新map里面存在,不存在,就是销毁url对应的Invoker
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
   
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }

10)toInvokers

这里用到了protocol.refer来构建了一个invoker.invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url,providerUrl);

构建完成之后,会保存在 Map<String, Invoker<T>> urlInvokerMap 这个集合中。

    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
   
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
        if (urls == null || urls.isEmpty()) {
   
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<>();
        String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
        for (URL providerUrl : urls) {
   
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (queryProtocols != null && queryProtocols.length() > 0) {
   
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
   
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
   
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
   
                    continue;
                }
            }
            if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
   
                continue;
            }
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
   
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                        " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                        " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                        ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) {
    // Repeated url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) {
    // Not in the cache, refer again
                try {
   
                    boolean enabled = true;
                    if (url.hasParameter(DISABLED_KEY)) {
   
                        enabled = !url.getParameter(DISABLED_KEY, false);
                    } else {
   
                        enabled = url.getParameter(ENABLED_KEY, true);
                    }
                    if (enabled) {
   
                        invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
   
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) {
    // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
   
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }

5.protocol.refer

调用指定的协议来进行远程引用。protocol是一个Protocol$Adaptive类,而真正的实现应该是:

ProtocolListenerWrapper(ProtocolFilterWrapper(QosProtocolWrapper(DubboProtocol.refer)

前面的包装过程,在服务发布的时候已经分析过了,我们直接进入DubboProtocol.protocolBindingRefer方法

1)DubboProtocol.protocolBindingRefer

  • 优化序列化
  • 构建DubboInvoker

在构建DubboInvoker时,会构建一个ExchangeClient,通过getClients(url)方法,这里基本可以猜到到是服务的通信建立。

    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
   
        optimizeSerialization(url);

        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }

2)getClients

这里面是获得客户端连接的方法

  • 判断是否为共享连接,默认是共享同一个连接进行通信
  • 是否配置了多个连接通道 connections,默认只有一个
    private ExchangeClient[] getClients(URL url) {
   
        // whether to share connection

        boolean useShareConnect = false;

        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        List<ReferenceCountExchangeClient> shareClients = null;
        //如果没有配置连接数,则默认为共享连接
        if (connections == 0) {
   
            useShareConnect = true;

            /*
             * The xml configuration should have a higher priority than properties.
             */
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                    DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            ///
            shareClients = getSharedClient(url, connections);
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
   
            if (useShareConnect) {
   
                clients[i] = shareClients.get(i);

            } else {
   
                clients[i] = initClient(url);
            }
        }

        return clients;
    }

3)getSharedClient

获得一个共享连接

    private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
   
        String key = url.getAddress();
        List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
        //检查当前的key检查连接是否已经创建过并且可用,如果是,则直接返回并且增加连接的个数的统计
        if (checkClientCanUse(clients)) {
   
            batchClientRefIncr(clients);
            return clients;
        }
        //如果连接已经关闭或者连接没有创建过
        locks.putIfAbsent(key, new Object());
        synchronized (locks.get(key)) {
   
            clients = referenceClientMap.get(key);
            // dubbo check
            // 在创建连接之前,在做一次检查,防止连接并发创建
            if (checkClientCanUse(clients)) {
   
                batchClientRefIncr(clients);
                return clients;
            }

            // connectNum must be greater than or equal to 1
            // 连接数必须大于等于1
            connectNum = Math.max(connectNum, 1);

            // If the clients is empty, then the first initialization is
            //如果当前消费者还没有和服务端产生连接,则初始化
            if (CollectionUtils.isEmpty(clients)) {
   
                clients = buildReferenceCountExchangeClientList(url, connectNum);
                //创建clients之后,保存到map中
                referenceClientMap.put(key, clients);

            } else {
   //如果clients不为空,则从clients数组中进行遍历
                for (int i = 0; i < clients.size(); i++) {
   
                    ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                    // If there is a client in the list that is no longer available, create a new one to replace him.
                    // 如果在集合中存在一个连接但是这个连接处于closed状态,则重新构建一个进行替换
                    if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
   
                        clients.set(i, buildReferenceCountExchangeClient(url));
                        continue;
                    }
					//增加个数
                    referenceCountExchangeClient.incrementAndGetCount();
                }
            }


            locks.remove(key);

            return clients;
        }
    }

4)buildReferenceCountExchangeClientList

根据连接数配置,来构建指定个数的链接。默认为1

    private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
   
        ExchangeClient exchangeClient = initClient(url);

        return new ReferenceCountExchangeClient(exchangeClient);
    }

5)initClient

进入到初始化客户端连接的方法了,猜测应该是根据url中配置的参数进行远程通信的构建

private ExchangeClient initClient(URL url) {
   

    // client type setting.
    // 获得连接类型
    String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
	//添加默认序列化方式
    url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
    // enable heartbeat by default
    //设置心跳时间
    url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

    // BIO is not allowed since it has severe performance issue.
    // 判断str是否存在于扩展点中,如果不存在则直接报错
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
   
        throw new RpcException("Unsupported client type: " + str + "," +
                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }

    ExchangeClient client;
    try {
   
        // connection should be lazy
        // 是否需要延迟创建连接,注意,这里的requestHandler是一个适配器
        if (url.getParameter(LAZY_CONNECT_KEY, false)) {
   
            client = new LazyConnectExchangeClient(url, requestHandler);

        } else {
   
            client = Exchangers.connect(url, requestHandler);
        }

    } catch (RemotingException e) {
   
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }

    return client;
}

6)Exchangers.connect

创建一个客户端连接

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
   
    if (url == null) {
   
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
   
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    return getExchanger(url).connect(url, handler);
}

7)HeaderExchange.connect

主要关注transporters.connect

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
   
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

8)NettyTransport.connect

使用netty构建了一个客户端连接

    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
   
        return new NettyClient(url, handler);
    }

6.梳理

RegistryProtocol.refer 过程中有一个关键步骤,即在监听到服务提供者url时触发RegistryDirectory.notify() 方法。

RegistryDirectory.notify() 方法调用 refreshInvoker() 方法将服务提供者urls转换为对应的 远程invoker ,最终调用到 DubboProtocol.refer() 方法生成对应的 DubboInvoker 。

DubboInvoker 的构造方法中有一项入参 ExchangeClient[] clients ,即对应本文要讲的网络客户端 Client 。DubboInvoker就是通过调用 client.request() 方法完成网络通信的请求发送和响应接收功能。

Client 的具体生成过程就是通过 DubboProtocol 的 initClient(URL url) 方法创建了一个HeaderExchangeClient 。

7.客户端生成Proxy

1)JavassistProxyFactory.getProxy

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
   
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

这个invoker实际上是:MockClusterWrapper(FailoverCluster(directory))然后通过InvokerInvocationHandler做了一层包装变成了InvokerInvocationHandler(MockClusterWrapper(FailoverCluster(directory)))

2)proxy.getProxy

这个方法里面,会生成一个动态代理的方法,我们通过debug可以看到动态字节码的拼接过程。它代理了当前这个接口的方法 sayHello , 并且方法里面是使用handler.invoke进行调用的。

而handler又是这样一个实现:InvokerInvocationHandler(MockClusterWrapper(FailoverCluster(directory)))

public java.lang.String sayHello(java.lang.String arg0){
   
     Object[] args = new Object[1];
     args[0] = ($w)$1;
     Object ret = handler.invoke(this, methods[0], args);
    return (java.lang.String)ret;
}

8.消费端调用的过程

1)nvokerInvocationHandler.invoke

这个方法主要判断当前调用的远程方法,如果是tostring、hashcode、equals,就直接返回否则,调用invoker.invoke,进入到 MockClusterWrapper.invoke 方法。

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
   
    if (method.getDeclaringClass() == Object.class) {
   
        return method.invoke(invoker, args);
    }
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    if (parameterTypes.length == 0) {
   
        if ("toString".equals(methodName)) {
   
            return invoker.toString();
        } else if ("$destroy".equals(methodName)) {
   
            invoker.destroy();
            return null;
        } else if ("hashCode".equals(methodName)) {
   
            return invoker.hashCode();
        }
    } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
   
        return invoker.equals(args[0]);
    }
    RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
    String serviceKey = invoker.getUrl().getServiceKey();
    rpcInvocation.setTargetServiceUniqueName(serviceKey);
  
    if (consumerModel != null) {
   
        rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
        rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
    }

    return invoker.invoke(rpcInvocation).recreate();
}

2)MockClusterInvoker.invoke

  • 是否客户端强制配置了mock调用,那么在这种场景中主要可以用来解决服务端还没开发好的时候直接使用本地数据进行测试
  • 是否出现了异常,如果出现异常则使用配置好的Mock类来实现服务的降级
public Result invoke(Invocation invocation) throws RpcException {
   
    Result result = null;
    //从url中获得MOCK_KEY对应的value
    String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    //如果没有配置mock,则直接传递给下个invoker调用
    if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
   
        //no mock
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
   //如果强制为本地调用,则执行mockInvoke
        if (logger.isWarnEnabled()) {
   
            logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
        }
        //force:direct mock
        result = doMockInvoke(invocation, null);
    } else {
   
        //fail-mock
        try {
   
            result = this.invoker.invoke(invocation);

            //fix:#4585
            if(result.getException() != null && result.getException() instanceof RpcException){
   
                RpcException rpcException= (RpcException)result.getException();
                if(rpcException.isBiz()){
   
                    throw  rpcException;
                }else {
   
                    result = doMockInvoke(invocation, rpcException);
                }
            }

        } catch (RpcException e) {
   
            if (e.isBiz()) {
   
                throw e;
            }
			//如果远程调用出现异常,则使用Mock进行处理
            if (logger.isWarnEnabled()) {
   
                logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
            }
            result = doMockInvoke(invocation, e);
        }
    }
    return result;
}

3)AbstractClusterInvoker.invoke

public Result invoke(final Invocation invocation) throws RpcException {
   
    checkWhetherDestroyed();

    // binding attachments into invocation.
    //Dubbo中,可以通过 RpcContext 上的 setAttachment 和 getAttachment 在服务消费方和提供方之间进行参数的隐式传递
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
   
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }
	//通过list获得invoker列表,这个列表基本可以猜测到是从directory里面获得的、但是这里面还实现了服务路由的逻辑,
    //简单来说就是先拿到invoker列表,然后通过router进行服务路由,筛选出符合路由规则的服务提供者
    List<Invoker<T>> invokers = list(invocation);
    //初始化负载均衡机制
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

4)initLoadBalance

初始化负载均衡

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
   
    if (CollectionUtils.isNotEmpty(invokers)) {
   
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
    } else {
   
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
    }
}

5)FailoverClusterInvoker.doInvoke

failover是失败重试,所以这里面应该会实现容错的逻辑

  • 获得重试的次数,并且进行循环
  • 获得目标服务,并且记录当前已经调用过的目标服务防止下次继续将请求发送过去
  • 如果执行成功,则返回结果
  • 如果出现异常,判断是否为业务异常,如果是则抛出,否则,进行下一次重试

  • 这里的 Invoker 是 Provider 的一个可调用 Service 的抽象, Invoker 封装了 Provider 地址及 Service 接口信息
  • Directory 代表多个 Invoker ,可以把它看成 List<Invoker> ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更
  • Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker ,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个
  • Router 负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等
  • LoadBalance 负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
   
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
   
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
   
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
   
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        //通过负载均衡获得目标invoker
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);//记录已经调用过的服务,下次调用会进行过滤
        RpcContext.getContext().setInvokers((List) invoked);
        try {
   
            //服务调用成功,直接返回结果
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
   
                
            }
            return result;
        } catch (RpcException e) {
   
            if (e.isBiz()) {
    // 如果是业务异常,直接抛出不进行重试
                throw e;
            }
            le = e;//记录异常信息,进行下一次循环
        } catch (Throwable e) {
   
            le = new RpcException(e.getMessage(), e);
        } finally {
   
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode());
}

9.负载均衡

1)select

所有负载均衡实现类均继承自 AbstractLoadBalance,该类实现了 LoadBalance 接口,并封装了一些公共的逻辑。所以在分析负载均衡实现之前,先来看一下 AbstractLoadBalance 的逻辑。首先来看一下负载均衡的入口方法 select,如下:

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
   

    if (CollectionUtils.isEmpty(invokers)) {
   
        return null;
    }
    String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();

    boolean sticky = invokers.get(0).getUrl()
            .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);

    //ignore overloaded method
    if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
   
        stickyInvoker = null;
    }
    //ignore concurrency problem
    if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
   
        if (availablecheck && stickyInvoker.isAvailable()) {
   
            return stickyInvoker;
        }
    }
    //调用 doSelect 方法进行负载均衡,该方法为抽象方法,由子类实现
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

    if (sticky) {
   
        stickyInvoker = invoker;
    }
    return invoker;
}

2)doSelect

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
   

    if (CollectionUtils.isEmpty(invokers)) {
   
        return null;
    }
    //如果就一个,就不负载均衡了,直接返回
    if (invokers.size() == 1) {
   
        return invokers.get(0);
    }
    //负载均衡逻辑
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
   
        try {
   
            Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rInvoker != null) {
   
                invoker = rInvoker;
            } else {
   
                
                int index = invokers.indexOf(invoker);
                try {
   
                   
                    invoker = invokers.get((index + 1) % invokers.size());
                } catch (Exception e) {
   
                    
            }
        } catch (Throwable t) {
   
            
        }
    }
    return invoker;
}

3)select

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
   

    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

这里应该选择的RandomLoadBalance,默认情况下。

4)RandomLoadBalance.doSelect

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
   
    // Number of invokers
    int length = invokers.size();
    // Every invoker has the same weight?
    boolean sameWeight = true;
    // the weight of every invokers
    int[] weights = new int[length];
    // the first invoker's weight
    int firstWeight = getWeight(invokers.get(0), invocation);
    weights[0] = firstWeight;
    // The sum of weights
    int totalWeight = firstWeight;
    for (int i = 1; i < length; i++) {
   
        int weight = getWeight(invokers.get(i), invocation);
        // save for later use
        weights[i] = weight;
        // Sum
        totalWeight += weight;
        if (sameWeight && weight != firstWeight) {
   
            sameWeight = false;
        }
    }
    if (totalWeight > 0 && !sameWeight) {
   
        // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
        int offset = ThreadLocalRandom.current().nextInt(totalWeight);
        // Return a invoker based on the random value.
        for (int i = 0; i < length; i++) {
   
            offset -= weights[i];
            if (offset < 0) {
   
                return invokers.get(i);
            }
        }
    }
    // If all invokers have the same weight value or totalWeight=0, return evenly.
    return invokers.get(ThreadLocalRandom.current().nextInt(length));
}

10.DubboInvoker

1)AbstractInvoker.invoke

这里面也是对Invocation的attachments进行处理,把attachment加入到Invocation中。

这里的attachment,实际上是目标服务的接口信息以及版本信息。

2)DubboInvoker.doInvoker

这里面看到一个很熟悉的东西,就是ExchangeClient,这个是客户端和服务端之间的连接。

然后如果当前方法有返回值,也就是isOneway=false,则执行else逻辑,然后通过异步的形式进行通信。

    protected Result doInvoke(final Invocation invocation) throws Throwable {
   
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
   
            currentClient = clients[0];
        } else {
   
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
   
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = calculateTimeout(invocation, methodName);
            if (isOneway) {
   
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
   
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
   
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
   
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

3)currentClient.request

它实际是一个ReferenceCountExchangeClient(HeaderExchangeClient())

所以它的调用链路是:

ReferenceCountExchangeClient->HeaderExchangeClient->HeaderExchangeChannel->(request方法)

最终,把构建好的 RpcInvocation,组装到一个Request对象中进行传递

    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
   
        if (closed) {
   
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
   
            channel.send(req);
        } catch (RemotingException e) {
   
            future.cancel();
            throw e;
        }
        return future;
    }

channel.send的调用链路:

AbstractPeer.send ->AbstractClient.send->NettyChannel.send

通过NioSocketChannel把消息发送出去

ChannelFuture future = channel.writeAndFlush(message);

十五,服务端接收消息的处理流程

客户端把消息发送出去之后,服务端会收到消息,然后把执行的结果返回到客户端

1.服务端接收到消息

服务端这边接收消息的处理链路,也比较复杂,我们回到NettServer中创建io的过程.

bootstrap.group(bossGroup, workerGroup)
       .channel(NioServerSocketChannel.class)
       .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
       .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
       .childOption(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT)
       .childHandler(new ChannelInitializer<NioSocketChannel>() {
   
          @Override
          protected void initChannel(NioSocketChannel ch) throws
Exception {
   
            // FIXME: should we use getTimeout()?
            int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
            NettyCodecAdapter adapter = new
NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ch.pipeline()//.addLast("logging",new
LoggingHandler(LogLevel.INFO))//for debug
               .addLast("decoder", adapter.getDecoder())
               .addLast("encoder", adapter.getEncoder())
               .addLast("server-idle-handler", new
IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
               .addLast("handler", nettyServerHandler);
         }
       });

handler配置的是nettyServerHandler

server-idle-handler 表示心跳处理的机制

final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(),this);

Handler与Servlet中的filter很像,通过Handler可以完成通讯报文的解码编码、拦截指定的报文、统一对日志错误进行处理、统一对请求进行计数、控制Handler执行与否。

2.handler.channelRead()

服务端收到读的请求是,会进入这个方法。

接着通过handler.received来处理msg,这个handle的链路很长,比较复杂,我们需要逐步剖析

@Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
   
    
      NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url,handler);
    try {
   
      handler.received(channel, msg);
   } finally {
   
      NettyChannel.removeChannelIfDisconnected(ctx.channel());
   }
 }

handler->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler-HeaderExchangeHandler->最后进入这个方法->DubboProtocol$requestHandler(receive)

MultiMessageHandler: 复合消息处理

HeartbeatHandler:心跳消息处理,接收心跳并发送心跳响应

AllChannelHandler:业务线程转化处理器,把接收到的消息封装成ChannelEventRunnable可执行任务给线程池处理

DecodeHandler:业务解码处理器

3.HeaderExchangeHandler.received

交互层请求响应处理,有三种处理方式

  1. handlerRequest,双向请求
  2. handler.received 单向请求
  3. handleResponse 响应消息
public void received(Channel channel, Object message) throws RemotingException {
   
   
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchangeChannel =HeaderExchangeChannel.getOrAddChannel(channel);
    try {
   
      if (message instanceof Request) {
   
        // handle request.
        Request request = (Request) message;
        if (request.isEvent()) {
   
          handlerEvent(channel, request);
       } else {
   
          if (request.isTwoWay()) {
   
            handleRequest(exchangeChannel, request);
         } else {
   
            handler.received(exchangeChannel, request.getData());
         }
       }
     } else if (message instanceof Response) {
   
        handleResponse(channel, (Response) message);
     } else if (message instanceof String) {
   
        if (isClientSide(channel)) {
   
          Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
          logger.error(e.getMessage(), e);
       } else {
   
          String echo = handler.telnet(channel, (String) message);
          if (echo != null && echo.length() > 0) {
   
            channel.send(echo);
         }
       }
     } else {
   
        handler.received(exchangeChannel, message);
     }
   } finally {
   
      HeaderExchangeChannel.removeChannelIfDisconnected(channel);
   }
 }

4.ExchangeHandler.reply

接着进入到ExchangeHandler.reply这个方法中

  • 把message转化为Invocation
  • 调用getInvoker获得一个Invoker对象
  • 然后通过 Result result = invoker.invoke(inv); 进行调用
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
   
            if (!(message instanceof Invocation)) {
   
                throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            } else {
   
                Invocation inv = (Invocation)message;
                Invoker<?> invoker = DubboProtocol.this.getInvoker(channel, inv);
                if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get("_isCallBackServiceInvoke"))) {
   
                    String methodsStr = (String)invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr != null && methodsStr.contains(",")) {
   
                        String[] methods = methodsStr.split(",");
                        String[] var8 = methods;
                        int var9 = methods.length;

                        for(int var10 = 0; var10 < var9; ++var10) {
   
                            String method = var8[var10];
                            if (inv.getMethodName().equals(method)) {
   
                                hasMethod = true;
                                break;
                            }
                        }
                    } else {
   
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    }

                    if (!hasMethod) {
   
                        DubboProtocol.this.logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }

                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                Result result = invoker.invoke(inv);
                return result.thenApply(Function.identity());
            }
        }

5.getInvoker

这里面是获得一个invoker的实现DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

这段代码非常熟悉,exporterMap不就是我们之前在分析服务发布的过程中,保存的Invoker吗?而key,就是对应的interface:port

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
   
        boolean isCallBackServiceInvoke = false;
        boolean isStubServiceInvoke = false;
        int port = channel.getLocalAddress().getPort();
        String path = (String)inv.getObjectAttachments().get("path");
        isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get("dubbo.stub.event"));
        if (isStubServiceInvoke) {
   
            port = channel.getRemoteAddress().getPort();
        }

        isCallBackServiceInvoke = this.isClientSide(channel) && !isStubServiceInvoke;
        if (isCallBackServiceInvoke) {
   
            path = path + "." + inv.getObjectAttachments().get("callback.service.instid");
            inv.getObjectAttachments().put("_isCallBackServiceInvoke", Boolean.TRUE.toString());
        }

        String serviceKey = serviceKey(port, path, (String)inv.getObjectAttachments().get("version"), (String)inv.getObjectAttachments().get("group"));
        DubboExporter<?> exporter = (DubboExporter)this.exporterMap.get(serviceKey);
        if (exporter == null) {
   
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + this.exporterMap.keySet() + ", may be version or group mismatch , channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + this.getInvocationWithoutData(inv));
        } else {
   
            return exporter.getInvoker();
        }
    }

6.exporterMap

Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

在服务发布时,实际上是把invoker包装成了DubboExpoter。然后放入到exporterMap中。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
   
        URL url = invoker.getUrl();
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter(invoker, key, this.exporterMap);
        this.exporterMap.put(key, exporter);
        Boolean isStubSupportEvent = url.getParameter("dubbo.stub.event", false);
        Boolean isCallbackservice = url.getParameter("is_callback_service", false);
        if (isStubSupportEvent && !isCallbackservice) {
   
            String stubServiceMethods = url.getParameter("dubbo.stub.event.methods");
            if ((stubServiceMethods == null || stubServiceMethods.length() == 0) && this.logger.isWarnEnabled()) {
   
                this.logger.warn(new IllegalStateException("consumer [" + url.getParameter("interface") + "], has set stubproxy support event ,but no stub methods founded."));
            }
        }

        this.openServer(url);
        this.optimizeSerialization(url);
        return exporter;
    }

7.invoker.invoke(inv)

接着调用invoker.invoke

此时的invoker是一个什么呢?

invoker=ProtocolFilterWrapper(InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker)))

最后一定会进入到这个代码里面

8.AbstractProxyInvoker

在AbstractProxyInvoker里面,doInvoker本质上调用的是wrapper.invokeMethod()

return new AbstractProxyInvoker<T>(proxy, type, url) {
   
      @Override
      protected Object doInvoke(T proxy, String methodName,
                   Class<?>[] parameterTypes,
                   Object[] arguments) throws Throwable {
   
        return wrapper.invokeMethod(proxy, methodName, parameterTypes,arguments);
     }
   };

而Wrapper是一个动态代理类,它的定义是这样的, 最终调用w.sayHello()方法进行处理

到此为止,服务端的处理过程就分析完了。

十六,性能调优相关参数

1.常用的性能调优参数

参数名 作用范围 说明 默认值 备注
threads provider 业务处理线程池大小 200
iothreads provider io线程池大小 cpu个数+1
queues provider 线程池队列大小,当线程池满时,排队等待执行的队列大小,建议不要设置,当线程池时应立即失败,重试其他服务提供机器,而不是排队,除非有特殊需求。 0
connections consumer 对每个提供者的最大连接数,rmi,http,hession等短连接协议表示限制连接数,Dubbo等长连接表示简历的长连接个数 0 Dubbo协议默认共享一个长连接
actives consumer 每个服务消费者每个方法的对大并发调用数 0 0表示不限制
accepts provider 服务提供方最大可接收连接数 0 0表示不限制
executes provider 服务提供者每个服务每个方法最大可并行执行请求数 0 0表示不限制

2.各个参数的作用

  1. 当consumer发起一个请求时,首先经过active limit(参数actives)进行方法级别的限制,其实现方式为CHM中存放计数器(AtomicInteger),请求时加1,请求完成(包括异常)减1,如果超过actives则等待有其他请求完成后重试或者超时后失败;
  2. 从多个连接(connections)中选择一个连接发送数据,对于默认的netty实现来说,由于可以复用连接,默认一个连接就可以。不过如果你在压测,且只有一个consumer,一个provider,此时适当的加大connections确实能够增强网络传输能力。但线上业务由于有多个consumer多个provider,因此不建议增加connections参数;
  3. 连接到达provider时(如dubbo的初次连接),首先会判断总连接数是否超限(acceps),超过限制连接将被拒绝;
  4. 连接成功后,具体的请求交给io thread处理。io threads虽然是处理数据的读写,但io部分为异步,更多的消耗的是cpu,因此iothreads默认cpu个数+1是比较合理的设置,不建议调整此参数;
  5. 数据读取并反序列化以后,交给业务线程池处理,默认情况下线程池为fixed,且排队队列为0(queues),这种情况下,最大并发等于业务线程池大小(threads),如果希望有请求的堆积能力,可以调整queues参数。如果希望快速失败由其他节点处理(官方推荐方式),则不修改queues,只调整threads;
  6. execute limit(参数executes)是方法级别的并发限制,原理与actives类似,只是少了等待的过程,即受限后立即失败

转载:https://blog.csdn.net/weixin_45596022/article/details/115561788
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场