前言
还记得我们在 服务端增删改配置数据之后如何通知集群中的其他机器 中分析了服务端之间的相互通知修改数据, 当时结尾的时候,留下了以下问题,我们将在这篇文章中来给解析一下;
- 客户端是如何订阅服务端的?
- 客户端与服务端直接是长连接还是短连接?
- 客户端与服务端是推还是拉?
ConfigService.addListener 订阅配置消息
在【Nacos源码之配置管理 九】客户端获取配置数据的流程 中我们知道了客户端获取配置数据的整个流程;也知道了客户端的Nacos配置服务类NacosConfigService ; 这个配置服务类在初始化的时候初始化了一些基本的属性,还有一些重要的实例 比如
- ServerHttpAgent :是一个请求集群http的代理类;它持有实例ServerListManager(集群列表管理类,健康检查等等操作);
- ClientWorker: 客户端工作实例类; 它负责调用ServerHttpAgent来发起请求像服务端请求数据,已经监听配置数据的变更等等;
在【Nacos源码之配置管理 九】客户端获取配置数据的流程 中只是说明了主动发起获取数据的流程; 但是客户端最重要的功能还是轮询和订阅功能;
所以下面主要讲一下 客户端如何订阅服务端数据;
订阅指定配置数据的变更
代码中addListener表示对指定的配置监听,如果消息有变更了,则执行方法receiveConfigInfo
; 当然可以选择是否用异步执行的; getExecutor
方法是可以自定义线程池来执行方法 receiveConfigInfo
; 如果是返回null的话,那么就是用主线程同步执行的;
addTenantListeners
ConfigService.addListener调用之后最终是执行了这里的方法,但是看这里的方法似乎只是将listeners放入到了配置数据的缓存CacheData
中; 那什么时候会通知到我们的监听器呢?那我们得看看哪里调用了我们这个listeners了;
ClientWork 客户端
看看clientwork初始化的代码
主要看看里面的checkConfigInfo()
最终执行的是
LongPollingRunnable 长轮询任务类
LongPollingRunnable是一个长轮询的任务类,他负责不断的去对比服务端的配置数据的MD5与自身是否一致,如果不一致,则会发起请求去服务端获取最新数据来更新客户端的缓存数据;
我们看下主要代码:
public void run() {
//公众号: 进击的老码农 个人微信: jjdlmn_
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// check server config
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
String content = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(content));
} catch (NacosException ioe) {
String message = String.format(
"[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
检查本地配置,使用本地配置直接回调监听类的方法
我们先看下面的做了什么
- 如果客户端没有使用本地配置(状态是
useLocalConfigInfo=false
;),但是本地配置的文件存在,则将客户端的缓存更新为本地配置的文件;(一般是初始化或者运行过程中创建了本地配置文件) ;状态变更为useLocalConfigInfo=true
; - 如果使用的是本地配置(状态是
useLocalConfigInfo=true
;)但是本地配置不存在了(不想用本地配置了,删除了),则将useLocalConfigInfo=false
;这个时候还不会更新客户端的缓存;会等到后面去服务端请求数据获取; - 如果使用的是本地配置(状态是
useLocalConfigInfo=true
;)并且本地配置文件也存在; 但是发现本地配置文件有更新;则将最新的本地配置文件缓存到内存中;(怎么判断文件有更新呢?通过path.lastModified()
文件的最后修改时间对比) - 如果使用了本地配置(状态是
useLocalConfigInfo=true
;)则对有变化的配置数据发起通知; CacheData中的md5与wrap中的lastCallMd5作对比判断是否数据有更新
- 如果数据有变化,则通知所有的监听类; 会调用监听类的
receiveConfigInfo(content )
方法,当然如果配置了getExecutor()
返回不是空的话,就会用这个返回的线程池来执行;如果是null;则用主线程同步调用;
checkUpdateDataIds获取有变化的配置数据列表
这个方法只是将所有 不是使用本地配置的的 配置数据group、dataid 拼接起来可能更新的字符串 probeUpdateString;真正做处理的还是checkUpdateConfigStr()
方法;
//公众号: 进击的老码农
/**
* 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
*/
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
List<String> headers = new ArrayList<String>(2);
headers.add("Long-Pulling-Timeout");
headers.add("" + timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.add("Long-Pulling-Timeout-No-Hangup");
headers.add("true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}
try {
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
agent.getEncode(), timeout);
if (HttpURLConnection.HTTP_OK == result.code) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.content);
} else {
setHealthServer(false);
LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);
}
} catch (IOException e) {
setHealthServer(false);
LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
throw e;
}
return Collections.emptyList();
}
这个方法主要就是发起http请求,从而拿到哪些的group+dataid的配置项是发生了变化的;
注意这里不是拿具体内容content; 上面拼接的 probeUpdateString是所有客户端监听的所有配置项(除了使用了本地配置);
主要是拿到变更的group+dataid才好去发起获取content的请求
-
配置请求Head中的参数
①.Long-Pulling-Timeout
: 长轮询的超时时间,这个值默认30秒;可以通知配置文件中配置configLongPollTimeout=30000
来自定义设置; 最少时间是10秒;就算你设置了9秒,它的超时时间也是10秒;
②.Long-Pulling-Timeout-No-Hangup = true/false
: 如果有第一次获取数据的配置则设置 true; 表示不挂起! 立刻返回;(服务端如果发现数据没有变更,则会将请求挂起,等等29.5秒才会返回,后面会详细介绍) -
配置请求param中的参数
①、Listening-Configs = probeUpdateString
表示监听的配置项; -
向服务端发起请求:
currentServerAddr/v1/cs/configs/listener
; -
请求成功
HTTP_OK = 200
;设置当前访问的服务端健康;并返回获取到的有变更后的数据group、dataid; -
请求失败, 设置当前访问的服务端不健康,下次发起请求会换一个集群中的服务端;
第4步骤看起来很简单,就是发起一个http请求获取有变更的配置项;但是这里其实很有文章,下面会介绍到
拿到变更配置项之后,发起获取配置数据content请求
上面的步骤让我们拿到了改变过的配置项changedGroupKeys
;这个时候是没有拿到具体的数据内容content
的; 代码不贴了,概述一下流程
-
发起获取配置数据内容
getServerConfig(dataId, group, tenant, 3000L)
;
这一步骤就是【Nacos源码之配置管理 九】客户端获取配置数据的流程 中讲的;就不详细说了; -
拿到content之后更新缓存
-
回调监听类的接口
receiveConfigInfo(content )
executorService.execute(this);重新执行一遍
可以看到重新把这个任务放到线程池中取执行了; 那么我们就郁闷了,这不就是不停的去发情请求吗?这样不会有性能问题吗?
话是这么说,这个就是长轮询的方式发起请求; 不停的去请求;但是也不是想象中 发起之后里面发起;
因为发起一个请求,如果配置没有变更的话,服务端会将请求挂起,直到快到超时时间或者直到配置数据有变更才会返回数据并结束连接;
.
总结
addListener
会将监听的dataId, group 缓存在对象CacheData
中;并且将Listener包装一下作为CacheData
属性- 长轮询任务
LongPollingRunnable
一直在执行; - 检查是否使用本地配置,如果使用本地配置并且配置有修改;则更新缓存
CacheData
的值; 并且回调监听类的接口receiveConfigInfo
- 如果没有使用本地配置,则向服务端发起请求:
currentServerAddr/v1/cs/configs/listener
;获取有变更过的数据dataid、group 列表 - 拿到4中返回的数据去获取具体数据内容content;
- 一直执行 3、4、5中的数据
转载:https://blog.csdn.net/u010634066/article/details/100746229