寒玉 Blog
  • Home
  • Books
  • About Me
  • Categories
  • Tags
  • Archives

flink源码系列-监控


Flink对于指标监测有一套自己的实现,指标的统计方式有四种,这些指标都实现了Metric这个接口,而Metric这个接口只是一个标识,本身并没有定义如何方法接口.

Metric

metric的类集成关系如下:

从图中可以看出,Metric这个接口有四个直接子类,分别是:

  • Gauge —— 最简单的度量指标,只是简单的返回一个值,比如返回一个队列中当前元素的个数;
  • Counter —— 计数器,在一些情况下,会比Gauge高效,比如通过一个AtomicLong变量来统计一个队列的长度;
  • Meter —— 吞吐量的度量,也就是一系列事件发生的速率,例如TPS;
  • Histogram —— 度量值的统计结果,如最大值、最小值、平均值,以及分布情况等。

以MeterView为例,分析一个Metric的具体实现。MeterView还实现View接口,实现View接口的类,表示其会定期的执行update方法,进行数据的更新

publicclassMeterViewimplementsMeter,View{
    //底层使用的计算器
    /**Theunderlyingcountermaintainingthecount.*/
    privatefinalCountercounter;
    //计算平均值的时间跨度
    /**Thetime-spanoverwhichtheaverageiscalculated.*/
    privatefinalinttimeSpanInSeconds;
    //包含历史数据的循环数组
    /**Circulararraycontainingthehistoryofvalues.*/
    privatefinallong[]values;
    //当前时间在数组内的索引
    /**Theindexinthearrayforthecurrenttime.*/
    privateinttime=0;
    //最近计算的比率
    /**Thelastratewecomputed.*/
    privatedoublecurrentRate=0;
    publicMeterView(inttimeSpanInSeconds){
        this(newSimpleCounter(),timeSpanInSeconds);
    }
    publicMeterView(Countercounter,inttimeSpanInSeconds){
        this.counter=counter;
        //这里的操作是为了让时间跨度刚好是UPDATE_INTERVAL_SECONDS的整数倍
        this.timeSpanInSeconds=timeSpanInSeconds-(timeSpanInSeconds%UPDATE_INTERVAL_SECONDS);
        this.values=newlong[this.timeSpanInSeconds/UPDATE_INTERVAL_SECONDS+1];
    }
    @Override
    publicvoidmarkEvent(){
        this.counter.inc();
    }
    @Override
    publicvoidmarkEvent(longn){
        this.counter.inc(n);
    }
    @Override
    publiclonggetCount(){
        returncounter.getCount();
    }
    @Override
    publicdoublegetRate(){
        returncurrentRate;
    }
    @Override
    publicvoidupdate(){
        time=(time+1)%values.length;
        values[time]=counter.getCount();
        currentRate=((double)(values[time]-values[(time+1)%values.length])
            /timeSpanInSeconds);
    }
}

从类的属性变量中可以看出,MeterView是在一个Counter计数器的基础之上,封装了一层,从而实现事件每秒的平均速率。以values这个长整型的数组,作为环形数组,实现对最新的历史数据的保存。 在构造函数中,会对入参timeSpanInSeconds这个时间跨度进行修正,使其刚好是UPDATE_INTERVAL_SECONDS的整数倍,另外values数组的长度是timeSpanInSeconds对UPDATE_INTERVAL_SECONDS倍数,再加上1,这样这个数组的最新数据和最老的数据之间的时间间隔就刚好是timeSpanInSeconds。 假设values数组的长度为n,则索引n-1处的统计值,和索引0处的统计值,时间间隔就是timeSpanInSeconds由于是环形数组,所以索引0处的统计值,和索引1处的统计值的时间间隔就是timeSpanInSeconds 所以索引i处的统计值,和索引(i+1)%n处的统计值,时间间隔是timeSpanInSeconds

MetricGroup 为了便于对Metric进行方便的管理和区分,可以对Metric进行分组,MetricGroup就是用来实现这个功能的。 MetricGroup的相关子类的继承关系如下所示。

