// Command ha-sync-ctl is a command-line client for ha-sync. It manages sync
// job definitions stored in the database, mirrors them to Kubernetes CronJobs
// through the kube package, and reports on sync runs and per-file operations.
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"os"
	"strconv"
	"strings"
	"text/tabwriter"
	"time"

	dbpkg "github.com/vandachevici/ha-sync/internal/db"
	"github.com/vandachevici/ha-sync/internal/kube"
)

// outputFormat selects how results are rendered. Only the value "json" is
// treated specially; any other value produces the tabular/plain output.
var outputFormat = "table"

// main strips the global --output flag from the command line, then dispatches
// "<group> <command> [args...]" to the matching group handler.
func main() {
	// Extract --output global flag before subcommand dispatch.
	args := os.Args[1:]
	filtered := make([]string, 0, len(args))
	for i := 0; i < len(args); i++ {
		a := args[i]
		switch {
		case strings.HasPrefix(a, "--output="):
			outputFormat = strings.TrimPrefix(a, "--output=")
		case a == "--output" && i+1 < len(args):
			outputFormat = args[i+1]
			i++ // the value was consumed as well
		default:
			filtered = append(filtered, a)
		}
	}
	args = filtered

	if len(args) < 2 {
		usage()
		os.Exit(1)
	}

	group, sub := args[0], args[1]
	rest := args[2:]

	switch group {
	case "jobs":
		runJobs(sub, rest)
	case "runs":
		runRuns(sub, rest)
	case "ops":
		runOps(sub, rest)
	default:
		fmt.Fprintf(os.Stderr, "unknown group %q\n", group)
		usage()
		os.Exit(1)
	}
}

// usage prints the command summary to stderr.
func usage() {
	fmt.Fprintln(os.Stderr, `Usage: ha-sync-ctl <group> <command> [flags]

Groups and commands:
  jobs list
  jobs create --name=X --pair=Y --direction=Z [...]
  jobs edit <name|id> [--field=val ...]
  jobs delete <name|id>
  jobs enable <name|id>
  jobs disable <name|id>
  jobs trigger <name|id>
  jobs lock-status <name|id>
  jobs apply-all
  jobs import-k8s
  runs list [--job=<name>] [--limit=N]
  runs show <run-id>
  ops list <run-id> [--limit=N] [--action=copy|delete|skip|error]

Global flags:
  --output=json   Output as JSON instead of table`)
}

// --- shared helpers ---

// fatalf writes a formatted message to stderr and exits with status 1.
// Deferred calls in the caller do not run because os.Exit is used.
func fatalf(format string, args ...interface{}) {
	fmt.Fprintf(os.Stderr, format, args...)
	os.Exit(1)
}

// printJSON writes v to stdout as indented JSON and exits with an error if
// encoding or writing fails, so a broken output is never reported as success.
func printJSON(v interface{}) {
	enc := json.NewEncoder(os.Stdout)
	enc.SetIndent("", " ")
	if err := enc.Encode(v); err != nil {
		fatalf("error: encode JSON: %v\n", err)
	}
}

// splitExcludes turns a comma-separated pattern list into a slice. An empty
// string yields nil rather than a slice holding one empty pattern.
func splitExcludes(s string) []string {
	if s == "" {
		return nil
	}
	return strings.Split(s, ",")
}

// --- env / connection helpers ---

// mustDB opens the database named by the HA_SYNC_DB_DSN environment variable.
// It exits the process if the variable is unset or the connection fails.
func mustDB() *sql.DB {
	dsn := os.Getenv("HA_SYNC_DB_DSN")
	if dsn == "" {
		fatalf("error: HA_SYNC_DB_DSN environment variable is required\n")
	}
	d, err := dbpkg.Connect(dsn)
	if err != nil {
		fatalf("error: connect to DB: %v\n", err)
	}
	return d
}

// namespace returns the Kubernetes namespace to operate in: the value of
// HA_SYNC_NAMESPACE when set, otherwise "infrastructure".
func namespace() string {
	if ns := os.Getenv("HA_SYNC_NAMESPACE"); ns != "" {
		return ns
	}
	return "infrastructure"
}

// --- job resolution ---

