一、概要
首先,YARN FairScheduler主要做的事情:
① 处理NM心跳NodeUpdate,分配container。
② 树状维护队列和任务,定时计算fair share等信息,并进行排序。
本文重点分析①
二、代码
1、流程框架
① FairScheduler接收心跳
// Scheduler event entry point. Only the NODE_UPDATE case is shown in this
// excerpt; the "...." lines mark cases that were elided from the listing.
public void handle(SchedulerEvent event) {
switch (event.getType()) {
....
case NODE_UPDATE:
if (!(event instanceof NodeUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
// Entry: cast to the node-update event and start processing that node's update.
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode());
break;
....
default:
// Any event type without a matching case is only logged, not thrown.
LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
}
}
② FairScheduler.nodeUpdate
/**
 * Handles one heartbeat sent by a NodeManager.
 *
 * <p>Pulls the container updates reported by the node, processes newly
 * launched and completed containers, refreshes per-container resource usage,
 * and then attempts to schedule new containers on the node. The elapsed time
 * is recorded in {@code fsOpDurations}.
 *
 * @param nm the node whose heartbeat is being processed
 */
private synchronized void nodeUpdate(RMNode nm) {
  long start = getClock().getTime();
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
  }
  eventLog.log("HEARTBEAT", nm.getHostName());

  // Scheduler-side view of the node that sent the heartbeat.
  FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());

  // Container state changes reported by this node.
  List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  // Split the updates into containers that started and containers that
  // finished between the previous heartbeat and this one.
  for (UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  }

  // Process newly launched containers. Original author's note: once the RM
  // allocates a container it is put on a timeout-monitoring queue so that an
  // AM which never uses it does not waste resources; the launch notification
  // removes it from that queue.
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }

  // Process completed containers so that their resources are released.
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    // Guarded like the other debug statements here, so the message string is
    // not built for every completed container when debug logging is off.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Container FINISHED: " + containerId);
    }
    completedContainer(getRMContainer(containerId),
        completedContainer, RMContainerEventType.FINISHED);
  }

  // Pull and apply per-container resource usage reports.
  List<ContainerResourceToReport> containerResourceToReports = nm.pullContainerResourceUpdates();
  for (ContainerResourceToReport containerResource : containerResourceToReports) {
    updateContainerResource(containerResource);
  }

  if (continuousSchedulingEnabled) {
    // Continuous scheduling is not covered by the article; in this mode the
    // heartbeat only schedules when balance scheduling is off and at least
    // one container completed on the node.
    if (!balanceSchedulingEnabled && !completedContainers.isEmpty()) {
      attemptScheduling(node);
    }
  } else {
    // Core container-allocation path.
    attemptScheduling(node);
  }

  // Record how long this heartbeat took to process.
  long duration = getClock().getTime() - start;
  fsOpDurations.addNodeUpdateDuration(duration);
}
细节展开:
处理上个心跳周期内新启动的container:
containerLaunchedOnNode
/**
 * Notifies the owning application attempt that one of its containers has been
 * launched on a node. If no current attempt is known for the container, a
 * clean-container event is dispatched for it instead.
 *
 * @param containerId id of the launched container
 * @param node node on which the container was launched
 */
protected synchronized void containerLaunchedOnNode(
    ContainerId containerId, SchedulerNode node) {
  // Look up the application attempt that owns this container.
  SchedulerApplicationAttempt owner = getCurrentAttemptForContainer(containerId);

  if (owner != null) {
    // The attempt is still known: let it record the launch.
    owner.containerLaunchedOnNode(containerId, node.getNodeID());
    return;
  }

  // No owning attempt: log it and ask for the container to be cleaned up.
  LOG.info("Unknown application "
      + containerId.getApplicationAttemptId().getApplicationId()
      + " launched container " + containerId + " on node: " + node);
  this.rmContext.getDispatcher().getEventHandler()
      .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
}
其中application.containerLaunchedOnNode(即SchedulerApplicationAttempt.containerLaunchedOnNode)的实现如下:
/**
 * Marks a container of this attempt as launched. When the container has no
 * record in this attempt, a clean-container event is dispatched for it
 * instead of a LAUNCHED event.
 *
 * @param containerId id of the launched container
 * @param nodeId id of the node reporting the launch
 */
