一 .前言
SlotManager 位于ResourceManager中.
SlotManager 负责维护所有已注册的任务管理器slot、它们的分配和所有挂起的slot请求的视图。
无论何时注册新slot或释放分配的slot,它都会尝试执行另一个挂起的slot请求。 每当没有足够的可用slot时,slot管理器将通过{@link ResourceActions#allocateResource(WorkerResourceSpec)}通知资源管理器。
为了释放资源并避免资源泄漏,空闲任务管理器(其slot当前未使用的任务管理器)和挂起的slot请求分别超时,从而触发其释放和失败。
二 .SlotManager
2.1. 介绍
SlotManager 是一个接口类,定义了ResourceManager中管理Slot的相关操作.
2.2. 接口清单
名称 | 描述 |
---|---|
start | 启动 |
suspend | 挂起 |
int getNumberRegisteredSlots() | 获取注册slot数量 |
int getNumberRegisteredSlotsOf(InstanceID instanceId) | 根据InstanceID获取slot数量 |
int getNumberFreeSlots() | 获取空闲slot数量 |
int getNumberFreeSlotsOf(InstanceID instanceId) | 根据InstanceID获取空闲slot数量 |
Map<WorkerResourceSpec, Integer> getRequiredResources(); | 获取SlotManager已向{@link ResourceActions}请求、但尚未得到满足的worker数量(按WorkerResourceSpec分组)。 |
ResourceProfile getRegisteredResource(); | 获取注册的Resource |
ResourceProfile getRegisteredResourceOf(InstanceID instanceID); | 根据InstanceId 获取注册的Resource |
ResourceProfile getFreeResource(); | 获取空闲Resource |
ResourceProfile getFreeResourceOf(InstanceID instanceID); | 根据InstanceID获取空闲的Resource |
int getNumberPendingSlotRequests(); | 挂起的SlotRequests数量 |
void processResourceRequirements(ResourceRequirements resourceRequirements); | 通知slot manager关于 job需要的资源信息 |
boolean registerSlotRequest(SlotRequest slotRequest) | 注册slot请求. |
boolean unregisterSlotRequest(AllocationID allocationId) | 取消注册slot请求. |
registerTaskManager | 注册TaskManager |
unregisterTaskManager | 取消注册TaskManager |
boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport); | 报告 SlotStatus |
void freeSlot(SlotID slotId, AllocationID allocationId); | 释放Slot |
void setFailUnfulfillableRequest(boolean failUnfulfillableRequest); | 设置是否让无法满足的slot请求立即失败. |
三 .SlotManagerImpl
SlotManagerImpl是SlotManager接口的实现类.
3.1. 属性清单
/** Map for all registered slots, keyed by slot id. */
private final HashMap<SlotID, TaskManagerSlot> slots;
/** Index of all currently free slots. */
private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
/** All currently registered task managers, keyed by instance id. */
private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;
/**
 * Map of fulfilled and active allocations for request deduplication purposes; maps an
 * allocation id to the slot that serves it.
 */
private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
/** Map of pending/unfulfilled slot allocation requests, keyed by allocation id. */
private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
/** Scheduled executor for timeouts. */
private final ScheduledExecutor scheduledExecutor;
/** Timeout for slot requests to the task manager. */
private final Time taskManagerRequestTimeout;
/**
 * Timeout after which an allocation is discarded. Also used by start() as the period of the
 * slot-request timeout check.
 */
private final Time slotRequestTimeout;
/**
 * Timeout after which an unused TaskManager is released. Also used by start() as the period of
 * the task-manager timeout/redundancy check.
 */
private final Time taskManagerTimeout;
/**
 * Pending task manager slots, keyed by their id. getRequiredResources() derives the number of
 * pending workers from the size of this map.
 */
private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;
/**
 * Slot matching strategy taken from the SlotManagerConfiguration.
 * NOTE(review): presumably used when looking up a matching slot for a request; its usage is
 * not visible in this section, so confirm against findMatchingSlot.
 */
private final SlotMatchingStrategy slotMatchingStrategy;
/** ResourceManager's id. Set in start() and reset to null in suspend(). */
private ResourceManagerId resourceManagerId;
/** Executor for future callbacks which have to be "synchronized". */
private Executor mainThreadExecutor;
/** Callbacks for resource (de-)allocations. Set in start() and reset to null in suspend(). */
private ResourceActions resourceActions;
/**
 * Handle of the periodic task-manager timeout/redundancy check scheduled in start(); cancelled
 * and reset to null in suspend().
 */
private ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;
/**
 * Handle of the periodic slot-request timeout check scheduled in start(); cancelled and reset
 * to null in suspend().
 */
private ScheduledFuture<?> slotRequestTimeoutCheck;
/** True iff the component has been started. */
private boolean started;
/**
 * Release task executor only when each produced result partition is either consumed or failed.
 */
private final boolean waitResultConsumedBeforeRelease;
/** Defines the max limitation of the total number of slots. */
private final int maxSlotNum;
/** Defines the number of redundant taskmanagers. */
private final int redundantTaskManagerNum;
/**
 * If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable request
 * to pend. A slot request is considered unfulfillable if it cannot be fulfilled by neither a
 * slot that is already registered (including allocated ones) nor a pending slot that the {@link
 * ResourceActions} can allocate.
 */
private boolean failUnfulfillableRequest = true;
/** The default resource spec of workers to request. */
private final WorkerResourceSpec defaultWorkerResourceSpec;
/** Number of slots per worker, read from the SlotManagerConfiguration. */
private final int numSlotsPerWorker;
/**
 * Default resource profile of a single slot; computed in the constructor from
 * defaultWorkerResourceSpec and numSlotsPerWorker.
 */
private final ResourceProfile defaultSlotResourceProfile;
/** Metric group of this slot manager, supplied through the constructor. */
private final SlotManagerMetricGroup slotManagerMetricGroup;
3.2. 构造方法
构造方法就是普通的赋值&初始化的过程…
public SlotManagerImpl(
ScheduledExecutor scheduledExecutor,
SlotManagerConfiguration slotManagerConfiguration,
SlotManagerMetricGroup slotManagerMetricGroup) {
this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
Preconditions.checkNotNull(slotManagerConfiguration);
this.slotMatchingStrategy = slotManagerConfiguration.getSlotMatchingStrategy();
this.taskManagerRequestTimeout = slotManagerConfiguration.getTaskManagerRequestTimeout();
this.slotRequestTimeout = slotManagerConfiguration.getSlotRequestTimeout();
this.taskManagerTimeout = slotManagerConfiguration.getTaskManagerTimeout();
this.waitResultConsumedBeforeRelease =
slotManagerConfiguration.isWaitResultConsumedBeforeRelease();
this.defaultWorkerResourceSpec = slotManagerConfiguration.getDefaultWorkerResourceSpec();
this.numSlotsPerWorker = slotManagerConfiguration.getNumSlotsPerWorker();
this.defaultSlotResourceProfile =
generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker);
this.slotManagerMetricGroup = Preconditions.checkNotNull(slotManagerMetricGroup);
this.maxSlotNum = slotManagerConfiguration.getMaxSlotNum();
this.redundantTaskManagerNum = slotManagerConfiguration.getRedundantTaskManagerNum();
slots = new HashMap<>(16);
freeSlots = new LinkedHashMap<>(16);
taskManagerRegistrations = new HashMap<>(4);
fulfilledSlotRequests = new HashMap<>(16);
pendingSlotRequests = new HashMap<>(16);
pendingSlots = new HashMap<>(16);
resourceManagerId = null;
resourceActions = null;
mainThreadExecutor = null;
taskManagerTimeoutsAndRedundancyCheck = null;
slotRequestTimeoutCheck = null;
started = false;
}
3.3. 方法详单
3.3.1. start
启动
/**
 * Starts the slot manager: stores the leader id, main-thread executor and resource actions,
 * marks the component as started, schedules the two periodic timeout checks and registers the
 * metrics.
 *
 * @param newResourceManagerId to use for communication with the task managers
 * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
 * @param newResourceActions to use for resource (de-)allocations
 */
