- Add .gitignore: exclude compiled binaries, build artifacts, and Helm values
  files containing real secrets (authentik, prometheus)
- Add all Kubernetes deployment manifests (deployment/)
- Add services source code: ha-sync, device-inventory, games-console,
  paperclip, parts-inventory
- Add Ansible orchestration: playbooks, roles, inventory, cloud-init
- Add hardware specs, execution plans, scripts, HOMELAB.md
- Add skills/homelab/SKILL.md + skills/install.sh to preserve Copilot skill
- Remove previously-tracked inventory-cli binary from git index

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
343 lines
9.5 KiB
Go
343 lines
9.5 KiB
Go
package db
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// Job is one row of the sync_jobs table, including the database-assigned ID
// and the created/updated timestamps. The JSON tags define its wire format.
type Job struct {
	ID   int64  `json:"id"`
	Name string `json:"name"`

	// Pair and Direction are the keys ListJobsWithStats uses to look up the
	// job's most recent iteration in sync_iterations.
	Pair      string `json:"pair"`
	Direction string `json:"direction"`

	Src      string `json:"src"`
	Dest     string `json:"dest"`
	SrcHost  string `json:"src_host"`
	DestHost string `json:"dest_host"`

	CronSchedule   string `json:"cron_schedule"`
	LockTTLSeconds int    `json:"lock_ttl_seconds"`

	// The boolean fields (DryRun, DeleteMissing, Enabled) are persisted as
	// 0/1 integer columns and converted when a row is scanned.
	DryRun         bool   `json:"dry_run"`
	DeleteMissing  bool   `json:"delete_missing"`
	Workers        int    `json:"workers"`
	MtimeThreshold string `json:"mtime_threshold"`

	// Excludes is persisted as a JSON array in a single column.
	Excludes []string `json:"excludes"`
	Enabled  bool     `json:"enabled"`

	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}
