1. 前言
2. 例子
2.1 创建一个namespace
[root@master kubectl]# kubectl get ns
NAME STATUS AGE
default Active 21h
kube-public Active 21h
kube-system Active 21h
[root@master kubectl]# kubectl create ns nicktming
namespace/nicktming created
[root@master kubectl]
[root@master kubectl]
NAME STATUS AGE
default Active 21h
kube-public Active 21h
kube-system Active 21h
nicktming Active 3s
[root@master kubectl]
No resources found.
[root@master kubectl]
2.2 代码
package main
import (
"context"
"flag"
"k8s.io/client-go/rest"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog"
"fmt"
)
// main runs one leader-election participant: it contends for an
// Endpoints-based lock and logs leadership transitions until it is
// interrupted with SIGINT/SIGTERM.
func main() {
	klog.InitFlags(nil)

	// Command-line configuration for this participant.
	var (
		lockName      string
		lockNamespace string
		id            string
	)
	flag.StringVar(&id, "id", "", "the holder identity name")
	flag.StringVar(&lockName, "lease-lock-name", "example", "the lease lock resource name")
	flag.StringVar(&lockNamespace, "lease-lock-namespace", "nicktming", "the lease lock resource namespace")
	flag.Parse()
	if id == "" {
		klog.Fatal("unable to get id (missing id flag).")
	}

	// Talk to the API server over its insecure port; no kubeconfig needed.
	cfg := &rest.Config{
		Host: "http://172.21.0.16:8080",
	}
	kubeClient := clientset.NewForConfigOrDie(cfg)

	// The Endpoints object every participant contends for; Identity
	// distinguishes this process from the other candidates.
	endpointsLock := &resourcelock.EndpointsLock{
		EndpointsMeta: metav1.ObjectMeta{
			Name:      lockName,
			Namespace: lockNamespace,
		},
		Client: kubeClient.CoreV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Translate SIGINT/SIGTERM into a context cancellation so the
	// election loop below shuts down cleanly.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-sigCh
		log.Printf("Received termination, signaling shutdown")
		cancel()
	}()

	// Blocks until ctx is cancelled or leadership is lost.
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          endpointsLock,
		LeaseDuration: 60 * time.Second,
		RenewDeadline: 15 * time.Second,
		RetryPeriod:   5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				klog.Infof("%s: leading", id)
			},
			OnStoppedLeading: func() {
				klog.Infof("%s: lost", id)
			},
			OnNewLeader: func(identity string) {
				if identity == id {
					return
				}
				klog.Infof("new leader elected: %v", identity)
			},
		},
	})

	// After shutdown this sample expects further API calls to fail;
	// assumes the client rejects calls once the leader is shutting
	// down — NOTE(review): the run transcript shows a nil error here,
	// so this check hits log.Fatalf in practice.
	_, err := kubeClient.CoreV1().Endpoints(lockNamespace).Get(lockName, metav1.GetOptions{})
	if err == nil || !strings.Contains(err.Error(), "the leader is shutting down") {
		log.Fatalf("%s: expected to get an error when trying to make a client call: %v", id, err)
	}
	log.Printf("%s: done", id)
}
go run test.go --id=1
go run test.go --id=2
go run test.go --id=3
[root@master kubectl]# kubectl get endpoints -n nicktming
NAME ENDPOINTS AGE
example <none> 5s
[root@master kubectl]# kubectl get endpoints -n nicktming -o yaml
apiVersion: v1
items:
- apiVersion: v1
kind: Endpoints
metadata:
annotations:
control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"1","leaseDurationSeconds":60,"acquireTime":"2019-10-16T13:15:07Z","renewTime":"2019-10-16T13:23:22Z","leaderTransitions":0}'
creationTimestamp: "2019-10-16T13:15:07Z"
name: example
namespace: nicktming
resourceVersion: "45526"
selfLink: /api/v1/namespaces/nicktming/endpoints/example
uid: f988a420-f016-11e9-a4ad-525400d54f7e
kind: List
metadata:
resourceVersion: ""
selfLink: ""
[root@master kubectl]
[root@worker leaderelection]# go run test.go --id=1
1: leading
[root@worker leaderelection]# go run test.go --id=2
new leader elected: 1
[root@worker leaderelection]# go run test.go --id=3
new leader elected: 1
[root@worker leaderelection]# go run test.go --id=1
1: leading
^CReceived termination, signaling shutdown
1: lost
1: expected to get an error when trying to make a client call: <nil>
[root@worker leaderelection]#
[root@worker leaderelection]# go run test.go --id=2
new leader elected: 1
new leader elected: 3
[root@worker leaderelection]# go run test.go --id=3
new leader elected: 1
3: leading
[root@master kubectl]
NAME ENDPOINTS AGE
example <none> 14m
[root@master kubectl]
apiVersion: v1
items:
- apiVersion: v1
kind: Endpoints
metadata:
annotations:
control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"3","leaseDurationSeconds":60,"acquireTime":"2019-10-16T13:27:16Z","renewTime":"2019-10-16T13:29:56Z","leaderTransitions":1}'
creationTimestamp: "2019-10-16T13:15:07Z"
name: example
namespace: nicktming
resourceVersion: "46112"
selfLink: /api/v1/namespaces/nicktming/endpoints/example
uid: f988a420-f016-11e9-a4ad-525400d54f7e
kind: List
metadata:
resourceVersion: ""
selfLink: ""
3. 源码分析
3.1 Interface接口
// LeaderElectionRecord is the JSON payload stored on the lock object
// (as an annotation, for the Endpoints flavor); it is the shared state
// all candidates read and write.
type LeaderElectionRecord struct {
	HolderIdentity       string      `json:"holderIdentity"`       // identity of the current leader
	LeaseDurationSeconds int         `json:"leaseDurationSeconds"` // how long non-leaders wait before contending
	AcquireTime          metav1.Time `json:"acquireTime"`          // when the current leader first acquired the lock
	RenewTime            metav1.Time `json:"renewTime"`            // last successful renewal by the leader
	LeaderTransitions    int         `json:"leaderTransitions"`    // bumped each time leadership changes hands
}

