寒玉 Blog

Flink's Four-Layer Execution Model


Generating the ExecutionGraph

Flink's runtime execution plan is the ExecutionGraph, the abstraction the JobManager works with at runtime.

The StreamGraph and the JobGraph are both generated on the client, whereas the ExecutionGraph is generated on the JobMaster side.

Submitting the JobGraph from the client

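Once the client has generated the JobGraph, it obtains the JobManager's address through the LeaderRetrievalService and then submits the JobGraph to the JobManager for execution. Flink's core inter-process communication is done with Akka: the JobManager and the TaskManagers each run an Akka actor system, so for submission the client first creates a client-side actor that talks to the JobManager, and then issues RPC messages through it.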

The method below sends a JobGraph to the JobManager. It blocks until the job returns a result or the JobManager is no longer alive.

public static JobExecutionResult submitJobAndWait(
    ActorSystem actorSystem,
    Configuration config,
    HighAvailabilityServices highAvailabilityServices,
    JobGraph jobGraph,
    FiniteDuration timeout,
    boolean sysoutLogUpdates,
    ClassLoader classLoader) throws JobExecutionException {
    JobListeningContext jobListeningContext = submitJob(
        actorSystem,
        config,
        highAvailabilityServices,
        jobGraph,
        timeout,
        sysoutLogUpdates,
        classLoader);
    return awaitJobResult(jobListeningContext);
}

The method that actually submits the job and returns a JobListeningContext:

public static JobListeningContext submitJob(
        ActorSystem actorSystem,
        Configuration config,
        HighAvailabilityServices highAvailabilityServices,
        JobGraph jobGraph,
        FiniteDuration timeout,
        boolean sysoutLogUpdates,
        ClassLoader classLoader) {

    checkNotNull(actorSystem, "The actorSystem must not be null.");
    checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
    checkNotNull(jobGraph, "The jobGraph must not be null.");
    checkNotNull(timeout, "The timeout must not be null.");

    // for this job, we create a proxy JobClientActor that deals with all communication with
    // the JobManager. It forwards the job submission, checks the success/failure responses, logs
    // update messages, watches for disconnect between client and JobManager, ...

    Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
        highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
        timeout,
        sysoutLogUpdates,
        config);

    ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

    Future<Object> submissionFuture = Patterns.ask(
            jobClientActor,
            new JobClientMessages.SubmitJobAndWait(jobGraph),
            new Timeout(AkkaUtils.INF_TIMEOUT()));

    return new JobListeningContext(
        jobGraph.getJobID(),
        submissionFuture,
        jobClientActor,
        timeout,
        classLoader,
        highAvailabilityServices);
}
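Note that submitJob itself does not wait: it creates the client actor, fires an asynchronous ask, and wraps the resulting future in a JobListeningContext; submitJobAndWait only blocks afterwards, in awaitJobResult. The sketch below imitates that split with java.util.concurrent.CompletableFuture instead of Akka. ListeningContext and SubmissionSketch are hypothetical stand-ins, not Flink classes, and the "job" simply returns a string.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for JobListeningContext: only the job ID and the pending result.
final class ListeningContext {
    final String jobId;
    final CompletableFuture<String> resultFuture;

    ListeningContext(String jobId, CompletableFuture<String> resultFuture) {
        this.jobId = jobId;
        this.resultFuture = resultFuture;
    }
}

final class SubmissionSketch {
    // Non-blocking part: hand the job to a background "client actor" and return at once,
    // like submitJob returning right after Patterns.ask has been issued.
    static ListeningContext submitJob(String jobId, ExecutorService clientActor) {
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "FINISHED:" + jobId, clientActor);
        return new ListeningContext(jobId, future);
    }

    // Blocking part, like awaitJobResult.
    static String awaitJobResult(ListeningContext ctx) throws InterruptedException, ExecutionException {
        return ctx.resultFuture.get();
    }

    // Both steps together, like submitJobAndWait.
    static String submitJobAndWait(String jobId, ExecutorService clientActor)
            throws InterruptedException, ExecutionException {
        return awaitJobResult(submitJob(jobId, clientActor));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService clientActor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(submitJobAndWait("job-1", clientActor)); // prints FINISHED:job-1
        } finally {
            clientActor.shutdown();
        }
    }
}
```

Splitting the two steps means a caller can hold on to the context and decide later whether, and how long, to wait.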

The basic job-submission flow is:


  • Start a JobClientActor to interact with the JobManager
  • Start the LeaderRetrievalService to obtain the JobManager's address
  • Upload the user jar
  • Send the SubmitJob command
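The order of these steps matters: the client cannot upload or submit anything before it knows which JobManager is the current leader. A minimal sketch of that ordering, assuming a hypothetical callback interface (LeaderListener) in the spirit of Flink's LeaderRetrievalListener and a hypothetical StaticLeaderRetrieval that always reports the same made-up address; the upload and submit steps are only recorded, not performed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical callback interface, similar in spirit to Flink's LeaderRetrievalListener.
interface LeaderListener {
    void notifyLeaderAddress(String address);
}

// Hypothetical retrieval service that always reports one fixed address; a real one would
// watch ZooKeeper (or read static configuration) and notify the listener on leader changes.
final class StaticLeaderRetrieval {
    private final String address;

    StaticLeaderRetrieval(String address) {
        this.address = address;
    }

    void start(LeaderListener listener) {
        listener.notifyLeaderAddress(address);
    }
}

final class SubmissionFlowSketch {
    // Records the steps in the order they happen.
    final List<String> steps = new ArrayList<>();

    String submit(StaticLeaderRetrieval retrieval, String jarPath, String jobId) throws Exception {
        steps.add("start client actor");
        CompletableFuture<String> leader = new CompletableFuture<>();
        retrieval.start(leader::complete);
        // Nothing can be sent until the leader's address is known, so wait for it (with a timeout).
        String jobManager = leader.get(10, TimeUnit.SECONDS);
        steps.add("leader at " + jobManager);
        steps.add("upload " + jarPath);
        steps.add("SubmitJob " + jobId);
        return jobManager;
    }

    public static void main(String[] args) throws Exception {
        SubmissionFlowSketch flow = new SubmissionFlowSketch();
        flow.submit(new StaticLeaderRetrieval("jobmanager-host:6123"), "/tmp/job.jar", "job-1");
        flow.steps.forEach(System.out::println);
    }
}
```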

The JobManager generates the execution plan

Once the client has uploaded the jar and the JobGraph, Flink parses them further and wraps them into the runtime execution plan, the ExecutionGraph.

Generating the ExecutionGraph

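Flink's runtime execution plan is the ExecutionGraph, the parallelized counterpart of the JobGraph. An ExecutionGraph contains multiple ExecutionJobVertex nodes, each corresponding to one JobVertex of the JobGraph. Each parallel subtask of an ExecutionJobVertex is represented by an ExecutionVertex, and each execution attempt of an ExecutionVertex is abstracted as an Execution.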
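The hierarchy JobVertex, ExecutionJobVertex, ExecutionVertex, Execution can be sketched with three hypothetical classes. They keep only what is needed to show the fan-out by parallelism and the separate attempt objects; Flink's real classes carry much more state (slots, partitions, status).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Execution: one attempt at running a subtask.
final class ExecutionSketch {
    final int attemptNumber;

    ExecutionSketch(int attemptNumber) {
        this.attemptNumber = attemptNumber;
    }
}

// Hypothetical stand-in for ExecutionVertex: one parallel subtask of a vertex.
final class ExecutionVertexSketch {
    final int subtaskIndex;
    final List<ExecutionSketch> attempts = new ArrayList<>();

    ExecutionVertexSketch(int subtaskIndex) {
        this.subtaskIndex = subtaskIndex;
        attempts.add(new ExecutionSketch(0)); // the first attempt exists from the start
    }

    // A restart creates a new attempt instead of reusing the old one.
    ExecutionSketch newAttempt() {
        ExecutionSketch attempt = new ExecutionSketch(attempts.size());
        attempts.add(attempt);
        return attempt;
    }

    ExecutionSketch currentExecution() {
        return attempts.get(attempts.size() - 1);
    }
}

// Hypothetical stand-in for ExecutionJobVertex: the parallel expansion of one JobVertex.
final class ExecutionJobVertexSketch {
    final String jobVertexName;
    final ExecutionVertexSketch[] taskVertices;

    ExecutionJobVertexSketch(String jobVertexName, int parallelism) {
        this.jobVertexName = jobVertexName;
        this.taskVertices = new ExecutionVertexSketch[parallelism];
        for (int i = 0; i < parallelism; i++) {
            taskVertices[i] = new ExecutionVertexSketch(i);
        }
    }

    public static void main(String[] args) {
        ExecutionJobVertexSketch map = new ExecutionJobVertexSketch("Map", 4);
        map.taskVertices[2].newAttempt(); // pretend subtask 2 failed once and was restarted
        for (ExecutionVertexSketch ev : map.taskVertices) {
            System.out.println(map.jobVertexName + " (" + (ev.subtaskIndex + 1) + "/4) attempt "
                    + ev.currentExecution().attemptNumber);
        }
    }
}
```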

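After the client has generated the JobGraph, it submits it to the JobMaster via submitJob, and the ExecutionGraph is created in the JobMaster's constructor. The code below is the ExecutionGraph constructor itself.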

public ExecutionGraph(
        JobInformation jobInformation,
        ScheduledExecutorService futureExecutor,
        Executor ioExecutor,
        Time rpcTimeout,
        RestartStrategy restartStrategy,
        FailoverStrategy.Factory failoverStrategyFactory,
        SlotProvider slotProvider,
        ClassLoader userClassLoader,
        BlobWriter blobWriter,
        Time allocationTimeout) throws IOException {

    checkNotNull(futureExecutor);

    this.jobInformation = Preconditions.checkNotNull(jobInformation);

    this.blobWriter = Preconditions.checkNotNull(blobWriter);

    this.jobInformationOrBlobKey = BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter);

    this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
    this.ioExecutor = Preconditions.checkNotNull(ioExecutor);

    this.slotProvider = Preconditions.checkNotNull(slotProvider, "scheduler");
    this.userClassLoader = Preconditions.checkNotNull(userClassLoader, "userClassLoader");

    this.tasks = new ConcurrentHashMap<>(16);
    this.intermediateResults = new ConcurrentHashMap<>(16);
    this.verticesInCreationOrder = new ArrayList<>(16);
    this.currentExecutions = new ConcurrentHashMap<>(16);

    this.jobStatusListeners  = new CopyOnWriteArrayList<>();
    this.executionListeners = new CopyOnWriteArrayList<>();

    this.stateTimestamps = new long[JobStatus.values().length];
    this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();

    this.rpcTimeout = checkNotNull(rpcTimeout);
    this.allocationTimeout = checkNotNull(allocationTimeout);

    this.restartStrategy = restartStrategy;
    this.kvStateLocationRegistry = new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices());

    this.verticesFinished = new AtomicInteger();

    this.globalModVersion = 1L;

    // the failover strategy must be instantiated last, so that the execution graph
    // is ready by the time the failover strategy sees it
    this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");

    this.schedulingFuture = null;
    LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
}

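The constructor shows that the ExecutionGraph maintains the job information (jobInformation), the job's tasks, intermediate results and current executions (tasks, intermediateResults, currentExecutions), status-transition timestamps (stateTimestamps), the restart and failover strategies, and so on.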
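The stateTimestamps array is indexed by the status enum's ordinal, so recording when each status was entered needs just one long per status. A minimal sketch of that bookkeeping, assuming a hypothetical JobState enum (a subset of Flink's JobStatus) and a transition method that, similar in spirit to the ExecutionGraph's own status transitions, only succeeds if the current status is the expected one:

```java
// Hypothetical subset of job statuses; Flink's JobStatus enum has more values.
enum JobState { CREATED, RUNNING, FAILING, FAILED, FINISHED }

final class StateTimestampsSketch {
    // One slot per status, indexed by ordinal, like stateTimestamps in the constructor above.
    // A slot stays 0 until its status has been entered.
    private final long[] stateTimestamps = new long[JobState.values().length];
    private JobState state = JobState.CREATED;

    StateTimestampsSketch(long createdAt) {
        stateTimestamps[JobState.CREATED.ordinal()] = createdAt;
    }

    // Moves to `next` only if the graph is still in `expected`. This sketch is single-threaded;
    // a real implementation has to make the check-and-set atomic.
    boolean transition(JobState expected, JobState next, long now) {
        if (state != expected) {
            return false;
        }
        state = next;
        stateTimestamps[next.ordinal()] = now;
        return true;
    }

    long getStatusTimestamp(JobState status) {
        return stateTimestamps[status.ordinal()];
    }

    JobState getState() {
        return state;
    }

    public static void main(String[] args) {
        StateTimestampsSketch graph = new StateTimestampsSketch(System.currentTimeMillis());
        graph.transition(JobState.CREATED, JobState.RUNNING, System.currentTimeMillis());
        System.out.println(graph.getState() + " since " + graph.getStatusTimestamp(JobState.RUNNING));
    }
}
```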

The ExecutionGraph is built in the JobMaster's constructor, by the method createAndRestoreExecutionGraph:

this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);

private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws Exception {
    ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup);

    final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        if (!checkpointCoordinator.restoreLatestCheckpointedState(
            newExecutionGraph.getAllVertices(),
            false,
            false)) {

            // check whether we can restore from a savepoint
            tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }

    return newExecutionGraph;
}
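The restore order in createAndRestoreExecutionGraph is: the latest checkpoint first, a savepoint only if no checkpoint could be restored, and nothing at all when checkpointing is disabled (no CheckpointCoordinator). The decision logic alone, as a sketch with hypothetical parameters standing in for the CheckpointCoordinator and the SavepointRestoreSettings:

```java
import java.util.Optional;

final class RestoreSketch {
    // checkpointingEnabled stands in for "getCheckpointCoordinator() != null",
    // latestCheckpoint for a successful restoreLatestCheckpointedState(...),
    // savepointPath for the SavepointRestoreSettings. All three are hypothetical simplifications.
    static String chooseRestoreSource(boolean checkpointingEnabled,
                                      Optional<String> latestCheckpoint,
                                      Optional<String> savepointPath) {
        if (!checkpointingEnabled) {
            // No CheckpointCoordinator: both restore attempts are skipped.
            return "none";
        }
        if (latestCheckpoint.isPresent()) {
            // A completed checkpoint (e.g. after a failover) takes precedence.
            return "checkpoint:" + latestCheckpoint.get();
        }
        // The savepoint is considered only if no checkpoint could be restored.
        return savepointPath.map(p -> "savepoint:" + p).orElse("none");
    }

    public static void main(String[] args) {
        System.out.println(chooseRestoreSource(true, Optional.empty(), Optional.of("/savepoints/sp-1")));
    }
}
```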

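After several layers of calls, the method that does the actual work is the static ExecutionGraphBuilder.buildGraph (public static ExecutionGraph buildGraph(...)). The relevant part of its body: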

checkNotNull(jobGraph, "job graph cannot be null");

final String jobName = jobGraph.getName();
final JobID jobId = jobGraph.getJobID();

final FailoverStrategy.Factory failoverStrategy =
        FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);

final JobInformation jobInformation = new JobInformation(
    jobId,
    jobName,
    jobGraph.getSerializedExecutionConfig(),
    jobGraph.getJobConfiguration(),
    jobGraph.getUserJarBlobKeys(),
    jobGraph.getClasspaths());

// create a new execution graph, if none exists so far
final ExecutionGraph executionGraph;
try {
    executionGraph = (prior != null) ? prior :
        new ExecutionGraph(
            jobInformation,
            futureExecutor,
            ioExecutor,
            rpcTimeout,
            restartStrategy,
            failoverStrategy,
            slotProvider,
            classLoader,
            blobWriter,
            allocationTimeout);
} catch (IOException e) {
    throw new JobException("Could not create the ExecutionGraph.", e);
}

// set the basic properties

// set the schedule mode; for streaming jobs the JobGraph's schedule mode is always EAGER
executionGraph.setScheduleMode(jobGraph.getScheduleMode());
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());

try {
    executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
}
catch (Throwable t) {
    log.warn("Cannot create JSON plan for job", t);
    // give the graph an empty plan
    executionGraph.setJsonPlan("{}");
}

// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits

final long initMasterStart = System.nanoTime();
log.info("Running initialization on master for job {} ({}).", jobName, jobId);

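// iterate over all JobVertex objects and run their master-side initialization
// (the topological sort happens further below)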
for (JobVertex vertex : jobGraph.getVertices()) {
    String executableClass = vertex.getInvokableClassName();
    if (executableClass == null || executableClass.isEmpty()) {
        throw new JobSubmissionException(jobId,
                "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
    }

    if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
        if (parallelismForAutoMax < 0) {
            throw new JobSubmissionException(
                jobId,
                PARALLELISM_AUTO_MAX_ERROR_MESSAGE);
        }
        else {
            vertex.setParallelism(parallelismForAutoMax);
        }
    }

    try {
        vertex.initializeOnMaster(classLoader);
    }
    catch (Throwable t) {
        throw new JobExecutionException(jobId,
                "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
    }
}

log.info("Successfully ran initialization on master in {} ms.",
        (System.nanoTime() - initMasterStart) / 1_000_000);

// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
    log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
}
executionGraph.attachJobGraph(sortedTopology);
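attachJobGraph assumes every vertex comes after all of its producers, which is what getVerticesSortedTopologicallyFromSources provides. A minimal sketch of such an ordering, using Kahn's algorithm over a hypothetical map from vertex name to the names of its upstream vertices; it illustrates the idea and is not Flink's own implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TopoSortSketch {
    // inputs: vertex -> upstream vertices it reads from. Every vertex must be a key
    // (sources map to an empty list). Map iteration order decides ties.
    static List<String> sortFromSources(Map<String, List<String>> inputs) {
        Map<String, Integer> remainingInputs = new HashMap<>();
        Map<String, List<String>> consumers = new HashMap<>();
        for (Map.Entry<String, List<String>> e : inputs.entrySet()) {
            remainingInputs.put(e.getKey(), e.getValue().size());
            for (String upstream : e.getValue()) {
                consumers.computeIfAbsent(upstream, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        // Start from the sources: vertices without inputs.
        Deque<String> ready = new ArrayDeque<>();
        for (String v : inputs.keySet()) {
            if (remainingInputs.get(v) == 0) {
                ready.add(v);
            }
        }
        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            sorted.add(v);
            // A consumer becomes ready once all of its producers have been emitted.
            for (String c : consumers.getOrDefault(v, Collections.<String>emptyList())) {
                int left = remainingInputs.merge(c, -1, Integer::sum);
                if (left == 0) {
                    ready.add(c);
                }
            }
        }
        if (sorted.size() != inputs.size()) {
            throw new IllegalStateException("job graph contains a cycle");
        }
        return sorted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> inputs = new LinkedHashMap<>();
        inputs.put("sink", Arrays.asList("map"));
        inputs.put("map", Arrays.asList("source"));
        inputs.put("source", Collections.<String>emptyList());
        System.out.println(sortFromSources(inputs)); // [source, map, sink]
    }
}
```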

The last line, ExecutionGraph.attachJobGraph, is the key method: it builds the execution nodes and produces the execution-plan DAG:

public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

    LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
            "vertices and {} intermediate results.",
            topologiallySorted.size(), tasks.size(), intermediateResults.size());

    final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
    final long createTimestamp = System.currentTimeMillis();

    // iterate over the job vertices (in topological order)
    for (JobVertex jobVertex : topologiallySorted) {

        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }
        // build the ExecutionJobVertex for this JobVertex (it creates one ExecutionVertex per parallel subtask)
        // create the execution job vertex and attach it to the graph
        ExecutionJobVertex ejv = new ExecutionJobVertex(
            this,
            jobVertex,
            1,
            rpcTimeout,
            globalModVersion,
            createTimestamp);
        // connect the new ExecutionJobVertex to its upstream IntermediateResults
        ejv.connectToPredecessors(this.intermediateResults);

        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        if (previousTask != null) {
            throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                    jobVertex.getID(), previousTask, ejv));
        }

        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
            if (previousDataSet != null) {
                throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                        res.getId(), previousDataSet, res));
            }
        }

        this.verticesInCreationOrder.add(ejv);
        this.numVerticesTotal += ejv.getParallelism();
        newExecJobVertices.add(ejv);
    }

    terminationFuture = new CompletableFuture<>();
    failoverStrategy.notifyNewVertices(newExecJobVertices);
}
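The bookkeeping in attachJobGraph reduces to a few maps: vertices and intermediate results are registered with putIfAbsent so duplicates are rejected, inputs are looked up before a vertex is registered (which only works in topological order), and numVerticesTotal accumulates each vertex's parallelism. A sketch under those assumptions, with a hypothetical VertexSpec in place of JobVertex and plain string IDs:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal JobVertex description: ID, parallelism, consumed and produced result IDs.
final class VertexSpec {
    final String id;
    final int parallelism;
    final List<String> inputResultIds;
    final List<String> producedResultIds;

    VertexSpec(String id, int parallelism, List<String> inputResultIds, List<String> producedResultIds) {
        this.id = id;
        this.parallelism = parallelism;
        this.inputResultIds = inputResultIds;
        this.producedResultIds = producedResultIds;
    }

    static List<String> ids(String... ids) {
        return Arrays.asList(ids);
    }
}

final class AttachSketch {
    final Map<String, VertexSpec> tasks = new HashMap<>();
    final Map<String, String> intermediateResults = new HashMap<>(); // result ID -> producing vertex ID
    int numVerticesTotal;

    void attach(List<VertexSpec> topologicallySorted) {
        for (VertexSpec v : topologicallySorted) {
            // Like connectToPredecessors: every input must already exist, which only holds
            // if producers are attached before their consumers.
            for (String input : v.inputResultIds) {
                if (!intermediateResults.containsKey(input)) {
                    throw new IllegalStateException("No previous intermediate result found for ID " + input);
                }
            }
            if (tasks.putIfAbsent(v.id, v) != null) {
                throw new IllegalStateException("Encountered two job vertices with ID " + v.id);
            }
            for (String output : v.producedResultIds) {
                if (intermediateResults.putIfAbsent(output, v.id) != null) {
                    throw new IllegalStateException("Encountered two intermediate data sets with ID " + output);
                }
            }
            // One ExecutionVertex per parallel subtask.
            numVerticesTotal += v.parallelism;
        }
    }

    public static void main(String[] args) {
        AttachSketch graph = new AttachSketch();
        graph.attach(Arrays.asList(
                new VertexSpec("source", 2, VertexSpec.ids(), VertexSpec.ids("r1")),
                new VertexSpec("map", 4, VertexSpec.ids("r1"), VertexSpec.ids("r2")),
                new VertexSpec("sink", 1, VertexSpec.ids("r2"), VertexSpec.ids())));
        System.out.println(graph.numVerticesTotal); // 2 + 4 + 1 = 7
    }
}
```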

Inside this method, connectToPredecessors is the key step that links the execution nodes together:

public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {

    // get the list of input JobEdges
    List<JobEdge> inputs = jobVertex.getInputs();

    if (LOG.isDebugEnabled()) {
        LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), inputs.size()));
    }

    // iterate over every input JobEdge
    for (int num = 0; num < inputs.size(); num++) {
        JobEdge edge = inputs.get(num);

        if (LOG.isDebugEnabled()) {
            if (edge.getSource() == null) {
                LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
                        num, jobVertex.getID(), jobVertex.getName(), edge.getSourceId()));
            } else {
                LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",
                        num, jobVertex.getID(), jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName()));
            }
        }

        // fetch the intermediate result via ID. if it does not exist, then it either has not been created, or the order
        // in which this method is called for the job vertices is not a topological order
        // look up the IntermediateResult that this JobEdge reads from
        IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
        if (ires == null) {
            throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
                    + edge.getSourceId());
        }

        // add the IntermediateResult to this ExecutionJobVertex's inputs
        this.inputs.add(ires);

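        // register this vertex as a consumer of the IntermediateResult; consumerIndex is this
        // consumer's position among all consumers of the result (i.e. tied to its out-degree)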
        int consumerIndex = ires.registerConsumer();

        for (int i = 0; i < parallelism; i++) {
            ExecutionVertex ev = taskVertices[i];
            // connect each parallel ExecutionVertex to the IntermediateResult
            ev.connectSource(num, ires, edge, consumerIndex);
        }
    }
}
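registerConsumer is what produces consumerIndex. As a simplified model (assuming each partition keeps one consumer list per consuming JobVertex, which is how the sketch represents it), registering a consumer appends a new list to every partition and returns its index, so the index equals the number of consumers registered before it. ResultSketch and PartitionSketch are hypothetical classes, not Flink's.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical partition: one consumer list per consuming JobVertex ("consumer group");
// a consumer is represented here only by the consuming subtask's index.
final class PartitionSketch {
    final List<List<Integer>> consumerGroups = new ArrayList<>();

    int addConsumerGroup() {
        consumerGroups.add(new ArrayList<>());
        return consumerGroups.size() - 1;
    }

    void addConsumer(int consumingSubtask, int consumerIndex) {
        consumerGroups.get(consumerIndex).add(consumingSubtask);
    }
}

// Hypothetical intermediate result with a fixed number of partitions.
final class ResultSketch {
    final PartitionSketch[] partitions;
    private int numConsumers;

    ResultSketch(int numPartitions) {
        partitions = new PartitionSketch[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            partitions[i] = new PartitionSketch();
        }
    }

    // Each consuming JobVertex gets the same index in every partition.
    int registerConsumer() {
        int index = numConsumers++;
        for (PartitionSketch p : partitions) {
            if (p.addConsumerGroup() != index) {
                throw new IllegalStateException("Inconsistent consumer mapping between partitions");
            }
        }
        return index;
    }

    public static void main(String[] args) {
        ResultSketch result = new ResultSketch(2);
        int mapIndex = result.registerConsumer();  // first downstream vertex
        int sinkIndex = result.registerConsumer(); // second downstream vertex
        result.partitions[0].addConsumer(0, mapIndex);
        System.out.println(mapIndex + " " + sinkIndex + " " + result.partitions[0].consumerGroups);
    }
}
```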

Finally, this method links each ExecutionVertex to its inputs via connectSource. There are two connection strategies, POINTWISE and ALL_TO_ALL:

  • POINTWISE: each producing subtask is connected to one or more subtasks of the consuming task.
  • ALL_TO_ALL: each producing subtask is connected to every subtask of the consuming task.

Roughly speaking, edges that do not shuffle data (forward or rescale partitioning) use POINTWISE, and all others use ALL_TO_ALL.

public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {

    final DistributionPattern pattern = edge.getDistributionPattern();
    final IntermediateResultPartition[] sourcePartitions = source.getPartitions();

    ExecutionEdge[] edges;

    switch (pattern) {
        case POINTWISE:
            edges = connectPointwise(sourcePartitions, inputNumber);
            break;

        case ALL_TO_ALL:
            edges = connectAllToAll(sourcePartitions, inputNumber);
            break;

        default:
            throw new RuntimeException("Unrecognized distribution pattern.");

    }

    this.inputEdges[inputNumber] = edges;

    // add the consumers to the source
    // for now (until the receiver initiated handshake is in place), we need to register the
    // edges as the execution graph
    for (ExecutionEdge ee : edges) {
        ee.getSource().addConsumer(ee, consumerNumber);
    }
}
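connectPointwise and connectAllToAll are not shown above. The sketch below computes, for one consumer subtask, which upstream partitions it reads under each pattern. DistributionSketch and inputsOf are hypothetical names, and the index arithmetic for uneven ratios is a simplified integer version of the idea, not copied from Flink.

```java
import java.util.Arrays;

final class DistributionSketch {
    enum Pattern { POINTWISE, ALL_TO_ALL }

    // Indices of the upstream partitions read by consumer `subtask`, given `numSources`
    // upstream partitions and `parallelism` consumer subtasks. long arithmetic avoids overflow.
    static int[] inputsOf(Pattern pattern, int numSources, int parallelism, int subtask) {
        if (pattern == Pattern.ALL_TO_ALL) {
            // Every consumer reads every partition: numSources * parallelism edges overall.
            int[] all = new int[numSources];
            for (int i = 0; i < numSources; i++) {
                all[i] = i;
            }
            return all;
        }
        if (numSources <= parallelism) {
            // 1:1 or fan-out: each consumer reads exactly one partition,
            // and one partition may feed several neighboring consumers.
            return new int[] {(int) ((long) subtask * numSources / parallelism)};
        }
        // Fan-in: each consumer reads a contiguous range of partitions.
        int start = (int) ((long) subtask * numSources / parallelism);
        int end = (int) ((long) (subtask + 1) * numSources / parallelism);
        int[] range = new int[end - start];
        for (int i = 0; i < range.length; i++) {
            range[i] = start + i;
        }
        return range;
    }

    public static void main(String[] args) {
        for (int subtask = 0; subtask < 2; subtask++) {
            System.out.println("subtask " + subtask
                    + " POINTWISE " + Arrays.toString(inputsOf(Pattern.POINTWISE, 4, 2, subtask))
                    + " ALL_TO_ALL " + Arrays.toString(inputsOf(Pattern.ALL_TO_ALL, 4, 2, subtask)));
        }
    }
}
```

With 4 upstream partitions and 2 consumers, POINTWISE gives subtask 0 partitions [0, 1] and subtask 1 partitions [2, 3], while ALL_TO_ALL gives both subtasks [0, 1, 2, 3].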

At this point, the ExecutionGraph has been fully generated.



Published

Mar 25, 2019

Category

flink

Tags

  • bigdata
  • flink