[k8s源码分析][kubelet] devicemanager 之 kubelet申请资源

阅读 41

2021-09-28

1. 前言

Manager

// Manager manages all the Device Plugins running on a node.
type Manager interface {
    // Start starts device plugin registration service.
    Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
    // Allocate configures and assigns devices to pods. The pods are provided
    // through the pod admission attributes in the attrs argument. From the
    // requested device resources, Allocate will communicate with the owning
    // device plugin to allow setup procedures to take place, and for the
    // device plugin to provide runtime settings to use the device (environment
    // variables, mount points and device files). The node object is provided
    // for the device manager to update the node capacity to reflect the
    // currently available devices.
    Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
    // Stop stops the manager.
    Stop() error
    // GetDeviceRunContainerOptions checks whether we have cached containerDevices
    // for the passed-in <pod, container> and returns its DeviceRunContainerOptions
    // for the found one. An empty struct is returned in case no cached state is found.
    GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error)
    // GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
    // and inactive device plugin resources previously registered on the node.
    GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
    // GetWatcherHandler returns the manager's watcher.PluginHandler.
    // NOTE(review): presumably this is the hook the kubelet plugin watcher
    // uses to register device plugins - confirm against the implementation.
    GetWatcherHandler() watcher.PluginHandler
    // GetDevices returns information about the devices assigned to pods and containers
    GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
}

// DeviceRunContainerOptions contains the combined container runtime settings
// a container needs in order to consume its allocated devices.
type DeviceRunContainerOptions struct {
    // Envs is the list of environment variables for the container.
    Envs []kubecontainer.EnvVar
    // Mounts is the list of mounts for the container.
    Mounts []kubecontainer.Mount
    // Devices is the list of host devices mapped into the container.
    Devices []kubecontainer.DeviceInfo
    // Annotations is the list of annotations for the container.
    Annotations []kubecontainer.Annotation
}

ManagerImpl

// monitorCallback is the type of the callback stored in ManagerImpl.callback.
// It receives a resource name and a list of devices for that resource.
// NOTE(review): presumably invoked when a plugin reports a device list
// update - confirm at the call site.
type monitorCallback func(resourceName string, devices []pluginapi.Device)
// ManagerImpl is the concrete device manager state: the kubelet socket
// location, the registered endpoints, per-resource device bookkeeping and the
// per-pod allocation records.
type ManagerImpl struct {
    // File name and directory of the kubelet device-plugin socket; the full
    // address is /var/lib/kubelet/device-plugins/kubelet.sock.
    socketname string
    socketdir  string
    // Resource name and its corresponding endpoint.
    endpoints map[string]endpointInfo // Key is ResourceName
    mutex     sync.Mutex
    // gRPC server.
    server *grpc.Server
    wg     sync.WaitGroup
    // activePods returns the active pods on the node. It is used to refresh
    // the node's resource bookkeeping: pods that held devices and have since
    // finished must have their devices reclaimed by the device manager.
    activePods ActivePodsFunc
    sourcesReady config.SourcesReady
    // Callback function.
    callback monitorCallback
    // Resource name and all of its healthy devices.
    healthyDevices map[string]sets.String
    // Resource name and all of its unhealthy devices.
    unhealthyDevices map[string]sets.String
    // Resource name and the devices of it that have already been allocated.
    allocatedDevices map[string]sets.String
    // podDevices records, for each pod, the devices that pod holds.
    podDevices        podDevices
    // Used for persistence (checkpointing).
    checkpointManager checkpointmanager.CheckpointManager
}
// endpointInfo pairs an endpoint with a set of device plugin options. It is
// the value type of ManagerImpl.endpoints, which is keyed by resource name.
type endpointInfo struct {
    // e is the endpoint itself.
    e    endpoint
    // opts holds the DevicePluginOptions associated with the endpoint.
    // NOTE(review): presumably the options the plugin reported at
    // registration - confirm where this field is populated.
    opts *pluginapi.DevicePluginOptions
}
// NewManagerImpl creates a ManagerImpl bound to the default kubelet socket
// path by delegating to newManagerImpl with pluginapi.KubeletSocket.
func NewManagerImpl() (*ManagerImpl, error) {
    // pluginapi.KubeletSocket=/var/lib/kubelet/device-plugins/kubelet.sock
    return newManagerImpl(pluginapi.KubeletSocket)
}
// newManagerImpl builds a ManagerImpl for the kubelet socket at socketPath.
//
// socketPath must be a non-empty absolute path; otherwise an error is
// returned. The path is split into directory and file name, and the directory
// is also handed to the checkpoint manager. An error is returned if the
// checkpoint manager cannot be created.
func newManagerImpl(socketPath string) (*ManagerImpl, error) {
    klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)

    usablePath := socketPath != "" && filepath.IsAbs(socketPath)
    if !usablePath {
        return nil, fmt.Errorf(errBadSocket+" %s", socketPath)
    }

    socketDir, socketFile := filepath.Split(socketPath)
    m := &ManagerImpl{
        socketname:       socketFile,
        socketdir:        socketDir,
        endpoints:        make(map[string]endpointInfo),
        healthyDevices:   make(map[string]sets.String),
        unhealthyDevices: make(map[string]sets.String),
        allocatedDevices: make(map[string]sets.String),
        podDevices:       make(podDevices),
        // No-op placeholders: Start() replaces both with the real
        // implementations passed in by its caller.
        activePods:   func() []*v1.Pod { return []*v1.Pod{} },
        sourcesReady: &sourcesReadyStub{},
    }
    // The callback is a method value bound to m, so it can only be set once
    // m exists.
    m.callback = m.genericDeviceUpdateCallback

    cpManager, err := checkpointmanager.NewCheckpointManager(socketDir)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
    }
    m.checkpointManager = cpManager

    return m, nil
}
// Start stores the caller-supplied activePods and sourcesReady on the manager.
// The "..." lines mark parts of the method that this excerpt leaves out.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
...
m.activePods = activePods
m.sourcesReady = sourcesReady
...
}