// resolveJob looks a job up by numeric ID when nameOrID parses as a base-10
// integer, and by name otherwise. A missing job is reported as an error, so a
// nil error always comes with a non-nil job.
func resolveJob(database *sql.DB, nameOrID string) (*dbpkg.Job, error) {
	if id, err := strconv.ParseInt(nameOrID, 10, 64); err == nil {
		j, err := dbpkg.GetJobByID(database, id)
		if err != nil {
			return nil, err
		}
		if j == nil {
			return nil, fmt.Errorf("job with ID %d not found", id)
		}
		return j, nil
	}

	j, err := dbpkg.GetJobByName(database, nameOrID)
	if err != nil {
		return nil, err
	}
	if j == nil {
		return nil, fmt.Errorf("job %q not found", nameOrID)
	}
	return j, nil
}

// --- formatting helpers ---

// formatSize renders a byte count using 1024-based steps with one decimal
// place ("1.5 KB"). Zero and negative counts are shown as "0 B".
func formatSize(bytes int64) string {
	if bytes <= 0 {
		return "0 B"
	}
	const unit = 1024
	if bytes < unit {
		return fmt.Sprintf("%d B", bytes)
	}
	// "EB" is required: an int64 can reach 2^60 and above, which takes five
	// divisions and would otherwise index past the end of the table.
	suffixes := []string{"KB", "MB", "GB", "TB", "PB", "EB"}
	div, exp := int64(unit), 0
	for n := bytes / unit; n >= unit; n /= unit {
		div *= unit
		exp++
	}
	return fmt.Sprintf("%.1f %s", float64(bytes)/float64(div), suffixes[exp])
}

// formatDuration renders d using its two most significant units: "XhYm",
// "XmYs" or "Xs". Negative durations are treated as zero.
func formatDuration(d time.Duration) string {
	if d < 0 {
		d = 0
	}
	h := int(d.Hours())
	m := int(d.Minutes()) % 60
	s := int(d.Seconds()) % 60
	switch {
	case h > 0:
		return fmt.Sprintf("%dh%dm", h, m)
	case m > 0:
		return fmt.Sprintf("%dm%ds", m, s)
	default:
		return fmt.Sprintf("%ds", s)
	}
}

// timeAgo describes how long ago t was, in whole minutes, hours or days.
// Anything under a minute, including timestamps in the future, is "just now".
func timeAgo(t time.Time) string {
	d := time.Since(t)
	switch {
	case d < time.Minute: // also covers negative durations
		return "just now"
	case d < time.Hour:
		return fmt.Sprintf("%dm ago", int(d.Minutes()))
	case d < 24*time.Hour:
		return fmt.Sprintf("%dh ago", int(d.Hours()))
	default:
		return fmt.Sprintf("%dd ago", int(d.Hours())/24)
	}
}

// formatDirection renders a direction as "src→dst". Empty hosts default to
// "dell" and "hp". "fwd"/"dell-to-hp" print src→dst, "rev"/"hp-to-dell" print
// dst→src (case-insensitive); any other direction is returned unchanged.
func formatDirection(direction, srcHost, destHost string) string {
	src, dst := srcHost, destHost
	if src == "" {
		src = "dell"
	}
	if dst == "" {
		dst = "hp"
	}
	switch strings.ToLower(direction) {
	case "fwd", "dell-to-hp":
		return src + "→" + dst
	case "rev", "hp-to-dell":
		return dst + "→" + src
	default:
		return direction
	}
}

// boolStr renders a boolean as "yes" or "no".
func boolStr(b bool) string {
	if b {
		return "yes"
	}
	return "no"
}

// --- job/spec conversion helpers ---

// jobToSpec copies the fields of a stored job into a kube.JobSpec.
func jobToSpec(j *dbpkg.Job) kube.JobSpec {
	return kube.JobSpec{
		ID:             j.ID,
		Name:           j.Name,
		Pair:           j.Pair,
		Direction:      j.Direction,
		Src:            j.Src,
		Dest:           j.Dest,
		SrcHost:        j.SrcHost,
		DestHost:       j.DestHost,
		CronSchedule:   j.CronSchedule,
		LockTTLSeconds: j.LockTTLSeconds,
		DryRun:         j.DryRun,
		DeleteMissing:  j.DeleteMissing,
		Workers:        j.Workers,
		MtimeThreshold: j.MtimeThreshold,
		Excludes:       j.Excludes,
		Enabled:        j.Enabled,
	}
}