public synchronized void containerLaunchedOnNode(ContainerId containerId,
    NodeId nodeId) {
  // Look up the RM-side record of the container.
  RMContainer known = getRMContainer(containerId);

  if (known != null) {
    // Known container: deliver the LAUNCHED event to it.
    known.handle(new RMContainerEvent(containerId,
        RMContainerEventType.LAUNCHED));
  } else {
    // No record of this container: request that the node clean it up.
    rmContext.getDispatcher().getEventHandler()
        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
  }
}
③ FairScheduler.attemptScheduling
// Tries to allocate containers on the given node during a heartbeat.
// Returns true when at least one container was assigned (or a reserved
// container was handled with a non-none result), false otherwise.
// Order of work: bail out if the scheduler or node is not usable, then serve
// an existing reservation on the node, and only when no reservation is held
// offer the node to the queue hierarchy starting at the root queue.
synchronized boolean attemptScheduling(FSSchedulerNode node) {
boolean hasAssigned = false;
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
// Scheduler is not ready to allocate containers yet; report nothing assigned.
return hasAssigned;
}
final NodeId nodeID = node.getNodeID();
// The node is no longer tracked; report nothing assigned.
if (!nodes.containsKey(nodeID)) {
// The node might have just been removed while this thread was waiting
// on the synchronized lock before it entered this synchronized method
LOG.info("Skipping scheduling as the node " + nodeID +
" has been removed");
return hasAssigned;
}
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
// A reservation exists: try to serve the reserved application first. If it is
// still held after this block, normal assignment below is skipped and the node
// waits for a later heartbeat.
if (reservedAppSchedulable != null) {
SchedulerRequestKey schedulerKey = node.getReservedContainer().getReservedSchedulerKey();
FSQueue queue = reservedAppSchedulable.getQueue();
// The reservation is kept only if the app still has a container request this
// node can serve AND the reserved resource still fits in the queue's max share
// (per the article: allocated + to-be-allocated must not exceed the most
// recently computed max share).
if (!reservedAppSchedulable.hasContainerForNode(schedulerKey, node)
|| !fitsInMaxShare(queue,
node.getReservedContainer().getReservedResource())) {
// Don't hold the reservation if app can no longer use it
LOG.info("Releasing reservation that cannot be satisfied for application "
+ reservedAppSchedulable.getApplicationAttemptId()
+ " on node " + node);
// One of the two conditions failed (node or queue is full): release the
// reservation and fall through to normal assignment.
reservedAppSchedulable.unreserve(schedulerKey, node);
reservedAppSchedulable = null;
} else {
// Reservation exists; try to fulfill the reservation
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to fulfill reservation for application "
+ reservedAppSchedulable.getApplicationAttemptId()
+ " on node: " + node);
}
// Try to allocate the reserved container, or keep reserving.
Resource assignedReservedContainer = node.getReservedAppSchedulable().assignReservedContainer(node);
// NOTE(review): identity comparison against Resources.none(), while the loop
// below uses equals() — confirm none() always returns the same instance.
hasAssigned = (assignedReservedContainer != Resources.none());
}
}
// No reservation is held: assign containers normally.
if (reservedAppSchedulable == null) {
// No reservation, schedule at queue which is farthest below fair share
int assignedContainers = 0;
Resource assignedResource = Resources.clone(Resources.none());
// Cap handed to shouldContinueAssigning: 50% of the node's currently
// available resources.
Resource maxResourcesToAssign =
Resources.multiply(node.getAvailableResource(), 0.5f);
// Keep assigning while the node holds no reservation and still has at least
// minimumAllocation available.
while (node.getReservedContainer() == null &&
Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
boolean assignedContainer = false;
long start = System.nanoTime();
// Offer the node to the queue tree from the root; child queues are tried in
// order until something is assigned or reserved. assignContainer is analyzed
// in detail later in the article.
Resource assignment = queueMgr.getRootQueue().assignContainer(node);
// Something was assigned: update counters and record the call duration.
if (!assignment.equals(Resources.none())) {
assignedContainers++;
assignedContainer = true;
hasAssigned = true;
Resources.addTo(assignedResource, assignment);
long end = System.nanoTime();
fsOpDurations.addAssignContainerCallDuration((end - start)/1000);
}
// Nothing was assigned (no resources left or no pending requests): stop looping.
if (!assignedContainer) { break; }
// Per the article: stop after one assignment when assignMultiple is off, or
// when the per-heartbeat limit (maxAssign) is reached; otherwise keep going.
// The actual decision is made inside shouldContinueAssigning.
if (!shouldContinueAssigning(assignedContainers,
maxResourcesToAssign, assignedResource)) {
break;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Schedule node:" + node.getNodeName() + "; " +
"Max resources to assign:" + maxResourcesToAssign + "; " +
"Assign container numbers:" + assignedContainers);
}
}
updateRootQueueMetrics();
return hasAssigned;
}
④ FSParentQueue.assignContainer
/**
 * Offers the node to this parent queue's pending child queues, starting at
 * the current schedule index, and returns the first non-empty assignment.
 *
 * @param node node being offered
 * @return the assigned resource, or {@code Resources.none()} when nothing
 *         was assigned
 */
