- Add .gitignore: exclude compiled binaries, build artifacts, and Helm values files containing real secrets (authentik, prometheus) - Add all Kubernetes deployment manifests (deployment/) - Add services source code: ha-sync, device-inventory, games-console, paperclip, parts-inventory - Add Ansible orchestration: playbooks, roles, inventory, cloud-init - Add hardware specs, execution plans, scripts, HOMELAB.md - Add skills/homelab/SKILL.md + skills/install.sh to preserve Copilot skill - Remove previously-tracked inventory-cli binary from git index Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
154 lines
4.2 KiB
Go
154 lines
4.2 KiB
Go
package lease
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
coordinationv1 "k8s.io/api/coordination/v1"
|
|
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
)
|
|
|
|
var ErrLocked = errors.New("lease is held by another holder")
|
|
|
|
// Locker manages a Kubernetes Lease object for distributed locking.
|
|
type Locker struct {
|
|
client kubernetes.Interface
|
|
namespace string
|
|
leaseName string
|
|
holderID string
|
|
ttlSeconds int
|
|
}
|
|
|
|
// New builds an in-cluster Kubernetes client and returns a Locker.
|
|
func New(namespace, leaseName, holderID string, ttlSeconds int) (*Locker, error) {
|
|
cfg, err := rest.InClusterConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
client, err := kubernetes.NewForConfig(cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Locker{
|
|
client: client,
|
|
namespace: namespace,
|
|
leaseName: leaseName,
|
|
holderID: holderID,
|
|
ttlSeconds: ttlSeconds,
|
|
}, nil
|
|
}
|
|
|
|
// Acquire tries to create the Lease. If it already exists and is stale
|
|
// (renewTime + TTL is in the past) it deletes and recreates it. If held by
|
|
// another identity and not stale, it returns ErrLocked.
|
|
func (l *Locker) Acquire(ctx context.Context) error {
|
|
now := metav1.NewMicroTime(time.Now())
|
|
ttl := int32(l.ttlSeconds)
|
|
|
|
desired := &coordinationv1.Lease{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: l.leaseName,
|
|
Namespace: l.namespace,
|
|
},
|
|
Spec: coordinationv1.LeaseSpec{
|
|
HolderIdentity: &l.holderID,
|
|
LeaseDurationSeconds: &ttl,
|
|
AcquireTime: &now,
|
|
RenewTime: &now,
|
|
},
|
|
}
|
|
|
|
_, err := l.client.CoordinationV1().Leases(l.namespace).Create(ctx, desired, metav1.CreateOptions{})
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if !k8serrors.IsAlreadyExists(err) {
|
|
return err
|
|
}
|
|
|
|
// Lease already exists — inspect it.
|
|
existing, err := l.client.CoordinationV1().Leases(l.namespace).Get(ctx, l.leaseName, metav1.GetOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if isStale(existing, l.ttlSeconds) {
|
|
// Delete the stale lease and recreate.
|
|
if err := l.client.CoordinationV1().Leases(l.namespace).Delete(ctx, l.leaseName, metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) {
|
|
return err
|
|
}
|
|
_, err = l.client.CoordinationV1().Leases(l.namespace).Create(ctx, desired, metav1.CreateOptions{})
|
|
return err
|
|
}
|
|
|
|
return ErrLocked
|
|
}
|
|
|
|
// Release deletes the Lease if its holderIdentity matches our holderID.
|
|
func (l *Locker) Release(ctx context.Context) error {
|
|
existing, err := l.client.CoordinationV1().Leases(l.namespace).Get(ctx, l.leaseName, metav1.GetOptions{})
|
|
if err != nil {
|
|
if k8serrors.IsNotFound(err) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
if existing.Spec.HolderIdentity == nil || *existing.Spec.HolderIdentity != l.holderID {
|
|
return nil
|
|
}
|
|
|
|
err = l.client.CoordinationV1().Leases(l.namespace).Delete(ctx, l.leaseName, metav1.DeleteOptions{})
|
|
if k8serrors.IsNotFound(err) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
// StartHeartbeat starts a goroutine that updates renewTime every ttl/3 seconds
|
|
// until ctx is cancelled, preventing the lock from going stale during long syncs.
|
|
func (l *Locker) StartHeartbeat(ctx context.Context) {
|
|
interval := time.Duration(l.ttlSeconds) * time.Second / 3
|
|
go func() {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
_ = l.updateRenewTime(ctx)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// updateRenewTime patches the Lease's renewTime to now, keeping the lock alive.
|
|
func (l *Locker) updateRenewTime(ctx context.Context) error {
|
|
existing, err := l.client.CoordinationV1().Leases(l.namespace).Get(ctx, l.leaseName, metav1.GetOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if existing.Spec.HolderIdentity == nil || *existing.Spec.HolderIdentity != l.holderID {
|
|
return ErrLocked
|
|
}
|
|
|
|
now := metav1.NewMicroTime(time.Now())
|
|
existing.Spec.RenewTime = &now
|
|
_, err = l.client.CoordinationV1().Leases(l.namespace).Update(ctx, existing, metav1.UpdateOptions{})
|
|
return err
|
|
}
|
|
|
|
// isStale returns true if the lease's renewTime + TTL is in the past.
|
|
func isStale(lease *coordinationv1.Lease, ttlSeconds int) bool {
|
|
if lease.Spec.RenewTime == nil {
|
|
return true
|
|
}
|
|
expiry := lease.Spec.RenewTime.Time.Add(time.Duration(ttlSeconds) * time.Second)
|
|
return time.Now().After(expiry)
|
|
}
|