// specToJobInput copies the fields of a kube.JobSpec into a dbpkg.JobInput.
// The spec's ID is not carried over.
func specToJobInput(s *kube.JobSpec) dbpkg.JobInput {
	return dbpkg.JobInput{
		Name:           s.Name,
		Pair:           s.Pair,
		Direction:      s.Direction,
		Src:            s.Src,
		Dest:           s.Dest,
		SrcHost:        s.SrcHost,
		DestHost:       s.DestHost,
		CronSchedule:   s.CronSchedule,
		LockTTLSeconds: s.LockTTLSeconds,
		DryRun:         s.DryRun,
		DeleteMissing:  s.DeleteMissing,
		Workers:        s.Workers,
		MtimeThreshold: s.MtimeThreshold,
		Excludes:       s.Excludes,
		Enabled:        s.Enabled,
	}
}

// ============================================================
// jobs commands
// ============================================================

// runJobs dispatches a "jobs" subcommand and exits on an unknown one.
func runJobs(sub string, args []string) {
	switch sub {
	case "list":
		jobsList(args)
	case "create":
		jobsCreate(args)
	case "edit":
		jobsEdit(args)
	case "delete":
		jobsDelete(args)
	case "enable":
		jobsSetEnabled(args, true)
	case "disable":
		jobsSetEnabled(args, false)
	case "trigger":
		jobsTrigger(args)
	case "lock-status":
		jobsLockStatus(args)
	case "apply-all":
		jobsApplyAll(args)
	case "import-k8s":
		jobsImportK8s(args)
	default:
		fatalf("unknown jobs command %q\n", sub)
	}
}

// jobsList prints every job together with its last run time and status.
func jobsList(_ []string) {
	database := mustDB()
	defer database.Close()

	jobs, err := dbpkg.ListJobsWithStats(database)
	if err != nil {
		fatalf("error: %v\n", err)
	}

	if outputFormat == "json" {
		printJSON(jobs)
		return
	}

	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintln(w, "NAME\tPAIR\tDIRECTION\tENABLED\tSCHEDULE\tLAST RUN\tSTATUS")
	for _, j := range jobs {
		lastRun := "never"
		status := "-"
		if !j.Enabled {
			status = "disabled"
		}
		if j.LastIteration != nil {
			lastRun = timeAgo(j.LastIteration.StartedAt)
			// A disabled job keeps showing "disabled" rather than the
			// status of its last run.
			if j.Enabled {
				status = j.LastIteration.Status
			}
		}
		dir := formatDirection(j.Direction, j.SrcHost, j.DestHost)
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
			j.Name, j.Pair, dir, boolStr(j.Enabled), j.CronSchedule, lastRun, status)
	}
	w.Flush()
}

// jobsCreate creates a new, enabled job from command-line flags. --name and
// --pair are required.
func jobsCreate(args []string) {
	f := flag.NewFlagSet("jobs create", flag.ExitOnError)
	name := f.String("name", "", "job name (required)")
	pair := f.String("pair", "", "sync pair (required): media, photos, owncloud, games, infra, ai")
	direction := f.String("direction", "dell-to-hp", "dell-to-hp or hp-to-dell")
	src := f.String("src", "", "source path")
	dest := f.String("dest", "", "destination path")
	srcHost := f.String("src-host", "dell", "source host identifier")
	destHost := f.String("dest-host", "hp", "destination host identifier")
	schedule := f.String("schedule", "0 2 * * *", "cron schedule")
	lockTTL := f.Int("lock-ttl", 3600, "lock TTL in seconds")
	workers := f.Int("workers", 4, "parallel workers")
	dryRun := f.Bool("dry-run", false, "enable dry run mode")
	deleteMissing := f.Bool("delete-missing", false, "delete files at dest not in src")
	excludes := f.String("excludes", "", "comma-separated exclude patterns")
	f.Parse(args) // ExitOnError: Parse exits the process on bad flags

	if *name == "" {
		fatalf("error: --name is required\n")
	}
	if *pair == "" {
		fatalf("error: --pair is required\n")
	}

	in := dbpkg.JobInput{
		Name:           *name,
		Pair:           *pair,
		Direction:      *direction,
		Src:            *src,
		Dest:           *dest,
		SrcHost:        *srcHost,
		DestHost:       *destHost,
		CronSchedule:   *schedule,
		LockTTLSeconds: *lockTTL,
		Workers:        *workers,
		DryRun:         *dryRun,
		DeleteMissing:  *deleteMissing,
		Excludes:       splitExcludes(*excludes),
		Enabled:        true,
	}

	database := mustDB()
	defer database.Close()

	job, err := dbpkg.CreateJob(database, in)
	if err != nil {
		fatalf("error: create job: %v\n", err)
	}

	if outputFormat == "json" {
		printJSON(job)
		return
	}
	fmt.Printf("created job %q (id=%d)\n", job.Name, job.ID)
}