@Override
public void start(
        ResourceManagerId newResourceManagerId,
        Executor newMainThreadExecutor,
        ResourceActions newResourceActions) {
    LOG.info("Starting the SlotManager.");

    resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
    mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
    resourceActions = Preconditions.checkNotNull(newResourceActions);
    started = true;

    // Both checks are triggered by the scheduled executor but hand the actual work over to the
    // main thread executor. The field is read when the task fires, not when it is scheduled.
    final long taskManagerCheckPeriodMs = taskManagerTimeout.toMilliseconds();
    final Runnable taskManagerCheck =
            () -> mainThreadExecutor.execute(this::checkTaskManagerTimeoutsAndRedundancy);
    taskManagerTimeoutsAndRedundancyCheck =
            scheduledExecutor.scheduleWithFixedDelay(
                    taskManagerCheck, 0L, taskManagerCheckPeriodMs, TimeUnit.MILLISECONDS);

    final long slotRequestCheckPeriodMs = slotRequestTimeout.toMilliseconds();
    final Runnable slotRequestCheck =
            () -> mainThreadExecutor.execute(this::checkSlotRequestTimeouts);
    slotRequestTimeoutCheck =
            scheduledExecutor.scheduleWithFixedDelay(
                    slotRequestCheck, 0L, slotRequestCheckPeriodMs, TimeUnit.MILLISECONDS);

    registerSlotManagerMetrics();
}
3.3.2. suspend
挂起
/**
 * Suspends the slot manager and clears its internal state: the periodic checks are cancelled,
 * every pending slot request is cancelled and dropped, every registered task manager is
 * unregistered, and the component is marked as not started.
 */
@Override
public void suspend() {
    LOG.info("Suspending the SlotManager.");

    // Stop the periodic checks; cancel(false) does not interrupt a run that is in progress.
    if (taskManagerTimeoutsAndRedundancyCheck != null) {
        taskManagerTimeoutsAndRedundancyCheck.cancel(false);
        taskManagerTimeoutsAndRedundancyCheck = null;
    }
    if (slotRequestTimeoutCheck != null) {
        slotRequestTimeoutCheck.cancel(false);
        slotRequestTimeoutCheck = null;
    }

    pendingSlotRequests.values().forEach(this::cancelPendingSlotRequest);
    pendingSlotRequests.clear();

    // Iterate over a snapshot because unregisterTaskManager removes entries from the map.
    final ArrayList<InstanceID> instanceIdSnapshot =
            new ArrayList<>(taskManagerRegistrations.keySet());
    for (InstanceID instanceId : instanceIdSnapshot) {
        unregisterTaskManager(
                instanceId, new SlotManagerException("The slot manager is being suspended."));
    }

    // Reset only after the task managers are gone: unregistering still calls checkInit().
    resourceManagerId = null;
    resourceActions = null;
    started = false;
}
3.3.3. int getNumberRegisteredSlots()
获取注册slot数量
/** Returns how many slots are currently registered at this slot manager. */
@Override
public int getNumberRegisteredSlots() {
    final int registeredSlotCount = slots.size();
    return registeredSlotCount;
}
3.3.4. int getNumberRegisteredSlotsOf(InstanceID instanceId)
根据InstanceID获取slot数量
/**
 * Returns how many slots the given task manager has registered.
 *
 * @param instanceId identifying the task manager
 * @return the task manager's registered slot count, or 0 if no such task manager is registered
 */
@Override
public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
    final TaskManagerRegistration registration = taskManagerRegistrations.get(instanceId);
    return registration == null ? 0 : registration.getNumberRegisteredSlots();
}
3.3.5. int getNumberFreeSlots()
获取空闲slot数量
/** Returns how many registered slots are currently free. */
@Override
public int getNumberFreeSlots() {
    final int freeSlotCount = freeSlots.size();
    return freeSlotCount;
}
3.3.6. int getNumberFreeSlotsOf(InstanceID instanceId)
根据InstanceID获取空闲slot数量
/**
 * Returns how many slots of the given task manager are currently free.
 *
 * @param instanceId identifying the task manager
 * @return the task manager's free slot count, or 0 if no such task manager is registered
 */
@Override
public int getNumberFreeSlotsOf(InstanceID instanceId) {
    final TaskManagerRegistration registration = taskManagerRegistrations.get(instanceId);
    return registration == null ? 0 : registration.getNumberFreeSlots();
}
3.3.7. Map<WorkerResourceSpec, Integer> getRequiredResources();
获取SlotManager已向{@link ResourceActions}请求、但尚未得到满足的worker数量(按WorkerResourceSpec分组)。
/**
 * Returns the number of workers that are still pending, grouped by worker resource spec.
 *
 * <p>The count is the number of pending slots divided by the slots per worker, rounded up.
 *
 * @return a single-entry map from the default worker resource spec to the pending worker
 *     count, or an empty map if that count is not positive
 */
