0
点赞
收藏
分享

微信扫一扫

[k8s源码分析][client-go] informer之SharedInformerFactory

1. 前言

2. 例子

package main

import (
    "fmt"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/tools/cache"
    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "time"
)

func main()  {
    config := &rest.Config{
        Host: "http://172.21.0.16:8080",
    }
    client := clientset.NewForConfigOrDie(config)
    // 生成一个SharedInformerFactory
    factory := informers.NewSharedInformerFactory(client, 5 * time.Second)
    // 生成一个PodInformer
    podInformer := factory.Core().V1().Pods()
    // 获得一个cache.SharedIndexInformer 单例模式
    sharedInformer := podInformer.Informer()

    sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) {fmt.Printf("add: %v\n", obj.(*v1.Pod).Name)},
        UpdateFunc: func(oldObj, newObj interface{}) {fmt.Printf("update: %v\n", newObj.(*v1.Pod).Name)},
        DeleteFunc: func(obj interface{}){fmt.Printf("delete: %v\n", obj.(*v1.Pod).Name)},
    })

    stopCh := make(chan struct{})

    // 第一种方式
    // 可以这样启动  也可以按照下面的方式启动
    // go sharedInformer.Run(stopCh)
    // time.Sleep(2 * time.Second)

    // 第二种方式
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)

    pods, _ := podInformer.Lister().Pods("default").List(labels.Everything())

    for _, p := range pods {
        fmt.Printf("list pods: %v\n", p.Name)
    }
    <- stopCh
}
[root@master kubectl]# ./kubectl get nodes
NAME          STATUS   ROLES    AGE     VERSION
172.21.0.12   Ready    <none>   5d22h   v0.0.0-master+$Format:%h$
172.21.0.16   Ready    <none>   5d22h   v0.0.0-master+$Format:%h$
[root@master kubectl]# ./kubectl get pods --all-namespaces
NAMESPACE   NAME            READY   STATUS    RESTARTS   AGE
default     test            1/1     Running   0          4d4h
default     test-schduler   1/1     Running   0          4d4h
[root@master kubectl]# 
[root@worker tming]# go run main.go 
add: test
add: test-schduler
list pods: test
list pods: test-schduler
update: test-schduler
update: test
update: test
update: test-schduler

3. 源码分析

3.1 接口

// client-go/informers/internalinterfaces/factory_interfaces.go
type NewInformerFunc func(kubernetes.Interface, time.Duration) cache.SharedIndexInformer
type SharedInformerFactory interface {
    Start(stopCh <-chan struct{})
    InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}
type TweakListOptionsFunc func(*v1.ListOptions)

// client-go/informers/factory.go
type SharedInformerFactory interface {
    internalinterfaces.SharedInformerFactory
    ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
    WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
    ...
    Core() core.Interface
    ...
}
// client-go/informers/core/interface.go
type Interface interface {
    // V1 provides access to shared informers for resources in V1.
    V1() v1.Interface
}
// client-go/informers/core/v1/interface.go
type Interface interface {
    ...
    // Nodes returns a NodeInformer.
    Nodes() NodeInformer
    ...
    // Pods returns a PodInformer.
    Pods() PodInformer
    ...
}
// Pods returns a PodInformer.
func (v *version) Pods() PodInformer {
    return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
podInformer
// 该接口有两个方法
// Informer 生成一个 cache.SharedIndexInformer对象
// Lister   生成一个 v1.PodLister对象
type PodInformer interface {
    Informer() cache.SharedIndexInformer
    Lister() v1.PodLister
}
// 接口的实现类
type podInformer struct {
    factory          internalinterfaces.SharedInformerFactory
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    namespace        string
}
Informer方法
// client-go/informers/core/v1/pod.go
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                // api-server的接口
                return client.CoreV1().Pods(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                // api-server的接口
                return client.CoreV1().Pods(namespace).Watch(options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        indexers,
    )
}
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *podInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
// client-go/informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}
Lister方法
// client-go/informers/core/v1/pod.go
func (f *podInformer) Lister() v1.PodLister {
    return v1.NewPodLister(f.Informer().GetIndexer())
}
// client-go/listers/core/v1/pod.go
type PodLister interface {
    List(selector labels.Selector) (ret []*v1.Pod, err error)
    Pods(namespace string) PodNamespaceLister
    PodListerExpansion
}
type podLister struct {
    indexer cache.Indexer
}
func NewPodLister(indexer cache.Indexer) PodLister {
    return &podLister{indexer: indexer}
}

3.2 factory方法

// client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
// client-go/informers/factory.go
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
    // 收集所有已经启动的informers
    informers := func() map[reflect.Type]cache.SharedIndexInformer {
        f.lock.Lock()
        defer f.lock.Unlock()

        informers := map[reflect.Type]cache.SharedIndexInformer{}
        for informerType, informer := range f.informers {
            if f.startedInformers[informerType] {
                informers[informerType] = informer
            }
        }
        return informers
    }()

    res := map[reflect.Type]bool{}
    for informType, informer := range informers {
        // 等待同步完成
        res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
    }
    return res
}
// client-go/tools/cache/shared_informer.go
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
    err := wait.PollUntil(syncedPollPeriod,
        func() (bool, error) {
            for _, syncFunc := range cacheSyncs {
                if !syncFunc() {
                    return false, nil
                }
            }
            return true, nil
        },
        stopCh)
    if err != nil {
        klog.V(2).Infof("stop requested")
        return false
    }

    klog.V(4).Infof("caches populated")
    return true
}

informer整体

举报

相关推荐

0 条评论