// Package kube builds and manages the Kubernetes CronJob, Job and Lease
// objects that back ha-sync jobs.
package kube

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	coordinationv1 "k8s.io/api/coordination/v1"
	corev1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	apiresource "k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// JobSpec is a self-contained description of a sync job, mirroring db.Job.
type JobSpec struct {
	// ID is recorded on the CronJob as the "ha-sync/job-id" annotation.
	ID int64
	// Name identifies the job; CronJobName and LeaseName derive object names
	// from it.
	Name string
	// Pair and Direction are passed to the binary as --pair and --direction.
	// Pair is also part of the data PVC names.
	Pair      string
	Direction string
	// Src and Dest are passed as --src/--dest and double as the container
	// mount paths of the source and destination volumes.
	Src  string
	Dest string
	// SrcHost and DestHost are passed as --src-host/--dest-host and select
	// the source and destination PVCs.
	SrcHost  string
	DestHost string
	// CronSchedule is the schedule expression of the CronJob.
	CronSchedule string
	// LockTTLSeconds is passed as --lock-ttl.
	LockTTLSeconds int
	// DryRun and DeleteMissing add --dry-run / --delete-missing when true.
	DryRun        bool
	DeleteMissing bool
	// Workers is passed as --workers.
	Workers int
	// MtimeThreshold is passed as --mtime-threshold; the flag is omitted
	// when the value is empty.
	MtimeThreshold string
	// Excludes yields one --exclude flag per entry.
	Excludes []string
	// Enabled controls scheduling: a job that is not enabled gets a
	// suspended CronJob.
	Enabled bool
}

// LockStatus describes the current lock state for a job.
//
// NOTE(review): encoding/json's omitempty never omits a struct value, so an
// unlocked status still serializes expires_at as the zero time.
type LockStatus struct {
	Locked    bool      `json:"locked"`
	Holder    string    `json:"holder,omitempty"`
	ExpiresAt time.Time `json:"expires_at,omitempty"`
}

// CronJobName returns the K8s CronJob name for a job name.
//
// The job name is lower-cased, underscores become hyphens, and the result is
// prefixed with "ha-sync-". Kubernetes rejects CronJob names longer than 52
// characters, so the cap applies to the complete name including the prefix;
// names that already fit are returned unchanged.
func CronJobName(jobName string) string {
	const (
		prefix = "ha-sync-"
		maxLen = 52
	)
	name := strings.ToLower(strings.ReplaceAll(jobName, "_", "-"))
	if room := maxLen - len(prefix); len(name) > room {
		// A cut can land right after a separator, and object names must end
		// with an alphanumeric character.
		name = strings.TrimRight(name[:room], "-.")
	}
	return prefix + name
}

