package db

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"time"
)

// Job represents a row in the sync_jobs table.
//
// DryRun, DeleteMissing and Enabled are read from integer columns and
// Excludes from a JSON-encoded array of strings; scanJob performs both
// conversions.
type Job struct {
	ID             int64     `json:"id"`
	Name           string    `json:"name"`
	Pair           string    `json:"pair"`
	Direction      string    `json:"direction"`
	Src            string    `json:"src"`
	Dest           string    `json:"dest"`
	SrcHost        string    `json:"src_host"`
	DestHost       string    `json:"dest_host"`
	CronSchedule   string    `json:"cron_schedule"`
	LockTTLSeconds int       `json:"lock_ttl_seconds"`
	DryRun         bool      `json:"dry_run"`
	DeleteMissing  bool      `json:"delete_missing"`
	Workers        int       `json:"workers"`
	MtimeThreshold string    `json:"mtime_threshold"`
	Excludes       []string  `json:"excludes"`
	Enabled        bool      `json:"enabled"`
	CreatedAt      time.Time `json:"created_at"`
	UpdatedAt      time.Time `json:"updated_at"`
}

// JobInput is used for create/update operations (no ID/timestamps).
type JobInput struct {
	Name           string   `json:"name"`
	Pair           string   `json:"pair"`
	Direction      string   `json:"direction"`
	Src            string   `json:"src"`
	Dest           string   `json:"dest"`
	SrcHost        string   `json:"src_host"`
	DestHost       string   `json:"dest_host"`
	CronSchedule   string   `json:"cron_schedule"`
	LockTTLSeconds int      `json:"lock_ttl_seconds"`
	DryRun         bool     `json:"dry_run"`
	DeleteMissing  bool     `json:"delete_missing"`
	Workers        int      `json:"workers"`
	MtimeThreshold string   `json:"mtime_threshold"`
	Excludes       []string `json:"excludes"`
	Enabled        bool     `json:"enabled"`
}

// applyDefaults fills every zero-valued optional field of in with its default.
//
// The zero value is what marks a field as unset, so an explicit 0 for
// LockTTLSeconds or Workers is replaced by the default as well. Name, Pair,
// Src, Dest, Excludes and the boolean fields are left untouched.
func (in *JobInput) applyDefaults() {
	if in.Direction == "" {
		in.Direction = "fwd"
	}
	if in.SrcHost == "" {
		in.SrcHost = "dell"
	}
	if in.DestHost == "" {
		in.DestHost = "hp"
	}
	if in.CronSchedule == "" {
		in.CronSchedule = "*/15 * * * *"
	}
	if in.LockTTLSeconds == 0 {
		in.LockTTLSeconds = 3600
	}
	if in.Workers == 0 {
		in.Workers = 4
	}
	if in.MtimeThreshold == "" {
		in.MtimeThreshold = "2s"
	}
}

// marshalExcludes encodes excludes as a JSON array for storage in the
// excludes column. A nil or empty slice is stored as "[]".
func marshalExcludes(excludes []string) string {
	if len(excludes) == 0 {
		return "[]"
	}
	// The error is deliberately ignored: encoding a []string is not expected
	// to fail, as it contains no values encoding/json rejects.
	b, _ := json.Marshal(excludes)
	return string(b)
}

// unmarshalExcludes decodes the JSON array stored in the excludes column.
// An empty string or "[]" yields nil.
func unmarshalExcludes(s string) []string {
	if s == "" || s == "[]" {
		return nil
	}
	var out []string
	// Best effort: a decode error is deliberately ignored so that one bad
	// excludes value does not make the whole job unreadable; whatever was
	// decoded into out is returned.
	_ = json.Unmarshal([]byte(s), &out)
	return out
}

