YARN : FairScheduler深入解析(NodeUpdate、assignContainer)

2 篇文章 0 订阅

一、概要

首先,YARN FairScheduler主要做的事情:
① 处理NM心跳NodeUpdate,分配container。
② 树状维护队列和任务,定时计算fair share等信息,并进行排序。

本文重点分析①

二、代码

1、流程框架

① FairScheduler接收心跳

  public void handle(SchedulerEvent event) {
    switch (event.getType()) {
    ....
    case NODE_UPDATE:
      if (!(event instanceof NodeUpdateSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      // 入口:接收并开始处理Node Update事件
      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
      nodeUpdate(nodeUpdatedEvent.getRMNode());
      break;
    ....
    default:
      LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
    }
  }

② FairScheduler.nodeUpdate

  /**
   * 处理一个NM发来的心跳
   */
  private synchronized void nodeUpdate(RMNode nm) {
    long start = getClock().getTime();
    if (LOG.isDebugEnabled()) {
      LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
    }
    eventLog.log("HEARTBEAT", nm.getHostName());

    // 获取发来请求的NM信息
    FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());

    // 获取该NM上container运行情况
    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
    // 筛出在上次心跳到本次心跳的间隔内,新启动的container和完成关闭的container
    for(UpdatedContainerInfo containerInfo : containerInfoList) {
      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
      completedContainers.addAll(containerInfo.getCompletedContainers());
    } 
    // 更新新启动的container信息,将该Container从超时监控队列中删除
    // 每当RM分配一个container后,为了防止AM长时间不用这个container造成资源浪费,
    // 会将该container加入到超时队列中,一段时间不用就会被回收。
    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
    }

    // 更新完成关闭的container信息,更新集群资源信息
    for (ContainerStatus completedContainer : completedContainers) {
      ContainerId containerId = completedContainer.getContainerId();
      LOG.debug("Container FINISHED: " + containerId);
      completedContainer(getRMContainer(containerId),
          completedContainer, RMContainerEventType.FINISHED);
    }

    // 获取并更新container资源使用情况
    List<ContainerResourceToReport> containerResourceToReports = nm.pullContainerResourceUpdates();
    for (ContainerResourceToReport containerResource : containerResourceToReports) {
      updateContainerResource(containerResource);
    }

    if (continuousSchedulingEnabled) {
      // 持续调度,暂不介绍
      if (!balanceSchedulingEnabled && !completedContainers.isEmpty()) {
        attemptScheduling(node);
      }
    } else {
      // 核心分配container方法
      attemptScheduling(node);
    }

    // metrics方法运行耗时统计
    long duration = getClock().getTime() - start;
    fsOpDurations.addNodeUpdateDuration(duration);
  }

细节展开:
处理上个心跳周期内新启动的container:
containerLaunchedOnNode

  protected synchronized void containerLaunchedOnNode(
      ContainerId containerId, SchedulerNode node) {
    // 获取container对应的application信息
    SchedulerApplicationAttempt application = getCurrentAttemptForContainer
        (containerId);
    if (application == null) {
      LOG.info("Unknown application "
          + containerId.getApplicationAttemptId().getApplicationId()
          + " launched container " + containerId + " on node: " + node);
      this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
      return;
    }
    // application依然存活
    application.containerLaunchedOnNode(containerId, node.getNodeID());
  }

--containerLaunchedOnNode-->>>
  public synchronized void containerLaunchedOnNode(ContainerId containerId,
      NodeId nodeId) {
    // 获取RM上该container信息
    RMContainer rmContainer = getRMContainer(containerId);
    if (rmContainer == null) {
      // 校验该container是否在RM中有记录,没有则kill
      rmContext.getDispatcher().getEventHandler()
        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
      return;
    }

    // 发送事件launched
    rmContainer.handle(new RMContainerEvent(containerId,
        RMContainerEventType.LAUNCHED));
  }

