homelab/services/ha-sync/internal/db/jobs.go
Dan V deb6c38d7b chore: commit homelab setup — deployment, services, orchestration, skill
- Add .gitignore: exclude compiled binaries, build artifacts, and Helm
  values files containing real secrets (authentik, prometheus)
- Add all Kubernetes deployment manifests (deployment/)
- Add services source code: ha-sync, device-inventory, games-console,
  paperclip, parts-inventory
- Add Ansible orchestration: playbooks, roles, inventory, cloud-init
- Add hardware specs, execution plans, scripts, HOMELAB.md
- Add skills/homelab/SKILL.md + skills/install.sh to preserve Copilot skill
- Remove previously-tracked inventory-cli binary from git index

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-09 08:10:32 +02:00

343 lines
9.5 KiB
Go

package db
import (
"database/sql"
"encoding/json"
"fmt"
"time"
)
// Job represents a row in the sync_jobs table, including the row ID and the
// created/updated timestamps. scanJob fills it from a query result, and the
// JSON tags give the field names used when a Job is encoded as JSON.
type Job struct {
ID int64 `json:"id"`
Name string `json:"name"`
// Pair and Direction are the values ListJobsWithStats matches against the
// sync_pair and direction columns of sync_iterations.
Pair string `json:"pair"`
Direction string `json:"direction"`
Src string `json:"src"`
Dest string `json:"dest"`
SrcHost string `json:"src_host"`
DestHost string `json:"dest_host"`
CronSchedule string `json:"cron_schedule"`
LockTTLSeconds int `json:"lock_ttl_seconds"`
// DryRun and DeleteMissing are read from integer columns; only the value 1
// is treated as true (see scanJob).
DryRun bool `json:"dry_run"`
DeleteMissing bool `json:"delete_missing"`
Workers int `json:"workers"`
MtimeThreshold string `json:"mtime_threshold"`
// Excludes is decoded from a JSON array stored as text; it is nil when the
// stored value is empty or "[]".
Excludes []string `json:"excludes"`
// Enabled is read from an integer column like DryRun and DeleteMissing.
Enabled bool `json:"enabled"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// JobInput is used for create/update operations (no ID/timestamps).
// Fields left at their zero value may be replaced by applyDefaults, which
// CreateJob, UpdateJob and ImportJob all call before touching the database.
type JobInput struct {
// Name is written by CreateJob and used for the lookup in ImportJob;
// UpdateJob does not write it.
Name string `json:"name"`
Pair string `json:"pair"`
// Direction defaults to "fwd" when empty.
Direction string `json:"direction"`
Src string `json:"src"`
Dest string `json:"dest"`
// SrcHost defaults to "dell" and DestHost to "hp" when empty.
SrcHost string `json:"src_host"`
DestHost string `json:"dest_host"`
// CronSchedule defaults to "*/15 * * * *" when empty.
CronSchedule string `json:"cron_schedule"`
// LockTTLSeconds defaults to 3600 when zero.
LockTTLSeconds int `json:"lock_ttl_seconds"`
DryRun bool `json:"dry_run"`
DeleteMissing bool `json:"delete_missing"`
// Workers defaults to 4 when zero.
Workers int `json:"workers"`
// MtimeThreshold defaults to "2s" when empty.
MtimeThreshold string `json:"mtime_threshold"`
// Excludes is stored as a JSON array; nil or empty is stored as "[]".
Excludes []string `json:"excludes"`
Enabled bool `json:"enabled"`
}
// applyDefaults replaces every zero-valued optional field of in with its
// default and leaves fields that already carry a value untouched. Name, Pair,
// Src, Dest, Excludes and the boolean flags have no default and are never
// modified. Calling it more than once has no further effect.
func (in *JobInput) applyDefaults() {
	// String options: the empty string means "not provided".
	textDefaults := []struct {
		field    *string
		fallback string
	}{
		{&in.Direction, "fwd"},
		{&in.SrcHost, "dell"},
		{&in.DestHost, "hp"},
		{&in.CronSchedule, "*/15 * * * *"},
		{&in.MtimeThreshold, "2s"},
	}
	for _, d := range textDefaults {
		if *d.field == "" {
			*d.field = d.fallback
		}
	}

	// Integer options: exactly zero means "not provided"; negative values are
	// kept as given.
	numDefaults := []struct {
		field    *int
		fallback int
	}{
		{&in.LockTTLSeconds, 3600},
		{&in.Workers, 4},
	}
	for _, d := range numDefaults {
		if *d.field == 0 {
			*d.field = d.fallback
		}
	}
}
// marshalExcludes encodes the exclude patterns as the JSON array text stored
// in the excludes column. A nil or empty slice is stored as "[]".
func marshalExcludes(excludes []string) string {
	if len(excludes) > 0 {
		// The encoding error is deliberately dropped: a plain []string is not
		// expected to fail to encode.
		encoded, _ := json.Marshal(excludes)
		return string(encoded)
	}
	return "[]"
}
// unmarshalExcludes decodes the JSON array text stored in the excludes column.
// The empty string and the literal "[]" both yield nil. Decoding is best
// effort: a decode error is ignored and whatever was decoded is returned.
func unmarshalExcludes(s string) []string {
	switch s {
	case "", "[]":
		return nil
	}
	var patterns []string
	_ = json.Unmarshal([]byte(s), &patterns)
	return patterns
}
// scanJob reads one sync_jobs row from row into a Job. The destination order
// matches the column order of jobSelectCols. row only needs a Scan method, so
// both *sql.Row and *sql.Rows can be passed.
//
// The Scan error is returned unwrapped, so callers can compare it against
// sql.ErrNoRows.
func scanJob(row interface {
	Scan(dest ...any) error
}) (*Job, error) {
	var (
		job          Job
		excludesRaw  string
		dryRunFlag   int
		deleteFlag   int
		enabledFlag  int
		destinations []any
	)
	destinations = []any{
		&job.ID, &job.Name, &job.Pair, &job.Direction,
		&job.Src, &job.Dest, &job.SrcHost, &job.DestHost,
		&job.CronSchedule, &job.LockTTLSeconds,
		&dryRunFlag, &deleteFlag, &job.Workers, &job.MtimeThreshold,
		&excludesRaw, &enabledFlag,
		&job.CreatedAt, &job.UpdatedAt,
	}
	if err := row.Scan(destinations...); err != nil {
		return nil, err
	}

	// Boolean columns are read as integers; only the value 1 counts as true.
	job.DryRun = dryRunFlag == 1
	job.DeleteMissing = deleteFlag == 1
	job.Enabled = enabledFlag == 1
	job.Excludes = unmarshalExcludes(excludesRaw)
	return &job, nil
}
// jobSelectCols is the column list used by the SELECT statements in
// GetJobByID, GetJobByName and ListJobs. Its order must match the destination
// order in scanJob. A NULL excludes column is read as the text '[]'.
const jobSelectCols = `id, name, pair, direction, src, dest, src_host, dest_host,
cron_schedule, lock_ttl_seconds, dry_run, delete_missing, workers, mtime_threshold,
COALESCE(excludes, '[]'), enabled, created_at, updated_at`
// CreateJob inserts a new sync job and returns the created row.
//
// Defaults are applied to in first (see applyDefaults). created_at and
// updated_at are both set to the current UTC time. The returned Job is
// re-read from the database through GetJobByID using the ID reported by the
// insert.
//
// An error is returned if the insert fails, if the driver cannot report the
// new row's ID, or if the re-read fails.
func CreateJob(db *sql.DB, in JobInput) (*Job, error) {
	in.applyDefaults()

	// Booleans are written as 0/1 integers, matching how scanJob reads them.
	flag := func(b bool) int {
		if b {
			return 1
		}
		return 0
	}

	now := time.Now().UTC()
	res, err := db.Exec(`
INSERT INTO sync_jobs
(name, pair, direction, src, dest, src_host, dest_host,
cron_schedule, lock_ttl_seconds, dry_run, delete_missing,
workers, mtime_threshold, excludes, enabled, created_at, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`,
		in.Name, in.Pair, in.Direction, in.Src, in.Dest, in.SrcHost, in.DestHost,
		in.CronSchedule, in.LockTTLSeconds, flag(in.DryRun), flag(in.DeleteMissing),
		in.Workers, in.MtimeThreshold, marshalExcludes(in.Excludes), flag(in.Enabled), now, now,
	)
	if err != nil {
		return nil, fmt.Errorf("create job: %w", err)
	}

	// Without this check a failed LastInsertId would fall through to a lookup
	// of ID 0 and could surface as a nil job with a nil error.
	id, err := res.LastInsertId()
	if err != nil {
		return nil, fmt.Errorf("create job: last insert id: %w", err)
	}
	return GetJobByID(db, id)
}
// GetJobByID looks up a single job by its numeric ID.
//
// A missing row is not an error: the function then returns (nil, nil), so
// callers must check the returned pointer. Any other failure is wrapped with
// the job ID.
func GetJobByID(db *sql.DB, id int64) (*Job, error) {
	query := `SELECT ` + jobSelectCols + ` FROM sync_jobs WHERE id = ?`
	job, err := scanJob(db.QueryRow(query, id))
	switch {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, fmt.Errorf("get job %d: %w", id, err)
	}
	return job, nil
}
// GetJobByName looks up a single job by name (names are expected to be unique;
// the schema is not visible from this file).
//
// A missing row is not an error: the function then returns (nil, nil), so
// callers must check the returned pointer. Any other failure is wrapped with
// the quoted job name.
func GetJobByName(db *sql.DB, name string) (*Job, error) {
	query := `SELECT ` + jobSelectCols + ` FROM sync_jobs WHERE name = ?`
	job, err := scanJob(db.QueryRow(query, name))
	switch {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, fmt.Errorf("get job %q: %w", name, err)
	}
	return job, nil
}
// ListJobs returns all jobs ordered by name.
//
// The result is a nil slice when the table has no rows. On any failure
// (running the query, scanning a row, or an error reported by the row
// iterator) the slice is nil and the error is wrapped with context; partial
// results are never returned together with an error.
func ListJobs(db *sql.DB) ([]*Job, error) {
	rows, err := db.Query(`SELECT ` + jobSelectCols + ` FROM sync_jobs ORDER BY name`)
	if err != nil {
		return nil, fmt.Errorf("list jobs: %w", err)
	}
	defer rows.Close()

	var jobs []*Job
	for rows.Next() {
		j, err := scanJob(rows)
		if err != nil {
			return nil, fmt.Errorf("list jobs scan: %w", err)
		}
		jobs = append(jobs, j)
	}
	// rows.Next returning false can mean "done" or "failed"; rows.Err tells
	// the two apart.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("list jobs rows: %w", err)
	}
	return jobs, nil
}
// UpdateJob applies defaults to in and then overwrites the stored settings of
// the job with the given ID, setting updated_at to the current UTC time.
//
// The name column is not part of the UPDATE statement, so in.Name is ignored.
// The returned Job is re-read through GetJobByID, which yields (nil, nil) when
// no job has that ID; the number of affected rows is not inspected here.
func UpdateJob(db *sql.DB, id int64, in JobInput) (*Job, error) {
	in.applyDefaults()

	// Booleans are written as 0/1 integers.
	asInt := func(on bool) int {
		if on {
			return 1
		}
		return 0
	}

	const updateSQL = `
UPDATE sync_jobs SET
pair=?, direction=?, src=?, dest=?, src_host=?, dest_host=?,
cron_schedule=?, lock_ttl_seconds=?, dry_run=?, delete_missing=?,
workers=?, mtime_threshold=?, excludes=?, enabled=?, updated_at=?
WHERE id=?`

	args := []any{
		in.Pair, in.Direction, in.Src, in.Dest, in.SrcHost, in.DestHost,
		in.CronSchedule, in.LockTTLSeconds, asInt(in.DryRun), asInt(in.DeleteMissing),
		in.Workers, in.MtimeThreshold, marshalExcludes(in.Excludes), asInt(in.Enabled),
		time.Now().UTC(), id,
	}
	if _, err := db.Exec(updateSQL, args...); err != nil {
		return nil, fmt.Errorf("update job %d: %w", id, err)
	}
	return GetJobByID(db, id)
}
// SetJobEnabled writes the enabled flag (as 0 or 1) for the job with the given
// ID and sets updated_at to the current UTC time. The number of affected rows
// is not inspected, so an unknown ID does not produce an error here.
func SetJobEnabled(db *sql.DB, id int64, enabled bool) error {
	var flag int
	if enabled {
		flag = 1
	}

	const stmt = `UPDATE sync_jobs SET enabled=?, updated_at=? WHERE id=?`
	if _, err := db.Exec(stmt, flag, time.Now().UTC(), id); err != nil {
		return fmt.Errorf("set job enabled: %w", err)
	}
	return nil
}
// DeleteJob deletes the sync_jobs row with the given ID. The number of
// affected rows is not inspected, so an unknown ID does not produce an error
// here; only a failing statement does.
func DeleteJob(db *sql.DB, id int64) error {
	if _, err := db.Exec(`DELETE FROM sync_jobs WHERE id=?`, id); err != nil {
		return fmt.Errorf("delete job %d: %w", id, err)
	}
	return nil
}
// JobWithStats embeds a Job plus the last iteration summary.
// Because *Job is embedded, its fields are promoted onto JobWithStats.
type JobWithStats struct {
*Job
// LastIteration is nil (and omitted from JSON) when ListJobsWithStats finds
// no matching non-dry-run iteration for the job.
LastIteration *Iteration `json:"last_iteration,omitempty"`
}
// ListJobsWithStats returns all jobs (ordered by name, via ListJobs), each
// paired with its latest real (non-dry-run) iteration.
//
// The pairing is done with one extra query per job rather than a SQL join:
// for each job the newest sync_iterations row (by started_at) with the same
// sync_pair and direction and dry_run = 0 is looked up. A job without such a
// row gets a nil LastIteration.
//
// The result is always a non-nil slice on success. A failure while listing
// jobs is returned as reported by ListJobs; a failure while reading an
// iteration is wrapped with the name of the job being processed.
func ListJobsWithStats(db *sql.DB) ([]*JobWithStats, error) {
	jobs, err := ListJobs(db)
	if err != nil {
		return nil, err
	}

	out := make([]*JobWithStats, 0, len(jobs))
	for _, j := range jobs {
		row := db.QueryRow(`
SELECT id, sync_pair, direction, status, dry_run,
started_at, ended_at,
files_created, files_updated, files_deleted, files_skipped, files_failed,
total_bytes_transferred, error_message
FROM sync_iterations
WHERE sync_pair = ? AND direction = ? AND dry_run = 0
ORDER BY started_at DESC LIMIT 1`, j.Pair, j.Direction)

		entry := &JobWithStats{Job: j}
		iter, err := scanIterationRow(row)
		switch {
		case err == nil:
			entry.LastIteration = iter
		case err != sql.ErrNoRows:
			// "No rows" simply means the job has not run yet; anything else is
			// a real failure and gets the same contextual wrapping as the rest
			// of this file.
			return nil, fmt.Errorf("list jobs with stats: last iteration for job %q: %w", j.Name, err)
		}
		out = append(out, entry)
	}
	return out, nil
}
// Iteration is a minimal type used by jobs.go (mirrors ui handler's type).
// It holds one row of the sync_iterations table as read by scanIterationRow.
type Iteration struct {
ID int64 `json:"id"`
SyncPair string `json:"sync_pair"`
Direction string `json:"direction"`
Status string `json:"status"`
// DryRun is read from an integer column; only the value 1 is treated as true.
DryRun bool `json:"dry_run"`
StartedAt time.Time `json:"started_at"`
// EndedAt is nil when the ended_at column is NULL.
EndedAt *time.Time `json:"ended_at"`
FilesCreated int `json:"files_created"`
FilesUpdated int `json:"files_updated"`
FilesDeleted int `json:"files_deleted"`
FilesSkipped int `json:"files_skipped"`
FilesFailed int `json:"files_failed"`
TotalBytesTransferred int64 `json:"total_bytes_transferred"`
// ErrorMessage is nil (and omitted from JSON) when error_message is NULL.
ErrorMessage *string `json:"error_message,omitempty"`
}
// scanIterationRow reads one sync_iterations row from row into an Iteration.
// The destination order matches the column list used by ListJobsWithStats.
// row only needs a Scan method, so both *sql.Row and *sql.Rows can be passed.
//
// The nullable columns ended_at and error_message become nil pointers when
// NULL. The Scan error is returned unwrapped, so callers can compare it
// against sql.ErrNoRows.
func scanIterationRow(row interface {
	Scan(dest ...any) error
}) (*Iteration, error) {
	var (
		dryRunFlag int
		ended      sql.NullTime
		message    sql.NullString
	)
	iter := new(Iteration)

	if err := row.Scan(
		&iter.ID, &iter.SyncPair, &iter.Direction, &iter.Status, &dryRunFlag,
		&iter.StartedAt, &ended,
		&iter.FilesCreated, &iter.FilesUpdated, &iter.FilesDeleted, &iter.FilesSkipped, &iter.FilesFailed,
		&iter.TotalBytesTransferred, &message,
	); err != nil {
		return nil, err
	}

	// The flag is read as an integer; only the value 1 counts as true.
	iter.DryRun = dryRunFlag == 1
	if ended.Valid {
		endTime := ended.Time
		iter.EndedAt = &endTime
	}
	if message.Valid {
		text := message.String
		iter.ErrorMessage = &text
	}
	return iter, nil
}
// ImportJob stores a job described by K8s CronJob data without applying any
// K8s changes. The lookup is by in.Name: when a job with that name already
// exists, the stored row is returned unchanged and in is otherwise ignored;
// when none exists, the job is created through CreateJob.
//
// The lookup and the insert are two separate statements, not one transaction.
func ImportJob(db *sql.DB, in JobInput) (*Job, error) {
	in.applyDefaults()

	switch found, err := GetJobByName(db, in.Name); {
	case err != nil:
		return nil, err
	case found != nil:
		return found, nil
	}
	return CreateJob(db, in)
}