homelab/services/ha-sync/internal/db/logging.go
Dan V deb6c38d7b chore: commit homelab setup — deployment, services, orchestration, skill
- Add .gitignore: exclude compiled binaries, build artifacts, and Helm
  values files containing real secrets (authentik, prometheus)
- Add all Kubernetes deployment manifests (deployment/)
- Add services source code: ha-sync, device-inventory, games-console,
  paperclip, parts-inventory
- Add Ansible orchestration: playbooks, roles, inventory, cloud-init
- Add hardware specs, execution plans, scripts, HOMELAB.md
- Add skills/homelab/SKILL.md + skills/install.sh to preserve Copilot skill
- Remove previously-tracked inventory-cli binary from git index

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-09 08:10:32 +02:00

294 lines
8.1 KiB
Go

package db
import (
"database/sql"
"fmt"
"strings"
"time"
)
type OpRecord struct {
Operation string
Filepath string
SizeBefore int64
SizeAfter int64
MD5Before string
MD5After string
StartedAt time.Time
EndedAt time.Time
Status string
ErrorMessage string
SrcHost string
DestHost string
Owner string
}
func StartIteration(db *sql.DB, pair, direction, src, dest string, dryRun bool) (int64, error) {
dryRunInt := 0
if dryRun {
dryRunInt = 1
}
res, err := db.Exec(
`INSERT INTO sync_iterations (sync_pair, direction, src, dest, started_at, status, dry_run)
VALUES (?, ?, ?, ?, ?, 'running', ?)`,
pair, direction, src, dest, time.Now().UTC(), dryRunInt,
)
if err != nil {
return 0, fmt.Errorf("start iteration: %w", err)
}
id, err := res.LastInsertId()
if err != nil {
return 0, fmt.Errorf("start iteration last id: %w", err)
}
return id, nil
}
func FinishIteration(db *sql.DB, id int64, status string, created, updated, deleted, skipped, failed int, totalBytes int64, errMsg string) error {
var errMsgVal interface{}
if errMsg != "" {
errMsgVal = errMsg
}
_, err := db.Exec(
`UPDATE sync_iterations
SET ended_at = ?, status = ?, files_created = ?, files_updated = ?, files_deleted = ?,
files_skipped = ?, files_failed = ?, total_bytes_transferred = ?, error_message = ?
WHERE id = ?`,
time.Now().UTC(), status, created, updated, deleted, skipped, failed, totalBytes, errMsgVal, id,
)
if err != nil {
return fmt.Errorf("finish iteration: %w", err)
}
return nil
}
func BulkInsertOperations(db *sql.DB, iterationID int64, dryRun bool, ops []OpRecord) error {
if len(ops) == 0 {
return nil
}
dryRunInt := 0
if dryRun {
dryRunInt = 1
}
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("bulk insert begin tx: %w", err)
}
defer tx.Rollback()
// 15 columns per row (added src_host, dest_host, owner)
placeholders := make([]string, len(ops))
args := make([]interface{}, 0, len(ops)*15)
for i, op := range ops {
placeholders[i] = "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
var md5Before, md5After, errMsg, srcHost, destHost, owner interface{}
if op.MD5Before != "" {
md5Before = op.MD5Before
}
if op.MD5After != "" {
md5After = op.MD5After
}
if op.ErrorMessage != "" {
errMsg = op.ErrorMessage
}
if op.SrcHost != "" {
srcHost = op.SrcHost
}
if op.DestHost != "" {
destHost = op.DestHost
}
if op.Owner != "" {
owner = op.Owner
}
args = append(args,
iterationID, dryRunInt, op.Operation, op.Filepath,
op.SizeBefore, op.SizeAfter,
md5Before, md5After,
op.StartedAt.UTC(), op.EndedAt.UTC(),
op.Status, errMsg,
srcHost, destHost, owner,
)
}
query := `INSERT INTO sync_operations
(iteration_id, dry_run, operation, filepath, size_before, size_after,
md5_before, md5_after, started_at, ended_at, status, error_message,
src_host, dest_host, owner)
VALUES ` + strings.Join(placeholders, ",")
if _, err := tx.Exec(query, args...); err != nil {
return fmt.Errorf("bulk insert exec: %w", err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("bulk insert commit: %w", err)
}
return nil
}
// OperationRow is a query result from sync_operations including the new host/owner columns.
type OperationRow struct {
ID int64 `json:"id"`
IterationID int64 `json:"iteration_id"`
DryRun bool `json:"dry_run"`
Operation string `json:"operation"`
Filepath string `json:"filepath"`
SizeBefore *int64 `json:"size_before,omitempty"`
SizeAfter *int64 `json:"size_after,omitempty"`
StartedAt time.Time `json:"started_at"`
EndedAt *time.Time `json:"ended_at,omitempty"`
Status string `json:"status"`
ErrorMessage *string `json:"error_message,omitempty"`
SrcHost *string `json:"src_host,omitempty"`
DestHost *string `json:"dest_host,omitempty"`
Owner *string `json:"owner,omitempty"`
}
// ListOperations returns paginated operations for a run plus the total row count.
func ListOperations(db *sql.DB, runID int64, limit, offset int) ([]*OperationRow, int64, error) {
var total int64
if err := db.QueryRow(`SELECT COUNT(*) FROM sync_operations WHERE iteration_id = ?`, runID).Scan(&total); err != nil {
return nil, 0, fmt.Errorf("count operations: %w", err)
}
rows, err := db.Query(`
SELECT id, iteration_id, dry_run, operation, filepath,
size_before, size_after, started_at, ended_at,
status, error_message, src_host, dest_host, owner
FROM sync_operations
WHERE iteration_id = ?
ORDER BY started_at ASC
LIMIT ? OFFSET ?`, runID, limit, offset)
if err != nil {
return nil, 0, fmt.Errorf("list operations: %w", err)
}
defer rows.Close()
var ops []*OperationRow
for rows.Next() {
var op OperationRow
var dryRun int
var sizeBefore, sizeAfter sql.NullInt64
var endedAt sql.NullTime
var errMsg, srcHost, destHost, owner sql.NullString
if err := rows.Scan(
&op.ID, &op.IterationID, &dryRun, &op.Operation, &op.Filepath,
&sizeBefore, &sizeAfter, &op.StartedAt, &endedAt,
&op.Status, &errMsg, &srcHost, &destHost, &owner,
); err != nil {
return nil, 0, fmt.Errorf("scan operation: %w", err)
}
op.DryRun = dryRun == 1
if sizeBefore.Valid {
op.SizeBefore = &sizeBefore.Int64
}
if sizeAfter.Valid {
op.SizeAfter = &sizeAfter.Int64
}
if endedAt.Valid {
op.EndedAt = &endedAt.Time
}
if errMsg.Valid {
op.ErrorMessage = &errMsg.String
}
if srcHost.Valid {
op.SrcHost = &srcHost.String
}
if destHost.Valid {
op.DestHost = &destHost.String
}
if owner.Valid {
op.Owner = &owner.String
}
ops = append(ops, &op)
}
if err := rows.Err(); err != nil {
return nil, 0, fmt.Errorf("ops rows: %w", err)
}
return ops, total, nil
}
// ListIterations returns the most recent iterations for a sync pair+direction.
func ListIterations(db *sql.DB, pair, direction string, limit int) ([]*Iteration, error) {
rows, err := db.Query(`
SELECT id, sync_pair, direction, status, dry_run,
started_at, ended_at,
files_created, files_updated, files_deleted, files_skipped, files_failed,
total_bytes_transferred, error_message
FROM sync_iterations
WHERE sync_pair = ? AND direction = ?
ORDER BY started_at DESC
LIMIT ?`, pair, direction, limit)
if err != nil {
return nil, fmt.Errorf("list iterations: %w", err)
}
defer rows.Close()
var iters []*Iteration
for rows.Next() {
it, err := scanIterationRow(rows)
if err != nil {
return nil, fmt.Errorf("scan iteration: %w", err)
}
iters = append(iters, it)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterations rows: %w", err)
}
return iters, nil
}
func LastDryRunOps(db *sql.DB, pair, direction string) ([]OpRecord, error) {
row := db.QueryRow(
`SELECT id FROM sync_iterations
WHERE sync_pair = ? AND direction = ? AND dry_run = 1 AND status = 'success'
ORDER BY started_at DESC LIMIT 1`,
pair, direction,
)
var iterID int64
if err := row.Scan(&iterID); err != nil {
if err == sql.ErrNoRows {
return []OpRecord{}, nil
}
return nil, fmt.Errorf("last dry run query: %w", err)
}
rows, err := db.Query(
`SELECT operation, filepath, size_before, size_after,
md5_before, md5_after, started_at, ended_at, status, error_message
FROM sync_operations WHERE iteration_id = ?`,
iterID,
)
if err != nil {
return nil, fmt.Errorf("last dry run ops query: %w", err)
}
defer rows.Close()
var ops []OpRecord
for rows.Next() {
var op OpRecord
var sizeBefore, sizeAfter sql.NullInt64
var md5Before, md5After, errMsg sql.NullString
var startedAt, endedAt time.Time
if err := rows.Scan(
&op.Operation, &op.Filepath,
&sizeBefore, &sizeAfter,
&md5Before, &md5After,
&startedAt, &endedAt,
&op.Status, &errMsg,
); err != nil {
return nil, fmt.Errorf("last dry run ops scan: %w", err)
}
op.SizeBefore = sizeBefore.Int64
op.SizeAfter = sizeAfter.Int64
op.MD5Before = md5Before.String
op.MD5After = md5After.String
op.StartedAt = startedAt
op.EndedAt = endedAt
op.ErrorMessage = errMsg.String
ops = append(ops, op)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("last dry run ops rows: %w", err)
}
return ops, nil
}