// scanJob reads one sync_jobs row, in jobSelectCols column order, from row.
//
// row is anything with a Scan method, so both *sql.Row and *sql.Rows work.
// The error from Scan is returned unwrapped, which lets callers compare it
// against sql.ErrNoRows.
func scanJob(row interface {
	Scan(dest ...any) error
}) (*Job, error) {
	var j Job
	var dryRun, deleteMissing, enabled int
	var excludesJSON string
	err := row.Scan(
		&j.ID, &j.Name, &j.Pair, &j.Direction, &j.Src, &j.Dest,
		&j.SrcHost, &j.DestHost, &j.CronSchedule, &j.LockTTLSeconds,
		&dryRun, &deleteMissing, &j.Workers, &j.MtimeThreshold,
		&excludesJSON, &enabled, &j.CreatedAt, &j.UpdatedAt,
	)
	if err != nil {
		return nil, err
	}
	// Flags are stored as integers; only the value 1 is treated as true.
	j.DryRun = dryRun == 1
	j.DeleteMissing = deleteMissing == 1
	j.Enabled = enabled == 1
	j.Excludes = unmarshalExcludes(excludesJSON)
	return &j, nil
}

// jobSelectCols is the column list shared by every SELECT that feeds scanJob;
// its order must match the Scan destinations there. A NULL excludes column is
// read as "[]".
const jobSelectCols = `id, name, pair, direction, src, dest, src_host, dest_host, ` +
	`cron_schedule, lock_ttl_seconds, dry_run, delete_missing, workers, ` +
	`mtime_threshold, COALESCE(excludes, '[]'), enabled, created_at, updated_at`

// CreateJob inserts a new sync job and returns the created row.
//
// Defaults are applied to in first (in is a copy, so the caller's value is
// not modified). created_at and updated_at are both set to the current UTC
// time. The returned job is re-read from the table by its new ID.
func CreateJob(db *sql.DB, in JobInput) (*Job, error) {
	const insertSQL = ` INSERT INTO sync_jobs (name, pair, direction, src, dest, src_host, dest_host, ` +
		`cron_schedule, lock_ttl_seconds, dry_run, delete_missing, workers, mtime_threshold, ` +
		`excludes, enabled, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`

	in.applyDefaults()

	// Booleans are written as 0/1 integers, matching what scanJob reads back.
	flag := func(b bool) int {
		if b {
			return 1
		}
		return 0
	}

	now := time.Now().UTC()
	res, err := db.Exec(insertSQL,
		in.Name, in.Pair, in.Direction, in.Src, in.Dest, in.SrcHost, in.DestHost,
		in.CronSchedule, in.LockTTLSeconds, flag(in.DryRun), flag(in.DeleteMissing),
		in.Workers, in.MtimeThreshold, marshalExcludes(in.Excludes), flag(in.Enabled),
		now, now,
	)
	if err != nil {
		return nil, fmt.Errorf("create job: %w", err)
	}

	// Without a usable ID the follow-up lookup would query id 0 and report
	// "not found" for a row that was just inserted, so surface the error.
	id, err := res.LastInsertId()
	if err != nil {
		return nil, fmt.Errorf("create job: last insert id: %w", err)
	}
	return GetJobByID(db, id)
}

