homelab/services/ha-sync/internal/opslog/flusher.go
Dan V deb6c38d7b chore: commit homelab setup — deployment, services, orchestration, skill
- Add .gitignore: exclude compiled binaries, build artifacts, and Helm
  values files containing real secrets (authentik, prometheus)
- Add all Kubernetes deployment manifests (deployment/)
- Add services source code: ha-sync, device-inventory, games-console,
  paperclip, parts-inventory
- Add Ansible orchestration: playbooks, roles, inventory, cloud-init
- Add hardware specs, execution plans, scripts, HOMELAB.md
- Add skills/homelab/SKILL.md + skills/install.sh to preserve Copilot skill
- Remove previously-tracked inventory-cli binary from git index

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-09 08:10:32 +02:00

108 lines
2.8 KiB
Go

package opslog
import (
"bufio"
"database/sql"
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"github.com/vandachevici/ha-sync/internal/db"
)
// FlushAll decodes every *.jsonl file in logDir, groups entries by
// iteration_id, and inserts each group via db.BulkInsertOperations.
// Successfully flushed files are deleted; files that fail are left in place.
// The last error encountered is returned (nil if all files succeeded).
func FlushAll(logDir string, sqlDB *sql.DB) error {
files, err := filepath.Glob(filepath.Join(logDir, "*.jsonl"))
if err != nil {
return fmt.Errorf("opslog: glob: %w", err)
}
var lastErr error
for _, path := range files {
if err := flushFile(path, sqlDB); err != nil {
lastErr = err
continue
}
_ = os.Remove(path)
}
return lastErr
}
// iterGroup holds the entries that share the same iteration_id, all of which
// must also share the same dry_run flag (set when the iteration was started).
type iterGroup struct {
dryRun bool
entries []LogEntry
}
func flushFile(path string, sqlDB *sql.DB) error {
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("opslog: open %s: %w", path, err)
}
defer f.Close()
groups := make(map[int64]*iterGroup)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
var entry LogEntry
if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil {
return fmt.Errorf("opslog: decode %s: %w", path, err)
}
g, ok := groups[entry.IterationID]
if !ok {
g = &iterGroup{dryRun: entry.DryRun}
groups[entry.IterationID] = g
}
g.entries = append(g.entries, entry)
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("opslog: scan %s: %w", path, err)
}
for iterID, g := range groups {
ops := make([]db.OpRecord, len(g.entries))
for i, e := range g.entries {
ops[i] = db.OpRecord{
Operation: e.Operation,
Filepath: e.Filepath,
SizeBefore: e.SizeBefore,
SizeAfter: e.SizeAfter,
MD5Before: e.MD5Before,
MD5After: e.MD5After,
StartedAt: e.StartedAt,
EndedAt: e.EndedAt,
Status: e.Status,
ErrorMessage: e.ErrorMessage,
}
}
if err := db.BulkInsertOperations(sqlDB, iterID, g.dryRun, ops); err != nil {
return fmt.Errorf("opslog: insert iteration %d from %s: %w", iterID, path, err)
}
}
return nil
}
// CleanOld removes *.jsonl files in logDir whose modification time is older
// than retainDays days. Files that cannot be stat'd are skipped silently.
func CleanOld(logDir string, retainDays int) error {
files, err := filepath.Glob(filepath.Join(logDir, "*.jsonl"))
if err != nil {
return fmt.Errorf("opslog: glob: %w", err)
}
cutoff := time.Now().AddDate(0, 0, -retainDays)
for _, path := range files {
info, err := os.Stat(path)
if err != nil {
continue
}
if info.ModTime().Before(cutoff) {
_ = os.Remove(path)
}
}
return nil
}