Generating the Execution Plan: ExecutionGraph
Flink's runtime execution plan is the ExecutionGraph, the abstraction the JobManager works with at runtime. The StreamGraph and JobGraph are both generated on the client, while the ExecutionGraph is generated on the JobMaster side.
Submitting the JobGraph from the Client
After the client generates the JobGraph, it obtains the JobManager's address through the LeaderRetrievalService and submits the JobGraph to the JobManager for execution. Flink's core inter-process communication is built on Akka: the JobManager and each TaskManager are Akka actor systems, so submission first creates a client-side actor that talks to the JobManager, and then issues RPC commands through it.
The following method sends a JobGraph to the JobManager. It blocks until the job finishes or the JobManager is no longer alive.
public static JobExecutionResult submitJobAndWait(
		ActorSystem actorSystem,
		Configuration config,
		HighAvailabilityServices highAvailabilityServices,
		JobGraph jobGraph,
		FiniteDuration timeout,
		boolean sysoutLogUpdates,
		ClassLoader classLoader) throws JobExecutionException {

	JobListeningContext jobListeningContext = submitJob(
			actorSystem,
			config,
			highAvailabilityServices,
			jobGraph,
			timeout,
			sysoutLogUpdates,
			classLoader);

	return awaitJobResult(jobListeningContext);
}
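A hypothetical call site might look like the sketch below; the actorSystem, config, haServices, and jobGraph objects are assumed to have been created elsewhere and are not part of the original code:
// Usage sketch (hedged): submit a JobGraph and block for the result.
// All setup objects here are assumed to exist already.
JobExecutionResult result = JobClient.submitJobAndWait(
	actorSystem,
	config,
	haServices,
	jobGraph,
	new FiniteDuration(10, TimeUnit.MINUTES),   // RPC timeout for the interaction
	true,                                       // print status updates to stdout
	Thread.currentThread().getContextClassLoader());
System.out.println("net runtime: " + result.getNetRuntime() + " ms");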
The actual submission happens in submitJob, which returns a JobListeningContext:
public static JobListeningContext submitJob(
		ActorSystem actorSystem,
		Configuration config,
		HighAvailabilityServices highAvailabilityServices,
		JobGraph jobGraph,
		FiniteDuration timeout,
		boolean sysoutLogUpdates,
		ClassLoader classLoader) {

	checkNotNull(actorSystem, "The actorSystem must not be null.");
	checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
	checkNotNull(jobGraph, "The jobGraph must not be null.");
	checkNotNull(timeout, "The timeout must not be null.");

	// for this job, we create a proxy JobClientActor that deals with all communication with
	// the JobManager. It forwards the job submission, checks the success/failure responses, logs
	// update messages, watches for disconnect between client and JobManager, ...
	Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
		highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
		timeout,
		sysoutLogUpdates,
		config);

	ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

	Future<Object> submissionFuture = Patterns.ask(
		jobClientActor,
		new JobClientMessages.SubmitJobAndWait(jobGraph),
		new Timeout(AkkaUtils.INF_TIMEOUT()));

	return new JobListeningContext(
		jobGraph.getJobID(),
		submissionFuture,
		jobClientActor,
		timeout,
		classLoader,
		highAvailabilityServices);
}
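awaitJobResult then blocks on the future returned by Patterns.ask. Below is a hedged, simplified sketch of what the waiting side boils down to; the real JobClient.awaitJobResult also handles client-actor shutdown and connection loss, which is omitted here:
// Simplified sketch: block on the Akka future and unwrap the answer.
Object answer = Await.result(submissionFuture, Duration.Inf());
if (answer instanceof JobManagerMessages.JobResultSuccess) {
	// the result arrives serialized and is deserialized with the user class loader
	SerializedJobExecutionResult serialized =
		((JobManagerMessages.JobResultSuccess) answer).result();
	return serialized.toJobExecutionResult(classLoader);
} else {
	throw new JobExecutionException(jobGraph.getJobID(),
		"Job failed or was not acknowledged: " + answer);
}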
The basic submission flow is:
- Start a JobClientActor to communicate with the JobManager
- Start the LeaderRetrievalService to obtain the JobManager's address
- Upload the user's jar files
- Send the SubmitJob command
The JobManager Builds the Execution Plan
Once the client has uploaded the jar files and the JobGraph, Flink parses and wraps them into the runtime execution plan, the ExecutionGraph.
Flink's runtime execution plan is the ExecutionGraph, which corresponds to the earlier JobGraph. An ExecutionGraph contains one ExecutionJobVertex per JobVertex of the JobGraph; each parallel subtask of an ExecutionJobVertex corresponds to an ExecutionVertex; and each execution attempt of an ExecutionVertex is abstracted as an Execution.
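A minimal traversal sketch makes this containment hierarchy concrete (it assumes an already-built ExecutionGraph instance eg; the printing is illustrative only):
// one ExecutionJobVertex per JobVertex, in topological order
for (ExecutionJobVertex ejv : eg.getVerticesTopologically()) {
	// one ExecutionVertex per parallel subtask
	for (ExecutionVertex ev : ejv.getTaskVertices()) {
		// the current attempt of this subtask is an Execution
		Execution attempt = ev.getCurrentExecutionAttempt();
		System.out.println(ev.getTaskNameWithSubtaskIndex() + " -> attempt " + attempt.getAttemptId());
	}
}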
After the client generates the JobGraph, it submits it to the JobMaster via submitJob. The ExecutionGraph is created during the JobMaster's construction. Below is the ExecutionGraph constructor.
public ExecutionGraph(
		JobInformation jobInformation,
		ScheduledExecutorService futureExecutor,
		Executor ioExecutor,
		Time rpcTimeout,
		RestartStrategy restartStrategy,
		FailoverStrategy.Factory failoverStrategyFactory,
		SlotProvider slotProvider,
		ClassLoader userClassLoader,
		BlobWriter blobWriter,
		Time allocationTimeout) throws IOException {

	checkNotNull(futureExecutor);

	this.jobInformation = Preconditions.checkNotNull(jobInformation);
	this.blobWriter = Preconditions.checkNotNull(blobWriter);
	this.jobInformationOrBlobKey = BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter);
	this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
	this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
	this.slotProvider = Preconditions.checkNotNull(slotProvider, "scheduler");
	this.userClassLoader = Preconditions.checkNotNull(userClassLoader, "userClassLoader");

	this.tasks = new ConcurrentHashMap<>(16);
	this.intermediateResults = new ConcurrentHashMap<>(16);
	this.verticesInCreationOrder = new ArrayList<>(16);
	this.currentExecutions = new ConcurrentHashMap<>(16);

	this.jobStatusListeners = new CopyOnWriteArrayList<>();
	this.executionListeners = new CopyOnWriteArrayList<>();

	this.stateTimestamps = new long[JobStatus.values().length];
	this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();

	this.rpcTimeout = checkNotNull(rpcTimeout);
	this.allocationTimeout = checkNotNull(allocationTimeout);

	this.restartStrategy = restartStrategy;
	this.kvStateLocationRegistry = new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices());

	this.verticesFinished = new AtomicInteger();

	this.globalModVersion = 1L;

	// the failover strategy must be instantiated last, so that the execution graph
	// is ready by the time the failover strategy sees it
	this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");

	this.schedulingFuture = null;

	LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
}
As the constructor shows, the ExecutionGraph maintains the job information, the running task information, state-transition timestamps, and so on.
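For example, the stateTimestamps array initialized above can later be queried per JobStatus; a small sketch (eg is an assumed ExecutionGraph instance):
// the constructor records the CREATED timestamp; each later state
// transition fills in the corresponding slot of stateTimestamps
long createdAt = eg.getStatusTimestamp(JobStatus.CREATED);
JobStatus current = eg.getState();   // CREATED right after construction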
The ExecutionGraph is built in the JobMaster's constructor; the concrete method is createAndRestoreExecutionGraph:
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws Exception {
	ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup);

	final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

	if (checkpointCoordinator != null) {
		// check whether we find a valid checkpoint
		if (!checkpointCoordinator.restoreLatestCheckpointedState(
				newExecutionGraph.getAllVertices(),
				false,
				false)) {

			// check whether we can restore from a savepoint
			tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
		}
	}

	return newExecutionGraph;
}
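Note the ordering: the latest checkpoint wins, and only if none is found does the savepoint fallback run. A sketch of what tryRestoreExecutionGraphFromSavepoint plausibly looks like, reconstructed from the public APIs of SavepointRestoreSettings and CheckpointCoordinator (the exact body and the userCodeLoader field are assumptions):
private void tryRestoreExecutionGraphFromSavepoint(
		ExecutionGraph executionGraphToRestore,
		SavepointRestoreSettings savepointRestoreSettings) throws Exception {
	// only act if the submission actually requested a savepoint restore
	if (savepointRestoreSettings.restoreSavepoint()) {
		final CheckpointCoordinator checkpointCoordinator =
			executionGraphToRestore.getCheckpointCoordinator();
		if (checkpointCoordinator != null) {
			// restore from the user-supplied savepoint path instead of a checkpoint
			checkpointCoordinator.restoreSavepoint(
				savepointRestoreSettings.getRestorePath(),
				savepointRestoreSettings.allowNonRestoredState(),
				executionGraphToRestore.getAllVertices(),
				userCodeLoader);   // assumed field: the job's user class loader
		}
	}
}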
After several layers of calls, the method that does the real work is public static ExecutionGraph buildGraph:
checkNotNull(jobGraph, "job graph cannot be null");

final String jobName = jobGraph.getName();
final JobID jobId = jobGraph.getJobID();

final FailoverStrategy.Factory failoverStrategy =
		FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);

final JobInformation jobInformation = new JobInformation(
	jobId,
	jobName,
	jobGraph.getSerializedExecutionConfig(),
	jobGraph.getJobConfiguration(),
	jobGraph.getUserJarBlobKeys(),
	jobGraph.getClasspaths());

// create a new execution graph, if none exists so far
final ExecutionGraph executionGraph;
try {
	executionGraph = (prior != null) ? prior :
		new ExecutionGraph(
			jobInformation,
			futureExecutor,
			ioExecutor,
			rpcTimeout,
			restartStrategy,
			failoverStrategy,
			slotProvider,
			classLoader,
			blobWriter,
			allocationTimeout);
} catch (IOException e) {
	throw new JobException("Could not create the ExecutionGraph.", e);
}

// set the basic properties
// setScheduleMode: for a streaming job the schedule mode is EAGER
executionGraph.setScheduleMode(jobGraph.getScheduleMode());
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());

try {
	executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
}
catch (Throwable t) {
	log.warn("Cannot create JSON plan for job", t);
	// give the graph an empty plan
	executionGraph.setJsonPlan("{}");
}

// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits
final long initMasterStart = System.nanoTime();
log.info("Running initialization on master for job {} ({}).", jobName, jobId);

// iterate over all JobVertices of the JobGraph
for (JobVertex vertex : jobGraph.getVertices()) {
	String executableClass = vertex.getInvokableClassName();
	if (executableClass == null || executableClass.isEmpty()) {
		throw new JobSubmissionException(jobId,
				"The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
	}

	if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
		if (parallelismForAutoMax < 0) {
			throw new JobSubmissionException(
				jobId,
				PARALLELISM_AUTO_MAX_ERROR_MESSAGE);
		}
		else {
			vertex.setParallelism(parallelismForAutoMax);
		}
	}

	try {
		vertex.initializeOnMaster(classLoader);
	}
	catch (Throwable t) {
		throw new JobExecutionException(jobId,
				"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
	}
}

log.info("Successfully ran initialization on master in {} ms.",
		(System.nanoTime() - initMasterStart) / 1_000_000);

// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
	log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
}
executionGraph.attachJobGraph(sortedTopology);
The last line calls ExecutionGraph.attachJobGraph, a key method: it builds the execution nodes and produces the execution-plan DAG:
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

	LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
			"vertices and {} intermediate results.",
			topologiallySorted.size(), tasks.size(), intermediateResults.size());

	final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
	final long createTimestamp = System.currentTimeMillis();

	// iterate over the JobVertices
	for (JobVertex jobVertex : topologiallySorted) {

		if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
			this.isStoppable = false;
		}

		// create the corresponding ExecutionJobVertex for this JobVertex
		// and attach it to the graph
		ExecutionJobVertex ejv = new ExecutionJobVertex(
			this,
			jobVertex,
			1,
			rpcTimeout,
			globalModVersion,
			createTimestamp);

		// connect the new ExecutionJobVertex to its predecessors' IntermediateResults
		ejv.connectToPredecessors(this.intermediateResults);

		ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
		if (previousTask != null) {
			throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
					jobVertex.getID(), ejv, previousTask));
		}

		for (IntermediateResult res : ejv.getProducedDataSets()) {
			IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
			if (previousDataSet != null) {
				throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
						res.getId(), res, previousDataSet));
			}
		}

		this.verticesInCreationOrder.add(ejv);
		this.numVerticesTotal += ejv.getParallelism();
		newExecJobVertices.add(ejv);
	}

	terminationFuture = new CompletableFuture<>();
	failoverStrategy.notifyNewVertices(newExecJobVertices);
}
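Note that numVerticesTotal accumulates the parallelism of every ExecutionJobVertex: for a hypothetical topology source (parallelism 2) -> map (parallelism 2) -> sink (parallelism 1), it ends up as 5 ExecutionVertices. A short inspection sketch (eg is an assumed, already-attached ExecutionGraph):
for (ExecutionJobVertex ejv : eg.getVerticesTopologically()) {
	System.out.printf("%s -> %d subtasks%n",
		ejv.getJobVertex().getName(), ejv.getParallelism());
}
// the sum of all vertex parallelisms, i.e. the ExecutionVertex count
System.out.println("total ExecutionVertices: " + eg.getTotalNumberOfVertices());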
Within attachJobGraph, the call to connectToPredecessors is what wires the execution nodes together:
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {

	// get the list of input JobEdges
	List<JobEdge> inputs = jobVertex.getInputs();

	if (LOG.isDebugEnabled()) {
		LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), inputs.size()));
	}

	// iterate over each JobEdge
	for (int num = 0; num < inputs.size(); num++) {
		JobEdge edge = inputs.get(num);

		if (LOG.isDebugEnabled()) {
			if (edge.getSource() == null) {
				LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
						num, jobVertex.getID(), jobVertex.getName(), edge.getSourceId()));
			} else {
				LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",
						num, jobVertex.getID(), jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName()));
			}
		}

		// fetch the intermediate result via ID. if it does not exist, then it either has not been created, or the order
		// in which this method is called for the job vertices is not a topological order
		// look up the IntermediateResult that feeds this JobEdge
		IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
		if (ires == null) {
			throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
					+ edge.getSourceId());
		}

		// add the IntermediateResult to this ExecutionJobVertex's inputs
		this.inputs.add(ires);

		// register a consumer with the IntermediateResult; the consumerIndex reflects the result's out-degree
		int consumerIndex = ires.registerConsumer();

		for (int i = 0; i < parallelism; i++) {
			ExecutionVertex ev = taskVertices[i];
			// associate each ExecutionVertex with the IntermediateResult
			ev.connectSource(num, ires, edge, consumerIndex);
		}
	}
}
Finally, this method links the ExecutionVertices together via connectSource. There are two distribution patterns, POINTWISE and ALL_TO_ALL:
- POINTWISE: each producing subtask is connected to one or more subtasks of the consuming task.
- ALL_TO_ALL: each producing subtask is connected to every subtask of the consuming task.
In short, non-shuffle operations use POINTWISE; everything else uses ALL_TO_ALL.
public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {

	final DistributionPattern pattern = edge.getDistributionPattern();
	final IntermediateResultPartition[] sourcePartitions = source.getPartitions();

	ExecutionEdge[] edges;

	switch (pattern) {
		case POINTWISE:
			edges = connectPointwise(sourcePartitions, inputNumber);
			break;

		case ALL_TO_ALL:
			edges = connectAllToAll(sourcePartitions, inputNumber);
			break;

		default:
			throw new RuntimeException("Unrecognized distribution pattern.");
	}

	this.inputEdges[inputNumber] = edges;

	// add the consumers to the source
	// for now (until the receiver initiated handshake is in place), we need to register the
	// edges as the execution graph
	for (ExecutionEdge ee : edges) {
		ee.getSource().addConsumer(ee, consumerNumber);
	}
}
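For reference, the two connect helpers differ only in how partitions map to subtasks. The sketch below shows the all-to-all case essentially as it works, plus a deliberately simplified pointwise variant covering only equal producer/consumer parallelism (the real connectPointwise also handles the n:1 and 1:n ratio cases, omitted here):
private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
	ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
	for (int i = 0; i < sourcePartitions.length; i++) {
		// every consuming subtask reads every produced partition
		edges[i] = new ExecutionEdge(sourcePartitions[i], this, inputNumber);
	}
	return edges;
}

// Simplified sketch: only the 1:1 case is handled.
private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
	if (sourcePartitions.length == getTotalNumberOfParallelSubtasks()) {
		// subtask i consumes exactly partition i
		return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) };
	}
	throw new UnsupportedOperationException("ratio cases omitted in this sketch");
}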
At this point, the ExecutionGraph has been fully generated.