1. 前言
2. 例子
package main
import (
"fmt"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"time"
)
// main demonstrates the SharedInformerFactory workflow for Pods: build a
// client, create a factory, register event handlers on the shared Pod
// informer, start the factory, wait for the cache to sync, list the Pods in
// the "default" namespace from the local cache, and then block so that event
// handlers keep firing.
func main() {
	config := &rest.Config{
		Host: "http://172.21.0.16:8080",
	}
	client := clientset.NewForConfigOrDie(config)
	// Create a SharedInformerFactory with a 5 second default resync period.
	factory := informers.NewSharedInformerFactory(client, 5*time.Second)
	// Obtain the PodInformer accessor.
	podInformer := factory.Core().V1().Pods()
	// Obtain the cache.SharedIndexInformer; the factory keeps a single
	// instance per object type, so repeated calls return the same informer.
	sharedInformer := podInformer.Informer()
	sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if pod, ok := obj.(*v1.Pod); ok {
				fmt.Printf("add: %v\n", pod.Name)
			}
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			if pod, ok := newObj.(*v1.Pod); ok {
				fmt.Printf("update: %v\n", pod.Name)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// When a delete event was missed, the informer hands the handler a
			// cache.DeletedFinalStateUnknown tombstone instead of a *v1.Pod.
			// Unwrap it rather than panicking on a failed type assertion.
			if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
				obj = tombstone.Obj
			}
			if pod, ok := obj.(*v1.Pod); ok {
				fmt.Printf("delete: %v\n", pod.Name)
			}
		},
	})
	stopCh := make(chan struct{})
	// Option 1:
	// the informer can be started directly like this, or via the factory as below.
	// go sharedInformer.Run(stopCh)
	// time.Sleep(2 * time.Second)
	// Option 2:
	factory.Start(stopCh)
	// Do not read from the lister until every started informer reports that
	// its cache has synced.
	for informerType, synced := range factory.WaitForCacheSync(stopCh) {
		if !synced {
			fmt.Printf("cache sync failed: %v\n", informerType)
			return
		}
	}
	pods, err := podInformer.Lister().Pods("default").List(labels.Everything())
	if err != nil {
		fmt.Printf("list pods failed: %v\n", err)
		return
	}
	for _, p := range pods {
		fmt.Printf("list pods: %v\n", p.Name)
	}
	// Block forever so the informer goroutines keep delivering events.
	<-stopCh
}
[root@master kubectl]# ./kubectl get nodes
NAME STATUS ROLES AGE VERSION
172.21.0.12 Ready <none> 5d22h v0.0.0-master+$Format:%h$
172.21.0.16 Ready <none> 5d22h v0.0.0-master+$Format:%h$
[root@master kubectl]# ./kubectl get pods --all-namespaces
NAMESPACE NAME READY STATUS RESTARTS AGE
default test 1/1 Running 0 4d4h
default test-schduler 1/1 Running 0 4d4h
[root@master kubectl]#
[root@worker tming]# go run main.go
add: test
add: test-schduler
list pods: test
list pods: test-schduler
update: test-schduler
update: test
update: test
update: test-schduler
3. 源码分析
3.1 接口
// client-go/informers/internalinterfaces/factory_interfaces.go
// NewInformerFunc is the constructor signature used to build a shared index
// informer from a client and a duration (the factory's InformerFor passes the
// resync period as that duration).
type NewInformerFunc func(kubernetes.Interface, time.Duration) cache.SharedIndexInformer
// SharedInformerFactory is the minimal factory contract that the typed
// informers (for example podInformer) depend on.
type SharedInformerFactory interface {
// Start takes a stop channel; the sharedInformerFactory implementation runs
// every not-yet-started informer with it.
Start(stopCh <-chan struct{})
// InformerFor returns the informer registered for obj's type; the
// sharedInformerFactory implementation calls newFunc only when no informer
// exists yet for that type.
InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}
// TweakListOptionsFunc receives a pointer to the list options so it can
// adjust them in place; NewFilteredPodInformer invokes it before each List
// and Watch call when it is non-nil.
type TweakListOptionsFunc func(*v1.ListOptions)
// client-go/informers/factory.go
// SharedInformerFactory is the public factory interface. It embeds the
// internal factory contract and adds generic-resource lookup, cache-sync
// waiting, and per-API-group accessors (only Core is shown in this excerpt).
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
// ForResource returns a GenericInformer for the given group/version/resource,
// or an error.
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
// WaitForCacheSync returns a per-informer-type map of sync results.
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
...
// Core gives access to the informers of the core API group.
Core() core.Interface
...
}
// client-go/informers/core/interface.go
// Interface is the entry point to the core group's informers, organised by
// API version.
type Interface interface {
// V1 provides access to shared informers for resources in V1.
V1() v1.Interface
}
// client-go/informers/core/v1/interface.go
// Interface lists the per-resource informer accessors of core/v1 (only Nodes
// and Pods are shown in this excerpt).
type Interface interface {
...
// Nodes returns a NodeInformer.
Nodes() NodeInformer
...
// Pods returns a PodInformer.
Pods() PodInformer
...
}
// Pods returns a PodInformer that carries this version's factory, namespace
// and list-option tweak function.
func (v *version) Pods() PodInformer {
	inf := podInformer{
		factory:          v.factory,
		namespace:        v.namespace,
		tweakListOptions: v.tweakListOptions,
	}
	return &inf
}
podInformer
// PodInformer exposes two methods:
// Informer returns a cache.SharedIndexInformer object.
// Lister returns a v1.PodLister object.
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
// podInformer is the implementation of the PodInformer interface.
type podInformer struct {
// factory is the shared factory that Informer() asks for the Pod informer.
factory internalinterfaces.SharedInformerFactory
// tweakListOptions is forwarded to NewFilteredPodInformer by defaultInformer.
tweakListOptions internalinterfaces.TweakListOptionsFunc
// namespace is forwarded to NewFilteredPodInformer by defaultInformer.
namespace string
}
Informer方法
// client-go/informers/core/v1/pod.go
// NewFilteredPodInformer builds a shared index informer for Pods in the given
// namespace. Its list and watch functions call the API server through client,
// after giving tweakListOptions (when non-nil) a chance to modify the options.
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	// applyTweak runs the optional tweak hook on the options in place.
	applyTweak := func(opts *metav1.ListOptions) {
		if tweakListOptions != nil {
			tweakListOptions(opts)
		}
	}

	listPods := func(options metav1.ListOptions) (runtime.Object, error) {
		applyTweak(&options)
		// API server call.
		return client.CoreV1().Pods(namespace).List(options)
	}

	watchPods := func(options metav1.ListOptions) (watch.Interface, error) {
		applyTweak(&options)
		// API server call.
		return client.CoreV1().Pods(namespace).Watch(options)
	}

	source := &cache.ListWatch{
		ListFunc:  listPods,
		WatchFunc: watchPods,
	}
	return cache.NewSharedIndexInformer(source, &corev1.Pod{}, resyncPeriod, indexers)
}
// defaultInformer creates the Pod informer for this podInformer's namespace
// and tweak function, indexed by namespace.
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	byNamespace := cache.Indexers{
		cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
	}
	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, byNamespace, f.tweakListOptions)
}
// Informer asks the factory for the informer registered for the Pod type,
// supplying defaultInformer as the constructor to use if none exists yet.
func (f *podInformer) Informer() cache.SharedIndexInformer {
	prototype := &corev1.Pod{}
	return f.factory.InformerFor(prototype, f.defaultInformer)
}
// client-go/informers/factory.go
// InformerFor returns the informer stored for obj's reflected type, creating
// and storing it with newFunc on first use. The resync period is the custom
// value registered for that type when present, otherwise the factory default.
// The whole lookup-or-create sequence runs under the factory lock.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	key := reflect.TypeOf(obj)
	if cached, ok := f.informers[key]; ok {
		return cached
	}

	period := f.defaultResync
	if custom, ok := f.customResync[key]; ok {
		period = custom
	}

	created := newFunc(f.client, period)
	f.informers[key] = created
	return created
}
Lister方法
// client-go/informers/core/v1/pod.go
// Lister wraps the shared Pod informer's indexer in a PodLister.
func (f *podInformer) Lister() v1.PodLister {
	indexer := f.Informer().GetIndexer()
	return v1.NewPodLister(indexer)
}
// client-go/listers/core/v1/pod.go
// PodLister is the read interface for Pods; NewPodLister backs it with a
// cache.Indexer.
type PodLister interface {
// List returns the Pods matching selector, or an error.
List(selector labels.Selector) (ret []*v1.Pod, err error)
// Pods returns a lister scoped to the given namespace.
Pods(namespace string) PodNamespaceLister
PodListerExpansion
}
// podLister is the PodLister value returned by NewPodLister.
type podLister struct {
// indexer is the cache.Indexer supplied to NewPodLister.
indexer cache.Indexer
}
// NewPodLister returns a PodLister that holds the given indexer.
func NewPodLister(indexer cache.Indexer) PodLister {
	lister := new(podLister)
	lister.indexer = indexer
	return lister
}
3.2 factory方法
// client-go/informers/factory.go
// Start launches, each in its own goroutine, every registered informer that
// has not been started yet, and marks it as started. It holds the factory
// lock while doing so.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for typ, inf := range f.informers {
		if f.startedInformers[typ] {
			// Already running; never start an informer twice.
			continue
		}
		go inf.Run(stopCh)
		f.startedInformers[typ] = true
	}
}
// client-go/informers/factory.go
// WaitForCacheSync waits for every started informer in turn and reports, per
// informer type, the result of cache.WaitForCacheSync for it.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
	// Snapshot the started informers under the lock, then release it so the
	// waiting below does not hold the factory lock.
	started := map[reflect.Type]cache.SharedIndexInformer{}
	func() {
		f.lock.Lock()
		defer f.lock.Unlock()

		for typ, inf := range f.informers {
			if !f.startedInformers[typ] {
				continue
			}
			started[typ] = inf
		}
	}()

	results := make(map[reflect.Type]bool)
	for typ, inf := range started {
		// Wait for this informer's sync to complete.
		results[typ] = cache.WaitForCacheSync(stopCh, inf.HasSynced)
	}
	return results
}
// client-go/tools/cache/shared_informer.go
// WaitForCacheSync polls, via wait.PollUntil with syncedPollPeriod and stopCh,
// until every function in cacheSyncs returns true. It returns true in that
// case and false when the poll ends with an error.
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	// allSynced is the poll condition: it is satisfied only when every sync
	// function reports true, and it never returns an error itself.
	allSynced := func() (bool, error) {
		for i := range cacheSyncs {
			if synced := cacheSyncs[i](); !synced {
				return false, nil
			}
		}
		return true, nil
	}

	if err := wait.PollUntil(syncedPollPeriod, allSynced, stopCh); err != nil {
		klog.V(2).Infof("stop requested")
		return false
	}

	klog.V(4).Infof("caches populated")
	return true
}
informer整体