|
|
|
|
// JobInput carries the caller-supplied fields for creating or updating a job.
// It is Job without the ID and timestamps, which the database layer sets.
//
// Several fields are optional: applyDefaults replaces an empty Direction,
// SrcHost, DestHost, CronSchedule or MtimeThreshold, and a zero
// LockTTLSeconds or Workers, with a built-in default.
type JobInput struct {
	Name      string `json:"name"`
	Pair      string `json:"pair"`
	Direction string `json:"direction"`

	Src      string `json:"src"`
	Dest     string `json:"dest"`
	SrcHost  string `json:"src_host"`
	DestHost string `json:"dest_host"`

	CronSchedule   string `json:"cron_schedule"`
	LockTTLSeconds int    `json:"lock_ttl_seconds"`

	DryRun         bool   `json:"dry_run"`
	DeleteMissing  bool   `json:"delete_missing"`
	Workers        int    `json:"workers"`
	MtimeThreshold string `json:"mtime_threshold"`

	Excludes []string `json:"excludes"`
	Enabled  bool     `json:"enabled"`
}
|
|
|
|
func (in *JobInput) applyDefaults() {
|
|
if in.Direction == "" {
|
|
in.Direction = "fwd"
|
|
}
|
|
if in.SrcHost == "" {
|
|
in.SrcHost = "dell"
|
|
}
|
|
if in.DestHost == "" {
|
|
in.DestHost = "hp"
|
|
}
|
|
if in.CronSchedule == "" {
|
|
in.CronSchedule = "*/15 * * * *"
|
|
}
|
|
if in.LockTTLSeconds == 0 {
|
|
in.LockTTLSeconds = 3600
|
|
}
|
|
if in.Workers == 0 {
|
|
in.Workers = 4
|
|
}
|
|
if in.MtimeThreshold == "" {
|
|
in.MtimeThreshold = "2s"
|
|
}
|
|
}
|
|
|
|
// marshalExcludes renders the exclude patterns as the JSON array text stored
// in the excludes column. A nil or empty slice is written as "[]".
func marshalExcludes(excludes []string) string {
	if len(excludes) > 0 {
		// The error is deliberately dropped: encoding a plain []string has
		// no failure mode in encoding/json.
		encoded, _ := json.Marshal(excludes)
		return string(encoded)
	}
	return "[]"
}
|
|
|
|
// unmarshalExcludes decodes the JSON array text stored in the excludes column
// back into a slice of patterns.
//
// It returns nil when there is nothing to exclude (empty string, an empty
// array in any spelling, or JSON null) and also when s is not a valid JSON
// array of strings. The function has no error return, so a malformed column
// is treated as "no excludes".
func unmarshalExcludes(s string) []string {
	if s == "" || s == "[]" {
		return nil
	}
	var out []string
	if err := json.Unmarshal([]byte(s), &out); err != nil {
		// json.Unmarshal keeps decoding after a type mismatch, so out may
		// hold partial entries (e.g. "" standing in for a number). Discard
		// them rather than hand back patterns nobody wrote.
		return nil
	}
	if len(out) == 0 {
		// Normalise spellings such as "[ ]" to the same nil as "[]".
		return nil
	}
	return out
}
|
|
|
|
func scanJob(row interface {
|
|
Scan(dest ...any) error
|
|
}) (*Job, error) {
|
|
var j Job
|
|
var dryRun, deleteMissing, enabled int
|
|
var excludesJSON string
|
|
err := row.Scan(
|
|
&j.ID, &j.Name, &j.Pair, &j.Direction,
|
|
&j.Src, &j.Dest, &j.SrcHost, &j.DestHost,
|
|
&j.CronSchedule, &j.LockTTLSeconds,
|
|
&dryRun, &deleteMissing, &j.Workers, &j.MtimeThreshold,
|
|
&excludesJSON, &enabled,
|
|
&j.CreatedAt, &j.UpdatedAt,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
j.DryRun = dryRun == 1
|
|
j.DeleteMissing = deleteMissing == 1
|
|
j.Enabled = enabled == 1
|
|
j.Excludes = unmarshalExcludes(excludesJSON)
|
|
return &j, nil
|
|
}
|
|
|
|
// jobSelectCols is the column list shared by every SELECT on sync_jobs. Its
// order must match the Scan targets in scanJob. A NULL excludes column is
// read as the empty JSON array.
const jobSelectCols = `id, name, pair, direction, src, dest, src_host, dest_host,
	cron_schedule, lock_ttl_seconds, dry_run, delete_missing, workers, mtime_threshold,
	COALESCE(excludes, '[]'), enabled, created_at, updated_at`
|
|
|
|
// CreateJob inserts a new sync job and returns the created row.
|
|
func CreateJob(db *sql.DB, in JobInput) (*Job, error) {
|
|
in.applyDefaults()
|
|
now := time.Now().UTC()
|
|
dryRun := 0
|
|
if in.DryRun {
|
|
dryRun = 1
|
|
}
|
|
deleteMissing := 0
|
|
if in.DeleteMissing {
|
|
deleteMissing = 1
|
|
}
|
|
enabled := 0
|
|
if in.Enabled {
|
|
enabled = 1
|
|
}
|
|
res, err := db.Exec(`
|
|
INSERT INTO sync_jobs
|
|
(name, pair, direction, src, dest, src_host, dest_host,
|
|
cron_schedule, lock_ttl_seconds, dry_run, delete_missing,
|
|
workers, mtime_threshold, excludes, enabled, created_at, updated_at)
|
|
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`,
|
|
in.Name, in.Pair, in.Direction, in.Src, in.Dest, in.SrcHost, in.DestHost,
|
|
in.CronSchedule, in.LockTTLSeconds, dryRun, deleteMissing,
|
|
in.Workers, in.MtimeThreshold, marshalExcludes(in.Excludes), enabled, now, now,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create job: %w", err)
|
|
}
|
|
id, _ := res.LastInsertId()
|
|
return GetJobByID(db, id)
|
|
}
|
|
|
|
// GetJobByID fetches a job by its numeric ID.
|
|
func GetJobByID(db *sql.DB, id int64) (*Job, error) {
|
|
row := db.QueryRow(`SELECT `+jobSelectCols+` FROM sync_jobs WHERE id = ?`, id)
|
|
j, err := scanJob(row)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get job %d: %w", id, err)
|
|
}
|
|
return j, nil
|
|
}
|
|
|
|
// GetJobByName fetches a job by its unique name.
|
|
func GetJobByName(db *sql.DB, name string) (*Job, error) {
|
|
row := db.QueryRow(`SELECT `+jobSelectCols+` FROM sync_jobs WHERE name = ?`, name)
|
|
j, err := scanJob(row)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get job %q: %w", name, err)
|
|
}
|
|
return j, nil
|
|
}
|
|
|
|
// ListJobs returns all jobs ordered by name.
|
|
func ListJobs(db *sql.DB) ([]*Job, error) {
|
|
rows, err := db.Query(`SELECT ` + jobSelectCols + ` FROM sync_jobs ORDER BY name`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list jobs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
var jobs []*Job
|
|
for rows.Next() {
|
|
j, err := scanJob(rows)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list jobs scan: %w", err)
|
|
}
|
|
jobs = append(jobs, j)
|
|
}
|
|
return jobs, rows.Err()
|
|
}
|
|
|
|
// UpdateJob overwrites all mutable fields of a job.
|
|
func UpdateJob(db *sql.DB, id int64, in JobInput) (*Job, error) {
|
|
in.applyDefaults()
|
|
dryRun := 0
|
|
if in.DryRun {
|
|
dryRun = 1
|
|
}
|
|
deleteMissing := 0
|
|
if in.DeleteMissing {
|
|
deleteMissing = 1
|
|
}
|
|
enabled := 0
|
|
if in.Enabled {
|
|
enabled = 1
|
|
}
|
|
_, err := db.Exec(`
|
|
UPDATE sync_jobs SET
|
|
pair=?, direction=?, src=?, dest=?, src_host=?, dest_host=?,
|
|
cron_schedule=?, lock_ttl_seconds=?, dry_run=?, delete_missing=?,
|
|
workers=?, mtime_threshold=?, excludes=?, enabled=?, updated_at=?
|
|
WHERE id=?`,
|
|
in.Pair, in.Direction, in.Src, in.Dest, in.SrcHost, in.DestHost,
|
|
in.CronSchedule, in.LockTTLSeconds, dryRun, deleteMissing,
|
|
in.Workers, in.MtimeThreshold, marshalExcludes(in.Excludes), enabled,
|
|
time.Now().UTC(), id,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("update job %d: %w", id, err)
|
|
}
|
|
return GetJobByID(db, id)
|
|
}
|
|
|
|
// SetJobEnabled switches job id on or off and sets updated_at to the current
// UTC time. The flag is stored as a 0/1 integer. An id that matches no row is
// not reported as an error; only a failed statement is.
func SetJobEnabled(db *sql.DB, id int64, enabled bool) error {
	var flag int
	if enabled {
		flag = 1
	}
	stamp := time.Now().UTC()

	if _, err := db.Exec(`UPDATE sync_jobs SET enabled=?, updated_at=? WHERE id=?`,
		flag, stamp, id); err != nil {
		return fmt.Errorf("set job enabled: %w", err)
	}
	return nil
}
|
|
|
|
// DeleteJob deletes the sync_jobs row with the given ID. Deleting an ID that
// does not exist is not an error; only a failed statement is reported.
func DeleteJob(db *sql.DB, id int64) error {
	if _, err := db.Exec(`DELETE FROM sync_jobs WHERE id=?`, id); err != nil {
		return fmt.Errorf("delete job %d: %w", id, err)
	}
	return nil
}
|
|
|
|
// JobWithStats embeds a Job plus the last iteration summary.
|
|
type JobWithStats struct {
|
|
*Job
|
|
LastIteration *Iteration `json:"last_iteration,omitempty"`
|
|
}
|
|
|
|
// ListJobsWithStats returns all jobs joined with their latest real (non-dry-run) iteration.
|
|
func ListJobsWithStats(db *sql.DB) ([]*JobWithStats, error) {
|
|
jobs, err := ListJobs(db)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]*JobWithStats, 0, len(jobs))
|
|
for _, j := range jobs {
|
|
row := db.QueryRow(`
|
|
SELECT id, sync_pair, direction, status, dry_run,
|
|
started_at, ended_at,
|
|
files_created, files_updated, files_deleted, files_skipped, files_failed,
|
|
total_bytes_transferred, error_message
|
|
FROM sync_iterations
|
|
WHERE sync_pair = ? AND direction = ? AND dry_run = 0
|
|
ORDER BY started_at DESC LIMIT 1`, j.Pair, j.Direction)
|
|
iter, err := scanIterationRow(row)
|
|
if err != nil && err != sql.ErrNoRows {
|
|
return nil, err
|
|
}
|
|
ws := &JobWithStats{Job: j}
|
|
if err == nil {
|
|
ws.LastIteration = iter
|
|
}
|
|
out = append(out, ws)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// Iteration is the summary of one sync run as read from sync_iterations. It
// is a minimal type for this package's job queries; per the original note it
// mirrors the type used by the UI handler.
type Iteration struct {
	ID        int64  `json:"id"`
	SyncPair  string `json:"sync_pair"`
	Direction string `json:"direction"`
	Status    string `json:"status"`
	DryRun    bool   `json:"dry_run"`

	StartedAt time.Time `json:"started_at"`
	// EndedAt is nil when the ended_at column is NULL.
	EndedAt *time.Time `json:"ended_at"`

	FilesCreated int `json:"files_created"`
	FilesUpdated int `json:"files_updated"`
	FilesDeleted int `json:"files_deleted"`
	FilesSkipped int `json:"files_skipped"`
	FilesFailed  int `json:"files_failed"`

	TotalBytesTransferred int64 `json:"total_bytes_transferred"`
	// ErrorMessage is nil, and omitted from JSON, when the column is NULL.
	ErrorMessage *string `json:"error_message,omitempty"`
}
|
|
|
|
func scanIterationRow(row interface {
|
|
Scan(dest ...any) error
|
|
}) (*Iteration, error) {
|
|
var it Iteration
|
|
var dryRun int
|
|
var endedAt sql.NullTime
|
|
var errMsg sql.NullString
|
|
err := row.Scan(
|
|
&it.ID, &it.SyncPair, &it.Direction, &it.Status, &dryRun,
|
|
&it.StartedAt, &endedAt,
|
|
&it.FilesCreated, &it.FilesUpdated, &it.FilesDeleted, &it.FilesSkipped, &it.FilesFailed,
|
|
&it.TotalBytesTransferred, &errMsg,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
it.DryRun = dryRun == 1
|
|
if endedAt.Valid {
|
|
it.EndedAt = &endedAt.Time
|
|
}
|
|
if errMsg.Valid {
|
|
it.ErrorMessage = &errMsg.String
|
|
}
|
|
return &it, nil
|
|
}
|
|
|
|
// ImportJob creates a job from K8s CronJob data without applying K8s changes.
|
|
// If a job with the same name already exists it returns the existing row.
|
|
func ImportJob(db *sql.DB, in JobInput) (*Job, error) {
|
|
in.applyDefaults()
|
|
existing, err := GetJobByName(db, in.Name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if existing != nil {
|
|
return existing, nil
|
|
}
|
|
return CreateJob(db, in)
|
|
}
|