// jobsEdit updates an existing job. The first argument is the job name or ID;
// only the flags that were explicitly passed change the stored values.
func jobsEdit(args []string) {
	if len(args) == 0 {
		fatalf("error: jobs edit requires <name|id>\n")
	}
	nameOrID := args[0]
	rest := args[1:]

	f := flag.NewFlagSet("jobs edit", flag.ExitOnError)
	pair := f.String("pair", "", "sync pair")
	direction := f.String("direction", "", "dell-to-hp or hp-to-dell")
	src := f.String("src", "", "source path")
	dest := f.String("dest", "", "destination path")
	srcHost := f.String("src-host", "", "source host identifier")
	destHost := f.String("dest-host", "", "destination host identifier")
	schedule := f.String("schedule", "", "cron schedule")
	lockTTL := f.Int("lock-ttl", 0, "lock TTL in seconds")
	workers := f.Int("workers", 0, "parallel workers")
	dryRun := f.Bool("dry-run", false, "enable dry run mode")
	deleteMissing := f.Bool("delete-missing", false, "delete files at dest not in src")
	excludes := f.String("excludes", "", "comma-separated exclude patterns")
	f.Parse(rest) // ExitOnError: Parse exits the process on bad flags

	database := mustDB()
	defer database.Close()

	job, err := resolveJob(database, nameOrID)
	if err != nil {
		fatalf("error: %v\n", err)
	}

	// Seed with existing values; only override what was explicitly set.
	in := dbpkg.JobInput{
		Name:           job.Name,
		Pair:           job.Pair,
		Direction:      job.Direction,
		Src:            job.Src,
		Dest:           job.Dest,
		SrcHost:        job.SrcHost,
		DestHost:       job.DestHost,
		CronSchedule:   job.CronSchedule,
		LockTTLSeconds: job.LockTTLSeconds,
		Workers:        job.Workers,
		DryRun:         job.DryRun,
		DeleteMissing:  job.DeleteMissing,
		Excludes:       job.Excludes,
		MtimeThreshold: job.MtimeThreshold,
		Enabled:        job.Enabled,
	}

	// Visit only walks flags that appeared on the command line, so flag
	// defaults never overwrite stored values.
	f.Visit(func(fl *flag.Flag) {
		switch fl.Name {
		case "pair":
			in.Pair = *pair
		case "direction":
			in.Direction = *direction
		case "src":
			in.Src = *src
		case "dest":
			in.Dest = *dest
		case "src-host":
			in.SrcHost = *srcHost
		case "dest-host":
			in.DestHost = *destHost
		case "schedule":
			in.CronSchedule = *schedule
		case "lock-ttl":
			in.LockTTLSeconds = *lockTTL
		case "workers":
			in.Workers = *workers
		case "dry-run":
			in.DryRun = *dryRun
		case "delete-missing":
			in.DeleteMissing = *deleteMissing
		case "excludes":
			// An explicit empty --excludes clears the list.
			in.Excludes = splitExcludes(*excludes)
		}
	})

	updated, err := dbpkg.UpdateJob(database, job.ID, in)
	if err != nil {
		fatalf("error: update job: %v\n", err)
	}

	if outputFormat == "json" {
		printJSON(updated)
		return
	}
	fmt.Printf("updated job %q (id=%d)\n", updated.Name, updated.ID)
}

// jobsDelete removes a job from the database. Removing the matching
// Kubernetes CronJob is best effort: failures there only produce a warning.
func jobsDelete(args []string) {
	if len(args) == 0 {
		fatalf("error: jobs delete requires <name|id>\n")
	}
	nameOrID := args[0]

	database := mustDB()
	defer database.Close()

	job, err := resolveJob(database, nameOrID)
	if err != nil {
		fatalf("error: %v\n", err)
	}

	client, kubeErr := kube.NewClient()
	if kubeErr != nil {
		fmt.Fprintf(os.Stderr, "warning: K8s unavailable, skipping CronJob deletion: %v\n", kubeErr)
	} else if err := kube.DeleteCronJob(context.Background(), client, namespace(), job.Name); err != nil {
		fmt.Fprintf(os.Stderr, "warning: delete K8s CronJob: %v\n", err)
	}

	if err := dbpkg.DeleteJob(database, job.ID); err != nil {
		fatalf("error: delete job: %v\n", err)
	}
	fmt.Printf("deleted job %q (id=%d)\n", job.Name, job.ID)
}