Allocate

// Allocate tries to allocate device plugin resources for the pod carried in
// attrs and, when the pod ends up with an entry in podDevices, calls
// sanitizeNodeAllocatable on node. The "..." line marks code (the handling of
// err) that this excerpt leaves out.
func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
    // The pod that is requesting resources.
    pod := attrs.Pod
    // Try to allocate resources for this pod.
    err := m.allocatePodResources(pod)
    ...
    // If the pod has no entry in podDevices there is nothing to adjust on
    // the node, so return early.
    if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
        return nil
    }
    // The pod holds device plugin resources: adjust the node information.
    m.sanitizeNodeAllocatable(node)
    return nil
}
allocatePodResources
// allocatePodResources allocates device plugin resources for every container
// of pod: init containers first, in spec order, then regular containers.
//
// A single reuse map (resource name -> device set) is threaded through all
// allocateContainerResources calls. After each init container it is passed to
// podDevices.addContainerAllocatedResources, and after each regular container
// to podDevices.removeContainerAllocatedResources. The first allocation error
// aborts the walk and is returned unchanged.
func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
    uid := string(pod.UID)
    reusable := make(map[string]sets.String)

    initCtrs := pod.Spec.InitContainers
    for i := range initCtrs {
        // Work on a copy of the element, as a by-value range would.
        ctr := initCtrs[i]
        err := m.allocateContainerResources(pod, &ctr, reusable)
        if err != nil {
            return err
        }
        m.podDevices.addContainerAllocatedResources(uid, ctr.Name, reusable)
    }

    appCtrs := pod.Spec.Containers
    for i := range appCtrs {
        // Work on a copy of the element, as a by-value range would.
        ctr := appCtrs[i]
        err := m.allocateContainerResources(pod, &ctr, reusable)
        if err != nil {
            return err
        }
        m.podDevices.removeContainerAllocatedResources(uid, ctr.Name, reusable)
    }

    return nil
}
allocateContainerResources
// allocateContainerResources walks the resource limits of container and, for
// each one that is a device plugin resource, picks devices, asks the owning
// device plugin to allocate them and records the result in podDevices. It
// finishes by writing a checkpoint and returns that call's error. The "..."
// lines mark code (mostly error handling) that this excerpt leaves out.
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
    podUID := string(pod.UID)
    contName := container.Name
    allocatedDevicesUpdated := false
    for k, v := range container.Resources.Limits {
        resource := string(k)
        needed := int(v.Value())
        klog.V(3).Infof("needs %d %s", needed, resource)
        // Limits that are not device plugin resources are skipped.
        if !m.isDevicePluginResource(resource) {
            continue
        }
        // Refresh the allocated-device bookkeeping once per call, before the
        // first device plugin resource is handled.
        if !allocatedDevicesUpdated {
            m.updateAllocatedDevices(m.activePods())
            allocatedDevicesUpdated = true
        }
        // Get the devices to assign to this container.
        allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
        ...
        // Look up the endpoint for this resource name; the endpoint is what
        // sends requests to the device plugin that registered the resource.
        eI, ok := m.endpoints[resource]
        ...
        devs := allocDevices.UnsortedList()
        // Send the request to the device plugin and get back the settings for
        // these devices. For example, the NVIDIA device plugin returns
        // NVIDIA_VISIBLE_DEVICES set to the UUIDs of the allocated GPUs.
        resp, err := eI.e.allocate(devs)
        ...
        // Record the allocation in podDevices; only the first container
        // response is used.
        m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
        ...
    }
    // Persist the state to kubelet_internal_checkpoint.
    return m.writeCheckpoint()
}

