- Add .gitignore: exclude compiled binaries, build artifacts, and Helm values files containing real secrets (authentik, prometheus) - Add all Kubernetes deployment manifests (deployment/) - Add services source code: ha-sync, device-inventory, games-console, paperclip, parts-inventory - Add Ansible orchestration: playbooks, roles, inventory, cloud-init - Add hardware specs, execution plans, scripts, HOMELAB.md - Add skills/homelab/SKILL.md + skills/install.sh to preserve Copilot skill - Remove previously-tracked inventory-cli binary from git index Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
294 lines
8.1 KiB
Go
package db
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// OpRecord describes a single file operation performed (or planned) during a
// sync run, as persisted to the sync_operations table by BulkInsertOperations.
type OpRecord struct {
	Operation    string    // operation kind as stored in sync_operations.operation
	Filepath     string    // path of the file the operation applied to
	SizeBefore   int64     // file size in bytes before the operation
	SizeAfter    int64     // file size in bytes after the operation
	MD5Before    string    // checksum before; "" is stored as SQL NULL
	MD5After     string    // checksum after; "" is stored as SQL NULL
	StartedAt    time.Time // when the operation began (stored as UTC)
	EndedAt      time.Time // when the operation finished (stored as UTC)
	Status       string    // final status of the operation
	ErrorMessage string    // failure detail; "" is stored as SQL NULL
	SrcHost      string    // source host; "" is stored as SQL NULL
	DestHost     string    // destination host; "" is stored as SQL NULL
	Owner        string    // file owner; "" is stored as SQL NULL
}
|
|
|
|
// StartIteration records the beginning of a sync run by inserting a
// sync_iterations row with status 'running', and returns the new row's id.
func StartIteration(db *sql.DB, pair, direction, src, dest string, dryRun bool) (int64, error) {
	var dryRunFlag int
	if dryRun {
		dryRunFlag = 1
	}
	result, execErr := db.Exec(
		`INSERT INTO sync_iterations (sync_pair, direction, src, dest, started_at, status, dry_run)
VALUES (?, ?, ?, ?, ?, 'running', ?)`,
		pair, direction, src, dest, time.Now().UTC(), dryRunFlag,
	)
	if execErr != nil {
		return 0, fmt.Errorf("start iteration: %w", execErr)
	}
	iterID, idErr := result.LastInsertId()
	if idErr != nil {
		return 0, fmt.Errorf("start iteration last id: %w", idErr)
	}
	return iterID, nil
}
|
|
|
|
// FinishIteration closes out a sync_iterations row: it stamps ended_at with
// the current UTC time and stores the final status, per-category file
// counters, byte total, and error message (NULL when errMsg is empty).
func FinishIteration(db *sql.DB, id int64, status string, created, updated, deleted, skipped, failed int, totalBytes int64, errMsg string) error {
	var errParam interface{}
	if len(errMsg) > 0 {
		errParam = errMsg
	}
	if _, execErr := db.Exec(
		`UPDATE sync_iterations
SET ended_at = ?, status = ?, files_created = ?, files_updated = ?, files_deleted = ?,
files_skipped = ?, files_failed = ?, total_bytes_transferred = ?, error_message = ?
WHERE id = ?`,
		time.Now().UTC(), status, created, updated, deleted, skipped, failed, totalBytes, errParam, id,
	); execErr != nil {
		return fmt.Errorf("finish iteration: %w", execErr)
	}
	return nil
}
|
|
|
|
func BulkInsertOperations(db *sql.DB, iterationID int64, dryRun bool, ops []OpRecord) error {
|
|
if len(ops) == 0 {
|
|
return nil
|
|
}
|
|
dryRunInt := 0
|
|
if dryRun {
|
|
dryRunInt = 1
|
|
}
|
|
|
|
tx, err := db.Begin()
|
|
if err != nil {
|
|
return fmt.Errorf("bulk insert begin tx: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
// 15 columns per row (added src_host, dest_host, owner)
|
|
placeholders := make([]string, len(ops))
|
|
args := make([]interface{}, 0, len(ops)*15)
|
|
|
|
for i, op := range ops {
|
|
placeholders[i] = "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
|
|
var md5Before, md5After, errMsg, srcHost, destHost, owner interface{}
|
|
if op.MD5Before != "" {
|
|
md5Before = op.MD5Before
|
|
}
|
|
if op.MD5After != "" {
|
|
md5After = op.MD5After
|
|
}
|
|
if op.ErrorMessage != "" {
|
|
errMsg = op.ErrorMessage
|
|
}
|
|
if op.SrcHost != "" {
|
|
srcHost = op.SrcHost
|
|
}
|
|
if op.DestHost != "" {
|
|
destHost = op.DestHost
|
|
}
|
|
if op.Owner != "" {
|
|
owner = op.Owner
|
|
}
|
|
args = append(args,
|
|
iterationID, dryRunInt, op.Operation, op.Filepath,
|
|
op.SizeBefore, op.SizeAfter,
|
|
md5Before, md5After,
|
|
op.StartedAt.UTC(), op.EndedAt.UTC(),
|
|
op.Status, errMsg,
|
|
srcHost, destHost, owner,
|
|
)
|
|
}
|
|
|
|
query := `INSERT INTO sync_operations
|
|
(iteration_id, dry_run, operation, filepath, size_before, size_after,
|
|
md5_before, md5_after, started_at, ended_at, status, error_message,
|
|
src_host, dest_host, owner)
|
|
VALUES ` + strings.Join(placeholders, ",")
|
|
|
|
if _, err := tx.Exec(query, args...); err != nil {
|
|
return fmt.Errorf("bulk insert exec: %w", err)
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
return fmt.Errorf("bulk insert commit: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// OperationRow is a query result from sync_operations including the new host/owner columns.
// Pointer fields correspond to nullable columns: a nil pointer means the
// column was SQL NULL and the field is omitted from JSON output.
type OperationRow struct {
	ID          int64  `json:"id"`           // sync_operations primary key
	IterationID int64  `json:"iteration_id"` // owning sync_iterations row
	DryRun      bool   `json:"dry_run"`      // stored as 0/1 in the DB
	Operation   string `json:"operation"`
	Filepath    string `json:"filepath"`
	SizeBefore  *int64 `json:"size_before,omitempty"` // nil when column is NULL
	SizeAfter   *int64 `json:"size_after,omitempty"`  // nil when column is NULL
	StartedAt   time.Time  `json:"started_at"`
	EndedAt     *time.Time `json:"ended_at,omitempty"` // nil when column is NULL
	Status      string  `json:"status"`
	ErrorMessage *string `json:"error_message,omitempty"` // nil when column is NULL
	SrcHost     *string `json:"src_host,omitempty"`  // nil when column is NULL
	DestHost    *string `json:"dest_host,omitempty"` // nil when column is NULL
	Owner       *string `json:"owner,omitempty"`     // nil when column is NULL
}
|
|
|
|
// ListOperations returns paginated operations for a run plus the total row count.
|
|
func ListOperations(db *sql.DB, runID int64, limit, offset int) ([]*OperationRow, int64, error) {
|
|
var total int64
|
|
if err := db.QueryRow(`SELECT COUNT(*) FROM sync_operations WHERE iteration_id = ?`, runID).Scan(&total); err != nil {
|
|
return nil, 0, fmt.Errorf("count operations: %w", err)
|
|
}
|
|
rows, err := db.Query(`
|
|
SELECT id, iteration_id, dry_run, operation, filepath,
|
|
size_before, size_after, started_at, ended_at,
|
|
status, error_message, src_host, dest_host, owner
|
|
FROM sync_operations
|
|
WHERE iteration_id = ?
|
|
ORDER BY started_at ASC
|
|
LIMIT ? OFFSET ?`, runID, limit, offset)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("list operations: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var ops []*OperationRow
|
|
for rows.Next() {
|
|
var op OperationRow
|
|
var dryRun int
|
|
var sizeBefore, sizeAfter sql.NullInt64
|
|
var endedAt sql.NullTime
|
|
var errMsg, srcHost, destHost, owner sql.NullString
|
|
if err := rows.Scan(
|
|
&op.ID, &op.IterationID, &dryRun, &op.Operation, &op.Filepath,
|
|
&sizeBefore, &sizeAfter, &op.StartedAt, &endedAt,
|
|
&op.Status, &errMsg, &srcHost, &destHost, &owner,
|
|
); err != nil {
|
|
return nil, 0, fmt.Errorf("scan operation: %w", err)
|
|
}
|
|
op.DryRun = dryRun == 1
|
|
if sizeBefore.Valid {
|
|
op.SizeBefore = &sizeBefore.Int64
|
|
}
|
|
if sizeAfter.Valid {
|
|
op.SizeAfter = &sizeAfter.Int64
|
|
}
|
|
if endedAt.Valid {
|
|
op.EndedAt = &endedAt.Time
|
|
}
|
|
if errMsg.Valid {
|
|
op.ErrorMessage = &errMsg.String
|
|
}
|
|
if srcHost.Valid {
|
|
op.SrcHost = &srcHost.String
|
|
}
|
|
if destHost.Valid {
|
|
op.DestHost = &destHost.String
|
|
}
|
|
if owner.Valid {
|
|
op.Owner = &owner.String
|
|
}
|
|
ops = append(ops, &op)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, 0, fmt.Errorf("ops rows: %w", err)
|
|
}
|
|
return ops, total, nil
|
|
}
|
|
|
|
// ListIterations returns the most recent iterations for a sync pair+direction.
|
|
func ListIterations(db *sql.DB, pair, direction string, limit int) ([]*Iteration, error) {
|
|
rows, err := db.Query(`
|
|
SELECT id, sync_pair, direction, status, dry_run,
|
|
started_at, ended_at,
|
|
files_created, files_updated, files_deleted, files_skipped, files_failed,
|
|
total_bytes_transferred, error_message
|
|
FROM sync_iterations
|
|
WHERE sync_pair = ? AND direction = ?
|
|
ORDER BY started_at DESC
|
|
LIMIT ?`, pair, direction, limit)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list iterations: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var iters []*Iteration
|
|
for rows.Next() {
|
|
it, err := scanIterationRow(rows)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scan iteration: %w", err)
|
|
}
|
|
iters = append(iters, it)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterations rows: %w", err)
|
|
}
|
|
return iters, nil
|
|
}
|
|
|
|
func LastDryRunOps(db *sql.DB, pair, direction string) ([]OpRecord, error) {
|
|
row := db.QueryRow(
|
|
`SELECT id FROM sync_iterations
|
|
WHERE sync_pair = ? AND direction = ? AND dry_run = 1 AND status = 'success'
|
|
ORDER BY started_at DESC LIMIT 1`,
|
|
pair, direction,
|
|
)
|
|
var iterID int64
|
|
if err := row.Scan(&iterID); err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return []OpRecord{}, nil
|
|
}
|
|
return nil, fmt.Errorf("last dry run query: %w", err)
|
|
}
|
|
|
|
rows, err := db.Query(
|
|
`SELECT operation, filepath, size_before, size_after,
|
|
md5_before, md5_after, started_at, ended_at, status, error_message
|
|
FROM sync_operations WHERE iteration_id = ?`,
|
|
iterID,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("last dry run ops query: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var ops []OpRecord
|
|
for rows.Next() {
|
|
var op OpRecord
|
|
var sizeBefore, sizeAfter sql.NullInt64
|
|
var md5Before, md5After, errMsg sql.NullString
|
|
var startedAt, endedAt time.Time
|
|
if err := rows.Scan(
|
|
&op.Operation, &op.Filepath,
|
|
&sizeBefore, &sizeAfter,
|
|
&md5Before, &md5After,
|
|
&startedAt, &endedAt,
|
|
&op.Status, &errMsg,
|
|
); err != nil {
|
|
return nil, fmt.Errorf("last dry run ops scan: %w", err)
|
|
}
|
|
op.SizeBefore = sizeBefore.Int64
|
|
op.SizeAfter = sizeAfter.Int64
|
|
op.MD5Before = md5Before.String
|
|
op.MD5After = md5After.String
|
|
op.StartedAt = startedAt
|
|
op.EndedAt = endedAt
|
|
op.ErrorMessage = errMsg.String
|
|
ops = append(ops, op)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("last dry run ops rows: %w", err)
|
|
}
|
|
return ops, nil
|
|
}
|