// jobsSetEnabled enables or disables a job in the database and then toggles
// the suspend state of its CronJob. The Kubernetes step is best effort.
func jobsSetEnabled(args []string, enable bool) {
	verb, pastTense := "enable", "enabled"
	if !enable {
		verb, pastTense = "disable", "disabled"
	}
	if len(args) == 0 {
		fatalf("error: jobs %s requires <name|id>\n", verb)
	}
	nameOrID := args[0]

	database := mustDB()
	defer database.Close()

	job, err := resolveJob(database, nameOrID)
	if err != nil {
		fatalf("error: %v\n", err)
	}

	if err := dbpkg.SetJobEnabled(database, job.ID, enable); err != nil {
		fatalf("error: %v\n", err)
	}

	client, kubeErr := kube.NewClient()
	if kubeErr != nil {
		fmt.Fprintf(os.Stderr, "warning: K8s unavailable, skipping CronJob suspend toggle: %v\n", kubeErr)
	} else if err := kube.SuspendCronJob(context.Background(), client, namespace(), job.Name, !enable); err != nil {
		// Suspended is the inverse of enabled, hence !enable above.
		fmt.Fprintf(os.Stderr, "warning: suspend CronJob: %v\n", err)
	}

	fmt.Printf("%s job %q (id=%d)\n", pastTense, job.Name, job.ID)
}

// jobsTrigger starts a job immediately through Kubernetes and prints the name
// returned by kube.TriggerJob. Kubernetes access is mandatory here.
func jobsTrigger(args []string) {
	if len(args) == 0 {
		fatalf("error: jobs trigger requires <name|id>\n")
	}
	nameOrID := args[0]

	database := mustDB()
	defer database.Close()

	job, err := resolveJob(database, nameOrID)
	if err != nil {
		fatalf("error: %v\n", err)
	}

	client, kubeErr := kube.NewClient()
	if kubeErr != nil {
		fatalf("error: K8s unavailable: %v\n", kubeErr)
	}

	created, err := kube.TriggerJob(context.Background(), client, namespace(), job.Name)
	if err != nil {
		fatalf("error: trigger job: %v\n", err)
	}
	fmt.Printf("triggered job %q → K8s job %q\n", job.Name, created)
}

// jobsLockStatus reports whether a job's lock is currently held and, if so,
// by whom and until when.
func jobsLockStatus(args []string) {
	if len(args) == 0 {
		fatalf("error: jobs lock-status requires <name|id>\n")
	}
	nameOrID := args[0]

	database := mustDB()
	defer database.Close()

	job, err := resolveJob(database, nameOrID)
	if err != nil {
		fatalf("error: %v\n", err)
	}

	client, kubeErr := kube.NewClient()
	if kubeErr != nil {
		fatalf("error: K8s unavailable: %v\n", kubeErr)
	}

	ls, err := kube.GetLockStatus(context.Background(), client, namespace(), job.Name)
	if err != nil {
		fatalf("error: get lock status: %v\n", err)
	}

	if outputFormat == "json" {
		printJSON(ls)
		return
	}

	if ls.Locked {
		fmt.Printf("job %q is LOCKED\n holder: %s\n expires: %s\n",
			job.Name, ls.Holder, ls.ExpiresAt.Format(time.RFC3339))
	} else {
		fmt.Printf("job %q is not locked\n", job.Name)
	}
}

// jobsApplyAll applies a CronJob for every enabled job in the database.
// A failure on one job is reported as a warning and does not stop the rest.
func jobsApplyAll(_ []string) {
	database := mustDB()
	defer database.Close()

	jobs, err := dbpkg.ListJobs(database)
	if err != nil {
		fatalf("error: list jobs: %v\n", err)
	}

	client, kubeErr := kube.NewClient()
	if kubeErr != nil {
		fatalf("error: K8s unavailable: %v\n", kubeErr)
	}

	ctx := context.Background()
	ns := namespace()
	applied := 0
	for _, j := range jobs {
		if !j.Enabled {
			continue
		}
		if err := kube.ApplyCronJob(ctx, client, ns, jobToSpec(j)); err != nil {
			fmt.Fprintf(os.Stderr, "warning: apply %q: %v\n", j.Name, err)
			continue
		}
		applied++
	}
	fmt.Printf("applied %d CronJob(s)\n", applied)
}

