homelab/services/ha-sync/internal/sync/engine.go
Dan V deb6c38d7b chore: commit homelab setup — deployment, services, orchestration, skill
- Add .gitignore: exclude compiled binaries, build artifacts, and Helm
  values files containing real secrets (authentik, prometheus)
- Add all Kubernetes deployment manifests (deployment/)
- Add services source code: ha-sync, device-inventory, games-console,
  paperclip, parts-inventory
- Add Ansible orchestration: playbooks, roles, inventory, cloud-init
- Add hardware specs, execution plans, scripts, HOMELAB.md
- Add skills/homelab/SKILL.md + skills/install.sh to preserve Copilot skill
- Remove previously-tracked inventory-cli binary from git index

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-09 08:10:32 +02:00

254 lines
5.7 KiB
Go

package sync
import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
gosync "sync"
"time"
"github.com/vandachevici/ha-sync/internal/opslog"
)
// Summary holds the per-outcome counters produced by a single sync pass.
//
// Created, Updated and Deleted count operations that succeeded (or, in dry-run
// mode, were merely planned). Skipped counts src files that needed no work,
// Failed counts operations whose execution returned an error, and Bytes totals
// the source sizes of successful copies.
type Summary struct {
	Created, Updated, Deleted int   // successful (or dry-run planned) ops by kind
	Skipped                   int   // src files already fresh in dest
	Failed                    int   // ops whose execution returned an error
	Bytes                     int64 // source bytes of successful creates/updates
}
// Options configures a sync pass.
type Options struct {
Src string
Dest string
DryRun bool
DeleteMissing bool
Workers int
MtimeThreshold time.Duration
Verbose bool
Excludes []string // glob patterns for files/dirs to skip (matched against base name and rel path)
}
// opKind identifies which filesystem mutation a fileOp performs.
type opKind int

// Operation kinds planned by Run. The zero value is opCreate.
const (
	opCreate opKind = iota // copy a src file that is absent from dest
	opUpdate               // overwrite a dest file that NeedsSync reports as stale
	opDelete               // remove a dest file with no src counterpart
)

// String returns the action name written into ops-log records ("create",
// "update" or "delete"), or "unknown" for any other value.
func (k opKind) String() string {
	name := "unknown"
	switch k {
	case opCreate:
		name = "create"
	case opUpdate:
		name = "update"
	case opDelete:
		name = "delete"
	}
	return name
}
// fileOp is one filesystem mutation planned by Run and executed by executeOp.
type fileOp struct {
	kind    opKind // which mutation to perform
	relPath string // path relative to the sync roots; used as the log key

	// srcPath is the source file to copy; left empty for delete ops.
	srcPath string
	// destPath is the file under Dest that is written or removed.
	destPath string
	// srcInfo describes the source file (size, mtime, owner); zero for delete ops.
	srcInfo FileInfo
}
// opResult reports the outcome of executing a single fileOp.
type opResult struct {
	op    fileOp // the operation that was attempted
	bytes int64  // source size on a successful copy; 0 for deletes and failures
	err   error  // nil on success
}
// Run executes one sync pass from Options.Src to Options.Dest.
//
// Both trees are walked with Options.Excludes applied. For every non-directory
// entry in src:
//   - absent in dest → create
//   - present but stale (per NeedsSync and Options.MtimeThreshold) → update
//   - present and fresh → skip (counted in Summary.Skipped and logged)
//
// If DeleteMissing is set, non-directory entries present in dest but absent in
// src are deleted. A dest tree that does not exist yet is not an error.
//
// When DryRun is true the operation list is built and counted but no
// filesystem changes are made. Operations are logged to writer if writer is
// non-nil.
//
// A worker pool of size Options.Workers (at least 1) processes copy/delete
// operations concurrently. Individual operation failures are tallied in
// Summary.Failed but do not abort the overall pass. If ctx is cancelled
// mid-pass, operations that have not started yet are reported as failures
// carrying the context error rather than being silently dropped, so every
// planned operation is accounted for in both the Summary and the ops log.
//
// The returned error is non-nil only if walking src, or walking an existing
// dest, fails; in that case the Summary is zero.
func Run(ctx context.Context, opts Options, iterID int64, writer *opslog.Writer) (Summary, error) {
	srcFiles, err := Walk(opts.Src, opts.Excludes)
	if err != nil {
		return Summary{}, fmt.Errorf("walking src: %w", err)
	}
	// A missing dest is fine: every src file simply becomes a create.
	destFiles, err := Walk(opts.Dest, opts.Excludes)
	if err != nil && !errors.Is(err, fs.ErrNotExist) && !os.IsNotExist(err) {
		return Summary{}, fmt.Errorf("walking dest: %w", err)
	}

	destMap := make(map[string]FileInfo, len(destFiles))
	for _, f := range destFiles {
		destMap[f.RelPath] = f
	}
	srcFileSet := make(map[string]struct{}, len(srcFiles))
	for _, f := range srcFiles {
		if !f.IsDir {
			srcFileSet[f.RelPath] = struct{}{}
		}
	}

	var ops []fileOp
	var summary Summary
	for _, sf := range srcFiles {
		if sf.IsDir {
			continue
		}
		destPath := filepath.Join(opts.Dest, sf.RelPath)
		df, exists := destMap[sf.RelPath]
		switch {
		case !exists:
			ops = append(ops, fileOp{
				kind: opCreate, relPath: sf.RelPath,
				srcPath: sf.AbsPath, destPath: destPath, srcInfo: sf,
			})
		case NeedsSync(sf, df, opts.MtimeThreshold):
			ops = append(ops, fileOp{
				kind: opUpdate, relPath: sf.RelPath,
				srcPath: sf.AbsPath, destPath: destPath, srcInfo: sf,
			})
		default:
			summary.Skipped++
			logOp(writer, opslog.OpRecord{IterID: iterID, RelPath: sf.RelPath, Action: "skip", Owner: sf.Owner})
		}
	}
	if opts.DeleteMissing {
		for _, df := range destFiles {
			if df.IsDir {
				continue
			}
			if _, inSrc := srcFileSet[df.RelPath]; !inSrc {
				ops = append(ops, fileOp{
					kind: opDelete, relPath: df.RelPath, destPath: df.AbsPath,
				})
			}
		}
	}

	// In dry-run mode count the pending operations without touching the FS.
	if opts.DryRun {
		for _, op := range ops {
			tallyOp(&summary, op.kind)
			logOp(writer, opslog.OpRecord{IterID: iterID, RelPath: op.relPath, Action: op.kind.String(), Owner: op.srcInfo.Owner})
		}
		return summary, nil
	}
	if len(ops) == 0 {
		return summary, nil
	}

	for res := range runWorkerPool(ctx, ops, opts.Workers) {
		rec := opslog.OpRecord{IterID: iterID, RelPath: res.op.relPath, Action: res.op.kind.String(), Bytes: res.bytes, Owner: res.op.srcInfo.Owner}
		if res.err != nil {
			// Keep the original action ("create"/"update"/"delete") so the DB enum is satisfied;
			// the error is captured in ErrMsg and status becomes "fail" in convertToDB.
			rec.ErrMsg = res.err.Error()
			summary.Failed++
		} else {
			tallyOp(&summary, res.op.kind)
			summary.Bytes += res.bytes
		}
		logOp(writer, rec)
	}
	return summary, nil
}