// k8s-device-plugin/server.go
// Allocate builds one ContainerAllocateResponse per container request in reqs.
//
// Each response carries a single environment variable whose name is derived
// from resourceName and whose value is the comma-joined list of requested
// device IDs. If any requested ID is not among the plugin's devices, the whole
// call fails with an error and no response is returned.
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    knownDevs := m.devs
    envKey := fmt.Sprintf("NVIDIA_VISIBLE_DEVICES/%v", resourceName)
    out := &pluginapi.AllocateResponse{}

    for _, containerReq := range reqs.ContainerRequests {
        // Validate every requested ID before producing this container's response.
        for _, devID := range containerReq.DevicesIDs {
            if !deviceExists(knownDevs, devID) {
                return nil, fmt.Errorf("invalid allocation request: unknown device: %s", devID)
            }
        }

        containerResp := &pluginapi.ContainerAllocateResponse{
            Envs: map[string]string{
                envKey: strings.Join(containerReq.DevicesIDs, ","),
            },
        }
        out.ContainerResponses = append(out.ContainerResponses, containerResp)
    }

    return out, nil
}
updateAllocatedDevices
// updateAllocatedDevices drops the allocation records of pods that are no
// longer active and rebuilds allocatedDevices from what remains.
//
// It does nothing until sourcesReady reports all sources ready. Pods recorded
// in podDevices whose UID is not among activePods are deleted from podDevices;
// if there are none, allocatedDevices is left untouched.
func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
    if !m.sourcesReady.AllReady() {
        return
    }

    m.mutex.Lock()
    defer m.mutex.Unlock()

    // UIDs of the pods the caller reports as active.
    liveUIDs := sets.NewString()
    for i := range activePods {
        liveUIDs.Insert(string(activePods[i].UID))
    }

    // UIDs the device manager still holds records for, minus the live ones,
    // gives the pods whose records must be removed.
    trackedUIDs := m.podDevices.pods()
    staleUIDs := trackedUIDs.Difference(liveUIDs)
    if staleUIDs.Len() == 0 {
        return
    }

    staleList := staleUIDs.List()
    klog.V(3).Infof("pods to be removed: %v", staleList)
    m.podDevices.delete(staleList)

    // Regenerate allocatedDevices now that pod allocation records changed.
    m.allocatedDevices = m.podDevices.devices()
}
devicesToAllocate
// devicesToAllocate chooses the device IDs to hand to a container for the
// given resource, holding the manager mutex for the whole call.
//
// Returns (nil, nil) when nothing needs allocating: either required is zero
// or the container already holds exactly required devices (e.g. after a
// container restart). Returns an error when the container's previously
// allocated count differs from required, when the resource has no entry in
// healthyDevices, or when too few healthy, unallocated devices remain.
// Otherwise devices are taken from reusableDevices first and the rest from the
// healthy devices not yet in allocatedDevices; the latter are also recorded in
// allocatedDevices.
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.String) (sets.String, error) {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    remaining := required

    // A container that was allocated before must be asking for the same
    // amount: a pod's resources are not expected to change once admitted by
    // the API server, so any mismatch fails loudly.
    if prior := m.podDevices.containerDevices(podUID, contName, resource); prior != nil {
        klog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, prior.List())
        remaining -= prior.Len()
        if remaining != 0 {
            return nil, fmt.Errorf("pod %q container %q changed request for resource %q from %d to %d", podUID, contName, resource, prior.Len(), required)
        }
    }
    if remaining == 0 {
        // No change, no work.
        return nil, nil
    }
    klog.V(3).Infof("Needs to allocate %d %q for pod %q container %q", remaining, resource, podUID, contName)

    healthy, registered := m.healthyDevices[resource]
    if !registered {
        return nil, fmt.Errorf("can't allocate unregistered device %s", resource)
    }

    chosen := sets.NewString()

    // Draw from the reusable devices first.
    for id := range reusableDevices {
        chosen.Insert(id)
        remaining--
        if remaining == 0 {
            return chosen, nil
        }
    }

    // Still short: take from healthy devices that are not in use.
    inUse := m.allocatedDevices[resource]
    if inUse == nil {
        inUse = sets.NewString()
        m.allocatedDevices[resource] = inUse
    }
    free := healthy.Difference(inUse)
    if free.Len() < remaining {
        return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, remaining, free.Len())
    }

    // Mark the picked devices as allocated and add them to the result.
    for _, id := range free.UnsortedList()[:remaining] {
        inUse.Insert(id)
        chosen.Insert(id)
    }
    return chosen, nil
}
// allocatePodResources, repeated here in abbreviated form (the "..." lines
// stand for the per-container allocation calls) to highlight how
// devicesToReuse is handled for init containers versus regular containers.
func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
    // Map of resource name -> device set shared across all containers of the pod.
    devicesToReuse := make(map[string]sets.String)
    for _, container := range pod.Spec.InitContainers {
        ...
        // NOTE(review): presumably adds this init container's devices to
        // devicesToReuse so later containers can pick them up - confirm in
        // podDevices.addContainerAllocatedResources.
        m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
    }
    for _, container := range pod.Spec.Containers {
        ...
        // NOTE(review): presumably removes the devices this regular container
        // now holds from devicesToReuse - confirm in
        // podDevices.removeContainerAllocatedResources.
        m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
    }
    return nil
}

总结

精彩评论(0)

0 0 举报