// jobsImportK8s imports the CronJobs found in the namespace into the database
// and reports how many were new and how many already existed.
func jobsImportK8s(_ []string) {
	client, kubeErr := kube.NewClient()
	if kubeErr != nil {
		fatalf("error: K8s unavailable: %v\n", kubeErr)
	}

	cjs, err := kube.ListCronJobs(context.Background(), client, namespace())
	if err != nil {
		fatalf("error: list CronJobs: %v\n", err)
	}

	database := mustDB()
	defer database.Close()

	imported, existed := 0, 0
	for _, cj := range cjs {
		spec := kube.ImportFromCronJob(cj)
		if spec == nil {
			// No spec could be derived from this CronJob; skip it.
			continue
		}

		// Check existence before importing to distinguish new vs existing.
		// A failed lookup must not be counted as "new", so skip the entry.
		existing, err := dbpkg.GetJobByName(database, spec.Name)
		if err != nil {
			fmt.Fprintf(os.Stderr, "warning: look up %q: %v\n", spec.Name, err)
			continue
		}

		if _, err := dbpkg.ImportJob(database, specToJobInput(spec)); err != nil {
			fmt.Fprintf(os.Stderr, "warning: import %q: %v\n", spec.Name, err)
			continue
		}

		if existing != nil {
			existed++
		} else {
			imported++
		}
	}
	fmt.Printf("imported %d, already existed %d\n", imported, existed)
}

// ============================================================
// runs commands
// ============================================================

// runRuns dispatches a "runs" subcommand and exits on an unknown one.
func runRuns(sub string, args []string) {
	switch sub {
	case "list":
		runsList(args)
	case "show":
		runsShow(args)
	default:
		fatalf("unknown runs command %q\n", sub)
	}
}

// runRow is one line of "runs list" output.
type runRow struct {
	ID        int64      `json:"id"`
	JobName   string     `json:"job"`
	StartedAt time.Time  `json:"started_at"`
	EndedAt   *time.Time `json:"ended_at,omitempty"`
	Duration  string     `json:"duration"`
	Copied    int        `json:"copied"` // files created plus files updated
	Deleted   int        `json:"deleted"`
	Skipped   int        `json:"skipped"`
	Errors    int        `json:"errors"`
	Status    string     `json:"status"`
}

// runsList prints the most recent sync iterations, newest first, optionally
// restricted to the pair and direction of the job named by --job.
func runsList(args []string) {
	f := flag.NewFlagSet("runs list", flag.ExitOnError)
	jobName := f.String("job", "", "filter by job name")
	limit := f.Int("limit", 20, "max results")
	f.Parse(args) // ExitOnError: Parse exits the process on bad flags

	database := mustDB()
	defer database.Close()

	const baseQuery = `
		SELECT i.id, COALESCE(j.name, i.sync_pair), i.started_at, i.ended_at,
		       i.files_created + i.files_updated, i.files_deleted,
		       i.files_skipped, i.files_failed, i.status
		FROM sync_iterations i
		LEFT JOIN sync_jobs j ON j.pair = i.sync_pair AND j.direction = i.direction`

	query := baseQuery
	var qArgs []interface{}
	if *jobName != "" {
		job, err := dbpkg.GetJobByName(database, *jobName)
		if err != nil {
			fatalf("error: %v\n", err)
		}
		if job == nil {
			fatalf("error: job %q not found\n", *jobName)
		}
		// Iterations are matched to a job through its pair and direction.
		query += ` WHERE i.sync_pair = ? AND i.direction = ?`
		qArgs = append(qArgs, job.Pair, job.Direction)
	}
	query += ` ORDER BY i.started_at DESC LIMIT ?`
	qArgs = append(qArgs, *limit)

	rows, err := database.Query(query, qArgs...)
	if err != nil {
		fatalf("error: query runs: %v\n", err)
	}
	defer rows.Close()

	// Non-nil so that an empty result is encoded as [] rather than null.
	runs := []runRow{}
	for rows.Next() {
		var r runRow
		var endedAt sql.NullTime
		if err := rows.Scan(&r.ID, &r.JobName, &r.StartedAt, &endedAt,
			&r.Copied, &r.Deleted, &r.Skipped, &r.Errors, &r.Status); err != nil {
			fatalf("error: scan run: %v\n", err)
		}
		if endedAt.Valid {
			r.EndedAt = &endedAt.Time
			r.Duration = formatDuration(endedAt.Time.Sub(r.StartedAt))
		} else {
			// No end timestamp recorded for this iteration.
			r.Duration = "running"
		}
		runs = append(runs, r)
	}
	if err := rows.Err(); err != nil {
		fatalf("error: rows: %v\n", err)
	}

	if outputFormat == "json" {
		printJSON(runs)
		return
	}

	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintln(w, "RUN ID\tJOB\tSTARTED\tDURATION\tCOPIED\tDELETED\tSKIPPED\tERRORS")
	for _, r := range runs {
		fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%d\t%d\t%d\t%d\n",
			r.ID, r.JobName, r.StartedAt.Format("2006-01-02 15:04"), r.Duration,
			r.Copied, r.Deleted, r.Skipped, r.Errors)
	}
	w.Flush()
}

