飞道的博客

nacos源码分析==客户端从服务端读取配置文件-服务端服务注册

779人阅读  评论(0)

客户端从服务端读取配置文件

客户端启动的时候会扫描到boostrap.yml中的信息,扫描到标签@ConditionalOnProperty会将NacosConfigBootstrapConfiguration 中的bean注入。其中NacosConfigProperties就是读取的boostrap.yml中spring.cloud.nacos.config下的配置项。NacosConfigBootstrapConfiguration创建除了NacosPropertySourceLocator对象并注入容器。


  
  1. @Configuration(proxyBeanMethods = false)
  2. @ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
  3. public class NacosConfigBootstrapConfiguration {
  4. @Bean
  5. @ConditionalOnMissingBean
  6. //扫描bootstrap.yml中的spring.cloud.nacos.config下的配置项
  7. public NacosConfigProperties nacosConfigProperties () {
  8. return new NacosConfigProperties();
  9. }
  10. @Bean
  11. @ConditionalOnMissingBean
  12. //得到NacosConfigService
  13. public NacosConfigManager nacosConfigManager (
  14. NacosConfigProperties nacosConfigProperties) {
  15. return new NacosConfigManager(nacosConfigProperties);
  16. }
  17. @Bean
  18. //创建处NacosPropertySourceLocator
  19. public NacosPropertySourceLocator nacosPropertySourceLocator (
  20. NacosConfigManager nacosConfigManager) {
  21. return new NacosPropertySourceLocator(nacosConfigManager);
  22. }
  23. /**
  24. * Compatible with bootstrap way to start.
  25. */
  26. @Bean
  27. @ConditionalOnMissingBean(search = SearchStrategy.CURRENT)
  28. @ConditionalOnNonDefaultBehavior
  29. public ConfigurationPropertiesRebinder smartConfigurationPropertiesRebinder (
  30. ConfigurationPropertiesBeans beans) {
  31. // If using default behavior, not use SmartConfigurationPropertiesRebinder.
  32. // Minimize te possibility of making mistakes.
  33. return new SmartConfigurationPropertiesRebinder(beans);
  34. }
  35. }

客户端启动的时候会与服务端建立连接(其实是为了去服务端拉取配置文件,当然会同时把连接创建了),调用栈如下,可见会执行到之前注入的NacosPropertySourceLocator中。


  
  1. start:278, RpcClient (com.alibaba.nacos.common.remote.client)
  2. ensureRpcClient:885, ClientWorker$ConfigRpcTransportClient (com.alibaba.nacos.client.config.impl)
  3. getOneRunningClient:1044, ClientWorker$ConfigRpcTransportClient (com.alibaba.nacos.client.config.impl)
  4. queryConfig:940, ClientWorker$ConfigRpcTransportClient (com.alibaba.nacos.client.config.impl)
  5. getServerConfig:397, ClientWorker (com.alibaba.nacos.client.config.impl)
  6. getConfigInner:166, NacosConfigService (com.alibaba.nacos.client.config)
  7. getConfig:94, NacosConfigService (com.alibaba.nacos.client.config)
  8. loadNacosData:85, NacosPropertySourceBuilder (com.alibaba.cloud.nacos.client)
  9. build:73, NacosPropertySourceBuilder (com.alibaba.cloud.nacos.client)
  10. loadNacosPropertySource:199, NacosPropertySourceLocator (com.alibaba.cloud.nacos.client)
  11. loadNacosDataIfPresent:186, NacosPropertySourceLocator (com.alibaba.cloud.nacos.client)
  12. loadNacosConfiguration:158, NacosPropertySourceLocator (com.alibaba.cloud.nacos.client)
  13. loadSharedConfiguration:116, NacosPropertySourceLocator (com.alibaba.cloud.nacos.client)
  14. locate:101, NacosPropertySourceLocator (com.alibaba.cloud.nacos.client)
  15. locateCollection:51, PropertySourceLocator (org.springframework.cloud.bootstrap.config)
  16. locateCollection:47, PropertySourceLocator (org.springframework.cloud.bootstrap.config)
  17. initialize:95, PropertySourceBootstrapConfiguration (org.springframework.cloud.bootstrap.config)
  18. applyInitializers:604, SpringApplication (org.springframework.boot)
  19. prepareContext:373, SpringApplication (org.springframework.boot)
  20. run:306, SpringApplication (org.springframework.boot)
  21. run:1303, SpringApplication (org.springframework.boot)
  22. run:1292, SpringApplication (org.springframework.boot)
  23. main:22, RuoYiSystemApplication (com.ruoyi.system)

