Skip to content

Commit

Permalink
Merge pull request #220 from fabriziosestito/fix/parallel-audit-requests
Browse files Browse the repository at this point in the history
feat: parallel audit requests
  • Loading branch information
flavio authored Mar 11, 2024
2 parents 62f4a1a + 8d19288 commit bc33f23
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 3 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
47 changes: 44 additions & 3 deletions internal/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"net/url"
"os"
"sync"
"time"

"github.com/kubewarden/audit-scanner/internal/k8s"
Expand All @@ -20,13 +21,16 @@ import (
policiesv1 "github.com/kubewarden/kubewarden-controller/pkg/apis/policies/v1"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/sync/semaphore"
admissionv1 "k8s.io/api/admission/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
)

const parallelAuditRequests = int64(100)

// Scanner verifies that existing resources don't violate any of the policies
type Scanner struct {
policiesClient *policies.Client
Expand Down Expand Up @@ -105,6 +109,9 @@ func NewScanner(
func (s *Scanner) ScanNamespace(ctx context.Context, nsName string) error {
log.Info().Str("namespace", nsName).Msg("namespace scan started")

semaphore := semaphore.NewWeighted(parallelAuditRequests)
var workers sync.WaitGroup

_, err := s.k8sClient.GetNamespace(ctx, nsName)
if err != nil {
return err
Expand Down Expand Up @@ -132,7 +139,20 @@ func (s *Scanner) ScanNamespace(ctx context.Context, nsName string) error {
if !ok {
return fmt.Errorf("failed to convert runtime.Object to *unstructured.Unstructured")
}
s.auditResource(ctx, policies, *resource)

err := semaphore.Acquire(ctx, 1)
if err != nil {
return err
}
workers.Add(1)
policiesToAudit := policies

go func() {
defer semaphore.Release(1)
defer workers.Done()

s.auditResource(ctx, policiesToAudit, *resource)
}()

return nil
})
Expand All @@ -141,6 +161,9 @@ func (s *Scanner) ScanNamespace(ctx context.Context, nsName string) error {
}
}

workers.Wait()
log.Info().Msg("Namespaced resources scan finished")

return nil
}

Expand All @@ -161,7 +184,9 @@ func (s *Scanner) ScanAllNamespaces(ctx context.Context) error {
err = errors.Join(err, e)
}
}

log.Info().Msg("all-namespaces scan finished")

return err
}

Expand All @@ -172,6 +197,9 @@ func (s *Scanner) ScanAllNamespaces(ctx context.Context) error {
func (s *Scanner) ScanClusterWideResources(ctx context.Context) error {
log.Info().Msg("clusterwide resources scan started")

semaphore := semaphore.NewWeighted(parallelAuditRequests)
var workers sync.WaitGroup

policies, err := s.policiesClient.GetClusterWidePolicies(ctx)
if err != nil {
return err
Expand All @@ -195,7 +223,19 @@ func (s *Scanner) ScanClusterWideResources(ctx context.Context) error {
return fmt.Errorf("failed to convert runtime.Object to *unstructured.Unstructured")
}

s.auditClusterResource(ctx, policies, *resource)
workers.Add(1)
err := semaphore.Acquire(ctx, 1)
if err != nil {
return err
}
policiesToAudit := policies

go func() {
defer semaphore.Release(1)
defer workers.Done()

s.auditClusterResource(ctx, policiesToAudit, *resource)
}()

return nil
})
Expand All @@ -204,7 +244,8 @@ func (s *Scanner) ScanClusterWideResources(ctx context.Context) error {
}
}

log.Info().Msg("clusterwide resources scan finished")
workers.Wait()
log.Info().Msg("Cluster-wide resources scan finished")

return nil
}
Expand Down

0 comments on commit bc33f23

Please sign in to comment.