// tallyOp increments the Summary counter for a successfully executed (or, in
// dry-run mode, planned) operation of the given kind. Unknown kinds are ignored.
func tallyOp(s *Summary, kind opKind) {
	switch kind {
	case opCreate:
		s.Created++
	case opUpdate:
		s.Updated++
	case opDelete:
		s.Deleted++
	}
}

// runWorkerPool executes ops on up to workers goroutines (at least 1, at most
// len(ops)). It returns a channel that yields exactly one opResult per op, in
// completion order, and is closed once every op has been processed.
//
// Both channels are buffered to len(ops), so neither the enqueue loop nor the
// workers can block, and no goroutine leaks even if the caller stops reading.
func runWorkerPool(ctx context.Context, ops []fileOp, workers int) <-chan opResult {
	if workers < 1 {
		workers = 1
	}
	if workers > len(ops) {
		workers = len(ops)
	}

	jobs := make(chan fileOp, len(ops))
	for _, op := range ops {
		jobs <- op
	}
	close(jobs)

	results := make(chan opResult, len(ops))
	var wg gosync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Drain every job even after cancellation: executeOp short-circuits
			// with ctx.Err(), so cancelled ops are reported instead of dropped.
			for op := range jobs {
				results <- executeOp(ctx, op)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(results)
	}()
	return results
}
// executeOp applies a single fileOp to the filesystem and reports the outcome.
//
// If ctx is already done, the op is not attempted and the result carries
// ctx.Err(). Create and update ops copy srcPath to destPath with CopyFile,
// passing the source mtime; on success the result's bytes is the source size.
// Delete ops remove destPath with DeleteFile and report no bytes. Any other
// kind yields an error result.
func executeOp(ctx context.Context, op fileOp) opResult {
	res := opResult{op: op}
	if cerr := ctx.Err(); cerr != nil {
		res.err = cerr
		return res
	}
	switch op.kind {
	case opDelete:
		res.err = DeleteFile(op.destPath)
	case opCreate, opUpdate:
		if cpErr := CopyFile(op.srcPath, op.destPath, op.srcInfo.ModTime); cpErr != nil {
			res.err = cpErr
		} else {
			res.bytes = op.srcInfo.Size
		}
	default:
		res.err = fmt.Errorf("unknown op kind %d", op.kind)
	}
	return res
}
// logOp forwards rec to writer when one is configured. Logging is best-effort:
// a nil writer makes this a no-op, and any error from WriteOp is discarded, so
// ops-log problems never change the outcome Run reports.
func logOp(writer *opslog.Writer, rec opslog.OpRecord) {
	if writer != nil {
		// Deliberately ignored; see the function comment.
		_ = writer.WriteOp(rec)
	}
}