@Override
public Map<WorkerResourceSpec, Integer> getRequiredResources() {
    final int outstandingWorkers =
            MathUtils.divideRoundUp(pendingSlots.size(), numSlotsPerWorker);
    if (outstandingWorkers > 0) {
        return Collections.singletonMap(defaultWorkerResourceSpec, outstandingWorkers);
    }
    return Collections.emptyMap();
}
3.3.8. ResourceProfile getRegisteredResource();
获取注册的Resource
/** Returns the resource profile corresponding to all registered slots. */
@Override
public ResourceProfile getRegisteredResource() {
    final int registeredSlotCount = getNumberRegisteredSlots();
    return getResourceFromNumSlots(registeredSlotCount);
}
3.3.9. ResourceProfile getRegisteredResourceOf(InstanceID instanceID);
根据InstanceId 获取注册的Resource
/**
 * Returns the resource profile corresponding to the slots registered by one task manager.
 *
 * @param instanceID identifying the task manager
 */
@Override
public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
    final int registeredSlotCount = getNumberRegisteredSlotsOf(instanceID);
    return getResourceFromNumSlots(registeredSlotCount);
}
3.3.10. ResourceProfile getFreeResource();
获取空闲Resource
/** Returns the resource profile corresponding to all currently free slots. */
@Override
public ResourceProfile getFreeResource() {
    final int freeSlotCount = getNumberFreeSlots();
    return getResourceFromNumSlots(freeSlotCount);
}
3.3.11. ResourceProfile getFreeResourceOf(InstanceID instanceID);
根据InstanceID获取空闲的Resource
/**
 * Returns the resource profile corresponding to the free slots of one task manager.
 *
 * @param instanceID identifying the task manager
 */
@Override
public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
    final int freeSlotCount = getNumberFreeSlotsOf(instanceID);
    return getResourceFromNumSlots(freeSlotCount);
}
3.3.12. int getNumberPendingSlotRequests();
挂起的SlotRequests数量
/**
 * Returns the number of slot requests that are still pending, i.e. registered but not yet
 * fulfilled.
 *
 * @return the size of the pending slot request map
 */
@Override
public int getNumberPendingSlotRequests() {
    return pendingSlotRequests.size();
}
3.3.13. void processResourceRequirements(ResourceRequirements resourceRequirements);
通知slot manager关于 job需要的资源信息
/**
 * Intentionally does nothing in this implementation.
 *
 * @param resourceRequirements ignored
 */
@Override
public void processResourceRequirements(ResourceRequirements resourceRequirements) {
    // Deliberately a no-op rather than an UnsupportedOperationException: some code paths in
    // the resource manager call this method whether or not declarative resource management
    // is in use.
}
3.3.14. boolean registerSlotRequest(SlotRequest slotRequest)
注册slot请求.
/**
 * Registers a slot request and immediately tries to serve it.
 *
 * @param slotRequest specifying the requested slot specs
 * @return true if the slot request was registered; false if the request is a duplicate
 * @throws ResourceManagerException if the slot request failed (e.g. not enough resources left)
 */
