本文基于dubbo 2.7.5版本代码
dubbo将监控独立出一个监控层,但是我认为监控只能算作是dubbo的一个功能,可以监控服务的健康程度或者做服务治理,监控功能不是dubbo必须的,可以不开启监控。而且在运行过程中,监控宕机,也不会影响服务访问。
dubbo提供了一个简单的监控中心,可以参考下面的文章。但这个监控中心只能满足简单监控功能,对于复杂的需要自行开发。本文对监控中心不做过多介绍。
https://www.cnblogs.com/jtlgb/p/8761244.html
一、总述
dubbo对服务端和客户端都进行了监控,监控内容主要是包含以下四大类信息:
- 服务调用或者服务请求的基本信息,包括服务地址,端口,服务名,方法名等;
- 服务调用消耗时间;
- 服务请求数据大小或者服务返回值大小;
- 服务请求的结果,成功或者失败;
- 服务方法的总调用次数。
监控功能主要由MonitorFilter实现,该类实现了Filter接口。在dubbo架构中,MonitorFilter可以拦截所有客户端发出请求或者服务端处理请求。
我的理解:从功能上来说,MonitorFilter不能算是过滤器,因为它不过滤任何东西,相反它是要做拦截,因此我觉得MonitorFilter更合适叫做拦截器。
二、MonitorFilter应用原理
MonitorFilter的定义如下:
@Activate(group = {PROVIDER, CONSUMER})
public class MonitorFilter implements Filter, Filter.Listener {
...//代码省略
}
MonitorFilter实现了Filter和Filter.Listener接口,该类即是一个过滤器也是一个监听器。
该类有注解@Activate,说明该类需要由ExtensionLoader.getActivateExtension方法加载,group既有PROVIDER又有CONSUMER,说明该类既可以在服务端使用,也可以在客户端使用。@Activate没有设置value,会被ExtensionLoader.getActivateExtension搜索到并加载。
MonitorFilter是在ProtocolFilterWrapper中被创建并被应用到过滤器链中的,下面我们看一下MonitorFilter是如何被应用到过滤器链中的。
先说服务端,服务端启动的时候需要对外暴露服务:
Protocol$Adaptive类是dubbo使用代码生产工具生成的,内部使用ExtensionLoader加载Protocol实现类,例如使用dubbo协议暴露服务,那么Protocol实现类便是DubboProtocol,ProtocolFilterWrapper作为包装类,会对DubboProtocol封装,所以要调用DubboProtocol的export方法,首先调用ProtocolFilterWrapper的export方法。ProtocolFilterWrapper的export方法如下:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
...//代码有删减
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));//protocol是Protocol实现类,比如DubboProtocol
}
//buildInvokerChain方法用于构建过滤器链,
//入参invoker是对外提供服务的对象(也就是@Service注解的类)的代理
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
//使用SPI加载Filter集合,其中便有MonitorFilter
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
//遍历Filter
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
...//代码有删减
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
//当访问invoker的时候,需要先访问filter的invoke方法
//最后一个filter对象调用服务获取返回值。
//asyncResult便是服务的返回值
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
if (filter instanceof ListenableFilter) {// Deprecated!
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
listener.onError(e, invoker, invocation);
}
throw e;
} finally {
}
return asyncResult.whenCompleteWithContext((r, t) -> {
if (filter instanceof ListenableFilter) {// Deprecated!
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
if (t == null) {
listener.onMessage(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
}
//MonitorFilter实现了Filter.Listener接口
else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
//t为Exception对象,如果为null,表示服务调用成功
if (t == null) {
listener.onMessage(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
} else {// Deprecated!
filter.onResponse(r, invoker, invocation);
}
});
}
...//代码有删减
};
}
}
return last;
}
buildInvokerChain的返回值是Invoker对象,当收到客户端请求时,便会调用这个Invoker对象的invoke方法,从上面的代码可以看出,invoke方法首先访问各个Filter的invoke方法,最终调用到入参invoker对象上,入参invoker对象便是最终提供服务的对象。
从上面的流程可以看出,每次服务端收到请求时,先被MonitorFilter拦截,收集信息后,才会调用后面的过滤器。
客户端将MonitorFilter加入到过滤器链中,也是使用buildInvokerChain完成的,与服务端的区别是:
- 客户端启动时,要创建远程服务代理(服务代理是在ReferenceConfig的createProxy方法完成的),此时需要调用Protocol$Adaptive的refer方法,进而调用到ProtocolFilterWrapper的refer方法,在refer方法中调用buildInvokerChain。buildInvokerChain中使用过滤器将真正通过网络访问服务端的Invoker对象封装。
- 因为buildInvokerChain使用过滤器对Invoker对象的封装,客户端是在访问远程服务时,都会被MonitorFilter拦截。
三、MonitorFilter方法介绍
前面描述了MonitorFilter是如何被创建,如何对服务端和客户端进行拦截。下面我们深入到MonitorFilter内部,对MonitorFilter的每个方法进行分析。
1、invoke方法
当客户端访问远程服务或者服务端收到请求时,都会调用MonitorFilter的invoke方法,该方法比较简单,只是收集两个信息:服务调用开始时间和服务方法调用次数。
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
//检查是否配置monitor参数
if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
//将开始时间插入到invocation中,为后面统计服务耗时使用
//客户端和服务端对服务耗时的统计有区别:客户端统计包括了网络通讯时间,
//服务端统计时间仅仅是服务的运行时间,两者相减就是网络通讯时间
invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
//以接口+方法名为key,访问服务时计数器加一,服务返回后减一,用于统计当前有多少个客户端在访问同一个服务方法
getConcurrent(invoker, invocation).incrementAndGet(); // count up
}
//下面是调用后续的过滤器或者访问远程服务
return invoker.invoke(invocation);
}
如果使用监控功能,必须有如下配置,否则上面对“monitor”检查会失败:
dubbo.monitor.address=dubbo://127.0.0.1:30004/MonitorService 或
dubbo.monitor.protocol=注册中心的id
代码getConcurrent如下:
private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) {
//以接口+方法名为key
String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
//获取计数器
AtomicInteger concurrent = concurrents.get(key);
if (concurrent == null) {
concurrents.putIfAbsent(key, new AtomicInteger());
concurrent = concurrents.get(key);
}
return concurrent;
}
2、onMessage和onError
MonitorFilter实现了Filter.Listener接口,onMessage和onError便是该接口要求实现的两个方法。这两个方法用于监听服务返回值,代码可以参见上文ProtocolFilterWrapper的buildInvokerChain方法。
onMessage和onError两个方法比较类似,唯一的区别是,服务访问成功调用onMessage,失败则调用onError。
@Override
public void onMessage(Result result, Invoker<?> invoker, Invocation invocation) {
if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
//收集信息
collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), (long) invocation.get(MONITOR_FILTER_START_TIME), false);
//计数器减一
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), (long) invocation.get(MONITOR_FILTER_START_TIME), true);
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
}
onMessage和onError两个方法都调用collect方法收集信息。
3、collect
collect方法的最后一个参数error用于区分是onMessage调用还是onError调用。
private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
try {
//获取配置的监控中心url
URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
//根据url获取对应的监控中心对象,这里代码不在展示
//getMonitor方法的原理是异步线程创建Monitor对象,调用的是
//DubboMonitorFactory.createMonitor方法,因为是异步,所以第一次调用时,
//Monitor对象可能是null,这种情况下,调用信息不在统计
Monitor monitor = monitorFactory.getMonitor(monitorUrl);
if (monitor == null) {
return;
}
//统计信息
URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
//下一篇文章介绍Monitor
monitor.collect(statisticsURL);
} catch (Throwable t) {
//处理监控信息过程中失败,不影响使用
logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
}
}
private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
//服务访问耗时
long elapsed = System.currentTimeMillis() - start; // invocation cost
//服务方法被调用次数
int concurrent = getConcurrent(invoker, invocation).get();
String application = invoker.getUrl().getParameter(APPLICATION_KEY);
String service = invoker.getInterface().getName(); // service name
String method = RpcUtils.getMethodName(invocation); // method name
String group = invoker.getUrl().getParameter(GROUP_KEY);
String version = invoker.getUrl().getParameter(VERSION_KEY);
int localPort;
String remoteKey, remoteValue;
if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
//统计客户端信息
localPort = 0;
remoteKey = MonitorService.PROVIDER;
remoteValue = invoker.getUrl().getAddress();
} else {
//统计服务端信息
localPort = invoker.getUrl().getPort();
remoteKey = MonitorService.CONSUMER;
remoteValue = remoteHost;
}
String input = "", output = "";
if (invocation.getAttachment(INPUT_KEY) != null) {
//服务端收到请求信息的大小,以字节为单位,服务端统计
input = invocation.getAttachment(INPUT_KEY);
}
if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
//客户端收到返回信息的大小,以字节为单位,客户端统计
output = result.getAttachment(OUTPUT_KEY);
}
//最后的统计信息以URL对象表示
return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version);
}
在collect方法中创建了Monitor对象,因此Monitor对象的创建属于懒加载,只有在使用的时候才会创建。
四、总结
本文详细介绍了MonitorFilter的实现原理及方法实现。由于MonitorFilter作为过滤器,保证了所有的服务调用都会经过该过滤器。
MonitorFilter功能的实现还依赖于Monitor对象,下一篇文章介绍Monitor的实现原理。
转载:https://blog.csdn.net/weixin_38308374/article/details/106044361