homelab/services/ha-sync/internal/lease/lease.go
Dan V deb6c38d7b chore: commit homelab setup — deployment, services, orchestration, skill
- Add .gitignore: exclude compiled binaries, build artifacts, and Helm
  values files containing real secrets (authentik, prometheus)
- Add all Kubernetes deployment manifests (deployment/)
- Add services source code: ha-sync, device-inventory, games-console,
  paperclip, parts-inventory
- Add Ansible orchestration: playbooks, roles, inventory, cloud-init
- Add hardware specs, execution plans, scripts, HOMELAB.md
- Add skills/homelab/SKILL.md + skills/install.sh to preserve Copilot skill
- Remove previously-tracked inventory-cli binary from git index

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-09 08:10:32 +02:00

154 lines
4.2 KiB
Go

package lease
import (
"context"
"errors"
"time"
coordinationv1 "k8s.io/api/coordination/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
var ErrLocked = errors.New("lease is held by another holder")
// Locker manages a Kubernetes Lease object for distributed locking.
type Locker struct {
client kubernetes.Interface
namespace string
leaseName string
holderID string
ttlSeconds int
}
// New builds an in-cluster Kubernetes client and returns a Locker.
func New(namespace, leaseName, holderID string, ttlSeconds int) (*Locker, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, err
}
return &Locker{
client: client,
namespace: namespace,
leaseName: leaseName,
holderID: holderID,
ttlSeconds: ttlSeconds,
}, nil
}
// Acquire tries to create the Lease. If it already exists and is stale
// (renewTime + TTL is in the past) it deletes and recreates it. If held by
// another identity and not stale, it returns ErrLocked.
func (l *Locker) Acquire(ctx context.Context) error {
now := metav1.NewMicroTime(time.Now())
ttl := int32(l.ttlSeconds)
desired := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: l.leaseName,
Namespace: l.namespace,
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: &l.holderID,
LeaseDurationSeconds: &ttl,
AcquireTime: &now,
RenewTime: &now,
},
}
_, err := l.client.CoordinationV1().Leases(l.namespace).Create(ctx, desired, metav1.CreateOptions{})
if err == nil {
return nil
}
if !k8serrors.IsAlreadyExists(err) {
return err
}
// Lease already exists — inspect it.
existing, err := l.client.CoordinationV1().Leases(l.namespace).Get(ctx, l.leaseName, metav1.GetOptions{})
if err != nil {
return err
}
if isStale(existing, l.ttlSeconds) {
// Delete the stale lease and recreate.
if err := l.client.CoordinationV1().Leases(l.namespace).Delete(ctx, l.leaseName, metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) {
return err
}
_, err = l.client.CoordinationV1().Leases(l.namespace).Create(ctx, desired, metav1.CreateOptions{})
return err
}
return ErrLocked
}
// Release deletes the Lease if its holderIdentity matches our holderID.
func (l *Locker) Release(ctx context.Context) error {
existing, err := l.client.CoordinationV1().Leases(l.namespace).Get(ctx, l.leaseName, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
if existing.Spec.HolderIdentity == nil || *existing.Spec.HolderIdentity != l.holderID {
return nil
}
err = l.client.CoordinationV1().Leases(l.namespace).Delete(ctx, l.leaseName, metav1.DeleteOptions{})
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
// StartHeartbeat starts a goroutine that updates renewTime every ttl/3 seconds
// until ctx is cancelled, preventing the lock from going stale during long syncs.
func (l *Locker) StartHeartbeat(ctx context.Context) {
interval := time.Duration(l.ttlSeconds) * time.Second / 3
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_ = l.updateRenewTime(ctx)
}
}
}()
}
// updateRenewTime patches the Lease's renewTime to now, keeping the lock alive.
func (l *Locker) updateRenewTime(ctx context.Context) error {
existing, err := l.client.CoordinationV1().Leases(l.namespace).Get(ctx, l.leaseName, metav1.GetOptions{})
if err != nil {
return err
}
if existing.Spec.HolderIdentity == nil || *existing.Spec.HolderIdentity != l.holderID {
return ErrLocked
}
now := metav1.NewMicroTime(time.Now())
existing.Spec.RenewTime = &now
_, err = l.client.CoordinationV1().Leases(l.namespace).Update(ctx, existing, metav1.UpdateOptions{})
return err
}
// isStale returns true if the lease's renewTime + TTL is in the past.
func isStale(lease *coordinationv1.Lease, ttlSeconds int) bool {
if lease.Spec.RenewTime == nil {
return true
}
expiry := lease.Spec.RenewTime.Time.Add(time.Duration(ttlSeconds) * time.Second)
return time.Now().After(expiry)
}