@Override
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
    checkInit();

    if (checkDuplicateRequest(slotRequest.getAllocationId())) {
        LOG.debug(
                "Ignoring a duplicate slot request with allocation id {}.",
                slotRequest.getAllocationId());
        return false;
    }

    // Track the request as pending before trying to serve it.
    final PendingSlotRequest newPendingRequest = new PendingSlotRequest(slotRequest);
    pendingSlotRequests.put(slotRequest.getAllocationId(), newPendingRequest);

    try {
        internalRequestSlot(newPendingRequest);
        return true;
    } catch (ResourceManagerException cause) {
        // Requesting the slot failed, so the request must not stay in the pending map.
        pendingSlotRequests.remove(slotRequest.getAllocationId());
        throw new ResourceManagerException(
                "Could not fulfill slot request " + slotRequest.getAllocationId() + '.', cause);
    }
}
/**
 * Tries to serve the given pending request: a matching registered slot is allocated right
 * away; if none exists, the request is handed over to the pending-task-manager-slot path.
 *
 * @param pendingSlotRequest to allocate a slot for
 * @throws ResourceManagerException if the slot request failed or is unfulfillable
 */
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
        throws ResourceManagerException {
    final ResourceProfile requestedProfile = pendingSlotRequest.getResourceProfile();

    OptionalConsumer.of(findMatchingSlot(requestedProfile))
            // A registered slot matches: allocate it for this request.
            .ifPresent(matchingSlot -> allocateSlot(matchingSlot, pendingSlotRequest))
            // No registered slot matches: fall back to a pending task manager slot.
            .ifNotPresent(
                    () -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
}
3.3.15. boolean unregisterSlotRequest(AllocationID allocationId)
取消注册slot请求.
/**
 * Removes the pending slot request with the given allocation id and cancels it. Nothing
 * happens if no such pending request exists.
 *
 * @param allocationId identifying the pending slot request
 * @return True if a pending slot request was found; otherwise false
 */
@Override
public boolean unregisterSlotRequest(AllocationID allocationId) {
    checkInit();

    final PendingSlotRequest removedRequest = pendingSlotRequests.remove(allocationId);
    if (removedRequest == null) {
        LOG.debug(
                "No pending slot request with allocation id {} found. Ignoring unregistration request.",
                allocationId);
        return false;
    }

    LOG.debug("Cancel slot request {}.", allocationId);
    cancelPendingSlotRequest(removedRequest);
    return true;
}
3.3.16. registerTaskManager
注册TaskManager
/**
 * Registers a new task manager at the slot manager. This makes the task manager's slots known
 * and, thus, available for allocation.
 *
 * <p>A task manager that is already registered only has its slot report applied. A task
 * manager whose slots would exceed the maximum slot number is released instead of registered.
 *
 * @param taskExecutorConnection for the new task manager
 * @param initialSlotReport for the new task manager
 * @return True if the task manager has not been registered before and is registered
 *     successfully; otherwise false
 */
@Override
public boolean registerTaskManager(
        final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
    // Fails if the slot manager has not been started.
    checkInit();

    LOG.debug(
            "Registering TaskManager {} under {} at the SlotManager.",
            taskExecutorConnection.getResourceID().getStringWithMetadata(),
            taskExecutorConnection.getInstanceID());

    // Task managers are identified by their instance id.
    if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
        // Already registered: just apply the reported slot status.
        reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
        return false;
    }

    if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
        // Registering would exceed the maximum slot number: release the task manager.
        LOG.info(
                "The total number of slots exceeds the max limitation {}, release the excess resource.",
                maxSlotNum);
        resourceActions.releaseResource(
                taskExecutorConnection.getInstanceID(),
                new FlinkException("The total number of slots exceeds the max limitation."));
        return false;
    }

    // First register the task manager together with the ids of all slots it reported ...
    final ArrayList<SlotID> slotIds = new ArrayList<>();
    initialSlotReport.forEach(reported -> slotIds.add(reported.getSlotID()));

    final TaskManagerRegistration newRegistration =
            new TaskManagerRegistration(taskExecutorConnection, slotIds);
    taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), newRegistration);

    // ... then register each reported slot.
    for (SlotStatus reported : initialSlotReport) {
        registerSlot(
                reported.getSlotID(),
                reported.getAllocationID(),
                reported.getJobID(),
                reported.getResourceProfile(),
                taskExecutorConnection);
    }

    return true;
}
3.3.17. unregisterTaskManager
取消注册TaskManager
/**
 * Removes the registration of the given task manager and runs the internal unregistration
 * for it.
 *
 * @param instanceId identifying the task manager to unregister
 * @param cause passed on to the internal unregistration
 * @return true if the task manager was registered and has been removed; otherwise false
 */
@Override
public boolean unregisterTaskManager(InstanceID instanceId, Exception cause) {
    checkInit();

    LOG.debug("Unregister TaskManager {} from the SlotManager.", instanceId);

    final TaskManagerRegistration removedRegistration =
            taskManagerRegistrations.remove(instanceId);
    if (removedRegistration == null) {
        LOG.debug(
                "There is no task manager registered with instance ID {}. Ignoring this message.",
                instanceId);
        return false;
    }

    internalUnregisterTaskManager(removedRegistration, cause);
    return true;
}
3.3.18. boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport);
报告 SlotStatus
3.3.19. void freeSlot(SlotID slotId, AllocationID allocationId);
释放Slot(freeSlot 的实现见 3.4. freeSlot;紧接着的这段代码是 3.3.18 reportSlotStatus 的实现)
/**
 * Applies a slot report of a registered task manager by updating every slot it mentions.
 *
 * @param instanceId identifying the task manager for which to report the slot status
 * @param slotReport containing the status for all of its slots
 * @return true if the slot status has been updated successfully, otherwise false
 */
