forked from pressly/goose
-
Notifications
You must be signed in to change notification settings - Fork 0
/
migration_sql.go
128 lines (109 loc) · 3.19 KB
/
migration_sql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package goose
import (
"database/sql"
"fmt"
"regexp"
"time"
)
// Run a migration specified in raw SQL.
//
// Sections of the script can be annotated with a special comment,
// starting with "-- +goose" to specify whether the section should
// be applied during an Up or Down migration
//
// All statements following an Up or Down directive are grouped together
// until another direction directive is found.
func runSQLMigration(db *sql.DB, statements []string, useTx bool, v int64, direction bool, noVersioning bool) error {
if useTx {
// TRANSACTION.
verboseInfo("Begin transaction")
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
for _, query := range statements {
verboseInfo("Executing statement: %s\n", clearStatement(query))
if err = execQuery(tx.Exec, query); err != nil {
verboseInfo("Rollback transaction")
tx.Rollback()
return fmt.Errorf("failed to execute SQL query %q: %w", clearStatement(query), err)
}
}
if !noVersioning {
if direction {
if err := execQuery(tx.Exec, GetDialect().insertVersionSQL(), v, direction); err != nil {
verboseInfo("Rollback transaction")
tx.Rollback()
return fmt.Errorf("failed to insert new goose version: %w", err)
}
} else {
if err := execQuery(tx.Exec, GetDialect().deleteVersionSQL(), v); err != nil {
verboseInfo("Rollback transaction")
tx.Rollback()
return fmt.Errorf("failed to delete goose version: %w", err)
}
}
}
verboseInfo("Commit transaction")
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
// NO TRANSACTION.
for _, query := range statements {
verboseInfo("Executing statement: %s", clearStatement(query))
if err := execQuery(db.Exec, query); err != nil {
return fmt.Errorf("failed to execute SQL query %q: %w", clearStatement(query), err)
}
}
if !noVersioning {
if direction {
if err := execQuery(db.Exec, GetDialect().insertVersionSQL(), v, direction); err != nil {
return fmt.Errorf("failed to insert new goose version: %w", err)
}
} else {
if err := execQuery(db.Exec, GetDialect().deleteVersionSQL(), v); err != nil {
return fmt.Errorf("failed to delete goose version: %w", err)
}
}
}
return nil
}
// execQuery runs query with args through fn and returns the error reported
// by fn.
//
// When verbose mode is off, fn is called directly. When it is on, fn runs in
// its own goroutine so that a progress message can be logged once a minute
// for as long as the statement is still executing.
func execQuery(fn func(string, ...interface{}) (sql.Result, error), query string, args ...interface{}) error {
	if !verbose {
		_, err := fn(query, args...)
		return err
	}

	done := make(chan error)
	go func() {
		_, err := fn(query, args...)
		done <- err
	}()

	start := time.Now()

	// Use one ticker for the whole wait and stop it on return. Calling
	// time.Tick inside the loop would allocate a fresh, unstoppable ticker
	// on every iteration.
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case err := <-done:
			return err
		case <-ticker.C:
			verboseInfo("Executing statement still in progress for %v", time.Since(start).Round(time.Second))
		}
	}
}
// Terminal escape sequences wrapped around verbose log messages.
const (
	// grayColor is prepended to the format string in verboseInfo.
	grayColor = "\033[90m"
	// resetColor is appended to the format string in verboseInfo.
	resetColor = "\033[00m"
)
// verboseInfo prints a formatted message through the package-level log when
// verbose mode is enabled and does nothing otherwise. The format string s is
// wrapped in grayColor and resetColor before being passed on with args.
func verboseInfo(s string, args ...interface{}) {
	if !verbose {
		return
	}
	format := grayColor + s + resetColor
	log.Printf(format, args...)
}
// matchSQLComments matches a whole line that begins with "--", together with
// the line break characters that follow it.
var matchSQLComments = regexp.MustCompile(`(?m)^--.*$[\r\n]*`)

// matchEmptyEOL matches an empty line together with the line break
// characters that follow it.
var matchEmptyEOL = regexp.MustCompile(`(?m)^$[\r\n]*`) // TODO: Duplicate

// clearStatement returns s with lines that start with "--" removed, and then
// with empty lines removed. Comments that do not start at the beginning of a
// line are left in place.
func clearStatement(s string) string {
	withoutComments := matchSQLComments.ReplaceAllString(s, "")
	compacted := matchEmptyEOL.ReplaceAllString(withoutComments, "")
	return compacted
}