起始点是PropertySourceBootstrapConfiguration, 这个类是由could-context注入容器,

又因为PropertySourceBootstrapConfiguration实现了ApplicationContextInitializer接口,所以会在springboot启动过程(refresh方法前)被调用到(prepareContext-applyInitializers)它的initialize方法,会根据bootstarp.yml中指定的dev去加载对应的配置文件。initialize方法会调用之前已经注入的NacosPropertySourceLocator,

 com.alibaba.cloud.nacos.client.NacosPropertySourceLocator#locate

该方法会一次加载三个配置文件

loadSharedConfiguration(composite);第一配置文件application-dev.yml 
loadExtConfiguration(composite);这里没配置就没有
loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);第三配置文件ruoyi-system-dev.yml

加载的方法都差不多,发起请求去server端获取

com.alibaba.cloud.nacos.client.NacosPropertySourceBuilder#loadNacosData

com.alibaba.nacos.client.config.NacosConfigService#getConfigInner

com.alibaba.nacos.client.config.impl.ClientWorker#getServerConfig

com.alibaba.nacos.client.config.impl.ClientWorker.ConfigRpcTransportClient#queryConfig

queryConfig方法中会创建RpcClient并调用start方法创建与服务端的连接,已有就拿来用。

com.alibaba.nacos.client.config.impl.ClientWorker.ConfigRpcTransportClient#ensureRpcClient


  
  1. RpcClient rpcClient = RpcClientFactory
  2. .createClient(uuid + "_config-" + taskId, getConnectionType(), newLabels);
  3. if (rpcClient.isWaitInitiated()) {
  4. initRpcClientHandler(rpcClient);
  5. rpcClient.setTenant(getTenant());
  6. rpcClient.clientAbilities(initAbilities());
  7. rpcClient.start();
  8. }

com.alibaba.nacos.common.remote.client.RpcClient#start

在start方法中,创建了一个线程池,提交了两个死循环的任务。

queryConfig方法中去请求服务端拿取配置文件:

ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);

com.alibaba.nacos.common.remote.client.grpc.GrpcConnection#request该方法会在请求body中加入一个type字段,实际就是request的类名,获取配置文件的话就是ConfigQueryRequest。

读取到配置文件之后将配置项设置到环境变量中,其他组件就能读取到了

com.alibaba.cloud.nacos.client.NacosPropertySourceLocator#addFirstPropertySource

org.springframework.core.env.CompositePropertySource#addFirstPropertySource

来自客户端的请求,访问到服务端的handler为

com.alibaba.nacos.config.server.remote.ConfigQueryRequestHandler#handle

这个方法的由来为nacos服务端启动时,注入了GrpcSdkServer,它实现了BaseRpcServer,BaseRpcServer中有个@PostConstruct标注的start方法

com.alibaba.nacos.core.remote.BaseRpcServer#start

com.alibaba.nacos.core.remote.grpc.BaseGrpcServer#startServer


  
  1. @Override
  2. public void startServer () throws Exception {
  3. final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
  4. //指定处理请求的handler
  5. addServices(handlerRegistry, new GrpcConnectionInterceptor());
  6. server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
  7. .maxInboundMessageSize(getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
  8. .compressorRegistry(CompressorRegistry.getDefaultInstance())
  9. .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
  10. .addTransportFilter( new AddressTransportFilter(connectionManager))
  11. .keepAliveTime(getKeepAliveTime(), TimeUnit.MILLISECONDS)
  12. .keepAliveTimeout(getKeepAliveTimeout(), TimeUnit.MILLISECONDS)
  13. .permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS)
  14. .build();
  15. server.start();
  16. }

会创建一个io.grpc的Server,这个是个抽象接口,实现类为ServerImpl,ServerImpl中有个InternalServer,InternalServer因为引入了netty,所以实现类为NettyServer 。并且调用addServices方法指定处理请求的acceptor为GrpcRequestAcceptor

com.alibaba.nacos.core.remote.grpc.BaseGrpcServer#addServices

 com.alibaba.nacos.core.remote.grpc.GrpcRequestAcceptor#request

然后根据请求参数中带的type=ConfigQueryRequest,从服务端的持有的registryHandlers

Map<String, RequestHandler> registryHandlers = new HashMap<>();