// runsShow prints the details of a single sync iteration, identified by the
// numeric run ID given as the first argument.
func runsShow(args []string) {
	if len(args) == 0 {
		fatalf("error: runs show requires <run-id>\n")
	}
	runID, err := strconv.ParseInt(args[0], 10, 64)
	if err != nil {
		fatalf("error: invalid run ID %q\n", args[0])
	}

	database := mustDB()
	defer database.Close()

	type runDetail struct {
		ID                    int64      `json:"id"`
		JobName               string     `json:"job"`
		Pair                  string     `json:"pair"`
		Direction             string     `json:"direction"`
		StartedAt             time.Time  `json:"started_at"`
		EndedAt               *time.Time `json:"ended_at,omitempty"`
		Duration              string     `json:"duration"`
		Status                string     `json:"status"`
		DryRun                bool       `json:"dry_run"`
		FilesCreated          int        `json:"files_created"`
		FilesUpdated          int        `json:"files_updated"`
		FilesDeleted          int        `json:"files_deleted"`
		FilesSkipped          int        `json:"files_skipped"`
		FilesFailed           int        `json:"files_failed"`
		TotalBytesTransferred int64      `json:"total_bytes_transferred"`
		ErrorMessage          *string    `json:"error_message,omitempty"`
	}

	var (
		d       runDetail
		endedAt sql.NullTime
		dryRun  int
		errMsg  sql.NullString
	)
	err = database.QueryRow(`
		SELECT i.id, COALESCE(j.name, i.sync_pair), i.sync_pair, i.direction,
		       i.started_at, i.ended_at, i.status, i.dry_run,
		       i.files_created, i.files_updated, i.files_deleted,
		       i.files_skipped, i.files_failed, i.total_bytes_transferred,
		       i.error_message
		FROM sync_iterations i
		LEFT JOIN sync_jobs j ON j.pair = i.sync_pair AND j.direction = i.direction
		WHERE i.id = ?`, runID).Scan(
		&d.ID, &d.JobName, &d.Pair, &d.Direction,
		&d.StartedAt, &endedAt, &d.Status, &dryRun,
		&d.FilesCreated, &d.FilesUpdated, &d.FilesDeleted,
		&d.FilesSkipped, &d.FilesFailed, &d.TotalBytesTransferred,
		&errMsg,
	)
	if errors.Is(err, sql.ErrNoRows) {
		fatalf("error: run %d not found\n", runID)
	}
	if err != nil {
		fatalf("error: query run: %v\n", err)
	}

	d.DryRun = dryRun == 1
	if endedAt.Valid {
		d.EndedAt = &endedAt.Time
		d.Duration = formatDuration(endedAt.Time.Sub(d.StartedAt))
	} else {
		// No end timestamp recorded for this iteration.
		d.Duration = "running"
	}
	if errMsg.Valid {
		d.ErrorMessage = &errMsg.String
	}

	if outputFormat == "json" {
		printJSON(d)
		return
	}

	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintf(w, "Run ID:\t%d\n", d.ID)
	fmt.Fprintf(w, "Job:\t%s\n", d.JobName)
	fmt.Fprintf(w, "Pair:\t%s\n", d.Pair)
	fmt.Fprintf(w, "Direction:\t%s\n", d.Direction)
	fmt.Fprintf(w, "Status:\t%s\n", d.Status)
	fmt.Fprintf(w, "Dry Run:\t%s\n", boolStr(d.DryRun))
	fmt.Fprintf(w, "Started:\t%s\n", d.StartedAt.Format(time.RFC3339))
	if d.EndedAt != nil {
		fmt.Fprintf(w, "Ended:\t%s\n", d.EndedAt.Format(time.RFC3339))
	}
	fmt.Fprintf(w, "Duration:\t%s\n", d.Duration)
	fmt.Fprintf(w, "Created:\t%d\n", d.FilesCreated)
	fmt.Fprintf(w, "Updated:\t%d\n", d.FilesUpdated)
	fmt.Fprintf(w, "Deleted:\t%d\n", d.FilesDeleted)
	fmt.Fprintf(w, "Skipped:\t%d\n", d.FilesSkipped)
	fmt.Fprintf(w, "Failed:\t%d\n", d.FilesFailed)
	fmt.Fprintf(w, "Bytes Transferred:\t%s\n", formatSize(d.TotalBytesTransferred))
	if d.ErrorMessage != nil {
		fmt.Fprintf(w, "Error:\t%s\n", *d.ErrorMessage)
	}
	w.Flush()
}