public Resource assignContainer(FSSchedulerNode node) {
  Resource assigned = Resources.none();
  // Per the article, the pre-check fails when allocated + to-be-allocated
  // resources exceed the queue limit or the node holds a reserved container
  // (the helper itself is not shown in this excerpt).
  if (!assignContainerPreCheck(node)) {
    LOG.info("Cannot assign container to queue: " + getQueueName() +
        ", for it usage resource over its limit.");
    return assigned;
  }

  boolean isRoundEnd = false;
  // Declared before the lock so the debug log below can still read them.
  int childQueuesSize = 0;
  int preScheduleIndex = 0;
  readLock.lock();
  try {
    // Everything done under the lock lives inside try, so the lock is
    // released even if one of these calls throws.
    // pendingChildQueues is the subset of childQueues that has pending
    // resources (a local optimisation per the article; it can be read as
    // childQueues).
    childQueuesSize = pendingChildQueues.size();
    preScheduleIndex = createScheduleIndex(childQueuesSize);

    // Walk the sorted children from the current index, serving them in turn.
    while (getCurrentScheduleIndex() < childQueuesSize) {
      FSQueue child = pendingChildQueues.get(getCurrentScheduleIndex());
      // A child with no demand ends the round; per the article the sort order
      // puts queues without demand last, so the remaining ones have none.
      if (child.getDemand().equals(Resources.none())) {
        incCurrentScheduleIndex();
        isRoundEnd = true;
        break;
      }
      // Offer the node to the child queue / application.
      assigned = child.assignContainer(node);
      // Something was assigned: stop here and keep the index on this child.
      if (!Resources.equals(assigned, Resources.none())) {
        break;
      }
      // Nothing assigned by this child: move on to the next one.
      incCurrentScheduleIndex();
    }
    // The index ran past the last pending child: the round is over.
    if (!isRoundEnd && getCurrentScheduleIndex() >= childQueuesSize) {
      isRoundEnd = true;
    }
  } finally {
    readLock.unlock();
  }

  // Only the root queue (no parent) rewinds the index when a round ends.
  if (parent == null && isRoundEnd) {
    resetCurrentScheduleIndex();
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Node " + node.getNodeName() +
        ", offered to queue: " + getName() +
        ", prev schedule index:" + preScheduleIndex +
        ", current schedule index:" + getCurrentScheduleIndex() +
        ", child queue count:" + childQueuesSize +
        ", assigned:" + assigned);
  }
  return assigned;
}
⑤ FSLeafQueue.assignContainer
/**
 * Offers the node to this leaf queue's pending runnable applications,
 * starting at the current schedule index, and returns the first non-empty
 * assignment.
 *
 * @param node node being offered
 * @return the assigned resource, or {@code Resources.none()} when nothing
 *         was assigned
 */
public Resource assignContainer(FSSchedulerNode node) {
  Resource assigned = Resources.none();
  // Same pre-check as the parent queue, plus a node-label check: a node that
  // fails either check is not offered to this queue.
  if (!nodeLabelCheck(node.getNodeID()) || !assignContainerPreCheck(node)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node: " + node.getNodeID() +
          " cannot assigned to queue: " + getQueueName() +
          " for resource usage may over its limits or node label does not match.");
    }
    return assigned;
  }

  // Declared before the lock so the debug log below can still read them.
  int runnableAppsSize = 0;
  int preScheduleIndex = 0;
  readLock.lock();
  try {
    // Everything done under the lock lives inside try, so the lock is
    // released even if one of these calls throws.
    runnableAppsSize = pendingRunnableApps.size();
    preScheduleIndex = createScheduleIndex(runnableAppsSize);

    // The index advances in the loop header, so "continue" moves to the next
    // app while "break" leaves the index on the app that was just served.
    for ( ; getCurrentScheduleIndex() < runnableAppsSize; incCurrentScheduleIndex()) {
      FSAppAttempt appAttempt = pendingRunnableApps.get(getCurrentScheduleIndex());
      // Skip attempts that have no record in the RM context.
      if (!appAttempt.isInRMContext()) {
        LOG.error(appAttempt.getApplicationId()
            + " is not in rmContext, this is not expected");
        continue;
      }
      // Skip when the node is blacklisted by this application.
      if (SchedulerAppUtils.isBlacklisted(appAttempt, node, LOG)) {
        continue;
      }
      Resource pending = appAttempt.getAppAttemptResourceUsage().getPending();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Assign container on: " + node.getNodeName()
            + ", app attempt id:" + appAttempt.getName()
            + ", app demand:" + appAttempt.getDemand()
            + ", app resource usage:" + appAttempt.getResourceUsage()
            + ", pending:" + pending);
      }
      // Only applications with pending resources are offered the node.
      if (!pending.equals(Resources.none())) {
        assigned = appAttempt.assignContainer(node);
        // Something was assigned: stop and return it.
        if (!assigned.equals(Resources.none())) {
          break;
        }
      }
    }
  } finally {
    readLock.unlock();
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Node " + node.getNodeName() +
        ", offered to queue: " + getName() +
        ", prev schedule index:" + preScheduleIndex +
        ", current schedule index:" + getCurrentScheduleIndex() +
        ", runnable apps count:" + runnableAppsSize +
        ", assigned:" + assigned);
  }
  return assigned;
}
⑥ FSAppAttempt.assignContainer
/**
 * Entry point for offering a node to this application attempt.
 *
 * @param node node being offered
 * @return {@code Resources.none()} when the AM share limit is exceeded,
 *         otherwise the result of the non-reserved assignment
 */