@Override
public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
    checkInit();

    if (taskManagerRegistrations.get(instanceId) == null) {
        LOG.debug(
                "Received slot report for unknown task manager with instance id {}. Ignoring this report.",
                instanceId);
        return false;
    }

    // Example of the message logged below:
    //   Received slot report from instance 5ad0a12f8bb03f9d016f8d1e18380563:
    //   SlotReport{
    //     SlotStatus{
    //       slotID=container_1615446205104_0025_01_000002_0,
    //       allocationID=3755cb8f9962a9a7738db04f2a02084c,
    //       jobID=694474d11da6100e82744c9e47e2f511,
    //       resourceProfile=ResourceProfile{
    //         cpuCores=1.0000000000000000,
    //         taskHeapMemory=384.000mb (402653174 bytes),
    //         taskOffHeapMemory=0 bytes,
    //         managedMemory=512.000mb (536870920 bytes),
    //         networkMemory=128.000mb (134217730 bytes)
    //       }
    //     }
    //   }.
    LOG.debug("Received slot report from instance {}: {}.", instanceId, slotReport);

    for (SlotStatus reported : slotReport) {
        updateSlot(reported.getSlotID(), reported.getAllocationID(), reported.getJobID());
    }
    return true;
}
3.3.20. void setFailUnfulfillableRequest(boolean failUnfulfillableRequest);
设置是否让无法满足的slot请求立即失败.
/**
 * Sets whether unfulfillable slot requests fail immediately.
 *
 * <p>When the flag goes from false to true, the pending requests are scanned once: every
 * request that has no pending task manager slot assigned and cannot be fulfilled by registered
 * or pending slots is removed, and its allocation failure is reported through the resource
 * actions.
 *
 * @param failUnfulfillableRequest the new value of the flag
 */
@Override
public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
    final boolean switchedOn = failUnfulfillableRequest && !this.failUnfulfillableRequest;

    if (switchedOn) {
        // Removing through the iterator of the values view also removes the map entry.
        final Iterator<PendingSlotRequest> pendingIterator =
                pendingSlotRequests.values().iterator();
        while (pendingIterator.hasNext()) {
            final PendingSlotRequest candidate = pendingIterator.next();

            final boolean unfulfillable =
                    candidate.getAssignedPendingTaskManagerSlot() == null
                            && !isFulfillableByRegisteredOrPendingSlots(
                                    candidate.getResourceProfile());
            if (unfulfillable) {
                pendingIterator.remove();
                resourceActions.notifyAllocationFailure(
                        candidate.getJobId(),
                        candidate.getAllocationId(),
                        new UnfulfillableSlotRequestException(
                                candidate.getAllocationId(), candidate.getResourceProfile()));
            }
        }
    }

    this.failUnfulfillableRequest = failUnfulfillableRequest;
}
3.4. freeSlot
/**
 * Free the given slot from the given allocation. If the slot is still allocated by the given
 * allocation id, then the slot will be marked as free and will be subject to new slot requests.
 *
 * <p>The request is only logged and otherwise ignored if the slot is not registered, is not in
 * state ALLOCATED, or is allocated under a different allocation id.
 *
 * @param slotId identifying the slot to free
 * @param allocationId with which the slot is presumably allocated
 * @throws IllegalStateException if the slot belongs to a task manager that is not registered
 */
