- Add .gitignore: exclude compiled binaries, build artifacts, and Helm values files containing real secrets (authentik, prometheus) - Add all Kubernetes deployment manifests (deployment/) - Add services source code: ha-sync, device-inventory, games-console, paperclip, parts-inventory - Add Ansible orchestration: playbooks, roles, inventory, cloud-init - Add hardware specs, execution plans, scripts, HOMELAB.md - Add skills/homelab/SKILL.md + skills/install.sh to preserve Copilot skill - Remove previously-tracked inventory-cli binary from git index Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1036 lines
26 KiB
Go
1036 lines
26 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"text/tabwriter"
|
|
"time"
|
|
|
|
dbpkg "github.com/vandachevici/ha-sync/internal/db"
|
|
"github.com/vandachevici/ha-sync/internal/kube"
|
|
)
|
|
|
|
// outputFormat selects how commands render their results. It defaults to
// "table"; main overwrites it with the value of the global --output flag, and
// the commands switch to JSON encoding when it equals "json".
var outputFormat = "table"
|
|
|
|
func main() {
|
|
// Extract --output global flag before subcommand dispatch.
|
|
args := os.Args[1:]
|
|
filtered := make([]string, 0, len(args))
|
|
for i := 0; i < len(args); i++ {
|
|
a := args[i]
|
|
if strings.HasPrefix(a, "--output=") {
|
|
outputFormat = strings.TrimPrefix(a, "--output=")
|
|
} else if a == "--output" && i+1 < len(args) {
|
|
outputFormat = args[i+1]
|
|
i++
|
|
} else {
|
|
filtered = append(filtered, a)
|
|
}
|
|
}
|
|
args = filtered
|
|
|
|
if len(args) < 2 {
|
|
usage()
|
|
os.Exit(1)
|
|
}
|
|
|
|
group, sub := args[0], args[1]
|
|
rest := args[2:]
|
|
|
|
switch group {
|
|
case "jobs":
|
|
runJobs(sub, rest)
|
|
case "runs":
|
|
runRuns(sub, rest)
|
|
case "ops":
|
|
runOps(sub, rest)
|
|
default:
|
|
fmt.Fprintf(os.Stderr, "unknown group %q\n", group)
|
|
usage()
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
// usage writes the command overview to stderr. It does not exit; callers
// decide the exit status.
func usage() {
	// The help text is one raw string: every line below is printed verbatim,
	// so the two-space indentation is part of the output.
	fmt.Fprintln(os.Stderr, `Usage: ha-sync-ctl <group> <command> [flags]

Groups and commands:
  jobs list
  jobs create --name=X --pair=Y --direction=Z [...]
  jobs edit <name-or-id> [--field=val ...]
  jobs delete <name-or-id>
  jobs enable <name-or-id>
  jobs disable <name-or-id>
  jobs trigger <name-or-id>
  jobs lock-status <name-or-id>
  jobs apply-all
  jobs import-k8s

  runs list [--job=<name>] [--limit=N]
  runs show <run-id>

  ops list <run-id> [--limit=N] [--action=copy|delete|skip|error]

Global flags:
  --output=json    Output as JSON instead of table`)
}
|
|
|
|
// --- env / connection helpers ---
|
|
|
|
func mustDB() *sql.DB {
|
|
dsn := os.Getenv("HA_SYNC_DB_DSN")
|
|
if dsn == "" {
|
|
fmt.Fprintln(os.Stderr, "error: HA_SYNC_DB_DSN environment variable is required")
|
|
os.Exit(1)
|
|
}
|
|
d, err := dbpkg.Connect(dsn)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: connect to DB: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
return d
|
|
}
|
|
|
|
// namespace returns the value of the HA_SYNC_NAMESPACE environment variable,
// or "infrastructure" when the variable is unset or empty.
func namespace() string {
	const fallback = "infrastructure"

	configured := os.Getenv("HA_SYNC_NAMESPACE")
	if configured == "" {
		return fallback
	}
	return configured
}
|
|
|
|
// --- job resolution ---
|
|
|
|
func resolveJob(database *sql.DB, nameOrID string) (*dbpkg.Job, error) {
|
|
if id, err := strconv.ParseInt(nameOrID, 10, 64); err == nil {
|
|
j, err := dbpkg.GetJobByID(database, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if j == nil {
|
|
return nil, fmt.Errorf("job with ID %d not found", id)
|
|
}
|
|
return j, nil
|
|
}
|
|
j, err := dbpkg.GetJobByName(database, nameOrID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if j == nil {
|
|
return nil, fmt.Errorf("job %q not found", nameOrID)
|
|
}
|
|
return j, nil
|
|
}
|
|
|
|
// --- formatting helpers ---
|
|
|
|
// formatSize renders a byte count for humans using 1024-based steps:
// values below 1024 are printed as whole bytes ("512 B"), larger values with
// one decimal and a KB/MB/GB/TB/PB/EB suffix ("1.5 KB"). Zero and negative
// inputs are rendered as "0 B".
func formatSize(bytes int64) string {
	if bytes <= 0 {
		return "0 B"
	}
	const unit = 1024
	if bytes < unit {
		return fmt.Sprintf("%d B", bytes)
	}

	// An int64 can hold up to ~8 EB, so the loop below can reach exp == 5.
	// The table therefore needs six entries; with only five, any value of
	// 1<<60 bytes or more indexed past the end and panicked.
	suffixes := [...]string{"KB", "MB", "GB", "TB", "PB", "EB"}

	div, exp := int64(unit), 0
	for n := bytes / unit; n >= unit; n /= unit {
		div *= unit
		exp++
	}
	return fmt.Sprintf("%.1f %s", float64(bytes)/float64(div), suffixes[exp])
}
|
|
|
|
// formatDuration renders d compactly using its two most significant units:
// "<h>h<m>m" when at least one hour, "<m>m<s>s" when at least one minute,
// and "<s>s" otherwise. Negative durations are treated as zero.
func formatDuration(d time.Duration) string {
	if d < 0 {
		d = 0
	}

	hours := int(d.Hours())
	minutes := int(d.Minutes()) % 60
	seconds := int(d.Seconds()) % 60

	switch {
	case hours > 0:
		return fmt.Sprintf("%dh%dm", hours, minutes)
	case minutes > 0:
		return fmt.Sprintf("%dm%ds", minutes, seconds)
	default:
		return fmt.Sprintf("%ds", seconds)
	}
}
|
|
|
|
// timeAgo describes how long ago t was, relative to the current time:
// "just now" for anything under a minute (including timestamps in the
// future), then whole minutes, whole hours, and finally whole days.
func timeAgo(t time.Time) string {
	elapsed := time.Since(t)

	// Negative durations (t in the future) are also smaller than a minute.
	if elapsed < time.Minute {
		return "just now"
	}
	if elapsed < time.Hour {
		return fmt.Sprintf("%dm ago", int(elapsed.Minutes()))
	}
	if elapsed < 24*time.Hour {
		return fmt.Sprintf("%dh ago", int(elapsed.Hours()))
	}
	return fmt.Sprintf("%dd ago", int(elapsed.Hours())/24)
}
|
|
|
|
// formatDirection renders a sync direction as "<from>→<to>".
//
// Empty host names default to "dell" (source) and "hp" (destination). The
// direction is matched case-insensitively: "fwd"/"dell-to-hp" yields
// source→destination, "rev"/"hp-to-dell" yields destination→source, and any
// other value is returned unchanged.
func formatDirection(direction, srcHost, destHost string) string {
	if srcHost == "" {
		srcHost = "dell"
	}
	if destHost == "" {
		destHost = "hp"
	}

	normalized := strings.ToLower(direction)
	if normalized == "fwd" || normalized == "dell-to-hp" {
		return srcHost + "→" + destHost
	}
	if normalized == "rev" || normalized == "hp-to-dell" {
		return destHost + "→" + srcHost
	}
	return direction
}
|
|
|
|
// boolStr renders a boolean as "yes" or "no" for table output.
func boolStr(b bool) string {
	label := "no"
	if b {
		label = "yes"
	}
	return label
}
|
|
|
|
// --- job/spec conversion helpers ---
|
|
|
|
func jobToSpec(j *dbpkg.Job) kube.JobSpec {
|
|
return kube.JobSpec{
|
|
ID: j.ID,
|
|
Name: j.Name,
|
|
Pair: j.Pair,
|
|
Direction: j.Direction,
|
|
Src: j.Src,
|
|
Dest: j.Dest,
|
|
SrcHost: j.SrcHost,
|
|
DestHost: j.DestHost,
|
|
CronSchedule: j.CronSchedule,
|
|
LockTTLSeconds: j.LockTTLSeconds,
|
|
DryRun: j.DryRun,
|
|
DeleteMissing: j.DeleteMissing,
|
|
Workers: j.Workers,
|
|
MtimeThreshold: j.MtimeThreshold,
|
|
Excludes: j.Excludes,
|
|
Enabled: j.Enabled,
|
|
}
|
|
}
|
|
|
|
func specToJobInput(s *kube.JobSpec) dbpkg.JobInput {
|
|
return dbpkg.JobInput{
|
|
Name: s.Name,
|
|
Pair: s.Pair,
|
|
Direction: s.Direction,
|
|
Src: s.Src,
|
|
Dest: s.Dest,
|
|
SrcHost: s.SrcHost,
|
|
DestHost: s.DestHost,
|
|
CronSchedule: s.CronSchedule,
|
|
LockTTLSeconds: s.LockTTLSeconds,
|
|
DryRun: s.DryRun,
|
|
DeleteMissing: s.DeleteMissing,
|
|
Workers: s.Workers,
|
|
MtimeThreshold: s.MtimeThreshold,
|
|
Excludes: s.Excludes,
|
|
Enabled: s.Enabled,
|
|
}
|
|
}
|
|
|
|
// ============================================================
|
|
// jobs commands
|
|
// ============================================================
|
|
|
|
func runJobs(sub string, args []string) {
|
|
switch sub {
|
|
case "list":
|
|
jobsList(args)
|
|
case "create":
|
|
jobsCreate(args)
|
|
case "edit":
|
|
jobsEdit(args)
|
|
case "delete":
|
|
jobsDelete(args)
|
|
case "enable":
|
|
jobsSetEnabled(args, true)
|
|
case "disable":
|
|
jobsSetEnabled(args, false)
|
|
case "trigger":
|
|
jobsTrigger(args)
|
|
case "lock-status":
|
|
jobsLockStatus(args)
|
|
case "apply-all":
|
|
jobsApplyAll(args)
|
|
case "import-k8s":
|
|
jobsImportK8s(args)
|
|
default:
|
|
fmt.Fprintf(os.Stderr, "unknown jobs command %q\n", sub)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func jobsList(_ []string) {
|
|
database := mustDB()
|
|
defer database.Close()
|
|
|
|
jobs, err := dbpkg.ListJobsWithStats(database)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if outputFormat == "json" {
|
|
enc := json.NewEncoder(os.Stdout)
|
|
enc.SetIndent("", " ")
|
|
enc.Encode(jobs)
|
|
return
|
|
}
|
|
|
|
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
|
|
fmt.Fprintln(w, "NAME\tPAIR\tDIRECTION\tENABLED\tSCHEDULE\tLAST RUN\tSTATUS")
|
|
for _, j := range jobs {
|
|
lastRun := "never"
|
|
status := "-"
|
|
if !j.Enabled {
|
|
status = "disabled"
|
|
}
|
|
if j.LastIteration != nil {
|
|
lastRun = timeAgo(j.LastIteration.StartedAt)
|
|
if j.Enabled {
|
|
status = j.LastIteration.Status
|
|
}
|
|
}
|
|
dir := formatDirection(j.Direction, j.SrcHost, j.DestHost)
|
|
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
|
|
j.Name, j.Pair, dir, boolStr(j.Enabled), j.CronSchedule, lastRun, status)
|
|
}
|
|
w.Flush()
|
|
}
|
|
|
|
func jobsCreate(args []string) {
|
|
f := flag.NewFlagSet("jobs create", flag.ExitOnError)
|
|
name := f.String("name", "", "job name (required)")
|
|
pair := f.String("pair", "", "sync pair (required): media, photos, owncloud, games, infra, ai")
|
|
direction := f.String("direction", "dell-to-hp", "dell-to-hp or hp-to-dell")
|
|
src := f.String("src", "", "source path")
|
|
dest := f.String("dest", "", "destination path")
|
|
srcHost := f.String("src-host", "dell", "source host identifier")
|
|
destHost := f.String("dest-host", "hp", "destination host identifier")
|
|
schedule := f.String("schedule", "0 2 * * *", "cron schedule")
|
|
lockTTL := f.Int("lock-ttl", 3600, "lock TTL in seconds")
|
|
workers := f.Int("workers", 4, "parallel workers")
|
|
dryRun := f.Bool("dry-run", false, "enable dry run mode")
|
|
deleteMissing := f.Bool("delete-missing", false, "delete files at dest not in src")
|
|
excludes := f.String("excludes", "", "comma-separated exclude patterns")
|
|
f.Parse(args)
|
|
|
|
if *name == "" {
|
|
fmt.Fprintln(os.Stderr, "error: --name is required")
|
|
os.Exit(1)
|
|
}
|
|
if *pair == "" {
|
|
fmt.Fprintln(os.Stderr, "error: --pair is required")
|
|
os.Exit(1)
|
|
}
|
|
|
|
var excList []string
|
|
if *excludes != "" {
|
|
excList = strings.Split(*excludes, ",")
|
|
}
|
|
|
|
in := dbpkg.JobInput{
|
|
Name: *name,
|
|
Pair: *pair,
|
|
Direction: *direction,
|
|
Src: *src,
|
|
Dest: *dest,
|
|
SrcHost: *srcHost,
|
|
DestHost: *destHost,
|
|
CronSchedule: *schedule,
|
|
LockTTLSeconds: *lockTTL,
|
|
Workers: *workers,
|
|
DryRun: *dryRun,
|
|
DeleteMissing: *deleteMissing,
|
|
Excludes: excList,
|
|
Enabled: true,
|
|
}
|
|
|
|
database := mustDB()
|
|
defer database.Close()
|
|
|
|
job, err := dbpkg.CreateJob(database, in)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: create job: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if outputFormat == "json" {
|
|
enc := json.NewEncoder(os.Stdout)
|
|
enc.SetIndent("", " ")
|
|
enc.Encode(job)
|
|
return
|
|
}
|
|
fmt.Printf("created job %q (id=%d)\n", job.Name, job.ID)
|
|
}
|
|
|
|
func jobsEdit(args []string) {
|
|
if len(args) == 0 {
|
|
fmt.Fprintln(os.Stderr, "error: jobs edit requires <name-or-id>")
|
|
os.Exit(1)
|
|
}
|
|
nameOrID := args[0]
|
|
rest := args[1:]
|
|
|
|
f := flag.NewFlagSet("jobs edit", flag.ExitOnError)
|
|
pair := f.String("pair", "", "sync pair")
|
|
direction := f.String("direction", "", "dell-to-hp or hp-to-dell")
|
|
src := f.String("src", "", "source path")
|
|
dest := f.String("dest", "", "destination path")
|
|
srcHost := f.String("src-host", "", "source host identifier")
|
|
destHost := f.String("dest-host", "", "destination host identifier")
|
|
schedule := f.String("schedule", "", "cron schedule")
|
|
lockTTL := f.Int("lock-ttl", 0, "lock TTL in seconds")
|
|
workers := f.Int("workers", 0, "parallel workers")
|
|
dryRun := f.Bool("dry-run", false, "enable dry run mode")
|
|
deleteMissing := f.Bool("delete-missing", false, "delete files at dest not in src")
|
|
excludes := f.String("excludes", "", "comma-separated exclude patterns")
|
|
f.Parse(rest)
|
|
|
|
database := mustDB()
|
|
defer database.Close()
|
|
|
|
job, err := resolveJob(database, nameOrID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Seed with existing values; only override what was explicitly set.
|
|
in := dbpkg.JobInput{
|
|
Name: job.Name,
|
|
Pair: job.Pair,
|
|
Direction: job.Direction,
|
|
Src: job.Src,
|
|
Dest: job.Dest,
|
|
SrcHost: job.SrcHost,
|
|
DestHost: job.DestHost,
|
|
CronSchedule: job.CronSchedule,
|
|
LockTTLSeconds: job.LockTTLSeconds,
|
|
Workers: job.Workers,
|
|
DryRun: job.DryRun,
|
|
DeleteMissing: job.DeleteMissing,
|
|
Excludes: job.Excludes,
|
|
MtimeThreshold: job.MtimeThreshold,
|
|
Enabled: job.Enabled,
|
|
}
|
|
|
|
f.Visit(func(fl *flag.Flag) {
|
|
switch fl.Name {
|
|
case "pair":
|
|
in.Pair = *pair
|
|
case "direction":
|
|
in.Direction = *direction
|
|
case "src":
|
|
in.Src = *src
|
|
case "dest":
|
|
in.Dest = *dest
|
|
case "src-host":
|
|
in.SrcHost = *srcHost
|
|
case "dest-host":
|
|
in.DestHost = *destHost
|
|
case "schedule":
|
|
in.CronSchedule = *schedule
|
|
case "lock-ttl":
|
|
in.LockTTLSeconds = *lockTTL
|
|
case "workers":
|
|
in.Workers = *workers
|
|
case "dry-run":
|
|
in.DryRun = *dryRun
|
|
case "delete-missing":
|
|
in.DeleteMissing = *deleteMissing
|
|
case "excludes":
|
|
if *excludes != "" {
|
|
in.Excludes = strings.Split(*excludes, ",")
|
|
} else {
|
|
in.Excludes = nil
|
|
}
|
|
}
|
|
})
|
|
|
|
updated, err := dbpkg.UpdateJob(database, job.ID, in)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: update job: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if outputFormat == "json" {
|
|
enc := json.NewEncoder(os.Stdout)
|
|
enc.SetIndent("", " ")
|
|
enc.Encode(updated)
|
|
return
|
|
}
|
|
fmt.Printf("updated job %q (id=%d)\n", updated.Name, updated.ID)
|
|
}
|
|
|
|
func jobsDelete(args []string) {
|
|
if len(args) == 0 {
|
|
fmt.Fprintln(os.Stderr, "error: jobs delete requires <name-or-id>")
|
|
os.Exit(1)
|
|
}
|
|
nameOrID := args[0]
|
|
|
|
database := mustDB()
|
|
defer database.Close()
|
|
|
|
job, err := resolveJob(database, nameOrID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
client, kubeErr := kube.NewClient()
|
|
if kubeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: K8s unavailable, skipping CronJob deletion: %v\n", kubeErr)
|
|
} else {
|
|
if err := kube.DeleteCronJob(context.Background(), client, namespace(), job.Name); err != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: delete K8s CronJob: %v\n", err)
|
|
}
|
|
}
|
|
|
|
if err := dbpkg.DeleteJob(database, job.ID); err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: delete job: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
fmt.Printf("deleted job %q (id=%d)\n", job.Name, job.ID)
|
|
}
|
|
|
|
func jobsSetEnabled(args []string, enable bool) {
|
|
verb := "enable"
|
|
if !enable {
|
|
verb = "disable"
|
|
}
|
|
if len(args) == 0 {
|
|
fmt.Fprintf(os.Stderr, "error: jobs %s requires <name-or-id>\n", verb)
|
|
os.Exit(1)
|
|
}
|
|
nameOrID := args[0]
|
|
|
|
database := mustDB()
|
|
defer database.Close()
|
|
|
|
job, err := resolveJob(database, nameOrID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if err := dbpkg.SetJobEnabled(database, job.ID, enable); err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
client, kubeErr := kube.NewClient()
|
|
if kubeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: K8s unavailable, skipping CronJob suspend toggle: %v\n", kubeErr)
|
|
} else {
|
|
if err := kube.SuspendCronJob(context.Background(), client, namespace(), job.Name, !enable); err != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: suspend CronJob: %v\n", err)
|
|
}
|
|
}
|
|
|
|
pastTense := "enabled"
|
|
if !enable {
|
|
pastTense = "disabled"
|
|
}
|
|
fmt.Printf("%s job %q (id=%d)\n", pastTense, job.Name, job.ID)
|
|
}
|
|
|
|
func jobsTrigger(args []string) {
|
|
if len(args) == 0 {
|
|
fmt.Fprintln(os.Stderr, "error: jobs trigger requires <name-or-id>")
|
|
os.Exit(1)
|
|
}
|
|
nameOrID := args[0]
|
|
|
|
database := mustDB()
|
|
defer database.Close()
|
|
|
|
job, err := resolveJob(database, nameOrID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
client, kubeErr := kube.NewClient()
|
|
if kubeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "error: K8s unavailable: %v\n", kubeErr)
|
|
os.Exit(1)
|
|
}
|
|
|
|
created, err := kube.TriggerJob(context.Background(), client, namespace(), job.Name)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: trigger job: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
fmt.Printf("triggered job %q → K8s job %q\n", job.Name, created)
|
|
}
|
|
|
|
func jobsLockStatus(args []string) {
|
|
if len(args) == 0 {
|
|
fmt.Fprintln(os.Stderr, "error: jobs lock-status requires <name-or-id>")
|
|
os.Exit(1)
|
|
}
|
|
nameOrID := args[0]
|
|
|
|
database := mustDB()
|
|
defer database.Close()
|
|
|
|
job, err := resolveJob(database, nameOrID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
client, kubeErr := kube.NewClient()
|
|
if kubeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "error: K8s unavailable: %v\n", kubeErr)
|
|
os.Exit(1)
|
|
}
|
|
|
|
ls, err := kube.GetLockStatus(context.Background(), client, namespace(), job.Name)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: get lock status: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if outputFormat == "json" {
|
|
enc := json.NewEncoder(os.Stdout)
|
|
enc.SetIndent("", " ")
|
|
enc.Encode(ls)
|
|
return
|
|
}
|
|
|
|
if ls.Locked {
|
|
fmt.Printf("job %q is LOCKED\n holder: %s\n expires: %s\n",
|
|
job.Name, ls.Holder, ls.ExpiresAt.Format(time.RFC3339))
|
|
} else {
|
|
fmt.Printf("job %q is not locked\n", job.Name)
|
|
}
|
|
}
|
|
|
|
func jobsApplyAll(_ []string) {
|
|
database := mustDB()
|
|
defer database.Close()
|
|
|
|
jobs, err := dbpkg.ListJobs(database)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: list jobs: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
client, kubeErr := kube.NewClient()
|
|
if kubeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "error: K8s unavailable: %v\n", kubeErr)
|
|
os.Exit(1)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
ns := namespace()
|
|
applied := 0
|
|
for _, j := range jobs {
|
|
if !j.Enabled {
|
|
continue
|
|
}
|
|
if err := kube.ApplyCronJob(ctx, client, ns, jobToSpec(j)); err != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: apply %q: %v\n", j.Name, err)
|
|
continue
|
|
}
|
|
applied++
|
|
}
|
|
fmt.Printf("applied %d CronJob(s)\n", applied)
|
|
}
|
|
|
|
func jobsImportK8s(_ []string) {
|
|
client, kubeErr := kube.NewClient()
|
|
if kubeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "error: K8s unavailable: %v\n", kubeErr)
|
|
os.Exit(1)
|
|
}
|
|
|
|
cjs, err := kube.ListCronJobs(context.Background(), client, namespace())
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: list CronJobs: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
database := mustDB()
|
|
defer database.Close()
|
|
|
|
imported, existed := 0, 0
|
|
for _, cj := range cjs {
|
|
spec := kube.ImportFromCronJob(cj)
|
|
if spec == nil {
|
|
continue
|
|
}
|
|
// Check existence before importing to distinguish new vs existing.
|
|
existing, _ := dbpkg.GetJobByName(database, spec.Name)
|
|
_, err := dbpkg.ImportJob(database, specToJobInput(spec))
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: import %q: %v\n", spec.Name, err)
|
|
continue
|
|
}
|
|
if existing != nil {
|
|
existed++
|
|
} else {
|
|
imported++
|
|
}
|
|
}
|
|
fmt.Printf("imported %d, already existed %d\n", imported, existed)
|
|
}
|
|
|
|
// ============================================================
|
|
// runs commands
|
|
// ============================================================
|
|
|
|
func runRuns(sub string, args []string) {
|
|
switch sub {
|
|
case "list":
|
|
runsList(args)
|
|
case "show":
|
|
runsShow(args)
|
|
default:
|
|
fmt.Fprintf(os.Stderr, "unknown runs command %q\n", sub)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
// runRow is one line of "runs list" output. The JSON tags define the keys
// used in --output=json mode; field order is the JSON key order.
type runRow struct {
	// ID is the sync iteration's database ID.
	ID int64 `json:"id"`
	// JobName is the matching job's name, or the sync pair when no job
	// matches the iteration.
	JobName string `json:"job"`
	// StartedAt is when the iteration began.
	StartedAt time.Time `json:"started_at"`
	// EndedAt is nil (and omitted from JSON) while the run has no end time.
	EndedAt *time.Time `json:"ended_at,omitempty"`
	// Duration is a preformatted elapsed time, or "running" when EndedAt is
	// nil.
	Duration string `json:"duration"`
	// Copied counts files created plus files updated.
	Copied int `json:"copied"`
	// Deleted counts files deleted.
	Deleted int `json:"deleted"`
	// Skipped counts files skipped.
	Skipped int `json:"skipped"`
	// Errors counts files that failed.
	Errors int `json:"errors"`
	// Status is the iteration's status string as stored in the database.
	Status string `json:"status"`
}
|
|
|
|
func runsList(args []string) {
|
|
f := flag.NewFlagSet("runs list", flag.ExitOnError)
|
|
jobName := f.String("job", "", "filter by job name")
|
|
limit := f.Int("limit", 20, "max results")
|
|
f.Parse(args)
|
|
|
|
database := mustDB()
|
|
defer database.Close()
|
|
|
|
const baseQuery = `
|
|
SELECT i.id, COALESCE(j.name, i.sync_pair), i.started_at, i.ended_at,
|
|
i.files_created + i.files_updated, i.files_deleted, i.files_skipped,
|
|
i.files_failed, i.status
|
|
FROM sync_iterations i
|
|
LEFT JOIN sync_jobs j ON j.pair = i.sync_pair AND j.direction = i.direction`
|
|
|
|
var (
|
|
rows *sql.Rows
|
|
err error
|
|
)
|
|
|
|
if *jobName != "" {
|
|
job, err2 := dbpkg.GetJobByName(database, *jobName)
|
|
if err2 != nil {
|
|
fmt.Fprintf(os.Stderr, "error: %v\n", err2)
|
|
os.Exit(1)
|
|
}
|
|
if job == nil {
|
|
fmt.Fprintf(os.Stderr, "error: job %q not found\n", *jobName)
|
|
os.Exit(1)
|
|
}
|
|
rows, err = database.Query(
|
|
baseQuery+` WHERE i.sync_pair = ? AND i.direction = ? ORDER BY i.started_at DESC LIMIT ?`,
|
|
job.Pair, job.Direction, *limit)
|
|
} else {
|
|
rows, err = database.Query(baseQuery+` ORDER BY i.started_at DESC LIMIT ?`, *limit)
|
|
}
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: query runs: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var runs []runRow
|
|
for rows.Next() {
|
|
var r runRow
|
|
var endedAt sql.NullTime
|
|
var copied, deleted, skipped, errors int
|
|
if err := rows.Scan(&r.ID, &r.JobName, &r.StartedAt, &endedAt,
|
|
&copied, &deleted, &skipped, &errors, &r.Status); err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: scan run: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
if endedAt.Valid {
|
|
r.EndedAt = &endedAt.Time
|
|
r.Duration = formatDuration(endedAt.Time.Sub(r.StartedAt))
|
|
} else {
|
|
r.Duration = "running"
|
|
}
|
|
r.Copied = copied
|
|
r.Deleted = deleted
|
|
r.Skipped = skipped
|
|
r.Errors = errors
|
|
runs = append(runs, r)
|
|
}
|
|
if rows.Err() != nil {
|
|
fmt.Fprintf(os.Stderr, "error: rows: %v\n", rows.Err())
|
|
os.Exit(1)
|
|
}
|
|
|
|
if outputFormat == "json" {
|
|
enc := json.NewEncoder(os.Stdout)
|
|
enc.SetIndent("", " ")
|
|
enc.Encode(runs)
|
|
return
|
|
}
|
|
|
|
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
|
|
fmt.Fprintln(w, "RUN ID\tJOB\tSTARTED\tDURATION\tCOPIED\tDELETED\tSKIPPED\tERRORS")
|
|
for _, r := range runs {
|
|
fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%d\t%d\t%d\t%d\n",
|
|
r.ID, r.JobName,
|
|
r.StartedAt.Format("2006-01-02 15:04"),
|
|
r.Duration, r.Copied, r.Deleted, r.Skipped, r.Errors)
|
|
}
|
|
w.Flush()
|
|
}
|
|
|
|
func runsShow(args []string) {
|
|
if len(args) == 0 {
|
|
fmt.Fprintln(os.Stderr, "error: runs show requires <run-id>")
|
|
os.Exit(1)
|
|
}
|
|
runID, err := strconv.ParseInt(args[0], 10, 64)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: invalid run ID %q\n", args[0])
|
|
os.Exit(1)
|
|
}
|
|
|
|
database := mustDB()
|
|
defer database.Close()
|
|
|
|
type runDetail struct {
|
|
ID int64 `json:"id"`
|
|
JobName string `json:"job"`
|
|
Pair string `json:"pair"`
|
|
Direction string `json:"direction"`
|
|
StartedAt time.Time `json:"started_at"`
|
|
EndedAt *time.Time `json:"ended_at,omitempty"`
|
|
Duration string `json:"duration"`
|
|
Status string `json:"status"`
|
|
DryRun bool `json:"dry_run"`
|
|
FilesCreated int `json:"files_created"`
|
|
FilesUpdated int `json:"files_updated"`
|
|
FilesDeleted int `json:"files_deleted"`
|
|
FilesSkipped int `json:"files_skipped"`
|
|
FilesFailed int `json:"files_failed"`
|
|
TotalBytesTransferred int64 `json:"total_bytes_transferred"`
|
|
ErrorMessage *string `json:"error_message,omitempty"`
|
|
}
|
|
|
|
var d runDetail
|
|
var endedAt sql.NullTime
|
|
var dryRun int
|
|
var errMsg sql.NullString
|
|
|
|
err = database.QueryRow(`
|
|
SELECT i.id, COALESCE(j.name, i.sync_pair), i.sync_pair, i.direction,
|
|
i.started_at, i.ended_at, i.status, i.dry_run,
|
|
i.files_created, i.files_updated, i.files_deleted,
|
|
i.files_skipped, i.files_failed, i.total_bytes_transferred, i.error_message
|
|
FROM sync_iterations i
|
|
LEFT JOIN sync_jobs j ON j.pair = i.sync_pair AND j.direction = i.direction
|
|
WHERE i.id = ?`, runID).Scan(
|
|
&d.ID, &d.JobName, &d.Pair, &d.Direction,
|
|
&d.StartedAt, &endedAt, &d.Status, &dryRun,
|
|
&d.FilesCreated, &d.FilesUpdated, &d.FilesDeleted,
|
|
&d.FilesSkipped, &d.FilesFailed, &d.TotalBytesTransferred, &errMsg,
|
|
)
|
|
if err == sql.ErrNoRows {
|
|
fmt.Fprintf(os.Stderr, "error: run %d not found\n", runID)
|
|
os.Exit(1)
|
|
}
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: query run: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
d.DryRun = dryRun == 1
|
|
if endedAt.Valid {
|
|
d.EndedAt = &endedAt.Time
|
|
d.Duration = formatDuration(endedAt.Time.Sub(d.StartedAt))
|
|
} else {
|
|
d.Duration = "running"
|
|
}
|
|
if errMsg.Valid {
|
|
d.ErrorMessage = &errMsg.String
|
|
}
|
|
|
|
if outputFormat == "json" {
|
|
enc := json.NewEncoder(os.Stdout)
|
|
enc.SetIndent("", " ")
|
|
enc.Encode(d)
|
|
return
|
|
}
|
|
|
|
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
|
|
fmt.Fprintf(w, "Run ID:\t%d\n", d.ID)
|
|
fmt.Fprintf(w, "Job:\t%s\n", d.JobName)
|
|
fmt.Fprintf(w, "Pair:\t%s\n", d.Pair)
|
|
fmt.Fprintf(w, "Direction:\t%s\n", d.Direction)
|
|
fmt.Fprintf(w, "Status:\t%s\n", d.Status)
|
|
fmt.Fprintf(w, "Dry Run:\t%s\n", boolStr(d.DryRun))
|
|
fmt.Fprintf(w, "Started:\t%s\n", d.StartedAt.Format(time.RFC3339))
|
|
if d.EndedAt != nil {
|
|
fmt.Fprintf(w, "Ended:\t%s\n", d.EndedAt.Format(time.RFC3339))
|
|
}
|
|
fmt.Fprintf(w, "Duration:\t%s\n", d.Duration)
|
|
fmt.Fprintf(w, "Created:\t%d\n", d.FilesCreated)
|
|
fmt.Fprintf(w, "Updated:\t%d\n", d.FilesUpdated)
|
|
fmt.Fprintf(w, "Deleted:\t%d\n", d.FilesDeleted)
|
|
fmt.Fprintf(w, "Skipped:\t%d\n", d.FilesSkipped)
|
|
fmt.Fprintf(w, "Failed:\t%d\n", d.FilesFailed)
|
|
fmt.Fprintf(w, "Bytes Transferred:\t%s\n", formatSize(d.TotalBytesTransferred))
|
|
if d.ErrorMessage != nil {
|
|
fmt.Fprintf(w, "Error:\t%s\n", *d.ErrorMessage)
|
|
}
|
|
w.Flush()
|
|
}
|
|
|
|
// ============================================================
|
|
// ops commands
|
|
// ============================================================
|
|
|
|
func runOps(sub string, args []string) {
|
|
switch sub {
|
|
case "list":
|
|
opsList(args)
|
|
default:
|
|
fmt.Fprintf(os.Stderr, "unknown ops command %q\n", sub)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func opsList(args []string) {
|
|
if len(args) == 0 {
|
|
fmt.Fprintln(os.Stderr, "error: ops list requires <run-id>")
|
|
os.Exit(1)
|
|
}
|
|
runID, err := strconv.ParseInt(args[0], 10, 64)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: invalid run ID %q\n", args[0])
|
|
os.Exit(1)
|
|
}
|
|
|
|
f := flag.NewFlagSet("ops list", flag.ExitOnError)
|
|
limit := f.Int("limit", 100, "max results")
|
|
action := f.String("action", "", "filter by action: copy, delete, skip, error")
|
|
f.Parse(args[1:])
|
|
|
|
database := mustDB()
|
|
defer database.Close()
|
|
|
|
query := `
|
|
SELECT operation, filepath, COALESCE(src_host,''), COALESCE(dest_host,''),
|
|
COALESCE(size_before, size_after, 0), COALESCE(owner,'')
|
|
FROM sync_operations
|
|
WHERE iteration_id = ?`
|
|
qArgs := []interface{}{runID}
|
|
|
|
switch *action {
|
|
case "copy":
|
|
query += " AND (operation = 'create' OR operation = 'update')"
|
|
case "delete":
|
|
query += " AND operation = 'delete'"
|
|
case "skip":
|
|
query += " AND status = 'skip'"
|
|
case "error":
|
|
query += " AND status = 'fail'"
|
|
case "":
|
|
// no filter
|
|
default:
|
|
fmt.Fprintf(os.Stderr, "error: unknown action %q (use copy, delete, skip, error)\n", *action)
|
|
os.Exit(1)
|
|
}
|
|
|
|
query += " ORDER BY started_at ASC LIMIT ?"
|
|
qArgs = append(qArgs, *limit)
|
|
|
|
rows, err := database.Query(query, qArgs...)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: query ops: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
defer rows.Close()
|
|
|
|
type opRow struct {
|
|
Action string `json:"action"`
|
|
Path string `json:"path"`
|
|
FromTo string `json:"from_to"`
|
|
Size string `json:"size"`
|
|
Owner string `json:"owner"`
|
|
}
|
|
|
|
var ops []opRow
|
|
for rows.Next() {
|
|
var op opRow
|
|
var operation, srcHost, destHost, owner string
|
|
var size int64
|
|
if err := rows.Scan(&operation, &op.Path, &srcHost, &destHost, &size, &owner); err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: scan op: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
switch operation {
|
|
case "create", "update":
|
|
op.Action = "copy"
|
|
default:
|
|
op.Action = operation
|
|
}
|
|
if srcHost != "" && destHost != "" {
|
|
op.FromTo = srcHost + "→" + destHost
|
|
} else {
|
|
op.FromTo = "-"
|
|
}
|
|
op.Size = formatSize(size)
|
|
op.Owner = owner
|
|
ops = append(ops, op)
|
|
}
|
|
if rows.Err() != nil {
|
|
fmt.Fprintf(os.Stderr, "error: rows: %v\n", rows.Err())
|
|
os.Exit(1)
|
|
}
|
|
|
|
if outputFormat == "json" {
|
|
enc := json.NewEncoder(os.Stdout)
|
|
enc.SetIndent("", " ")
|
|
enc.Encode(ops)
|
|
return
|
|
}
|
|
|
|
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
|
|
fmt.Fprintln(w, "ACTION\tPATH\tFROM→TO\tSIZE\tOWNER")
|
|
for _, op := range ops {
|
|
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n",
|
|
op.Action, op.Path, op.FromTo, op.Size, op.Owner)
|
|
}
|
|
w.Flush()
|
|
}
|