③ FairScheduler.attemptScheduling

  synchronized boolean attemptScheduling(FSSchedulerNode node) {
    boolean hasAssigned = false;
    if (rmContext.isWorkPreservingRecoveryEnabled()
        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
      // 没准备好分配container,直接false
      return hasAssigned;
    }

    final NodeId nodeID = node.getNodeID();
    // 节点丢了,直接false
    if (!nodes.containsKey(nodeID)) {
      // The node might have just been removed while this thread was waiting
      // on the synchronized lock before it entered this synchronized method
      LOG.info("Skipping scheduling as the node " + nodeID +
          " has been removed");
      return hasAssigned;
    }

    // Assign new containers...
    // 1. Check for reserved applications
    // 2. Schedule if there are no reservations

    FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
    // 有预留,先尝试给reserved分配,分配不了就等下次心跳,
    // 只有reserved分配完了才能继续给其他application分配container
    if (reservedAppSchedulable != null) {
      SchedulerRequestKey schedulerKey = node.getReservedContainer().getReservedSchedulerKey();
      FSQueue queue = reservedAppSchedulable.getQueue();

      // 同时满足locality和剩余资源,当前NM满足分配container条件
      // 以及当前队列已分配+准备分配的资源不超过该队列最近一次计算出来的最大fair share
      if (!reservedAppSchedulable.hasContainerForNode(schedulerKey, node)
          || !fitsInMaxShare(queue,
          node.getReservedContainer().getReservedResource())) {
        // Don't hold the reservation if app can no longer use it
        LOG.info("Releasing reservation that cannot be satisfied for application "
            + reservedAppSchedulable.getApplicationAttemptId()
            + " on node " + node);
        // 不满足上述俩条件之一,说明该节点满了或者队列满了,释放reserved
        reservedAppSchedulable.unreserve(schedulerKey, node);
        reservedAppSchedulable = null;
      } else {
        // Reservation exists; try to fulfill the reservation
        if (LOG.isDebugEnabled()) {
          LOG.debug("Trying to fulfill reservation for application "
              + reservedAppSchedulable.getApplicationAttemptId()
              + " on node: " + node);
        }
        // 尝试分配container或继续reserve资源
        Resource assignedReservedContainer = node.getReservedAppSchedulable().assignReservedContainer(node);
        hasAssigned = (assignedReservedContainer != Resources.none());
      }
    }
    // 没有reserve的情况,正常分配container
    if (reservedAppSchedulable == null) {
      // No reservation, schedule at queue which is farthest below fair share
      int assignedContainers = 0;
      Resource assignedResource = Resources.clone(Resources.none());
      // 最多一次只能分配一个NM剩余可用资源50%
      Resource maxResourcesToAssign =
          Resources.multiply(node.getAvailableResource(), 0.5f);
      // 没有预留且该NM剩余资源>=单次分配最小资源,即尽量拿完该NM的资源
      while (node.getReservedContainer() == null &&
          Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
        boolean assignedContainer = false;
        long start = System.nanoTime();
        // 在该NM上向childQueues中的队列,按照顺序依次分配container,直到不够用reserve
        // 重点方法assignContainer会在下文中着重分析
        Resource assignment = queueMgr.getRootQueue().assignContainer(node);
        // 成功分配了资源,计算增减,统计耗时
        if (!assignment.equals(Resources.none())) {
          assignedContainers++;
          assignedContainer = true;
          hasAssigned = true;
          Resources.addTo(assignedResource, assignment);
          long end = System.nanoTime();
          fsOpDurations.addAssignContainerCallDuration((end - start)/1000);
        }
        // 没分配container,说明该NM没资源或者没有pending container,没必要继续循环了
        if (!assignedContainer) { break; }
        // 没开启assignMultiple,该NM分配一次就溜了
        // 该NM分配的container数量达到设置的单次心跳分配个数上限maxAssign,也溜了
        // 否则就继续循环分配
        if (!shouldContinueAssigning(assignedContainers,
            maxResourcesToAssign, assignedResource)) {
          break;
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Schedule node:" + node.getNodeName() + "; " +
            "Max resources to assign:" + maxResourcesToAssign + "; " + 
            "Assign container numbers:" + assignedContainers);
      }
    }
    updateRootQueueMetrics();
    return hasAssigned;
  }