中选出本次要用的ConfigQueryRequestHandler。

com.alibaba.nacos.core.remote.RequestHandler#handleRequest

com.alibaba.nacos.config.server.remote.ConfigQueryRequestHandler#handle

com.alibaba.nacos.config.server.remote.ConfigQueryRequestHandler#getContext

看来是提前将配置文件缓存到了服务端本地而没有放在数据库里。

 

=======================================================

断点设置

从上可以看到,从客户端发到服务端的消息,肯定要经过客户端的com.alibaba.nacos.common.remote.client.grpc.GrpcUtils#convert(com.alibaba.nacos.api.remote.request.Request)

所以只要在这打个断点,就知道发往服务端的是什么,根据type参数也知道同时服务端应该用哪个handler来处理。 后面发现服务注册竟然没走这个接口,她在自己的方法里new了个请求对象,估计不是同一个人写的,该统一转换吧。。。

到达服务端的请求肯定要经过服务端的com/alibaba/nacos/core/remote/grpc/GrpcRequestAcceptor.java:96

服务注册走了这

从这能看到服务端谁来处理。大概都是type+Handler。

=================

服务端服务注册

客户端发送注册请求

com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#requestToServer

 

服务端收到注册请求

com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler#handle

发现报错了

 

 原因是我之前将这个微服务设置成了非临时实例,而现在我将它设置成了临时实例,因为临时实例才是走GRPC注册,否者是走InstancController去了,而我想看下GRPC。解决方法是停掉NACOS,将存在本地的METAdata删掉,我这是

"D:\prj_idea\Nacos\distribution\data\protocol\raft\

重启后解决。

 

IP和端口都被服务端拿到了,然后发布两个事件ClientRegisterServiceEvent  和InstanceMetadataEvent。事件发布后告知客户端注册成功,剩下的事件在服务端异步处理。

 


  
  1. @Override
  2. public void registerInstance (Service service, Instance instance, String clientId) throws NacosException {
  3. //检查合法性,之前报错就从这出的
  4. NamingUtils.checkInstanceIsLegal(instance);
  5. Service singleton = ServiceManager.getInstance().getSingleton(service);
  6. if (!singleton.isEphemeral()) {
  7. throw new NacosRuntimeException(NacosException.INVALID_PARAM,
  8. String.format( "Current service %s is persistent service, can't register ephemeral instance.",
  9. singleton.getGroupedServiceName()));
  10. }
  11. //更改该客户端对应的client对象
  12. Client client = clientManager.getClient(clientId);
  13. if (!clientIsLegal(client, clientId)) {
  14. return;
  15. }
  16. InstancePublishInfo instanceInfo = getPublishInfo(instance);
  17. client.addServiceInstance(singleton, instanceInfo);
  18. client.setLastUpdatedTime();
  19. client.recalculateRevision();
  20. //通知我注册完成了
  21. NotifyCenter.publishEvent( new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
  22. //通知我的元信息可能变了
  23. NotifyCenter
  24. .publishEvent( new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
  25. }

上面那两个事件在哪被处理?

第一个ClientRegisterServiceEvent

com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#onEvent

 
ClientServiceIndexesManager这个类,里面两个map,估计是把注册上来的客户端的某些信息存在这,并且维护起来

存到map中后,又发了个事件ServiceChangedEvent,这个时间被NamingSubscriberServiceV2Impl监听。可以发现SmartSubscriber 、Subscriber

的实现类都是监听类

com.alibaba.nacos.naming.push.v2.NamingSubscriberServiceV2Impl#onEvent

将发生变化的客户端的最新信息推送给 订阅该客户端的其他客户端


  
  1. @Override
  2. public void onEvent (Event event) {
  3. if (event instanceof ServiceEvent.ServiceChangedEvent) {
  4. // If service changed, push to all subscribers.
  5. ServiceEvent. ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
  6. Service service = serviceChangedEvent.getService();
  7. delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
  8. MetricsMonitor.incrementServiceChangeCount(service.getNamespace(), service.getGroup(), service.getName());
  9. } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
  10. // If service is subscribed by one client, only push this client.
  11. ServiceEvent. ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
  12. Service service = subscribedEvent.getService();
  13. //将发生变化的客户端的信息推送给订阅该客户端的其他客户端
  14. delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
  15. subscribedEvent.getClientId()));
  16. }
  17. }

 


转载:https://blog.csdn.net/hebian1994/article/details/128748424
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场