Flink对于指标监测有一套自己的实现,指标的统计方式有四种,这些指标都实现了Metric这个接口,而Metric这个接口只是一个标识,本身并没有定义如何方法接口.
Metric
metric的类集成关系如下:
从图中可以看出,Metric这个接口有四个直接子类,分别是:
- Gauge —— 最简单的度量指标,只是简单的返回一个值,比如返回一个队列中当前元素的个数;
- Counter —— 计数器,在一些情况下,会比Gauge高效,比如通过一个AtomicLong变量来统计一个队列的长度;
- Meter —— 吞吐量的度量,也就是一系列事件发生的速率,例如TPS;
- Histogram —— 度量值的统计结果,如最大值、最小值、平均值,以及分布情况等。
以MeterView为例,分析一个Metric的具体实现。MeterView还实现View接口,实现View接口的类,表示其会定期的执行update方法,进行数据的更新
publicclassMeterViewimplementsMeter,View{
//底层使用的计算器
/**Theunderlyingcountermaintainingthecount.*/
privatefinalCountercounter;
//计算平均值的时间跨度
/**Thetime-spanoverwhichtheaverageiscalculated.*/
privatefinalinttimeSpanInSeconds;
//包含历史数据的循环数组
/**Circulararraycontainingthehistoryofvalues.*/
privatefinallong[]values;
//当前时间在数组内的索引
/**Theindexinthearrayforthecurrenttime.*/
privateinttime=0;
//最近计算的比率
/**Thelastratewecomputed.*/
privatedoublecurrentRate=0;
publicMeterView(inttimeSpanInSeconds){
this(newSimpleCounter(),timeSpanInSeconds);
}
publicMeterView(Countercounter,inttimeSpanInSeconds){
this.counter=counter;
//这里的操作是为了让时间跨度刚好是UPDATE_INTERVAL_SECONDS的整数倍
this.timeSpanInSeconds=timeSpanInSeconds-(timeSpanInSeconds%UPDATE_INTERVAL_SECONDS);
this.values=newlong[this.timeSpanInSeconds/UPDATE_INTERVAL_SECONDS+1];
}
@Override
publicvoidmarkEvent(){
this.counter.inc();
}
@Override
publicvoidmarkEvent(longn){
this.counter.inc(n);
}
@Override
publiclonggetCount(){
returncounter.getCount();
}
@Override
publicdoublegetRate(){
returncurrentRate;
}
@Override
publicvoidupdate(){
time=(time+1)%values.length;
values[time]=counter.getCount();
currentRate=((double)(values[time]-values[(time+1)%values.length])
/timeSpanInSeconds);
}
}
从类的属性变量中可以看出,MeterView是在一个Counter计数器的基础之上,封装了一层,从而实现事件每秒的平均速率。以values这个长整型的数组,作为环形数组,实现对最新的历史数据的保存。 在构造函数中,会对入参timeSpanInSeconds这个时间跨度进行修正,使其刚好是UPDATE_INTERVAL_SECONDS的整数倍,另外values数组的长度是timeSpanInSeconds对UPDATE_INTERVAL_SECONDS倍数,再加上1,这样这个数组的最新数据和最老的数据之间的时间间隔就刚好是timeSpanInSeconds。 假设values数组的长度为n,则索引n-1处的统计值,和索引0处的统计值,时间间隔就是timeSpanInSeconds由于是环形数组,所以索引0处的统计值,和索引1处的统计值的时间间隔就是timeSpanInSeconds 所以索引i处的统计值,和索引(i+1)%n处的统计值,时间间隔是timeSpanInSeconds
MetricGroup 为了便于对Metric进行方便的管理和区分,可以对Metric进行分组,MetricGroup就是用来实现这个功能的。 MetricGroup的相关子类的继承关系如下所示。
其中: ProxyMetricGroup —— 这是一个代理类,就是把新Metric或者新的子MetricGroup的注册,委托给代理MetricGroup进行处理; AbstractMetricGroup —— 对新增Metric和子MetricGroup的相关方法进行了实现; 在AbstractMetricGroup中有这些属性
//用来保存这个MetricGroup的父MetricGroup
protectedfinalAparent;
//这个map,是用来保存当前MetricGroup中注册的Metric;
privatefinalMap<String,Metric>metrics=newHashMap<>();
//这个map,是用来保存当前MetricGroup中注册子MetricGroup;
privatefinalMap<String,AbstractMetricGroup>groups=newHashMap<>();
在MetricGroup中,可以建立一个树状的结构,用来存储和归类相关的Metric。
MetricReporter MetricReporter是用来向外披露Metric的监测结果的接口。 由于MetricReporter的子类在实例化时,都是通过反射机制,所以对于其实现子类,需要有一个公共,无参的构造函数,这个接口的定义如下:
publicinterfaceMetricReporter{
voidopen(MetricConfigconfig);
voidclose();
voidnotifyOfAddedMetric(Metricmetric,StringmetricName,MetricGroupgroup);
voidnotifyOfRemovedMetric(Metricmetric,StringmetricName,MetricGroupgroup);
}
open —— 由于子类都是用无参构造函数,通过反射进行实例化,所以相关初始化的工作都是放在这里进行的,并且这个方法需要在实例化后,就需要调用该方法进行相关初始化的工作; close —— 这里就是在关闭时,进行资源回收等相关操作的; notifyOfAddedMetric —— 当有一个新的Metric注册时,会调用该方法来通知MetricReporter; notifyOfRemovedMetric —— 当有一个Metric被移除时,通过这个方法来通知MetricReporter; Metric Report的类集成图:
从图上可以看出flink只是的report类型有:Slf4j,StatsD,Ganglia,Graphite,JMX,Prometheus,Datadog. MetricRegistry MetricGroup是用来对Metric进行分组管理,MetricReporter是用来对外披露Metric,而MetricRegistry就是这两者之间的桥梁,通过MetricRegistry,就可以让MetricReporter感知到在MetricGroup中的Metric发生的变化情况。 对于MetricRegistry这个接口,其实现为MetricRegistryImpl,而其在实例化时,构造函数的入参是一个MetricRegistryConfiguration实例。 MetricRegistryConfiguration MetricRegistryConfiguration顾名思义,就是MetricRegistry的相关配置参数,主要有三个属性,如下:
/**flink中不同组件的范围格式*/
privatefinalScopeFormatsscopeFormats;
/**字符串的分隔符,这是一个全局的分隔符*/
privatefinalchardelimiter;
/**配置中每个reporter的名称和其对应的配置对象的列表*/
privatefinalList<Tuple2<String,Configuration>>reporterConfigurations;
这些属性,都是从配置参数中获取而来,逻辑如下:
publicstaticMetricRegistryConfigurationfromConfiguration(Configurationconfiguration){
/**获取scopeFormats*/
ScopeFormatsscopeFormats;
try{
scopeFormats=ScopeFormats.fromConfig(configuration);
}catch(Exceptione){
LOG.warn("Failedtoparsescopeformat,usingdefaultscopeformats",e);
scopeFormats=ScopeFormats.fromConfig(newConfiguration());
}
/**获取分隔符*/
chardelim;
try{
delim=configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0);
}catch(Exceptione){
LOG.warn("Failedtoparsedelimiter,usingdefaultdelimiter.",e);
delim='.';
}
/**获取MetricReporter相关的配置信息,MetricReporter的配置格式是metrics.reporters=foo,bar*/
finalStringdefinedReporters=configuration.getString(MetricOptions.REPORTERS_LIST);
List<Tuple2<String,Configuration>>reporterConfigurations;
if(definedReporters==null){
/**如果没有配置,则返回空集合*/
reporterConfigurations=Collections.emptyList();
}else{
/**按模式匹配分割,如上述的配置,则namedReporters={"foo","bar"}*/
String[]namedReporters=splitPattern.split(definedReporters);
reporterConfigurations=newArrayList<>(namedReporters.length);
for(StringnamedReporter:namedReporters){
/**
*这里是获取一个代理配置对象,就是在原来配置对象的基础上,在查询key时,需要加上这里配置的前缀
,
*如metrics.reporter.foo.,这样就可以获取特定reporter的配置
*/
DelegatingConfigurationdelegatingConfiguration=newDelegatingConfiguration(
configuration,
ConfigConstants.METRICS_REPORTER_PREFIX+namedReporter+'.');
reporterConfigurations.add(Tuple2.of(namedReporter,(Configuration)delegatingConfiguration));
}
}
returnnewMetricRegistryConfiguration(scopeFormats,delim,reporterConfigurations);
}
ScopeFormat
上述的ScopeFormats也是配置对象中获取的,如下:
publicstaticScopeFormatsfromConfig(Configurationconfig){
StringjmFormat=config.getString(MetricOptions.SCOPE_NAMING_JM);
StringjmJobFormat=config.getString(MetricOptions.SCOPE_NAMING_JM_JOB);
StringtmFormat=config.getString(MetricOptions.SCOPE_NAMING_TM);
StringtmJobFormat=config.getString(MetricOptions.SCOPE_NAMING_TM_JOB);
StringtaskFormat=config.getString(MetricOptions.SCOPE_NAMING_TASK);
StringoperatorFormat=config.getString(MetricOptions.SCOPE_NAMING_OPERATOR);
returnnewScopeFormats(jmFormat,jmJobFormat,tmFormat,tmJobFormat,taskFormat,operatorFormat);
}
ScopeFormat的类继承关系如下:
从图中可以看出,Flink中的每个组件,都有对应的格式。 首先看下ScopeFormat中的主要属性对象:
/**这是原生格式,比如<host>.jobmanager,如果为空,则是<empty>*/
privatefinalStringformat;
/**format按照分割符分割后的数组,如template={"<host>","jobmanager”},被<>包裹的元素,是变量元素*/
privatefinalString[]template;
/**这是template数组中,变量元素的索引,如"<host>"是变量,在template中的索引是0,则templatePos={0}
*/
privatefinalint[]templatePos;
/**这个是template中变量元素对应的真实的值,在values数组中的位置,详见构造函数和#bindVariables方法
*/
privatefinalint[]valuePos;
这里以JobManagerScopeFormat为例进行分析说明,在ScopeFormats中,默认传给JobManagerScopeFormat的构造函数的入参值是 .jobmanager 。 则JobManagerScopeFormat的构造过程如下:
/**format的默认值是<host>.jobmanager*/
publicJobManagerScopeFormat(Stringformat){
super(format,null,newString[]{
SCOPE_HOST
});
}
接着看起父类ScopeFormat的构造过程:
/**接上面,入参值为format="<host>.jobmanager",parent=null,variables={"<host>"}*/
protectedScopeFormat(Stringformat,ScopeFormatparent,String[]variables){
checkNotNull(format,"formatisnull");
/**将format这个字符串分割,rawComponents={"<host>","jobmanager"}*/
finalString[]rawComponents=format.split("\\"+SCOPE_SEPARATOR);
/**根据rawComponents的第一个元素是为"*",来判断是否要继承父组的范围*/
finalbooleanparentAsPrefix=rawComponents.length>0&&rawComponents[0].equals
(SCOPE_INHERIT_PARENT);
if(parentAsPrefix){
/**需要继承父组的范围,而父组有是null,则抛出异常*/
if(parent==null){
thrownewIllegalArgumentException("Componentscopeformatrequiresparentprefix
(startswith'"
+SCOPE_INHERIT_PARENT+"'),butthiscomponenthasnoparent(isrootcomponent).");
}
/**如果以"*."开头,则format至少需要有3个字符,否则就是无效字符,设置为"<empty>"*/
this.format=format.length()>2?format.substring(2):"<empty>";
String[]parentTemplate=parent.template;
intparentLen=parentTemplate.length;
/**将父组的范围和自身的范围,合并到一起*/
this.template=newString[parentLen+rawComponents.length-1];
System.arraycopy(parentTemplate,0,this.template,0,parentLen);
System.arraycopy(rawComponents,1,this.template,parentLen,rawComponents.length-1);
}
else{
/**不需要继承父组的范围,则直接赋值,format="<host>.jobmanager",template={"<host>"
,"jobmanager"}*/
this.format=format.isEmpty()?"<empty>":format;
this.template=rawComponents;
}
/**将variables={"<host>"}转换为map{"<host>"->0}*/
HashMap<String,Integer>varToValuePos=arrayToMap(variables);
List<Integer>templatePos=newArrayList<>();
List<Integer>valuePos=newArrayList<>();
for(inti=0;i<template.length;i++){
finalStringcomponent=template[i];
/**检查当前这个组件是否是一个变量*/
if(component!=null&&component.length()>=3&&
component.charAt(0)=='<'&&component.charAt(component.length()-1)=='>'){
/**这是一个变量,则从上面的map中,获取其索引*/
IntegerreplacementPos=varToValuePos.get(component);
if(replacementPos!=null){
templatePos.add(i);
valuePos.add(replacementPos);
}
}
}
this.templatePos=integerListToArray(templatePos);
this.valuePos=integerListToArray(valuePos);
}
经过这个构造过程,ScopeFormat中的四个属性的值如下:
format = “.jobmanager”
template = {“”, “jobmanager”}
templatePos = {0}
valuePos = {0}
对于JobManagerScopeFormat来说,构建一个具体的范围数组的逻辑如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
publicString[]formatScope(Stringhostname){
/获取template数组的一份拷贝,深拷贝*/
finalString[]template=copyTemplate();
finalString[]values={hostname};
/使用hostname替换掉template中索引为0的元素
MetricRegistryImpl 在获取了MetricRegistryConfiguration实例后,在看MetricRegistryImpl的构造函数的实现逻辑。
1 this.executor=Executors.newSingleThreadScheduledExecutor(newExecutorThreadFactory("Flink -MetricRegistry"));
这里给executor这个属性,设置了一个单线程可调度的执行器。 接下来主要看下对MetricReporter相关的初始化工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/变量配置中配置的reporter的配置*/
for(Tuple2
1 2 3 publicinterfaceScheduled{ voidreport(); } 实现Scheduled接口的reporter,表示其需要被定期调度执行,定期执行的就是其report方法,没有实现Scheduled接口的reporter方法,是不会被定期调度的。 Slf4jReporter这个MetricReporter的子类就实现了Scheduled接口,而其report方法,就是将注册的Metric的信息打印到日志里; JMXReporter这个子类是没有实现Scheduled接口的,但可以通过JMX服务来获取注册的Metric的信息。 添加Metric的过程 Metric的添加逻辑的入口在AbstractMetricGroup的addMetric方法中,逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 protectedvoidaddMetric(Stringname,Metricmetric){ if(metric==null){ LOG.warn("Ignoringattemptedregistrationofametricduetobeingnullforname{}.",name); return; } /只有group仍然打开的情况下,才添加这个metric*/ synchronized(this){ if(!closed){ / 在没有进行"contains"校验下,立即进行put操作,来优化常见的情况(没有碰撞) 碰撞的情况后面会处理。 / Metricprior=metrics.put(name,metric); /检查与其他度量名称的冲突*/ if(prior==null){ /这个名字还没有其他指标,也就是与注册在当前group下的metric没有名称冲突/ if(groups.containsKey(name)){ /与注册在group下的子groups的名称由冲突,这里给出warn日志,而不是fail ,因为metrics是工具,当使用错误时,不应该使得程序失败*/ LOG.warn("Namecollision:Addingametricwiththesamenameasametricsubgroup:'"+ name+"'.Metricmightnotgetproperlyreported."+Arrays.toString (scopeComponents)); } /这里就是桥梁起作用的地方/ registry.register(metric,name,this); } else{ /有碰撞,放回原来的metric/ metrics.put(name,prior); LOG.warn("Namecollision:GroupalreadycontainsaMetricwiththename'"+ name+"'.Metricwillnotbereported."+Arrays.toString(scopeComponents)); } } } } 上述逻辑就是把Metric注册到当前Group中,接着看调用了MetricRegistry的register里的逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
publicvoidregister(Metricmetric,StringmetricName,AbstractMetricGroupgroup){
synchronized(lock){
if(isShutdown()){
LOG.warn("Cannotregistermetric,becausetheMetricRegistryhasalreadybeenshutdown.");
}else{
if(reporters!=null){
/通知所有的reporters,注册了一个metric,以及对应的metricName,group*/
for(inti=0;i
1 2 3 4 5 6 7 publicinterfaceView{ /metrics更新的间隔*/ intUPDATE_INTERVAL_SECONDS=5; /被定期调用进行metric更新的方法*/ voidupdate(); } 实现了View接口的Metric,需要定期的调用update方法,进行状态的更新,而这个定期更新的功能是通过ViewUpdater实现的,其构造函数中,就是在executor中添加了一个定期执行的task。
1 2 3 publicViewUpdater(ScheduledExecutorServiceexecutor){ executor.scheduleWithFixedDelay(newViewUpdaterTask(lock,toAdd,toRemove),5 ,UPDATE_INTERVAL_SECONDS,TimeUnit.SECONDS); } 新增一个Metric时,通知viewUpdater时的逻辑如下:
1 2 3 4 5 publicvoidnotifyOfAddedView(Viewview){ synchronized(lock){ toAdd.add(view); } } 就是想toAdd这个Set中添加一个新的元素,通过lock这个锁来实现同步。 而ViewUpdaterTask的run方法中,就会调用注册的Metric的update方法,同时更新几个Set。逻辑如下:
publicvoidrun(){ for(ViewtoUpdate:this.views){ toUpdate.update(); } synchronized(lock){ views.addAll(toAdd); toAdd.clear(); views.removeAll(toRemove); toRemove.clear(); } } MetricQueryService 在MetricRegistryImpl中有一个属性queryService,是一个ActorRef,对应的具体实现是MetricQueryService。在MetricQueryService中也维护了注册的各种Metric,并且也是从MetricRegistry那里接受Metric的添加和删除的消息.
/**用来接收Metric添加的消息*/
publicstaticvoidnotifyOfAddedMetric(ActorRefservice,Metricmetric,StringmetricName
,AbstractMetricGroupgroup){
service.tell(newAddMetric(metricName,metric,group),null);
}
/**用于接收Metric删除的消息*/
publicstaticvoidnotifyOfRemovedMetric(ActorRefservice,Metricmetric){
service.tell(newRemoveMetric(metric),null);
}
MetricQueryService在接收到这类消息后,会在onReceive方法中根据不同的消息类型进行相应的处理,添加和删除Metric就是在四类Metric对应的map属性上进行相应的添加删除操作。以此来实现对Metric信息的维护。 onReceive方法中还会接收到一类消息,叫CreateDump消息,接收到这个消息后,就会把当前所有的Metric数据进行序列化操作,得到一个MetricDumpSerialization.MetricSerializationResult序列化后的结果实例,并发送给请求者。 对于Metric的序列化和反序列化的实现都在MetricDumpSerialization这个类中。 通过MetricDumpSerializer进行序列化,序列化后的结果为MetricSerializationResult; 通过MetricDumpDeserializer进行反序列化,反序列化后的结果为MetricDump;
参考
https://blog.csdn.net/qq_21653785/article/details/79625601