Skip to content

Commit

Permalink
first run at validation improvement (#940)
Browse files Browse the repository at this point in the history
* first run at validation improvement

Signed-off-by: jasmingacic <[email protected]>

* assigning log name to the actual logger

Signed-off-by: jasmingacic <[email protected]>

Signed-off-by: jasmingacic <[email protected]>
  • Loading branch information
jasmingacic authored Nov 1, 2022
1 parent 764a6a2 commit 81431ba
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
2 changes: 1 addition & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func main() {
}()

// Validation proxy
proxy := validation.NewServer()
proxy := validation.NewServer(logger)
go func() {
if err := proxy.Start(":17000"); err != nil {
setupLog.Error(err, "Unable to start validation proxy")
Expand Down
30 changes: 18 additions & 12 deletions internal/validation/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
Expand All @@ -17,6 +17,7 @@ import (
pb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
v32 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/getkin/kin-openapi/openapi3filter"
"github.com/go-logr/logr"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand All @@ -31,14 +32,15 @@ const (
// Server provides OpenAPI Validation and implements ext_proc GRPC service.
type Server struct {
services map[string]*Service

m sync.RWMutex
log logr.Logger
m sync.RWMutex
}

// NewServer() creates new validation Server.
func NewServer() *Server {
func NewServer(log logr.Logger) *Server {
return &Server{
services: map[string]*Service{},
log: log,
}

}
Expand All @@ -47,21 +49,24 @@ func NewServer() *Server {
func (s *Server) Start(port string) error {
lis, err := net.Listen("tcp", port)
if err != nil {
s.log.Error(err, "validation server failed to start at port", port)
return err
}

s.log.Info("validation server listening at", port)
sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(1000)}
srv := grpc.NewServer(sopts...)

pb.RegisterExternalProcessorServer(srv, s)
err = srv.Serve(lis)
if err != nil {
s.log.Error(err, "validation server failed to start")
return err
}
return nil
}

func (s *Server) Process(srv pb.ExternalProcessor_ProcessServer) error {
s.log = s.log.WithName("Request validation:")
header := make(http.Header)
ctx := srv.Context()
for {
Expand Down Expand Up @@ -109,7 +114,7 @@ func (s *Server) Process(srv pb.ExternalProcessor_ProcessServer) error {
case *pb.ProcessingRequest_RequestHeaders:
r := req.Request
h := r.(*pb.ProcessingRequest_RequestHeaders)
log.Printf("Got RequestHeaders.Headers %v", h.RequestHeaders.Headers)
s.log.Info("Got RequestHeaders.Headers", h.RequestHeaders.Headers)
for _, envoyHeader := range h.RequestHeaders.GetHeaders().GetHeaders() {
header.Add(envoyHeader.Key, envoyHeader.Value)
}
Expand All @@ -128,14 +133,15 @@ func (s *Server) Process(srv pb.ExternalProcessor_ProcessServer) error {
}
err = s.validate(req, service, operation)
if err != nil {
errMsg := NewErrorBody()
errMsg.SetErrorBody(err)
errorMsg := NewErrorBody()
errorMsg.SetErrorBody(err)
s.log.Error(fmt.Errorf(errorMsg.Error), "validation failed")

resp = &pb.ProcessingResponse{
Response: &pb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &pb.ImmediateResponse{
Status: &v32.HttpStatus{Code: v32.StatusCode_BadRequest},
Body: errMsg.Error,
Body: errorMsg.Error,
Headers: &pb.HeaderMutation{
SetHeaders: []*v31.HeaderValueOption{
{
Expand Down Expand Up @@ -197,7 +203,7 @@ func (s *Server) Process(srv pb.ExternalProcessor_ProcessServer) error {
if err != nil {
errorMsg := NewErrorBody()
errorMsg.SetErrorBody(err)

s.log.Error(fmt.Errorf(errorMsg.Error), "validation failed")
resp = &pb.ProcessingResponse{
Response: &pb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &pb.ImmediateResponse{
Expand Down Expand Up @@ -230,11 +236,11 @@ func (s *Server) Process(srv pb.ExternalProcessor_ProcessServer) error {

}
default:
log.Printf("Unknown Request type %v\n", v)
s.log.Info("Unknown Request type ", v)
}

if err := srv.Send(resp); err != nil {
log.Printf("send error %v", err)
s.log.Error(err, "send error")
}
}
}
Expand Down

0 comments on commit 81431ba

Please sign in to comment.