package sync

import (
	"context"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	gosync "sync"
	"time"

	"github.com/vandachevici/ha-sync/internal/opslog"
)

// Summary accumulates counters for a single sync pass.
type Summary struct {
	Created int   // create operations that succeeded (or were planned, in dry-run mode)
	Updated int   // update operations that succeeded (or were planned, in dry-run mode)
	Deleted int   // delete operations that succeeded (or were planned, in dry-run mode)
	Skipped int   // source files whose dest copy did not need syncing
	Failed  int   // operations that returned an error
	Bytes   int64 // sum of the byte counts reported by successful operations
}

// Options configures a sync pass.
type Options struct {
	Src            string        // root of the source tree that is walked
	Dest           string        // root of the destination tree; dest paths are joined onto it
	DryRun         bool          // plan and count operations without executing them
	DeleteMissing  bool          // also delete dest files that have no counterpart in src
	Workers        int           // worker pool size; values below 1 are treated as 1
	MtimeThreshold time.Duration // passed to NeedsSync when deciding whether a dest file is stale
	Verbose        bool          // NOTE(review): not read by Run; presumably consumed by callers — confirm
	Excludes       []string      // glob patterns for files/dirs to skip (matched against base name and rel path)
}

// opKind identifies which kind of filesystem operation is planned for a file.
type opKind int

const (
	opCreate opKind = iota // copy a file that is absent in dest
	opUpdate               // overwrite a dest file that needs syncing
	opDelete               // remove a dest file that is absent in src
)

// String returns the lowercase action name for k ("create", "update" or
// "delete"), or "unknown" for any other value. The result is used as the
// Action field of logged operation records.
func (k opKind) String() string {
	switch k {
	case opCreate:
		return "create"
	case opUpdate:
		return "update"
	case opDelete:
		return "delete"
	default:
		return "unknown"
	}
}

// fileOp describes one planned operation on a single file.
type fileOp struct {
	kind     opKind
	relPath  string   // path relative to the sync roots; used when logging
	srcPath  string   // empty for delete ops
	destPath string   // absolute path of the file in dest that is written or removed
	srcInfo  FileInfo // zero for delete ops
}

// opResult is the outcome of executing a single fileOp.
type opResult struct {
	op    fileOp
	bytes int64 // source file size for a successful copy; zero otherwise
	err   error // nil on success
}

// Run executes one sync pass from Options.Src to Options.Dest.
//
// For every file in src:
//   - absent in dest → create
//   - present but stale → update
//   - present and fresh → skip
//
// If DeleteMissing is set, files present in dest but absent in src are deleted.
// When DryRun is true the operation list is built and counted but no filesystem
// changes are made. Operations are logged to writer if writer is non-nil.
// A worker pool of size Options.Workers processes copy/delete operations
// concurrently. Individual operation failures are tallied in Summary.Failed
// but do not abort the overall pass.
func Run(ctx context.Context, opts Options, iterID int64, writer *opslog.Writer) (Summary, error) { srcFiles, err := Walk(opts.Src, opts.Excludes) if err != nil { return Summary{}, fmt.Errorf("walking src: %w", err) } destFiles, err := Walk(opts.Dest, opts.Excludes) if err != nil && !errors.Is(err, fs.ErrNotExist) && !os.IsNotExist(err) { return Summary{}, fmt.Errorf("walking dest: %w", err) } destMap := make(map[string]FileInfo, len(destFiles)) for _, f := range destFiles { destMap[f.RelPath] = f } srcFileSet := make(map[string]struct{}, len(srcFiles)) for _, f := range srcFiles { if !f.IsDir { srcFileSet[f.RelPath] = struct{}{} } } var ops []fileOp var summary Summary for _, sf := range srcFiles { if sf.IsDir { continue } destPath := filepath.Join(opts.Dest, sf.RelPath) df, exists := destMap[sf.RelPath] switch { case !exists: ops = append(ops, fileOp{ kind: opCreate, relPath: sf.RelPath, srcPath: sf.AbsPath, destPath: destPath, srcInfo: sf, }) case NeedsSync(sf, df, opts.MtimeThreshold): ops = append(ops, fileOp{ kind: opUpdate, relPath: sf.RelPath, srcPath: sf.AbsPath, destPath: destPath, srcInfo: sf, }) default: summary.Skipped++ logOp(writer, opslog.OpRecord{IterID: iterID, RelPath: sf.RelPath, Action: "skip", Owner: sf.Owner}) } } if opts.DeleteMissing { for _, df := range destFiles { if df.IsDir { continue } if _, inSrc := srcFileSet[df.RelPath]; !inSrc { ops = append(ops, fileOp{ kind: opDelete, relPath: df.RelPath, destPath: df.AbsPath, }) } } } // In dry-run mode count the pending operations without touching the FS. 
if opts.DryRun { for _, op := range ops { switch op.kind { case opCreate: summary.Created++ case opUpdate: summary.Updated++ case opDelete: summary.Deleted++ } logOp(writer, opslog.OpRecord{IterID: iterID, RelPath: op.relPath, Action: op.kind.String(), Owner: op.srcInfo.Owner}) } return summary, nil } if len(ops) == 0 { return summary, nil } workers := opts.Workers if workers < 1 { workers = 1 } jobs := make(chan fileOp, len(ops)) results := make(chan opResult, len(ops)) var wg gosync.WaitGroup for i := 0; i < workers; i++ { wg.Add(1) go func() { defer wg.Done() for { select { case <-ctx.Done(): return case op, ok := <-jobs: if !ok { return } results <- executeOp(ctx, op) } } }() } for _, op := range ops { jobs <- op } close(jobs) go func() { wg.Wait() close(results) }() for res := range results { action := res.op.kind.String() rec := opslog.OpRecord{IterID: iterID, RelPath: res.op.relPath, Action: action, Bytes: res.bytes, Owner: res.op.srcInfo.Owner} if res.err != nil { // Keep the original action ("create"/"update"/"delete") so the DB enum is satisfied; // the error is captured in ErrMsg and status becomes "fail" in convertToDB. rec.ErrMsg = res.err.Error() summary.Failed++ } else { switch res.op.kind { case opCreate: summary.Created++ case opUpdate: summary.Updated++ case opDelete: summary.Deleted++ } summary.Bytes += res.bytes } logOp(writer, rec) } return summary, nil } // executeOp performs a single copy or delete and returns the result. 
func executeOp(ctx context.Context, op fileOp) opResult { if ctx.Err() != nil { return opResult{op: op, err: ctx.Err()} } switch op.kind { case opCreate, opUpdate: if err := CopyFile(op.srcPath, op.destPath, op.srcInfo.ModTime); err != nil { return opResult{op: op, err: err} } return opResult{op: op, bytes: op.srcInfo.Size} case opDelete: return opResult{op: op, err: DeleteFile(op.destPath)} default: return opResult{op: op, err: fmt.Errorf("unknown op kind %d", op.kind)} } } // logOp writes rec to writer if writer is non-nil; errors are silently dropped. func logOp(writer *opslog.Writer, rec opslog.OpRecord) { if writer == nil { return } _ = writer.WriteOp(rec) }