Generating the Execution Plan: the ExecutionGraph
Flink's runtime execution plan is the ExecutionGraph, the abstraction the JobManager operates on at runtime.
Both the StreamGraph and the JobGraph are generated on the client, but the ExecutionGraph is generated on the JobMaster side.
Submitting the JobGraph from the Client
After the JobGraph has been generated on the client, the client obtains the JobManager's address through the LeaderRetrievalService and submits the JobGraph to the JobManager for execution. Flink's core inter-process communication runs over Akka: the JobManager and every TaskManager are each an Akka actor system, so submission first creates a client-side actor that talks to the JobManager and then issues RPC commands against it.
The following method sends a JobGraph to the JobManager. It blocks until the job finishes or the JobManager is no longer alive.
public static JobExecutionResult submitJobAndWait(
        ActorSystem actorSystem,
        Configuration config,
        HighAvailabilityServices highAvailabilityServices,
        JobGraph jobGraph,
        FiniteDuration timeout,
        boolean sysoutLogUpdates,
        ClassLoader classLoader) throws JobExecutionException {

    JobListeningContext jobListeningContext = submitJob(
            actorSystem,
            config,
            highAvailabilityServices,
            jobGraph,
            timeout,
            sysoutLogUpdates,
            classLoader);

    return awaitJobResult(jobListeningContext);
}
submitJob performs the actual submission and returns a JobListeningContext:
public static JobListeningContext submitJob(
        ActorSystem actorSystem,
        Configuration config,
        HighAvailabilityServices highAvailabilityServices,
        JobGraph jobGraph,
        FiniteDuration timeout,
        boolean sysoutLogUpdates,
        ClassLoader classLoader) {

    checkNotNull(actorSystem, "The actorSystem must not be null.");
    checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
    checkNotNull(jobGraph, "The jobGraph must not be null.");
    checkNotNull(timeout, "The timeout must not be null.");

    // for this job, we create a proxy JobClientActor that deals with all communication with
    // the JobManager. It forwards the job submission, checks the success/failure responses, logs
    // update messages, watches for disconnect between client and JobManager, ...
    Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
        highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
        timeout,
        sysoutLogUpdates,
        config);

    ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

    Future<Object> submissionFuture = Patterns.ask(
        jobClientActor,
        new JobClientMessages.SubmitJobAndWait(jobGraph),
        new Timeout(AkkaUtils.INF_TIMEOUT()));

    return new JobListeningContext(
        jobGraph.getJobID(),
        submissionFuture,
        jobClientActor,
        timeout,
        classLoader,
        highAvailabilityServices);
}
The basic job submission flow is:

- Start a JobClientActor to communicate with the JobManager
- Start a LeaderRetrievalService to obtain the JobManager's address (see the sketch after this list)
- Upload the user jar files
- Send the SubmitJob command
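To make the address-lookup step concrete, here is a minimal sketch of listening for the JobManager leader. LeaderRetrievalListener and its two callbacks are Flink's actual interface; the class around them and the listen helper are illustrative.

import java.util.UUID;

import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

public class PrintJobManagerAddress implements LeaderRetrievalListener {

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        // invoked whenever a (new) JobManager leader becomes known
        System.out.println("JobManager leader: " + leaderAddress + " (session " + leaderSessionID + ")");
    }

    @Override
    public void handleError(Exception exception) {
        exception.printStackTrace();
    }

    // Illustrative usage: start retrieval against an initialized HA services instance.
    public static void listen(HighAvailabilityServices haServices) throws Exception {
        LeaderRetrievalService lrs =
            haServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
        lrs.start(new PrintJobManagerAddress()); // asynchronous; callbacks arrive later
    }
}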
ExecutionGraph Generation on the JobManager
Once the client has uploaded the jar files and the JobGraph, Flink parses and wraps them further into the runtime execution plan, the ExecutionGraph.
Generating the ExecutionGraph
Flink's runtime execution plan is the ExecutionGraph, which corresponds to the earlier JobGraph: an ExecutionGraph contains one ExecutionJobVertex per JobVertex of the JobGraph, each parallel subtask of an ExecutionJobVertex corresponds to an ExecutionVertex, and each execution attempt of an ExecutionVertex is abstracted as an Execution.
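This three-level hierarchy can be walked with the ExecutionGraph accessors; a minimal sketch, assuming an already built ExecutionGraph (the dumpHierarchy name is mine):

import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;

// Sketch: one ExecutionJobVertex per JobVertex, one ExecutionVertex per
// parallel subtask, one Execution per attempt.
static void dumpHierarchy(ExecutionGraph eg) {
    for (ExecutionJobVertex ejv : eg.getVerticesTopologically()) {
        for (ExecutionVertex ev : ejv.getTaskVertices()) {
            Execution attempt = ev.getCurrentExecutionAttempt();
            System.out.println(ev.getTaskNameWithSubtaskIndex()
                + " -> current attempt " + attempt.getAttemptId());
        }
    }
}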
After the client generates the JobGraph, it is submitted to the JobMaster via submitJob; the ExecutionGraph is created in the constructor there. The code below is the ExecutionGraph constructor.
public ExecutionGraph(
        JobInformation jobInformation,
        ScheduledExecutorService futureExecutor,
        Executor ioExecutor,
        Time rpcTimeout,
        RestartStrategy restartStrategy,
        FailoverStrategy.Factory failoverStrategyFactory,
        SlotProvider slotProvider,
        ClassLoader userClassLoader,
        BlobWriter blobWriter,
        Time allocationTimeout) throws IOException {

    checkNotNull(futureExecutor);

    this.jobInformation = Preconditions.checkNotNull(jobInformation);
    this.blobWriter = Preconditions.checkNotNull(blobWriter);
    this.jobInformationOrBlobKey = BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter);
    this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
    this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
    this.slotProvider = Preconditions.checkNotNull(slotProvider, "scheduler");
    this.userClassLoader = Preconditions.checkNotNull(userClassLoader, "userClassLoader");

    this.tasks = new ConcurrentHashMap<>(16);
    this.intermediateResults = new ConcurrentHashMap<>(16);
    this.verticesInCreationOrder = new ArrayList<>(16);
    this.currentExecutions = new ConcurrentHashMap<>(16);

    this.jobStatusListeners = new CopyOnWriteArrayList<>();
    this.executionListeners = new CopyOnWriteArrayList<>();

    this.stateTimestamps = new long[JobStatus.values().length];
    this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();

    this.rpcTimeout = checkNotNull(rpcTimeout);
    this.allocationTimeout = checkNotNull(allocationTimeout);

    this.restartStrategy = restartStrategy;

    this.kvStateLocationRegistry = new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices());

    this.verticesFinished = new AtomicInteger();

    this.globalModVersion = 1L;

    // the failover strategy must be instantiated last, so that the execution graph
    // is ready by the time the failover strategy sees it
    this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");
    this.schedulingFuture = null;

    LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
}
As the constructor shows, the ExecutionGraph maintains the job information, the running tasks, state-transition timestamps, and so on.
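For example, the stateTimestamps array filled in above is exposed per status; a tiny sketch, assuming a built ExecutionGraph eg:

// Timestamps for states the job has not reached yet are still 0.
long created = eg.getStatusTimestamp(JobStatus.CREATED); // set in the constructor
long running = eg.getStatusTimestamp(JobStatus.RUNNING); // 0 until the job starts running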
The ExecutionGraph is built in the JobMaster's constructor; the concrete method is createAndRestoreExecutionGraph:
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws Exception {
    ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup);

    final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        if (!checkpointCoordinator.restoreLatestCheckpointedState(
            newExecutionGraph.getAllVertices(),
            false,
            false)) {

            // check whether we can restore from a savepoint
            tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }

    return newExecutionGraph;
}
After several layers of calls, the method that does the real work is public static ExecutionGraph buildGraph:
checkNotNull(jobGraph, "job graph cannot be null");

final String jobName = jobGraph.getName();
final JobID jobId = jobGraph.getJobID();

final FailoverStrategy.Factory failoverStrategy =
        FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);

final JobInformation jobInformation = new JobInformation(
    jobId,
    jobName,
    jobGraph.getSerializedExecutionConfig(),
    jobGraph.getJobConfiguration(),
    jobGraph.getUserJarBlobKeys(),
    jobGraph.getClasspaths());

// create a new execution graph, if none exists so far
final ExecutionGraph executionGraph;
try {
    executionGraph = (prior != null) ? prior :
        new ExecutionGraph(
            jobInformation,
            futureExecutor,
            ioExecutor,
            rpcTimeout,
            restartStrategy,
            failoverStrategy,
            slotProvider,
            classLoader,
            blobWriter,
            allocationTimeout);
} catch (IOException e) {
    throw new JobException("Could not create the ExecutionGraph.", e);
}

// set the basic properties
// setScheduleMode: for streaming jobs the schedule mode is fixed to EAGER
executionGraph.setScheduleMode(jobGraph.getScheduleMode());
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());

try {
    executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
}
catch (Throwable t) {
    log.warn("Cannot create JSON plan for job", t);
    // give the graph an empty plan
    executionGraph.setJsonPlan("{}");
}

// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits
final long initMasterStart = System.nanoTime();
log.info("Running initialization on master for job {} ({}).", jobName, jobId);

// iterate over all JobVertices and run their master-side initialization hooks
for (JobVertex vertex : jobGraph.getVertices()) {
    String executableClass = vertex.getInvokableClassName();
    if (executableClass == null || executableClass.isEmpty()) {
        throw new JobSubmissionException(jobId,
            "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
    }

    if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
        if (parallelismForAutoMax < 0) {
            throw new JobSubmissionException(
                jobId,
                PARALLELISM_AUTO_MAX_ERROR_MESSAGE);
        }
        else {
            vertex.setParallelism(parallelismForAutoMax);
        }
    }

    try {
        vertex.initializeOnMaster(classLoader);
    }
    catch (Throwable t) {
        throw new JobExecutionException(jobId,
            "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
    }
}

log.info("Successfully ran initialization on master in {} ms.",
        (System.nanoTime() - initMasterStart) / 1_000_000);

// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
    log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
}
executionGraph.attachJobGraph(sortedTopology);
The last line matters: ExecutionGraph.attachJobGraph is the key method that finishes building the nodes and produces the execution-plan DAG:
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

    LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
            "vertices and {} intermediate results.",
            topologiallySorted.size(), tasks.size(), intermediateResults.size());

    final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
    final long createTimestamp = System.currentTimeMillis();

    // iterate over the topologically sorted JobVertices
    for (JobVertex jobVertex : topologiallySorted) {

        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }

        // create the ExecutionJobVertex for this JobVertex and attach it to the graph
        ExecutionJobVertex ejv = new ExecutionJobVertex(
            this,
            jobVertex,
            1,
            rpcTimeout,
            globalModVersion,
            createTimestamp);

        // connect the new ExecutionJobVertex to its predecessors' IntermediateResults
        ejv.connectToPredecessors(this.intermediateResults);

        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        if (previousTask != null) {
            throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                    jobVertex.getID(), ejv, previousTask));
        }

        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
            if (previousDataSet != null) {
                throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                        res.getId(), res, previousDataSet));
            }
        }

        this.verticesInCreationOrder.add(ejv);
        this.numVerticesTotal += ejv.getParallelism();
        newExecJobVertices.add(ejv);
    }

    terminationFuture = new CompletableFuture<>();
    failoverStrategy.notifyNewVertices(newExecJobVertices);
}
Inside that method, connectToPredecessors is the key call that wires the execution nodes together:
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {

    // get the list of input JobEdges
    List<JobEdge> inputs = jobVertex.getInputs();

    if (LOG.isDebugEnabled()) {
        LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), inputs.size()));
    }

    // iterate over every JobEdge
    for (int num = 0; num < inputs.size(); num++) {
        JobEdge edge = inputs.get(num);

        if (LOG.isDebugEnabled()) {
            if (edge.getSource() == null) {
                LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
                        num, jobVertex.getID(), jobVertex.getName(), edge.getSourceId()));
            } else {
                LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",
                        num, jobVertex.getID(), jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName()));
            }
        }

        // fetch the intermediate result via ID. if it does not exist, then it either has not been created, or the order
        // in which this method is called for the job vertices is not a topological order
        // look up the IntermediateResult that feeds this JobEdge
        IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
        if (ires == null) {
            throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
                    + edge.getSourceId());
        }

        // add the IntermediateResult to this ExecutionJobVertex's inputs
        this.inputs.add(ires);

        // register this vertex as a consumer of the IntermediateResult;
        // consumerIndex corresponds to the IntermediateResult's fan-out
        int consumerIndex = ires.registerConsumer();

        for (int i = 0; i < parallelism; i++) {
            ExecutionVertex ev = taskVertices[i];
            // wire the ExecutionVertex up to the IntermediateResult
            ev.connectSource(num, ires, edge, consumerIndex);
        }
    }
}
Finally, this method links the ExecutionVertices together via connectSource. There are two distribution patterns, POINTWISE and ALL_TO_ALL:

- POINTWISE: each producing subtask is connected to one or more subtasks of the consuming task.
- ALL_TO_ALL: each producing subtask is connected to every subtask of the consuming task.

Concretely, non-shuffle operations use POINTWISE; shuffling operations use ALL_TO_ALL.
public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {

    final DistributionPattern pattern = edge.getDistributionPattern();
    final IntermediateResultPartition[] sourcePartitions = source.getPartitions();

    ExecutionEdge[] edges;

    switch (pattern) {
        case POINTWISE:
            edges = connectPointwise(sourcePartitions, inputNumber);
            break;

        case ALL_TO_ALL:
            edges = connectAllToAll(sourcePartitions, inputNumber);
            break;

        default:
            throw new RuntimeException("Unrecognized distribution pattern.");
    }

    this.inputEdges[inputNumber] = edges;

    // add the consumers to the source
    // for now (until the receiver initiated handshake is in place), we need to register the
    // edges as the execution graph
    for (ExecutionEdge ee : edges) {
        ee.getSource().addConsumer(ee, consumerNumber);
    }
}
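connectPointwise and connectAllToAll build the actual ExecutionEdge arrays. Below is a simplified sketch of the two patterns as they would look inside ExecutionVertex (subTaskIndex is the subtask's own index); the ...Sketch names are mine, and Flink's real connectPointwise additionally handles producer/consumer parallelism mismatches by fanning in or out:

// Simplified sketches (not Flink's exact code) of the two wiring patterns,
// written as if inside ExecutionVertex.
private ExecutionEdge[] connectPointwiseSketch(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
    // equal parallelism assumed: subtask i reads exactly partition i
    return new ExecutionEdge[] {
        new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber)
    };
}

private ExecutionEdge[] connectAllToAllSketch(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
    // every subtask reads every partition of the source result
    ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
    for (int i = 0; i < sourcePartitions.length; i++) {
        edges[i] = new ExecutionEdge(sourcePartitions[i], this, inputNumber);
    }
    return edges;
}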
At this point, the ExecutionGraph has been fully generated.