// GetJobByID fetches a job by its numeric ID.
func GetJobByID(db *sql.DB, id int64) (*Job, error) { row := db.QueryRow(`SELECT `+jobSelectCols+` FROM sync_jobs WHERE id = ?`, id) j, err := scanJob(row) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, fmt.Errorf("get job %d: %w", id, err) } return j, nil } // GetJobByName fetches a job by its unique name. func GetJobByName(db *sql.DB, name string) (*Job, error) { row := db.QueryRow(`SELECT `+jobSelectCols+` FROM sync_jobs WHERE name = ?`, name) j, err := scanJob(row) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, fmt.Errorf("get job %q: %w", name, err) } return j, nil } // ListJobs returns all jobs ordered by name. func ListJobs(db *sql.DB) ([]*Job, error) { rows, err := db.Query(`SELECT ` + jobSelectCols + ` FROM sync_jobs ORDER BY name`) if err != nil { return nil, fmt.Errorf("list jobs: %w", err) } defer rows.Close() var jobs []*Job for rows.Next() { j, err := scanJob(rows) if err != nil { return nil, fmt.Errorf("list jobs scan: %w", err) } jobs = append(jobs, j) } return jobs, rows.Err() } // UpdateJob overwrites all mutable fields of a job. func UpdateJob(db *sql.DB, id int64, in JobInput) (*Job, error) { in.applyDefaults() dryRun := 0 if in.DryRun { dryRun = 1 } deleteMissing := 0 if in.DeleteMissing { deleteMissing = 1 } enabled := 0 if in.Enabled { enabled = 1 } _, err := db.Exec(` UPDATE sync_jobs SET pair=?, direction=?, src=?, dest=?, src_host=?, dest_host=?, cron_schedule=?, lock_ttl_seconds=?, dry_run=?, delete_missing=?, workers=?, mtime_threshold=?, excludes=?, enabled=?, updated_at=? WHERE id=?`, in.Pair, in.Direction, in.Src, in.Dest, in.SrcHost, in.DestHost, in.CronSchedule, in.LockTTLSeconds, dryRun, deleteMissing, in.Workers, in.MtimeThreshold, marshalExcludes(in.Excludes), enabled, time.Now().UTC(), id, ) if err != nil { return nil, fmt.Errorf("update job %d: %w", id, err) } return GetJobByID(db, id) } // SetJobEnabled sets the enabled flag and updates updated_at. 
func SetJobEnabled(db *sql.DB, id int64, enabled bool) error { v := 0 if enabled { v = 1 } _, err := db.Exec(`UPDATE sync_jobs SET enabled=?, updated_at=? WHERE id=?`, v, time.Now().UTC(), id) if err != nil { return fmt.Errorf("set job enabled: %w", err) } return nil } // DeleteJob removes a job by ID. func DeleteJob(db *sql.DB, id int64) error { _, err := db.Exec(`DELETE FROM sync_jobs WHERE id=?`, id) if err != nil { return fmt.Errorf("delete job %d: %w", id, err) } return nil } // JobWithStats embeds a Job plus the last iteration summary. type JobWithStats struct { *Job LastIteration *Iteration `json:"last_iteration,omitempty"` } // ListJobsWithStats returns all jobs joined with their latest real (non-dry-run) iteration. func ListJobsWithStats(db *sql.DB) ([]*JobWithStats, error) { jobs, err := ListJobs(db) if err != nil { return nil, err } out := make([]*JobWithStats, 0, len(jobs)) for _, j := range jobs { row := db.QueryRow(` SELECT id, sync_pair, direction, status, dry_run, started_at, ended_at, files_created, files_updated, files_deleted, files_skipped, files_failed, total_bytes_transferred, error_message FROM sync_iterations WHERE sync_pair = ? AND direction = ? AND dry_run = 0 ORDER BY started_at DESC LIMIT 1`, j.Pair, j.Direction) iter, err := scanIterationRow(row) if err != nil && err != sql.ErrNoRows { return nil, err } ws := &JobWithStats{Job: j} if err == nil { ws.LastIteration = iter } out = append(out, ws) } return out, nil } // Iteration is a minimal type used by jobs.go (mirrors ui handler's type). 
type Iteration struct { ID int64 `json:"id"` SyncPair string `json:"sync_pair"` Direction string `json:"direction"` Status string `json:"status"` DryRun bool `json:"dry_run"` StartedAt time.Time `json:"started_at"` EndedAt *time.Time `json:"ended_at"` FilesCreated int `json:"files_created"` FilesUpdated int `json:"files_updated"` FilesDeleted int `json:"files_deleted"` FilesSkipped int `json:"files_skipped"` FilesFailed int `json:"files_failed"` TotalBytesTransferred int64 `json:"total_bytes_transferred"` ErrorMessage *string `json:"error_message,omitempty"` } func scanIterationRow(row interface { Scan(dest ...any) error }) (*Iteration, error) { var it Iteration var dryRun int var endedAt sql.NullTime var errMsg sql.NullString err := row.Scan( &it.ID, &it.SyncPair, &it.Direction, &it.Status, &dryRun, &it.StartedAt, &endedAt, &it.FilesCreated, &it.FilesUpdated, &it.FilesDeleted, &it.FilesSkipped, &it.FilesFailed, &it.TotalBytesTransferred, &errMsg, ) if err != nil { return nil, err } it.DryRun = dryRun == 1 if endedAt.Valid { it.EndedAt = &endedAt.Time } if errMsg.Valid { it.ErrorMessage = &errMsg.String } return &it, nil } // ImportJob creates a job from K8s CronJob data without applying K8s changes. // If a job with the same name already exists it returns the existing row. func ImportJob(db *sql.DB, in JobInput) (*Job, error) { in.applyDefaults() existing, err := GetJobByName(db, in.Name) if err != nil { return nil, err } if existing != nil { return existing, nil } return CreateJob(db, in) }