1. 前言
Manager
// Manager manages all the Device Plugins running on a node.
type Manager interface {
	// Start launches the device plugin registration service. activePods and
	// sourcesReady give the manager its view of the pods on the node.
	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error

	// Stop shuts the manager down.
	Stop() error

	// Allocate assigns devices to the pod carried in the pod admission
	// attributes attrs. For each requested device resource it talks to the
	// owning device plugin, which may run setup procedures and returns the
	// runtime settings needed to use the devices (environment variables,
	// mount points and device files). node is provided so the manager can
	// update the node capacity to reflect the devices still available.
	Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error

	// GetDeviceRunContainerOptions looks up the cached containerDevices for the
	// given <pod, container> and returns their DeviceRunContainerOptions. When
	// nothing is cached, an empty struct is returned.
	GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error)

	// GetCapacity returns, in order: the available device plugin resource
	// capacity, the resource allocatable, and the device plugin resources that
	// were previously registered on the node but are now inactive.
	GetCapacity() (v1.ResourceList, v1.ResourceList, []string)

	// GetDevices returns the devices assigned to container containerName of
	// the pod identified by podUID.
	GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices

	// GetWatcherHandler returns the manager's watcher.PluginHandler.
	// NOTE(review): presumably this lets the kubelet plugin watcher drive
	// device plugin registration — confirm against the implementation.
	GetWatcherHandler() watcher.PluginHandler
}
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.
// It is the value returned by Manager.GetDeviceRunContainerOptions for a <pod, container> pair.
type DeviceRunContainerOptions struct {
	// Envs is the list of environment variables to set in the container.
	Envs []kubecontainer.EnvVar
	// Mounts lists the mounts for the container.
	Mounts []kubecontainer.Mount
	// Devices lists the host devices mapped into the container.
	Devices []kubecontainer.DeviceInfo
	// Annotations lists the annotations for the container.
	Annotations []kubecontainer.Annotation
}
ManagerImpl
// monitorCallback is called with a resource name and a list of devices for
// that resource. ManagerImpl.callback holds one; newManagerImpl sets it to
// genericDeviceUpdateCallback.
// NOTE(review): presumably the list is the plugin's current device inventory
// for that resource — confirm at the call sites.
type monitorCallback func(resourceName string, devices []pluginapi.Device)
// ManagerImpl is the concrete device plugin manager. It tracks the registered
// device plugins and the devices assigned to each pod and container.
type ManagerImpl struct {
	// socketname and socketdir are the file and directory parts of the
	// registration socket path (by default /var/lib/kubelet/device-plugins/kubelet.sock).
	socketname string
	socketdir  string
	// endpoints maps a resource name to the endpoint of the plugin that registered it.
	endpoints map[string]endpointInfo // Key is ResourceName
	// mutex guards the shared allocation state. updateAllocatedDevices and
	// devicesToAllocate hold it while reading or writing podDevices and allocatedDevices.
	mutex sync.Mutex
	// server is the manager's gRPC server.
	server *grpc.Server
	wg     sync.WaitGroup
	// activePods returns the pods active on the node. Pods that held devices
	// but have since finished are found this way, and their devices are
	// reclaimed in the device manager (see updateAllocatedDevices).
	activePods ActivePodsFunc
	// sourcesReady gates that reclamation: updateAllocatedDevices does nothing
	// until AllReady() reports true.
	sourcesReady config.SourcesReady
	// callback receives device updates for a resource
	// (genericDeviceUpdateCallback, set in newManagerImpl).
	callback monitorCallback
	// healthyDevices maps a resource name to all of its healthy devices.
	healthyDevices map[string]sets.String
	// unhealthyDevices maps a resource name to all of its unhealthy devices.
	unhealthyDevices map[string]sets.String
	// allocatedDevices maps a resource name to the devices already handed out.
	allocatedDevices map[string]sets.String
	// podDevices records each pod and the devices assigned to it.
	podDevices podDevices
	// checkpointManager persists allocation state (see writeCheckpoint).
	checkpointManager checkpointmanager.CheckpointManager
}
// endpointInfo is the value stored per resource name in ManagerImpl.endpoints.
type endpointInfo struct {
	// e is used to send requests to the device plugin (e.g. allocate).
	e endpoint
	// opts holds the plugin's DevicePluginOptions.
	opts *pluginapi.DevicePluginOptions
}
// NewManagerImpl returns a device plugin manager bound to the kubelet's
// standard device plugin socket, pluginapi.KubeletSocket
// (/var/lib/kubelet/device-plugins/kubelet.sock).
func NewManagerImpl() (*ManagerImpl, error) {
	socket := pluginapi.KubeletSocket
	return newManagerImpl(socket)
}
// newManagerImpl builds a ManagerImpl for the registration socket at
// socketPath, which must be a non-empty absolute path. The socket's directory
// also hosts the checkpoint manager. Until Start is called, activePods reports
// no pods and sourcesReady is a stub, so operations that depend on them are
// no-ops.
func newManagerImpl(socketPath string) (*ManagerImpl, error) {
	klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)

	if len(socketPath) == 0 || !filepath.IsAbs(socketPath) {
		return nil, fmt.Errorf(errBadSocket+" %s", socketPath)
	}
	sockDir, sockFile := filepath.Split(socketPath)

	cpm, err := checkpointmanager.NewCheckpointManager(sockDir)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
	}

	// activePods and sourcesReady are no-op stand-ins here; Start() swaps in
	// the real implementations.
	m := &ManagerImpl{
		socketname:        sockFile,
		socketdir:         sockDir,
		endpoints:         make(map[string]endpointInfo),
		healthyDevices:    make(map[string]sets.String),
		unhealthyDevices:  make(map[string]sets.String),
		allocatedDevices:  make(map[string]sets.String),
		podDevices:        make(podDevices),
		activePods:        func() []*v1.Pod { return []*v1.Pod{} },
		sourcesReady:      &sourcesReadyStub{},
		checkpointManager: cpm,
	}
	// The callback is a method value bound to m, so it can only be set once m exists.
	m.callback = m.genericDeviceUpdateCallback
	return m, nil
}
// Start starts the device plugin registration service. Among other things,
// it replaces the no-op activePods/sourcesReady stubs installed by
// newManagerImpl with the real ones supplied by the caller.
// (Excerpt: `...` marks code elided from this listing.)
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
	...
	// Install the caller's implementations in place of the constructor's stubs.
	m.activePods = activePods
	m.sourcesReady = sourcesReady
	...
}
Allocate
// Allocate implements Manager.Allocate. It assigns device plugin devices to
// the containers of attrs.Pod. If the pod ended up with any, it then updates
// the node information through sanitizeNodeAllocatable.
// (Excerpt: `...` marks code elided from this listing.)
func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	// The pod requesting resources.
	pod := attrs.Pod
	// Try to allocate device resources for this pod.
	err := m.allocatePodResources(pod)
	...
	// If podDevices has no entry for this pod, it holds no device plugin
	// resources and there is nothing to reflect on the node.
	if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
		return nil
	}
	// Devices were allocated, so adjust the node information.
	// NOTE(review): presumably sanitizeNodeAllocatable reconciles the node's
	// allocatable with device plugin resources — confirm in its definition.
	m.sanitizeNodeAllocatable(node)
	return nil
}
allocatePodResources
// allocatePodResources allocates device plugin resources for every container
// of pod, init containers first. The first allocation error aborts the pod.
//
// Init containers run one after another, so devices allocated to one init
// container may be reused by the containers that follow. One reuse map is
// threaded through every call. Judging by their names, each init container's
// devices are added to that map and each app container's devices are removed
// from it — confirm in addContainerAllocatedResources and
// removeContainerAllocatedResources.
func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
	uid := string(pod.UID)
	reuse := make(map[string]sets.String)

	for _, ctr := range pod.Spec.InitContainers {
		if err := m.allocateContainerResources(pod, &ctr, reuse); err != nil {
			return err
		}
		m.podDevices.addContainerAllocatedResources(uid, ctr.Name, reuse)
	}

	for _, ctr := range pod.Spec.Containers {
		if err := m.allocateContainerResources(pod, &ctr, reuse); err != nil {
			return err
		}
		m.podDevices.removeContainerAllocatedResources(uid, ctr.Name, reuse)
	}

	return nil
}
allocateContainerResources
// allocateContainerResources handles every device plugin resource in the
// container's resource limits. For each one it picks devices
// (devicesToAllocate), asks the owning plugin to set them up, and records the
// result in m.podDevices. It finally persists the state via writeCheckpoint.
// devicesToReuse maps a resource name to devices that may be handed to this
// container before any fresh ones are taken.
// (Excerpt: `...` marks code elided from this listing.)
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
	podUID := string(pod.UID)
	contName := container.Name
	// Ensures stale allocations are refreshed at most once per call, and only
	// if the container requests at least one device plugin resource.
	allocatedDevicesUpdated := false
	for k, v := range container.Resources.Limits {
		resource := string(k)
		needed := int(v.Value())
		klog.V(3).Infof("needs %d %s", needed, resource)
		// Skip resources that are not served by a device plugin.
		if !m.isDevicePluginResource(resource) {
			continue
		}
		// Refresh the allocated-device state once, releasing devices held by
		// pods that are no longer active.
		if !allocatedDevicesUpdated {
			m.updateAllocatedDevices(m.activePods())
			allocatedDevicesUpdated = true
		}
		// Pick the devices to assign to this container.
		allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
		...
		// Look up the endpoint of the plugin that registered this resource
		// name; requests to that plugin are sent through it.
		// NOTE(review): m.endpoints is read here without m.mutex in this
		// excerpt, while other methods lock m.mutex around shared maps —
		// confirm the elided code takes the lock.
		eI, ok := m.endpoints[resource]
		...
		devs := allocDevices.UnsortedList()
		// Ask the device plugin to prepare these devices and return their
		// runtime settings. For example, the NVIDIA plugin returns an
		// NVIDIA_VISIBLE_DEVICES-style env var listing the allocated GPU IDs.
		resp, err := eI.e.allocate(devs)
		...
		// Record the assignment, together with the plugin's first container
		// response, in podDevices.
		// NOTE(review): indexing [0] panics if ContainerResponses is empty;
		// presumably the elided code checks the length first — confirm.
		m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
		...
	}
	// Persist the allocation state (to kubelet_internal_checkpoint).
	return m.writeCheckpoint()
}
// k8s-device-plugin/server.go