// Interface abstracts the storage backend for a LeaderElectionRecord
// (Endpoints, ConfigMaps, Leases, ...).
type Interface interface {
	// Get returns the current record (NotFound error before creation).
	Get() (*LeaderElectionRecord, error)
	// Create stores a brand-new record on a fresh lock object.
	Create(ler LeaderElectionRecord) error
	// Update overwrites the record on the existing lock object.
	Update(ler LeaderElectionRecord) error
	// RecordEvent emits an event associated with the lock.
	RecordEvent(string)
	// Identity returns this candidate's unique id.
	Identity() string
	// Describe returns a human-readable lock name for logging.
	Describe() string
}
// EndpointsLock implements Interface by storing the election record in
// an annotation on an Endpoints object.
type EndpointsLock struct {
	// EndpointsMeta names (namespace/name) the Endpoints object used as the lock.
	EndpointsMeta metav1.ObjectMeta
	Client        corev1client.EndpointsGetter
	LockConfig    ResourceLockConfig
	// e caches the Endpoints object fetched by Get, reused by later calls.
	e *v1.Endpoints
}

// ResourceLockConfig carries per-candidate settings shared by all lock flavors.
type ResourceLockConfig struct {
	// Identity is this candidate's unique id (written as HolderIdentity when leading).
	Identity      string
	EventRecorder EventRecorder
}
func (el *EndpointsLock) Get() (*LeaderElectionRecord, error) {
var record LeaderElectionRecord
var err error
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
if el.e.Annotations == nil {
el.e.Annotations = make(map[string]string)
}
if recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]; found {
if err := json.Unmarshal([]byte(recordBytes), err != nil {
return nil, err
}
}
return &record, nil
}
// Create serializes ler and creates the lock's Endpoints object with
// the record stored in the well-known annotation. The created object
// is cached on el.e.
func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
	serialized, err := json.Marshal(ler)
	if err != nil {
		return err
	}
	endpoints := &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:      el.EndpointsMeta.Name,
			Namespace: el.EndpointsMeta.Namespace,
			Annotations: map[string]string{
				LeaderElectionRecordAnnotationKey: string(serialized),
			},
		},
	}
	el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(endpoints)
	return err
}
// New constructs the resource-lock implementation selected by
// lockType. Only the Endpoints arm is shown in this excerpt; the
// elided cases (...) build the other lock flavors.
func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
	switch lockType {
	case EndpointsResourceLock:
		// Endpoints-backed lock living at ns/name.
		return &EndpointsLock{
			EndpointsMeta: metav1.ObjectMeta{
				Namespace: ns,
				Name:      name,
			},
			Client:     coreClient,
			LockConfig: rlc,
		}, nil
	...
	default:
		return nil, fmt.Errorf("Invalid lock-type %s", lockType)
	}
}
3.2 LeaderElector
// LeaderElectionConfig bundles everything Run/RunOrDie needs to
// participate in an election.
type LeaderElectionConfig struct {
	Lock          rl.Interface  // storage backend for the election record
	LeaseDuration time.Duration // non-leaders wait this long after the last observed renewal before contending
	RenewDeadline time.Duration // leader gives up if a renewal cannot complete within this window
	RetryPeriod   time.Duration // interval between acquire/renew attempts
	Callbacks     LeaderCallbacks
	WatchDog      *HealthzAdaptor // optional health-check hook
	ReleaseOnCancel bool          // voluntarily release the lease on cancellation (see renew)
	Name          string          // label used for metrics
}