其中: ProxyMetricGroup —— 这是一个代理类,就是把新Metric或者新的子MetricGroup的注册,委托给代理MetricGroup进行处理; AbstractMetricGroup —— 对新增Metric和子MetricGroup的相关方法进行了实现; 在AbstractMetricGroup中有这些属性

//用来保存这个MetricGroup的父MetricGroup
protectedfinalAparent;
//这个map,是用来保存当前MetricGroup中注册的Metric;
privatefinalMap<String,Metric>metrics=newHashMap<>();
//这个map,是用来保存当前MetricGroup中注册子MetricGroup;
privatefinalMap<String,AbstractMetricGroup>groups=newHashMap<>();

在MetricGroup中,可以建立一个树状的结构,用来存储和归类相关的Metric。

MetricReporter MetricReporter是用来向外披露Metric的监测结果的接口。 由于MetricReporter的子类在实例化时,都是通过反射机制,所以对于其实现子类,需要有一个公共,无参的构造函数,这个接口的定义如下:

publicinterfaceMetricReporter{
voidopen(MetricConfigconfig);
voidclose();
voidnotifyOfAddedMetric(Metricmetric,StringmetricName,MetricGroupgroup);
voidnotifyOfRemovedMetric(Metricmetric,StringmetricName,MetricGroupgroup);
}

open —— 由于子类都是用无参构造函数,通过反射进行实例化,所以相关初始化的工作都是放在这里进行的,并且这个方法需要在实例化后,就需要调用该方法进行相关初始化的工作; close —— 这里就是在关闭时,进行资源回收等相关操作的; notifyOfAddedMetric —— 当有一个新的Metric注册时,会调用该方法来通知MetricReporter; notifyOfRemovedMetric —— 当有一个Metric被移除时,通过这个方法来通知MetricReporter; Metric Report的类集成图:

从图上可以看出flink只是的report类型有:Slf4j,StatsD,Ganglia,Graphite,JMX,Prometheus,Datadog. MetricRegistry MetricGroup是用来对Metric进行分组管理,MetricReporter是用来对外披露Metric,而MetricRegistry就是这两者之间的桥梁,通过MetricRegistry,就可以让MetricReporter感知到在MetricGroup中的Metric发生的变化情况。 对于MetricRegistry这个接口,其实现为MetricRegistryImpl,而其在实例化时,构造函数的入参是一个MetricRegistryConfiguration实例。 MetricRegistryConfiguration MetricRegistryConfiguration顾名思义,就是MetricRegistry的相关配置参数,主要有三个属性,如下:

/**flink中不同组件的范围格式*/
privatefinalScopeFormatsscopeFormats;
/**字符串的分隔符,这是一个全局的分隔符*/
privatefinalchardelimiter;
/**配置中每个reporter的名称和其对应的配置对象的列表*/
privatefinalList<Tuple2<String,Configuration>>reporterConfigurations;

这些属性,都是从配置参数中获取而来,逻辑如下:



publicstaticMetricRegistryConfigurationfromConfiguration(Configurationconfiguration){
/**获取scopeFormats*/
ScopeFormatsscopeFormats;
try{
scopeFormats=ScopeFormats.fromConfig(configuration);
}catch(Exceptione){
LOG.warn("Failedtoparsescopeformat,usingdefaultscopeformats",e);
scopeFormats=ScopeFormats.fromConfig(newConfiguration());
}
/**获取分隔符*/
chardelim;
try{
delim=configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0);
}catch(Exceptione){
LOG.warn("Failedtoparsedelimiter,usingdefaultdelimiter.",e);
delim='.';
}
/**获取MetricReporter相关的配置信息,MetricReporter的配置格式是metrics.reporters=foo,bar*/
finalStringdefinedReporters=configuration.getString(MetricOptions.REPORTERS_LIST);
List<Tuple2<String,Configuration>>reporterConfigurations;
if(definedReporters==null){
/**如果没有配置,则返回空集合*/
reporterConfigurations=Collections.emptyList();
}else{
/**按模式匹配分割,如上述的配置,则namedReporters={"foo","bar"}*/
String[]namedReporters=splitPattern.split(definedReporters);
reporterConfigurations=newArrayList<>(namedReporters.length);
for(StringnamedReporter:namedReporters){
/**
*这里是获取一个代理配置对象,就是在原来配置对象的基础上,在查询key时,需要加上这里配置的前缀
    ,
*如metrics.reporter.foo.,这样就可以获取特定reporter的配置
*/
DelegatingConfigurationdelegatingConfiguration=newDelegatingConfiguration(
configuration,
ConfigConstants.METRICS_REPORTER_PREFIX+namedReporter+'.');
reporterConfigurations.add(Tuple2.of(namedReporter,(Configuration)delegatingConfiguration));
}
}
returnnewMetricRegistryConfiguration(scopeFormats,delim,reporterConfigurations);
}