// ============================================================
// ops commands
// ============================================================

// runOps dispatches an "ops" subcommand and exits on an unknown one.
func runOps(sub string, args []string) {
	switch sub {
	case "list":
		opsList(args)
	default:
		fatalf("unknown ops command %q\n", sub)
	}
}

// opsList prints the file operations recorded for one run, oldest first.
// The first argument is the run ID; --action narrows the list and --limit
// caps the number of rows.
func opsList(args []string) {
	if len(args) == 0 {
		fatalf("error: ops list requires <run-id>\n")
	}
	runID, err := strconv.ParseInt(args[0], 10, 64)
	if err != nil {
		fatalf("error: invalid run ID %q\n", args[0])
	}

	f := flag.NewFlagSet("ops list", flag.ExitOnError)
	limit := f.Int("limit", 100, "max results")
	action := f.String("action", "", "filter by action: copy, delete, skip, error")
	f.Parse(args[1:]) // ExitOnError: Parse exits the process on bad flags

	database := mustDB()
	defer database.Close()

	query := `
		SELECT operation, filepath, COALESCE(src_host,''), COALESCE(dest_host,''),
		       COALESCE(size_before, size_after, 0), COALESCE(owner,'')
		FROM sync_operations
		WHERE iteration_id = ?`
	qArgs := []interface{}{runID}

	// Only fixed SQL fragments are appended here; user input always travels
	// through placeholders.
	switch *action {
	case "copy":
		query += " AND (operation = 'create' OR operation = 'update')"
	case "delete":
		query += " AND operation = 'delete'"
	case "skip":
		query += " AND status = 'skip'"
	case "error":
		query += " AND status = 'fail'"
	case "":
		// no filter
	default:
		fatalf("error: unknown action %q (use copy, delete, skip, error)\n", *action)
	}
	query += " ORDER BY started_at ASC LIMIT ?"
	qArgs = append(qArgs, *limit)

	rows, err := database.Query(query, qArgs...)
	if err != nil {
		fatalf("error: query ops: %v\n", err)
	}
	defer rows.Close()

	type opRow struct {
		Action string `json:"action"`
		Path   string `json:"path"`
		FromTo string `json:"from_to"`
		Size   string `json:"size"`
		Owner  string `json:"owner"`
	}

	// Non-nil so that an empty result is encoded as [] rather than null.
	ops := []opRow{}
	for rows.Next() {
		var op opRow
		var operation, srcHost, destHost string
		var size int64
		if err := rows.Scan(&operation, &op.Path, &srcHost, &destHost, &size, &op.Owner); err != nil {
			fatalf("error: scan op: %v\n", err)
		}
		// Creates and updates are both presented as "copy".
		switch operation {
		case "create", "update":
			op.Action = "copy"
		default:
			op.Action = operation
		}
		if srcHost != "" && destHost != "" {
			op.FromTo = srcHost + "→" + destHost
		} else {
			op.FromTo = "-"
		}
		op.Size = formatSize(size)
		ops = append(ops, op)
	}
	if err := rows.Err(); err != nil {
		fatalf("error: rows: %v\n", err)
	}

	if outputFormat == "json" {
		printJSON(ops)
		return
	}

	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintln(w, "ACTION\tPATH\tFROM→TO\tSIZE\tOWNER")
	for _, op := range ops {
		fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", op.Action, op.Path, op.FromTo, op.Size, op.Owner)
	}
	w.Flush()
}