// LeaseName returns the Lease name for a job name.
func LeaseName(jobName string) string { name := strings.ToLower(strings.ReplaceAll(jobName, "_", "-")) if len(name) > 55 { name = name[:55] } return "ha-sync-" + name } func pvcName(host, pair string) string { return fmt.Sprintf("pvc-%s-%s", strings.ToLower(host), strings.ToLower(pair)) } func buildArgs(spec JobSpec) []string { args := []string{ "--src=" + spec.Src, "--dest=" + spec.Dest, "--pair=" + spec.Pair, "--direction=" + spec.Direction, "--log-dir=/var/log/ha-sync", "--src-host=" + spec.SrcHost, "--dest-host=" + spec.DestHost, fmt.Sprintf("--workers=%d", spec.Workers), fmt.Sprintf("--lock-ttl=%d", spec.LockTTLSeconds), } if spec.MtimeThreshold != "" { args = append(args, "--mtime-threshold="+spec.MtimeThreshold) } if spec.DryRun { args = append(args, "--dry-run") } if spec.DeleteMissing { args = append(args, "--delete-missing") } for _, ex := range spec.Excludes { args = append(args, "--exclude="+ex) } return args } func qty(s string) apiresource.Quantity { q, _ := apiresource.ParseQuantity(s) return q } func volumeFromPVC(name, claimName string) corev1.Volume { return corev1.Volume{ Name: name, VolumeSource: corev1.VolumeSource{ PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: claimName}, }, } } func buildCronJob(namespace string, spec JobSpec) *batchv1.CronJob { cronName := CronJobName(spec.Name) successLimit := int32(3) failureLimit := int32(3) suspended := !spec.Enabled return &batchv1.CronJob{ TypeMeta: metav1.TypeMeta{APIVersion: "batch/v1", Kind: "CronJob"}, ObjectMeta: metav1.ObjectMeta{ Name: cronName, Namespace: namespace, Labels: map[string]string{ "app": "ha-sync", "ha-sync/job": spec.Name, "ha-sync/pair": spec.Pair, }, Annotations: map[string]string{ "ha-sync/job-id": fmt.Sprintf("%d", spec.ID), }, }, Spec: batchv1.CronJobSpec{ Schedule: spec.CronSchedule, ConcurrencyPolicy: batchv1.ForbidConcurrent, SuccessfulJobsHistoryLimit: &successLimit, FailedJobsHistoryLimit: &failureLimit, Suspend: &suspended, JobTemplate: 
batchv1.JobTemplateSpec{ Spec: batchv1.JobSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ ServiceAccountName: "ha-sync", RestartPolicy: corev1.RestartPolicyOnFailure, Containers: []corev1.Container{{ Name: "ha-sync", Image: "ha-sync:latest", ImagePullPolicy: corev1.PullNever, Command: []string{"/usr/local/bin/ha-sync"}, Args: buildArgs(spec), Env: []corev1.EnvVar{ { Name: "HA_SYNC_DB_DSN", ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{Name: "ha-sync-db-secret"}, Key: "HA_SYNC_DB_DSN", }, }, }, { Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}, }, }, }, VolumeMounts: []corev1.VolumeMount{ {Name: "src-data", MountPath: spec.Src}, {Name: "dest-data", MountPath: spec.Dest}, {Name: "logs", MountPath: "/var/log/ha-sync"}, }, Resources: corev1.ResourceRequirements{ Requests: corev1.ResourceList{ corev1.ResourceCPU: qty("50m"), corev1.ResourceMemory: qty("64Mi"), }, Limits: corev1.ResourceList{ corev1.ResourceCPU: qty("500m"), corev1.ResourceMemory: qty("256Mi"), }, }, }}, Volumes: []corev1.Volume{ volumeFromPVC("src-data", pvcName(spec.SrcHost, spec.Pair)), volumeFromPVC("dest-data", pvcName(spec.DestHost, spec.Pair)), volumeFromPVC("logs", "pvc-ha-sync-logs"), }, }, }, }, }, }, } } // ApplyCronJob creates or server-side-applies the K8s CronJob for a job spec. 
func ApplyCronJob(ctx context.Context, client kubernetes.Interface, namespace string, spec JobSpec) error { desired := buildCronJob(namespace, spec) data, err := json.Marshal(desired) if err != nil { return fmt.Errorf("kube: marshal cronjob: %w", err) } _, err = client.BatchV1().CronJobs(namespace).Patch(ctx, desired.Name, types.ApplyPatchType, data, metav1.PatchOptions{FieldManager: "ha-sync-ctl", Force: boolPtr(true)}) if err != nil { return fmt.Errorf("kube: apply cronjob %s: %w", desired.Name, err) } return nil } // DeleteCronJob removes the K8s CronJob for a job (ignores not-found). func DeleteCronJob(ctx context.Context, client kubernetes.Interface, namespace, jobName string) error { cronName := CronJobName(jobName) err := client.BatchV1().CronJobs(namespace).Delete(ctx, cronName, metav1.DeleteOptions{}) if err != nil && !k8serrors.IsNotFound(err) { return fmt.Errorf("kube: delete cronjob %s: %w", cronName, err) } return nil } // TriggerJob creates a one-off Job from the CronJob. func TriggerJob(ctx context.Context, client kubernetes.Interface, namespace, jobName string) (string, error) { cronName := CronJobName(jobName) cj, err := client.BatchV1().CronJobs(namespace).Get(ctx, cronName, metav1.GetOptions{}) if err != nil { return "", fmt.Errorf("kube: get cronjob %s: %w", cronName, err) } ts := time.Now().UTC().Format("150405") manualName := fmt.Sprintf("%s-manual-%s", cronName, ts) if len(manualName) > 63 { manualName = manualName[:63] } job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: manualName, Namespace: namespace, Labels: cj.Labels, Annotations: map[string]string{ "ha-sync/triggered": "manual", }, }, Spec: cj.Spec.JobTemplate.Spec, } created, err := client.BatchV1().Jobs(namespace).Create(ctx, job, metav1.CreateOptions{}) if err != nil { return "", fmt.Errorf("kube: create manual job: %w", err) } return created.Name, nil } // GetLockStatus checks the K8s Lease to determine if a job is actively running. 
func GetLockStatus(ctx context.Context, client kubernetes.Interface, namespace, jobName string) (LockStatus, error) { leaseName := LeaseName(jobName) lease, err := client.CoordinationV1().Leases(namespace).Get(ctx, leaseName, metav1.GetOptions{}) if err != nil { if k8serrors.IsNotFound(err) { return LockStatus{Locked: false}, nil } return LockStatus{}, fmt.Errorf("kube: get lease %s: %w", leaseName, err) } return leaseStatus(lease), nil } func leaseStatus(lease *coordinationv1.Lease) LockStatus { if lease.Spec.RenewTime == nil || lease.Spec.LeaseDurationSeconds == nil || lease.Spec.HolderIdentity == nil { return LockStatus{Locked: false} } ttl := time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second expiresAt := lease.Spec.RenewTime.Time.Add(ttl) if time.Now().After(expiresAt) { return LockStatus{Locked: false} } return LockStatus{ Locked: true, Holder: *lease.Spec.HolderIdentity, ExpiresAt: expiresAt, } } // SuspendCronJob sets the suspend flag on a CronJob without changing other config. func SuspendCronJob(ctx context.Context, client kubernetes.Interface, namespace, jobName string, suspend bool) error { cronName := CronJobName(jobName) patch := fmt.Sprintf(`{"spec":{"suspend":%v}}`, suspend) _, err := client.BatchV1().CronJobs(namespace).Patch(ctx, cronName, types.MergePatchType, []byte(patch), metav1.PatchOptions{}) if err != nil && !k8serrors.IsNotFound(err) { return fmt.Errorf("kube: suspend %s=%v: %w", cronName, suspend, err) } return nil } // ListCronJobs returns all CronJobs with the ha-sync app label. func ListCronJobs(ctx context.Context, client kubernetes.Interface, namespace string) ([]batchv1.CronJob, error) { list, err := client.BatchV1().CronJobs(namespace).List(ctx, metav1.ListOptions{ LabelSelector: "app=ha-sync", }) if err != nil { return nil, fmt.Errorf("kube: list cronjobs: %w", err) } return list.Items, nil } // ImportFromCronJob extracts a JobSpec from an existing K8s CronJob. 
func ImportFromCronJob(cj batchv1.CronJob) *JobSpec { jobName := cj.Labels["ha-sync/job"] pair := cj.Labels["ha-sync/pair"] if jobName == "" { name := strings.TrimPrefix(cj.Name, "ha-sync-") if name == cj.Name { return nil } jobName = name } spec := &JobSpec{ Name: jobName, Pair: pair, CronSchedule: cj.Spec.Schedule, Enabled: cj.Spec.Suspend == nil || !*cj.Spec.Suspend, LockTTLSeconds: 3600, Workers: 4, MtimeThreshold: "2s", SrcHost: "dell", DestHost: "hp", } containers := cj.Spec.JobTemplate.Spec.Template.Spec.Containers if len(containers) == 0 { return spec } for _, arg := range containers[0].Args { switch { case strings.HasPrefix(arg, "--src="): spec.Src = strings.TrimPrefix(arg, "--src=") case strings.HasPrefix(arg, "--dest="): spec.Dest = strings.TrimPrefix(arg, "--dest=") case strings.HasPrefix(arg, "--pair="): spec.Pair = strings.TrimPrefix(arg, "--pair=") case strings.HasPrefix(arg, "--direction="): spec.Direction = strings.TrimPrefix(arg, "--direction=") case strings.HasPrefix(arg, "--src-host="): spec.SrcHost = strings.TrimPrefix(arg, "--src-host=") case strings.HasPrefix(arg, "--dest-host="): spec.DestHost = strings.TrimPrefix(arg, "--dest-host=") case arg == "--dry-run": spec.DryRun = true case arg == "--delete-missing": spec.DeleteMissing = true case strings.HasPrefix(arg, "--exclude="): spec.Excludes = append(spec.Excludes, strings.TrimPrefix(arg, "--exclude=")) } } return spec } func boolPtr(b bool) *bool { return &b }