- internal/kube/client.go: NewClient() with in-cluster + kubeconfig fallback - internal/kube/cronjob.go: JobSpec, ApplyCronJob, DeleteCronJob, TriggerJob, GetLockStatus, SuspendCronJob, ListCronJobs, ImportFromCronJob - Makefile/Dockerfile: add ha-sync-ctl build target - rbac.yaml: add batch/cronjobs+jobs permissions and watch verb on leases Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
357 lines · 11 KiB · Go
package kube
|
|
|
|
import (
	"context"
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	coordinationv1 "k8s.io/api/coordination/v1"
	corev1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	apiresource "k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)
|
|
|
|
// JobSpec is a self-contained description of a sync job, mirroring db.Job.
// It is the single input to buildCronJob/ApplyCronJob and is reconstructed
// from a live CronJob by ImportFromCronJob.
type JobSpec struct {
	ID             int64    // DB row ID; recorded in the "ha-sync/job-id" annotation
	Name           string   // logical job name; basis for the CronJob and Lease names
	Pair           string   // sync pair label; also part of the data PVC names
	Direction      string   // passed through as --direction
	Src            string   // source path; CLI --src and the src-data mount path
	Dest           string   // destination path; CLI --dest and the dest-data mount path
	SrcHost        string   // host owning the source PVC (pvc-<host>-<pair>)
	DestHost       string   // host owning the destination PVC
	CronSchedule   string   // cron expression copied into CronJobSpec.Schedule
	LockTTLSeconds int      // passed through as --lock-ttl
	DryRun         bool     // emits the --dry-run flag when true
	DeleteMissing  bool     // emits the --delete-missing flag when true
	Workers        int      // passed through as --workers
	MtimeThreshold string   // passed through as --mtime-threshold when non-empty
	Excludes       []string // each entry emitted as a separate --exclude flag
	Enabled        bool     // false suspends the CronJob instead of deleting it
}
|
|
|
|
// LockStatus describes the current lock state for a job, as derived from
// its coordination Lease (see GetLockStatus / leaseStatus).
//
// NOTE(review): `omitempty` has no effect on struct-typed fields such as
// time.Time with encoding/json — a zero ExpiresAt is still serialized.
type LockStatus struct {
	Locked    bool      `json:"locked"`                // true only while the lease is held and unexpired
	Holder    string    `json:"holder,omitempty"`      // lease HolderIdentity; empty when unlocked
	ExpiresAt time.Time `json:"expires_at,omitempty"`  // RenewTime + LeaseDurationSeconds
}
|
|
|
|
// CronJobName returns the K8s CronJob name for a job name.
//
// The job name is lowercased and underscores become hyphens so the result
// is usable as an RFC 1123 DNS label. The converted name is capped at 52
// characters ("ha-sync-" brings the total to at most 60, leaving headroom
// under the 63-character object-name limit for derived Job names). Any
// trailing hyphens — whether present in the input or left by truncation —
// are stripped, because a Kubernetes object name must end with an
// alphanumeric character.
func CronJobName(jobName string) string {
	name := strings.ToLower(strings.ReplaceAll(jobName, "_", "-"))
	if len(name) > 52 {
		name = name[:52]
	}
	// Truncation can land right after a hyphen; trim to stay a valid name.
	name = strings.TrimRight(name, "-")
	return "ha-sync-" + name
}
|
|
|
|
// LeaseName returns the Lease name for a job name.
//
// Same normalization as CronJobName (lowercase, underscores to hyphens)
// but with a 55-character cap, for a total of at most 63 characters with
// the "ha-sync-" prefix. Trailing hyphens left by truncation are stripped
// so the result stays a valid RFC 1123 object name.
func LeaseName(jobName string) string {
	name := strings.ToLower(strings.ReplaceAll(jobName, "_", "-"))
	if len(name) > 55 {
		name = name[:55]
	}
	// Truncation can land right after a hyphen; trim to stay a valid name.
	name = strings.TrimRight(name, "-")
	return "ha-sync-" + name
}
|
|
|
|
// pvcName derives the PersistentVolumeClaim name for a host/pair pair,
// lowercasing both parts to match the cluster's naming convention.
func pvcName(host, pair string) string {
	h := strings.ToLower(host)
	p := strings.ToLower(pair)
	return "pvc-" + h + "-" + p
}
|
|
|
|
func buildArgs(spec JobSpec) []string {
|
|
args := []string{
|
|
"--src=" + spec.Src,
|
|
"--dest=" + spec.Dest,
|
|
"--pair=" + spec.Pair,
|
|
"--direction=" + spec.Direction,
|
|
"--log-dir=/var/log/ha-sync",
|
|
"--src-host=" + spec.SrcHost,
|
|
"--dest-host=" + spec.DestHost,
|
|
fmt.Sprintf("--workers=%d", spec.Workers),
|
|
fmt.Sprintf("--lock-ttl=%d", spec.LockTTLSeconds),
|
|
}
|
|
if spec.MtimeThreshold != "" {
|
|
args = append(args, "--mtime-threshold="+spec.MtimeThreshold)
|
|
}
|
|
if spec.DryRun {
|
|
args = append(args, "--dry-run")
|
|
}
|
|
if spec.DeleteMissing {
|
|
args = append(args, "--delete-missing")
|
|
}
|
|
for _, ex := range spec.Excludes {
|
|
args = append(args, "--exclude="+ex)
|
|
}
|
|
return args
|
|
}
|
|
|
|
func qty(s string) apiresource.Quantity {
|
|
q, _ := apiresource.ParseQuantity(s)
|
|
return q
|
|
}
|
|
|
|
func volumeFromPVC(name, claimName string) corev1.Volume {
|
|
return corev1.Volume{
|
|
Name: name,
|
|
VolumeSource: corev1.VolumeSource{
|
|
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: claimName},
|
|
},
|
|
}
|
|
}
|
|
|
|
// buildCronJob renders the desired batch/v1 CronJob for a job spec.
// The result is what ApplyCronJob sends as a server-side-apply patch.
func buildCronJob(namespace string, spec JobSpec) *batchv1.CronJob {
	cronName := CronJobName(spec.Name)
	// Keep the last three successful and three failed Jobs for inspection.
	successLimit := int32(3)
	failureLimit := int32(3)
	// A disabled job is a suspended CronJob, not a deleted one, so
	// re-enabling keeps the same object.
	suspended := !spec.Enabled

	return &batchv1.CronJob{
		// TypeMeta is set explicitly because the object is marshaled and
		// sent as an apply patch, which must carry apiVersion/kind.
		TypeMeta: metav1.TypeMeta{APIVersion: "batch/v1", Kind: "CronJob"},
		ObjectMeta: metav1.ObjectMeta{
			Name:      cronName,
			Namespace: namespace,
			// app=ha-sync drives ListCronJobs; the job/pair labels let
			// ImportFromCronJob recover the spec identity.
			Labels: map[string]string{
				"app":          "ha-sync",
				"ha-sync/job":  spec.Name,
				"ha-sync/pair": spec.Pair,
			},
			// Trace the CronJob back to its DB row.
			Annotations: map[string]string{
				"ha-sync/job-id": fmt.Sprintf("%d", spec.ID),
			},
		},
		Spec: batchv1.CronJobSpec{
			Schedule: spec.CronSchedule,
			// Never start a new run while a previous one is still active.
			ConcurrencyPolicy:          batchv1.ForbidConcurrent,
			SuccessfulJobsHistoryLimit: &successLimit,
			FailedJobsHistoryLimit:     &failureLimit,
			Suspend:                    &suspended,
			JobTemplate: batchv1.JobTemplateSpec{
				Spec: batchv1.JobSpec{
					Template: corev1.PodTemplateSpec{
						Spec: corev1.PodSpec{
							ServiceAccountName: "ha-sync",
							RestartPolicy:      corev1.RestartPolicyOnFailure,
							Containers: []corev1.Container{{
								Name:  "ha-sync",
								Image: "ha-sync:latest",
								// PullNever: the image is expected to be
								// present on the node already (local build),
								// never fetched from a registry.
								ImagePullPolicy: corev1.PullNever,
								Command:         []string{"/usr/local/bin/ha-sync"},
								Args:            buildArgs(spec),
								Env: []corev1.EnvVar{
									{
										// DB DSN comes from a pre-existing
										// Secret, not from the spec.
										Name: "HA_SYNC_DB_DSN",
										ValueFrom: &corev1.EnvVarSource{
											SecretKeyRef: &corev1.SecretKeySelector{
												LocalObjectReference: corev1.LocalObjectReference{Name: "ha-sync-db-secret"},
												Key:                  "HA_SYNC_DB_DSN",
											},
										},
									},
									{
										// Pod's own name via the downward API.
										Name: "POD_NAME",
										ValueFrom: &corev1.EnvVarSource{
											FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"},
										},
									},
								},
								// Src/Dest double as the mount points for the
								// per-host data PVCs; the logs mount matches
								// the --log-dir flag in buildArgs.
								VolumeMounts: []corev1.VolumeMount{
									{Name: "src-data", MountPath: spec.Src},
									{Name: "dest-data", MountPath: spec.Dest},
									{Name: "logs", MountPath: "/var/log/ha-sync"},
								},
								Resources: corev1.ResourceRequirements{
									Requests: corev1.ResourceList{
										corev1.ResourceCPU:    qty("50m"),
										corev1.ResourceMemory: qty("64Mi"),
									},
									Limits: corev1.ResourceList{
										corev1.ResourceCPU:    qty("500m"),
										corev1.ResourceMemory: qty("256Mi"),
									},
								},
							}},
							Volumes: []corev1.Volume{
								volumeFromPVC("src-data", pvcName(spec.SrcHost, spec.Pair)),
								volumeFromPVC("dest-data", pvcName(spec.DestHost, spec.Pair)),
								volumeFromPVC("logs", "pvc-ha-sync-logs"),
							},
						},
					},
				},
			},
		},
	}
}
|
|
|
|
// ApplyCronJob creates or server-side-applies the K8s CronJob for a job spec.
|
|
func ApplyCronJob(ctx context.Context, client kubernetes.Interface, namespace string, spec JobSpec) error {
|
|
desired := buildCronJob(namespace, spec)
|
|
data, err := json.Marshal(desired)
|
|
if err != nil {
|
|
return fmt.Errorf("kube: marshal cronjob: %w", err)
|
|
}
|
|
_, err = client.BatchV1().CronJobs(namespace).Patch(ctx, desired.Name,
|
|
types.ApplyPatchType, data,
|
|
metav1.PatchOptions{FieldManager: "ha-sync-ctl", Force: boolPtr(true)})
|
|
if err != nil {
|
|
return fmt.Errorf("kube: apply cronjob %s: %w", desired.Name, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteCronJob removes the K8s CronJob for a job (ignores not-found).
|
|
func DeleteCronJob(ctx context.Context, client kubernetes.Interface, namespace, jobName string) error {
|
|
cronName := CronJobName(jobName)
|
|
err := client.BatchV1().CronJobs(namespace).Delete(ctx, cronName, metav1.DeleteOptions{})
|
|
if err != nil && !k8serrors.IsNotFound(err) {
|
|
return fmt.Errorf("kube: delete cronjob %s: %w", cronName, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TriggerJob creates a one-off Job from the CronJob.
|
|
func TriggerJob(ctx context.Context, client kubernetes.Interface, namespace, jobName string) (string, error) {
|
|
cronName := CronJobName(jobName)
|
|
cj, err := client.BatchV1().CronJobs(namespace).Get(ctx, cronName, metav1.GetOptions{})
|
|
if err != nil {
|
|
return "", fmt.Errorf("kube: get cronjob %s: %w", cronName, err)
|
|
}
|
|
|
|
ts := time.Now().UTC().Format("150405")
|
|
manualName := fmt.Sprintf("%s-manual-%s", cronName, ts)
|
|
if len(manualName) > 63 {
|
|
manualName = manualName[:63]
|
|
}
|
|
|
|
job := &batchv1.Job{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: manualName,
|
|
Namespace: namespace,
|
|
Labels: cj.Labels,
|
|
Annotations: map[string]string{
|
|
"ha-sync/triggered": "manual",
|
|
},
|
|
},
|
|
Spec: cj.Spec.JobTemplate.Spec,
|
|
}
|
|
created, err := client.BatchV1().Jobs(namespace).Create(ctx, job, metav1.CreateOptions{})
|
|
if err != nil {
|
|
return "", fmt.Errorf("kube: create manual job: %w", err)
|
|
}
|
|
return created.Name, nil
|
|
}
|
|
|
|
// GetLockStatus checks the K8s Lease to determine if a job is actively running.
|
|
func GetLockStatus(ctx context.Context, client kubernetes.Interface, namespace, jobName string) (LockStatus, error) {
|
|
leaseName := LeaseName(jobName)
|
|
lease, err := client.CoordinationV1().Leases(namespace).Get(ctx, leaseName, metav1.GetOptions{})
|
|
if err != nil {
|
|
if k8serrors.IsNotFound(err) {
|
|
return LockStatus{Locked: false}, nil
|
|
}
|
|
return LockStatus{}, fmt.Errorf("kube: get lease %s: %w", leaseName, err)
|
|
}
|
|
return leaseStatus(lease), nil
|
|
}
|
|
|
|
func leaseStatus(lease *coordinationv1.Lease) LockStatus {
|
|
if lease.Spec.RenewTime == nil || lease.Spec.LeaseDurationSeconds == nil || lease.Spec.HolderIdentity == nil {
|
|
return LockStatus{Locked: false}
|
|
}
|
|
ttl := time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second
|
|
expiresAt := lease.Spec.RenewTime.Time.Add(ttl)
|
|
if time.Now().After(expiresAt) {
|
|
return LockStatus{Locked: false}
|
|
}
|
|
return LockStatus{
|
|
Locked: true,
|
|
Holder: *lease.Spec.HolderIdentity,
|
|
ExpiresAt: expiresAt,
|
|
}
|
|
}
|
|
|
|
// SuspendCronJob sets the suspend flag on a CronJob without changing other config.
|
|
func SuspendCronJob(ctx context.Context, client kubernetes.Interface, namespace, jobName string, suspend bool) error {
|
|
cronName := CronJobName(jobName)
|
|
patch := fmt.Sprintf(`{"spec":{"suspend":%v}}`, suspend)
|
|
_, err := client.BatchV1().CronJobs(namespace).Patch(ctx, cronName,
|
|
types.MergePatchType, []byte(patch), metav1.PatchOptions{})
|
|
if err != nil && !k8serrors.IsNotFound(err) {
|
|
return fmt.Errorf("kube: suspend %s=%v: %w", cronName, suspend, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ListCronJobs returns all CronJobs with the ha-sync app label.
|
|
func ListCronJobs(ctx context.Context, client kubernetes.Interface, namespace string) ([]batchv1.CronJob, error) {
|
|
list, err := client.BatchV1().CronJobs(namespace).List(ctx, metav1.ListOptions{
|
|
LabelSelector: "app=ha-sync",
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kube: list cronjobs: %w", err)
|
|
}
|
|
return list.Items, nil
|
|
}
|
|
|
|
// ImportFromCronJob extracts a JobSpec from an existing K8s CronJob.
|
|
func ImportFromCronJob(cj batchv1.CronJob) *JobSpec {
|
|
jobName := cj.Labels["ha-sync/job"]
|
|
pair := cj.Labels["ha-sync/pair"]
|
|
if jobName == "" {
|
|
name := strings.TrimPrefix(cj.Name, "ha-sync-")
|
|
if name == cj.Name {
|
|
return nil
|
|
}
|
|
jobName = name
|
|
}
|
|
|
|
spec := &JobSpec{
|
|
Name: jobName,
|
|
Pair: pair,
|
|
CronSchedule: cj.Spec.Schedule,
|
|
Enabled: cj.Spec.Suspend == nil || !*cj.Spec.Suspend,
|
|
LockTTLSeconds: 3600,
|
|
Workers: 4,
|
|
MtimeThreshold: "2s",
|
|
SrcHost: "dell",
|
|
DestHost: "hp",
|
|
}
|
|
|
|
containers := cj.Spec.JobTemplate.Spec.Template.Spec.Containers
|
|
if len(containers) == 0 {
|
|
return spec
|
|
}
|
|
for _, arg := range containers[0].Args {
|
|
switch {
|
|
case strings.HasPrefix(arg, "--src="):
|
|
spec.Src = strings.TrimPrefix(arg, "--src=")
|
|
case strings.HasPrefix(arg, "--dest="):
|
|
spec.Dest = strings.TrimPrefix(arg, "--dest=")
|
|
case strings.HasPrefix(arg, "--pair="):
|
|
spec.Pair = strings.TrimPrefix(arg, "--pair=")
|
|
case strings.HasPrefix(arg, "--direction="):
|
|
spec.Direction = strings.TrimPrefix(arg, "--direction=")
|
|
case strings.HasPrefix(arg, "--src-host="):
|
|
spec.SrcHost = strings.TrimPrefix(arg, "--src-host=")
|
|
case strings.HasPrefix(arg, "--dest-host="):
|
|
spec.DestHost = strings.TrimPrefix(arg, "--dest-host=")
|
|
case arg == "--dry-run":
|
|
spec.DryRun = true
|
|
case arg == "--delete-missing":
|
|
spec.DeleteMissing = true
|
|
case strings.HasPrefix(arg, "--exclude="):
|
|
spec.Excludes = append(spec.Excludes, strings.TrimPrefix(arg, "--exclude="))
|
|
}
|
|
}
|
|
return spec
|
|
}
|
|
|
|
// boolPtr returns a pointer to a copy of b, for APIs that take *bool.
func boolPtr(b bool) *bool {
	v := b
	return &v
}
|