diff --git a/Dockerfile b/Dockerfile index 4f0084c..554f50b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,6 +19,7 @@ WORKDIR /build COPY go.mod go.sum ./ RUN go mod download +COPY pkg/ pkg/ COPY cmd/gh-action/ ./ ENV CGO_ENABLED=0 diff --git a/cmd/gh-action/config.go b/cmd/gh-action/config.go deleted file mode 100644 index 4648c86..0000000 --- a/cmd/gh-action/config.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "fmt" - "os" - "strconv" - "strings" - - "github.com/google/go-github/v65/github" - "github.com/jmoiron/sqlx" -) - -type configType struct { - dbUri string - dbTable string - db *sqlx.DB - runID int64 - repository string - owner string - repo string - githubToken string - ghClient *github.Client -} - -type dbContextType struct { - Tx *sqlx.Tx - insertJobStmt *sqlx.NamedStmt - insertStepStmt *sqlx.NamedStmt -} - -func getConfig() (configType, error) { - dbUri := os.Getenv("DB_URI") - if len(dbUri) == 0 { - return configType{}, fmt.Errorf("missing env: DB_URI") - } - - dbTable := os.Getenv(("DB_TABLE")) - if len(dbTable) == 0 { - return configType{}, fmt.Errorf("missing env: DB_TABLE") - } - - repository := os.Getenv("GITHUB_REPOSITORY") - if len(repository) == 0 { - return configType{}, fmt.Errorf("missing env: GITHUB_REPOSITORY") - } - - envRunID := os.Getenv("GH_RUN_ID") - var runID int64 - if len(envRunID) == 0 { - return configType{}, fmt.Errorf("missing env: GH_RUN_ID") - } - runID, err := strconv.ParseInt(envRunID, 10, 64) - if err != nil { - return configType{}, fmt.Errorf("GH_RUN_ID must be integer, error: %v", err) - } - - githubToken := os.Getenv("GH_TOKEN") - - repoDetails := strings.Split(repository, "/") - if len(repoDetails) != 2 { - return configType{}, fmt.Errorf("invalid env: GITHUB_REPOSITORY") - } - - return configType{ - dbUri: dbUri, - dbTable: dbTable, - db: nil, - runID: runID, - repository: repository, - owner: repoDetails[0], - repo: repoDetails[1], - githubToken: githubToken, - }, nil -} diff --git a/cmd/gh-action/db.go b/cmd/gh-action/db.go deleted file mode 100644 index 1cd016c..0000000 --- a/cmd/gh-action/db.go +++ /dev/null @@ -1,177 +0,0 @@ -package main - -import ( - "context" - "fmt" - - "github.com/jmoiron/sqlx" - _ "github.com/lib/pq" - - "github.com/google/go-github/v65/github" -) - -var ( - schemeWorkflowRunsStats = ` - CREATE TABLE IF NOT EXISTS %s ( - workflowid BIGINT, - name TEXT, - status TEXT, - conclusion TEXT, - runid BIGINT, - runattempt INT, - startedat TIMESTAMP, - updatedat TIMESTAMP, - reponame TEXT, - event TEXT, - PRIMARY KEY(workflowid, runid, runattempt) - ) - ` - schemeWorkflowRunAttempts = ` - CREATE TABLE IF NOT EXISTS %s ( - workflowid BIGINT, - name TEXT, - status TEXT, - conclusion TEXT, - runid BIGINT, - runattempt INT, - startedat TIMESTAMP, - updatedat TIMESTAMP, - reponame TEXT, - event TEXT, - PRIMARY KEY(workflowid, runid, runattempt) - ) - ` - schemeWorkflowJobs = ` - CREATE TABLE IF NOT EXISTS %s ( - JobId BIGINT, - RunID BIGINT, - NodeID TEXT, - HeadBranch TEXT, - HeadSHA TEXT, - Status TEXT, - Conclusion TEXT, - CreatedAt TIMESTAMP, - StartedAt TIMESTAMP, - CompletedAt TIMESTAMP, - Name TEXT, - RunnerName TEXT, - RunnerGroupName TEXT, - RunAttempt BIGINT, - WorkflowName TEXT - )` - schemeWorkflowJobsSteps = ` - CREATE TABLE IF NOT EXISTS %s ( - JobId BIGINT, - RunId BIGINT, - RunAttempt BIGINT, - Name TEXT, - Status TEXT, - Conclusion TEXT, - Number BIGINT, - StartedAt TIMESTAMP, - CompletedAt TIMESTAMP - ) - ` -) - -func initDatabase(conf configType) error { - _, err := conf.db.Exec(fmt.Sprintf(schemeWorkflowRunsStats, conf.dbTable)) - if err != nil { - return err - } - - _, err = conf.db.Exec(fmt.Sprintf(schemeWorkflowRunAttempts, conf.dbTable + "_attempts")) - if err != nil { - return err - } - - _, err = conf.db.Exec(fmt.Sprintf(schemeWorkflowJobs, conf.dbTable + "_jobs")) - if err != nil { - return err - } - - _, err = conf.db.Exec(fmt.Sprintf(schemeWorkflowJobsSteps, conf.dbTable + "_steps")) - if err != nil { - return err - } - return nil -} - -func connectDB(conf *configType) error { - db, err := sqlx.Connect("postgres", conf.dbUri) - if err != nil { - return err - } - conf.db = db - return nil -} - -func saveWorkflowRun(conf configType, record *WorkflowRunRec) error { - query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", conf.dbTable, - "workflowid, name, status, conclusion, runid, runattempt, startedAt, updatedAt, repoName, event", - ":workflowid, :name, :status, :conclusion, :runid, :runattempt, :startedat, :updatedat, :reponame, :event", - ) - - _, err := conf.db.NamedExec(query, *record) - - if err != nil { - return err - } - return nil -} - -func saveWorkflowRunAttempt(conf configType, workflowRun *github.WorkflowRun) error { - query := fmt.Sprintf("INSERT INTO %s_attempts (%s) VALUES (%s)", conf.dbTable, - "workflowid, name, status, conclusion, runid, runattempt, startedAt, updatedAt, repoName, event", - ":workflowid, :name, :status, :conclusion, :runid, :runattempt, :startedat, :updatedat, :reponame, :event", - ) - - _, err := conf.db.NamedExec(query, ghWorkflowRunRec(workflowRun)) - return err -} - -func prepareJobTransaction(ctx context.Context, conf configType, dbContext *dbContextType) error { - var err error - dbContext.Tx, err = conf.db.BeginTxx(ctx, nil) - if err != nil { - return err - } - jobs_query := fmt.Sprintf("INSERT INTO %s_jobs (%s) VALUES (%s)", conf.dbTable, - "jobid, runid, nodeid, headbranch, headsha, status, conclusion, createdat, startedat, completedat, name, runnername, runnergroupname, runattempt, workflowname", - ":jobid, :runid, :nodeid, :headbranch, :headsha, :status, :conclusion, :createdat, :startedat, :completedat, :name, :runnername, :runnergroupname, :runattempt, :workflowname", - ) - dbContext.insertJobStmt, _ = dbContext.Tx.PrepareNamed(jobs_query) - - - steps_query := fmt.Sprintf("INSERT INTO %s_steps (%s) VALUES (%s)", conf.dbTable, - "jobid, runid, runattempt, name, status, conclusion, number, startedat, completedat", - ":jobid, :runid, :runattempt, :name, :status, :conclusion, :number, :startedat, :completedat", - ) - dbContext.insertStepStmt, _ = dbContext.Tx.PrepareNamed(steps_query) - - return nil -} -func saveJobInfo(dbContext *dbContextType, workflowJob *github.WorkflowJob) error { - _, err := dbContext.insertJobStmt.Exec(ghWorkflowJobRec(workflowJob)) - if err != nil { - return err - } - - for _, step := range workflowJob.Steps { - err = saveStepInfo(dbContext, workflowJob, step) - if err != nil { - return err - } - } - return nil -} - -func saveStepInfo(dbContext *dbContextType, job *github.WorkflowJob, step *github.TaskStep) error { - _, err := dbContext.insertStepStmt.Exec(ghWorkflowJobStepRec(job, step)) - return err -} - -func commitJobTransaction(dbContext *dbContextType) error { - err := dbContext.Tx.Commit() - return err -} diff --git a/cmd/gh-action/gh.go b/cmd/gh-action/gh.go deleted file mode 100644 index c317cfb..0000000 --- a/cmd/gh-action/gh.go +++ /dev/null @@ -1,88 +0,0 @@ -package main - -import ( - "context" - "fmt" - "net/http" - - "github.com/google/go-github/v65/github" - "golang.org/x/oauth2" -) - -func printJobInfo(job *github.WorkflowJob) { - fmt.Printf("== Job %s %s, (created: %v, started: %v, completed: %v)\n", - *job.Name, - *job.Status, - *job.CreatedAt, - job.StartedAt, - job.CompletedAt, - ) - for _, step := range job.Steps { - fmt.Printf("Step %s, started %v, completed %v\n", *step.Name, step.StartedAt, step.CompletedAt) - } -} - -func initGhClient(conf *configType) { - var token *http.Client - if len(conf.githubToken) != 0 { - token = oauth2.NewClient(context.Background(), oauth2.StaticTokenSource( - &oauth2.Token{AccessToken: conf.githubToken}, - )) - } - - conf.ghClient = github.NewClient(token) -} - -func getWorkflowStat(ctx context.Context, conf configType) (*WorkflowRunRec, error) { - fmt.Printf("Getting data for %s/%s, runId %d\n", conf.owner, conf.repo, conf.runID) - workflowRunData, _, err := conf.ghClient.Actions.GetWorkflowRunByID(ctx, conf.owner, conf.repo, conf.runID) - if err != nil { - return nil, err - } - - if workflowRunData == nil { - fmt.Printf("Got nil\n") - return &WorkflowRunRec{RepoName: conf.repository}, nil - } - - return ghWorkflowRunRec(workflowRunData), nil -} - -func getWorkflowAttempt(ctx context.Context, conf configType, attempt int64) (*github.WorkflowRun, error) { - workflowRunData, _, err := conf.ghClient.Actions.GetWorkflowRunAttempt( - ctx, - conf.owner, conf.repo, - conf.runID, - int(attempt), - nil, - ) - if err != nil { - return nil, err - } - return workflowRunData, nil -} - -func getWorkflowAttemptJobs(ctx context.Context, conf configType, attempt int64) ([]*github.WorkflowJob, error) { - var result []*github.WorkflowJob - - opts := &github.ListOptions{PerPage: 100} - for { - jobsData, resp, err := conf.ghClient.Actions.ListWorkflowJobsAttempt( - ctx, - conf.owner, conf.repo, - conf.runID, - attempt, - opts, - ) - if err != nil { - return nil, err - } - result = append(result, jobsData.Jobs...) - if resp.NextPage == 0 { - break - } - - opts.Page = resp.NextPage - } - return result, nil -} diff --git a/cmd/gh-action/main.go b/cmd/gh-action/main.go index bda1a95..a248d4d 100644 --- a/cmd/gh-action/main.go +++ b/cmd/gh-action/main.go @@ -6,63 +6,67 @@ import ( "log" "github.com/google/go-github/v65/github" + "github.com/neondatabase/gh-workflow-stats-action/pkg/config" + "github.com/neondatabase/gh-workflow-stats-action/pkg/data" + "github.com/neondatabase/gh-workflow-stats-action/pkg/db" + "github.com/neondatabase/gh-workflow-stats-action/pkg/gh" ) func main() { ctx := context.Background() // Get env vars - conf, err := getConfig() + conf, err := config.GetConfig() if err != nil { log.Fatal(err) } - err = connectDB(&conf) + err = db.ConnectDB(&conf) if err != nil { log.Fatal(err) } - err = initDatabase(conf) + err = db.InitDatabase(conf) if err != nil { log.Fatal(err) } - initGhClient(&conf) + gh.InitGhClient(&conf) - var workflowStat *WorkflowRunRec - workflowStat, err = getWorkflowStat(ctx, conf) + var workflowStat *data.WorkflowRunRec + workflowStat, err = gh.GetWorkflowStat(ctx, conf) if err != nil { log.Fatal(err) } - err = saveWorkflowRun(conf, workflowStat) + err = db.SaveWorkflowRun(conf, workflowStat) if err != nil { log.Fatal(err) } var lastAttemptRun *github.WorkflowRun lastAttemptN := workflowStat.RunAttempt - lastAttemptRun, err = getWorkflowAttempt(ctx, conf, lastAttemptN) + lastAttemptRun, err = gh.GetWorkflowAttempt(ctx, conf, lastAttemptN) if err != nil { log.Fatal(err) } - err = saveWorkflowRunAttempt(conf, lastAttemptRun) + err = db.SaveWorkflowRunAttempt(conf, lastAttemptRun) if err != nil { log.Fatal(err) } - jobsInfo, err := getWorkflowAttemptJobs(ctx, conf, lastAttemptN) + jobsInfo, _, err := gh.GetWorkflowAttemptJobs(ctx, conf, lastAttemptN) if err != nil { log.Fatal(err) } - var dbContext dbContextType - prepareJobTransaction(ctx, conf, &dbContext) + var dbContext config.DbContextType + db.PrepareJobTransaction(ctx, conf, &dbContext) for _, jobInfo := range jobsInfo { - err = saveJobInfo(&dbContext, jobInfo) + err = db.SaveJobInfo(&dbContext, jobInfo) if err != nil { fmt.Println(err) } } - err = commitJobTransaction(&dbContext) + err = db.CommitJobTransaction(&dbContext) if err != nil { fmt.Println(err) } diff --git a/cmd/history-exporter/main.go b/cmd/history-exporter/main.go new file mode 100644 index 0000000..fcd4a38 --- /dev/null +++ b/cmd/history-exporter/main.go @@ -0,0 +1,111 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "time" + + "github.com/google/go-github/v65/github" + "github.com/neondatabase/gh-workflow-stats-action/pkg/config" + "github.com/neondatabase/gh-workflow-stats-action/pkg/db" + "github.com/neondatabase/gh-workflow-stats-action/pkg/export" + "github.com/neondatabase/gh-workflow-stats-action/pkg/gh" +) + +func main() { + var startDateStr string + var endDateStr string + var startDate time.Time + var endDate time.Time + + flag.StringVar(&startDateStr, "start-date", "", "start date to quert and export") + flag.StringVar(&endDateStr, "end-date", "", "end date to quert and export") + flag.Parse() + + if startDateStr == "" { + startDate = time.Now().Truncate(24 * time.Hour) + } else { + var err error + startDate, err = time.Parse("2006-01-02", startDateStr) + if err != nil { + log.Fatalf("Failed to parse date: %s", err) + } + } + + if endDateStr == "" { + endDate = startDate.AddDate(0, 0, 1) + } else { + var err error + endDate, err = time.Parse("2006-01-02", endDateStr) + if err != nil { + log.Fatalf("Failed to parse end date: %s", err) + } + } + + conf, err := config.GetConfig() + if err != nil { + log.Fatal(err) + } + + err = db.ConnectDB(&conf) + if err != nil { + log.Fatal(err) + } + + gh.InitGhClient(&conf) + ctx := context.Background() + + durations := []time.Duration{ + 6 * time.Hour, // 18:00 - 24:00 + 3 * time.Hour, // 15:00 - 18:00 + 1 * time.Hour, // 14:00 - 15:00 + 1 * time.Hour, // 13:00 - 14:00 + 1 * time.Hour, // 12:00 - 13:00 + 2 * time.Hour, // 10:00 - 12:00 + 4 * time.Hour, // 06:00 - 10:00 + 6 * time.Hour, // 00:00 - 06:00 + } + curDurIdx := 0 + for date := endDate.Add(-durations[curDurIdx]); date.Compare(startDate) >= 0; date = date.Add(-durations[curDurIdx]) { + runs, rate, _ := gh.ListWorkflowRuns(ctx, conf, date, date.Add(durations[curDurIdx])) + fmt.Println("\n", date, len(runs)) + if len(runs) >= 1000 { + fmt.Printf("\n\n+++\n+ PAGINATION LIMIT: %v\n+++\n", date) + } + fetchedRunsKeys := make([]gh.WorkflowRunAttemptKey, len(runs)) + i := 0 + for key := range runs { + fetchedRunsKeys[i] = key + i++ + } + notInDb := db.QueryWorkflowRunsNotInDb(conf, fetchedRunsKeys) + fmt.Printf("Time range: %v - %v, fetched: %d, notInDb: %d.\n", + date, date.Add(durations[curDurIdx]), + len(runs), len(notInDb), + ) + if rate.Remaining < 30 { + fmt.Printf("Close to rate limit, remaining: %d", rate.Remaining) + fmt.Printf("Sleep till %v (%v seconds)\n", rate.Reset, time.Until(rate.Reset.Time)) + time.Sleep(time.Until(rate.Reset.Time) + 10*time.Second) + } else { + fmt.Printf("Rate: %+v\n", rate) + } + for _, key := range notInDb { + conf.RunID = key.RunId + fmt.Printf("Saving runId %d Attempt %d. ", key.RunId, key.RunAttempt) + var attemptRun *github.WorkflowRun + var ok bool + if attemptRun, ok = runs[gh.WorkflowRunAttemptKey{RunId: key.RunId, RunAttempt: key.RunAttempt}]; ok { + fmt.Printf("Got it from ListWorkflowRuns results. ") + } else { + fmt.Printf("Fetching it from GH API. ") + attemptRun, _ = gh.GetWorkflowAttempt(ctx, conf, key.RunAttempt) + } + db.SaveWorkflowRunAttempt(conf, attemptRun) + export.ExportAndSaveJobs(ctx, conf, key.RunAttempt) + } + curDurIdx = (curDurIdx + 1) % len(durations) + } +} diff --git a/go.mod b/go.mod index 723c40e..9f2e582 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,14 @@ go 1.23.1 require ( github.com/google/go-github/v65 v65.0.0 + github.com/hashicorp/go-retryablehttp v0.7.7 github.com/jmoiron/sqlx v1.4.0 github.com/lib/pq v1.10.9 golang.org/x/oauth2 v0.23.0 ) -require github.com/google/go-querystring v1.1.0 // indirect +require ( + github.com/gofri/go-github-ratelimit v1.1.0 // indirect + github.com/google/go-querystring v1.1.0 // indirect + github.com/hashicorp/go-cleanhttp v0.5.2 // indirect +) diff --git a/go.sum b/go.sum index 10385a1..9854813 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,11 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/gofri/go-github-ratelimit v1.1.0 h1:ijQ2bcv5pjZXNil5FiwglCg8wc9s8EgjTmNkqjw8nuk= +github.com/gofri/go-github-ratelimit v1.1.0/go.mod h1:OnCi5gV+hAG/LMR7llGhU7yHt44se9sYgKPnafoL7RY= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -9,12 +13,24 @@ github.com/google/go-github/v65 v65.0.0 h1:pQ7BmO3DZivvFk92geC0jB0q2m3gyn8vnYPgV github.com/google/go-github/v65 v65.0.0/go.mod h1:DvrqWo5hvsdhJvHd4WyVF9ttANN3BniqjP8uTFMNb60= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= +github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= +github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= +github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= +github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= +github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/config/config.go b/pkg/config/config.go new file mode 100644 index 0000000..c07e7ce --- /dev/null +++ b/pkg/config/config.go @@ -0,0 +1,74 @@ +package config + +import ( + "fmt" + "os" + "strconv" + "strings" + + "github.com/google/go-github/v65/github" + "github.com/jmoiron/sqlx" +) + +type ConfigType struct { + DbUri string + DbTable string + Db *sqlx.DB + RunID int64 + Repository string + Owner string + Repo string + GithubToken string + GhClient *github.Client +} + +type DbContextType struct { + Tx *sqlx.Tx + InsertJobStmt *sqlx.NamedStmt + InsertStepStmt *sqlx.NamedStmt +} + +func GetConfig() (ConfigType, error) { + dbUri := os.Getenv("DB_URI") + if len(dbUri) == 0 { + return ConfigType{}, fmt.Errorf("missing env: DB_URI") + } + + dbTable := os.Getenv(("DB_TABLE")) + if len(dbTable) == 0 { + return ConfigType{}, fmt.Errorf("missing env: DB_TABLE") + } + + repository := os.Getenv("GITHUB_REPOSITORY") + if len(repository) == 0 { + return ConfigType{}, fmt.Errorf("missing env: GITHUB_REPOSITORY") + } + + envRunID := os.Getenv("GH_RUN_ID") + var runID int64 + if len(envRunID) == 0 { + return ConfigType{}, fmt.Errorf("missing env: GH_RUN_ID") + } + runID, err := strconv.ParseInt(envRunID, 10, 64) + if err != nil { + return ConfigType{}, fmt.Errorf("GH_RUN_ID must be integer, error: %v", err) + } + + githubToken := os.Getenv("GH_TOKEN") + + repoDetails := strings.Split(repository, "/") + if len(repoDetails) != 2 { + return ConfigType{}, fmt.Errorf("invalid env: GITHUB_REPOSITORY") + } + + return ConfigType{ + DbUri: dbUri, + DbTable: dbTable, + Db: nil, + RunID: runID, + Repository: repository, + Owner: repoDetails[0], + Repo: repoDetails[1], + GithubToken: githubToken, + }, nil +} diff --git a/cmd/gh-action/data.go b/pkg/data/data.go similarity index 91% rename from cmd/gh-action/data.go rename to pkg/data/data.go index bd7e08b..8cb40c7 100644 --- a/cmd/gh-action/data.go +++ b/pkg/data/data.go @@ -1,4 +1,4 @@ -package main +package data import ( "time" @@ -19,7 +19,7 @@ type WorkflowRunRec struct { Event string } -func ghWorkflowRunRec(w *github.WorkflowRun) *WorkflowRunRec { +func GhWorkflowRunRec(w *github.WorkflowRun) *WorkflowRunRec { return &WorkflowRunRec{ WorkflowId: w.GetWorkflowID(), Name: w.GetName(), @@ -52,7 +52,7 @@ type WorkflowJobRec struct { WorkflowName string } -func ghWorkflowJobRec(j *github.WorkflowJob) *WorkflowJobRec { +func GhWorkflowJobRec(j *github.WorkflowJob) *WorkflowJobRec { return &WorkflowJobRec{ JobId: j.GetID(), RunId: j.GetRunID(), @@ -84,7 +84,7 @@ type WorkflowJobStepRec struct { CompletedAt time.Time } -func ghWorkflowJobStepRec(j *github.WorkflowJob, s *github.TaskStep) *WorkflowJobStepRec { +func GhWorkflowJobStepRec(j *github.WorkflowJob, s *github.TaskStep) *WorkflowJobStepRec { return &WorkflowJobStepRec{ JobId: j.GetID(), RunId: j.GetRunID(), diff --git a/pkg/db/db.go b/pkg/db/db.go new file mode 100644 index 0000000..a7959b1 --- /dev/null +++ b/pkg/db/db.go @@ -0,0 +1,238 @@ +package db + +import ( + "context" + "fmt" + "strings" + + "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" + + "github.com/google/go-github/v65/github" + + "github.com/neondatabase/gh-workflow-stats-action/pkg/config" + "github.com/neondatabase/gh-workflow-stats-action/pkg/data" + "github.com/neondatabase/gh-workflow-stats-action/pkg/gh" +) + +var ( + schemeWorkflowRunsStats = ` + CREATE TABLE IF NOT EXISTS %s ( + workflowid BIGINT, + name TEXT, + status TEXT, + conclusion TEXT, + runid BIGINT, + runattempt INT, + startedat TIMESTAMP, + updatedat TIMESTAMP, + reponame TEXT, + event TEXT, + PRIMARY KEY(workflowid, runid, runattempt) + ) + ` + schemeWorkflowRunAttempts = ` + CREATE TABLE IF NOT EXISTS %s ( + workflowid BIGINT, + name TEXT, + status TEXT, + conclusion TEXT, + runid BIGINT, + runattempt INT, + startedat TIMESTAMP, + updatedat TIMESTAMP, + reponame TEXT, + event TEXT, + PRIMARY KEY(workflowid, runid, runattempt) + ) + ` + schemeWorkflowJobs = ` + CREATE TABLE IF NOT EXISTS %s ( + JobId BIGINT, + RunID BIGINT, + NodeID TEXT, + HeadBranch TEXT, + HeadSHA TEXT, + Status TEXT, + Conclusion TEXT, + CreatedAt TIMESTAMP, + StartedAt TIMESTAMP, + CompletedAt TIMESTAMP, + Name TEXT, + RunnerName TEXT, + RunnerGroupName TEXT, + RunAttempt BIGINT, + WorkflowName TEXT + )` + schemeWorkflowJobsSteps = ` + CREATE TABLE IF NOT EXISTS %s ( + JobId BIGINT, + RunId BIGINT, + RunAttempt BIGINT, + Name TEXT, + Status TEXT, + Conclusion TEXT, + Number BIGINT, + StartedAt TIMESTAMP, + CompletedAt TIMESTAMP + ) + ` +) + +func InitDatabase(conf config.ConfigType) error { + _, err := conf.Db.Exec(fmt.Sprintf(schemeWorkflowRunsStats, conf.DbTable)) + if err != nil { + return err + } + + _, err = conf.Db.Exec(fmt.Sprintf(schemeWorkflowRunAttempts, conf.DbTable+"_attempts")) + if err != nil { + return err + } + + _, err = conf.Db.Exec(fmt.Sprintf(schemeWorkflowJobs, conf.DbTable+"_jobs")) + if err != nil { + return err + } + + _, err = conf.Db.Exec(fmt.Sprintf(schemeWorkflowJobsSteps, conf.DbTable+"_steps")) + if err != nil { + return err + } + return nil +} + +func ConnectDB(conf *config.ConfigType) error { + db, err := sqlx.Connect("postgres", conf.DbUri) + if err != nil { + return err + } + conf.Db = db + return nil +} + +func SaveWorkflowRun(conf config.ConfigType, record *data.WorkflowRunRec) error { + query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", conf.DbTable, + "workflowid, name, status, conclusion, runid, runattempt, startedAt, updatedAt, repoName, event", + ":workflowid, :name, :status, :conclusion, :runid, :runattempt, :startedat, :updatedat, :reponame, :event", + ) + + _, err := conf.Db.NamedExec(query, *record) + + if err != nil { + return err + } + return nil +} + +func SaveWorkflowRunAttempt(conf config.ConfigType, workflowRun *github.WorkflowRun) error { + query := fmt.Sprintf("INSERT INTO %s_attempts (%s) VALUES (%s)", conf.DbTable, + "workflowid, name, status, conclusion, runid, runattempt, startedAt, updatedAt, repoName, event", + ":workflowid, :name, :status, :conclusion, :runid, :runattempt, :startedat, :updatedat, :reponame, :event", + ) + + _, err := conf.Db.NamedExec(query, data.GhWorkflowRunRec(workflowRun)) + return err +} + +func PrepareJobTransaction(ctx context.Context, conf config.ConfigType, dbContext *config.DbContextType) error { + var err error + dbContext.Tx, err = conf.Db.BeginTxx(ctx, nil) + if err != nil { + return err + } + jobs_query := fmt.Sprintf("INSERT INTO %s_jobs (%s) VALUES (%s)", conf.DbTable, + "jobid, runid, nodeid, headbranch, headsha, status, conclusion, createdat, startedat, completedat, name, runnername, runnergroupname, runattempt, workflowname", + ":jobid, :runid, :nodeid, :headbranch, :headsha, :status, :conclusion, :createdat, :startedat, :completedat, :name, :runnername, :runnergroupname, :runattempt, :workflowname", + ) + dbContext.InsertJobStmt, _ = dbContext.Tx.PrepareNamed(jobs_query) + + steps_query := fmt.Sprintf("INSERT INTO %s_steps (%s) VALUES (%s)", conf.DbTable, + "jobid, runid, runattempt, name, status, conclusion, number, startedat, completedat", + ":jobid, :runid, :runattempt, :name, :status, :conclusion, :number, :startedat, :completedat", + ) + dbContext.InsertStepStmt, _ = dbContext.Tx.PrepareNamed(steps_query) + + return nil +} +func SaveJobInfo(dbContext *config.DbContextType, workflowJob *github.WorkflowJob) error { + _, err := dbContext.InsertJobStmt.Exec(data.GhWorkflowJobRec(workflowJob)) + if err != nil { + return err + } + + for _, step := range workflowJob.Steps { + err = SaveStepInfo(dbContext, workflowJob, step) + if err != nil { + return err + } + } + return nil +} + +func SaveStepInfo(dbContext *config.DbContextType, job *github.WorkflowJob, step *github.TaskStep) error { + _, err := dbContext.InsertStepStmt.Exec(data.GhWorkflowJobStepRec(job, step)) + return err +} + +func CommitJobTransaction(dbContext *config.DbContextType) error { + err := dbContext.Tx.Commit() + return err +} + +func QueryWorkflowRunAttempts(conf config.ConfigType, runId int64) map[int64]struct{} { + result := make(map[int64]struct{}) + + query := fmt.Sprintf("SELECT runAttempt from %s_attempts WHERE runId=$1", conf.DbTable) + rows, err := conf.Db.Query(query, runId) + if err != nil { + return result + } + var attempt int64 + for rows.Next() { + err = rows.Scan(&attempt) + if err != nil { + fmt.Println(err) + } else { + result[attempt] = struct{}{} + } + } + return result +} + +func QueryWorkflowRunsNotInDb(conf config.ConfigType, workflowRuns []gh.WorkflowRunAttemptKey) []gh.WorkflowRunAttemptKey { + result := make([]gh.WorkflowRunAttemptKey, 0) + + if len(workflowRuns) == 0 { + return result + } + // TODO: I have to find out how to use https://jmoiron.github.io/sqlx/#namedParams with sqlx.In() + // For now just generate query with strings.Builder + var valuesStr strings.Builder + for i, v := range workflowRuns { + if i > 0 { + valuesStr.WriteString(", ") + } + valuesStr.WriteString(fmt.Sprintf("(%d :: bigint, %d :: bigint)", v.RunId, v.RunAttempt)) + } + queryStr := fmt.Sprintf("SELECT runid, runattempt FROM (VALUES %s) as q (runid, runattempt) LEFT JOIN %s_attempts db "+ + "USING (runid, runattempt) WHERE db.runid is null", + valuesStr.String(), + conf.DbTable, + ) + rows, err := conf.Db.Queryx(queryStr) + if err != nil { + fmt.Printf("Failed to Query: %s\n", err) + return result + } + var rec gh.WorkflowRunAttemptKey + for rows.Next() { + err = rows.StructScan(&rec) + if err != nil { + fmt.Println(err) + } else { + result = append(result, rec) + } + } + return result +} diff --git a/pkg/export/export.go b/pkg/export/export.go new file mode 100644 index 0000000..6655543 --- /dev/null +++ b/pkg/export/export.go @@ -0,0 +1,35 @@ +package export + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/neondatabase/gh-workflow-stats-action/pkg/config" + "github.com/neondatabase/gh-workflow-stats-action/pkg/db" + "github.com/neondatabase/gh-workflow-stats-action/pkg/gh" +) + +func ExportAndSaveJobs(ctx context.Context, conf config.ConfigType, runAttempt int64) error { + jobsInfo, rate, err := gh.GetWorkflowAttemptJobs(ctx, conf, runAttempt) + if err != nil { + log.Fatal(err) + } + if rate.Remaining < 20 { + fmt.Printf("Close to rate limit, remaining: %d", rate.Remaining) + fmt.Printf("Sleep till %v (%v seconds)\n", rate.Reset, time.Until(rate.Reset.Time)) + time.Sleep(time.Until(rate.Reset.Time)) + } + var dbContext config.DbContextType + db.PrepareJobTransaction(ctx, conf, &dbContext) + for _, jobInfo := range jobsInfo { + err = db.SaveJobInfo(&dbContext, jobInfo) + if err != nil { + fmt.Println(err) + } + } + err = db.CommitJobTransaction(&dbContext) + + return err +} diff --git a/pkg/gh/gh.go b/pkg/gh/gh.go new file mode 100644 index 0000000..4a1f29c --- /dev/null +++ b/pkg/gh/gh.go @@ -0,0 +1,141 @@ +package gh + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/gofri/go-github-ratelimit/github_ratelimit" + "github.com/google/go-github/v65/github" + "github.com/hashicorp/go-retryablehttp" + + "github.com/neondatabase/gh-workflow-stats-action/pkg/config" + "github.com/neondatabase/gh-workflow-stats-action/pkg/data" +) + +func printJobInfo(job *github.WorkflowJob) { + fmt.Printf("== Job %s %s, (created: %v, started: %v, completed: %v)\n", + *job.Name, + *job.Status, + *job.CreatedAt, + job.StartedAt, + job.CompletedAt, + ) + for _, step := range job.Steps { + fmt.Printf("Step %s, started %v, completed %v\n", *step.Name, step.StartedAt, step.CompletedAt) + } +} + +func InitGhClient(conf *config.ConfigType) { + retryClient := retryablehttp.NewClient() + retryClient.RetryMax = 5 + + rl, err := github_ratelimit.NewRateLimitWaiterClient(retryClient.StandardClient().Transport) + if err != nil { + log.Fatal(err) + } + conf.GhClient = github.NewClient(rl).WithAuthToken(conf.GithubToken) +} + +func GetWorkflowStat(ctx context.Context, conf config.ConfigType) (*data.WorkflowRunRec, error) { + fmt.Printf("Getting data for %s/%s, runId %d\n", conf.Owner, conf.Repo, conf.RunID) + workflowRunData, _, err := conf.GhClient.Actions.GetWorkflowRunByID(ctx, conf.Owner, conf.Repo, conf.RunID) + if err != nil { + return nil, err + } + + if workflowRunData == nil { + fmt.Printf("Got nil\n") + return &data.WorkflowRunRec{RepoName: conf.Repository}, nil + } + + return data.GhWorkflowRunRec(workflowRunData), nil +} + +func GetWorkflowAttempt(ctx context.Context, conf config.ConfigType, attempt int64) (*github.WorkflowRun, error) { + workflowRunData, _, err := conf.GhClient.Actions.GetWorkflowRunAttempt( + ctx, + conf.Owner, conf.Repo, + conf.RunID, + int(attempt), + nil, + ) + if err != nil { + return nil, err + } + return workflowRunData, nil +} + +func GetWorkflowAttemptJobs(ctx context.Context, conf config.ConfigType, attempt int64) ([]*github.WorkflowJob, github.Rate, error) { + var result []*github.WorkflowJob + finalRate := github.Rate{} + + opts := &github.ListOptions{PerPage: 100} + for { + jobsData, resp, err := conf.GhClient.Actions.ListWorkflowJobsAttempt( + ctx, + conf.Owner, conf.Repo, + conf.RunID, + attempt, + opts, + ) + if resp != nil { + finalRate = resp.Rate + } + if err != nil { + return nil, finalRate, err + } + result = append(result, jobsData.Jobs...) + if resp.NextPage == 0 { + break + } + + opts.Page = resp.NextPage + } + return result, finalRate, nil +} + +type WorkflowRunAttemptKey struct { + RunId int64 + RunAttempt int64 +} + +func ListWorkflowRuns(ctx context.Context, + conf config.ConfigType, + start time.Time, end time.Time) (map[WorkflowRunAttemptKey]*github.WorkflowRun, github.Rate, error) { + result := make(map[WorkflowRunAttemptKey]*github.WorkflowRun) + finalRate := github.Rate{} + + opts := &github.ListOptions{PerPage: 100} + for { + workflowRuns, resp, err := conf.GhClient.Actions.ListRepositoryWorkflowRuns( + ctx, + conf.Owner, conf.Repo, + &github.ListWorkflowRunsOptions{ + Created: fmt.Sprintf("%s..%s", start.Format(time.RFC3339), end.Format(time.RFC3339)), + Status: "completed", + ListOptions: *opts, + }, + ) + if resp != nil { + finalRate = resp.Rate + } + if err != nil { + return nil, finalRate, err + } + for _, rec := range workflowRuns.WorkflowRuns { + key := WorkflowRunAttemptKey{RunId: rec.GetID(), RunAttempt: int64(rec.GetRunAttempt())} + if v, ok := result[key]; ok { + fmt.Printf("Strange, record is already stored for %v (%+v), updating with %+v\n", key, v, rec) + } + result[key] = rec + } + if resp.NextPage == 0 { + finalRate = resp.Rate + break + } + opts.Page = resp.NextPage + } + return result, finalRate, nil +}