From 1095ed3017081aac6e3799ebb5e1f4c49a8c43e8 Mon Sep 17 00:00:00 2001 From: Anurag Rajawat Date: Sun, 15 Sep 2024 20:13:47 +0530 Subject: [PATCH] refactor: Refactor for better maintainability Signed-off-by: Anurag Rajawat --- sentryflow/Dockerfile | 5 +- sentryflow/cmd/root.go | 53 ++ sentryflow/cmd/version.go | 20 + sentryflow/config/default.yaml | 22 + sentryflow/go.mod | 29 +- sentryflow/go.sum | 56 +- sentryflow/main.go | 9 + sentryflow/pkg/collector/collectorHandler.go | 106 ---- sentryflow/pkg/collector/envoy.go | 227 -------- sentryflow/pkg/collector/interface.go | 16 - sentryflow/pkg/collector/opentelemetry.go | 145 ----- sentryflow/pkg/collector/wasm.go | 54 -- sentryflow/pkg/config/config.go | 187 ++----- sentryflow/pkg/core/sentryflow.go | 234 +++----- sentryflow/pkg/exporter/apievent.go | 76 +++ sentryflow/pkg/exporter/exportAPILogs.go | 132 ----- sentryflow/pkg/exporter/exporterHandler.go | 259 --------- sentryflow/pkg/exporter/grpc.go | 42 ++ sentryflow/pkg/exporter/http.go | 78 +++ sentryflow/pkg/k8s/client.go | 39 ++ .../source/svcmesh/istio/sidecar/sidecar.go | 525 ++++++++++++++++++ sentryflow/pkg/types/types.go | 36 -- sentryflow/pkg/util/util.go | 21 + 23 files changed, 1071 insertions(+), 1300 deletions(-) create mode 100644 sentryflow/cmd/root.go create mode 100644 sentryflow/cmd/version.go create mode 100644 sentryflow/config/default.yaml create mode 100644 sentryflow/main.go delete mode 100644 sentryflow/pkg/collector/collectorHandler.go delete mode 100644 sentryflow/pkg/collector/envoy.go delete mode 100644 sentryflow/pkg/collector/interface.go delete mode 100644 sentryflow/pkg/collector/opentelemetry.go delete mode 100644 sentryflow/pkg/collector/wasm.go create mode 100644 sentryflow/pkg/exporter/apievent.go delete mode 100644 sentryflow/pkg/exporter/exportAPILogs.go delete mode 100644 sentryflow/pkg/exporter/exporterHandler.go create mode 100644 sentryflow/pkg/exporter/grpc.go create mode 100644 sentryflow/pkg/exporter/http.go create mode 100644 sentryflow/pkg/k8s/client.go create mode 100644 sentryflow/pkg/source/svcmesh/istio/sidecar/sidecar.go delete mode 100644 sentryflow/pkg/types/types.go create mode 100644 sentryflow/pkg/util/util.go diff --git a/sentryflow/Dockerfile b/sentryflow/Dockerfile index 57baf57..feb5802 100644 --- a/sentryflow/Dockerfile +++ b/sentryflow/Dockerfile @@ -8,8 +8,9 @@ COPY sentryflow /sentryflow RUN make -C /sentryflow build -FROM gcr.io/distroless/static-debian12 +#FROM gcr.io/distroless/static-debian12 +FROM redhat/ubi9-minimal COPY --from=builder /sentryflow/bin/sentryflow / -CMD ["/sentryflow"] +ENTRYPOINT ["/sentryflow"] diff --git a/sentryflow/cmd/root.go b/sentryflow/cmd/root.go new file mode 100644 index 0000000..767534d --- /dev/null +++ b/sentryflow/cmd/root.go @@ -0,0 +1,53 @@ +package cmd + +import ( + "context" + + "github.com/spf13/cobra" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/5GSEC/SentryFlow/pkg/core" + "github.com/5GSEC/SentryFlow/pkg/util" +) + +var ( + configFilePath string + kubeConfig string + development bool + logger *zap.SugaredLogger +) + +func init() { + RootCmd.PersistentFlags().StringVar(&configFilePath, "config", "", "config file path") + RootCmd.PersistentFlags().StringVar(&kubeConfig, "kubeconfig", "", "kubeconfig file path") + RootCmd.PersistentFlags().BoolVar(&development, "development", true, "run in development mode") + initLogger(development) +} + +func initLogger(development bool) { + cfg := zap.NewProductionConfig() + cfg.EncoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder + if development { + cfg = zap.NewDevelopmentConfig() + cfg.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder + } + cfg.EncoderConfig.TimeKey = "timestamp" + cfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + coreLogger, _ := cfg.Build() + logger = coreLogger.Sugar() +} + +var RootCmd = &cobra.Command{ + Use: "sentryflow", + Run: func(cmd *cobra.Command, args []string) { + run() + }, +} + +func run() { + logBuildInfo() + ctx := context.WithValue(ctrl.SetupSignalHandler(), util.LoggerCtxKey, logger) + core.Run(ctx, configFilePath, kubeConfig) +} diff --git a/sentryflow/cmd/version.go b/sentryflow/cmd/version.go new file mode 100644 index 0000000..5a78250 --- /dev/null +++ b/sentryflow/cmd/version.go @@ -0,0 +1,20 @@ +package cmd + +import ( + "runtime" + "runtime/debug" +) + +func logBuildInfo() { + info, _ := debug.ReadBuildInfo() + vcsRev := "" + vcsTime := "" + for _, s := range info.Settings { + if s.Key == "vcs.revision" { + vcsRev = s.Value + } else if s.Key == "vcs.time" { + vcsTime = s.Value + } + } + logger.Infof("Git commit: %s, build time: %s, build version: %s, go os/arch: %s/%s\n", vcsRev, vcsTime, info.Main.Version, runtime.GOOS, runtime.GOARCH) +} diff --git a/sentryflow/config/default.yaml b/sentryflow/config/default.yaml new file mode 100644 index 0000000..f238a8d --- /dev/null +++ b/sentryflow/config/default.yaml @@ -0,0 +1,22 @@ +sources: + serviceMeshes: + - name: istio-sidecar + enable: true + others: + - name: "optional" + # Either gRPC or HTTP not both + grpc: + url: localhost + port: 1234 + http: + url: localhost + port: 4321 + +exporter: + grpc: + port: 8080 + +debug: + enable: false + pprof: + port: 6060 diff --git a/sentryflow/go.mod b/sentryflow/go.mod index e925b30..8692545 100644 --- a/sentryflow/go.mod +++ b/sentryflow/go.mod @@ -4,22 +4,25 @@ go 1.23 require ( github.com/5GSEC/SentryFlow/protobuf v0.0.0-00010101000000-000000000000 - github.com/envoyproxy/go-control-plane v0.13.0 + github.com/golang/protobuf v1.5.4 + github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 - go.opentelemetry.io/proto/otlp v1.3.1 + go.uber.org/zap v1.26.0 google.golang.org/grpc v1.66.2 google.golang.org/protobuf v1.34.2 - gopkg.in/yaml.v2 v2.4.0 - k8s.io/api v0.31.1 + istio.io/api v1.23.1 + istio.io/client-go v1.23.1 k8s.io/apimachinery v0.31.1 k8s.io/client-go v0.31.1 + sigs.k8s.io/controller-runtime v0.19.0 ) require ( - github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.12.1 // indirect - github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect + github.com/evanphx/json-patch/v5 v5.9.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/logr v1.4.2 // indirect @@ -27,13 +30,14 @@ require ( github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect + github.com/imdario/mergo v0.3.6 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/magiconair/properties v1.8.7 // indirect @@ -43,8 +47,11 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect - github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/sagikazarmark/locafero v0.6.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect @@ -61,11 +68,15 @@ require ( golang.org/x/term v0.24.0 // indirect golang.org/x/text v0.18.0 // indirect golang.org/x/time v0.6.0 // indirect + gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/api v0.31.1 // indirect + k8s.io/apiextensions-apiserver v0.31.0 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20240903163716-9e1beecbcb38 // indirect k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3 // indirect diff --git a/sentryflow/go.sum b/sentryflow/go.sum index 711ca29..866e9aa 100644 --- a/sentryflow/go.sum +++ b/sentryflow/go.sum @@ -1,15 +1,18 @@ -github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 h1:QVw89YDxXxEe+l8gU8ETbOasdwEV+avkR75ZzsVV9WI= -github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= -github.com/envoyproxy/go-control-plane v0.13.0 h1:HzkeUz1Knt+3bK+8LG1bxOO/jzWZmdxpwC51i202les= -github.com/envoyproxy/go-control-plane v0.13.0/go.mod h1:GRaKG3dwvFoTg4nj7aXdZnvMg4d7nvT/wl9WgVXn3Q8= -github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM= -github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= +github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= +github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg= +github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= @@ -18,6 +21,8 @@ github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= +github.com/go-logr/zapr v1.3.0/go.mod h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= @@ -28,6 +33,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= @@ -42,10 +49,12 @@ github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 h1:FKHo8hFI3A+7w0aUQu github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= +github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -75,15 +84,22 @@ github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= -github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= -github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sagikazarmark/locafero v0.6.0 h1:ON7AQg37yzcRPU69mt7gwhFEBwxI6P9T4Qu3N51bwOk= github.com/sagikazarmark/locafero v0.6.0/go.mod h1:77OmuIc6VTraTXKXIs/uvUxKGUXjE1GbemJYHqdNjX0= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= @@ -94,6 +110,8 @@ github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= +github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= @@ -108,10 +126,12 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= -go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -153,6 +173,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gomodules.xyz/jsonpatch/v2 v2.4.0 h1:Ci3iUJyx9UeRx7CeFN8ARgGbkESwJK+KB9lLcWxY/Zw= +gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 h1:hjSy6tcFQZ171igDaN5QHOw2n6vx40juYbC/x67CEhc= google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:qpvKtACPCQhAdu3PyQgV4l3LMXZEtft7y8QcarRsp9I= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= @@ -164,6 +186,8 @@ google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWn gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4= +gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= @@ -173,8 +197,14 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +istio.io/api v1.23.1 h1:bm2XF0j058FfzWVHUfpmMj4sFDkcD1X609qs5AU97Pc= +istio.io/api v1.23.1/go.mod h1:QPSTGXuIQdnZFEm3myf9NZ5uBMwCdJWUvfj9ZZ+2oBM= +istio.io/client-go v1.23.1 h1:IX2cgUUXnVYo+9H6bFGSp/vuKVLPUkmiN8qk1/mvsYs= +istio.io/client-go v1.23.1/go.mod h1:+fxu+O2GkITM3HEREUWdobvRXqI/UhAAI7hfxqqpRh0= k8s.io/api v0.31.1 h1:Xe1hX/fPW3PXYYv8BlozYqw63ytA92snr96zMW9gWTU= k8s.io/api v0.31.1/go.mod h1:sbN1g6eY6XVLeqNsZGLnI5FwVseTrZX7Fv3O26rhAaI= +k8s.io/apiextensions-apiserver v0.31.0 h1:fZgCVhGwsclj3qCw1buVXCV6khjRzKC5eCFt24kyLSk= +k8s.io/apiextensions-apiserver v0.31.0/go.mod h1:b9aMDEYaEe5sdK+1T0KU78ApR/5ZVp4i56VacZYEHxk= k8s.io/apimachinery v0.31.1 h1:mhcUBbj7KUjaVhyXILglcVjuS4nYXiwC+KKFBgIVy7U= k8s.io/apimachinery v0.31.1/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo= k8s.io/client-go v0.31.1 h1:f0ugtWSbWpxHR7sjVpQwuvw9a3ZKLXX0u0itkFXufb0= @@ -185,6 +215,8 @@ k8s.io/kube-openapi v0.0.0-20240903163716-9e1beecbcb38 h1:1dWzkmJrrprYvjGwh9kEUx k8s.io/kube-openapi v0.0.0-20240903163716-9e1beecbcb38/go.mod h1:coRQXBK9NxO98XUv3ZD6AK3xzHCxV6+b7lrquKwaKzA= k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3 h1:b2FmK8YH+QEwq/Sy2uAEhmqL5nPfGYbJOcaqjeYYZoA= k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +sigs.k8s.io/controller-runtime v0.19.0 h1:nWVM7aq+Il2ABxwiCizrVDSlmDcshi9llbaFbC0ji/Q= +sigs.k8s.io/controller-runtime v0.19.0/go.mod h1:iRmWllt8IlaLjvTTDLhRBXIEtkCK6hwVBJJsYS9Ajf4= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= diff --git a/sentryflow/main.go b/sentryflow/main.go new file mode 100644 index 0000000..6163445 --- /dev/null +++ b/sentryflow/main.go @@ -0,0 +1,9 @@ +package main + +import ( + "github.com/5GSEC/SentryFlow/cmd" +) + +func main() { + _ = cmd.RootCmd.Execute() +} diff --git a/sentryflow/pkg/collector/collectorHandler.go b/sentryflow/pkg/collector/collectorHandler.go deleted file mode 100644 index 82fc20e..0000000 --- a/sentryflow/pkg/collector/collectorHandler.go +++ /dev/null @@ -1,106 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package collector - -import ( - "fmt" - "log" - "net" - "net/http" - - "google.golang.org/grpc" - - "github.com/5GSEC/SentryFlow/config" -) - -// == // - -// ColH global reference for Collector Handler -var ColH *ColHandler - -// init Function -func init() { - ColH = NewCollectorHandler() -} - -// ColHandler Structure -type ColHandler struct { - colService net.Listener - grpcServer *grpc.Server - collectors []collectorInterface -} - -// NewCollectorHandler Function -func NewCollectorHandler() *ColHandler { - ch := &ColHandler{ - collectors: make([]collectorInterface, 0), - } - return ch -} - -// == // - -// StartCollector Function -func StartCollector() bool { - // Make a string with the given collector address and port - collectorService := fmt.Sprintf("%s:%s", config.GlobalConfig.CollectorAddr, config.GlobalConfig.CollectorPort) - - // Start listening gRPC port - colService, err := net.Listen("tcp", collectorService) - if err != nil { - log.Printf("[Collector] Failed to listen at %s: %v", collectorService, err) - return false - } - ColH.colService = colService - - log.Printf("[Collector] Listening Collector gRPC services (%s)", collectorService) - - // Create gRPC Service - gRPCServer := grpc.NewServer() - ColH.grpcServer = gRPCServer - - // initialize OpenTelemetry collector - ColH.collectors = append(ColH.collectors, newOpenTelemetryLogsServer()) - - // initialize Envoy collectors for AccessLogs and Metrics - ColH.collectors = append(ColH.collectors, newEnvoyAccessLogsServer()) - ColH.collectors = append(ColH.collectors, newEnvoyMetricsServer()) - - // register services - for _, col := range ColH.collectors { - col.registerService(ColH.grpcServer) - } - - log.Print("[Collector] Initialized Collector gRPC services") - - // Serve gRPC Service - go ColH.grpcServer.Serve(ColH.colService) - - log.Print("[Collector] Serving Collector gRPC services") - - // Start the http server - address := fmt.Sprintf("%s:%s", config.GlobalConfig.ApiLogCollectorAddr, config.GlobalConfig.ApiLogCollectorPort) - log.Print("[Collector] Serving Collector http service on ", address) - go func() { - // Create a new HTTP server - http.HandleFunc("/api/v1/events", DataHandler) - err = http.ListenAndServe(address, nil) - if err != nil { - log.Println("[Collector] Error serving Collector http service on ", err.Error()) - panic(err) - } - }() - - return true -} - -// StopCollector Function -func StopCollector() bool { - ColH.grpcServer.GracefulStop() - - log.Print("[Collector] Gracefully stopped Collector gRPC services") - - return true -} - -// == // diff --git a/sentryflow/pkg/collector/envoy.go b/sentryflow/pkg/collector/envoy.go deleted file mode 100644 index 7a28d24..0000000 --- a/sentryflow/pkg/collector/envoy.go +++ /dev/null @@ -1,227 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package collector - -import ( - "io" - "log" - "strconv" - - "github.com/5GSEC/SentryFlow/protobuf" - - "github.com/5GSEC/SentryFlow/k8s" - "github.com/5GSEC/SentryFlow/processor" - "github.com/5GSEC/SentryFlow/types" - - envoyAccLogsData "github.com/envoyproxy/go-control-plane/envoy/data/accesslog/v3" - envoyAccLogs "github.com/envoyproxy/go-control-plane/envoy/service/accesslog/v3" - envoyMetrics "github.com/envoyproxy/go-control-plane/envoy/service/metrics/v3" - - "google.golang.org/grpc" -) - -// == // - -// EnvoyAccessLogsServer Structure -type EnvoyAccessLogsServer struct { - envoyAccLogs.UnimplementedAccessLogServiceServer - collectorInterface -} - -// newEnvoyAccessLogsServer Function -func newEnvoyAccessLogsServer() *EnvoyAccessLogsServer { - ret := &EnvoyAccessLogsServer{} - return ret -} - -// registerService Function -func (evyAccLogs *EnvoyAccessLogsServer) registerService(server *grpc.Server) { - envoyAccLogs.RegisterAccessLogServiceServer(server, evyAccLogs) -} - -// == // - -// EnvoyMetricsServer Structure -type EnvoyMetricsServer struct { - envoyMetrics.UnimplementedMetricsServiceServer - collectorInterface -} - -// newEnvoyMetricsServer Function -func newEnvoyMetricsServer() *EnvoyMetricsServer { - ret := &EnvoyMetricsServer{} - return ret -} - -// registerService Function -func (evyMetrics *EnvoyMetricsServer) registerService(server *grpc.Server) { - envoyMetrics.RegisterMetricsServiceServer(server, evyMetrics) -} - -// == // - -// generateAPILogsFromEnvoy Function -func generateAPILogsFromEnvoy(entry *envoyAccLogsData.HTTPAccessLogEntry) *protobuf.APILog { - comm := entry.GetCommonProperties() - timeStamp := comm.GetStartTime().Seconds - - srcInform := entry.GetCommonProperties().GetDownstreamRemoteAddress().GetSocketAddress() - srcIP := srcInform.GetAddress() - srcPort := strconv.Itoa(int(srcInform.GetPortValue())) - src := k8s.LookupK8sResource(srcIP) - - dstInform := entry.GetCommonProperties().GetUpstreamRemoteAddress().GetSocketAddress() - dstIP := dstInform.GetAddress() - dstPort := strconv.Itoa(int(dstInform.GetPortValue())) - dst := k8s.LookupK8sResource(dstIP) - - request := entry.GetRequest() - response := entry.GetResponse() - - protocol := entry.GetProtocolVersion().String() - method := request.GetRequestMethod().String() - path := request.GetPath() - resCode := response.GetResponseCode().GetValue() - - envoyAPILog := &protobuf.APILog{ - Id: 0, // @todo zero for now - TimeStamp: strconv.FormatInt(timeStamp, 10), - - SrcNamespace: src.Namespace, - SrcName: src.Name, - SrcLabel: src.Labels, - SrcIP: srcIP, - SrcPort: srcPort, - SrcType: types.K8sResourceTypeToString(src.Type), - - DstNamespace: dst.Namespace, - DstName: dst.Name, - DstLabel: dst.Labels, - DstIP: dstIP, - DstPort: dstPort, - DstType: types.K8sResourceTypeToString(dst.Type), - - Protocol: protocol, - Method: method, - Path: path, - ResponseCode: int32(resCode), - } - - return envoyAPILog -} - -// StreamAccessLogs Function -func (evyAccLogs *EnvoyAccessLogsServer) StreamAccessLogs(stream envoyAccLogs.AccessLogService_StreamAccessLogsServer) error { - for { - event, err := stream.Recv() - if err == io.EOF { - return nil - } else if err != nil { - log.Printf("[EnvoyAPILogs] Failed to receive an event: %v", err) - return err - } - - if event.GetHttpLogs() != nil { - for _, entry := range event.GetHttpLogs().LogEntry { - envoyAPILog := generateAPILogsFromEnvoy(entry) - processor.InsertAPILog(envoyAPILog) - } - } - } -} - -// == // - -// generateMetricsFromEnvoy Function -func generateMetricsFromEnvoy(event *envoyMetrics.StreamMetricsMessage, metaData map[string]interface{}) *protobuf.EnvoyMetrics { - envoyMetrics := &protobuf.EnvoyMetrics{ - TimeStamp: "", - - Namespace: metaData["NAMESPACE"].(string), - Name: metaData["NAME"].(string), - IPAddress: metaData["INSTANCE_IPS"].(string), - Labels: k8s.LookupK8sResource(metaData["INSTANCE_IPS"].(string)).Labels, - - Metrics: make(map[string]*protobuf.MetricValue), - } - - envoyMetrics.Metrics["GAUGE"] = &protobuf.MetricValue{ - Value: make(map[string]string), - } - - envoyMetrics.Metrics["COUNTER"] = &protobuf.MetricValue{ - Value: make(map[string]string), - } - - envoyMetrics.Metrics["HISTOGRAM"] = &protobuf.MetricValue{ - Value: make(map[string]string), - } - - envoyMetrics.Metrics["SUMMARY"] = &protobuf.MetricValue{ - Value: make(map[string]string), - } - - for _, metric := range event.GetEnvoyMetrics() { - metricType := metric.GetType().String() - metricName := metric.GetName() - - if envoyMetrics.Metrics[metricType].Value == nil { - continue - } - - for _, metricDetail := range metric.GetMetric() { - var metricValue string - - if envoyMetrics.TimeStamp == "" { - envoyMetrics.TimeStamp = strconv.FormatInt(metricDetail.GetTimestampMs(), 10) - } - - if metricType == "GAUGE" { - metricValue = strconv.FormatFloat(metricDetail.GetGauge().GetValue(), 'f', -1, 64) - } - - if metricType == "COUNTER" { - metricValue = strconv.FormatFloat(metricDetail.GetCounter().GetValue(), 'f', -1, 64) - } - - if metricType == "HISTOGRAM" { - metricValue = strconv.FormatUint(metricDetail.GetHistogram().GetSampleCount(), 10) - } - - if metricType == "SUMMARY" { - metricValue = strconv.FormatUint(metricDetail.GetHistogram().GetSampleCount(), 10) - } - - envoyMetrics.Metrics[metricType].Value[metricName] = metricValue - } - } - - return envoyMetrics -} - -// StreamMetrics Function -func (evyMetrics *EnvoyMetricsServer) StreamMetrics(stream envoyMetrics.MetricsService_StreamMetricsServer) error { - event, err := stream.Recv() - if err == io.EOF { - return nil - } else if err != nil { - log.Printf("[EnvoyMetrics] Failed to receive an event: %v", err) - return err - } - - err = event.ValidateAll() - if err != nil { - log.Printf("[EnvoyMetrics] Failed to validate an event: %v", err) - } - - identifier := event.GetIdentifier() - if identifier != nil { - metaData := identifier.GetNode().GetMetadata().AsMap() - envoyMetrics := generateMetricsFromEnvoy(event, metaData) - processor.InsertMetrics(envoyMetrics) - } - - return nil -} - -// == // diff --git a/sentryflow/pkg/collector/interface.go b/sentryflow/pkg/collector/interface.go deleted file mode 100644 index a610c4f..0000000 --- a/sentryflow/pkg/collector/interface.go +++ /dev/null @@ -1,16 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package collector - -import ( - "google.golang.org/grpc" -) - -// == // - -// collectorInterface Interface -type collectorInterface interface { - registerService(server *grpc.Server) -} - -// == // diff --git a/sentryflow/pkg/collector/opentelemetry.go b/sentryflow/pkg/collector/opentelemetry.go deleted file mode 100644 index 234b94c..0000000 --- a/sentryflow/pkg/collector/opentelemetry.go +++ /dev/null @@ -1,145 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package collector - -import ( - "context" - "strconv" - "strings" - - otelLogs "go.opentelemetry.io/proto/otlp/collector/logs/v1" - "google.golang.org/grpc" - - "github.com/5GSEC/SentryFlow/protobuf" - - "github.com/5GSEC/SentryFlow/k8s" - "github.com/5GSEC/SentryFlow/processor" - "github.com/5GSEC/SentryFlow/types" -) - -// == // - -// OpenTelemetryLogsServer structure -type OpenTelemetryLogsServer struct { - otelLogs.UnimplementedLogsServiceServer - collectorInterface -} - -// newOpenTelemetryLogsServer Function -func newOpenTelemetryLogsServer() *OpenTelemetryLogsServer { - ret := &OpenTelemetryLogsServer{} - return ret -} - -// registerService Function -func (otlLogs *OpenTelemetryLogsServer) registerService(server *grpc.Server) { - otelLogs.RegisterLogsServiceServer(server, otlLogs) -} - -// == // - -// generateAPILogsFromOtel Function -func generateAPILogsFromOtel(logText string) []*protobuf.APILog { - apiLogs := make([]*protobuf.APILog, 0) - - // Preprocess redundant chars - logText = strings.ReplaceAll(logText, `\"`, "") - logText = strings.ReplaceAll(logText, `}`, "") - - // Split logs by log_records, this is a single access log instance - parts := strings.Split(logText, "log_records") - if len(parts) == 0 { - return nil - } - - // Ignore the first entry (the metadata "resource_logs:{resource:{ scope_logs:{" part) - for _, accessLog := range parts[0:] { - var srcIP string - var srcPort string - var dstIP string - var dstPort string - - if len(accessLog) == 0 { - continue - } - - index := strings.Index(accessLog, "string_value:\"") - if index == -1 { - continue - } - - words := strings.Fields(accessLog[index+len("string_value:\""):]) - - timeStamp := words[0] - method := words[1] - path := words[2] - protocol := words[3] - resCode, _ := strconv.ParseInt(words[4], 10, 64) - - srcInform := words[21] - - // Extract the left and right words based on the colon delimiter (ADDR:PORT) - colonIndex := strings.LastIndex(srcInform, ":") - if colonIndex > 0 && colonIndex < len(srcInform)-1 { - srcIP = strings.TrimSpace(srcInform[:colonIndex]) - srcPort = strings.TrimSpace(srcInform[colonIndex+1:]) - } - src := k8s.LookupK8sResource(srcIP) - - dstInform := words[20] - - // Extract the left and right words based on the colon delimiter (ADDR:PORT) - colonIndex = strings.LastIndex(dstInform, ":") - if colonIndex > 0 && colonIndex < len(dstInform)-1 { - dstIP = strings.TrimSpace(dstInform[:colonIndex]) - dstPort = strings.TrimSpace(dstInform[colonIndex+1:]) - } - dst := k8s.LookupK8sResource(dstIP) - - // Create APILog - apiLog := protobuf.APILog{ - Id: 0, // @todo zero for now - TimeStamp: timeStamp, - - SrcNamespace: src.Namespace, - SrcName: src.Name, - SrcLabel: src.Labels, - SrcIP: srcIP, - SrcPort: srcPort, - SrcType: types.K8sResourceTypeToString(src.Type), - - DstNamespace: dst.Namespace, - DstName: dst.Name, - DstLabel: dst.Labels, - DstIP: dstIP, - DstPort: dstPort, - DstType: types.K8sResourceTypeToString(dst.Type), - - Protocol: protocol, - Method: method, - Path: path, - ResponseCode: int32(resCode), - } - - apiLogs = append(apiLogs, &apiLog) - } - - return apiLogs -} - -// Export Function for Log.Export in OpenTelemetry format -func (otlLogs *OpenTelemetryLogsServer) Export(_ context.Context, req *otelLogs.ExportLogsServiceRequest) (*otelLogs.ExportLogsServiceResponse, error) { - apiLogs := generateAPILogsFromOtel(req.String()) - for _, apiLog := range apiLogs { - processor.InsertAPILog(apiLog) - } - - // @todo not consider partial success - ret := otelLogs.ExportLogsServiceResponse{ - PartialSuccess: nil, - } - - return &ret, nil -} - -// == // diff --git a/sentryflow/pkg/collector/wasm.go b/sentryflow/pkg/collector/wasm.go deleted file mode 100644 index f4c1b03..0000000 --- a/sentryflow/pkg/collector/wasm.go +++ /dev/null @@ -1,54 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package collector - -import ( - "io/ioutil" - "log" - "net/http" - - "google.golang.org/protobuf/encoding/protojson" - - "github.com/5GSEC/SentryFlow/protobuf" - - "github.com/5GSEC/SentryFlow/processor" -) - -// Handler for the HTTP endpoint to receive api events from WASM filter -func DataHandler(w http.ResponseWriter, r *http.Request) { - // Check if the request is POST - if r.Method != http.MethodPost { - http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed) - return - } - - // Read the request body - body, err := ioutil.ReadAll(r.Body) - if err != nil { - http.Error(w, "Failed to read request body", http.StatusInternalServerError) - return - } - // Parse the JSON data into the TelemetryData struct - apiLog := &protobuf.APILogV2{} - err = protojson.Unmarshal(body, apiLog) - if err != nil { - log.Print("failed to parse json") - log.Print(err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - // Check protocol version - if r.ProtoMajor == 2 { - apiLog.Protocol = "HTTP/2.0" - } else if r.ProtoMajor == 1 && r.ProtoMinor == 1 { - apiLog.Protocol = "HTTP/1.1" - } else if r.ProtoMajor == 1 && r.ProtoMinor == 0 { - apiLog.Protocol = "HTTP/1.0" - } else { - apiLog.Protocol = "Unknown" - } - processor.InsertAPILog(apiLog) - - // Log the received telemetry data - log.Printf("Received data: %+v\n", apiLog) -} diff --git a/sentryflow/pkg/config/config.go b/sentryflow/pkg/config/config.go index 15e2c7e..da844dd 100644 --- a/sentryflow/pkg/config/config.go +++ b/sentryflow/pkg/config/config.go @@ -1,149 +1,76 @@ -// SPDX-License-Identifier: Apache-2.0 - package config import ( - "flag" - "fmt" - "log" - "strings" + "encoding/json" "github.com/spf13/viper" + "go.uber.org/zap" ) -// SentryFlowConfig structure -type SentryFlowConfig struct { - CollectorAddr string // Address for Collector gRPC - CollectorPort string // Port for Collector gRPC - ApiLogCollectorAddr string // Address for API Log HTTP Collector - ApiLogCollectorPort string // Port for API Log HTTP Collector - - ExporterAddr string // IP address to use for exporter gRPC - ExporterPort string // Port to use for exporter gRPC - - PatchingNamespaces bool // Enable/Disable patching namespaces with 'istio-injection' - RestartingPatchedDeployments bool // Enable/Disable restarting deployments after patching - - AggregationPeriod int // Period for aggregating metrics - CleanUpPeriod int // Period for cleaning up outdated metrics - - AIEngineService string // Address for AI Engine - AIEngineServicePort string // Port for AI Engine - AIEngineBatchSize int // Batch Size to send APIs to AI Engine - - Debug bool // Enable/Disable SentryFlow debug mode -} - -// GlobalConfig Global configuration for SentryFlow -var GlobalConfig SentryFlowConfig - -// init Function -func init() { - _ = LoadConfig() -} - -// Config const const ( - CollectorAddr string = "collectorAddr" - CollectorPort string = "collectorPort" - ApiLogCollectorAddr string = "apiLogCollectorAddr" - ApiLogCollectorPort string = "apiLogCollectorPort" - - ExporterAddr string = "exporterAddr" - ExporterPort string = "exporterPort" - - PatchingNamespaces string = "patchingNamespaces" - RestartingPatchedDeployments string = "restartingPatchedDeployments" - - AggregationPeriod string = "aggregationPeriod" - CleanUpPeriod string = "cleanUpPeriod" - - AIEngineService string = "aiEngineService" - AIEngineServicePort string = "aiEngineServicePort" - AIEngineBatchSize string = "aiEngineBatchSize" - - Debug string = "debug" + DefaultConfigFilePath = "config/default.yaml" ) -func readCmdLineParams() { - collectorAddrStr := flag.String(CollectorAddr, "0.0.0.0", "Address for Collector gRPC") - collectorPortStr := flag.String(CollectorPort, "4317", "Port for Collector gRPC") - apiLogCollectorAddrStr := flag.String(ApiLogCollectorAddr, "0.0.0.0", "Address for API log HTTP Colletor") - apiLogCollectorPortStr := flag.String(ApiLogCollectorPort, "8081", "Port for API log HTTP Colletor") - - exporterAddrStr := flag.String(ExporterAddr, "0.0.0.0", "Address for Exporter gRPC") - exporterPortStr := flag.String(ExporterPort, "8080", "Port for Exporter gRPC") - - patchingNamespacesB := flag.Bool(PatchingNamespaces, false, "Enable patching 'istio-injection' to all namespaces") - restartingPatchedDeploymentsB := flag.Bool(RestartingPatchedDeployments, false, "Enable restarting the deployments in all patched namespaces") - - aggregationPeriodInt := flag.Int(AggregationPeriod, 1, "Period for aggregating metrics") - cleanUpPeriodInt := flag.Int(CleanUpPeriod, 5, "Period for cleanning up outdated metrics") - - aiEngineServiceStr := flag.String(AIEngineService, "ai-engine.sentryflow.svc.cluster.local", "Address for SentryFlow AI Engine") - aiEngineServicePortStr := flag.String(AIEngineServicePort, "5000", "Port for SentryFlow AI Engine") - aiEngineBatchSizeInt := flag.Int(AIEngineBatchSize, 5, "Batch size to send APIs to SentryFlow AI Engine") - - configDebugB := flag.Bool(Debug, false, "Enable debugging mode") - - var flags []string - flag.VisitAll(func(f *flag.Flag) { - kv := fmt.Sprintf("%s:%v", f.Name, f.Value) - flags = append(flags, kv) - }) - log.Printf("Arguments [%s]", strings.Join(flags, " ")) - - flag.Parse() - - viper.SetDefault(CollectorAddr, *collectorAddrStr) - viper.SetDefault(CollectorPort, *collectorPortStr) - viper.SetDefault(ApiLogCollectorAddr, *apiLogCollectorAddrStr) - viper.SetDefault(ApiLogCollectorPort, *apiLogCollectorPortStr) - - viper.SetDefault(ExporterAddr, *exporterAddrStr) - viper.SetDefault(ExporterPort, *exporterPortStr) - - viper.SetDefault(PatchingNamespaces, *patchingNamespacesB) - viper.SetDefault(RestartingPatchedDeployments, *restartingPatchedDeploymentsB) - - viper.SetDefault(AggregationPeriod, *aggregationPeriodInt) - viper.SetDefault(CleanUpPeriod, *cleanUpPeriodInt) - - viper.SetDefault(AIEngineService, *aiEngineServiceStr) - viper.SetDefault(AIEngineServicePort, *aiEngineServicePortStr) - viper.SetDefault(AIEngineBatchSize, *aiEngineBatchSizeInt) - - viper.SetDefault(Debug, *configDebugB) +type endpoint struct { + Url string `json:"url"` + Port uint16 `json:"port"` } -// LoadConfig Load configuration -func LoadConfig() error { - // Read configuration from command line - readCmdLineParams() - - // Read environment variable, those are upper-cased - viper.AutomaticEnv() - - GlobalConfig.CollectorAddr = viper.GetString(CollectorAddr) - GlobalConfig.CollectorPort = viper.GetString(CollectorPort) - GlobalConfig.ApiLogCollectorAddr = viper.GetString(ApiLogCollectorAddr) - GlobalConfig.ApiLogCollectorPort = viper.GetString(ApiLogCollectorPort) - GlobalConfig.ExporterAddr = viper.GetString(ExporterAddr) - GlobalConfig.ExporterPort = viper.GetString(ExporterPort) +type base struct { + Name string `json:"name,omitempty"` + // Todo: Do we really need both gRPC and http variants? + Grpc *endpoint `json:"grpc,omitempty"` + Http *endpoint `json:"http,omitempty"` +} - GlobalConfig.PatchingNamespaces = viper.GetBool(PatchingNamespaces) - GlobalConfig.RestartingPatchedDeployments = viper.GetBool(RestartingPatchedDeployments) +type serviceMesh struct { + Name string `json:"name"` + Enable bool `json:"enable"` +} - GlobalConfig.AggregationPeriod = viper.GetInt(AggregationPeriod) - GlobalConfig.CleanUpPeriod = viper.GetInt(CleanUpPeriod) +type sources struct { + ServiceMeshes []*serviceMesh `json:"serviceMeshes,omitempty"` + Others []*base `json:"others,omitempty"` +} - GlobalConfig.AIEngineService = viper.GetString(AIEngineService) - GlobalConfig.AIEngineServicePort = viper.GetString(AIEngineServicePort) - GlobalConfig.AIEngineBatchSize = viper.GetInt(AIEngineBatchSize) +type pProf struct { + Port uint16 `json:"port"` +} - GlobalConfig.Debug = viper.GetBool(Debug) +type debugCfg struct { + Enable bool `json:"enable"` + PProf *pProf `json:"pprof"` +} - log.Printf("Configuration [%+v]", GlobalConfig) +type Config struct { + Sources *sources `json:"sources"` + Exporter *base `json:"exporter"` + Debug *debugCfg `json:"debug,omitempty"` +} - return nil +func Init(configFilePath string, logger *zap.SugaredLogger) (*Config, error) { + if configFilePath == "" { + configFilePath = DefaultConfigFilePath + logger.Warnf("Using default config file path: %s", configFilePath) + } + + viper.SetConfigFile(configFilePath) + if err := viper.ReadInConfig(); err != nil { + logger.Errorf("Failed to read config file: %v", err) + return nil, err + } + + config := &Config{} + if err := viper.Unmarshal(config); err != nil { + logger.Errorf("Failed to unmarshal config file: %v", err) + return nil, err + } + + bytes, err := json.Marshal(config) + if err != nil { + logger.Errorf("Failed to marshal config file: %v", err) + } + logger.Debugf("Config: %s", string(bytes)) + + return config, nil } diff --git a/sentryflow/pkg/core/sentryflow.go b/sentryflow/pkg/core/sentryflow.go index 32736ea..95b0bc9 100644 --- a/sentryflow/pkg/core/sentryflow.go +++ b/sentryflow/pkg/core/sentryflow.go @@ -1,196 +1,86 @@ -// SPDX-License-Identifier: Apache-2.0 - package core import ( - "log" - "os" - "os/signal" + "context" "sync" - "syscall" - - "github.com/5GSEC/SentryFlow/collector" - "github.com/5GSEC/SentryFlow/config" - "github.com/5GSEC/SentryFlow/exporter" - "github.com/5GSEC/SentryFlow/k8s" - "github.com/5GSEC/SentryFlow/processor" -) - -// == // - -// StopChan Channel -var StopChan chan struct{} - -// init Function -func init() { - StopChan = make(chan struct{}) -} - -// SentryFlowService Structure -type SentryFlowService struct { - waitGroup *sync.WaitGroup -} - -// NewSentryFlow Function -func NewSentryFlow() *SentryFlowService { - sf := new(SentryFlowService) - sf.waitGroup = new(sync.WaitGroup) - return sf -} - -// DestroySentryFlow Function -func (sf *SentryFlowService) DestroySentryFlow() { - close(StopChan) - - // Remove SentryFlow collector config from Kubernetes - if k8s.UnpatchIstioConfigMap() { - log.Print("[SentryFlow] Unpatched Istio ConfigMap") - } else { - log.Print("[SentryFlow] Failed to unpatch Istio ConfigMap") - } - - // Stop collector - if collector.StopCollector() { - log.Print("[SentryFlow] Stopped Collectors") - } else { - log.Print("[SentryFlow] Failed to stop Collectors") - } - - // Stop Log Processor - if processor.StopLogProcessor() { - log.Print("[SentryFlow] Stopped Log Processors") - } else { - log.Print("[SentryFlow] Failed to stop Log Processors") - } - - // Stop API Aanalyzer - if processor.StopAPIAnalyzer() { - log.Print("[SentryFlow] Stopped API Analyzer") - } else { - log.Print("[SentryFlow] Failed to stop API Analyzer") - } - - // Stop API classifier - // if processor.StopAPIClassifier() { - // log.Print("[SentryFlow] Stopped API Classifier") - // } else { - // log.Print("[SentryFlow] Failed to stop API Classifier") - // } - - // Stop exporter - if exporter.StopExporter() { - log.Print("[SentryFlow] Stopped Exporters") - } else { - log.Print("[SentryFlow] Failed to stop Exporters") - } - - log.Print("[SentryFlow] Waiting for routine terminations") - - sf.waitGroup.Wait() - - log.Print("[SentryFlow] Terminated SentryFlow") -} -// == // + "go.uber.org/zap" + istionet "istio.io/client-go/pkg/apis/networking/v1alpha3" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" -// GetOSSigChannel Function -func GetOSSigChannel() chan os.Signal { - c := make(chan os.Signal, 1) + utilruntime "k8s.io/apimachinery/pkg/util/runtime" - signal.Notify(c, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT, - os.Interrupt) + "github.com/5GSEC/SentryFlow/pkg/config" + "github.com/5GSEC/SentryFlow/pkg/exporter" + "github.com/5GSEC/SentryFlow/pkg/k8s" + istiosidecarmesh "github.com/5GSEC/SentryFlow/pkg/source/svcmesh/istio/sidecar" + "github.com/5GSEC/SentryFlow/pkg/util" +) - return c +type manager struct { + Ctx context.Context + Logger *zap.SugaredLogger + K8sClient client.Client + Wg *sync.WaitGroup } -// == // - -// SentryFlow Function -func SentryFlow() { - sf := NewSentryFlow() - - log.Print("[SentryFlow] Initializing SentryFlow") - - // == // - - // Initialize Kubernetes client - if !k8s.InitK8sClient() { - sf.DestroySentryFlow() - return +func Run(ctx context.Context, configFilePath string, kubeConfig string) { + mgr := &manager{ + Ctx: ctx, + Logger: util.LoggerFromCtx(ctx), + Wg: &sync.WaitGroup{}, } + mgr.Logger.Info("Starting SentryFlow") - // Start Kubernetes informers - k8s.RunInformers(StopChan, sf.waitGroup) - - // Patch Istio ConfigMap - if !k8s.PatchIstioConfigMap() { - sf.DestroySentryFlow() + cfg, err := config.Init(configFilePath, mgr.Logger) + if err != nil { return } - // Patch Namespaces - if config.GlobalConfig.PatchingNamespaces { - if !k8s.PatchNamespaces() { - sf.DestroySentryFlow() - return - } - } - - // Patch Deployments - if config.GlobalConfig.RestartingPatchedDeployments { - if !k8s.RestartDeployments() { - sf.DestroySentryFlow() - return - } - } - - // == // + // Todo: Enable pprof and debug logging + if cfg.Debug.Enable { - // Start collector - if !collector.StartCollector() { - sf.DestroySentryFlow() - return } - // Start log processor - if !processor.StartLogProcessor(sf.waitGroup) { - sf.DestroySentryFlow() + k8sClient, err := k8s.NewClient(registerAndGetScheme(), kubeConfig) + if err != nil { + mgr.Logger.Errorf("Failed to create k8s client: %v", err) return } - - // Start API analyzer - if !processor.StartAPIAnalyzer(sf.waitGroup) { - sf.DestroySentryFlow() - return - } - - // Start API classifier - // if !processor.StartAPIClassifier(sf.waitGroup) { - // sf.DestroySentryFlow() - // return - // } - - // Start exporter - if !exporter.StartExporter(sf.waitGroup) { - sf.DestroySentryFlow() - return + mgr.K8sClient = k8sClient + + for _, serviceMesh := range cfg.Sources.ServiceMeshes { + if serviceMesh.Name != "" && serviceMesh.Enable { + switch serviceMesh.Name { + case util.ServiceMeshIstioSidecar: + mgr.Wg.Add(1) + go func() { + defer mgr.Wg.Done() + istiosidecarmesh.StartMonitoring(mgr.Ctx, mgr.Logger.Named("istio-sidecar"), mgr.K8sClient) + }() + default: + mgr.Logger.Errorf("Unsupported Service Mesh, %v", serviceMesh.Name) + return + } + } } - log.Print("[SentryFlow] Initialization is completed") - - // == // - - // listen for interrupt signals - sigChan := GetOSSigChannel() - <-sigChan - log.Print("Got a signal to terminate SentryFlow") - - // == // + mgr.Wg.Add(1) + go func() { + defer mgr.Wg.Done() + exporter.Start(mgr.Ctx, mgr.Logger.Named("exporter"), cfg) + }() + + mgr.Logger.Info("Started SentryFlow") + <-ctx.Done() + mgr.Logger.Info("Shutdown Signal Received. Waiting for all workers to finish.") + mgr.Wg.Wait() + mgr.Logger.Info("Shutting down SentryFlow") +} - // Destroy SentryFlow - sf.DestroySentryFlow() +func registerAndGetScheme() *runtime.Scheme { + scheme := runtime.NewScheme() + utilruntime.Must(istionet.AddToScheme(scheme)) + return scheme } diff --git a/sentryflow/pkg/exporter/apievent.go b/sentryflow/pkg/exporter/apievent.go new file mode 100644 index 0000000..8123134 --- /dev/null +++ b/sentryflow/pkg/exporter/apievent.go @@ -0,0 +1,76 @@ +package exporter + +import ( + "context" + "net/http" + "sync" + + "go.uber.org/zap" + "google.golang.org/grpc" + + "github.com/5GSEC/SentryFlow/pkg/config" + "github.com/5GSEC/SentryFlow/protobuf" +) + +type exporter struct { + protobuf.UnimplementedSentryFlowServer + ctx context.Context + grpcServer *grpc.Server + httpServer *http.Server + wg *sync.WaitGroup + logger *zap.SugaredLogger + apiEvents chan *protobuf.APIEvent +} + +func (e *exporter) StopServers() { + e.logger.Info("Stopping servers") + if e.grpcServer != nil { + e.logger.Info("Stopping gRPC server") + e.grpcServer.GracefulStop() + } + e.logger.Info("Stopping HTTP server") + if err := e.httpServer.Shutdown(context.Background()); err != nil { + e.logger.Errorw("Failed to gracefully shutdown http server", "error", err) + } + e.logger.Info("Stopped Servers") +} + +func Start(ctx context.Context, logger *zap.SugaredLogger, cfg *config.Config) { + logger.Info("Starting exporter") + + if cfg.Exporter == nil { + logger.Fatalf("Failed to start exporter as no config was found") + } + + exp := &exporter{ + ctx: ctx, + wg: &sync.WaitGroup{}, + logger: logger, + apiEvents: make(chan *protobuf.APIEvent, 1024), + } + + grpcCfg := cfg.Exporter.Grpc + if grpcCfg != nil { + exp.wg.Add(1) + go func() { + defer exp.wg.Done() + exp.StartGrpcServer(grpcCfg.Port) + }() + } + + exp.wg.Add(1) + go func() { + defer exp.wg.Done() + exp.StartHttpServer() + }() + + logger.Info("Started Exporter") + + // Todo: Improve graceful shutdown + <-exp.ctx.Done() + logger.Info("Shutting down exporter") + exp.StopServers() + exp.wg.Wait() + + logger.Info("Exporter shut down") +} diff --git a/sentryflow/pkg/exporter/exportAPILogs.go b/sentryflow/pkg/exporter/exportAPILogs.go deleted file mode 100644 index fc4cc5a..0000000 --- a/sentryflow/pkg/exporter/exportAPILogs.go +++ /dev/null @@ -1,132 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package exporter - -import ( - "errors" - "fmt" - "log" - "sort" - "strings" - - "github.com/5GSEC/SentryFlow/protobuf" -) - -// == // - -// apiLogStreamInform structure -type apiLogStreamInform struct { - Hostname string - IPAddress string - - stream protobuf.SentryFlow_GetAPILogServer - error chan error -} - -// apiLogStreamInformV2 structure -type apiLogStreamInformV2 struct { - Hostname string - IPAddress string - - stream protobuf.SentryFlow_GetAPILogV2Server - error chan error -} - -// GetAPILog Function (for gRPC) -func (exs *ExpService) GetAPILog(info *protobuf.ClientInfo, stream protobuf.SentryFlow_GetAPILogServer) error { - log.Printf("[Exporter] Client %s (%s) connected (GetAPILog)", info.HostName, info.IPAddress) - - currExporter := &apiLogStreamInform{ - Hostname: info.HostName, - IPAddress: info.IPAddress, - stream: stream, - } - - ExpH.exporterLock.Lock() - ExpH.apiLogExporters = append(ExpH.apiLogExporters, currExporter) - ExpH.exporterLock.Unlock() - - return <-currExporter.error -} - -// GetAPILogV2 Function (for gRPC) -func (exs *ExpService) GetAPILogV2(info *protobuf.ClientInfo, stream protobuf.SentryFlow_GetAPILogV2Server) error { - log.Printf("[Exporter] Client %s (%s) connected (GetAPILogV2)", info.HostName, info.IPAddress) - - currExporter := &apiLogStreamInformV2{ - Hostname: info.HostName, - IPAddress: info.IPAddress, - stream: stream, - } - - ExpH.exporterLock.Lock() - ExpH.apiLogExportersV2 = append(ExpH.apiLogExportersV2, currExporter) - ExpH.exporterLock.Unlock() - - return <-currExporter.error -} - -// SendAPILogs Function -func (exp *ExpHandler) SendAPILogs(apiLog *protobuf.APILog) error { - failed := 0 - total := len(exp.apiLogExporters) - - for _, exporter := range exp.apiLogExporters { - log.Print("Sending api log!!!!") - log.Printf("Sending api log right here!!!!! %+v\n", apiLog) - if err := exporter.stream.Send(apiLog); err != nil { - log.Printf("[Exporter] Failed to export an API log to %s (%s): %v", exporter.Hostname, exporter.IPAddress, err) - failed++ - } - } - - if failed != 0 { - msg := fmt.Sprintf("[Exporter] Failed to export API logs properly (%d/%d failed)", failed, total) - return errors.New(msg) - } - - return nil -} - -// SendAPILogsV2 Function -func (exp *ExpHandler) SendAPILogsV2(apiLog *protobuf.APILogV2) error { - failed := 0 - total := len(exp.apiLogExportersV2) - - for _, exporter := range exp.apiLogExportersV2 { - if err := exporter.stream.Send(apiLog); err != nil { - log.Printf("[Exporter] Failed to export an API log(V2) to %s (%s): %v", exporter.Hostname, exporter.IPAddress, err) - failed++ - } - } - - if failed != 0 { - msg := fmt.Sprintf("[Exporter] Failed to export API logs(V2) properly (%d/%d failed)", failed, total) - return errors.New(msg) - } - - return nil -} - -// == // - -// InsertAPILog Function -func InsertAPILog(apiLog interface{}) { - switch data := apiLog.(type) { - case *protobuf.APILog: - ExpH.exporterAPILogs <- data - // Make a string with labels - var labelString []string - for k, v := range data.SrcLabel { - labelString = append(labelString, fmt.Sprintf("%s:%s", k, v)) - } - sort.Strings(labelString) - - // Update Stats per namespace and per labels - UpdateStats(data.SrcNamespace, strings.Join(labelString, ","), data.GetPath()) - case *protobuf.APILogV2: - ExpH.exporterAPILogsV2 <- data - } -} - -// == // diff --git a/sentryflow/pkg/exporter/exporterHandler.go b/sentryflow/pkg/exporter/exporterHandler.go deleted file mode 100644 index 0fe103f..0000000 --- a/sentryflow/pkg/exporter/exporterHandler.go +++ /dev/null @@ -1,259 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package exporter - -import ( - "fmt" - "log" - "net" - "sync" - - "github.com/5GSEC/SentryFlow/protobuf" - - "github.com/5GSEC/SentryFlow/config" - - "google.golang.org/grpc" -) - -// == // - -// ExpH global reference for Exporter Handler -var ExpH *ExpHandler - -// init Function -func init() { - ExpH = NewExporterHandler() -} - -// ExpHandler structure -type ExpHandler struct { - exporterService net.Listener - grpcServer *grpc.Server - grpcService *ExpService - - apiLogExporters []*apiLogStreamInform - apiLogExportersV2 []*apiLogStreamInformV2 - apiMetricsExporters []*apiMetricStreamInform - envoyMetricsExporters []*envoyMetricsStreamInform - - exporterLock sync.Mutex - - exporterAPILogs chan *protobuf.APILog - exporterAPILogsV2 chan *protobuf.APILogV2 - exporterAPIMetrics chan *protobuf.APIMetrics - exporterMetrics chan *protobuf.EnvoyMetrics - - statsPerLabel map[string]StatsPerLabel - statsPerLabelLock sync.RWMutex - - stopChan chan struct{} -} - -// ExpService Structure -type ExpService struct { - protobuf.UnimplementedSentryFlowServer -} - -// == // - -// NewExporterHandler Function -func NewExporterHandler() *ExpHandler { - exp := &ExpHandler{ - grpcService: new(ExpService), - - apiLogExporters: make([]*apiLogStreamInform, 0), - apiLogExportersV2: make([]*apiLogStreamInformV2, 0), - apiMetricsExporters: make([]*apiMetricStreamInform, 0), - envoyMetricsExporters: make([]*envoyMetricsStreamInform, 0), - - exporterLock: sync.Mutex{}, - - exporterAPILogs: make(chan *protobuf.APILog), - exporterAPILogsV2: make(chan *protobuf.APILogV2), - exporterAPIMetrics: make(chan *protobuf.APIMetrics), - exporterMetrics: make(chan *protobuf.EnvoyMetrics), - - statsPerLabel: make(map[string]StatsPerLabel), - statsPerLabelLock: sync.RWMutex{}, - - stopChan: make(chan struct{}), - } - - return exp -} - -// == // - -// StartExporter Function -func StartExporter(wg *sync.WaitGroup) bool { - // Make a string with the given exporter address and port - exporterService := fmt.Sprintf("%s:%s", config.GlobalConfig.ExporterAddr, config.GlobalConfig.ExporterPort) - - // Start listening gRPC port - expService, err := net.Listen("tcp", exporterService) - if err != nil { - log.Printf("[Exporter] Failed to listen at %s: %v", exporterService, err) - return false - } - ExpH.exporterService = expService - - log.Printf("[Exporter] Listening Exporter gRPC services (%s)", exporterService) - - // Create gRPC server - gRPCServer := grpc.NewServer() - ExpH.grpcServer = gRPCServer - - protobuf.RegisterSentryFlowServer(gRPCServer, ExpH.grpcService) - - log.Printf("[Exporter] Initialized Exporter gRPC services") - - // Serve gRPC Service - go ExpH.grpcServer.Serve(ExpH.exporterService) - - log.Printf("[Exporter] Serving Exporter gRPC services (%s)", exporterService) - - // Export APILogs - go ExpH.exportAPILogs(wg) - - // Export APILogsV2 - go ExpH.exportAPILogsV2(wg) - - log.Printf("[Exporter] Exporting API logs through gRPC services") - - // Export APIMetrics - go ExpH.exportAPIMetrics(wg) - - log.Printf("[Exporter] Exporting API metrics through gRPC services") - - // Export EnvoyMetrics - go ExpH.exportEnvoyMetrics(wg) - - log.Printf("[Exporter] Exporting Envoy metrics through gRPC services") - - // Start Export Time Ticker Routine - go AggregateAPIMetrics() - go CleanUpOutdatedStats() - - return true -} - -// StopExporter Function -func StopExporter() bool { - // One for exportAPILogs - ExpH.stopChan <- struct{}{} - - // One for exportAPILogsV2 - ExpH.stopChan <- struct{}{} - - // One for exportAPIMetrics - ExpH.stopChan <- struct{}{} - - // One for exportEnvoyMetrics - ExpH.stopChan <- struct{}{} - - // Stop gRPC server - ExpH.grpcServer.GracefulStop() - - log.Printf("[Exporter] Gracefully stopped Exporter gRPC services") - - return true -} - -// == // - -// exportAPILogs Function -func (exp *ExpHandler) exportAPILogs(wg *sync.WaitGroup) { - wg.Add(1) - - for { - select { - case apiLog, ok := <-exp.exporterAPILogs: - if !ok { - log.Printf("[Exporter] Failed to fetch APIs from APIs channel") - wg.Done() - return - } - - if err := exp.SendAPILogs(apiLog); err != nil { - log.Printf("[Exporter] Failed to export API Logs: %v", err) - } - - case <-exp.stopChan: - wg.Done() - return - } - } -} - -// exportAPILogs Function -func (exp *ExpHandler) exportAPILogsV2(wg *sync.WaitGroup) { - wg.Add(1) - - for { - select { - case apiLog, ok := <-exp.exporterAPILogsV2: - if !ok { - log.Printf("[Exporter] Failed to fetch APILogs(V2) from APIs channel") - wg.Done() - return - } - - if err := exp.SendAPILogsV2(apiLog); err != nil { - log.Printf("[Exporter] Failed to export API Logs(V2): %v", err) - } - - case <-exp.stopChan: - wg.Done() - return - } - } -} - -// exportAPIMetrics Function -func (exp *ExpHandler) exportAPIMetrics(wg *sync.WaitGroup) { - wg.Add(1) - - for { - select { - case apiMetrics, ok := <-exp.exporterAPIMetrics: - if !ok { - log.Printf("[Exporter] Failed to fetch metrics from API Metrics channel") - wg.Done() - return - } - if err := exp.SendAPIMetrics(apiMetrics); err != nil { - log.Printf("[Exporter] Failed to export API metrics: %v", err) - } - - case <-exp.stopChan: - wg.Done() - return - } - } -} - -// exportEnvoyMetrics Function -func (exp *ExpHandler) exportEnvoyMetrics(wg *sync.WaitGroup) { - wg.Add(1) - - for { - select { - case evyMetrics, ok := <-exp.exporterMetrics: - if !ok { - log.Printf("[Exporter] Failed to fetch metrics from Envoy Metrics channel") - wg.Done() - return - } - - if err := exp.SendEnvoyMetrics(evyMetrics); err != nil { - log.Printf("[Exporter] Failed to export Envoy metrics: %v", err) - } - - case <-exp.stopChan: - wg.Done() - return - } - } -} - -// == // diff --git a/sentryflow/pkg/exporter/grpc.go b/sentryflow/pkg/exporter/grpc.go new file mode 100644 index 0000000..75c7deb --- /dev/null +++ b/sentryflow/pkg/exporter/grpc.go @@ -0,0 +1,42 @@ +package exporter + +import ( + "fmt" + "net" + + "google.golang.org/grpc" + + "github.com/5GSEC/SentryFlow/protobuf" +) + +func (e *exporter) StartGrpcServer(port uint16) { + e.logger.Info("Starting gRPC Server") + + e.grpcServer = grpc.NewServer() + + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + e.logger.Fatalf("failed to listen: %v", err) + } + + protobuf.RegisterSentryFlowServer(e.grpcServer, e) + + e.logger.Infof("gRPC server listening on port %d", port) + if err = e.grpcServer.Serve(listener); err != nil { + e.logger.Errorf("failed to serve gRPC server: %v", err) + return + } +} + +func (e *exporter) GetAPIEvent(clientInfo *protobuf.ClientInfo, stream grpc.ServerStreamingServer[protobuf.APIEvent]) error { + e.logger.Infof("Client %s (%s) connected", clientInfo.HostName, clientInfo.IPAddress) + + for apiEvent := range e.apiEvents { + if err := stream.Send(apiEvent); err != nil { + e.logger.Errorf("failed to send APIEvent: %v", err) + return err + } + } + + return nil +} diff --git a/sentryflow/pkg/exporter/http.go b/sentryflow/pkg/exporter/http.go new file mode 100644 index 0000000..3e214ff --- /dev/null +++ b/sentryflow/pkg/exporter/http.go @@ -0,0 +1,78 @@ +package exporter + +import ( + "errors" + "fmt" + "io" + "net/http" + "strings" + + "google.golang.org/protobuf/encoding/protojson" + + "github.com/5GSEC/SentryFlow/protobuf" +) + +func (e *exporter) StartHttpServer() { + e.logger.Info("Starting HTTP server") + const port = 8081 + e.httpServer = &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: nil, + } + e.registerRoutes() + + e.logger.Infof("HTTP server listening on port: %d", port) + if err := e.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + e.logger.Fatalf("Could not listen on port %d: %v", port, err) + } +} + +func (e *exporter) registerRoutes() { + http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + w.WriteHeader(http.StatusOK) + }) + + http.HandleFunc("/api/v1/events", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + if r.Body == nil { + e.logger.Info("Body is nil") + w.WriteHeader(http.StatusBadRequest) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + e.logger.Errorf("failed to read request body, error: %v", err) + http.Error(w, "failed to read request body", http.StatusInternalServerError) + return + } + + apiEvent := &protobuf.APIEvent{} + if err := protojson.Unmarshal(body, apiEvent); err != nil { + e.logger.Info("failed to marshal api event, error:", err) + http.Error(w, "failed to parse request body", http.StatusBadRequest) + return + } + + if r.ProtoMajor == 2 { + if strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { + apiEvent.Protocol = "grpc" + } else { + apiEvent.Protocol = "HTTP/2.0" + } + } else if r.ProtoMajor == 1 && r.ProtoMinor == 1 { + apiEvent.Protocol = "HTTP/1.1" + } else if r.ProtoMajor == 1 && r.ProtoMinor == 0 { + apiEvent.Protocol = "HTTP/1.0" + } + e.apiEvents <- apiEvent + }) +} diff --git a/sentryflow/pkg/k8s/client.go b/sentryflow/pkg/k8s/client.go new file mode 100644 index 0000000..b6b0e8d --- /dev/null +++ b/sentryflow/pkg/k8s/client.go @@ -0,0 +1,39 @@ +package k8s + +import ( + "errors" + "fmt" + "os" + "path/filepath" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// NewClient returns a new Client using the provided scheme to map go structs to +// GroupVersionKinds. +func NewClient(scheme *runtime.Scheme, kubeConfig string) (client.Client, error) { + config, err := getConfig(kubeConfig) + if err != nil { + return nil, fmt.Errorf("failed to get config: %v", err) + } + return client.New(config, client.Options{ + Scheme: scheme, + }) +} + +func getConfig(kubeConfig string) (*rest.Config, error) { + config, err := rest.InClusterConfig() + if err != nil && errors.Is(err, rest.ErrNotInCluster) { + if kubeConfig == "" { + kubeConfig = filepath.Join(os.Getenv("HOME"), ".kube", "config") + } + config, err = clientcmd.BuildConfigFromFlags("", kubeConfig) + if err != nil { + return nil, err + } + } + return config, nil +} diff --git a/sentryflow/pkg/source/svcmesh/istio/sidecar/sidecar.go b/sentryflow/pkg/source/svcmesh/istio/sidecar/sidecar.go new file mode 100644 index 0000000..3daabcb --- /dev/null +++ b/sentryflow/pkg/source/svcmesh/istio/sidecar/sidecar.go @@ -0,0 +1,525 @@ +package sidecar + +import ( + "context" + "fmt" + + _struct "github.com/golang/protobuf/ptypes/struct" + "go.uber.org/zap" + networkingv1alpha3 "istio.io/api/networking/v1alpha3" + istionet "istio.io/client-go/pkg/apis/networking/v1alpha3" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + FilterName = "http-filter" + UpstreamAndClusterName = "sentryflow" + ApiPath = "/api/v1/events" + FilterURI = "https://raw.githubusercontent.com/anurag-rajawat/envoy-wasm-filters/main/httpfilters.wasm" + RemoteWasmFilterClusterName = "remote_wasm" + FilterSha256 = "714be6a76e8853fa331a285c8d420a740675708f1503df88370a30197f8b6e37" + Timeout = "5s" + SentryFlowFilterServerPort = 8081 +) + +func StartMonitoring(ctx context.Context, logger *zap.SugaredLogger, k8sClient client.Client) { + logger.Info("Starting istio sidecar mesh monitoring") + + if err := createEnvoyFilter(ctx, logger, k8sClient); err != nil { + logger.Errorf("Failed to create EnvoyFilter. Stopping istio sidecar mesh monitoring, error: %v", err) + return + } + logger.Info("Started istio sidecar mesh monitoring") + + <-ctx.Done() + logger.Info("Shutting down istio sidecar mesh monitoring") + if err := deleteEnvoyFilter(k8sClient); err != nil { + logger.Errorf("Failed to delete EnvoyFilter, error: %v", err) + } + + logger.Info("Stopped istio sidecar mesh monitoring") +} + +func deleteEnvoyFilter(k8sClient client.Client) error { + filter := &istionet.EnvoyFilter{ + ObjectMeta: v1.ObjectMeta{ + Name: "sentryflow-http-filter", + Namespace: "istio-system", + }, + } + + if err := k8sClient.Delete(context.Background(), filter); err != nil { + return err + } + return nil +} + +func createEnvoyFilter(ctx context.Context, logger *zap.SugaredLogger, k8sClient client.Client) error { + var configVal = fmt.Sprintf(`{"upstream_name": "%v", "authority": "%v", "api_path": "%v"} +`, UpstreamAndClusterName, UpstreamAndClusterName, ApiPath) + + filter := &istionet.EnvoyFilter{ + TypeMeta: v1.TypeMeta{ + Kind: "EnvoyFilter", + APIVersion: "networking.istio.io/v1alpha3", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "sentryflow-http-filter", + // Deploy the filter to whatever istio considers its "root" namespace so that we + // don't have to create the ConfigMap(s) containing the WASM filter binary, and + // the associated annotations/configuration for the Istio sidecar(s). + // https://istio.io/latest/docs/reference/config/istio.mesh.v1alpha1/#MeshConfig:~:text=No-,rootNamespace,-string + Namespace: "istio-system", + Labels: map[string]string{ + "app.kubernetes.io/managed-by": "sentryflow", + }, + }, + Spec: networkingv1alpha3.EnvoyFilter{ + ConfigPatches: []*networkingv1alpha3.EnvoyFilter_EnvoyConfigObjectPatch{ + { + ApplyTo: networkingv1alpha3.EnvoyFilter_HTTP_FILTER, + Match: &networkingv1alpha3.EnvoyFilter_EnvoyConfigObjectMatch{ + Context: networkingv1alpha3.EnvoyFilter_ANY, + ObjectTypes: &networkingv1alpha3.EnvoyFilter_EnvoyConfigObjectMatch_Listener{ + Listener: &networkingv1alpha3.EnvoyFilter_ListenerMatch{ + FilterChain: &networkingv1alpha3.EnvoyFilter_ListenerMatch_FilterChainMatch{ + Filter: &networkingv1alpha3.EnvoyFilter_ListenerMatch_FilterMatch{ + Name: "envoy.filters.network.http_connection_manager", + SubFilter: &networkingv1alpha3.EnvoyFilter_ListenerMatch_SubFilterMatch{ + Name: "envoy.filters.http.router", + }, + }, + }, + }, + }, + }, + Patch: &networkingv1alpha3.EnvoyFilter_Patch{ + Operation: networkingv1alpha3.EnvoyFilter_Patch_INSERT_BEFORE, + Value: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "name": { + Kind: &_struct.Value_StringValue{ + StringValue: "envoy.filters.http.wasm", + }, + }, + "typedConfig": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "@type": { + Kind: &_struct.Value_StringValue{ + StringValue: "type.googleapis.com/udpa.type.v1.TypedStruct", + }, + }, + "typeUrl": { + Kind: &_struct.Value_StringValue{ + StringValue: "type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm", + }, + }, + "value": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "config": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "name": { + Kind: &_struct.Value_StringValue{ + StringValue: FilterName, + }, + }, + "rootId": { + Kind: &_struct.Value_StringValue{ + StringValue: FilterName, + }, + }, + "configuration": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "@type": { + Kind: &_struct.Value_StringValue{ + StringValue: "type.googleapis.com/google.protobuf.StringValue", + }, + }, + "value": { + Kind: &_struct.Value_StringValue{ + StringValue: configVal, + }, + }, + }, + }, + }, + }, + "vmConfig": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "code": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "remote": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "http_uri": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "uri": { + Kind: &_struct.Value_StringValue{ + StringValue: FilterURI, + }, + }, + "timeout": { + Kind: &_struct.Value_StringValue{ + StringValue: Timeout, + }, + }, + "cluster": { + Kind: &_struct.Value_StringValue{ + StringValue: RemoteWasmFilterClusterName, + }, + }, + }, + }, + }, + }, + "sha256": { + Kind: &_struct.Value_StringValue{ + StringValue: FilterSha256, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + "runtime": { + Kind: &_struct.Value_StringValue{ + StringValue: "envoy.wasm.runtime.v8", + }, + }, + "vmId": { + Kind: &_struct.Value_StringValue{ + StringValue: FilterName, + }, + }, + "allow_precompiled": { + Kind: &_struct.Value_BoolValue{ + BoolValue: true, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + ApplyTo: networkingv1alpha3.EnvoyFilter_CLUSTER, + Match: &networkingv1alpha3.EnvoyFilter_EnvoyConfigObjectMatch{ + Context: networkingv1alpha3.EnvoyFilter_SIDECAR_OUTBOUND, + }, + Patch: &networkingv1alpha3.EnvoyFilter_Patch{ + Operation: networkingv1alpha3.EnvoyFilter_Patch_ADD, + Value: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "name": { + Kind: &_struct.Value_StringValue{ + StringValue: UpstreamAndClusterName, + }, + }, + "type": { + Kind: &_struct.Value_StringValue{ + StringValue: "LOGICAL_DNS", + }, + }, + "connect_timeout": { + Kind: &_struct.Value_StringValue{ + StringValue: "1s", + }, + }, + "lb_policy": { + Kind: &_struct.Value_StringValue{ + StringValue: "ROUND_ROBIN", + }, + }, + "load_assignment": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "cluster_name": { + Kind: &_struct.Value_StringValue{ + StringValue: UpstreamAndClusterName, + }, + }, + "endpoints": { + Kind: &_struct.Value_ListValue{ + ListValue: &_struct.ListValue{ + Values: []*_struct.Value{ + { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "lb_endpoints": { + Kind: &_struct.Value_ListValue{ + ListValue: &_struct.ListValue{ + Values: []*_struct.Value{{ + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "endpoint": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "address": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "socket_address": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "protocol": { + Kind: &_struct.Value_StringValue{ + StringValue: "TCP", + }, + }, + "address": { + Kind: &_struct.Value_StringValue{ + StringValue: UpstreamAndClusterName + "." + UpstreamAndClusterName, + }, + }, + "port_value": { + Kind: &_struct.Value_NumberValue{ + NumberValue: SentryFlowFilterServerPort, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + ApplyTo: networkingv1alpha3.EnvoyFilter_CLUSTER, + Match: &networkingv1alpha3.EnvoyFilter_EnvoyConfigObjectMatch{ + Context: networkingv1alpha3.EnvoyFilter_SIDECAR_OUTBOUND, + }, + Patch: &networkingv1alpha3.EnvoyFilter_Patch{ + Operation: networkingv1alpha3.EnvoyFilter_Patch_ADD, + Value: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "name": { + Kind: &_struct.Value_StringValue{ + StringValue: RemoteWasmFilterClusterName, + }, + }, + "type": { + Kind: &_struct.Value_StringValue{ + StringValue: "STRICT_DNS", + }, + }, + "connect_timeout": { + Kind: &_struct.Value_StringValue{ + StringValue: "1s", + }, + }, + "dns_refresh_rate": { + Kind: &_struct.Value_StringValue{ + StringValue: Timeout, + }, + }, + "dns_lookup_family": { + Kind: &_struct.Value_StringValue{ + StringValue: "V4_ONLY", + }, + }, + "lb_policy": { + Kind: &_struct.Value_StringValue{ + StringValue: "ROUND_ROBIN", + }, + }, + "load_assignment": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "cluster_name": { + Kind: &_struct.Value_StringValue{ + StringValue: RemoteWasmFilterClusterName, + }, + }, + "endpoints": { + Kind: &_struct.Value_ListValue{ + ListValue: &_struct.ListValue{ + Values: []*_struct.Value{ + { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "lb_endpoints": { + Kind: &_struct.Value_ListValue{ + ListValue: &_struct.ListValue{ + Values: []*_struct.Value{ + { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "endpoint": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "address": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "socket_address": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "address": { + Kind: &_struct.Value_StringValue{ + StringValue: "raw.githubusercontent.com", + }, + }, + "port_value": { + Kind: &_struct.Value_NumberValue{ + NumberValue: 443, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + "transport_socket": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "name": { + Kind: &_struct.Value_StringValue{ + StringValue: "envoy.transport_sockets.tls", + }, + }, + "typed_config": { + Kind: &_struct.Value_StructValue{ + StructValue: &_struct.Struct{ + Fields: map[string]*_struct.Value{ + "@type": { + Kind: &_struct.Value_StringValue{ + StringValue: "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext", + }, + }, + "sni": { + Kind: &_struct.Value_StringValue{ + StringValue: "raw.githubusercontent.com", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + existingFilter := &istionet.EnvoyFilter{} + if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(filter), existingFilter); err != nil { + if errors.IsNotFound(err) { + if err := k8sClient.Create(ctx, filter); err != nil { + return err + } + logger.Infow("Created Envoy Filter", "name", filter.Name, "namespace", filter.Namespace) + return nil + } + return err + } + + return nil +} diff --git a/sentryflow/pkg/types/types.go b/sentryflow/pkg/types/types.go deleted file mode 100644 index 4ce59ab..0000000 --- a/sentryflow/pkg/types/types.go +++ /dev/null @@ -1,36 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package types - -// == // - -// K8sResourceTypes -const ( - K8sResourceTypeUnknown = 0 - K8sResourceTypePod = 1 - K8sResourceTypeService = 2 -) - -// K8sResource Structure -type K8sResource struct { - Type uint8 - Namespace string - Name string - Labels map[string]string - Containers []string -} - -// K8sResourceTypeToString Function -func K8sResourceTypeToString(resourceType uint8) string { - switch resourceType { - case K8sResourceTypePod: - return "Pod" - case K8sResourceTypeService: - return "Service" - case K8sResourceTypeUnknown: - return "Unknown" - } - return "Unknown" -} - -// == // diff --git a/sentryflow/pkg/util/util.go b/sentryflow/pkg/util/util.go new file mode 100644 index 0000000..5af9a2d --- /dev/null +++ b/sentryflow/pkg/util/util.go @@ -0,0 +1,21 @@ +package util + +import ( + "context" + + "go.uber.org/zap" +) + +const ( + LoggerCtxKey = "logger" + ServiceMeshIstioSidecar = "istio-sidecar" + ServiceMeshIstioAmbient = "istio-ambient" + ServiceMeshKong = "kong" + ServiceMeshConsul = "consul" + ServiceMeshLinkerd = "linkerd" +) + +func LoggerFromCtx(ctx context.Context) *zap.SugaredLogger { + logger, _ := ctx.Value(LoggerCtxKey).(*zap.SugaredLogger) + return logger +}