// LeaderElector runs a single candidate's election loop.
type LeaderElector struct {
	config LeaderElectionConfig
	// observedRecord/observedTime track the record last seen on the
	// lock and when it was last seen to change; lease freshness is
	// judged locally against observedTime (see tryAcquireOrRenew).
	observedRecord rl.LeaderElectionRecord
	observedTime   time.Time
	// reportedLeader is the last identity handed to OnNewLeader.
	reportedLeader string
	clock          clock.Clock
	metrics        leaderMetricsAdapter
	name           string
}
// GetLeader returns the identity of the holder recorded in the most
// recently observed election record.
func (le *LeaderElector) GetLeader() string {
	holder := le.observedRecord.HolderIdentity
	return holder
}
// IsLeader reports whether this elector's own identity matches the
// holder in the last observed election record.
func (le *LeaderElector) IsLeader() bool {
	self := le.config.Lock.Identity()
	return self == le.observedRecord.HolderIdentity
}
3.3 Run
// Run acquires the leader lease and then keeps renewing it, blocking
// until the lease is lost or ctx is cancelled. OnStoppedLeading is
// always invoked on the way out (via the defer), even if acquisition
// never succeeded.
func (le *LeaderElector) Run(ctx context.Context) {
	defer func() {
		runtime.HandleCrash()
		le.config.Callbacks.OnStoppedLeading()
	}()
	// Block until we become leader; a false return means ctx was done.
	if !le.acquire(ctx) {
		return
	}
	// Leadership obtained: run the caller's work in the background
	// while the foreground keeps the lease renewed; renew returns when
	// the lease is lost, and the deferred cancel stops the callback.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx)
	le.renew(ctx)
}
3.3.1 acquire
// maybeReportTransition invokes OnNewLeader (asynchronously) whenever
// the observed holder differs from the one last reported.
func (le *LeaderElector) maybeReportTransition() {
	current := le.observedRecord.HolderIdentity
	if current == le.reportedLeader {
		// No change since the last report — nothing to announce.
		return
	}
	le.reportedLeader = current
	if cb := le.config.Callbacks.OnNewLeader; cb != nil {
		go cb(le.reportedLeader)
	}
}
// acquire polls tryAcquireOrRenew every RetryPeriod (jittered) until
// the lock is obtained or ctx is cancelled. It returns true only when
// this elector became the leader.
func (le *LeaderElector) acquire(ctx context.Context) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	desc := le.config.Lock.Describe()
	klog.Infof("attempting to acquire leader lease %v...", desc)
	wait.JitterUntil(func() {
		succeeded = le.tryAcquireOrRenew()
		// Announce any observed change of leadership to OnNewLeader.
		le.maybeReportTransition()
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("became leader")
		le.metrics.leaderOn(le.config.Name)
		klog.Infof("successfully acquired lease %v", desc)
		// Stop the JitterUntil loop: the lease is ours now.
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}
3.3.2 renew
// renew re-runs the renewal attempt every RetryPeriod and returns as
// soon as one attempt fails to complete within RenewDeadline (or ctx
// is cancelled). While this loop runs, the elector remains leader.
func (le *LeaderElector) renew(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	wait.Until(func() {
		// Each renewal attempt gets at most RenewDeadline overall.
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			// Run tryAcquireOrRenew in its own goroutine so a slow API
			// call can be abandoned when the deadline fires.
			done := make(chan bool, 1)
			go func() {
				defer close(done)
				done <- le.tryAcquireOrRenew()
			}()
			select {
			case <-timeoutCtx.Done():
				return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
			case result := <-done:
				return result, nil
			}
		}, timeoutCtx.Done())
		le.maybeReportTransition()
		desc := le.config.Lock.Describe()
		if err == nil {
			klog.V(5).Infof("successfully renewed lease %v", desc)
			return
		}
		// Renewal failed within the deadline: leadership is lost;
		// cancel() stops the enclosing wait.Until loop.
		le.config.Lock.RecordEvent("stopped leading")
		le.metrics.leaderOff(le.config.Name)
		klog.Infof("failed to renew lease %v: %v", desc, err)
		cancel()
	}, le.config.RetryPeriod, ctx.Done())
	// Optionally hand the lease back immediately instead of letting
	// other candidates wait for it to expire.
	if le.config.ReleaseOnCancel {
		le.release()
	}
}
3.3.3 tryAcquireOrRenew
// tryAcquireOrRenew makes a single attempt to take or keep the lock.
// It returns true when this elector holds the lease afterwards.
func (le *LeaderElector) tryAcquireOrRenew() bool {
	now := metav1.Now()
	// The record we would like to write: ourselves as holder, both
	// timestamps set to now.
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}
	// 1. Read the current record; if the lock object does not exist
	//    yet, create it and become leader immediately.
	oldLeaderElectionRecord, err := le.config.Lock.Get()
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		le.observedRecord = leaderElectionRecord
		le.observedTime = le.clock.Now()
		return true
	}
	// 2. Track changes to the stored record; observedTime restarts the
	//    local lease-freshness clock used in step 3.
	if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
		le.observedRecord = *oldLeaderElectionRecord
		le.observedTime = le.clock.Now()
	}
	// 3. Back off while another holder's lease is still fresh (judged
	//    against our local observedTime, not the record's timestamps).
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}
	// 4. We may write: a renewing leader keeps its original
	//    AcquireTime and transition count; a takeover bumps
	//    LeaderTransitions.
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}
	// 5. Persist the record, then remember what we wrote as the newly
	//    observed state.
	if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}
	le.observedRecord = leaderElectionRecord
	le.observedTime = le.clock.Now()
	return true
}