public Resource assignContainer(FSSchedulerNode node) {
  // Per the article, the limit is on total AM resources in the queue
  // (by default maxShare * 0.5f); the check itself lives in isOverAMShareLimit.
  return isOverAMShareLimit()
      ? Resources.none()
      : assignContainer(node, false);
}
内层assignContainer先获取locality(即NodeType),分为三种:NODE_LOCAL(节点本地)、RACK_LOCAL(机架本地)、OFF_SWITCH(任意节点)。
字面意思,很好理解。
再内层assignContainer:
/**
 * Tries to satisfy one resource request on the given node: allocates a
 * container when the request fits in the node's available resources,
 * otherwise tries to reserve.
 *
 * @param node node being offered
 * @param request the resource request to satisfy
 * @param type locality type of the request
 * @param reserved whether the node already holds a reserved container for
 *        this request
 * @param schedulerKey scheduler key of the request
 * @return the allocated container's resource,
 *         {@code FairScheduler.CONTAINER_RESERVED} when a reservation was
 *         made, or {@code Resources.none()} when nothing happened
 */
private Resource assignContainer(
    FSSchedulerNode node, ResourceRequest request, NodeType type,
    boolean reserved, SchedulerRequestKey schedulerKey) {
  // The node's labels must match the request's label expression.
  boolean labelMatches = SchedulerUtils.checkNodeLabelExpression(
      node.getLabels(), request.getNodeLabelExpression());
  if (!labelMatches) {
    return Resources.none();
  }

  // Resource asked for by this request, and what the node has free.
  Resource capability = request.getCapability();
  Resource available = node.getAvailableResource();

  // Reuse the reserved container when there is one, otherwise create a new
  // one from the capability and scheduler key.
  Container container = reserved
      ? node.getReservedContainer().getContainer()
      : createContainer(node, capability, schedulerKey);

  if (!Resources.fitsIn(capability, available)) {
    // Not enough room on the node. Do nothing if the queue's max share
    // would be exceeded.
    if (!FairScheduler.fitsInMaxShare(getQueue(), capability)) {
      return Resources.none();
    }
    // Otherwise try to reserve (per the article, reserve creates a new
    // reservation or adds to the current one and sends a reserve event).
    if (reserve(schedulerKey, node, container, type, reserved)) {
      return FairScheduler.CONTAINER_RESERVED;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Couldn't create reservation for app: " + getName()
          + ", at priority " + request.getPriority());
    }
    return Resources.none();
  }

  // The request fits: allocate the container.
  RMContainer allocatedContainer =
      allocate(type, node, schedulerKey, request, container);

  // The reservation is cleared whether or not the allocation succeeded.
  if (reserved) {
    unreserve(schedulerKey, node);
  }

  // Per the article, a null result means enough containers were already
  // allocated elsewhere, so nothing is assigned this time.
  if (allocatedContainer == null) {
    return Resources.none();
  }

  // Update the node's bookkeeping for the new container.
  node.allocateContainer(allocatedContainer);

  // No AM running yet and the AM is managed: this first container is the AM,
  // so record its resource usage.
  if (!(isAmRunning() || getUnmanagedAM())) {
    setAMResource(container.getResource());
    getQueue().addAMResourceUsage(container.getResource());
    setAmRunning(true);
  }
  return container.getResource();
}
三、总结
本文介绍了FairScheduler处理NM心跳NodeUpdate、分配container的代码;另一部分(树状维护队列和任务,定时计算fair share等信息,并进行排序)已在之前的文章中介绍。
总的来说YARN这块代码看起来还是比较简单清晰的,之后会出CapacityScheduler相关的文章。