// Package db contains persistence helpers for the sync_iterations and
// sync_operations tables.
package db

import (
	"database/sql"
	"fmt"
	"strings"
	"time"
)

// OpRecord describes a single file operation as it is written to, and read
// back from, the sync_operations table.
//
// When inserted via BulkInsertOperations, empty MD5Before, MD5After,
// ErrorMessage, SrcHost, DestHost and Owner values are stored as SQL NULL.
type OpRecord struct {
	Operation    string
	Filepath     string
	SizeBefore   int64
	SizeAfter    int64
	MD5Before    string
	MD5After     string
	StartedAt    time.Time
	EndedAt      time.Time
	Status       string
	ErrorMessage string
	SrcHost      string
	DestHost     string
	Owner        string
}

// StartIteration inserts a new sync_iterations row with status 'running' and
// started_at set to the current UTC time, and returns the id of the new row.
// dryRun is stored as the integer 1 (true) or 0 (false).
func StartIteration(db *sql.DB, pair, direction, src, dest string, dryRun bool) (int64, error) {
	dryRunInt := 0
	if dryRun {
		dryRunInt = 1
	}

	res, err := db.Exec(
		`INSERT INTO sync_iterations (sync_pair, direction, src, dest, started_at, status, dry_run) VALUES (?, ?, ?, ?, ?, 'running', ?)`,
		pair, direction, src, dest, time.Now().UTC(), dryRunInt,
	)
	if err != nil {
		return 0, fmt.Errorf("start iteration: %w", err)
	}

	id, err := res.LastInsertId()
	if err != nil {
		return 0, fmt.Errorf("start iteration last id: %w", err)
	}
	return id, nil
}

// FinishIteration records the outcome of the iteration identified by id:
// it sets ended_at to the current UTC time and stores the final status, the
// per-category file counters, the transferred byte count and the error
// message. An empty errMsg is stored as SQL NULL.
//
// The number of affected rows is not checked, so an id that matches no row is
// not reported as an error.
func FinishIteration(db *sql.DB, id int64, status string, created, updated, deleted, skipped, failed int, totalBytes int64, errMsg string) error {
	// A nil interface value is sent to the driver as NULL.
	var errMsgVal interface{}
	if errMsg != "" {
		errMsgVal = errMsg
	}

	_, err := db.Exec(
		`UPDATE sync_iterations SET ended_at = ?, status = ?, files_created = ?, files_updated = ?, files_deleted = ?, files_skipped = ?, files_failed = ?, total_bytes_transferred = ?, error_message = ? 
WHERE id = ?`,
		time.Now().UTC(), status, created, updated, deleted, skipped, failed, totalBytes, errMsgVal, id,
	)
	if err != nil {
		return fmt.Errorf("finish iteration: %w", err)
	}
	return nil
}

