From 0f887a666e059f6ceb6942a6dc30b3615af7a1ab Mon Sep 17 00:00:00 2001 From: dvtaras2 Date: Fri, 30 Dec 2022 20:46:35 +0300 Subject: [PATCH] Add observability stack for SOLDR (#40) * Add observability stack for SOLDR * fix: naming of services, containers, volumes and hostnames * fix: add mapping device-id from host OS to the vx-containers to pinning generated service ID * fix: reorder services in the docker compose file * fix: default option of ES Java opts * fix: bug with delivery ES metrics from exporter * feat: add loki service support in compose * fix: pinning observability components version * fix: docker compose restart policy to unless-stopped * feat: add collecting logs and sending it to loki via otel collector * Update README.md Co-authored-by: Dmitry Ng <19asdek91@gmail.com> Co-authored-by: Denis Tarasov Co-authored-by: Mikhail Kochegarov --- .env.template | 9 + README.md | 23 +++ cmd/agent/main.go | 27 ++- cmd/api/main.go | 35 +++- cmd/server/main.go | 29 ++- docker-compose.yml | 274 ++++++++++++++++++++++++++ go.mod | 10 +- go.sum | 13 +- internal/lua/state_test.go | 8 +- internal/observability/client_test.go | 2 + internal/observability/metric_test.go | 44 ++--- internal/observability/obs.go | 10 +- internal/observability/trace.go | 251 +++++++++++++++++++++++ internal/observability/trace_test.go | 32 +-- 14 files changed, 702 insertions(+), 65 deletions(-) diff --git a/.env.template b/.env.template index a2d793d8..605f8c02 100644 --- a/.env.template +++ b/.env.template @@ -49,3 +49,12 @@ MINIO_ENDPOINT=http://127.0.0.1:9000 # internal services INTERNAL_NET_HOST=127.0.0.1 + +# observability +OTEL_ADDR=otel.local:8148 +GRAFANA_SERVER_HOST=0.0.0.0 +GRAFANA_SERVER_PORT=3000 +MASTER_PASSWORD=P@ssw0rd +ELK_VERSION=7.14.1 +ES_JAVA_OPTS="-Xmx2g -Xms2g" +SPAN_STORAGE_TYPE=elasticsearch diff --git a/README.md b/README.md index 88e069de..1703b235 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,12 @@ docker compose pull docker compose up -d ``` +#### Run observability stack components + +```bash +docker compose --profile obs up -d +``` + #### Stop services ```bash @@ -219,6 +225,23 @@ Launch on of the available debug tasks: - launch vxagent - launch web ui +### Observability stack + +Observability stack collect metrics, traces, logs from SOLDR components. Stack consist of: +- `Grafana` - querying and visualizing observability data +- `VictoriaMetrics` - datastore for server and SOLDR components metrics +- `node-exporter` and `elasticsearch-exporter` - scraping metrics +- `Jaeger` - storing and querying traces +- `Elasticsearch` - datastore for `Jaeger` +- `OpenTelemetry collector` - single entry point to receive, process and export all observability data + +> For more information about collector, visit [`https://opentelemetry.io/docs/collector`](https://opentelemetry.io/docs/collector). + +Run observability stack components and then open `Grafana` in a browser [`https://localhost:3000`](https://localhost:3000). Default credentials: `admin/admin`. After default password for `admin` user changed, you can check provisioned SOLDR dashboards by click `Dashboards` icon on menu bar. On this SOLDR dashboards you can view server, agents and modules resource utilization, events statistics etc. +For checking traces, you need to click `Explore` icon on menu bar, then choose `Jaeger` data source from dropdown in the top left. Now you can query and filter traces by `Service Name`, `Operation Name`, `Tags`, `duration` and `Time ranges`. + +> Full observability stack requires more resources compare to clean SOLDR. + ### Clean up the project Remove all build files and other security keys: diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 6af81af8..92a501da 100755 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -292,7 +292,13 @@ func configureLogging(ctx context.Context, c *config.Config) (func(), error) { } } tracerClient := observability.NewHookTracerClient(c.TracerConfigClient) - tracerProvider, err := observability.NewTracerProvider(ctx, tracerClient, serviceName, c.Version, attr) + tracerProvider, err := observability.NewTracerProvider( + ctx, + tracerClient, + serviceName, + c.Version, + attr, + ) if err != nil { return nil, fmt.Errorf("failed to initialize a tracer provider for logging: %w", err) } @@ -305,12 +311,27 @@ func configureLogging(ctx context.Context, c *config.Config) (func(), error) { } } meterClient := observability.NewHookMeterClient(c.MeterConfigClient) - meterProvider, err := observability.NewMeterProvider(ctx, meterClient, serviceName, c.Version, attr) + meterProvider, err := observability.NewMeterProvider( + ctx, + meterClient, + serviceName, + c.Version, + attr, + ) if err != nil { return nil, fmt.Errorf("failed to initialized a metrics provider for logging") } - observability.InitObserver(ctx, tracerProvider, meterProvider, tracerClient, meterClient, serviceName, logLevels) + observability.InitObserver( + ctx, + tracerProvider, + meterProvider, + tracerClient, + meterClient, + serviceName, + c.Version, + logLevels, + ) return func() { observability.Observer.Close() }, nil diff --git a/cmd/api/main.go b/cmd/api/main.go index 4755a198..cf9520bd 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -31,6 +31,8 @@ import ( "soldr/internal/version" ) +const serviceName = "vxapi" + type Config struct { Debug bool `config:"debug"` Develop bool `config:"is_develop"` @@ -191,7 +193,7 @@ func main() { serviceS3ConnectionStorage := mem.NewServiceS3ConnectionStorage() tracerClient := observability.NewProxyTracerClient( - observability.NewOtlpTracerClient(cfg.Tracing.Addr), + observability.NewOtlpTracerAndLoggerClient(cfg.Tracing.Addr), observability.NewHookTracerClient(&observability.HookClientConfig{ ResendTimeout: observability.DefaultResendTimeout, QueueSizeLimit: observability.DefaultQueueSizeLimit, @@ -199,7 +201,13 @@ func main() { }), ) attr := attribute.String("api_server_id", system.MakeAgentID()) - tracerProvider, err := observability.NewTracerProvider(ctx, tracerClient, "vxapi", version.GetBinaryVersion(), attr) + tracerProvider, err := observability.NewTracerProvider( + ctx, + tracerClient, + serviceName, + version.GetBinaryVersion(), + attr, + ) if err != nil { logger.WithError(err).Error("could not create tracer provider") return @@ -216,7 +224,13 @@ func main() { logger.WithError(err).Error("could not create meter client") return } - meterProvider, err := observability.NewMeterProvider(ctx, meterClient, "vxapi", version.GetBinaryVersion(), attr) + meterProvider, err := observability.NewMeterProvider( + ctx, + meterClient, + serviceName, + version.GetBinaryVersion(), + attr, + ) if err != nil { logger.WithError(err).Error("could not create meter provider") return @@ -232,7 +246,16 @@ func main() { if cfg.Debug { logLevels = append(logLevels, logrus.DebugLevel) } - observability.InitObserver(ctx, tracerProvider, meterProvider, tracerClient, meterClient, "vxapi", logLevels) + observability.InitObserver( + ctx, + tracerProvider, + meterProvider, + tracerClient, + meterClient, + serviceName, + version.GetBinaryVersion(), + logLevels, + ) gormMeter := meterProvider.Meter("vxapi-meter") if err = meter.InitGormMetrics(gormMeter); err != nil { @@ -241,8 +264,8 @@ func main() { } // initialize system metric collection in current observer instance - observability.Observer.StartProcessMetricCollect("vxapi", version.GetBinaryVersion(), attr) - observability.Observer.StartGoRuntimeMetricCollect("vxapi", version.GetBinaryVersion(), attr) + observability.Observer.StartProcessMetricCollect(serviceName, version.GetBinaryVersion(), attr) + observability.Observer.StartGoRuntimeMetricCollect(serviceName, version.GetBinaryVersion(), attr) defer observability.Observer.Close() exchanger := srvevents.NewExchanger() diff --git a/cmd/server/main.go b/cmd/server/main.go index 3a33c94e..2e6c250f 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -485,7 +485,7 @@ func getLogDir(configLogDir string) (string, error) { func initObserver(server *Server, _ *logrus.Entry) (func(), error) { server.tracerClient = observability.NewProxyTracerClient( - observability.NewOtlpTracerClient(server.config.OtelAddr), + observability.NewOtlpTracerAndLoggerClient(server.config.OtelAddr), observability.NewHookTracerClient(&observability.HookClientConfig{ ResendTimeout: 100 * time.Millisecond, QueueSizeLimit: 100 * 1024 * 1024, // 100 MB @@ -502,11 +502,23 @@ func initObserver(server *Server, _ *logrus.Entry) (func(), error) { ) attr := attribute.String("server_id", system.MakeAgentID()) ctx := context.Background() - tracerProvider, err := observability.NewTracerProvider(ctx, server.tracerClient, serviceName, server.version, attr) + tracerProvider, err := observability.NewTracerProvider( + ctx, + server.tracerClient, + serviceName, + server.version, + attr, + ) if err != nil { return nil, fmt.Errorf("failed to initialize a tracer provider: %w", err) } - meterProvider, err := observability.NewMeterProvider(ctx, server.metricsClient, serviceName, server.version, attr) + meterProvider, err := observability.NewMeterProvider( + ctx, + server.metricsClient, + serviceName, + server.version, + attr, + ) if err != nil { return nil, fmt.Errorf("failed to initialize a metrics provider: %w", err) } @@ -524,7 +536,16 @@ func initObserver(server *Server, _ *logrus.Entry) (func(), error) { } else { logrus.SetLevel(logrus.InfoLevel) } - observability.InitObserver(ctx, tracerProvider, meterProvider, server.tracerClient, server.metricsClient, serviceName, logLevels) + observability.InitObserver( + ctx, + tracerProvider, + meterProvider, + server.tracerClient, + server.metricsClient, + serviceName, + server.version, + logLevels, + ) return func() { observability.Observer.Close() }, nil diff --git a/docker-compose.yml b/docker-compose.yml index f6bd259d..e0c66d80 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -97,6 +97,7 @@ services: image: vxcontrol/soldr-dbmigrate container_name: vx_dbmigrate hostname: dbmigrate.local + restart: unless-stopped networks: - vx-stand environment: @@ -148,10 +149,13 @@ services: MINIO_SECRET_KEY: ${MINIO_SECRET_KEY} MINIO_BUCKET_NAME: ${AGENT_SERVER_MINIO_BUCKET_NAME} MINIO_ENDPOINT: http://minio.local:9000 + OTEL_ADDR: ${OTEL_ADDR:-otel.local:8148} volumes: - vx-server-data:/opt/vxserver/data:rw - vx-server-logs:/opt/vxserver/logs:rw - vx-server-store:/tmp/vx-store:rw + - /var/lib/dbus/machine-id:/var/lib/dbus/machine-id:ro + - /etc/machine-id:/etc/machine-id:ro depends_on: modules: condition: service_started @@ -183,6 +187,8 @@ services: volumes: - vx-api-server-ssl:/opt/api/ssl:rw - vx-api-server-logs:/opt/api/logs:rw + - /var/lib/dbus/machine-id:/var/lib/dbus/machine-id:ro + - /etc/machine-id:/etc/machine-id:ro environment: API_USE_SSL: ${UI_USE_SSL:-true} API_SERVER_HOST: ${API_SERVER_HOST:-0.0.0.0} @@ -198,6 +204,7 @@ services: MINIO_SECRET_KEY: ${MINIO_SECRET_KEY} MINIO_BUCKET_NAME: ${MINIO_BUCKET_NAME} MINIO_ENDPOINT: http://minio.local:9000 + OTEL_ADDR: ${OTEL_ADDR:-otel.local:8148} depends_on: minio: condition: service_started @@ -217,10 +224,257 @@ services: - vx-stand environment: CONNECT: "wss://server.local:8443" + volumes: + - /var/lib/dbus/machine-id:/var/lib/dbus/machine-id:ro + - /etc/machine-id:/etc/machine-id:ro depends_on: server: condition: service_started + elasticsearch: + image: elasticsearch:7.17.8 + container_name: vx_elasticsearch + hostname: elasticsearch.local + restart: unless-stopped + networks: + - vx-stand + expose: + - 9200/tcp + - 9300/tcp + volumes: + - vx-elasticsearch-config:/usr/share/elasticsearch/config:rw + - vx-elasticsearch-data:/usr/share/elasticsearch/data:rw + ulimits: + memlock: + hard: -1 + soft: -1 + environment: + ELASTIC_PASSWORD: ${ELASTIC_PASSWORD:-changeme} + ES_JAVA_OPTS: ${ES_JAVA_OPTS:--Xmx2g -Xms2g} + depends_on: + elasticsearch-config: + condition: service_started + profiles: ["obs"] + + victoriametrics: + image: victoriametrics/victoria-metrics:v1.85.3 + container_name: vx_victoriametrics + hostname: victoriametrics.local + restart: unless-stopped + command: + - --storageDataPath=/storage + - --graphiteListenAddr=:2003 + - --opentsdbListenAddr=:4242 + - --httpListenAddr=:8428 + - --influxListenAddr=:8089 + - --selfScrapeInterval=10s + networks: + - vx-stand + expose: + - 8428/tcp + volumes: + - vx-victoriametrics-data:/storage:rw + depends_on: + observability-config: + condition: service_started + profiles: ["obs"] + + jaeger: + image: jaegertracing/all-in-one:1.40.0 + container_name: vx_jaeger + hostname: jaeger.local + restart: unless-stopped + command: + - --config-file=/etc/jaeger/config.yml + networks: + - vx-stand + expose: + - 16686/tcp + - 14250/tcp + - 14268/tcp + - 5778/tcp + - 5775/udp + - 6831/udp + - 6832/udp + volumes: + - vx-jaeger-config:/etc/jaeger:rw + ulimits: + nofile: + hard: 65000 + soft: 65000 + nproc: 65535 + environment: + SPAN_STORAGE_TYPE: ${SPAN_STORAGE_TYPE:-elasticsearch} + depends_on: + observability-config: + condition: service_healthy + profiles: ["obs"] + + loki: + image: grafana/loki:2.7.1 + container_name: vx_loki + hostname: loki.local + restart: unless-stopped + command: + - -config.file=/etc/loki/config.yaml + networks: + - vx-stand + expose: + - 3100/tcp + volumes: + - vx-loki-config:/etc/loki:rw + - vx-loki-data:/loki:rw + ulimits: + nofile: + hard: 65000 + soft: 65000 + nproc: 65535 + depends_on: + observability-config: + condition: service_healthy + profiles: ["obs"] + + grafana: + image: grafana/grafana:9.3.2 + container_name: vx_grafana + hostname: grafana.local + restart: unless-stopped + networks: + - vx-stand + expose: + - 3000/tcp + ports: + - ${GRAFANA_SERVER_HOST:-0.0.0.0}:${GRAFANA_SERVER_PORT:-3000}:3000 + volumes: + - vx-grafana-config:/etc/grafana:rw + - vx-grafana-data:/var/lib/grafana:rw + - vx-grafana-dashboards:/var/lib/grafana/dashboards:rw + depends_on: + observability-config: + condition: service_healthy + profiles: ["obs"] + + otel: + image: otel/opentelemetry-collector-contrib:0.68.0 + container_name: vx_otel + hostname: otel.local + restart: unless-stopped + networks: + - vx-stand + expose: + - 55679/tcp + - 14268/tcp + - 8418/tcp + - 8148/tcp + - 6831/tcp + - 1777/tcp + ports: + - ${INTERNAL_NET_HOST:-127.0.0.1}:8148:8148 + volumes: + - vx-otel-config:/etc/otel:rw + entrypoint: ["/otelcol-contrib", "--config", "/etc/otel/config.yaml"] + depends_on: + observability-config: + condition: service_healthy + profiles: ["obs"] + + observability-config: + image: vxcontrol/soldr-observability-stack-config:latest + container_name: vx_observability_config + hostname: observability-config.local + restart: unless-stopped + networks: + - vx-stand + volumes: + - vx-elasticsearch-config:/usr/share/elasticsearch/config:rw + - vx-grafana-config:/etc/grafana:rw + - vx-jaeger-config:/etc/jaeger:rw + - vx-loki-config:/etc/loki:rw + - vx-otel-config:/etc/otel:rw + - vx-grafana-dashboards:/var/lib/grafana/dashboards:rw + environment: + ELK_VERSION: ${ELK_VERSION:-7.17.8} + healthcheck: + interval: 30s + retries: 3 + test: + - CMD + - test + - -f + - /opt/soldr_observability/healthcheck + timeout: 10s + depends_on: + elasticsearch-config: + condition: service_healthy + profiles: ["obs"] + + elasticsearch-config: + image: vxcontrol/soldr-elastic-config:latest + container_name: vx_elasticsearch_config + hostname: elasticsearch-config.local + restart: unless-stopped + networks: + - vx-stand + volumes: + - vx-elasticsearch-config:/usr/share/elasticsearch/config:rw + environment: + MASTER_PASSWORD: ${MASTER_PASSWORD:-P@ssw0rd} + healthcheck: + interval: 0h + retries: 10 + test: + - CMD + - test + - -f + - /usr/share/elasticsearch/config/passfile + timeout: 20s + profiles: ["obs"] + + elasticsearch-exporter: + image: quay.io/prometheuscommunity/elasticsearch-exporter:v1.5.0 + container_name: vx_elasticsearch_exporter + hostname: elasticsearch-exporter.local + restart: unless-stopped + networks: + - vx-stand + expose: + - 9114/tcp + volumes: + - vx-elasticsearch-config:/usr/share/elasticsearch/config:ro + entrypoint: + - /bin/sh + - -c + - /bin/elasticsearch_exporter --es.uri=http://elastic:$$(cat /usr/share/elasticsearch/config/passfile)@elasticsearch.local:9200 + depends_on: + elasticsearch-config: + condition: service_healthy + profiles: ["obs"] + + node-exporter: + image: prom/node-exporter:v1.5.0 + container_name: vx_node_exporter + hostname: node-exporter.local + restart: unless-stopped + command: + - --path.procfs=/host/proc + - --path.sysfs=/host/sys + - --collector.filesystem.ignored-mount-points + - ^/(sys|proc|dev|host|etc|rootfs/var/lib/docker/containers|rootfs/var/lib/docker/overlay2|rootfs/run/docker/netns|rootfs/var/lib/docker/aufs)($$|/) + networks: + - vx-stand + expose: + - 9100/tcp + volumes: + - /proc:/host/proc:ro + - /sys:/host/sys:ro + - /:/rootfs:ro + deploy: + mode: global + depends_on: + victoriametrics: + condition: service_started + profiles: ["obs"] + networks: vx-stand: driver: bridge @@ -240,3 +494,23 @@ volumes: driver: local vx-server-store: driver: local + vx-elasticsearch-data: + driver: local + vx-elasticsearch-config: + driver: local + vx-grafana-config: + driver: local + vx-grafana-dashboards: + driver: local + vx-grafana-data: + driver: local + vx-loki-config: + driver: local + vx-loki-data: + driver: local + vx-jaeger-config: + driver: local + vx-otel-config: + driver: local + vx-victoriametrics-data: + driver: local diff --git a/go.mod b/go.mod index 45bba2d4..e483335a 100644 --- a/go.mod +++ b/go.mod @@ -30,9 +30,9 @@ require ( github.com/pierrec/lz4/v4 v4.1.15 github.com/qor/validations v0.0.0-20171228122639-f364bca61b46 github.com/rubenv/sql-migrate v1.2.0 - github.com/shirou/gopsutil/v3 v3.22.9 + github.com/shirou/gopsutil/v3 v3.22.10 github.com/sirupsen/logrus v1.9.0 - github.com/stretchr/testify v1.8.0 + github.com/stretchr/testify v1.8.1 github.com/swaggo/files v0.0.0-20210815190702-a29dd2bc99b2 github.com/swaggo/gin-swagger v1.3.0 github.com/swaggo/swag v1.8.7 @@ -43,6 +43,7 @@ require ( github.com/vxcontrol/luar v1.1.2 github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 github.com/xeipuuv/gojsonschema v1.2.0 + go.opentelemetry.io/collector/model v0.44.0 go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.28.0 go.opentelemetry.io/otel v1.9.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.26.0 @@ -57,6 +58,7 @@ require ( golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 golang.org/x/sync v0.1.0 golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43 + google.golang.org/grpc v1.46.2 google.golang.org/protobuf v1.28.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v2 v2.4.0 @@ -134,6 +136,7 @@ require ( github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect github.com/gofrs/flock v0.8.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2 // indirect github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a // indirect @@ -254,7 +257,7 @@ require ( github.com/spf13/viper v1.13.0 // indirect github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect github.com/stbenjam/no-sprintf-host-port v0.1.1 // indirect - github.com/stretchr/objx v0.4.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/subosito/gotenv v1.4.1 // indirect github.com/tdakkota/asciicheck v0.1.1 // indirect github.com/tetafro/godot v1.4.11 // indirect @@ -289,7 +292,6 @@ require ( golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.12 // indirect google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd // indirect - google.golang.org/grpc v1.46.2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect honnef.co/go/tools v0.3.3 // indirect diff --git a/go.sum b/go.sum index 724cec9b..13609d8d 100644 --- a/go.sum +++ b/go.sum @@ -383,6 +383,7 @@ github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt/v4 v4.4.1 h1:pC5DB52sCeK48Wlb9oPcdhnjkz1TKt1D/P7WKJ0kUcQ= github.com/golang-jwt/jwt/v4 v4.4.1/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= @@ -943,8 +944,8 @@ github.com/securego/gosec/v2 v2.13.1 h1:7mU32qn2dyC81MH9L2kefnQyRMUarfDER3iQyMHc github.com/securego/gosec/v2 v2.13.1/go.mod h1:EO1sImBMBWFjOTFzMWfTRrZW6M15gm60ljzrmy/wtHo= github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c h1:W65qqJCIOVP4jpqPQ0YvHYKwcMEMVWIzWC5iNQQfBTU= github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c/go.mod h1:/PevMnwAxekIXwN8qQyfc5gl2NlkB3CQlkizAbOkeBs= -github.com/shirou/gopsutil/v3 v3.22.9 h1:yibtJhIVEMcdw+tCTbOPiF1VcsuDeTE4utJ8Dm4c5eA= -github.com/shirou/gopsutil/v3 v3.22.9/go.mod h1:bBYl1kjgEJpWpxeHmLI+dVHWtyAwfcmSBLDsp2TNT8A= +github.com/shirou/gopsutil/v3 v3.22.10 h1:4KMHdfBRYXGF9skjDWiL4RA2N+E8dRdodU/bOZpPoVg= +github.com/shirou/gopsutil/v3 v3.22.10/go.mod h1:QNza6r4YQoydyCfo6rH0blGfKahgibh4dQmV5xdFkQk= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e/go.mod h1:TDJrrUr11Vxrven61rcy3hJMUqaf/CLWYhHNPmT14Lk= @@ -995,8 +996,9 @@ github.com/stbenjam/no-sprintf-host-port v0.1.1 h1:tYugd/yrm1O0dV+ThCbaKZh195Dfm github.com/stbenjam/no-sprintf-host-port v0.1.1/go.mod h1:TLhvtIvONRzdmkFiio4O8LHsN9N74I+PhRquPsxpL0I= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.1.4/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -1005,8 +1007,9 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/subosito/gotenv v1.4.1 h1:jyEFiXpy21Wm81FBN71l9VoMMV8H8jG+qIK3GCpY6Qs= github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= @@ -1121,6 +1124,8 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/collector/model v0.44.0 h1:I+M6X2NANYChOGYrpbxCoEYJah3eHdMvumKjothIAtA= +go.opentelemetry.io/collector/model v0.44.0/go.mod h1:4jo1R8uBDspLCxUGhQ0k3v/EFXFbW7s0AIy3LuGLbcU= go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.28.0 h1:e6uFYVURwheCC4GwkG4XCsWHoNQ8nPpYXCZctcg3mnw= go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.28.0/go.mod h1:f56Jk2pg43YRxWz9OMsVOFWh2HEPzHAjdfmC2pNG90M= go.opentelemetry.io/contrib/propagators/b3 v1.2.0 h1:+zQjl3DBSOle9GEhHuhqzDUKtYcVSfbHSNv24hsoOJ0= diff --git a/internal/lua/state_test.go b/internal/lua/state_test.go index a7588a3c..363241aa 100755 --- a/internal/lua/state_test.go +++ b/internal/lua/state_test.go @@ -37,17 +37,19 @@ func initObserver(uploadTraces, uploadMetrics func(context.Context, [][]byte) er } clientTraces := obs.NewHookTracerClient(clientTracesCfg) - provider, err := obs.NewTracerProvider(ctx, clientTraces, "vxcommon", "v1.0.0-develop") + service := "vxcommon" + version := "v1.0.0-develop" + provider, err := obs.NewTracerProvider(ctx, clientTraces, service, version) if err != nil { return err } clientMetric := obs.NewHookMeterClient(clientMetricsCfg) - controller, err := obs.NewMeterProvider(ctx, clientMetric, "vxcommon", "v1.0.0-develop") + controller, err := obs.NewMeterProvider(ctx, clientMetric, service, version) if err != nil { return err } - obs.InitObserver(ctx, provider, controller, clientTraces, clientMetric, "vxcommon", []logrus.Level{ + obs.InitObserver(ctx, provider, controller, clientTraces, clientMetric, service, version, []logrus.Level{ logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, diff --git a/internal/observability/client_test.go b/internal/observability/client_test.go index a60d8f6c..72d1ca9a 100644 --- a/internal/observability/client_test.go +++ b/internal/observability/client_test.go @@ -18,6 +18,8 @@ import ( const ( defUploadChan = 256 + service = "vxcommon" + version = "v1.0.0-develop" ) func makeResourceSpans(num int) []*tracepb.ResourceSpans { diff --git a/internal/observability/metric_test.go b/internal/observability/metric_test.go index 62b6e86d..5a69d320 100644 --- a/internal/observability/metric_test.go +++ b/internal/observability/metric_test.go @@ -237,11 +237,11 @@ func TestObserverMetricsAPI(t *testing.T) { } client := obs.NewHookMeterClient(clientCfg) - controller, err := obs.NewMeterProvider(ctx, client, "vxcommon", "v1.0.0-develop") + controller, err := obs.NewMeterProvider(ctx, client, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, nil, controller, nil, client, "vxcommon", nil) + obs.InitObserver(ctx, nil, controller, nil, client, service, version, nil) // start metrics collecting putTestMetricsSet(t, obs.Observer) @@ -274,11 +274,11 @@ func TestObserverMetricsAPIGaugeCounter(t *testing.T) { } client := obs.NewHookMeterClient(clientCfg) - controller, err := obs.NewMeterProvider(ctx, client, "vxcommon", "v1.0.0-develop") + controller, err := obs.NewMeterProvider(ctx, client, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, nil, controller, nil, client, "vxcommon", nil) + obs.InitObserver(ctx, nil, controller, nil, client, service, version, nil) // start metrics collecting gaugeInt64Counter, err := obs.Observer.NewInt64GaugeCounter("test-int64-gauge-counter") @@ -330,11 +330,11 @@ func TestObserverMetricsAPIWithFlush(t *testing.T) { } client := obs.NewHookMeterClient(clientCfg) - controller, err := obs.NewMeterProvider(ctx, client, "vxcommon", "v1.0.0-develop") + controller, err := obs.NewMeterProvider(ctx, client, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, nil, controller, nil, client, "vxcommon", nil) + obs.InitObserver(ctx, nil, controller, nil, client, service, version, nil) // start metrics collecting putTestMetricsSet(t, obs.Observer) @@ -386,11 +386,11 @@ func TestObserverMetricsAPIWithTimeout(t *testing.T) { } client := obs.NewHookMeterClient(clientCfg) - controller, err := obs.NewMeterProvider(ctx, client, "vxcommon", "v1.0.0-develop") + controller, err := obs.NewMeterProvider(ctx, client, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, nil, controller, nil, client, "vxcommon", nil) + obs.InitObserver(ctx, nil, controller, nil, client, service, version, nil) // start metrics collecting putTestMetricsSet(t, obs.Observer) @@ -429,11 +429,11 @@ func TestObserverMetricsAPIRegistry(t *testing.T) { } client := obs.NewHookMeterClient(clientCfg) - controller, err := obs.NewMeterProvider(ctx, client, "vxcommon", "v1.0.0-develop") + controller, err := obs.NewMeterProvider(ctx, client, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, nil, controller, nil, client, "vxcommon", nil) + obs.InitObserver(ctx, nil, controller, nil, client, service, version, nil) registry, err := obs.NewMetricRegistry() if err != nil { @@ -472,14 +472,14 @@ func TestObserverMetricsCollector(t *testing.T) { } client := obs.NewHookMeterClient(clientCfg) - controller, err := obs.NewMeterProvider(ctx, client, "vxcommon", "v1.0.0-develop") + controller, err := obs.NewMeterProvider(ctx, client, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, nil, controller, nil, client, "vxcommon", nil) + obs.InitObserver(ctx, nil, controller, nil, client, service, version, nil) // start metrics collecting - obs.Observer.StartDumperMetricCollect(&d, "vxcommon", "v1.0.0-develop") + obs.Observer.StartDumperMetricCollect(&d, service, version) // stop metrics collecting obs.Observer.Close() @@ -524,14 +524,14 @@ func TestObserverMetricsProxyCollector(t *testing.T) { } hookClient := obs.NewHookMeterClient(hookClientCfg) proxyClient := obs.NewProxyMeterClient(adoptiveClient, hookClient) - controller, err := obs.NewMeterProvider(ctx, proxyClient, "vxcommon", "v1.0.0-develop") + controller, err := obs.NewMeterProvider(ctx, proxyClient, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, nil, controller, nil, proxyClient, "vxcommon", nil) + obs.InitObserver(ctx, nil, controller, nil, proxyClient, service, version, nil) // start metrics collecting - obs.Observer.StartDumperMetricCollect(&d, "vxcommon", "v1.0.0-develop") + obs.Observer.StartDumperMetricCollect(&d, service, version) // stop metrics collecting obs.Observer.Close() @@ -573,11 +573,11 @@ func BenchmarkObserverMetricsAPI(b *testing.B) { } client := obs.NewHookMeterClient(clientCfg) - controller, err := obs.NewMeterProvider(ctx, client, "vxcommon", "v1.0.0-develop") + controller, err := obs.NewMeterProvider(ctx, client, service, version) if err != nil { b.Fatal(err) } - obs.InitObserver(ctx, nil, controller, nil, client, "vxcommon", nil) + obs.InitObserver(ctx, nil, controller, nil, client, service, version, nil) meter := obs.Observer direction := int64(1) @@ -629,11 +629,11 @@ func BenchmarkObserverMetricsAPIRegistry(b *testing.B) { } client := obs.NewHookMeterClient(clientCfg) - controller, err := obs.NewMeterProvider(ctx, client, "vxcommon", "v1.0.0-develop") + controller, err := obs.NewMeterProvider(ctx, client, service, version) if err != nil { b.Fatal(err) } - obs.InitObserver(ctx, nil, controller, nil, client, "vxcommon", nil) + obs.InitObserver(ctx, nil, controller, nil, client, service, version, nil) meter, err := obs.NewMetricRegistry() if err != nil { @@ -696,11 +696,11 @@ func BenchmarkObserverMetricsProxyAPI(b *testing.B) { hookClient := obs.NewHookMeterClient(hookClientCfg) proxyClient := obs.NewProxyMeterClient(adoptiveClient, hookClient) - controller, err := obs.NewMeterProvider(ctx, proxyClient, "vxcommon", "v1.0.0-develop") + controller, err := obs.NewMeterProvider(ctx, proxyClient, service, version) if err != nil { b.Fatal(err) } - obs.InitObserver(ctx, nil, controller, nil, proxyClient, "vxcommon", nil) + obs.InitObserver(ctx, nil, controller, nil, proxyClient, service, version, nil) meter := obs.Observer direction := int64(1) diff --git a/internal/observability/obs.go b/internal/observability/obs.go index 670d9eac..99031590 100644 --- a/internal/observability/obs.go +++ b/internal/observability/obs.go @@ -107,7 +107,7 @@ type IDumper interface { } func init() { - InitObserver(context.Background(), nil, nil, nil, nil, "noop", []logrus.Level{}) + InitObserver(context.Background(), nil, nil, nil, nil, "noop", "v0.0.0", []logrus.Level{}) } func InitObserver( @@ -117,6 +117,7 @@ func InitObserver( tclient otlptrace.Client, mclient otlpmetric.Client, tname string, + tversion string, levels []logrus.Level, ) { if Observer != nil { @@ -133,6 +134,9 @@ func InitObserver( mclient: mclient, } + tverRev := strings.Split(tversion, "-") + tversion = strings.TrimPrefix(tverRev[0], "v") + if tprovider != nil { otel.SetTracerProvider(tprovider) otel.SetTextMapPropagator( @@ -141,13 +145,13 @@ func InitObserver( propagation.Baggage{}, ), ) - obs.tracer = tprovider.Tracer(tname) + obs.tracer = tprovider.Tracer(tname, oteltrace.WithInstrumentationVersion(tversion)) logrus.AddHook(obs) } if mprovider != nil { metricglobal.SetMeterProvider(mprovider) - obs.meter = mprovider.Meter(tname) + obs.meter = mprovider.Meter(tname, otelmetric.WithInstrumentationVersion(tversion)) mprovider.Start(ctx) } diff --git a/internal/observability/trace.go b/internal/observability/trace.go index 19b8bab4..fd502803 100644 --- a/internal/observability/trace.go +++ b/internal/observability/trace.go @@ -3,13 +3,21 @@ package observability import ( "context" "fmt" + "reflect" + "strings" + "sync" "time" + "go.opentelemetry.io/collector/model/otlpgrpc" + "go.opentelemetry.io/collector/model/pdata" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" sdktrace "go.opentelemetry.io/otel/sdk/trace" + commonpb "go.opentelemetry.io/proto/otlp/common/v1" tracepb "go.opentelemetry.io/proto/otlp/trace/v1" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/proto" ) @@ -18,6 +26,16 @@ const ( defExportTimeout = 10 * time.Second ) +var ( + pAnyValueTypeString = reflect.TypeOf((*commonpb.AnyValue_StringValue)(nil)) + pAnyValueTypeBool = reflect.TypeOf((*commonpb.AnyValue_BoolValue)(nil)) + pAnyValueTypeInt = reflect.TypeOf((*commonpb.AnyValue_IntValue)(nil)) + pAnyValueTypeDouble = reflect.TypeOf((*commonpb.AnyValue_DoubleValue)(nil)) + pAnyValueTypeArray = reflect.TypeOf((*commonpb.AnyValue_ArrayValue)(nil)) + pAnyValueTypeKvlist = reflect.TypeOf((*commonpb.AnyValue_KvlistValue)(nil)) + pAnyValueTypeBytes = reflect.TypeOf((*commonpb.AnyValue_BytesValue)(nil)) +) + // NewHookTracerClient is constructor for mocking otlp traces client func NewHookTracerClient(cfg *HookClientConfig) otlptrace.Client { return newHookClient(cfg) @@ -88,6 +106,219 @@ func (ptc *proxyTracerClient) UploadTraces(ctx context.Context, protoSpans []*tr return nil } +// logsTracerClient is a custom client to implement traces async uploader via hook client +type logsTracerClient struct { + mx *sync.Mutex + addr string + grpcClient *grpc.ClientConn + logsClient otlpgrpc.LogsClient + otlpClient otlptrace.Client +} + +func (ltc *logsTracerClient) newLogsClient(ctx context.Context) (*grpc.ClientConn, otlpgrpc.LogsClient, error) { + grpcClient, err := grpc.Dial(ltc.addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithWriteBufferSize(512*1024), + ) + if err != nil { + return nil, nil, fmt.Errorf("failed to connect to otel gRPC service: %w", err) + } + return grpcClient, otlpgrpc.NewLogsClient(grpcClient), nil +} + +func (ltc *logsTracerClient) copyToAttribute(src *commonpb.AnyValue, dst pdata.AttributeValue) { + switch reflect.TypeOf(src.GetValue()) { + case pAnyValueTypeString: + dst.SetStringVal(src.GetStringValue()) + case pAnyValueTypeBool: + dst.SetBoolVal(src.GetBoolValue()) + case pAnyValueTypeInt: + dst.SetIntVal(src.GetIntValue()) + case pAnyValueTypeDouble: + dst.SetDoubleVal(src.GetDoubleValue()) + case pAnyValueTypeBytes: + dst.SetBytesVal(src.GetBytesValue()) + case pAnyValueTypeArray: + slv := dst.SliceVal() + for _, v := range src.GetArrayValue().GetValues() { + ltc.copyToAttribute(v, slv.AppendEmpty()) + } + case pAnyValueTypeKvlist: + slv := dst.MapVal() + for _, kv := range src.GetKvlistValue().GetValues() { + a := pdata.NewAttributeValueEmpty() + k, v := kv.GetKey(), kv.GetValue() + ltc.copyToAttribute(v, a) + slv.Upsert(k, a) + } + default: + } +} + +func (ltc *logsTracerClient) copyToResAttributes(src []*commonpb.KeyValue, dst pdata.AttributeMap) { + for _, kv := range src { + k, v := strings.ReplaceAll(kv.GetKey(), ".", "_"), kv.GetValue() + if strings.HasPrefix(k, "telemetry") || k == "environment" { + continue + } else if k == "os_type" { + k = "service_os" + } + a := pdata.NewAttributeValueEmpty() + ltc.copyToAttribute(v, a) + dst.Upsert(k, a) + } +} + +func (ltc *logsTracerClient) copyToLogAttributes(src []*commonpb.KeyValue, dst pdata.AttributeMap, trimPrefix string) { + for _, kv := range src { + k, v := strings.ReplaceAll(strings.TrimPrefix(kv.GetKey(), trimPrefix), ".", "_"), kv.GetValue() + a := pdata.NewAttributeValueEmpty() + ltc.copyToAttribute(v, a) + dst.Upsert(k, a) + } +} + +func (ltc *logsTracerClient) getSeverityNumber(severity string) pdata.SeverityNumber { + switch severity { + case "TRACE", "trace": + return pdata.SeverityNumberTRACE + case "DEBUG", "debug": + return pdata.SeverityNumberDEBUG + case "INFO", "info": + return pdata.SeverityNumberINFO + case "WARN", "warn": + return pdata.SeverityNumberWARN + case "ERROR", "error": + return pdata.SeverityNumberERROR + case "FATAL", "fatal": + return pdata.SeverityNumberFATAL + default: + return pdata.SeverityNumberUNDEFINED + } +} + +func (ltc *logsTracerClient) getLogs(protoSpans []*tracepb.ResourceSpans) pdata.Logs { + pLog := pdata.NewLogs() + + for _, pspans := range protoSpans { + if pspans == nil { + continue + } + + pmm := pLog.ResourceLogs().AppendEmpty() + ltc.copyToResAttributes(pspans.Resource.GetAttributes(), pmm.Resource().Attributes()) + pmm.SetSchemaUrl(pspans.GetSchemaUrl()) + + instLibSpans := pspans.GetInstrumentationLibrarySpans() + for _, instLibSpan := range instLibSpans { + if instLibSpan == nil { + continue + } + + il := instLibSpan.GetInstrumentationLibrary() + lsl := pmm.InstrumentationLibraryLogs().AppendEmpty() + lsl.InstrumentationLibrary().SetName(il.GetName()) + lsl.InstrumentationLibrary().SetVersion(il.GetVersion()) + + spans := instLibSpan.GetSpans() + for _, span := range spans { + for _, event := range span.GetEvents() { + lr := lsl.LogRecords().AppendEmpty() + lr.SetName(span.GetName()) + + logTimeUnix := event.GetTimeUnixNano() + logTime := time.Unix(int64(logTimeUnix/1e9), int64(logTimeUnix%1e9)) + lr.SetTimestamp(pdata.NewTimestampFromTime(logTime)) + + var spanID [8]byte + copy(spanID[:], span.GetSpanId()[:8]) + lr.SetSpanID(pdata.NewSpanID(spanID)) + var traceID [16]byte + copy(traceID[:], span.GetTraceId()[:16]) + lr.SetTraceID(pdata.NewTraceID(traceID)) + + lra := lr.Attributes() + ltc.copyToLogAttributes(span.GetAttributes(), lra, "span.") + ltc.copyToLogAttributes(event.GetAttributes(), lra, event.GetName()+".") + + if event.GetName() == "exception" { + lra.UpsertString("severity", "ERROR") + } + lra.UpsertString("time", logTime.Format("2006-01-02 15:04:05.000000000")) + + var severity string + if a, ok := lra.Get("severity"); ok && a.Type() == pdata.AttributeValueTypeString { + severity = a.AsString() + } + + lr.SetSeverityNumber(ltc.getSeverityNumber(severity)) + lr.SetSeverityText(severity) + lr.SetFlags(31) + } + } + } + } + + return pLog +} + +// Start is function to run otlp and grpc clients pair (grpc client is first) +func (ltc *logsTracerClient) Start(ctx context.Context) error { + ltc.mx.Lock() + defer ltc.mx.Unlock() + + var err error + if ltc.logsClient == nil || ltc.grpcClient == nil { + if ltc.grpcClient, ltc.logsClient, err = ltc.newLogsClient(ctx); err != nil { + return fmt.Errorf("failed to start grpc client: %w", err) + } + } + if err = ltc.otlpClient.Start(ctx); err != nil { + return fmt.Errorf("failed to start otlp client: %w", err) + } + + return nil +} + +// Stop is function to stop otlp and grpc clients pair (otlp client is first) +func (ltc *logsTracerClient) Stop(ctx context.Context) error { + ltc.mx.Lock() + defer ltc.mx.Unlock() + + if err := ltc.otlpClient.Stop(ctx); err != nil { + return fmt.Errorf("failed to stop otlp client: %w", err) + } + ltc.logsClient = nil + if ltc.grpcClient == nil { + if err := ltc.grpcClient.Close(); err != nil { + return fmt.Errorf("failed to stop grpc client: %w", err) + } + ltc.grpcClient = nil + } + + return nil +} + +// UploadTraces is function to transfer traces, spans and logs via otlp and grpc clients +func (ltc *logsTracerClient) UploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error { + ltc.mx.Lock() + defer ltc.mx.Unlock() + + if err := ltc.otlpClient.UploadTraces(ctx, protoSpans); err != nil { + return fmt.Errorf("failed to upload traces via otlp client: %w", err) + } + if ltc.grpcClient == nil || ltc.logsClient == nil { + return nil + } + request := otlpgrpc.NewLogsRequest() + request.SetLogs(ltc.getLogs(protoSpans)) + if _, err := ltc.logsClient.Export(ctx, request); err != nil { + return fmt.Errorf("failed to upload logs via grpc client: %w", err) + } + + return nil +} + // NewOtlpTracerClient is constructor for original otlp traces client func NewOtlpTracerClient(addr string) otlptrace.Client { opts := []otlptracegrpc.Option{ @@ -98,6 +329,26 @@ func NewOtlpTracerClient(addr string) otlptrace.Client { return otlptracegrpc.NewClient(opts...) } +// NewOtlpTracerAndLoggerClient is constructor for otlp traces and logs client +func NewOtlpTracerAndLoggerClient(addr string) otlptrace.Client { + var err error + ltc := logsTracerClient{ + mx: &sync.Mutex{}, + addr: addr, + } + ltc.grpcClient, err = grpc.Dial(addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithWriteBufferSize(512*1024), + ) + if err != nil { + fmt.Printf("failed to connect to otel gRPC service: %v\n", err) + } else { + ltc.logsClient = otlpgrpc.NewLogsClient(ltc.grpcClient) + } + ltc.otlpClient = NewOtlpTracerClient(addr) + return <c +} + // NewTracerProvider is constructor for otlp sdk traces provider with custom otlp client func NewTracerProvider( ctx context.Context, diff --git a/internal/observability/trace_test.go b/internal/observability/trace_test.go index 565f871f..346a7383 100644 --- a/internal/observability/trace_test.go +++ b/internal/observability/trace_test.go @@ -110,11 +110,11 @@ func TestObserverTracesAPI(t *testing.T) { } client := obs.NewHookTracerClient(clientCfg) - provider, err := obs.NewTracerProvider(ctx, client, "vxcommon", "v1.0.0-develop") + provider, err := obs.NewTracerProvider(ctx, client, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, provider, nil, client, nil, "vxcommon", []logrus.Level{ + obs.InitObserver(ctx, provider, nil, client, nil, service, version, []logrus.Level{ logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, @@ -145,11 +145,11 @@ func TestObserverTracesAPIWithFlush(t *testing.T) { } client := obs.NewHookTracerClient(clientCfg) - provider, err := obs.NewTracerProvider(ctx, client, "vxcommon", "v1.0.0-develop") + provider, err := obs.NewTracerProvider(ctx, client, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, provider, nil, client, nil, "vxcommon", []logrus.Level{ + obs.InitObserver(ctx, provider, nil, client, nil, service, version, []logrus.Level{ logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, @@ -196,11 +196,11 @@ func TestObserverTracesProxyAPI(t *testing.T) { hookClient := obs.NewHookTracerClient(hookClientCfg) proxyClient := obs.NewProxyTracerClient(adoptiveClient, hookClient) - provider, err := obs.NewTracerProvider(ctx, proxyClient, "vxcommon", "v1.0.0-develop") + provider, err := obs.NewTracerProvider(ctx, proxyClient, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, provider, nil, proxyClient, nil, "vxcommon", []logrus.Level{ + obs.InitObserver(ctx, provider, nil, proxyClient, nil, service, version, []logrus.Level{ logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, @@ -238,11 +238,11 @@ func TestObserverTracesProxyAPIWithFlush(t *testing.T) { hookClient := obs.NewHookTracerClient(hookClientCfg) proxyClient := obs.NewProxyTracerClient(adoptiveClient, hookClient) - provider, err := obs.NewTracerProvider(ctx, proxyClient, "vxcommon", "v1.0.0-develop") + provider, err := obs.NewTracerProvider(ctx, proxyClient, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, provider, nil, proxyClient, nil, "vxcommon", []logrus.Level{ + obs.InitObserver(ctx, provider, nil, proxyClient, nil, service, version, []logrus.Level{ logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, @@ -284,11 +284,11 @@ func TestObserverTracesAPIFixedTraceID(t *testing.T) { traceID := "d41d8cd98f00b204e9800998ecf8427e" pspanID := "8f00b204e9800998" client := obs.NewHookTracerClient(clientCfg) - provider, err := obs.NewTracerProvider(ctx, client, "vxcommon", "v1.0.0-develop") + provider, err := obs.NewTracerProvider(ctx, client, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, provider, nil, client, nil, "vxcommon", []logrus.Level{ + obs.InitObserver(ctx, provider, nil, client, nil, service, version, []logrus.Level{ logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, @@ -319,11 +319,11 @@ func TestObserverTracesAPIDoubleEndSpan(t *testing.T) { } client := obs.NewHookTracerClient(clientCfg) - provider, err := obs.NewTracerProvider(ctx, client, "vxcommon", "v1.0.0-develop") + provider, err := obs.NewTracerProvider(ctx, client, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, provider, nil, client, nil, "vxcommon", []logrus.Level{ + obs.InitObserver(ctx, provider, nil, client, nil, service, version, []logrus.Level{ logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, @@ -358,11 +358,11 @@ func TestObserverTracesAPIEmptyBranch(t *testing.T) { } client := obs.NewHookTracerClient(clientCfg) - provider, err := obs.NewTracerProvider(ctx, client, "vxcommon", "v1.0.0-develop") + provider, err := obs.NewTracerProvider(ctx, client, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, provider, nil, client, nil, "vxcommon", []logrus.Level{ + obs.InitObserver(ctx, provider, nil, client, nil, service, version, []logrus.Level{ logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, @@ -396,11 +396,11 @@ func TestObserverTracesAPIMixedBranches(t *testing.T) { } client := obs.NewHookTracerClient(clientCfg) - provider, err := obs.NewTracerProvider(ctx, client, "vxcommon", "v1.0.0-develop") + provider, err := obs.NewTracerProvider(ctx, client, service, version) if err != nil { t.Fatal(err) } - obs.InitObserver(ctx, provider, nil, client, nil, "vxcommon", []logrus.Level{ + obs.InitObserver(ctx, provider, nil, client, nil, service, version, []logrus.Level{ logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel,