Skip to content

Commit

Permalink
store expire data
Browse files Browse the repository at this point in the history
  • Loading branch information
Nithunikzz committed Feb 16, 2024
1 parent 9177c42 commit a155f8f
Show file tree
Hide file tree
Showing 26 changed files with 304 additions and 40 deletions.
99 changes: 96 additions & 3 deletions client/pkg/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package application

import (
"context"
"database/sql"
"log"
"os"
"os/signal"
"syscall"

"github.com/intelops/kubviz/client/pkg/clickhouse"
"github.com/intelops/kubviz/client/pkg/clients"
"github.com/intelops/kubviz/client/pkg/config"
"github.com/intelops/kubviz/client/pkg/storage"
"github.com/intelops/kubviz/pkg/opentelemetry"
"github.com/kelseyhightower/envconfig"
"github.com/robfig/cron/v3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)
Expand All @@ -19,20 +25,44 @@ type Application struct {
dbClient clickhouse.DBInterface
}

const (
EventsTable = "events"
RakkessTable = "rakkess"
DeprecatedAPIsTable = "DeprecatedAPIs"
DeletedAPIsTable = "DeletedAPIs"
JfrogContainerPushTable = "jfrogcontainerpush"
GetAllResourcesTable = "getall_resources"
OutdatedImagesTable = "outdated_images"
KubeScoreTable = "kubescore"
TrivyVulTable = "trivy_vul"
TrivyMisconfigTable = "trivy_misconfig"
TrivyImageTable = "trivyimage"
DockerHubBuildTable = "dockerhubbuild"
AzureContainerPushTable = "azurecontainerpush"
QuayContainerPushTable = "quaycontainerpush"
TrivySBOMTable = "trivysbom"
AzureDevOpsTable = "azure_devops"
GitHubTable = "github"
GitLabTable = "gitlab"
BitbucketTable = "bitbucket"
GiteaTable = "gitea"
KuberHealthy = "kuberhealthy"
)

func Start() *Application {
log.Println("Client Application started...")

ctx:=context.Background()
ctx := context.Background()
tracer := otel.Tracer("kubviz-client")
_, span := tracer.Start(opentelemetry.BuildContext(ctx), "Start")
span.SetAttributes(attribute.String("start-app-client", "application"))
defer span.End()

cfg := &config.Config{}
if err := envconfig.Process("", cfg); err != nil {
log.Fatalf("Could not parse env Config: %v", err)
}
dbClient, _, err := clickhouse.NewDBClient(cfg)
dbClient, conn, err := clickhouse.NewDBClient(cfg)
if err != nil {
log.Fatal(err)
}
Expand All @@ -41,6 +71,53 @@ func Start() *Application {
if err != nil {
log.Fatal("Error establishing connection to NATS:", err)
}
// c := cron.New()
// _, err = c.AddFunc("@daily", func() {
// if err := exportDataForTables(conn); err != nil {
// log.Println("Error exporting data:", err)
// }
// })
// if err != nil {
// log.Fatal("Error adding cron job:", err)
// }

// // Listen for interrupt signals to stop the program
// interrupt := make(chan os.Signal, 1)
// signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)

// // Start the cron job scheduler
// c.Start()

// // Wait for an interrupt signal to stop the program
// <-interrupt

// // Stop the cron scheduler gracefully
// c.Stop()
if cfg.AwsEnable {
c := cron.New()
_, err = c.AddFunc("@daily", func() {
if err := exportDataForTables(conn); err != nil {
log.Println("Error exporting data:", err)
}
})
if err != nil {
log.Fatal("Error adding cron job:", err)
}

// Listen for interrupt signals to stop the program
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)

// Start the cron job scheduler
c.Start()

// Wait for an interrupt signal to stop the program
<-interrupt

// Stop the cron scheduler gracefully
c.Stop()
}

return &Application{
Config: cfg,
conn: natsContext,
Expand All @@ -53,3 +130,19 @@ func (app *Application) Close() {
app.conn.Close()
app.dbClient.Close()
}
func exportDataForTables(db *sql.DB) error {
//pvcMountPath := "/mnt/client/kbz"
tables := []string{
EventsTable, RakkessTable, DeprecatedAPIsTable, DeletedAPIsTable, JfrogContainerPushTable, GetAllResourcesTable, OutdatedImagesTable, KubeScoreTable, TrivyVulTable, TrivyMisconfigTable, TrivyImageTable, DockerHubBuildTable, AzureContainerPushTable, QuayContainerPushTable, TrivySBOMTable, AzureDevOpsTable, GitHubTable, GitLabTable, BitbucketTable, KuberHealthy, GiteaTable,
}
for _, tableName := range tables {
err := storage.ExportExpiredData(tableName, db)
if err != nil {
log.Printf("Error exporting data for table %s: %v", tableName, err)
} else {
log.Printf("Export completed successfully for table %s.\n", tableName)
}
}

return nil
}
38 changes: 22 additions & 16 deletions client/pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package config

type Config struct {
NatsAddress string `envconfig:"NATS_ADDRESS"`
NatsToken string `envconfig:"NATS_TOKEN"`
DbPort int `envconfig:"DB_PORT"`
DBAddress string `envconfig:"DB_ADDRESS"`
ClickHouseUsername string `envconfig:"CLICKHOUSE_USERNAME"`
ClickHousePassword string `envconfig:"CLICKHOUSE_PASSWORD"`
KetallConsumer string `envconfig:"KETALL_EVENTS_CONSUMER" required:"true"`
RakeesConsumer string `envconfig:"RAKEES_METRICS_CONSUMER" required:"true"`
OutdatedConsumer string `envconfig:"OUTDATED_EVENTS_CONSUMER" required:"true"`
DeprecatedConsumer string `envconfig:"DEPRECATED_API_CONSUMER" required:"true"`
DeletedConsumer string `envconfig:"DELETED_API_CONSUMER" required:"true"`
KubvizConsumer string `envconfig:"KUBVIZ_EVENTS_CONSUMER" required:"true"`
KubscoreConsumer string `envconfig:"KUBSCORE_CONSUMER" required:"true"`
TrivyConsumer string `envconfig:"TRIVY_CONSUMER" required:"true"`
TrivyImageConsumer string `envconfig:"TRIVY_IMAGE_CONSUMER" required:"true"`
TrivySbomConsumer string `envconfig:"TRIVY_SBOM_CONSUMER" required:"true"`
NatsAddress string `envconfig:"NATS_ADDRESS"`
NatsToken string `envconfig:"NATS_TOKEN"`
DbPort int `envconfig:"DB_PORT"`
DBAddress string `envconfig:"DB_ADDRESS"`
ClickHouseUsername string `envconfig:"CLICKHOUSE_USERNAME"`
ClickHousePassword string `envconfig:"CLICKHOUSE_PASSWORD"`
KetallConsumer string `envconfig:"KETALL_EVENTS_CONSUMER" required:"true"`
RakeesConsumer string `envconfig:"RAKEES_METRICS_CONSUMER" required:"true"`
OutdatedConsumer string `envconfig:"OUTDATED_EVENTS_CONSUMER" required:"true"`
DeprecatedConsumer string `envconfig:"DEPRECATED_API_CONSUMER" required:"true"`
DeletedConsumer string `envconfig:"DELETED_API_CONSUMER" required:"true"`
KubvizConsumer string `envconfig:"KUBVIZ_EVENTS_CONSUMER" required:"true"`
KubscoreConsumer string `envconfig:"KUBSCORE_CONSUMER" required:"true"`
TrivyConsumer string `envconfig:"TRIVY_CONSUMER" required:"true"`
TrivyImageConsumer string `envconfig:"TRIVY_IMAGE_CONSUMER" required:"true"`
TrivySbomConsumer string `envconfig:"TRIVY_SBOM_CONSUMER" required:"true"`
KuberhealthyConsumer string `envconfig:"KUBERHEALTHY_CONSUMER" required:"true"`
AwsEnable bool `envconfig:"AWS_ENABLE" default:"false"`
AWSRegion string `envconfig:"AWS_REGION"`
AWSAccessKey string `envconfig:"AWS_ACCESS_KEY"`
AWSSecretKey string `envconfig:"AWS_SECRET_KEY"`
S3BucketName string `envconfig:"S3_BUCKET_NAME"`
S3ObjectKey string `envconfig:"S3_OBJECT_KEY"`
}
125 changes: 125 additions & 0 deletions client/pkg/storage/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package storage

import (
"database/sql"
"fmt"
"log"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/intelops/kubviz/client/pkg/config"
"github.com/kelseyhightower/envconfig"
)

func ExportExpiredData(tableName string, db *sql.DB) error {
columns, err := getTableColumns(db, tableName)
if err != nil {
return fmt.Errorf("error getting columns for table %s: %v", tableName, err)
}

// Construct SQL query
query := fmt.Sprintf("SELECT * FROM %s WHERE ExportedAt IS NULL", tableName)

// Query expired data
rows, err := db.Query(query)
if err != nil {
return fmt.Errorf("error querying ClickHouse: %v", err)
}
defer rows.Close()

// Construct CSV data in memory
var csvData strings.Builder
csvData.WriteString(columns + "\n") // Write CSV header

for rows.Next() {
// Assuming a dynamic structure, scan the columns into a slice of interface{}
columnValues := make([]interface{}, len(strings.Split(columns, ",")))
for i := range columnValues {
columnValues[i] = new(interface{})
}

err := rows.Scan(columnValues...)
if err != nil {
return fmt.Errorf("error scanning ClickHouse row: %v", err)
}

// Write the values to the CSV data
var rowData []string
for _, value := range columnValues {
// Dereference the pointer to get the interface{} value, then format it as a string
rowData = append(rowData, fmt.Sprintf("%v", *value.(*interface{})))
}
csvData.WriteString(strings.Join(rowData, ",") + "\n")
}

// Upload the CSV data to S3
err = uploadToS3(&csvData, fmt.Sprintf("exported_data_%s.csv", tableName))
if err != nil {
return fmt.Errorf("error uploading CSV to S3: %v", err)
}

// Update ExportedAt column with the current timestamp for exported rows
updateQuery := fmt.Sprintf("ALTER TABLE %s UPDATE ExportedAt = now() WHERE ExportedAt IS NULL", tableName)
_, err = db.Exec(updateQuery)
if err != nil {
return fmt.Errorf("error updating ExportedAt column: %v", err)
}

return nil
}

func getTableColumns(db *sql.DB, tableName string) (string, error) {
// Query to get column names
query := fmt.Sprintf("DESCRIBE TABLE %s", tableName)
rows, err := db.Query(query)
if err != nil {
return "", err
}
defer rows.Close()

// Get column names
var columns []string
for rows.Next() {
var columnName string
rows.Scan(&columnName)
columns = append(columns, columnName)
}

return strings.Join(columns, ","), nil
}

func uploadToS3(csvData *strings.Builder, s3ObjectKey string) error {
cfg := &config.Config{}
if err := envconfig.Process("", cfg); err != nil {
log.Fatalf("Could not parse env Config: %v", err)
}

// Set up AWS S3 session
sess, err := session.NewSession(&aws.Config{
Region: aws.String(cfg.AWSRegion),
Credentials: credentials.NewStaticCredentials(cfg.AWSAccessKey, cfg.AWSSecretKey, ""),
})
if err != nil {
return fmt.Errorf("error creating S3 session: %v", err)
}

// Create an S3 service client
s3Client := s3.New(sess)

// Upload the CSV data to S3
_, err = s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(cfg.S3BucketName),
Key: aws.String((s3ObjectKey)),
Body: strings.NewReader(csvData.String()),
})
if err != nil {
return fmt.Errorf("error uploading data to S3: %v", err)
}

fmt.Printf("Data uploaded to S3: %s\n", s3ObjectKey)

return nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/99designs/gqlgen v0.17.42
github.com/ClickHouse/clickhouse-go/v2 v2.10.1
github.com/aquasecurity/trivy v0.43.1
github.com/aws/aws-sdk-go v1.44.245
github.com/blang/semver v3.5.1+incompatible
github.com/corneliusweig/tabwriter v0.0.0-20190512204542-5f8a091e83b5
github.com/davecgh/go-spew v1.1.1
Expand Down Expand Up @@ -97,6 +98,7 @@ require (
github.com/imdario/mergo v0.3.15 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/invopop/yaml v0.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.5 // indirect
Expand Down
Loading

0 comments on commit a155f8f

Please sign in to comment.