// Package lease implements a simple distributed lock on top of a Kubernetes
// coordination.k8s.io/v1 Lease object.
package lease

import (
	"context"
	"errors"
	"time"

	coordinationv1 "k8s.io/api/coordination/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// ErrLocked is returned by Acquire when a live Lease already exists, and by
// the heartbeat's renew step when the Lease is no longer held by this Locker.
var ErrLocked = errors.New("lease is held by another holder")

// Locker manages a Kubernetes Lease object for distributed locking.
type Locker struct {
	client     kubernetes.Interface
	namespace  string
	leaseName  string
	holderID   string
	ttlSeconds int
}

// New builds an in-cluster Kubernetes client and returns a Locker.
//
// namespace and leaseName identify the Lease object, holderID is written to
// the Lease's holderIdentity, and ttlSeconds is written to its
// leaseDurationSeconds and used to decide when an existing Lease is stale.
// An error is returned if the in-cluster configuration or the client cannot
// be created.
func New(namespace, leaseName, holderID string, ttlSeconds int) (*Locker, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	return &Locker{
		client:     client,
		namespace:  namespace,
		leaseName:  leaseName,
		holderID:   holderID,
		ttlSeconds: ttlSeconds,
	}, nil
}

// newLease returns a Lease object owned by this Locker with acquireTime and
// renewTime set to the current time.
func (l *Locker) newLease() *coordinationv1.Lease {
	now := metav1.NewMicroTime(time.Now())
	ttl := int32(l.ttlSeconds)
	holder := l.holderID
	return &coordinationv1.Lease{
		ObjectMeta: metav1.ObjectMeta{
			Name:      l.leaseName,
			Namespace: l.namespace,
		},
		Spec: coordinationv1.LeaseSpec{
			HolderIdentity:       &holder,
			LeaseDurationSeconds: &ttl,
			AcquireTime:          &now,
			RenewTime:            &now,
		},
	}
}

// Acquire tries to create the Lease. If it already exists and is stale
// (renewTime + TTL is in the past) it deletes and recreates it. If it exists
// and is not stale, it returns ErrLocked; this includes a live Lease written
// by the same holderID, so Acquire is not re-entrant.
//
// ErrLocked is also returned when another client renews or replaces the Lease
// while the stale one is being taken over. Any other API error is returned
// unchanged.
func (l *Locker) Acquire(ctx context.Context) error {
	leases := l.client.CoordinationV1().Leases(l.namespace)

	_, err := leases.Create(ctx, l.newLease(), metav1.CreateOptions{})
	if err == nil {
		return nil
	}
	if !k8serrors.IsAlreadyExists(err) {
		return err
	}

	// Lease already exists — inspect it.
	existing, err := leases.Get(ctx, l.leaseName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if !isStale(existing, l.ttlSeconds) {
		return ErrLocked
	}

	// Delete exactly the object that was judged stale. Without preconditions
	// a renewal or takeover that lands between the Get above and this Delete
	// would have its live Lease removed.
	err = leases.Delete(ctx, l.leaseName, metav1.DeleteOptions{
		Preconditions: &metav1.Preconditions{
			UID:             &existing.UID,
			ResourceVersion: &existing.ResourceVersion,
		},
	})
	switch {
	case err == nil, k8serrors.IsNotFound(err):
		// Gone either way; try to claim it below.
	case k8serrors.IsConflict(err):
		// The Lease changed since it was read, so it is no longer the stale
		// object we inspected.
		return ErrLocked
	default:
		return err
	}

	// Build a fresh object so acquireTime/renewTime reflect this attempt.
	_, err = leases.Create(ctx, l.newLease(), metav1.CreateOptions{})
	if k8serrors.IsAlreadyExists(err) {
		// Another contender recreated the Lease first.
		return ErrLocked
	}
	return err
}

// Release deletes the Lease if its holderIdentity matches our holderID.
//
// It returns nil when the Lease does not exist, is held by a different
// identity, or was deleted or replaced by someone else while releasing.
func (l *Locker) Release(ctx context.Context) error {
	leases := l.client.CoordinationV1().Leases(l.namespace)

	existing, err := leases.Get(ctx, l.leaseName, metav1.GetOptions{})
	if k8serrors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}
	if existing.Spec.HolderIdentity == nil || *existing.Spec.HolderIdentity != l.holderID {
		return nil
	}

	// Pin the delete to the object that was just inspected: Acquire replaces
	// a stale Lease by delete + create, which yields a new UID, so this never
	// removes a successor's Lease. ResourceVersion is deliberately not pinned
	// so that an in-flight heartbeat Update of our own does not block release.
	err = leases.Delete(ctx, l.leaseName, metav1.DeleteOptions{
		Preconditions: &metav1.Preconditions{UID: &existing.UID},
	})
	if k8serrors.IsNotFound(err) || k8serrors.IsConflict(err) {
		return nil
	}
	return err
}

// StartHeartbeat starts a goroutine that updates renewTime every ttl/3 seconds
// until ctx is cancelled, preventing the lock from going stale during long syncs.
//
// If the configured TTL is not positive no goroutine is started: the interval
// would be non-positive, which time.NewTicker rejects by panicking.
func (l *Locker) StartHeartbeat(ctx context.Context) {
	interval := time.Duration(l.ttlSeconds) * time.Second / 3
	if interval <= 0 {
		return
	}
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// Best effort: this method has no channel for reporting
				// errors, and a failed renewal is retried on the next tick.
				_ = l.updateRenewTime(ctx)
			}
		}
	}()
}

// updateRenewTime reads the Lease and, if it is still held by this Locker,
// writes it back with renewTime set to now, keeping the lock alive.
//
// It returns ErrLocked when the Lease's holderIdentity is unset or belongs to
// someone else, and otherwise any error from the Get or Update call.
func (l *Locker) updateRenewTime(ctx context.Context) error {
	leases := l.client.CoordinationV1().Leases(l.namespace)

	existing, err := leases.Get(ctx, l.leaseName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if existing.Spec.HolderIdentity == nil || *existing.Spec.HolderIdentity != l.holderID {
		return ErrLocked
	}
	now := metav1.NewMicroTime(time.Now())
	existing.Spec.RenewTime = &now
	_, err = leases.Update(ctx, existing, metav1.UpdateOptions{})
	return err
}

// isStale returns true if the lease's renewTime + TTL is in the past.
func isStale(lease *coordinationv1.Lease, ttlSeconds int) bool { if lease.Spec.RenewTime == nil { return true } expiry := lease.Spec.RenewTime.Time.Add(time.Duration(ttlSeconds) * time.Second) return time.Now().After(expiry) }