④ FSParentQueue.assignContainer

public Resource assignContainer(FSSchedulerNode node) {
    Resource assigned = Resources.none();

    // 已申请+即将申请的资源总和超过队列上限或者有reserved container
    if (!assignContainerPreCheck(node)) {
      LOG.info("Cannot assign container to queue: " + getQueueName() +
          ", for it usage resource over its limit.");
      return assigned;
    }
  
    boolean isRoundEnd = false;
    readLock.lock();
    // pendingChildQueues是由childQueues filter出来有pending资源的集合
    // 该优化还未提交到社区,可以将其视为childQueues
    int childQueuesSize = pendingChildQueues.size();
    int preScheduleIndex = createScheduleIndex(childQueuesSize);
    try {
      // 从排序好的队列头开始进行分配container,逐个满足资源申请需求
      while (getCurrentScheduleIndex() < childQueuesSize) {
        FSQueue child = pendingChildQueues.get(getCurrentScheduleIndex());
        // 扫到一个demand为none的,则后面均为none(由队列排序决定,demand少则排在后面)
        if (child.getDemand().equals(Resources.none())) {
          incCurrentScheduleIndex();
          isRoundEnd = true;
          break;
        }
        // 给子队列/application分配container
        assigned = child.assignContainer(node);
        // 分配了个空气,同理停下
        if (!Resources.equals(assigned, Resources.none())) {
          break;
        }
        // 队列指针后移
        incCurrentScheduleIndex();
      }
      // 满足了pendingChildQueues所有需求
      if (!isRoundEnd && getCurrentScheduleIndex() >= childQueuesSize) {
        isRoundEnd = true;
      }
    } finally {
      readLock.unlock();
    }
  
    // 一轮分配所有pendingChildQueues的demand都被满足,重置指针至队首
    if (parent == null && isRoundEnd) {
      resetCurrentScheduleIndex();
    }
  
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node " + node.getNodeName() +
          ", offered to queue: " + getName() +
          ", prev schedule index:" + preScheduleIndex +
          ", current schedule index:" + getCurrentScheduleIndex() +
          ", child queue count:" + childQueuesSize +
          ", assigned:" + assigned);
    }

    return assigned;
  }

⑤ FSLeafQueue.assignContainer

public Resource assignContainer(FSSchedulerNode node) {
    Resource assigned = Resources.none();
    // 同FSParentQueue.assignContainer,增加了NM的label校验,不同label不分配直接返回
    if (!nodeLabelCheck(node.getNodeID()) || !assignContainerPreCheck(node)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Node: " + node.getNodeID() +
            " cannot assigned to queue: " + getQueueName() +
            " for resource usage may over its limits or node label does not match.");
      }
      return assigned;
    }
  
    readLock.lock();
    int runnableAppsSize = pendingRunnableApps.size();
    int preScheduleIndex = createScheduleIndex(runnableAppsSize);
    try {
      for ( ; getCurrentScheduleIndex() < runnableAppsSize; incCurrentScheduleIndex()) {
        FSAppAttempt appAttempt = pendingRunnableApps.get(getCurrentScheduleIndex());
        // 过滤RM中没有记录的请求
        if (!appAttempt.isInRMContext()) {
          LOG.error(appAttempt.getApplicationId()
              + " is not in rmContext, this is not expected");
          continue;
        }
        // NM在该application的黑名单中,不分配
        if (SchedulerAppUtils.isBlacklisted(appAttempt, node, LOG)) {
          continue;
        }
        Resource pending = appAttempt.getAppAttemptResourceUsage().getPending();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Assign container on: " + node.getNodeName()
              + ", app attempt id:" + appAttempt.getName()
              + ", app demand:" + appAttempt.getDemand()
              + ", app resource usage:" + appAttempt.getResourceUsage()
              + ", pending:" + pending);
        }
        // pending不为none,开始分配
        if (!pending.equals(Resources.none())) {
          assigned = appAttempt.assignContainer(node);
          if (!assigned.equals(Resources.none())) {
            // 分配结果为none,同理无需进行后续分配
            break;
          }
        }
      }
    } finally {
      readLock.unlock();
    }
  
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node " + node.getNodeName() +
          ", offered to queue: " + getName() +
          ", prev schedule index:" + preScheduleIndex +
          ", current schedule index:" + getCurrentScheduleIndex() +
          ", runnable apps count:" + runnableAppsSize +
          ", assigned:" + assigned);
    }
    
    return assigned;
  }