// BulkInsertOperations writes ops to sync_operations for the given iteration
// inside a single transaction, so either every record is stored or none is.
// An empty ops slice is a no-op. Timestamps are converted to UTC and empty
// optional string fields are stored as SQL NULL.
//
// Rows are sent in multi-row INSERT statements of bounded size so that the
// number of bound parameters per statement stays small regardless of
// len(ops).
func BulkInsertOperations(db *sql.DB, iterationID int64, dryRun bool, ops []OpRecord) error {
	if len(ops) == 0 {
		return nil
	}

	// 15 columns per row (added src_host, dest_host, owner).
	// NOTE(review): the backend is not visible from this file; 60 rows keeps
	// each statement at 900 bound parameters, below SQLite's historical
	// default limit of 999. Raise maxRowsPerStmt if the backend allows more.
	const (
		colsPerRow     = 15
		maxRowsPerStmt = 60
		rowPlaceholder = "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
		insertPrefix   = `INSERT INTO sync_operations (iteration_id, dry_run, operation, filepath, size_before, size_after, md5_before, md5_after, started_at, ended_at, status, error_message, src_host, dest_host, owner) VALUES `
	)

	dryRunInt := 0
	if dryRun {
		dryRunInt = 1
	}

	// nullable maps the empty string to a nil interface, which the driver
	// stores as NULL.
	nullable := func(s string) interface{} {
		if s == "" {
			return nil
		}
		return s
	}

	tx, err := db.Begin()
	if err != nil {
		return fmt.Errorf("bulk insert begin tx: %w", err)
	}
	// Rollback after a successful Commit is a no-op returning sql.ErrTxDone,
	// so its result is deliberately ignored.
	defer tx.Rollback()

	for start := 0; start < len(ops); start += maxRowsPerStmt {
		end := start + maxRowsPerStmt
		if end > len(ops) {
			end = len(ops)
		}
		batch := ops[start:end]

		placeholders := make([]string, len(batch))
		args := make([]interface{}, 0, len(batch)*colsPerRow)
		for i := range batch {
			op := &batch[i]
			placeholders[i] = rowPlaceholder
			args = append(args,
				iterationID, dryRunInt,
				op.Operation, op.Filepath,
				op.SizeBefore, op.SizeAfter,
				nullable(op.MD5Before), nullable(op.MD5After),
				op.StartedAt.UTC(), op.EndedAt.UTC(),
				op.Status, nullable(op.ErrorMessage),
				nullable(op.SrcHost), nullable(op.DestHost), nullable(op.Owner),
			)
		}

		query := insertPrefix + strings.Join(placeholders, ",")
		if _, err := tx.Exec(query, args...); err != nil {
			return fmt.Errorf("bulk insert exec: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("bulk insert commit: %w", err)
	}
	return nil
}

// OperationRow is a query result from sync_operations including the new
// host/owner columns. Pointer fields are nil when the corresponding column is
// NULL.
type OperationRow struct {
	ID           int64      `json:"id"`
	IterationID  int64      `json:"iteration_id"`
	DryRun       bool       `json:"dry_run"`
	Operation    string     `json:"operation"`
	Filepath     string     `json:"filepath"`
	SizeBefore   *int64     `json:"size_before,omitempty"`
	SizeAfter    *int64     `json:"size_after,omitempty"`
	StartedAt    time.Time  `json:"started_at"`
	EndedAt      *time.Time `json:"ended_at,omitempty"`
	Status       string     `json:"status"`
	ErrorMessage *string    `json:"error_message,omitempty"`
	SrcHost      *string    `json:"src_host,omitempty"`
	DestHost     *string    `json:"dest_host,omitempty"`
	Owner        *string    `json:"owner,omitempty"`
}

// ListOperations returns paginated operations for a run plus the total row
// count for that run (independent of limit and offset).
//
// Rows are ordered by started_at and then by id, so that operations sharing a
// timestamp keep a stable position across pages. The count and the page are
// read by two separate queries, so they may disagree if rows are inserted in
// between.
func ListOperations(db *sql.DB, runID int64, limit, offset int) ([]*OperationRow, int64, error) {
	var total int64
	if err := db.QueryRow(`SELECT COUNT(*) FROM sync_operations WHERE iteration_id = ?`, runID).Scan(&total); err != nil {
		return nil, 0, fmt.Errorf("count operations: %w", err)
	}

	rows, err := db.Query(`
		SELECT id, iteration_id, dry_run, operation, filepath,
		       size_before, size_after, started_at, ended_at,
		       status, error_message, src_host, dest_host, owner
		FROM sync_operations
		WHERE iteration_id = ?
		ORDER BY started_at ASC, id ASC
		LIMIT ? OFFSET ?`,
		runID, limit, offset)
	if err != nil {
		return nil, 0, fmt.Errorf("list operations: %w", err)
	}
	defer rows.Close()

	var ops []*OperationRow
	for rows.Next() {
		// All scan targets are declared per iteration: the pointers stored
		// in op must not alias values reused by the next row.
		var (
			op                               OperationRow
			dryRun                           int
			sizeBefore, sizeAfter            sql.NullInt64
			endedAt                          sql.NullTime
			errMsg, srcHost, destHost, owner sql.NullString
		)
		if err := rows.Scan(
			&op.ID, &op.IterationID, &dryRun, &op.Operation, &op.Filepath,
			&sizeBefore, &sizeAfter, &op.StartedAt, &endedAt,
			&op.Status, &errMsg, &srcHost, &destHost, &owner,
		); err != nil {
			return nil, 0, fmt.Errorf("scan operation: %w", err)
		}

		op.DryRun = dryRun == 1
		if sizeBefore.Valid {
			op.SizeBefore = &sizeBefore.Int64
		}
		if sizeAfter.Valid {
			op.SizeAfter = &sizeAfter.Int64
		}
		if endedAt.Valid {
			op.EndedAt = &endedAt.Time
		}
		if errMsg.Valid {
			op.ErrorMessage = &errMsg.String
		}
		if srcHost.Valid {
			op.SrcHost = &srcHost.String
		}
		if destHost.Valid {
			op.DestHost = &destHost.String
		}
		if owner.Valid {
			op.Owner = &owner.String
		}
		ops = append(ops, &op)
	}
	if err := rows.Err(); err != nil {
		return nil, 0, fmt.Errorf("ops rows: %w", err)
	}
	return ops, total, nil
}

// ListIterations returns the most recent iterations for a sync pair+direction,
// newest first, at most limit rows. Iterations with the same started_at are
// ordered by descending id so the result is deterministic.
func ListIterations(db *sql.DB, pair, direction string, limit int) ([]*Iteration, error) {
	rows, err := db.Query(`
		SELECT id, sync_pair, direction, status, dry_run, started_at, ended_at,
		       files_created, files_updated, files_deleted, files_skipped, files_failed,
		       total_bytes_transferred, error_message
		FROM sync_iterations
		WHERE sync_pair = ? AND direction = ?
		ORDER BY started_at DESC, id DESC
		LIMIT ?`,
		pair, direction, limit)
	if err != nil {
		return nil, fmt.Errorf("list iterations: %w", err)
	}
	defer rows.Close()

	var iters []*Iteration
	for rows.Next() {
		it, err := scanIterationRow(rows)
		if err != nil {
			return nil, fmt.Errorf("scan iteration: %w", err)
		}
		iters = append(iters, it)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterations rows: %w", err)
	}
	return iters, nil
}

// LastDryRunOps returns the operations recorded by the most recent dry-run
// iteration with status 'success' for the given sync pair+direction, in
// insertion (id) order.
//
// If no such iteration exists, it returns an empty, non-nil slice and a nil
// error. NULL columns are mapped to the zero value of the corresponding
// OpRecord field.
func LastDryRunOps(db *sql.DB, pair, direction string) ([]OpRecord, error) {
	var iterID int64
	err := db.QueryRow(`
		SELECT id FROM sync_iterations
		WHERE sync_pair = ? AND direction = ? AND dry_run = 1 AND status = 'success'
		ORDER BY started_at DESC, id DESC
		LIMIT 1`,
		pair, direction,
	).Scan(&iterID)
	if err != nil {
		if err == sql.ErrNoRows {
			return []OpRecord{}, nil
		}
		return nil, fmt.Errorf("last dry run query: %w", err)
	}

	rows, err := db.Query(`
		SELECT operation, filepath, size_before, size_after, md5_before, md5_after,
		       started_at, ended_at, status, error_message,
		       src_host, dest_host, owner
		FROM sync_operations
		WHERE iteration_id = ?
		ORDER BY id ASC`,
		iterID,
	)
	if err != nil {
		return nil, fmt.Errorf("last dry run ops query: %w", err)
	}
	defer rows.Close()

	var ops []OpRecord
	for rows.Next() {
		var (
			op                               OpRecord
			sizeBefore, sizeAfter            sql.NullInt64
			md5Before, md5After, errMsg      sql.NullString
			srcHost, destHost, owner         sql.NullString
			endedAt                          sql.NullTime
		)
		if err := rows.Scan(
			&op.Operation, &op.Filepath, &sizeBefore, &sizeAfter,
			&md5Before, &md5After, &op.StartedAt, &endedAt,
			&op.Status, &errMsg,
			&srcHost, &destHost, &owner,
		); err != nil {
			return nil, fmt.Errorf("last dry run ops scan: %w", err)
		}

		// The Null* wrappers hold the zero value when the column is NULL,
		// which is exactly what OpRecord uses to mean "absent".
		op.SizeBefore = sizeBefore.Int64
		op.SizeAfter = sizeAfter.Int64
		op.MD5Before = md5Before.String
		op.MD5After = md5After.String
		op.EndedAt = endedAt.Time
		op.ErrorMessage = errMsg.String
		op.SrcHost = srcHost.String
		op.DestHost = destHost.String
		op.Owner = owner.String
		ops = append(ops, op)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("last dry run ops rows: %w", err)
	}
	return ops, nil
}