ScopeFormat

上述的ScopeFormats也是配置对象中获取的,如下:

publicstaticScopeFormatsfromConfig(Configurationconfig){
StringjmFormat=config.getString(MetricOptions.SCOPE_NAMING_JM);
StringjmJobFormat=config.getString(MetricOptions.SCOPE_NAMING_JM_JOB);
StringtmFormat=config.getString(MetricOptions.SCOPE_NAMING_TM);
StringtmJobFormat=config.getString(MetricOptions.SCOPE_NAMING_TM_JOB);
StringtaskFormat=config.getString(MetricOptions.SCOPE_NAMING_TASK);
StringoperatorFormat=config.getString(MetricOptions.SCOPE_NAMING_OPERATOR);
returnnewScopeFormats(jmFormat,jmJobFormat,tmFormat,tmJobFormat,taskFormat,operatorFormat);
}

ScopeFormat的类继承关系如下:

从图中可以看出,Flink中的每个组件,都有对应的格式。 首先看下ScopeFormat中的主要属性对象:

/**这是原生格式,比如<host>.jobmanager,如果为空,则是<empty>*/
privatefinalStringformat;
/**format按照分割符分割后的数组,如template={"<host>","jobmanager”},被<>包裹的元素,是变量元素*/
privatefinalString[]template;
/**这是template数组中,变量元素的索引,如"<host>"是变量,在template中的索引是0,则templatePos={0}
    */
privatefinalint[]templatePos;
/**这个是template中变量元素对应的真实的值,在values数组中的位置,详见构造函数和#bindVariables方法
    */
privatefinalint[]valuePos;

这里以JobManagerScopeFormat为例进行分析说明,在ScopeFormats中,默认传给JobManagerScopeFormat的构造函数的入参值是 .jobmanager 。 则JobManagerScopeFormat的构造过程如下:

/**format的默认值是<host>.jobmanager*/
publicJobManagerScopeFormat(Stringformat){
super(format,null,newString[]{
SCOPE_HOST
});
}

接着看起父类ScopeFormat的构造过程:



