Kubernetes中的资源锁

client-go提供了资源锁相关工具
tools/leaderelection/resourcelock/interface.go中定义了资源锁接口:
// Interface is the set of operations a resource lock exposes to the
// leader-election code: reading, creating and updating the stored
// LeaderElectionRecord, plus a few identification/reporting helpers.
type Interface interface {
    Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)   // fetch the stored lock record, both decoded and as raw bytes
    Create(ctx context.Context, ler LeaderElectionRecord) error   // create the resource lock with the given record
    Update(ctx context.Context, ler LeaderElectionRecord) error   // overwrite the resource lock with the given record
    RecordEvent(string)    // record an event (through the EventBroadcaster event manager)
    Identity() string      // return the identity string used as the leader's HolderIdentity
    Describe() string      // return a description of the resource lock
}
client-go中实现了三种资源锁(EndpointsLock、ConfigMapLock、LeaseLock),每种资源锁都实现了上述接口中对key(资源锁)的操作方法。
使用EndpointsResourceLock时,key存的是竞选为领导者节点的信息,通过LeaderElectionRecord结构体进行描述:
// LeaderElectionRecord describes the node that currently holds the lock.
// It is the value stored under the lock's key.
//
// The struct tags use plain ASCII double quotes; a typographic closing
// quote would make the tag malformed and the JSON key would silently fall
// back to the Go field name.
type LeaderElectionRecord struct {
	// HolderIdentity identifies the leader, usually of the form Hostname_<hash>.
	HolderIdentity string `json:"holderIdentity"`
	// LeaseDurationSeconds is the length of the leader's lease, in seconds.
	LeaseDurationSeconds int `json:"leaseDurationSeconds"`
	// AcquireTime is when the leader obtained the lock.
	AcquireTime metav1.Time `json:"acquireTime"`
	// RenewTime is when the leader last renewed the lease.
	RenewTime metav1.Time `json:"renewTime"`
	// LeaderTransitions counts how many times leadership has changed hands.
	LeaderTransitions int `json:"leaderTransitions"`
}
tools/leaderelection/leaderelection.go中,LeaderElector的Run方法是选举流程的入口:
// Run drives one leader-election cycle: it blocks until the lock is
// acquired (or acquire gives up), starts the OnStartedLeading callback in
// its own goroutine, and then keeps renewing the lock until renew returns.
// OnStoppedLeading is always invoked on the way out, including when the
// lock was never acquired.
func (le *LeaderElector) Run(ctx context.Context) {
	// HandleCrash must be the deferred call itself: recover() only observes a
	// panic when invoked directly by a deferred function, so calling
	// HandleCrash from inside another deferred closure would never see one.
	defer runtime.HandleCrash()
	// Registered last so it runs first, before HandleCrash deals with any panic.
	defer func() {
		le.config.Callbacks.OnStoppedLeading()
	}()

	if !le.acquire(ctx) { // try to obtain the lock
		return
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx) // main workload runs in the callback
	le.renew(ctx)                                // keep renewing the lock
}
资源锁获取过程:
// acquire repeatedly calls tryAcquireOrRenew, every RetryPeriod with jitter,
// until an attempt succeeds or ctx is done. It returns the result of the
// last attempt, i.e. true if the lock was obtained.
func (le *LeaderElector) acquire(ctx context.Context) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	succeeded := false
	// The unused `desc` local from the original was dropped; Go rejects
	// declared-but-unused variables.
	wait.JitterUntil(func() {
		succeeded = le.tryAcquireOrRenew(ctx) // attempt to take the lock
		le.maybeReportTransition()
		if !succeeded {
			return // failed; wait for the next tick and try again
		}
		le.config.Lock.RecordEvent("became leader")
		le.metrics.leaderOn(le.config.Name)
		cancel() // closes ctx.Done(), which stops the JitterUntil loop
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}
tryAcquireOrRenew(ctx):
// tryAcquireOrRenew makes a single attempt to create, take over, or renew
// the lock. It returns true when, after the call, the lock record written
// by this elector is in place, and false on any error or when another
// holder's lease has not yet been observed to expire.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	now := metav1.Now()
	// The record we would like to store if we end up holding the lock.
	desired := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// Step 1: read the existing lock, creating it when it does not exist.
	current, currentRaw, err := le.config.Lock.Get(ctx)
	if err != nil && !errors.IsNotFound(err) {
		return false
	}
	if err != nil {
		// No lock object yet: whoever creates it becomes the leader.
		if createErr := le.config.Lock.Create(ctx, desired); createErr != nil {
			return false
		}
		le.observedRecord = desired
		le.observedTime = le.clock.Now()
		return true
	}

	// Step 2: refresh the locally cached view whenever the stored record changed.
	if !bytes.Equal(le.observedRawRecord, currentRaw) {
		le.observedRecord = *current
		le.observedRawRecord = currentRaw
		le.observedTime = le.clock.Now()
	}

	// Someone holds the lock, their lease has not expired from our point of
	// view, and that someone is not us: back off for now.
	if len(current.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		return false
	}

	// Step 3: fill in the fields that depend on whether this is a renewal
	// or a takeover.
	if !le.IsLeader() {
		// Takeover: leadership changes hands, so bump the counter.
		desired.LeaderTransitions = current.LeaderTransitions + 1
	} else {
		// Renewal: keep the original acquire time and transition count.
		desired.AcquireTime = current.AcquireTime
		desired.LeaderTransitions = current.LeaderTransitions
	}

	// Step 4: write the record back.
	if updateErr := le.config.Lock.Update(ctx, desired); updateErr != nil {
		return false
	}

	le.observedRecord = desired
	le.observedTime = le.clock.Now()
	return true
}
资源锁续约过程:
// renew keeps the lock alive. Every RetryPeriod it starts a renewal round
// bounded by RenewDeadline; the first round that fails to renew within its
// deadline ends the loop. If ReleaseOnCancel is set, the lock is released
// before returning.
func (le *LeaderElector) renew(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	wait.Until(func() {
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()

		// Poll tryAcquireOrRenew every RetryPeriod. A true result ends the
		// poll successfully; a false result just means "try again" until
		// timeoutCtx is done, at which point the poll returns an error.
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			return le.tryAcquireOrRenew(timeoutCtx), nil
		}, timeoutCtx.Done())

		le.maybeReportTransition()
		// The unused `desc` local from the original was dropped; Go rejects
		// declared-but-unused variables.
		if err == nil {
			return // renewed; wait.Until schedules the next round
		}
		le.config.Lock.RecordEvent("stopped leading")
		le.metrics.leaderOff(le.config.Name)
		cancel() // renewal failed: closes ctx.Done(), which stops wait.Until
	}, le.config.RetryPeriod, ctx.Done())

	if le.config.ReleaseOnCancel {
		le.release()
	}
}
 
原文地址:https://www.cnblogs.com/yangyuliufeng/p/14186751.html