homelab/services/ha-sync/cmd/ha-sync-ctl/main.go
Dan V deb6c38d7b chore: commit homelab setup — deployment, services, orchestration, skill
- Add .gitignore: exclude compiled binaries, build artifacts, and Helm
  values files containing real secrets (authentik, prometheus)
- Add all Kubernetes deployment manifests (deployment/)
- Add services source code: ha-sync, device-inventory, games-console,
  paperclip, parts-inventory
- Add Ansible orchestration: playbooks, roles, inventory, cloud-init
- Add hardware specs, execution plans, scripts, HOMELAB.md
- Add skills/homelab/SKILL.md + skills/install.sh to preserve Copilot skill
- Remove previously-tracked inventory-cli binary from git index

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-09 08:10:32 +02:00

1036 lines
26 KiB
Go

package main
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"os"
	"strconv"
	"strings"
	"text/tabwriter"
	"time"

	dbpkg "github.com/vandachevici/ha-sync/internal/db"
	"github.com/vandachevici/ha-sync/internal/kube"
)
// outputFormat selects the rendering mode for all subcommands: "table"
// (default) or "json", set via the global --output flag.
var outputFormat = "table"

// main strips the global --output flag (both "--output=json" and
// "--output json" forms) from the raw argument list, then dispatches
// the remaining "<group> <command>" pair to its handler.
func main() {
	raw := os.Args[1:]
	var rest []string
	for i := 0; i < len(raw); i++ {
		switch arg := raw[i]; {
		case strings.HasPrefix(arg, "--output="):
			outputFormat = strings.TrimPrefix(arg, "--output=")
		case arg == "--output" && i+1 < len(raw):
			i++
			outputFormat = raw[i]
		default:
			rest = append(rest, arg)
		}
	}
	if len(rest) < 2 {
		usage()
		os.Exit(1)
	}
	group, sub, cmdArgs := rest[0], rest[1], rest[2:]
	switch group {
	case "jobs":
		runJobs(sub, cmdArgs)
	case "runs":
		runRuns(sub, cmdArgs)
	case "ops":
		runOps(sub, cmdArgs)
	default:
		fmt.Fprintf(os.Stderr, "unknown group %q\n", group)
		usage()
		os.Exit(1)
	}
}
// usage prints the full command reference to stderr. It does not exit;
// callers are expected to os.Exit(1) themselves after calling it.
func usage() {
fmt.Fprintln(os.Stderr, `Usage: ha-sync-ctl <group> <command> [flags]
Groups and commands:
jobs list
jobs create --name=X --pair=Y --direction=Z [...]
jobs edit <name-or-id> [--field=val ...]
jobs delete <name-or-id>
jobs enable <name-or-id>
jobs disable <name-or-id>
jobs trigger <name-or-id>
jobs lock-status <name-or-id>
jobs apply-all
jobs import-k8s
runs list [--job=<name>] [--limit=N]
runs show <run-id>
ops list <run-id> [--limit=N] [--action=copy|delete|skip|error]
Global flags:
--output=json Output as JSON instead of table`)
}
// --- env / connection helpers ---
// mustDB opens the control-plane database using the DSN from the
// HA_SYNC_DB_DSN environment variable. On a missing variable or a
// failed connection it prints an error and exits the process.
func mustDB() *sql.DB {
	dsn := os.Getenv("HA_SYNC_DB_DSN")
	if dsn == "" {
		fmt.Fprintln(os.Stderr, "error: HA_SYNC_DB_DSN environment variable is required")
		os.Exit(1)
	}
	conn, err := dbpkg.Connect(dsn)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: connect to DB: %v\n", err)
		os.Exit(1)
	}
	return conn
}
// namespace reports the Kubernetes namespace to operate in: the value
// of HA_SYNC_NAMESPACE when non-empty, otherwise "infrastructure".
func namespace() string {
	ns := os.Getenv("HA_SYNC_NAMESPACE")
	if ns == "" {
		ns = "infrastructure"
	}
	return ns
}
// --- job resolution ---
// resolveJob looks up a job by numeric ID when nameOrID parses as an
// integer, and by name otherwise. A nil row from the db layer is
// converted into an explicit "not found" error so callers only have a
// single error path to handle.
func resolveJob(database *sql.DB, nameOrID string) (*dbpkg.Job, error) {
	id, parseErr := strconv.ParseInt(nameOrID, 10, 64)
	if parseErr != nil {
		// Not numeric: treat the argument as a job name.
		job, err := dbpkg.GetJobByName(database, nameOrID)
		if err != nil {
			return nil, err
		}
		if job == nil {
			return nil, fmt.Errorf("job %q not found", nameOrID)
		}
		return job, nil
	}
	job, err := dbpkg.GetJobByID(database, id)
	if err != nil {
		return nil, err
	}
	if job == nil {
		return nil, fmt.Errorf("job with ID %d not found", id)
	}
	return job, nil
}
// --- formatting helpers ---
// formatSize renders a byte count as a human-readable size using
// 1024-based units ("512 B", "1.5 KB", "2.0 GB"). Non-positive counts
// render as "0 B"; values below 1 KiB are printed as exact byte counts,
// larger values with one decimal place.
func formatSize(bytes int64) string {
	if bytes <= 0 {
		return "0 B"
	}
	const unit = 1024
	if bytes < unit {
		return fmt.Sprintf("%d B", bytes)
	}
	div, exp := int64(unit), 0
	for n := bytes / unit; n >= unit; n /= unit {
		div *= unit
		exp++
	}
	// "EB" is required to cover the full int64 range (max ≈ 8 EiB):
	// for bytes >= 1 EiB exp reaches 5, which previously indexed past
	// the end of the slice and panicked.
	suffixes := []string{"KB", "MB", "GB", "TB", "PB", "EB"}
	return fmt.Sprintf("%.1f %s", float64(bytes)/float64(div), suffixes[exp])
}
func formatDuration(d time.Duration) string {
if d < 0 {
d = 0
}
h := int(d.Hours())
m := int(d.Minutes()) % 60
s := int(d.Seconds()) % 60
if h > 0 {
return fmt.Sprintf("%dh%dm", h, m)
}
if m > 0 {
return fmt.Sprintf("%dm%ds", m, s)
}
return fmt.Sprintf("%ds", s)
}
func timeAgo(t time.Time) string {
d := time.Since(t)
if d < 0 {
return "just now"
}
switch {
case d < time.Minute:
return "just now"
case d < time.Hour:
return fmt.Sprintf("%dm ago", int(d.Minutes()))
case d < 24*time.Hour:
return fmt.Sprintf("%dh ago", int(d.Hours()))
default:
return fmt.Sprintf("%dd ago", int(d.Hours())/24)
}
}
// formatDirection renders a sync direction as "src→dst" (forward) or
// "dst→src" (reverse), substituting the legacy defaults "dell"/"hp"
// for empty host names. Direction matching is case-insensitive;
// unrecognized direction strings are returned verbatim.
func formatDirection(direction, srcHost, destHost string) string {
	from := srcHost
	if from == "" {
		from = "dell"
	}
	to := destHost
	if to == "" {
		to = "hp"
	}
	switch strings.ToLower(direction) {
	case "fwd", "dell-to-hp":
		return fmt.Sprintf("%s→%s", from, to)
	case "rev", "hp-to-dell":
		return fmt.Sprintf("%s→%s", to, from)
	default:
		return direction
	}
}
// boolStr renders a boolean as "yes"/"no" for table output.
func boolStr(b bool) string {
	result := "no"
	if b {
		result = "yes"
	}
	return result
}
// --- job/spec conversion helpers ---
// jobToSpec converts a persisted job row into the kube.JobSpec used to
// render its CronJob manifest. The conversion is a 1:1 field copy.
func jobToSpec(j *dbpkg.Job) kube.JobSpec {
	var spec kube.JobSpec
	spec.ID = j.ID
	spec.Name = j.Name
	spec.Pair = j.Pair
	spec.Direction = j.Direction
	spec.Src = j.Src
	spec.Dest = j.Dest
	spec.SrcHost = j.SrcHost
	spec.DestHost = j.DestHost
	spec.CronSchedule = j.CronSchedule
	spec.LockTTLSeconds = j.LockTTLSeconds
	spec.DryRun = j.DryRun
	spec.DeleteMissing = j.DeleteMissing
	spec.Workers = j.Workers
	spec.MtimeThreshold = j.MtimeThreshold
	spec.Excludes = j.Excludes
	spec.Enabled = j.Enabled
	return spec
}
// specToJobInput converts an imported CronJob spec into the input shape
// the db layer expects when creating/importing a job. The conversion is
// a 1:1 field copy (the spec's ID is intentionally not carried over —
// JobInput has no ID field; the DB assigns one).
func specToJobInput(s *kube.JobSpec) dbpkg.JobInput {
	var in dbpkg.JobInput
	in.Name = s.Name
	in.Pair = s.Pair
	in.Direction = s.Direction
	in.Src = s.Src
	in.Dest = s.Dest
	in.SrcHost = s.SrcHost
	in.DestHost = s.DestHost
	in.CronSchedule = s.CronSchedule
	in.LockTTLSeconds = s.LockTTLSeconds
	in.DryRun = s.DryRun
	in.DeleteMissing = s.DeleteMissing
	in.Workers = s.Workers
	in.MtimeThreshold = s.MtimeThreshold
	in.Excludes = s.Excludes
	in.Enabled = s.Enabled
	return in
}
// ============================================================
// jobs commands
// ============================================================
// runJobs dispatches the "jobs" command group to its subcommand
// handler, exiting with an error for unknown subcommands.
func runJobs(sub string, args []string) {
	handlers := map[string]func([]string){
		"list":        jobsList,
		"create":      jobsCreate,
		"edit":        jobsEdit,
		"delete":      jobsDelete,
		"enable":      func(a []string) { jobsSetEnabled(a, true) },
		"disable":     func(a []string) { jobsSetEnabled(a, false) },
		"trigger":     jobsTrigger,
		"lock-status": jobsLockStatus,
		"apply-all":   jobsApplyAll,
		"import-k8s":  jobsImportK8s,
	}
	handler, ok := handlers[sub]
	if !ok {
		fmt.Fprintf(os.Stderr, "unknown jobs command %q\n", sub)
		os.Exit(1)
	}
	handler(args)
}
// jobsList prints every job with its latest-run summary, either as
// indented JSON (--output=json) or as an aligned table.
func jobsList(_ []string) {
database := mustDB()
defer database.Close()
jobs, err := dbpkg.ListJobsWithStats(database)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
if outputFormat == "json" {
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
enc.Encode(jobs)
return
}
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
fmt.Fprintln(w, "NAME\tPAIR\tDIRECTION\tENABLED\tSCHEDULE\tLAST RUN\tSTATUS")
for _, j := range jobs {
// Defaults for jobs that have never run.
lastRun := "never"
status := "-"
if !j.Enabled {
status = "disabled"
}
if j.LastIteration != nil {
lastRun = timeAgo(j.LastIteration.StartedAt)
// A disabled job keeps showing "disabled" even when run history exists.
if j.Enabled {
status = j.LastIteration.Status
}
}
dir := formatDirection(j.Direction, j.SrcHost, j.DestHost)
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
j.Name, j.Pair, dir, boolStr(j.Enabled), j.CronSchedule, lastRun, status)
}
w.Flush()
}
// jobsCreate creates a new sync job from CLI flags and prints the
// result (JSON or a one-line summary). --name and --pair are required;
// every other setting falls back to its flag default. New jobs are
// created enabled.
func jobsCreate(args []string) {
f := flag.NewFlagSet("jobs create", flag.ExitOnError)
name := f.String("name", "", "job name (required)")
pair := f.String("pair", "", "sync pair (required): media, photos, owncloud, games, infra, ai")
direction := f.String("direction", "dell-to-hp", "dell-to-hp or hp-to-dell")
src := f.String("src", "", "source path")
dest := f.String("dest", "", "destination path")
srcHost := f.String("src-host", "dell", "source host identifier")
destHost := f.String("dest-host", "hp", "destination host identifier")
schedule := f.String("schedule", "0 2 * * *", "cron schedule")
lockTTL := f.Int("lock-ttl", 3600, "lock TTL in seconds")
workers := f.Int("workers", 4, "parallel workers")
dryRun := f.Bool("dry-run", false, "enable dry run mode")
deleteMissing := f.Bool("delete-missing", false, "delete files at dest not in src")
excludes := f.String("excludes", "", "comma-separated exclude patterns")
f.Parse(args)
if *name == "" {
fmt.Fprintln(os.Stderr, "error: --name is required")
os.Exit(1)
}
if *pair == "" {
fmt.Fprintln(os.Stderr, "error: --pair is required")
os.Exit(1)
}
// Excludes arrive as one comma-separated flag value; empty means none.
var excList []string
if *excludes != "" {
excList = strings.Split(*excludes, ",")
}
in := dbpkg.JobInput{
Name: *name,
Pair: *pair,
Direction: *direction,
Src: *src,
Dest: *dest,
SrcHost: *srcHost,
DestHost: *destHost,
CronSchedule: *schedule,
LockTTLSeconds: *lockTTL,
Workers: *workers,
DryRun: *dryRun,
DeleteMissing: *deleteMissing,
Excludes: excList,
Enabled: true,
}
database := mustDB()
defer database.Close()
job, err := dbpkg.CreateJob(database, in)
if err != nil {
fmt.Fprintf(os.Stderr, "error: create job: %v\n", err)
os.Exit(1)
}
if outputFormat == "json" {
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
enc.Encode(job)
return
}
fmt.Printf("created job %q (id=%d)\n", job.Name, job.ID)
}
// jobsEdit updates an existing job (by name or ID) in place. Only the
// flags the user explicitly passed override stored values — detected
// via flag.FlagSet.Visit — so flag defaults never clobber existing
// settings. Note: mtime-threshold has no flag here and is carried over
// unchanged.
func jobsEdit(args []string) {
if len(args) == 0 {
fmt.Fprintln(os.Stderr, "error: jobs edit requires <name-or-id>")
os.Exit(1)
}
nameOrID := args[0]
rest := args[1:]
f := flag.NewFlagSet("jobs edit", flag.ExitOnError)
pair := f.String("pair", "", "sync pair")
direction := f.String("direction", "", "dell-to-hp or hp-to-dell")
src := f.String("src", "", "source path")
dest := f.String("dest", "", "destination path")
srcHost := f.String("src-host", "", "source host identifier")
destHost := f.String("dest-host", "", "destination host identifier")
schedule := f.String("schedule", "", "cron schedule")
lockTTL := f.Int("lock-ttl", 0, "lock TTL in seconds")
workers := f.Int("workers", 0, "parallel workers")
dryRun := f.Bool("dry-run", false, "enable dry run mode")
deleteMissing := f.Bool("delete-missing", false, "delete files at dest not in src")
excludes := f.String("excludes", "", "comma-separated exclude patterns")
f.Parse(rest)
database := mustDB()
defer database.Close()
job, err := resolveJob(database, nameOrID)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
// Seed with existing values; only override what was explicitly set.
in := dbpkg.JobInput{
Name: job.Name,
Pair: job.Pair,
Direction: job.Direction,
Src: job.Src,
Dest: job.Dest,
SrcHost: job.SrcHost,
DestHost: job.DestHost,
CronSchedule: job.CronSchedule,
LockTTLSeconds: job.LockTTLSeconds,
Workers: job.Workers,
DryRun: job.DryRun,
DeleteMissing: job.DeleteMissing,
Excludes: job.Excludes,
MtimeThreshold: job.MtimeThreshold,
Enabled: job.Enabled,
}
// Visit only iterates over flags that were actually set on the command
// line, which is what lets "--dry-run=false" and "--workers=0" be
// meaningful overrides rather than ignored defaults.
f.Visit(func(fl *flag.Flag) {
switch fl.Name {
case "pair":
in.Pair = *pair
case "direction":
in.Direction = *direction
case "src":
in.Src = *src
case "dest":
in.Dest = *dest
case "src-host":
in.SrcHost = *srcHost
case "dest-host":
in.DestHost = *destHost
case "schedule":
in.CronSchedule = *schedule
case "lock-ttl":
in.LockTTLSeconds = *lockTTL
case "workers":
in.Workers = *workers
case "dry-run":
in.DryRun = *dryRun
case "delete-missing":
in.DeleteMissing = *deleteMissing
case "excludes":
// An explicitly empty --excludes clears the stored list.
if *excludes != "" {
in.Excludes = strings.Split(*excludes, ",")
} else {
in.Excludes = nil
}
}
})
updated, err := dbpkg.UpdateJob(database, job.ID, in)
if err != nil {
fmt.Fprintf(os.Stderr, "error: update job: %v\n", err)
os.Exit(1)
}
if outputFormat == "json" {
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
enc.Encode(updated)
return
}
fmt.Printf("updated job %q (id=%d)\n", updated.Name, updated.ID)
}
// jobsDelete removes a job: first a best-effort deletion of its K8s
// CronJob (failures are warnings), then the authoritative DB row
// (failure is fatal).
func jobsDelete(args []string) {
if len(args) == 0 {
fmt.Fprintln(os.Stderr, "error: jobs delete requires <name-or-id>")
os.Exit(1)
}
nameOrID := args[0]
database := mustDB()
defer database.Close()
job, err := resolveJob(database, nameOrID)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
// K8s cleanup is best effort: an unreachable cluster must not block
// removing the job from the database.
client, kubeErr := kube.NewClient()
if kubeErr != nil {
fmt.Fprintf(os.Stderr, "warning: K8s unavailable, skipping CronJob deletion: %v\n", kubeErr)
} else {
if err := kube.DeleteCronJob(context.Background(), client, namespace(), job.Name); err != nil {
fmt.Fprintf(os.Stderr, "warning: delete K8s CronJob: %v\n", err)
}
}
if err := dbpkg.DeleteJob(database, job.ID); err != nil {
fmt.Fprintf(os.Stderr, "error: delete job: %v\n", err)
os.Exit(1)
}
fmt.Printf("deleted job %q (id=%d)\n", job.Name, job.ID)
}
// jobsSetEnabled flips a job's enabled flag in the DB (authoritative)
// and mirrors it onto the CronJob's suspend field (suspend == !enable).
// K8s errors are non-fatal warnings.
func jobsSetEnabled(args []string, enable bool) {
verb := "enable"
if !enable {
verb = "disable"
}
if len(args) == 0 {
fmt.Fprintf(os.Stderr, "error: jobs %s requires <name-or-id>\n", verb)
os.Exit(1)
}
nameOrID := args[0]
database := mustDB()
defer database.Close()
job, err := resolveJob(database, nameOrID)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
if err := dbpkg.SetJobEnabled(database, job.ID, enable); err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
// Mirror to K8s after the DB write; a cluster failure leaves the DB
// state correct and only the CronJob out of sync.
client, kubeErr := kube.NewClient()
if kubeErr != nil {
fmt.Fprintf(os.Stderr, "warning: K8s unavailable, skipping CronJob suspend toggle: %v\n", kubeErr)
} else {
if err := kube.SuspendCronJob(context.Background(), client, namespace(), job.Name, !enable); err != nil {
fmt.Fprintf(os.Stderr, "warning: suspend CronJob: %v\n", err)
}
}
pastTense := "enabled"
if !enable {
pastTense = "disabled"
}
fmt.Printf("%s job %q (id=%d)\n", pastTense, job.Name, job.ID)
}
// jobsTrigger creates an immediate one-off K8s Job for the named sync
// job. Unlike delete/enable, a reachable cluster is mandatory here —
// there is nothing useful to do without it.
func jobsTrigger(args []string) {
if len(args) == 0 {
fmt.Fprintln(os.Stderr, "error: jobs trigger requires <name-or-id>")
os.Exit(1)
}
nameOrID := args[0]
database := mustDB()
defer database.Close()
job, err := resolveJob(database, nameOrID)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
client, kubeErr := kube.NewClient()
if kubeErr != nil {
fmt.Fprintf(os.Stderr, "error: K8s unavailable: %v\n", kubeErr)
os.Exit(1)
}
created, err := kube.TriggerJob(context.Background(), client, namespace(), job.Name)
if err != nil {
fmt.Fprintf(os.Stderr, "error: trigger job: %v\n", err)
os.Exit(1)
}
fmt.Printf("triggered job %q → K8s job %q\n", job.Name, created)
}
// jobsLockStatus reports whether a job's sync lock is currently held,
// printing holder and expiry when locked (JSON or plain text).
// Requires a reachable cluster.
func jobsLockStatus(args []string) {
if len(args) == 0 {
fmt.Fprintln(os.Stderr, "error: jobs lock-status requires <name-or-id>")
os.Exit(1)
}
nameOrID := args[0]
database := mustDB()
defer database.Close()
job, err := resolveJob(database, nameOrID)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
client, kubeErr := kube.NewClient()
if kubeErr != nil {
fmt.Fprintf(os.Stderr, "error: K8s unavailable: %v\n", kubeErr)
os.Exit(1)
}
ls, err := kube.GetLockStatus(context.Background(), client, namespace(), job.Name)
if err != nil {
fmt.Fprintf(os.Stderr, "error: get lock status: %v\n", err)
os.Exit(1)
}
if outputFormat == "json" {
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
enc.Encode(ls)
return
}
if ls.Locked {
fmt.Printf("job %q is LOCKED\n holder: %s\n expires: %s\n",
job.Name, ls.Holder, ls.ExpiresAt.Format(time.RFC3339))
} else {
fmt.Printf("job %q is not locked\n", job.Name)
}
}
// jobsApplyAll renders and applies a CronJob manifest for every enabled
// job. Disabled jobs are skipped; a per-job apply failure is reported
// as a warning and does not stop the loop.
func jobsApplyAll(_ []string) {
database := mustDB()
defer database.Close()
jobs, err := dbpkg.ListJobs(database)
if err != nil {
fmt.Fprintf(os.Stderr, "error: list jobs: %v\n", err)
os.Exit(1)
}
client, kubeErr := kube.NewClient()
if kubeErr != nil {
fmt.Fprintf(os.Stderr, "error: K8s unavailable: %v\n", kubeErr)
os.Exit(1)
}
ctx := context.Background()
ns := namespace()
applied := 0
for _, j := range jobs {
// Only enabled jobs get manifests; disabled ones are left alone.
if !j.Enabled {
continue
}
if err := kube.ApplyCronJob(ctx, client, ns, jobToSpec(j)); err != nil {
fmt.Fprintf(os.Stderr, "warning: apply %q: %v\n", j.Name, err)
continue
}
applied++
}
fmt.Printf("applied %d CronJob(s)\n", applied)
}
// jobsImportK8s imports existing ha-sync CronJobs from the cluster into
// the DB, reporting how many were newly created vs. already present.
// CronJobs that ImportFromCronJob cannot parse (nil spec) are skipped.
func jobsImportK8s(_ []string) {
client, kubeErr := kube.NewClient()
if kubeErr != nil {
fmt.Fprintf(os.Stderr, "error: K8s unavailable: %v\n", kubeErr)
os.Exit(1)
}
cjs, err := kube.ListCronJobs(context.Background(), client, namespace())
if err != nil {
fmt.Fprintf(os.Stderr, "error: list CronJobs: %v\n", err)
os.Exit(1)
}
database := mustDB()
defer database.Close()
imported, existed := 0, 0
for _, cj := range cjs {
spec := kube.ImportFromCronJob(cj)
if spec == nil {
continue
}
// Check existence before importing to distinguish new vs existing.
// NOTE(review): the lookup error is deliberately ignored and the
// lookup+import pair is not atomic; a concurrent writer could skew
// the counts — acceptable for a one-shot CLI, verify if reused.
existing, _ := dbpkg.GetJobByName(database, spec.Name)
_, err := dbpkg.ImportJob(database, specToJobInput(spec))
if err != nil {
fmt.Fprintf(os.Stderr, "warning: import %q: %v\n", spec.Name, err)
continue
}
if existing != nil {
existed++
} else {
imported++
}
}
fmt.Printf("imported %d, already existed %d\n", imported, existed)
}
// ============================================================
// runs commands
// ============================================================
// runRuns dispatches the "runs" command group to its subcommand
// handler, exiting with an error for unknown subcommands.
func runRuns(sub string, args []string) {
	handlers := map[string]func([]string){
		"list": runsList,
		"show": runsShow,
	}
	handler, ok := handlers[sub]
	if !ok {
		fmt.Fprintf(os.Stderr, "unknown runs command %q\n", sub)
		os.Exit(1)
	}
	handler(args)
}
// runRow is the per-iteration projection used by "runs list" for both
// JSON and table output. Copied folds files_created + files_updated
// into one count. Field order here drives JSON key order — do not
// reorder.
type runRow struct {
ID int64 `json:"id"`
JobName string `json:"job"`
StartedAt time.Time `json:"started_at"`
EndedAt *time.Time `json:"ended_at,omitempty"`
Duration string `json:"duration"`
Copied int `json:"copied"`
Deleted int `json:"deleted"`
Skipped int `json:"skipped"`
Errors int `json:"errors"`
Status string `json:"status"`
}
// runsList prints recent sync iterations, optionally filtered to one
// job and capped at --limit, as JSON or an aligned table.
func runsList(args []string) {
f := flag.NewFlagSet("runs list", flag.ExitOnError)
jobName := f.String("job", "", "filter by job name")
limit := f.Int("limit", 20, "max results")
f.Parse(args)
database := mustDB()
defer database.Close()
// Iterations record only (sync_pair, direction), not a job ID, so the
// job name is recovered via a LEFT JOIN and falls back to the pair.
const baseQuery = `
SELECT i.id, COALESCE(j.name, i.sync_pair), i.started_at, i.ended_at,
i.files_created + i.files_updated, i.files_deleted, i.files_skipped,
i.files_failed, i.status
FROM sync_iterations i
LEFT JOIN sync_jobs j ON j.pair = i.sync_pair AND j.direction = i.direction`
var (
rows *sql.Rows
err error
)
if *jobName != "" {
// Resolve the job first so filtering can use its pair+direction.
job, err2 := dbpkg.GetJobByName(database, *jobName)
if err2 != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err2)
os.Exit(1)
}
if job == nil {
fmt.Fprintf(os.Stderr, "error: job %q not found\n", *jobName)
os.Exit(1)
}
rows, err = database.Query(
baseQuery+` WHERE i.sync_pair = ? AND i.direction = ? ORDER BY i.started_at DESC LIMIT ?`,
job.Pair, job.Direction, *limit)
} else {
rows, err = database.Query(baseQuery+` ORDER BY i.started_at DESC LIMIT ?`, *limit)
}
if err != nil {
fmt.Fprintf(os.Stderr, "error: query runs: %v\n", err)
os.Exit(1)
}
defer rows.Close()
var runs []runRow
for rows.Next() {
var r runRow
var endedAt sql.NullTime
var copied, deleted, skipped, errors int
if err := rows.Scan(&r.ID, &r.JobName, &r.StartedAt, &endedAt,
&copied, &deleted, &skipped, &errors, &r.Status); err != nil {
fmt.Fprintf(os.Stderr, "error: scan run: %v\n", err)
os.Exit(1)
}
// A NULL ended_at means the iteration is still in progress.
if endedAt.Valid {
r.EndedAt = &endedAt.Time
r.Duration = formatDuration(endedAt.Time.Sub(r.StartedAt))
} else {
r.Duration = "running"
}
r.Copied = copied
r.Deleted = deleted
r.Skipped = skipped
r.Errors = errors
runs = append(runs, r)
}
if rows.Err() != nil {
fmt.Fprintf(os.Stderr, "error: rows: %v\n", rows.Err())
os.Exit(1)
}
if outputFormat == "json" {
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
enc.Encode(runs)
return
}
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
fmt.Fprintln(w, "RUN ID\tJOB\tSTARTED\tDURATION\tCOPIED\tDELETED\tSKIPPED\tERRORS")
for _, r := range runs {
fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%d\t%d\t%d\t%d\n",
r.ID, r.JobName,
r.StartedAt.Format("2006-01-02 15:04"),
r.Duration, r.Copied, r.Deleted, r.Skipped, r.Errors)
}
w.Flush()
}
// runsShow prints the full detail of a single sync iteration, looked up
// by its numeric run ID, as indented JSON (--output=json) or as an
// aligned key/value table. Exits non-zero on a bad ID, a missing run,
// or a query failure.
func runsShow(args []string) {
	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "error: runs show requires <run-id>")
		os.Exit(1)
	}
	runID, err := strconv.ParseInt(args[0], 10, 64)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: invalid run ID %q\n", args[0])
		os.Exit(1)
	}
	database := mustDB()
	defer database.Close()
	// runDetail is the projection of one sync_iterations row, joined to
	// sync_jobs so the human-readable job name can be shown (falling
	// back to the raw sync_pair when no job matches).
	type runDetail struct {
		ID                    int64      `json:"id"`
		JobName               string     `json:"job"`
		Pair                  string     `json:"pair"`
		Direction             string     `json:"direction"`
		StartedAt             time.Time  `json:"started_at"`
		EndedAt               *time.Time `json:"ended_at,omitempty"`
		Duration              string     `json:"duration"`
		Status                string     `json:"status"`
		DryRun                bool       `json:"dry_run"`
		FilesCreated          int        `json:"files_created"`
		FilesUpdated          int        `json:"files_updated"`
		FilesDeleted          int        `json:"files_deleted"`
		FilesSkipped          int        `json:"files_skipped"`
		FilesFailed           int        `json:"files_failed"`
		TotalBytesTransferred int64      `json:"total_bytes_transferred"`
		ErrorMessage          *string    `json:"error_message,omitempty"`
	}
	var d runDetail
	var endedAt sql.NullTime
	var dryRun int // stored as an integer 0/1 column
	var errMsg sql.NullString
	err = database.QueryRow(`
SELECT i.id, COALESCE(j.name, i.sync_pair), i.sync_pair, i.direction,
i.started_at, i.ended_at, i.status, i.dry_run,
i.files_created, i.files_updated, i.files_deleted,
i.files_skipped, i.files_failed, i.total_bytes_transferred, i.error_message
FROM sync_iterations i
LEFT JOIN sync_jobs j ON j.pair = i.sync_pair AND j.direction = i.direction
WHERE i.id = ?`, runID).Scan(
		&d.ID, &d.JobName, &d.Pair, &d.Direction,
		&d.StartedAt, &endedAt, &d.Status, &dryRun,
		&d.FilesCreated, &d.FilesUpdated, &d.FilesDeleted,
		&d.FilesSkipped, &d.FilesFailed, &d.TotalBytesTransferred, &errMsg,
	)
	// errors.Is instead of == is robust to drivers/wrappers that wrap
	// sql.ErrNoRows rather than returning it directly.
	if errors.Is(err, sql.ErrNoRows) {
		fmt.Fprintf(os.Stderr, "error: run %d not found\n", runID)
		os.Exit(1)
	}
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: query run: %v\n", err)
		os.Exit(1)
	}
	d.DryRun = dryRun == 1
	// A NULL ended_at means the iteration is still in progress.
	if endedAt.Valid {
		d.EndedAt = &endedAt.Time
		d.Duration = formatDuration(endedAt.Time.Sub(d.StartedAt))
	} else {
		d.Duration = "running"
	}
	if errMsg.Valid {
		d.ErrorMessage = &errMsg.String
	}
	if outputFormat == "json" {
		enc := json.NewEncoder(os.Stdout)
		enc.SetIndent("", " ")
		enc.Encode(d)
		return
	}
	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintf(w, "Run ID:\t%d\n", d.ID)
	fmt.Fprintf(w, "Job:\t%s\n", d.JobName)
	fmt.Fprintf(w, "Pair:\t%s\n", d.Pair)
	fmt.Fprintf(w, "Direction:\t%s\n", d.Direction)
	fmt.Fprintf(w, "Status:\t%s\n", d.Status)
	fmt.Fprintf(w, "Dry Run:\t%s\n", boolStr(d.DryRun))
	fmt.Fprintf(w, "Started:\t%s\n", d.StartedAt.Format(time.RFC3339))
	if d.EndedAt != nil {
		fmt.Fprintf(w, "Ended:\t%s\n", d.EndedAt.Format(time.RFC3339))
	}
	fmt.Fprintf(w, "Duration:\t%s\n", d.Duration)
	fmt.Fprintf(w, "Created:\t%d\n", d.FilesCreated)
	fmt.Fprintf(w, "Updated:\t%d\n", d.FilesUpdated)
	fmt.Fprintf(w, "Deleted:\t%d\n", d.FilesDeleted)
	fmt.Fprintf(w, "Skipped:\t%d\n", d.FilesSkipped)
	fmt.Fprintf(w, "Failed:\t%d\n", d.FilesFailed)
	fmt.Fprintf(w, "Bytes Transferred:\t%s\n", formatSize(d.TotalBytesTransferred))
	if d.ErrorMessage != nil {
		fmt.Fprintf(w, "Error:\t%s\n", *d.ErrorMessage)
	}
	w.Flush()
}
// ============================================================
// ops commands
// ============================================================
// runOps dispatches the "ops" command group; "list" is currently its
// only subcommand, anything else exits with an error.
func runOps(sub string, args []string) {
	if sub == "list" {
		opsList(args)
		return
	}
	fmt.Fprintf(os.Stderr, "unknown ops command %q\n", sub)
	os.Exit(1)
}
// opsList prints the per-file operations of one run, optionally
// filtered by --action and capped at --limit. The run ID is positional
// and must come before any flags (flags are parsed from args[1:]).
func opsList(args []string) {
if len(args) == 0 {
fmt.Fprintln(os.Stderr, "error: ops list requires <run-id>")
os.Exit(1)
}
runID, err := strconv.ParseInt(args[0], 10, 64)
if err != nil {
fmt.Fprintf(os.Stderr, "error: invalid run ID %q\n", args[0])
os.Exit(1)
}
f := flag.NewFlagSet("ops list", flag.ExitOnError)
limit := f.Int("limit", 100, "max results")
action := f.String("action", "", "filter by action: copy, delete, skip, error")
f.Parse(args[1:])
database := mustDB()
defer database.Close()
query := `
SELECT operation, filepath, COALESCE(src_host,''), COALESCE(dest_host,''),
COALESCE(size_before, size_after, 0), COALESCE(owner,'')
FROM sync_operations
WHERE iteration_id = ?`
qArgs := []interface{}{runID}
// The user-facing "copy" action maps onto two stored operations
// (create/update), while "skip"/"error" filter on the status column
// rather than the operation column.
switch *action {
case "copy":
query += " AND (operation = 'create' OR operation = 'update')"
case "delete":
query += " AND operation = 'delete'"
case "skip":
query += " AND status = 'skip'"
case "error":
query += " AND status = 'fail'"
case "":
// no filter
default:
fmt.Fprintf(os.Stderr, "error: unknown action %q (use copy, delete, skip, error)\n", *action)
os.Exit(1)
}
query += " ORDER BY started_at ASC LIMIT ?"
qArgs = append(qArgs, *limit)
rows, err := database.Query(query, qArgs...)
if err != nil {
fmt.Fprintf(os.Stderr, "error: query ops: %v\n", err)
os.Exit(1)
}
defer rows.Close()
// opRow is the presentation shape shared by JSON and table output.
type opRow struct {
Action string `json:"action"`
Path string `json:"path"`
FromTo string `json:"from_to"`
Size string `json:"size"`
Owner string `json:"owner"`
}
var ops []opRow
for rows.Next() {
var op opRow
var operation, srcHost, destHost, owner string
var size int64
if err := rows.Scan(&operation, &op.Path, &srcHost, &destHost, &size, &owner); err != nil {
fmt.Fprintf(os.Stderr, "error: scan op: %v\n", err)
os.Exit(1)
}
// Collapse create/update back to the user-facing "copy".
switch operation {
case "create", "update":
op.Action = "copy"
default:
op.Action = operation
}
if srcHost != "" && destHost != "" {
op.FromTo = srcHost + "→" + destHost
} else {
op.FromTo = "-"
}
op.Size = formatSize(size)
op.Owner = owner
ops = append(ops, op)
}
if rows.Err() != nil {
fmt.Fprintf(os.Stderr, "error: rows: %v\n", rows.Err())
os.Exit(1)
}
if outputFormat == "json" {
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
enc.Encode(ops)
return
}
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
fmt.Fprintln(w, "ACTION\tPATH\tFROM→TO\tSIZE\tOWNER")
for _, op := range ops {
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n",
op.Action, op.Path, op.FromTo, op.Size, op.Owner)
}
w.Flush()
}