/**接上面,入参值为format="<host>.jobmanager",parent=null,variables={"<host>"}*/
protectedScopeFormat(Stringformat,ScopeFormatparent,String[]variables){
checkNotNull(format,"formatisnull");
/**将format这个字符串分割,rawComponents={"<host>","jobmanager"}*/
finalString[]rawComponents=format.split("\\"+SCOPE_SEPARATOR);
/**根据rawComponents的第一个元素是为"*",来判断是否要继承父组的范围*/
finalbooleanparentAsPrefix=rawComponents.length>0&&rawComponents[0].equals
    (SCOPE_INHERIT_PARENT);
if(parentAsPrefix){
/**需要继承父组的范围,而父组有是null,则抛出异常*/
if(parent==null){
thrownewIllegalArgumentException("Componentscopeformatrequiresparentprefix
    (startswith'"
+SCOPE_INHERIT_PARENT+"'),butthiscomponenthasnoparent(isrootcomponent).");
}
/**如果以"*."开头,则format至少需要有3个字符,否则就是无效字符,设置为"<empty>"*/
this.format=format.length()>2?format.substring(2):"<empty>";
String[]parentTemplate=parent.template;
intparentLen=parentTemplate.length;
/**将父组的范围和自身的范围,合并到一起*/
this.template=newString[parentLen+rawComponents.length-1];
System.arraycopy(parentTemplate,0,this.template,0,parentLen);
System.arraycopy(rawComponents,1,this.template,parentLen,rawComponents.length-1);
}
else{
/**不需要继承父组的范围,则直接赋值,format="<host>.jobmanager",template={"<host>"
    ,"jobmanager"}*/
this.format=format.isEmpty()?"<empty>":format;
this.template=rawComponents;
}
/**将variables={"<host>"}转换为map{"<host>"->0}*/
HashMap<String,Integer>varToValuePos=arrayToMap(variables);
List<Integer>templatePos=newArrayList<>();
List<Integer>valuePos=newArrayList<>();
for(inti=0;i<template.length;i++){
finalStringcomponent=template[i];
/**检查当前这个组件是否是一个变量*/
if(component!=null&&component.length()>=3&&
component.charAt(0)=='<'&&component.charAt(component.length()-1)=='>'){
/**这是一个变量,则从上面的map中,获取其索引*/
IntegerreplacementPos=varToValuePos.get(component);
if(replacementPos!=null){
templatePos.add(i);
valuePos.add(replacementPos);
}
}
}
this.templatePos=integerListToArray(templatePos);
this.valuePos=integerListToArray(valuePos);
}

经过这个构造过程,ScopeFormat中的四个属性的值如下:

format = “.jobmanager”
template = {“”, “jobmanager”}
templatePos = {0}
valuePos = {0}

对于JobManagerScopeFormat来说,构建一个具体的范围数组的逻辑如下:

 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 publicString[]formatScope(Stringhostname){ /获取template数组的一份拷贝,深拷贝*/ finalString[]template=copyTemplate(); finalString[]values={hostname}; /使用hostname替换掉template中索引为0的元素/ returnbindVariables(template,values); } protectedfinalString[]copyTemplate(){ String[]copy=newString[template.length]; System.arraycopy(template,0,copy,0,template.length); returncopy; } /在结合这个逻辑,就知道ScopeFormat中的属性valuePos的作用了/ protectedfinalString[]bindVariables(String[]template,String[]values){ finalintlen=templatePos.length; for(inti=0;i<len;i++){ template[templatePos[i]]=values[valuePos[i]]; } returntemplate; }

MetricRegistryImpl 在获取了MetricRegistryConfiguration实例后,在看MetricRegistryImpl的构造函数的实现逻辑。

 1 this.executor=Executors.newSingleThreadScheduledExecutor(newExecutorThreadFactory("Flink -MetricRegistry"));

这里给executor这个属性,设置了一个单线程可调度的执行器。 接下来主要看下对MetricReporter相关的初始化工作。

 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 /变量配置中配置的reporter的配置*/ for(Tuple2reporterConfiguration:reporterConfigurations){ StringnamedReporter=reporterConfiguration.f0; /reporterConfig是Configuration的子类DelegatingConfiguration,会肯定定义的前缀来找key/ ConfigurationreporterConfig=reporterConfiguration.f1; /获取MetricReporter的具体实现子类的全限定类型,配置的key如:metrics.reporter.foo.class*/ finalStringclassName=reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX ,null); if(className==null){ LOG.error("Noreporterclasssetforreporter"+namedReporter+".Metricsmightnotbeexposed /reported."); continue; } try{ /获取配置的定期执行的时间间隔,key的格式如:metrics.reporter.foo.interval/ StringconfiguredPeriod=reporterConfig.getString(ConfigConstants .METRICS_REPORTER_INTERVAL_SUFFIX,null); TimeUnittimeunit=TimeUnit.SECONDS; longperiod=10; if(configuredPeriod!=null){ try{ String[]interval=configuredPeriod.split(""); period=Long.parseLong(interval[0]); timeunit=TimeUnit.valueOf(interval[1]); } catch(Exceptione){ LOG.error("Cannotparsereportintervalfromconfig:"+configuredPeriod+ "-pleaseusevalueslike'10SECONDS'or'500MILLISECONDS'."+ "Usingdefaultreportinginterval."); } } /实例化MetricReporter的子类*/ Class<?>reporterClass=Class.forName(className); MetricReporterreporterInstance=(MetricReporter)reporterClass.newInstance(); /构造MetricConfig的实例,并把reporterConfig中的配置key-value都添加到metricConfig中/ MetricConfigmetricConfig=newMetricConfig(); reporterConfig.addAllToProperties(metricConfig); LOG.info("Configuring{}with{}.",reporterClass.getSimpleName(),metricConfig); /这里就是reporter进行初始化操作的地方*/ reporterInstance.open(metricConfig); /如果reporter实现了Scheduled接口,则通过executor进行定期调度执行,执行时间间隔就是上面获取的时 间间隔/ if(reporterInstanceinstanceofScheduled){ LOG.info("Periodicallyreportingmetricsinintervalsof{}{}forreporter{}oftype{}." ,period,timeunit.name(),namedReporter,className); /将reporter封装成一个task,并调度定期更新执行*/ executor.scheduleWithFixedDelay( newMetricRegistryImpl.ReporterTask((Scheduled)reporterInstance),period,period ,timeunit); }else{ LOG.info("Reportingmetricsforreporter{}oftype{}.",namedReporter,className); } /将reporter添加到集合中/ reporters.add(reporterInstance); /获取reporter定制化的分隔符,如果没有设置,则设置为全局分割符/ StringdelimiterForReporter=reporterConfig.getString(ConfigConstants .METRICS_REPORTER_SCOPE_DELIMITER,String.valueOf(globalDelimiter)); if(delimiterForReporter.length()!=1){ LOG.warn("Failedtoparsedelimiter'{}'forreporter'{}',usingglobaldelimiter'{}'." ,delimiterForReporter,namedReporter,globalDelimiter); delimiterForReporter=String.valueOf(globalDelimiter); } this.delimiters.add(delimiterForReporter.charAt(0)); } catch(Throwablet){ LOG.error("Couldnotinstantiatemetricsreporter{}.Metricsmightnotbeexposed/reported." ,namedReporter,t); } } 其中Schedule接口,只有一个report接口。

 1 2 3 publicinterfaceScheduled{ voidreport(); } 实现Scheduled接口的reporter,表示其需要被定期调度执行,定期执行的就是其report方法,没有实现Scheduled接口的reporter方法,是不会被定期调度的。 Slf4jReporter这个MetricReporter的子类就实现了Scheduled接口,而其report方法,就是将注册的Metric的信息打印到日志里; JMXReporter这个子类是没有实现Scheduled接口的,但可以通过JMX服务来获取注册的Metric的信息。 添加Metric的过程 Metric的添加逻辑的入口在AbstractMetricGroup的addMetric方法中,逻辑如下:

 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 protectedvoidaddMetric(Stringname,Metricmetric){ if(metric==null){ LOG.warn("Ignoringattemptedregistrationofametricduetobeingnullforname{}.",name); return; } /只有group仍然打开的情况下,才添加这个metric*/ synchronized(this){ if(!closed){ / 在没有进行"contains"校验下,立即进行put操作,来优化常见的情况(没有碰撞) 碰撞的情况后面会处理。 / Metricprior=metrics.put(name,metric); /检查与其他度量名称的冲突*/ if(prior==null){ /这个名字还没有其他指标,也就是与注册在当前group下的metric没有名称冲突/ if(groups.containsKey(name)){ /与注册在group下的子groups的名称由冲突,这里给出warn日志,而不是fail ,因为metrics是工具,当使用错误时,不应该使得程序失败*/ LOG.warn("Namecollision:Addingametricwiththesamenameasametricsubgroup:'"+ name+"'.Metricmightnotgetproperlyreported."+Arrays.toString (scopeComponents)); } /这里就是桥梁起作用的地方/ registry.register(metric,name,this); } else{ /有碰撞,放回原来的metric/ metrics.put(name,prior); LOG.warn("Namecollision:GroupalreadycontainsaMetricwiththename'"+ name+"'.Metricwillnotbereported."+Arrays.toString(scopeComponents)); } } } } 上述逻辑就是把Metric注册到当前Group中,接着看调用了MetricRegistry的register里的逻辑。

 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 publicvoidregister(Metricmetric,StringmetricName,AbstractMetricGroupgroup){ synchronized(lock){ if(isShutdown()){ LOG.warn("Cannotregistermetric,becausetheMetricRegistryhasalreadybeenshutdown."); }else{ if(reporters!=null){ /通知所有的reporters,注册了一个metric,以及对应的metricName,group*/ for(inti=0;i>(i,group); /然后调用reporter的接口方法,通知reporter/ reporter.notifyOfAddedMetric(metric,metricName,front); } }catch(Exceptione){ LOG.warn("Errorwhileregisteringmetric.",e); } } } try{ /如果queryService不为null,则也通知它*/ if(queryService!=null){ MetricQueryService.notifyOfAddedMetric(queryService,metric,metricName,group); } }catch(Exceptione){ LOG.warn("Errorwhileregisteringmetric.",e); } try{ /如果metric实现了View接口,则将其添加到定期更新的viewUpdater中/ if(metricinstanceofView){ if(viewUpdater==null){ viewUpdater=newViewUpdater(executor); } viewUpdater.notifyOfAddedView((View)metric); } }catch(Exceptione){ LOG.warn("Errorwhileregisteringmetric.",e); } } } } 从上述逻辑,可以看出MetricRegistry所起的桥梁作用了,它会再依次通知配置的各个reporter,前面已经介绍过AbstractReporter这个抽象子类实现。 View接口 View接口的定义如下:

 1 2 3 4 5 6 7 publicinterfaceView{ /metrics更新的间隔*/ intUPDATE_INTERVAL_SECONDS=5; /被定期调用进行metric更新的方法*/ voidupdate(); } 实现了View接口的Metric,需要定期的调用update方法,进行状态的更新,而这个定期更新的功能是通过ViewUpdater实现的,其构造函数中,就是在executor中添加了一个定期执行的task。

 1 2 3 publicViewUpdater(ScheduledExecutorServiceexecutor){ executor.scheduleWithFixedDelay(newViewUpdaterTask(lock,toAdd,toRemove),5 ,UPDATE_INTERVAL_SECONDS,TimeUnit.SECONDS); } 新增一个Metric时,通知viewUpdater时的逻辑如下:

 1 2 3 4 5 publicvoidnotifyOfAddedView(Viewview){ synchronized(lock){ toAdd.add(view); } } 就是想toAdd这个Set中添加一个新的元素,通过lock这个锁来实现同步。 而ViewUpdaterTask的run方法中,就会调用注册的Metric的update方法,同时更新几个Set。逻辑如下:

publicvoidrun(){ for(ViewtoUpdate:this.views){ toUpdate.update(); } synchronized(lock){ views.addAll(toAdd); toAdd.clear(); views.removeAll(toRemove); toRemove.clear(); } } MetricQueryService 在MetricRegistryImpl中有一个属性queryService,是一个ActorRef,对应的具体实现是MetricQueryService。在MetricQueryService中也维护了注册的各种Metric,并且也是从MetricRegistry那里接受Metric的添加和删除的消息.

/**用来接收Metric添加的消息*/
publicstaticvoidnotifyOfAddedMetric(ActorRefservice,Metricmetric,StringmetricName
    ,AbstractMetricGroupgroup){
service.tell(newAddMetric(metricName,metric,group),null);
}
/**用于接收Metric删除的消息*/
publicstaticvoidnotifyOfRemovedMetric(ActorRefservice,Metricmetric){
service.tell(newRemoveMetric(metric),null);
}

MetricQueryService在接收到这类消息后,会在onReceive方法中根据不同的消息类型进行相应的处理,添加和删除Metric就是在四类Metric对应的map属性上进行相应的添加删除操作。以此来实现对Metric信息的维护。 onReceive方法中还会接收到一类消息,叫CreateDump消息,接收到这个消息后,就会把当前所有的Metric数据进行序列化操作,得到一个MetricDumpSerialization.MetricSerializationResult序列化后的结果实例,并发送给请求者。 对于Metric的序列化和反序列化的实现都在MetricDumpSerialization这个类中。 通过MetricDumpSerializer进行序列化,序列化后的结果为MetricSerializationResult; 通过MetricDumpDeserializer进行反序列化,反序列化后的结果为MetricDump;

参考

https://blog.csdn.net/qq_21653785/article/details/79625601


  • « idea-查询athena
  • druid的规则设置 »

Published

3 28, 2019

Category

flink

Tags

  • flink 3
  • Powered by Pelican. Theme: Elegant by Talha Mansoor