@Override
public void freeSlot(SlotID slotId, AllocationID allocationId) {
    checkInit();

    final TaskManagerSlot slot = slots.get(slotId);
    if (slot == null) {
        LOG.debug(
                "Trying to free a slot {} which has not been registered. Ignoring this message.",
                slotId);
        return;
    }

    if (slot.getState() != SlotState.ALLOCATED) {
        // The message is about the slot, so log the slot id (previously the allocation id was
        // passed here by mistake).
        LOG.debug("Slot {} has not been allocated.", slotId);
        return;
    }

    if (!Objects.equals(allocationId, slot.getAllocationId())) {
        LOG.debug(
                "Received request to free slot {} with expected allocation id {}, "
                        + "but actual allocation id {} differs. Ignoring the request.",
                slotId,
                allocationId,
                slot.getAllocationId());
        return;
    }

    final TaskManagerRegistration taskManagerRegistration =
            taskManagerRegistrations.get(slot.getInstanceId());
    if (taskManagerRegistration == null) {
        throw new IllegalStateException(
                "Trying to free a slot from a TaskManager "
                        + slot.getInstanceId()
                        + " which has not been registered.");
    }

    // A null allocation id and job id tell updateSlotState to release the allocated slot.
    updateSlotState(slot, taskManagerRegistration, null, null);
}
3.5. updateSlotState
/**
 * Applies a reported allocation (or the absence of one) to the given slot, branching on the
 * slot's current state.
 *
 * <p>With a non-null allocation id the slot ends up allocated under that id and the id is
 * recorded in fulfilledSlotRequests. With a null allocation id a FREE slot is handed to
 * handleFreeSlot, a PENDING slot is left untouched and an ALLOCATED slot is freed.
 *
 * @param slot the slot whose state is updated
 * @param taskManagerRegistration registration of the task manager owning the slot; its
 *     occupied/free slot accounting is adjusted
 * @param allocationId reported allocation id, or null if no allocation was reported
 * @param jobId reported job id that goes with the allocation, may be null
 */
private void updateSlotState(
TaskManagerSlot slot,
TaskManagerRegistration taskManagerRegistration,
@Nullable AllocationID allocationId,
@Nullable JobID jobId) {
if (null != allocationId) {
switch (slot.getState()) {
// the slot is reserved for a pending slot request
case PENDING:
// we have a pending slot request --> check whether we have to reject it
PendingSlotRequest pendingSlotRequest = slot.getAssignedSlotRequest();
if (Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) {
// we can cancel the slot request because it has been fulfilled
cancelPendingSlotRequest(pendingSlotRequest);
// remove the pending slot request, since it has been completed
pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());
slot.completeAllocation(allocationId, jobId);
} else {
// we first have to free the slot in order to set a new allocationId
slot.clearPendingSlotRequest();
// set the allocation id such that the slot won't be considered for the
// pending slot request
slot.updateAllocation(allocationId, jobId);
// remove the pending request if any as it has been assigned
final PendingSlotRequest actualPendingSlotRequest =
pendingSlotRequests.remove(allocationId);
if (actualPendingSlotRequest != null) {
cancelPendingSlotRequest(actualPendingSlotRequest);
}
// this will try to find a new slot for the request
rejectPendingSlotRequest(
pendingSlotRequest,
new Exception(
"Task manager reported slot "
+ slot.getSlotId()
+ " being already allocated."));
}
taskManagerRegistration.occupySlot();
break;
case ALLOCATED:
// the slot is already allocated --> only re-assign it if the reported allocation
// id differs from the current one
// NOTE(review): the previous allocation id is not removed from
// fulfilledSlotRequests here, unlike the ALLOCATED case of the no-allocation
// branch below — confirm whether that is intended
if (!Objects.equals(allocationId, slot.getAllocationId())) {
slot.freeSlot();
slot.updateAllocation(allocationId, jobId);
}
break;
case FREE:
// the slot is currently free --> it is stored in freeSlots; take it out of
// there and mark it as allocated
freeSlots.remove(slot.getSlotId());
slot.updateAllocation(allocationId, jobId);
taskManagerRegistration.occupySlot();
break;
}
// record which slot serves this allocation id
fulfilledSlotRequests.put(allocationId, slot.getSlotId());
} else {
// no allocation reported
switch (slot.getState()) {
case FREE:
// the slot is free and stays free --> run the free-slot handling
handleFreeSlot(slot);
break;
case PENDING:
// don't do anything because we still have a pending slot request
break;
case ALLOCATED:
// the slot was allocated but no allocation is reported any more --> free it,
// forget the old allocation and run the free-slot handling
AllocationID oldAllocation = slot.getAllocationId();
slot.freeSlot();
fulfilledSlotRequests.remove(oldAllocation);
taskManagerRegistration.freeSlot();
handleFreeSlot(slot);
break;
}
}
}
3.6. xxx
转载:https://blog.csdn.net/zhanglong_4444/article/details/116229016