homelab/services/ha-sync/internal/ui/handler.go
Dan V deb6c38d7b chore: commit homelab setup — deployment, services, orchestration, skill
- Add .gitignore: exclude compiled binaries, build artifacts, and Helm
  values files containing real secrets (authentik, prometheus)
- Add all Kubernetes deployment manifests (deployment/)
- Add services source code: ha-sync, device-inventory, games-console,
  paperclip, parts-inventory
- Add Ansible orchestration: playbooks, roles, inventory, cloud-init
- Add hardware specs, execution plans, scripts, HOMELAB.md
- Add skills/homelab/SKILL.md + skills/install.sh to preserve Copilot skill
- Remove previously-tracked inventory-cli binary from git index

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-09 08:10:32 +02:00

697 lines
19 KiB
Go

package ui
import (
"database/sql"
"embed"
"encoding/json"
"html/template"
"log"
"net/http"
"os"
"strconv"
"time"
_ "github.com/go-sql-driver/mysql"
appdb "github.com/vandachevici/ha-sync/internal/db"
"github.com/vandachevici/ha-sync/internal/kube"
"k8s.io/client-go/kubernetes"
)
//go:embed templates/index.html templates/about.html
var templateFS embed.FS
var indexTmpl = template.Must(template.ParseFS(templateFS, "templates/index.html"))
var aboutTmpl = template.Must(template.ParseFS(templateFS, "templates/about.html"))
// Server holds the database connection and serves the dashboard.
type Server struct {
db *sql.DB
kubeClient kubernetes.Interface
namespace string
}
func NewServer(sqlDB *sql.DB) *Server {
ns := os.Getenv("HA_SYNC_NAMESPACE")
if ns == "" {
ns = "ha-sync"
}
s := &Server{db: sqlDB, namespace: ns}
client, err := kube.NewClient()
if err != nil {
log.Printf("ha-sync-ui: kube unavailable, kube features disabled: %v", err)
} else {
s.kubeClient = client
}
return s
}
// Handler wires all routes and returns the mux.
func (s *Server) Handler() http.Handler {
mux := http.NewServeMux()
// Existing routes (preserved).
mux.HandleFunc("GET /", s.handleIndex)
mux.HandleFunc("GET /about", s.handleAbout)
mux.HandleFunc("GET /health", s.handleHealth)
mux.HandleFunc("GET /api/pairs", s.handlePairs)
mux.HandleFunc("GET /api/iterations", s.handleIterations)
mux.HandleFunc("GET /api/operations", s.handleOperations)
// Job management API.
mux.HandleFunc("GET /api/jobs", s.handleListJobs)
mux.HandleFunc("POST /api/jobs", s.handleCreateJob)
mux.HandleFunc("GET /api/jobs/{id}", s.handleGetJob)
mux.HandleFunc("PUT /api/jobs/{id}", s.handleUpdateJob)
mux.HandleFunc("DELETE /api/jobs/{id}", s.handleDeleteJob)
mux.HandleFunc("POST /api/jobs/{id}/enable", s.handleEnableJob)
mux.HandleFunc("POST /api/jobs/{id}/disable", s.handleDisableJob)
mux.HandleFunc("POST /api/jobs/{id}/trigger", s.handleTriggerJob)
mux.HandleFunc("GET /api/jobs/{id}/lock", s.handleGetLockStatus)
// Run operations.
mux.HandleFunc("GET /api/runs/{runID}/ops", s.handleListOps)
return mux
}
// --- helpers ---
// jobDetailResp combines a job with its recent iterations for the GET /api/jobs/{id} response.
type jobDetailResp struct {
*appdb.Job
Iterations []*appdb.Iteration `json:"iterations"`
}
func parseJobID(r *http.Request) (int64, bool) {
id, err := strconv.ParseInt(r.PathValue("id"), 10, 64)
return id, err == nil
}
func jobToSpec(j *appdb.Job) kube.JobSpec {
return kube.JobSpec{
ID: j.ID,
Name: j.Name,
Pair: j.Pair,
Direction: j.Direction,
Src: j.Src,
Dest: j.Dest,
SrcHost: j.SrcHost,
DestHost: j.DestHost,
CronSchedule: j.CronSchedule,
LockTTLSeconds: j.LockTTLSeconds,
DryRun: j.DryRun,
DeleteMissing: j.DeleteMissing,
Workers: j.Workers,
MtimeThreshold: j.MtimeThreshold,
Excludes: j.Excludes,
Enabled: j.Enabled,
}
}
// --- new job API handlers ---
func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) {
jobs, err := appdb.ListJobsWithStats(s.db)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if jobs == nil {
jobs = []*appdb.JobWithStats{}
}
writeJSON(w, jobs)
}
func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) {
var input appdb.JobInput
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
writeError(w, http.StatusBadRequest, "invalid JSON: "+err.Error())
return
}
job, err := appdb.CreateJob(s.db, input)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if s.kubeClient != nil {
if err := kube.ApplyCronJob(r.Context(), s.kubeClient, s.namespace, jobToSpec(job)); err != nil {
log.Printf("ha-sync-ui: apply cronjob for %s: %v", job.Name, err)
}
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(job)
}
func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request) {
id, ok := parseJobID(r)
if !ok {
writeError(w, http.StatusBadRequest, "invalid id")
return
}
job, err := appdb.GetJobByID(s.db, id)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if job == nil {
writeError(w, http.StatusNotFound, "job not found")
return
}
iters, err := appdb.ListIterations(s.db, job.Pair, job.Direction, 50)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if iters == nil {
iters = []*appdb.Iteration{}
}
writeJSON(w, jobDetailResp{Job: job, Iterations: iters})
}
func (s *Server) handleUpdateJob(w http.ResponseWriter, r *http.Request) {
id, ok := parseJobID(r)
if !ok {
writeError(w, http.StatusBadRequest, "invalid id")
return
}
var input appdb.JobInput
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
writeError(w, http.StatusBadRequest, "invalid JSON: "+err.Error())
return
}
job, err := appdb.UpdateJob(s.db, id, input)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if job == nil {
writeError(w, http.StatusNotFound, "job not found")
return
}
if s.kubeClient != nil {
if err := kube.ApplyCronJob(r.Context(), s.kubeClient, s.namespace, jobToSpec(job)); err != nil {
log.Printf("ha-sync-ui: re-apply cronjob for %s: %v", job.Name, err)
}
}
writeJSON(w, job)
}
func (s *Server) handleDeleteJob(w http.ResponseWriter, r *http.Request) {
id, ok := parseJobID(r)
if !ok {
writeError(w, http.StatusBadRequest, "invalid id")
return
}
job, err := appdb.GetJobByID(s.db, id)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if job == nil {
writeError(w, http.StatusNotFound, "job not found")
return
}
if s.kubeClient != nil {
if err := kube.DeleteCronJob(r.Context(), s.kubeClient, s.namespace, job.Name); err != nil {
log.Printf("ha-sync-ui: delete cronjob for %s: %v", job.Name, err)
}
}
if err := appdb.DeleteJob(s.db, id); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
w.WriteHeader(http.StatusNoContent)
}
func (s *Server) handleEnableJob(w http.ResponseWriter, r *http.Request) {
id, ok := parseJobID(r)
if !ok {
writeError(w, http.StatusBadRequest, "invalid id")
return
}
if err := appdb.SetJobEnabled(s.db, id, true); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
job, _ := appdb.GetJobByID(s.db, id)
if s.kubeClient != nil && job != nil {
if err := kube.SuspendCronJob(r.Context(), s.kubeClient, s.namespace, job.Name, false); err != nil {
log.Printf("ha-sync-ui: unsuspend cronjob for %s: %v", job.Name, err)
}
}
writeJSON(w, map[string]bool{"ok": true})
}
func (s *Server) handleDisableJob(w http.ResponseWriter, r *http.Request) {
id, ok := parseJobID(r)
if !ok {
writeError(w, http.StatusBadRequest, "invalid id")
return
}
if err := appdb.SetJobEnabled(s.db, id, false); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
job, _ := appdb.GetJobByID(s.db, id)
if s.kubeClient != nil && job != nil {
if err := kube.SuspendCronJob(r.Context(), s.kubeClient, s.namespace, job.Name, true); err != nil {
log.Printf("ha-sync-ui: suspend cronjob for %s: %v", job.Name, err)
}
}
writeJSON(w, map[string]bool{"ok": true})
}
func (s *Server) handleTriggerJob(w http.ResponseWriter, r *http.Request) {
id, ok := parseJobID(r)
if !ok {
writeError(w, http.StatusBadRequest, "invalid id")
return
}
if s.kubeClient == nil {
writeError(w, http.StatusServiceUnavailable, "kube client unavailable")
return
}
job, err := appdb.GetJobByID(s.db, id)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if job == nil {
writeError(w, http.StatusNotFound, "job not found")
return
}
jobName, err := kube.TriggerJob(r.Context(), s.kubeClient, s.namespace, job.Name)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, map[string]string{"job_name": jobName})
}
func (s *Server) handleGetLockStatus(w http.ResponseWriter, r *http.Request) {
id, ok := parseJobID(r)
if !ok {
writeError(w, http.StatusBadRequest, "invalid id")
return
}
if s.kubeClient == nil {
writeError(w, http.StatusServiceUnavailable, "kube client unavailable")
return
}
job, err := appdb.GetJobByID(s.db, id)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if job == nil {
writeError(w, http.StatusNotFound, "job not found")
return
}
status, err := kube.GetLockStatus(r.Context(), s.kubeClient, s.namespace, job.Name)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, status)
}
func (s *Server) handleListOps(w http.ResponseWriter, r *http.Request) {
runID, err := strconv.ParseInt(r.PathValue("runID"), 10, 64)
if err != nil {
writeError(w, http.StatusBadRequest, "invalid run id")
return
}
q := r.URL.Query()
limit := 200
offset := 0
if l := q.Get("limit"); l != "" {
if n, e := strconv.Atoi(l); e == nil && n > 0 {
limit = n
}
}
if o := q.Get("offset"); o != "" {
if n, e := strconv.Atoi(o); e == nil && n >= 0 {
offset = n
}
}
ops, total, err := appdb.ListOperations(s.db, runID, limit, offset)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if ops == nil {
ops = []*appdb.OperationRow{}
}
writeJSON(w, map[string]any{
"total": total,
"operations": ops,
})
}
// --- response types (legacy endpoints) ---
type Iteration struct {
ID int64 `json:"id"`
SyncPair string `json:"sync_pair"`
Direction string `json:"direction"`
Status string `json:"status"`
DryRun bool `json:"dry_run"`
StartedAt time.Time `json:"started_at"`
EndedAt *time.Time `json:"ended_at"`
FilesCreated int `json:"files_created"`
FilesUpdated int `json:"files_updated"`
FilesDeleted int `json:"files_deleted"`
FilesSkipped int `json:"files_skipped"`
FilesFailed int `json:"files_failed"`
TotalBytesTransferred int64 `json:"total_bytes_transferred"`
ErrorMessage *string `json:"error_message,omitempty"`
}
type PairSummary struct {
Pair string `json:"pair"`
LastRealSync *Iteration `json:"last_real_sync"`
LastDryRun *Iteration `json:"last_dry_run"`
}
type Operation struct {
ID int64 `json:"id"`
IterationID int64 `json:"iteration_id"`
DryRun bool `json:"dry_run"`
Operation string `json:"operation"`
Filepath string `json:"filepath"`
SizeBefore *int64 `json:"size_before,omitempty"`
SizeAfter *int64 `json:"size_after,omitempty"`
Md5Before *string `json:"md5_before,omitempty"`
Md5After *string `json:"md5_after,omitempty"`
StartedAt time.Time `json:"started_at"`
EndedAt *time.Time `json:"ended_at,omitempty"`
Status string `json:"status"`
ErrorMessage *string `json:"error_message,omitempty"`
}
// --- handlers ---
func (s *Server) handleAbout(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
if err := aboutTmpl.Execute(w, nil); err != nil {
http.Error(w, "template error", http.StatusInternalServerError)
}
}
func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
if err := indexTmpl.Execute(w, nil); err != nil {
http.Error(w, "template error", http.StatusInternalServerError)
}
}
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
writeJSON(w, map[string]string{"status": "ok"})
}
func (s *Server) handlePairs(w http.ResponseWriter, r *http.Request) {
rows, err := s.db.QueryContext(r.Context(),
`SELECT DISTINCT sync_pair FROM sync_iterations ORDER BY sync_pair`)
if err != nil {
httpError(w, err)
return
}
defer rows.Close()
var pairs []string
for rows.Next() {
var p string
if err := rows.Scan(&p); err != nil {
httpError(w, err)
return
}
pairs = append(pairs, p)
}
if err := rows.Err(); err != nil {
httpError(w, err)
return
}
summaries := make([]PairSummary, 0, len(pairs))
for _, pair := range pairs {
real, err := s.latestIteration(r, pair, false)
if err != nil {
httpError(w, err)
return
}
dry, err := s.latestIteration(r, pair, true)
if err != nil {
httpError(w, err)
return
}
summaries = append(summaries, PairSummary{
Pair: pair,
LastRealSync: real,
LastDryRun: dry,
})
}
writeJSON(w, summaries)
}
func (s *Server) latestIteration(r *http.Request, pair string, dryRun bool) (*Iteration, error) {
dryVal := 0
if dryRun {
dryVal = 1
}
row := s.db.QueryRowContext(r.Context(), `
SELECT id, sync_pair, direction, status, dry_run,
started_at, ended_at,
files_created, files_updated, files_deleted, files_skipped, files_failed,
total_bytes_transferred, error_message
FROM sync_iterations
WHERE sync_pair = ? AND dry_run = ?
ORDER BY started_at DESC
LIMIT 1`, pair, dryVal)
return scanIteration(row)
}
func (s *Server) handleIterations(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
pair := q.Get("pair")
limit := 20
if l := q.Get("limit"); l != "" {
if n, err := strconv.Atoi(l); err == nil && n > 0 {
limit = n
}
}
var (
rows *sql.Rows
err error
)
// dry_run filter (optional)
dryParam := q.Get("dry_run")
switch {
case pair != "" && dryParam != "":
dryVal, _ := strconv.Atoi(dryParam)
rows, err = s.db.QueryContext(r.Context(), `
SELECT id, sync_pair, direction, status, dry_run,
started_at, ended_at,
files_created, files_updated, files_deleted, files_skipped, files_failed,
total_bytes_transferred, error_message
FROM sync_iterations
WHERE sync_pair = ? AND dry_run = ?
ORDER BY started_at DESC
LIMIT ?`, pair, dryVal, limit)
case pair != "":
rows, err = s.db.QueryContext(r.Context(), `
SELECT id, sync_pair, direction, status, dry_run,
started_at, ended_at,
files_created, files_updated, files_deleted, files_skipped, files_failed,
total_bytes_transferred, error_message
FROM sync_iterations
WHERE sync_pair = ?
ORDER BY started_at DESC
LIMIT ?`, pair, limit)
case dryParam != "":
dryVal, _ := strconv.Atoi(dryParam)
rows, err = s.db.QueryContext(r.Context(), `
SELECT id, sync_pair, direction, status, dry_run,
started_at, ended_at,
files_created, files_updated, files_deleted, files_skipped, files_failed,
total_bytes_transferred, error_message
FROM sync_iterations
WHERE dry_run = ?
ORDER BY started_at DESC
LIMIT ?`, dryVal, limit)
default:
rows, err = s.db.QueryContext(r.Context(), `
SELECT id, sync_pair, direction, status, dry_run,
started_at, ended_at,
files_created, files_updated, files_deleted, files_skipped, files_failed,
total_bytes_transferred, error_message
FROM sync_iterations
ORDER BY started_at DESC
LIMIT ?`, limit)
}
if err != nil {
httpError(w, err)
return
}
defer rows.Close()
iterations := make([]*Iteration, 0)
for rows.Next() {
it, err := scanIterationRow(rows)
if err != nil {
httpError(w, err)
return
}
iterations = append(iterations, it)
}
if err := rows.Err(); err != nil {
httpError(w, err)
return
}
writeJSON(w, iterations)
}
func (s *Server) handleOperations(w http.ResponseWriter, r *http.Request) {
idStr := r.URL.Query().Get("iteration_id")
if idStr == "" {
http.Error(w, "iteration_id required", http.StatusBadRequest)
return
}
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
http.Error(w, "invalid iteration_id", http.StatusBadRequest)
return
}
rows, err := s.db.QueryContext(r.Context(), `
SELECT id, iteration_id, dry_run, operation, filepath,
size_before, size_after, md5_before, md5_after,
started_at, ended_at, status, error_message
FROM sync_operations
WHERE iteration_id = ?
ORDER BY started_at ASC`, id)
if err != nil {
httpError(w, err)
return
}
defer rows.Close()
ops := make([]*Operation, 0)
for rows.Next() {
var op Operation
var dryRun int
var endedAt sql.NullTime
var sizeBefore, sizeAfter sql.NullInt64
var md5Before, md5After, errMsg sql.NullString
if err := rows.Scan(
&op.ID, &op.IterationID, &dryRun, &op.Operation, &op.Filepath,
&sizeBefore, &sizeAfter, &md5Before, &md5After,
&op.StartedAt, &endedAt, &op.Status, &errMsg,
); err != nil {
httpError(w, err)
return
}
op.DryRun = dryRun == 1
if endedAt.Valid {
op.EndedAt = &endedAt.Time
}
if sizeBefore.Valid {
op.SizeBefore = &sizeBefore.Int64
}
if sizeAfter.Valid {
op.SizeAfter = &sizeAfter.Int64
}
if md5Before.Valid {
op.Md5Before = &md5Before.String
}
if md5After.Valid {
op.Md5After = &md5After.String
}
if errMsg.Valid {
op.ErrorMessage = &errMsg.String
}
ops = append(ops, &op)
}
if err := rows.Err(); err != nil {
httpError(w, err)
return
}
writeJSON(w, ops)
}
// --- helpers ---
func scanIteration(row *sql.Row) (*Iteration, error) {
var it Iteration
var dryRun int
var endedAt sql.NullTime
var errMsg sql.NullString
err := row.Scan(
&it.ID, &it.SyncPair, &it.Direction, &it.Status, &dryRun,
&it.StartedAt, &endedAt,
&it.FilesCreated, &it.FilesUpdated, &it.FilesDeleted, &it.FilesSkipped, &it.FilesFailed,
&it.TotalBytesTransferred, &errMsg,
)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
it.DryRun = dryRun == 1
if endedAt.Valid {
it.EndedAt = &endedAt.Time
}
if errMsg.Valid {
it.ErrorMessage = &errMsg.String
}
return &it, nil
}
func scanIterationRow(rows *sql.Rows) (*Iteration, error) {
var it Iteration
var dryRun int
var endedAt sql.NullTime
var errMsg sql.NullString
err := rows.Scan(
&it.ID, &it.SyncPair, &it.Direction, &it.Status, &dryRun,
&it.StartedAt, &endedAt,
&it.FilesCreated, &it.FilesUpdated, &it.FilesDeleted, &it.FilesSkipped, &it.FilesFailed,
&it.TotalBytesTransferred, &errMsg,
)
if err != nil {
return nil, err
}
it.DryRun = dryRun == 1
if endedAt.Valid {
it.EndedAt = &endedAt.Time
}
if errMsg.Valid {
it.ErrorMessage = &errMsg.String
}
return &it, nil
}
func writeJSON(w http.ResponseWriter, v any) {
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(v); err != nil {
http.Error(w, "encode error", http.StatusInternalServerError)
}
}
func writeError(w http.ResponseWriter, status int, msg string) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
json.NewEncoder(w).Encode(map[string]string{"error": msg})
}
func httpError(w http.ResponseWriter, err error) {
http.Error(w, err.Error(), http.StatusInternalServerError)
}