Feidao's Blog

A Brief Look at the Flink 1.12.2 Source: SlotManager

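I. Preface

The SlotManager lives inside the ResourceManager.
It maintains a view of all registered TaskManager slots, their allocations, and all pending slot requests.
Whenever a new slot is registered or an allocated slot is freed, it tries to fulfill another pending slot request. Whenever there are not enough free slots, the SlotManager notifies the ResourceManager via ResourceActions#allocateResource(WorkerResourceSpec).

To free resources and avoid resource leaks, idle TaskManagers (those whose slots are currently unused) and pending slot requests each time out, which triggers their release and failure respectively.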
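
The behavior described above can be sketched with a toy model. This is not Flink's code: the class ToySlotManager and all of its members are made up for illustration, requests are plain strings, and a counter stands in for the call to ResourceActions#allocateResource.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the preface; every name here is hypothetical, not Flink API. */
final class ToySlotManager {
    private final Deque<String> pendingRequests = new ArrayDeque<>(); // waiting request ids
    private int freeSlots;               // registered slots that are currently unused
    private int resourceAllocationCalls; // stand-in for ResourceActions#allocateResource

    /** Serves the request from a free slot if possible, otherwise parks it and asks for resources. */
    void requestSlot(String requestId) {
        if (freeSlots > 0) {
            freeSlots--;
        } else {
            pendingRequests.addLast(requestId);
            resourceAllocationCalls++;
        }
    }

    /** Called when a slot is registered or freed: the oldest pending request gets it first. */
    void slotBecameFree() {
        if (pendingRequests.pollFirst() == null) {
            freeSlots++; // nobody is waiting, so the slot stays free
        }
    }

    int pendingCount() { return pendingRequests.size(); }
    int freeCount() { return freeSlots; }
    int allocationCalls() { return resourceAllocationCalls; }
}
```

The timeouts mentioned above are left out of the sketch; they would remove entries from the pending queue and release idle capacity.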

II. SlotManager

2.1. Introduction

SlotManager is an interface that defines the slot-management operations of the ResourceManager.

2.2. Interface methods

Name: Description
start: starts the SlotManager
suspend: suspends the SlotManager
int getNumberRegisteredSlots(): returns the number of registered slots
int getNumberRegisteredSlotsOf(InstanceID instanceId): returns the number of slots registered by the given instance
int getNumberFreeSlots(): returns the number of free slots
int getNumberFreeSlotsOf(InstanceID instanceId): returns the number of free slots of the given instance
Map<WorkerResourceSpec, Integer> getRequiredResources(): returns the number of workers the SlotManager has requested from ResourceActions that are not yet fulfilled
ResourceProfile getRegisteredResource(): returns the registered resources
ResourceProfile getRegisteredResourceOf(InstanceID instanceID): returns the registered resources of the given instance
ResourceProfile getFreeResource(): returns the free resources
ResourceProfile getFreeResourceOf(InstanceID instanceID): returns the free resources of the given instance
int getNumberPendingSlotRequests(): returns the number of pending slot requests
void processResourceRequirements(ResourceRequirements resourceRequirements): notifies the SlotManager about the resources a job requires
boolean registerSlotRequest(SlotRequest slotRequest): registers a slot request
boolean unregisterSlotRequest(AllocationID allocationId): unregisters a slot request
registerTaskManager: registers a TaskManager
unregisterTaskManager: unregisters a TaskManager
boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport): reports the slot status
void freeSlot(SlotID slotId, AllocationID allocationId): frees a slot
void setFailUnfulfillableRequest(boolean failUnfulfillableRequest): sets whether unfulfillable requests fail immediately

III. SlotManagerImpl

SlotManagerImpl is the implementation of the SlotManager interface.

3.1. Fields


    
    /** Map for all registered slots. */
    private final HashMap<SlotID, TaskManagerSlot> slots;

    /** Index of all currently free slots. */
    private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;

    /** All currently registered task managers. */
    private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;

    /** Map of fulfilled and active allocations for request deduplication purposes. */
    private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;

    /** Map of pending/unfulfilled slot allocation requests. */
    private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;

    /** Scheduled executor for timeouts. */
    private final ScheduledExecutor scheduledExecutor;

    /** Timeout for slot requests to the task manager. */
    private final Time taskManagerRequestTimeout;

    /** Timeout after which an allocation is discarded. */
    private final Time slotRequestTimeout;

    /** Timeout after which an unused TaskManager is released. */
    private final Time taskManagerTimeout;

    private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;

    private final SlotMatchingStrategy slotMatchingStrategy;

    /** ResourceManager's id. */
    private ResourceManagerId resourceManagerId;

    /** Executor for future callbacks which have to be "synchronized". */
    private Executor mainThreadExecutor;

    /** Callbacks for resource (de-)allocations. */
    private ResourceActions resourceActions;

    private ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;

    private ScheduledFuture<?> slotRequestTimeoutCheck;

    /** True iff the component has been started. */
    private boolean started;

    /**
     * Release task executor only when each produced result partition is either consumed or failed.
     */
    private final boolean waitResultConsumedBeforeRelease;

    /** Defines the max limitation of the total number of slots. */
    private final int maxSlotNum;

    /** Defines the number of redundant taskmanagers. */
    private final int redundantTaskManagerNum;

    /**
     * If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable request
     * to pend. A slot request is considered unfulfillable if it cannot be fulfilled by neither a
     * slot that is already registered (including allocated ones) nor a pending slot that the {@link
     * ResourceActions} can allocate.
     */
    private boolean failUnfulfillableRequest = true;

    /** The default resource spec of workers to request. */
    private final WorkerResourceSpec defaultWorkerResourceSpec;

    private final int numSlotsPerWorker;

    private final ResourceProfile defaultSlotResourceProfile;

    private final SlotManagerMetricGroup slotManagerMetricGroup;

3.2. Constructor

The constructor only assigns and initializes the fields.


    public SlotManagerImpl(
            ScheduledExecutor scheduledExecutor,
            SlotManagerConfiguration slotManagerConfiguration,
            SlotManagerMetricGroup slotManagerMetricGroup) {
   

        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);

        Preconditions.checkNotNull(slotManagerConfiguration);
        this.slotMatchingStrategy = slotManagerConfiguration.getSlotMatchingStrategy();
        this.taskManagerRequestTimeout = slotManagerConfiguration.getTaskManagerRequestTimeout();
        this.slotRequestTimeout = slotManagerConfiguration.getSlotRequestTimeout();
        this.taskManagerTimeout = slotManagerConfiguration.getTaskManagerTimeout();
        this.waitResultConsumedBeforeRelease =
                slotManagerConfiguration.isWaitResultConsumedBeforeRelease();
        this.defaultWorkerResourceSpec = slotManagerConfiguration.getDefaultWorkerResourceSpec();
        this.numSlotsPerWorker = slotManagerConfiguration.getNumSlotsPerWorker();
        this.defaultSlotResourceProfile =
                generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker);
        this.slotManagerMetricGroup = Preconditions.checkNotNull(slotManagerMetricGroup);
        this.maxSlotNum = slotManagerConfiguration.getMaxSlotNum();
        this.redundantTaskManagerNum = slotManagerConfiguration.getRedundantTaskManagerNum();

        slots = new HashMap<>(16);
        freeSlots = new LinkedHashMap<>(16);
        taskManagerRegistrations = new HashMap<>(4);
        fulfilledSlotRequests = new HashMap<>(16);
        pendingSlotRequests = new HashMap<>(16);
        pendingSlots = new HashMap<>(16);

        resourceManagerId = null;
        resourceActions = null;
        mainThreadExecutor = null;
        taskManagerTimeoutsAndRedundancyCheck = null;
        slotRequestTimeoutCheck = null;

        started = false;
    }

3.3. Method details

3.3.1. start

Starts the SlotManager and schedules the two periodic timeout checks.


    /**
     * Starts the slot manager with the given leader id and resource manager actions.
     *
     * @param newResourceManagerId to use for communication with the task managers
     * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
     * @param newResourceActions to use for resource (de-)allocations
     */
    @Override
    public void start(
            ResourceManagerId newResourceManagerId,
            Executor newMainThreadExecutor,
            ResourceActions newResourceActions) {
   
        LOG.info("Starting the SlotManager.");

        this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
        mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
        resourceActions = Preconditions.checkNotNull(newResourceActions);

        started = true;

        taskManagerTimeoutsAndRedundancyCheck =
                scheduledExecutor.scheduleWithFixedDelay(
                        () ->
                                mainThreadExecutor.execute(
                                        () -> checkTaskManagerTimeoutsAndRedundancy()),
                        0L,
                        taskManagerTimeout.toMilliseconds(),
                        TimeUnit.MILLISECONDS);

        slotRequestTimeoutCheck =
                scheduledExecutor.scheduleWithFixedDelay(
                        () -> mainThreadExecutor.execute(() -> checkSlotRequestTimeouts()),
                        0L,
                        slotRequestTimeout.toMilliseconds(),
                        TimeUnit.MILLISECONDS);

        registerSlotManagerMetrics();
    }
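
The scheduling pattern in start() is that the timer thread never runs a check itself; it only hands the check over to the main-thread executor. Below is a minimal sketch of that pattern using only java.util.concurrent. The class PeriodicCheckSketch, its methods, and the thread name are assumptions made for this example; Flink uses its own ScheduledExecutor type rather than ScheduledExecutorService.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of "timer thread forwards to main thread"; not Flink code. */
final class PeriodicCheckSketch {
    static final String MAIN_THREAD_NAME = "main-thread-sketch";

    /** Schedules the check with a fixed delay; the timer only enqueues it on mainThread. */
    static ScheduledFuture<?> schedulePeriodicCheck(
            ScheduledExecutorService timer, Executor mainThread, Runnable check, long delayMillis) {
        return timer.scheduleWithFixedDelay(
                () -> mainThread.execute(check), 0L, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns true if the check ran three times and always on the main-thread executor. */
    static boolean demo() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, MAIN_THREAD_NAME));
        CountDownLatch threeRuns = new CountDownLatch(3);
        AtomicBoolean alwaysOnMainThread = new AtomicBoolean(true);
        ScheduledFuture<?> handle = schedulePeriodicCheck(timer, mainThread, () -> {
            if (!MAIN_THREAD_NAME.equals(Thread.currentThread().getName())) {
                alwaysOnMainThread.set(false);
            }
            threeRuns.countDown();
        }, 10L);
        try {
            boolean ranThreeTimes = threeRuns.await(5, TimeUnit.SECONDS);
            return ranThreeTimes && alwaysOnMainThread.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            handle.cancel(false); // cancel without interrupting, as suspend() does
            timer.shutdownNow();
            mainThread.shutdownNow();
        }
    }
}
```

Because all state changes run on one thread, the maps in SlotManagerImpl can be plain HashMaps without locking.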

3.3.2. suspend

Suspends the SlotManager and clears its internal state.


    /** Suspends the component. This clears the internal state of the slot manager. */
    @Override
    public void suspend() {
   
        LOG.info("Suspending the SlotManager.");

        // stop the timeout checks for the TaskManagers and the SlotRequests
        if (taskManagerTimeoutsAndRedundancyCheck != null) {
   
            taskManagerTimeoutsAndRedundancyCheck.cancel(false);
            taskManagerTimeoutsAndRedundancyCheck = null;
        }

        if (slotRequestTimeoutCheck != null) {
   
            slotRequestTimeoutCheck.cancel(false);
            slotRequestTimeoutCheck = null;
        }

        for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
   
            cancelPendingSlotRequest(pendingSlotRequest);
        }

        pendingSlotRequests.clear();

        ArrayList<InstanceID> registeredTaskManagers =
                new ArrayList<>(taskManagerRegistrations.keySet());

        for (InstanceID registeredTaskManager : registeredTaskManagers) {
   
            unregisterTaskManager(
                    registeredTaskManager,
                    new SlotManagerException("The slot manager is being suspended."));
        }

        resourceManagerId = null;
        resourceActions = null;
        started = false;
    }

3.3.3. int getNumberRegisteredSlots()

Returns the number of registered slots.

    @Override
    public int getNumberRegisteredSlots() {
        return slots.size();
    }

3.3.4. int getNumberRegisteredSlotsOf(InstanceID instanceId)

Returns the number of slots registered by the given instance, or 0 if the instance is unknown.

    @Override
    public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
        TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);

        if (taskManagerRegistration != null) {
            return taskManagerRegistration.getNumberRegisteredSlots();
        } else {
            return 0;
        }
    }

3.3.5. int getNumberFreeSlots()

Returns the number of free slots.

    @Override
    public int getNumberFreeSlots() {
        return freeSlots.size();
    }

3.3.6. int getNumberFreeSlotsOf(InstanceID instanceId)

Returns the number of free slots of the given instance, or 0 if the instance is unknown.

    @Override
    public int getNumberFreeSlotsOf(InstanceID instanceId) {
        TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);

        if (taskManagerRegistration != null) {
            return taskManagerRegistration.getNumberFreeSlots();
        } else {
            return 0;
        }
    }

3.3.7. Map<WorkerResourceSpec, Integer> getRequiredResources();

Returns the number of workers that the SlotManager has requested from ResourceActions and that are not yet fulfilled. The count is derived from the number of pending slots divided by the slots per worker, rounded up.

    @Override
    public Map<WorkerResourceSpec, Integer> getRequiredResources() {
        final int pendingWorkerNum =
                MathUtils.divideRoundUp(pendingSlots.size(), numSlotsPerWorker);
        return pendingWorkerNum > 0
                ? Collections.singletonMap(defaultWorkerResourceSpec, pendingWorkerNum)
                : Collections.emptyMap();
    }
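
The rounding matters because a partially filled worker still has to be started. A small worked example follows; the class PendingWorkerMath and its divideRoundUp are a stand-in written for this post and are not Flink's MathUtils.

```java
/** Stand-in for the round-up division used above; not Flink's MathUtils. */
final class PendingWorkerMath {
    /** Integer division rounding up; assumes dividend >= 0 and divisor > 0. */
    static int divideRoundUp(int dividend, int divisor) {
        return (dividend + divisor - 1) / divisor;
    }

    public static void main(String[] args) {
        // 5 pending slots with 2 slots per worker need 3 workers
        System.out.println(divideRoundUp(5, 2)); // prints 3
    }
}
```

With zero pending slots the result is 0, which is the case where getRequiredResources returns the empty map.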

3.3.8. ResourceProfile getRegisteredResource();

Returns the registered resources.

    @Override
    public ResourceProfile getRegisteredResource() {
        return getResourceFromNumSlots(getNumberRegisteredSlots());
    }

3.3.9. ResourceProfile getRegisteredResourceOf(InstanceID instanceID);

Returns the registered resources of the given instance.

    @Override
    public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
        return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
    }

3.3.10. ResourceProfile getFreeResource();

Returns the free resources.

    @Override
    public ResourceProfile getFreeResource() {
        return getResourceFromNumSlots(getNumberFreeSlots());
    }

3.3.11. ResourceProfile getFreeResourceOf(InstanceID instanceID);

Returns the free resources of the given instance.

    @Override
    public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
        return getResourceFromNumSlots(getNumberFreeSlotsOf(instanceID));
    }

3.3.12. int getNumberPendingSlotRequests();

Returns the number of pending slot requests.

    @Override
    public int getNumberPendingSlotRequests() {
        return pendingSlotRequests.size();
    }

3.3.13. void processResourceRequirements(ResourceRequirements resourceRequirements);

Notifies the SlotManager about the resources a job requires. This implementation ignores the call.

    @Override
    public void processResourceRequirements(ResourceRequirements resourceRequirements) {
        // no-op; don't throw an UnsupportedOperationException here because there are code
        // paths where the resource manager calls this method regardless of whether
        // declarative resource management is used or not
    }

3.3.14. boolean registerSlotRequest(SlotRequest slotRequest)

Registers a slot request.


    /**
     * Requests a slot with the respective resource profile.
     *
     * @param slotRequest specifying the requested slot specs
     * @return true if the slot request was registered; false if the request is a duplicate
     * @throws ResourceManagerException if the slot request failed (e.g. not enough resources left)
     */
    @Override
    public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
   
        checkInit();

        if (checkDuplicateRequest(slotRequest.getAllocationId())) {
   
            LOG.debug(
                    "Ignoring a duplicate slot request with allocation id {}.",
                    slotRequest.getAllocationId());

            return false;
        } else {
            // build the pending request
            PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

            pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

            try {
                // try to allocate a slot for the request
                internalRequestSlot(pendingSlotRequest);
            } catch (ResourceManagerException e) {
   
                // requesting the slot failed --> remove pending slot request
                pendingSlotRequests.remove(slotRequest.getAllocationId());

                throw new ResourceManagerException(
                        "Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
            }

            return true;
        }
    }


    /**
     * Tries to allocate a slot for the given slot request. If there is no slot available, the
     * resource manager is informed to allocate more resources and a timeout for the request is
     * registered.
     *
     * @param pendingSlotRequest to allocate a slot for
     * @throws ResourceManagerException if the slot request failed or is unfulfillable
     */
    private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
            throws ResourceManagerException {
   
        // resource profile requested by the slot request
        final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();

        OptionalConsumer.of(findMatchingSlot(resourceProfile))
                .ifPresent(
                        taskManagerSlot -> {
                            // a matching slot exists: allocate it
                            allocateSlot(taskManagerSlot, pendingSlotRequest);
                        })
                .ifNotPresent(
                        () -> {
                            // no matching slot: fall back to a pending TaskManager slot,
                            // which may require starting a new TaskManager
                            fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest);
                        });
    }
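
The registration flow (reject duplicates, record the request as pending, roll the record back if allocation fails) can be sketched independently of Flink. Everything in the block below is hypothetical: RequestRegistrySketch, string ids and profiles, and IllegalStateException standing in for ResourceManagerException.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of register-with-rollback; not Flink code. */
final class RequestRegistrySketch {
    private final Map<String, String> pending = new HashMap<>(); // allocation id -> profile
    private final Set<String> fulfilled = new HashSet<>();       // allocation ids already served
    private final Set<String> supportedProfiles;                 // profiles that can be allocated

    RequestRegistrySketch(Set<String> supportedProfiles) {
        this.supportedProfiles = supportedProfiles;
    }

    /** Returns false for a duplicate; throws if the request cannot be fulfilled. */
    boolean register(String allocationId, String profile) {
        if (pending.containsKey(allocationId) || fulfilled.contains(allocationId)) {
            return false; // duplicate of a pending or already fulfilled request
        }
        pending.put(allocationId, profile);
        try {
            tryAllocate(profile);
        } catch (IllegalStateException e) {
            pending.remove(allocationId); // roll back so no stale entry remains
            throw new IllegalStateException(
                    "Could not fulfill slot request " + allocationId + '.', e);
        }
        return true;
    }

    /** Marks a pending request as served so later duplicates are still rejected. */
    void markFulfilled(String allocationId) {
        if (pending.remove(allocationId) != null) {
            fulfilled.add(allocationId);
        }
    }

    private void tryAllocate(String profile) {
        if (!supportedProfiles.contains(profile)) {
            throw new IllegalStateException("Unfulfillable profile " + profile);
        }
    }

    int pendingCount() { return pending.size(); }
}
```

Putting the request into the pending map before trying to allocate means the allocation path can look the request up by id, at the cost of needing the explicit rollback in the catch block.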

3.3.15. boolean unregisterSlotRequest(AllocationID allocationId)

Unregisters a slot request.


    /**
     * Cancels and removes a pending slot request with the given allocation id. If there is no such
     * pending request, then nothing is done.
     *
     * @param allocationId identifying the pending slot request
     * @return True if a pending slot request was found; otherwise false
     */
    @Override
    public boolean unregisterSlotRequest(AllocationID allocationId) {
   
        checkInit();

        PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);

        if (null != pendingSlotRequest) {
   
            LOG.debug("Cancel slot request {}.", allocationId);

            cancelPendingSlotRequest(pendingSlotRequest);

            return true;
        } else {
   
            LOG.debug(
                    "No pending slot request with allocation id {} found. Ignoring unregistration request.",
                    allocationId);

            return false;
        }
    }

3.3.16. registerTaskManager

Registers a TaskManager.


    /**
     * Registers a new task manager at the slot manager.
     * This will make the task managers slots known and, thus, available for allocation.
     *
     * @param taskExecutorConnection for the new task manager
     * @param initialSlotReport for the new task manager
     * @return True if the task manager has not been registered before and is registered
     *     successfully; otherwise false
     */
    @Override
    public boolean registerTaskManager(
            final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
   

        // fails if the slot manager has not been started
        checkInit();

        LOG.debug(
                "Registering TaskManager {} under {} at the SlotManager.",
                taskExecutorConnection.getResourceID().getStringWithMetadata(),
                taskExecutorConnection.getInstanceID());

        // we identify task managers by their instance id
        if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
            // already registered: only report the current slot status
            reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
            return false;
        } else {
            if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
                // registering these slots would exceed the maximum number of slots
                LOG.info(
                        "The total number of slots exceeds the max limitation {}, release the excess resource.",
                        maxSlotNum);
                resourceActions.releaseResource(
                        taskExecutorConnection.getInstanceID(),
                        new FlinkException(
                                "The total number of slots exceeds the max limitation."));
                return false;
            }

            // first register the TaskManager
            ArrayList<SlotID> reportedSlots = new ArrayList<>();

            for (SlotStatus slotStatus : initialSlotReport) {
   
                reportedSlots.add(slotStatus.getSlotID());
            }

            TaskManagerRegistration taskManagerRegistration =
                    new TaskManagerRegistration(taskExecutorConnection, reportedSlots);

            taskManagerRegistrations.put(
                    taskExecutorConnection.getInstanceID(), taskManagerRegistration);


            // next register the new slots
            for (SlotStatus slotStatus : initialSlotReport) {
                registerSlot(
                        slotStatus.getSlotID(),
                        slotStatus.getAllocationID(),
                        slotStatus.getJobID(),
                        slotStatus.getResourceProfile(),
                        taskExecutorConnection);
            }

            return true;
        }
    }
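
The three outcomes of registerTaskManager (already known, over the slot cap, newly registered) can be sketched as below. This is a simplification under stated assumptions: TaskManagerRegistrySketch is a made-up class, instances are identified by strings, and the cap check only compares registered plus reported slots with the maximum. The body of isMaxSlotNumExceededAfterRegistration is not shown in this post and may take more into account.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of TaskManager registration bookkeeping; not Flink code. */
final class TaskManagerRegistrySketch {
    private final Map<String, Integer> slotsPerInstance = new HashMap<>();
    private final int maxSlotNum;
    private int totalSlots;

    TaskManagerRegistrySketch(int maxSlotNum) {
        this.maxSlotNum = maxSlotNum;
    }

    /** Returns true only if the instance was unknown and fits under the slot cap. */
    boolean register(String instanceId, int reportedSlots) {
        if (slotsPerInstance.containsKey(instanceId)) {
            return false; // known instance: the real code only refreshes its slot status
        }
        if (totalSlots + reportedSlots > maxSlotNum) {
            return false; // over the cap: the real code also releases the resource
        }
        slotsPerInstance.put(instanceId, reportedSlots);
        totalSlots += reportedSlots;
        return true;
    }

    /** Returns true if the instance was registered and has now been removed. */
    boolean unregister(String instanceId) {
        Integer removed = slotsPerInstance.remove(instanceId);
        if (removed == null) {
            return false;
        }
        totalSlots -= removed;
        return true;
    }

    int totalSlots() { return totalSlots; }
}
```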

3.3.17. unregisterTaskManager

Unregisters a TaskManager.

    @Override
    public boolean unregisterTaskManager(InstanceID instanceId, Exception cause) {
   
        checkInit();

        LOG.debug("Unregister TaskManager {} from the SlotManager.", instanceId);

        TaskManagerRegistration taskManagerRegistration =
                taskManagerRegistrations.remove(instanceId);

        if (null != taskManagerRegistration) {
   
            internalUnregisterTaskManager(taskManagerRegistration, cause);

            return true;
        } else {
   
            LOG.debug(
                    "There is no task manager registered with instance ID {}. Ignoring this message.",
                    instanceId);

            return false;
        }
    }

3.3.18. void freeSlot(SlotID slotId, AllocationID allocationId);

Frees a slot. The implementation is listed in section 3.4.

3.3.19. boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport);

Reports the slot status of a TaskManager.


    /**
     * Reports the current slot allocations for a task manager identified by the given instance id.
     *
     * @param instanceId identifying the task manager for which to report the slot status
     * @param slotReport containing the status for all of its slots
     * @return true if the slot status has been updated successfully, otherwise false
     */
    @Override
    public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
   
        checkInit();

        TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);

        if (null != taskManagerRegistration) {
   
            // Sample output of the LOG.debug call below:
            // Received slot report from instance
            //      5ad0a12f8bb03f9d016f8d1e18380563 :
            //      SlotReport{
            //          SlotStatus{
            //              slotID=container_1615446205104_0025_01_000002_0,
            //              allocationID=3755cb8f9962a9a7738db04f2a02084c,
            //              jobID=694474d11da6100e82744c9e47e2f511,
            //              resourceProfile=ResourceProfile{
            //                  cpuCores=1.0000000000000000,
            //                  taskHeapMemory=384.000mb (402653174 bytes),
            //                  taskOffHeapMemory=0 bytes,
            //                  managedMemory=512.000mb (536870920 bytes),
            //                  networkMemory=128.000mb (134217730 bytes)
            //              }
            //          }
            //    }.
            LOG.debug("Received slot report from instance {}: {}.", instanceId, slotReport);

            for (SlotStatus slotStatus : slotReport) {
   
                updateSlot(
                        slotStatus.getSlotID(),
                        slotStatus.getAllocationID(),
                        slotStatus.getJobID());
            }

            return true;
        } else {
   
            LOG.debug(
                    "Received slot report for unknown task manager with instance id {}. Ignoring this report.",
                    instanceId);

            return false;
        }
    }

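3.3.20. void setFailUnfulfillableRequest(boolean failUnfulfillableRequest);

Sets whether unfulfillable slot requests fail immediately. When the flag switches from false to true, pending requests that can no longer be fulfilled are failed right away.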


    @Override
    public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
   
        if (!this.failUnfulfillableRequest && failUnfulfillableRequest) {
   
            // fail unfulfillable pending requests
            Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator =
                    pendingSlotRequests.entrySet().iterator();
            while (slotRequestIterator.hasNext()) {
   
                PendingSlotRequest pendingSlotRequest = slotRequestIterator.next().getValue();
                if (pendingSlotRequest.getAssignedPendingTaskManagerSlot() != null) {
   
                    continue;
                }
                if (!isFulfillableByRegisteredOrPendingSlots(
                        pendingSlotRequest.getResourceProfile())) {
   
                    slotRequestIterator.remove();
                    resourceActions.notifyAllocationFailure(
                            pendingSlotRequest.getJobId(),
                            pendingSlotRequest.getAllocationId(),
                            new UnfulfillableSlotRequestException(
                                    pendingSlotRequest.getAllocationId(),
                                    pendingSlotRequest.getResourceProfile()));
                }
            }
        }
        this.failUnfulfillableRequest = failUnfulfillableRequest;
    }
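
The key detail is that the sweep over pending requests only happens on the false-to-true transition, and that entries are removed through the iterator while iterating. A minimal sketch of that logic follows. FailFastToggleSketch, the integer "cores" profile, and the predicate are assumptions for this example; unlike the Flink field, the flag here starts as false so the transition can be shown, and the skip for requests that already have a pending TaskManager slot assigned is omitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical sketch of the fail-fast toggle; not Flink code. */
final class FailFastToggleSketch {
    private final Map<String, Integer> pending = new LinkedHashMap<>(); // request id -> cores
    private final List<String> failed = new ArrayList<>();              // ids that were failed
    private final Predicate<Integer> fulfillable;                       // can this profile be served?
    private boolean failUnfulfillable; // starts false here: unfulfillable requests may wait

    FailFastToggleSketch(Predicate<Integer> fulfillable) {
        this.fulfillable = fulfillable;
    }

    void addPending(String id, int cores) {
        pending.put(id, cores);
    }

    void setFailUnfulfillable(boolean newValue) {
        if (!failUnfulfillable && newValue) {
            // sweep only on the false -> true transition
            Iterator<Map.Entry<String, Integer>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Integer> entry = it.next();
                if (!fulfillable.test(entry.getValue())) {
                    it.remove(); // safe removal during iteration
                    failed.add(entry.getKey());
                }
            }
        }
        failUnfulfillable = newValue;
    }

    List<String> failed() { return failed; }
    int pendingCount() { return pending.size(); }
}
```

Removing through the iterator avoids the ConcurrentModificationException that calling pending.remove(...) inside the loop would cause.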

3.4. freeSlot


    /**
     * Free the given slot from the given allocation. If the slot is still allocated by the given
     * allocation id, then the slot will be marked as free and will be subject to new slot requests.
     *
     * @param slotId identifying the slot to free
     * @param allocationId with which the slot is presumably allocated
     */
    @Override
    public void freeSlot(SlotID slotId, AllocationID allocationId) {
   
        checkInit();

        TaskManagerSlot slot = slots.get(slotId);

        if (null != slot) {
   
            if (slot.getState() == SlotState.ALLOCATED) {
   
                if (Objects.equals(allocationId, slot.getAllocationId())) {
   

                    TaskManagerRegistration taskManagerRegistration =
                            taskManagerRegistrations.get(slot.getInstanceId());

                    if (taskManagerRegistration == null) {
   
                        throw new IllegalStateException(
                                "Trying to free a slot from a TaskManager "
                                        + slot.getInstanceId()
                                        + " which has not been registered.");
                    }

                    updateSlotState(slot, taskManagerRegistration, null, null);
                } else {
   
                    LOG.debug(
                            "Received request to free slot {} with expected allocation id {}, "
                                    + "but actual allocation id {} differs. Ignoring the request.",
                            slotId,
                            allocationId,
                            slot.getAllocationId());
                }
            } else {
   
                LOG.debug("Slot {} has not been allocated.", allocationId);
            }
        } else {
   
            LOG.debug(
                    "Trying to free a slot {} which has not been registered. Ignoring this message.",
                    slotId);
        }
    }

3.5. updateSlotState


    // performs different actions depending on the slot's current state
    private void updateSlotState(
            TaskManagerSlot slot,
            TaskManagerRegistration taskManagerRegistration,
            @Nullable AllocationID allocationId,
            @Nullable JobID jobId) {
   
        if (null != allocationId) {
   
            switch (slot.getState()) {
   
                // the slot has a pending slot request assigned
                case PENDING:
                    // we have a pending slot request --> check whether we have to reject it
                    PendingSlotRequest pendingSlotRequest = slot.getAssignedSlotRequest();

                    if (Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) {
   
                        // we can cancel the slot request because it has been fulfilled
                        cancelPendingSlotRequest(pendingSlotRequest);

                        // remove the pending slot request, since it has been completed
                        pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());

                        slot.completeAllocation(allocationId, jobId);
                    } else {
   
                        // we first have to free the slot in order to set a new allocationId
                        slot.clearPendingSlotRequest();
                        // set the allocation id such that the slot won't be considered for the
                        // pending slot request
                        slot.updateAllocation(allocationId, jobId);

                        // remove the pending request if any as it has been assigned
                        final PendingSlotRequest actualPendingSlotRequest =
                                pendingSlotRequests.remove(allocationId);

                        if (actualPendingSlotRequest != null) {
   
                            cancelPendingSlotRequest(actualPendingSlotRequest);
                        }

                        // this will try to find a new slot for the request
                        rejectPendingSlotRequest(
                                pendingSlotRequest,
                                new Exception(
                                        "Task manager reported slot "
                                                + slot.getSlotId()
                                                + " being already allocated."));
                    }

                    taskManagerRegistration.occupySlot();
                    break;
                case ALLOCATED:
                    // the slot is already allocated
                    if (!Objects.equals(allocationId, slot.getAllocationId())) {
   
                        slot.freeSlot();
                        slot.updateAllocation(allocationId, jobId);
                    }
                    break;
                case FREE:
                    // the slot is currently free --> it is stored in freeSlots
                    freeSlots.remove(slot.getSlotId());
                    slot.updateAllocation(allocationId, jobId);
                    taskManagerRegistration.occupySlot();
                    break;
            }

            fulfilledSlotRequests.put(allocationId, slot.getSlotId());
        } else {
   
            // no allocation reported
            switch (slot.getState()) {
   
                case FREE:
                    // handle the free slot
                    handleFreeSlot(slot);
                    break;
                case PENDING:
                    // don't do anything because we still have a pending slot request
                    break;
                case ALLOCATED:
                    // the slot was allocated but no allocation is reported anymore: free it
                    AllocationID oldAllocation = slot.getAllocationId();
                    slot.freeSlot();
                    fulfilledSlotRequests.remove(oldAllocation);
                    taskManagerRegistration.freeSlot();

                    handleFreeSlot(slot);
                    break;
            }
        }
    }
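
Read as a state machine, updateSlotState has two inputs: the slot's current state and whether the report carries an allocation id. The sketch below condenses the listing into two pure functions. SlotStateSketch and its method names are made up for illustration, and the sketch ignores that handleFreeSlot may immediately hand a freed slot to another pending request.

```java
/** Hypothetical condensation of the transitions in updateSlotState; not Flink code. */
final class SlotStateSketch {
    enum State { FREE, PENDING, ALLOCATED }

    /** Next state for the given current state and reported allocation. */
    static State next(State current, boolean allocationReported) {
        if (allocationReported) {
            return State.ALLOCATED; // every branch with an allocation id ends up allocated
        }
        // without an allocation, a pending slot keeps waiting; anything else is free
        return current == State.PENDING ? State.PENDING : State.FREE;
    }

    /** Change of the TaskManager's occupied-slot counter for the same transition. */
    static int occupiedDelta(State current, boolean allocationReported) {
        if (allocationReported) {
            return current == State.ALLOCATED ? 0 : 1; // occupySlot() for PENDING and FREE
        }
        return current == State.ALLOCATED ? -1 : 0;    // freeSlot() when the allocation is gone
    }
}
```

For example, freeSlot in section 3.4 calls updateSlotState with a null allocation id on an ALLOCATED slot, which corresponds to next(ALLOCATED, false) returning FREE with a delta of -1.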

Reposted from: https://blog.csdn.net/zhanglong_4444/article/details/116229016