Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store the exiting data #318

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 95 additions & 3 deletions client/pkg/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package application

import (
"context"
"database/sql"
"log"
"os"
"os/signal"
"syscall"

"github.com/intelops/kubviz/client/pkg/clickhouse"
"github.com/intelops/kubviz/client/pkg/clients"
"github.com/intelops/kubviz/client/pkg/config"
"github.com/intelops/kubviz/client/pkg/storage"
"github.com/intelops/kubviz/pkg/opentelemetry"
"github.com/kelseyhightower/envconfig"
"github.com/robfig/cron/v3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)
Expand All @@ -19,20 +25,43 @@ type Application struct {
dbClient clickhouse.DBInterface
}

const (
EventsTable = "events"
RakkessTable = "rakkess"
DeprecatedAPIsTable = "DeprecatedAPIs"
DeletedAPIsTable = "DeletedAPIs"
JfrogContainerPushTable = "jfrogcontainerpush"
GetAllResourcesTable = "getall_resources"
OutdatedImagesTable = "outdated_images"
KubeScoreTable = "kubescore"
TrivyVulTable = "trivy_vul"
TrivyMisconfigTable = "trivy_misconfig"
TrivyImageTable = "trivyimage"
DockerHubBuildTable = "dockerhubbuild"
AzureContainerPushTable = "azurecontainerpush"
QuayContainerPushTable = "quaycontainerpush"
TrivySBOMTable = "trivysbom"
AzureDevOpsTable = "azure_devops"
GitHubTable = "github"
GitLabTable = "gitlab"
BitbucketTable = "bitbucket"
GiteaTable = "gitea"
)

func Start() *Application {
log.Println("Client Application started...")

ctx:=context.Background()
ctx := context.Background()
tracer := otel.Tracer("kubviz-client")
_, span := tracer.Start(opentelemetry.BuildContext(ctx), "Start")
span.SetAttributes(attribute.String("start-app-client", "application"))
defer span.End()

cfg := &config.Config{}
if err := envconfig.Process("", cfg); err != nil {
log.Fatalf("Could not parse env Config: %v", err)
}
dbClient, _, err := clickhouse.NewDBClient(cfg)
dbClient, conn, err := clickhouse.NewDBClient(cfg)
if err != nil {
log.Fatal(err)
}
Expand All @@ -41,6 +70,53 @@ func Start() *Application {
if err != nil {
log.Fatal("Error establishing connection to NATS:", err)
}
// c := cron.New()
// _, err = c.AddFunc("@daily", func() {
// if err := exportDataForTables(conn); err != nil {
// log.Println("Error exporting data:", err)
// }
// })
// if err != nil {
// log.Fatal("Error adding cron job:", err)
// }

// // Listen for interrupt signals to stop the program
// interrupt := make(chan os.Signal, 1)
// signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)

// // Start the cron job scheduler
// c.Start()

// // Wait for an interrupt signal to stop the program
// <-interrupt

// // Stop the cron scheduler gracefully
// c.Stop()
if cfg.AwsEnable {
c := cron.New()
_, err = c.AddFunc("@daily", func() {
if err := exportDataForTables(conn); err != nil {
log.Println("Error exporting data:", err)
}
})
if err != nil {
log.Fatal("Error adding cron job:", err)
}

// Listen for interrupt signals to stop the program
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)

// Start the cron job scheduler
c.Start()

// Wait for an interrupt signal to stop the program
<-interrupt

// Stop the cron scheduler gracefully
c.Stop()
}

return &Application{
Config: cfg,
conn: natsContext,
Expand All @@ -53,3 +129,19 @@ func (app *Application) Close() {
app.conn.Close()
app.dbClient.Close()
}
func exportDataForTables(db *sql.DB) error {
//pvcMountPath := "/mnt/client/kbz"
tables := []string{
EventsTable, RakkessTable, DeprecatedAPIsTable, DeletedAPIsTable, JfrogContainerPushTable, GetAllResourcesTable, OutdatedImagesTable, KubeScoreTable, TrivyVulTable, TrivyMisconfigTable, TrivyImageTable, DockerHubBuildTable, AzureContainerPushTable, QuayContainerPushTable, TrivySBOMTable, AzureDevOpsTable, GitHubTable, GitLabTable, BitbucketTable, GiteaTable,
}
for _, tableName := range tables {
err := storage.ExportExpiredData(tableName, db)
if err != nil {
log.Printf("Error exporting data for table %s: %v", tableName, err)
} else {
log.Printf("Export completed successfully for table %s.\n", tableName)
}
}

return nil
}
34 changes: 17 additions & 17 deletions client/pkg/clickhouse/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func NewDBClient(conf *config.Config) (DBInterface, *sql.DB, error) {

func (c *DBClient) InsertContainerEventAzure(pushEvent model.AzureContainerPushEventPayload) {

ctx:=context.Background()
ctx := context.Background()
tracer := otel.Tracer("insert-container-azure")
_, span := tracer.Start(opentelemetry.BuildContext(ctx), "InsertContainerEventAzure")
span.SetAttributes(attribute.String("container-azure-client", "insert"))
Expand Down Expand Up @@ -193,7 +193,7 @@ func (c *DBClient) InsertContainerEventAzure(pushEvent model.AzureContainerPushE

func (c *DBClient) InsertContainerEventQuay(pushEvent model.QuayImagePushPayload) {

ctx:=context.Background()
ctx := context.Background()
tracer := otel.Tracer("insert-container-quay")
_, span := tracer.Start(opentelemetry.BuildContext(ctx), "InsertContainerEventQuay")
span.SetAttributes(attribute.String("container-quay-client", "insert"))
Expand Down Expand Up @@ -251,7 +251,7 @@ func (c *DBClient) InsertContainerEventQuay(pushEvent model.QuayImagePushPayload

func (c *DBClient) InsertContainerEventJfrog(pushEvent model.JfrogContainerPushEventPayload) {

ctx:=context.Background()
ctx := context.Background()
tracer := otel.Tracer("insert-container-jfrog")
_, span := tracer.Start(opentelemetry.BuildContext(ctx), "InsertContainerEventJfrog")
span.SetAttributes(attribute.String("container-jfrog-client", "insert"))
Expand Down Expand Up @@ -309,7 +309,7 @@ func (c *DBClient) InsertContainerEventJfrog(pushEvent model.JfrogContainerPushE

func (c *DBClient) InsertRakeesMetrics(metrics model.RakeesMetrics) {

ctx:=context.Background()
ctx := context.Background()
tracer := otel.Tracer("insert-rakees-metrics")
_, span := tracer.Start(opentelemetry.BuildContext(ctx), "InsertRakeesMetrics")
span.SetAttributes(attribute.String("rakees-client", "insert"))
Expand Down Expand Up @@ -346,7 +346,7 @@ func (c *DBClient) InsertRakeesMetrics(metrics model.RakeesMetrics) {

func (c *DBClient) InsertKetallEvent(metrics model.Resource) {

ctx:=context.Background()
ctx := context.Background()
tracer := otel.Tracer("insert-ketall-event")
_, span := tracer.Start(opentelemetry.BuildContext(ctx), "InsertKetallEvent")
span.SetAttributes(attribute.String("ketall-client", "insert"))
Expand Down Expand Up @@ -382,7 +382,7 @@ func (c *DBClient) InsertKetallEvent(metrics model.Resource) {

func (c *DBClient) InsertOutdatedEvent(metrics model.CheckResultfinal) {

ctx:=context.Background()
ctx := context.Background()
tracer := otel.Tracer("insert-outdated-event")
_, span := tracer.Start(opentelemetry.BuildContext(ctx), "InsertOutdatedEvent")
span.SetAttributes(attribute.String("outdated-client", "insert"))
Expand Down Expand Up @@ -420,7 +420,7 @@ func (c *DBClient) InsertOutdatedEvent(metrics model.CheckResultfinal) {

func (c *DBClient) InsertDeprecatedAPI(deprecatedAPI model.DeprecatedAPI) {

ctx:=context.Background()
ctx := context.Background()
tracer := otel.Tracer("insert-depricated-event")
_, span := tracer.Start(opentelemetry.BuildContext(ctx), "InsertDeprecatedAPI")
span.SetAttributes(attribute.String("depricated-client", "insert"))
Expand Down Expand Up @@ -465,7 +465,7 @@ func (c *DBClient) InsertDeprecatedAPI(deprecatedAPI model.DeprecatedAPI) {

func (c *DBClient) InsertDeletedAPI(deletedAPI model.DeletedAPI) {

ctx:=context.Background()
ctx := context.Background()
tracer := otel.Tracer("insert-deletedapi")
_, span := tracer.Start(opentelemetry.BuildContext(ctx), "InsertDeletedAPI")
span.SetAttributes(attribute.String("deletedapi-client", "insert"))
Expand Down Expand Up @@ -511,7 +511,7 @@ func (c *DBClient) InsertDeletedAPI(deletedAPI model.DeletedAPI) {

func (c *DBClient) InsertKubvizEvent(metrics model.Metrics) {

ctx:=context.Background()
ctx := context.Background()
tracer := otel.Tracer("insert-kubviz-event")
_, span := tracer.Start(opentelemetry.BuildContext(ctx), "InsertKubvizEvent")
span.SetAttributes(attribute.String("kubvizevent-client", "insert"))
Expand Down Expand Up @@ -791,14 +791,14 @@ func (c *DBClient) InsertTrivySbomMetrics(metrics model.SbomData) {
span.SetAttributes(attribute.String("trivy-sbom-client", "insert"))
defer span.End()

tx, err := c.conn.Begin()
if err != nil {
log.Fatalf("error beginning transaction, clickhouse connection not available: %v", err)
}
stmt, err := tx.Prepare(InsertTrivySbom)
if err != nil {
log.Fatalf("error preparing statement: %v", err)
}
tx, err := c.conn.Begin()
if err != nil {
log.Fatalf("error beginning transaction, clickhouse connection not available: %v", err)
}
stmt, err := tx.Prepare(InsertTrivySbom)
if err != nil {
log.Fatalf("error preparing statement: %v", err)
}

if _, err := stmt.Exec(
metrics.ID,
Expand Down
6 changes: 6 additions & 0 deletions client/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,10 @@ type Config struct {
TrivyConsumer string `envconfig:"TRIVY_CONSUMER" required:"true"`
TrivyImageConsumer string `envconfig:"TRIVY_IMAGE_CONSUMER" required:"true"`
TrivySbomConsumer string `envconfig:"TRIVY_SBOM_CONSUMER" required:"true"`
AwsEnable bool `envconfig:"AWS_ENABLE" default:"false"`
AWSRegion string `envconfig:"AWS_REGION"`
AWSAccessKey string `envconfig:"AWS_ACCESS_KEY"`
AWSSecretKey string `envconfig:"AWS_SECRET_KEY"`
S3BucketName string `envconfig:"S3_BUCKET_NAME"`
S3ObjectKey string `envconfig:"S3_OBJECT_KEY"`
}
125 changes: 125 additions & 0 deletions client/pkg/storage/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package storage

import (
"database/sql"
"fmt"
"log"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/intelops/kubviz/client/pkg/config"
"github.com/kelseyhightower/envconfig"
)

func ExportExpiredData(tableName string, db *sql.DB) error {
columns, err := getTableColumns(db, tableName)
if err != nil {
return fmt.Errorf("error getting columns for table %s: %v", tableName, err)
}

// Construct SQL query
query := fmt.Sprintf("SELECT * FROM %s WHERE ExportedAt IS NULL", tableName)

// Query expired data
rows, err := db.Query(query)
if err != nil {
return fmt.Errorf("error querying ClickHouse: %v", err)
}
defer rows.Close()

// Construct CSV data in memory
var csvData strings.Builder
csvData.WriteString(columns + "\n") // Write CSV header

for rows.Next() {
// Assuming a dynamic structure, scan the columns into a slice of interface{}
columnValues := make([]interface{}, len(strings.Split(columns, ",")))
for i := range columnValues {
columnValues[i] = new(interface{})
}

err := rows.Scan(columnValues...)
if err != nil {
return fmt.Errorf("error scanning ClickHouse row: %v", err)
}

// Write the values to the CSV data
var rowData []string
for _, value := range columnValues {
// Dereference the pointer to get the interface{} value, then format it as a string
rowData = append(rowData, fmt.Sprintf("%v", *value.(*interface{})))
}
csvData.WriteString(strings.Join(rowData, ",") + "\n")
}

// Upload the CSV data to S3
err = uploadToS3(&csvData, fmt.Sprintf("exported_data_%s.csv", tableName))
if err != nil {
return fmt.Errorf("error uploading CSV to S3: %v", err)
}

// Update ExportedAt column with the current timestamp for exported rows
updateQuery := fmt.Sprintf("ALTER TABLE %s UPDATE ExportedAt = now() WHERE ExportedAt IS NULL", tableName)
_, err = db.Exec(updateQuery)
if err != nil {
return fmt.Errorf("error updating ExportedAt column: %v", err)
}

return nil
}

func getTableColumns(db *sql.DB, tableName string) (string, error) {
// Query to get column names
query := fmt.Sprintf("DESCRIBE TABLE %s", tableName)
rows, err := db.Query(query)
if err != nil {
return "", err
}
defer rows.Close()

// Get column names
var columns []string
for rows.Next() {
var columnName string
rows.Scan(&columnName)
columns = append(columns, columnName)
}

return strings.Join(columns, ","), nil
}

func uploadToS3(csvData *strings.Builder, s3ObjectKey string) error {
cfg := &config.Config{}
if err := envconfig.Process("", cfg); err != nil {
log.Fatalf("Could not parse env Config: %v", err)
}

// Set up AWS S3 session
sess, err := session.NewSession(&aws.Config{
Region: aws.String(cfg.AWSRegion),
Credentials: credentials.NewStaticCredentials(cfg.AWSAccessKey, cfg.AWSSecretKey, ""),
})
if err != nil {
return fmt.Errorf("error creating S3 session: %v", err)
}

// Create an S3 service client
s3Client := s3.New(sess)

// Upload the CSV data to S3
_, err = s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(cfg.S3BucketName),
Key: aws.String((s3ObjectKey)),
Body: strings.NewReader(csvData.String()),
})
if err != nil {
return fmt.Errorf("error uploading data to S3: %v", err)
}

fmt.Printf("Data uploaded to S3: %s\n", s3ObjectKey)

return nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/99designs/gqlgen v0.17.42
github.com/ClickHouse/clickhouse-go/v2 v2.10.1
github.com/aquasecurity/trivy v0.43.1
github.com/aws/aws-sdk-go v1.44.245
github.com/blang/semver v3.5.1+incompatible
github.com/corneliusweig/tabwriter v0.0.0-20190512204542-5f8a091e83b5
github.com/davecgh/go-spew v1.1.1
Expand Down Expand Up @@ -96,6 +97,7 @@ require (
github.com/imdario/mergo v0.3.15 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/invopop/yaml v0.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.5 // indirect
Expand Down
Loading
Loading