⑥ FSAppAttempt.assignContainer

  public Resource assignContainer(FSSchedulerNode node) {
    // AM占用总资源超过队列限制,默认AM总资源不超过队列maxShare * 0.5f
    if (isOverAMShareLimit()) {
      return Resources.none();
    }
    return assignContainer(node, false);
  }

内层assignContainer先获取locality,分为三种:
在这里插入图片描述
字面意思,很好理解。

再内层assignContainer:

private Resource assignContainer(
      FSSchedulerNode node, ResourceRequest request, NodeType type,
      boolean reserved, SchedulerRequestKey schedulerKey) {

    // 校验label
    if (!SchedulerUtils.checkNodeLabelExpression(node.getLabels(),
        request.getNodeLabelExpression())) {
      return Resources.none();
    }

    // 本次请求需要资源
    Resource capability = request.getCapability();

    // 当前NM空闲资源
    Resource available = node.getAvailableResource();

    Container container = null;
    // 有reserved则分配reserved container,
    // 没有reserved则根据capability和schedulerKey new一个
    if (reserved) {
      container = node.getReservedContainer().getContainer();
    } else {
      container = createContainer(node, capability, schedulerKey);
    }

    // 请求资源 <= NM空闲资源
    if (Resources.fitsIn(capability, available)) {
      // 分配,发送start事件
      RMContainer allocatedContainer =
          allocate(type, node, schedulerKey, request, container);
      // 其他线程已经分配足够的container了,此次不分配
      if (allocatedContainer == null) {
        // 清空reserved
        if (reserved) {
          unreserve(schedulerKey, node);
        }
        return Resources.none();
      }

      // 清空reserved
      if (reserved) {
        unreserve(schedulerKey, node);
      }

      // 更新该节点的信息(container数,可用资源等)
      node.allocateContainer(allocatedContainer);
  
      // AM还没起,第一个container就是AM,配置AM信息
      if (!isAmRunning() && !getUnmanagedAM()) {
        setAMResource(container.getResource());
        getQueue().addAMResourceUsage(container.getResource());
        setAmRunning(true);
      }

      return container.getResource();
    } else {
      // 资源不够分配

      // 队列不满足资源max share限制,则不分配
      if (!FairScheduler.fitsInMaxShare(getQueue(), capability)) {
        return Resources.none();
      }

      // 资源不够分配,则进行reserve
      // reserve具体大致是先判断是否已经reserve,没有则new一个新的RMContainer
      // 已经reserve则把此次资源add到currentReservation中,并发送reserve事件
      if (reserve(schedulerKey, node, container, type, reserved)) {
        return FairScheduler.CONTAINER_RESERVED;
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Couldn't create reservation for app:  " + getName()
              + ", at priority " +  request.getPriority());
        }
        return Resources.none();
      }
    }
  }

三、总结

本文介绍了FairScheduler处理NM心跳NodeUpdate,分配container的代码,另一部分在之前的文章 树状维护队列和任务,定时计算fair share等信息,并进行排序。

总的来说YARN这块代码看起来还是比较简单清晰的,之后会出CapacityScheduler相关的文章。

  • 0
    点赞
  • 1
    收藏
    觉得还不错? 一键收藏
  • 3
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 3
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值