// Allocate handles the kubelet's AllocateRequest. Every device ID in every
// container request must be known to this plugin (deviceExists over m.devs);
// a single unknown ID fails the whole request. For each container request,
// the response carries one env var whose value is the requested IDs joined
// with ",".
//
// NOTE(review): the env var key is "NVIDIA_VISIBLE_DEVICES/<resourceName>"
// rather than plain NVIDIA_VISIBLE_DEVICES; confirm the consumer of this
// variable expects the suffixed name.
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	known := m.devs
	envKey := fmt.Sprintf("NVIDIA_VISIBLE_DEVICES/%v", resourceName)

	var out pluginapi.AllocateResponse
	for _, creq := range reqs.ContainerRequests {
		for _, id := range creq.DevicesIDs {
			if !deviceExists(known, id) {
				return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
			}
		}
		out.ContainerResponses = append(out.ContainerResponses, &pluginapi.ContainerAllocateResponse{
			Envs: map[string]string{envKey: strings.Join(creq.DevicesIDs, ",")},
		})
	}
	return &out, nil
}
updateAllocatedDevices
// updateAllocatedDevices reclaims devices from pods that m.podDevices still
// tracks but that are missing from activePods (e.g. pods that have
// terminated). It then rebuilds m.allocatedDevices from what remains in
// m.podDevices. It does nothing until every config source is ready, and it
// holds m.mutex while mutating state.
func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
	if !m.sourcesReady.AllReady() {
		return
	}
	m.mutex.Lock()
	defer m.mutex.Unlock()

	live := sets.NewString()
	for _, p := range activePods {
		live.Insert(string(p.UID))
	}

	// Pods the manager still holds devices for but that are no longer active.
	stale := m.podDevices.pods().Difference(live)
	if stale.Len() == 0 {
		return
	}

	klog.V(3).Infof("pods to be removed: %v", stale.List())
	m.podDevices.delete(stale.List())
	// Derive the per-resource allocated set again from the pruned pod records.
	m.allocatedDevices = m.podDevices.devices()
}
devicesToAllocate
// devicesToAllocate picks the devices of type resource for container contName
// of pod podUID, which requests `required` of them. It holds m.mutex for the
// whole call.
//
// Outcomes:
//   - The container already has devices recorded (e.g. after a restart):
//     returns (nil, nil) if the recorded count equals required. Otherwise it
//     returns an error, since an admitted pod's request should not change.
//   - required is 0: returns (nil, nil).
//   - resource is not in m.healthyDevices: returns an error.
//   - Otherwise devices are taken from reusableDevices first, then from
//     healthy devices not yet in m.allocatedDevices. Freshly taken devices are
//     added to m.allocatedDevices. Returns an error if too few are free.
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.String) (sets.String, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	remaining := required

	// Devices already recorded for this container (non-nil e.g. after a restart).
	if prior := m.podDevices.containerDevices(podUID, contName, resource); prior != nil {
		klog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, prior.List())
		remaining -= prior.Len()
		// An admitted pod's request is not expected to change; fail loudly if it did.
		if remaining != 0 {
			return nil, fmt.Errorf("pod %q container %q changed request for resource %q from %d to %d", podUID, contName, resource, prior.Len(), required)
		}
	}
	if remaining == 0 {
		// Nothing new to hand out.
		return nil, nil
	}

	klog.V(3).Infof("Needs to allocate %d %q for pod %q container %q", remaining, resource, podUID, contName)
	healthy, registered := m.healthyDevices[resource]
	if !registered {
		return nil, fmt.Errorf("can't allocate unregistered device %s", resource)
	}

	picked := sets.NewString()

	// Reuse first: init containers run one at a time, so devices held by an
	// earlier init container can be given to the next container.
	for dev := range reusableDevices {
		picked.Insert(dev)
		remaining--
		if remaining == 0 {
			return picked, nil
		}
	}

	// Then draw from healthy devices that nobody holds yet. inUse aliases the
	// set stored in m.allocatedDevices, so inserting into it updates the map.
	inUse := m.allocatedDevices[resource]
	if inUse == nil {
		inUse = sets.NewString()
		m.allocatedDevices[resource] = inUse
	}
	free := healthy.Difference(inUse)
	if free.Len() < remaining {
		return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, remaining, free.Len())
	}
	for _, dev := range free.UnsortedList()[:remaining] {
		inUse.Insert(dev)
		picked.Insert(dev)
	}
	return picked, nil
}
// allocatePodResources, abbreviated: a repeat of the full listing of this
// method earlier in the document. The elided `...` lines are the
// allocateContainerResources call and its error check.
func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
	// Resource name -> devices that later containers may reuse.
	devicesToReuse := make(map[string]sets.String)
	for _, container := range pod.Spec.InitContainers {
		...
		// Init containers run one at a time, so this init container's devices
		// may be reused by the containers that follow.
		m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
	}
	for _, container := range pod.Spec.Containers {
		...
		// NOTE(review): presumably this removes the app container's devices
		// from the reuse pool so later app containers cannot take them —
		// confirm in removeContainerAllocatedResources.
		m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
	}
	return nil
}
总结