From 41cbcad7e28b500b9ad08d92d3327694a579bd4c Mon Sep 17 00:00:00 2001 From: Jens Neuse Date: Thu, 5 Sep 2024 09:06:54 +0200 Subject: [PATCH] chore: improve mem usage for attributes, single flight (#1141) --- go.work | 1 + router-tests/query_plans_test.go | 4 + .../query_plan_with_trace_no_data.json | 689 +----------------- router/core/context.go | 25 + router/core/engine_loader_hooks.go | 7 +- router/core/operation_metrics.go | 25 - router/core/operation_planner.go | 2 +- router/core/transport.go | 136 ++-- 8 files changed, 127 insertions(+), 762 deletions(-) diff --git a/go.work b/go.work index 03b4817ebd..16dcf355e8 100644 --- a/go.work +++ b/go.work @@ -12,3 +12,4 @@ use ( ) //replace github.com/wundergraph/graphql-go-tools/v2 => ../graphql-go-tools/v2 +//replace github.com/wundergraph/astjson => ../astjson diff --git a/router-tests/query_plans_test.go b/router-tests/query_plans_test.go index b85b5c97d4..574a590e8b 100644 --- a/router-tests/query_plans_test.go +++ b/router-tests/query_plans_test.go @@ -141,6 +141,7 @@ func TestQueryPlans(t *testing.T) { res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ Header: http.Header{ "X-WG-Include-Query-Plan": []string{"true"}, + "X-WG-Skip-Loader": []string{"true"}, "X-WG-Trace": []string{"true", "enable_predictable_debug_timings"}, }, Query: `query Requires { @@ -162,6 +163,9 @@ func TestQueryPlans(t *testing.T) { require.NoError(t, err) resultBody := rex.ReplaceAllString(res.Body, "http://localhost/graphql") g.Assert(t, "query_plan_with_trace_no_data", prettifyJSON(resultBody)) + if t.Failed() { + t.Log(res.Body) + } }) }) } diff --git a/router-tests/testdata/fixtures/query_plans/query_plan_with_trace_no_data.json b/router-tests/testdata/fixtures/query_plans/query_plan_with_trace_no_data.json index fb0a4eff5c..687f394e3b 100644 --- a/router-tests/testdata/fixtures/query_plans/query_plan_with_trace_no_data.json +++ b/router-tests/testdata/fixtures/query_plans/query_plan_with_trace_no_data.json @@ -1,23 +1,5 @@ { - "data": { - "products": [ - { - "__typename": "Consultancy", - "lead": { - "__typename": "Employee", - "id": 1, - "derivedMood": "HAPPY" - }, - "isLeadAvailable": false - }, - { - "__typename": "Cosmo" - }, - { - "__typename": "SDK" - } - ] - }, + "data": null, "extensions": { "queryPlan": { "version": "1", @@ -181,134 +163,7 @@ "fetch": { "kind": "Single", "path": "", - "source_id": "0", - "trace": { - "raw_input_data": {}, - "input": { - "body": { - "query": "{products {__typename ... on Consultancy {lead {__typename id} __typename upc}}}" - }, - "header": {}, - "method": "POST", - "url": "http://localhost/graphql" - }, - "output": { - "data": { - "products": [ - { - "lead": { - "__typename": "Employee", - "id": 1 - }, - "__typename": "Consultancy", - "upc": "consultancy" - }, - { - "__typename": "Cosmo" - }, - { - "__typename": "SDK" - } - ] - }, - "extensions": { - "trace": { - "request": { - "method": "POST", - "url": "http://localhost/graphql", - "headers": { - "Accept": [ - "application/json" - ], - "Accept-Encoding": [ - "gzip", - "deflate" - ], - "Content-Type": [ - "application/json" - ] - } - }, - "response": { - "status_code": 200, - "status": "200 OK", - "headers": { - "Content-Length": [ - "183" - ], - "Content-Type": [ - "application/json" - ] - }, - "body_size": 183 - } - } - } - }, - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "duration_load_nanoseconds": 1, - "duration_load_pretty": "1ns", - "single_flight_used": true, - "single_flight_shared_response": false, - "load_skipped": false, - "load_stats": { - "get_conn": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "host_port": "" - }, - "got_conn": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "reused": false, - "was_idle": false, - "idle_time_nanoseconds": 0, - "idle_time_pretty": "" - }, - "got_first_response_byte": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - }, - "dns_start": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "", - "host": "" - }, - "dns_done": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "connect_start": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "network": "", - "addr": "" - }, - "connect_done": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "network": "", - "addr": "" - }, - "tls_handshake_start": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "tls_handshake_done": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "wrote_headers": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - }, - "wrote_request": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - } - } - } + "source_id": "0" } }, { @@ -319,135 +174,7 @@ "fetch": { "kind": "BatchEntity", "path": "products.@.lead", - "source_id": "6", - "trace": { - "raw_input_data": { - "__typename": "Employee", - "id": 1 - }, - "input": { - "body": { - "query": "query($representations: [_Any!]!){_entities(representations: $representations){__typename ... on Employee {currentMood}}}", - "variables": { - "representations": [ - { - "__typename": "Employee", - "id": 1 - } - ] - } - }, - "header": {}, - "method": "POST", - "url": "http://localhost/graphql" - }, - "output": { - "data": { - "_entities": [ - { - "__typename": "Employee", - "currentMood": "HAPPY" - } - ] - }, - "extensions": { - "trace": { - "request": { - "method": "POST", - "url": "http://localhost/graphql", - "headers": { - "Accept": [ - "application/json" - ], - "Accept-Encoding": [ - "gzip", - "deflate" - ], - "Content-Type": [ - "application/json" - ] - } - }, - "response": { - "status_code": 200, - "status": "200 OK", - "headers": { - "Content-Length": [ - "72" - ], - "Content-Type": [ - "application/json" - ] - }, - "body_size": 72 - } - } - } - }, - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "duration_load_nanoseconds": 1, - "duration_load_pretty": "1ns", - "single_flight_used": true, - "single_flight_shared_response": false, - "load_skipped": false, - "load_stats": { - "get_conn": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "host_port": "" - }, - "got_conn": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "reused": false, - "was_idle": false, - "idle_time_nanoseconds": 0, - "idle_time_pretty": "" - }, - "got_first_response_byte": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - }, - "dns_start": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "", - "host": "" - }, - "dns_done": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "connect_start": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "network": "", - "addr": "" - }, - "connect_done": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "network": "", - "addr": "" - }, - "tls_handshake_start": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "tls_handshake_done": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "wrote_headers": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - }, - "wrote_request": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - } - } - } + "source_id": "6" } }, { @@ -455,135 +182,7 @@ "fetch": { "kind": "BatchEntity", "path": "products.@.lead", - "source_id": "5", - "trace": { - "raw_input_data": { - "__typename": "Employee", - "id": 1 - }, - "input": { - "body": { - "query": "query($representations: [_Any!]!){_entities(representations: $representations){__typename ... on Employee {isAvailable}}}", - "variables": { - "representations": [ - { - "__typename": "Employee", - "id": 1 - } - ] - } - }, - "header": {}, - "method": "POST", - "url": "http://localhost/graphql" - }, - "output": { - "data": { - "_entities": [ - { - "__typename": "Employee", - "isAvailable": false - } - ] - }, - "extensions": { - "trace": { - "request": { - "method": "POST", - "url": "http://localhost/graphql", - "headers": { - "Accept": [ - "application/json" - ], - "Accept-Encoding": [ - "gzip", - "deflate" - ], - "Content-Type": [ - "application/json" - ] - } - }, - "response": { - "status_code": 200, - "status": "200 OK", - "headers": { - "Content-Length": [ - "70" - ], - "Content-Type": [ - "application/json" - ] - }, - "body_size": 70 - } - } - } - }, - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "duration_load_nanoseconds": 1, - "duration_load_pretty": "1ns", - "single_flight_used": true, - "single_flight_shared_response": false, - "load_skipped": false, - "load_stats": { - "get_conn": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "host_port": "" - }, - "got_conn": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "reused": false, - "was_idle": false, - "idle_time_nanoseconds": 0, - "idle_time_pretty": "" - }, - "got_first_response_byte": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - }, - "dns_start": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "", - "host": "" - }, - "dns_done": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "connect_start": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "network": "", - "addr": "" - }, - "connect_done": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "network": "", - "addr": "" - }, - "tls_handshake_start": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "tls_handshake_done": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "wrote_headers": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - }, - "wrote_request": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - } - } - } + "source_id": "5" } } ] @@ -596,138 +195,7 @@ "fetch": { "kind": "BatchEntity", "path": "products.@.lead", - "source_id": "0", - "trace": { - "raw_input_data": { - "__typename": "Employee", - "id": 1, - "currentMood": "HAPPY", - "isAvailable": false - }, - "input": { - "body": { - "query": "query($representations: [_Any!]!){_entities(representations: $representations){__typename ... on Employee {derivedMood}}}", - "variables": { - "representations": [ - { - "__typename": "Employee", - "currentMood": "HAPPY", - "id": 1 - } - ] - } - }, - "header": {}, - "method": "POST", - "url": "http://localhost/graphql" - }, - "output": { - "data": { - "_entities": [ - { - "__typename": "Employee", - "derivedMood": "HAPPY" - } - ] - }, - "extensions": { - "trace": { - "request": { - "method": "POST", - "url": "http://localhost/graphql", - "headers": { - "Accept": [ - "application/json" - ], - "Accept-Encoding": [ - "gzip", - "deflate" - ], - "Content-Type": [ - "application/json" - ] - } - }, - "response": { - "status_code": 200, - "status": "200 OK", - "headers": { - "Content-Length": [ - "72" - ], - "Content-Type": [ - "application/json" - ] - }, - "body_size": 72 - } - } - } - }, - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "duration_load_nanoseconds": 1, - "duration_load_pretty": "1ns", - "single_flight_used": true, - "single_flight_shared_response": false, - "load_skipped": false, - "load_stats": { - "get_conn": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "host_port": "" - }, - "got_conn": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "reused": false, - "was_idle": false, - "idle_time_nanoseconds": 0, - "idle_time_pretty": "" - }, - "got_first_response_byte": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - }, - "dns_start": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "", - "host": "" - }, - "dns_done": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "connect_start": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "", - "network": "", - "addr": "" - }, - "connect_done": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "", - "network": "", - "addr": "" - }, - "tls_handshake_start": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "tls_handshake_done": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "wrote_headers": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - }, - "wrote_request": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - } - } - } + "source_id": "0" } }, { @@ -735,152 +203,7 @@ "fetch": { "kind": "BatchEntity", "path": "products", - "source_id": "0", - "trace": { - "raw_input_data": [ - { - "lead": { - "__typename": "Employee", - "id": 1, - "currentMood": "HAPPY", - "isAvailable": false - }, - "__typename": "Consultancy", - "upc": "consultancy" - }, - { - "__typename": "Cosmo" - }, - { - "__typename": "SDK" - } - ], - "input": { - "body": { - "query": "query($representations: [_Any!]!){_entities(representations: $representations){__typename ... on Consultancy {isLeadAvailable}}}", - "variables": { - "representations": [ - { - "__typename": "Consultancy", - "lead": { - "isAvailable": false - }, - "upc": "consultancy" - } - ] - } - }, - "header": {}, - "method": "POST", - "url": "http://localhost/graphql" - }, - "output": { - "data": { - "_entities": [ - { - "__typename": "Consultancy", - "isLeadAvailable": false - } - ] - }, - "extensions": { - "trace": { - "request": { - "method": "POST", - "url": "http://localhost/graphql", - "headers": { - "Accept": [ - "application/json" - ], - "Accept-Encoding": [ - "gzip", - "deflate" - ], - "Content-Type": [ - "application/json" - ] - } - }, - "response": { - "status_code": 200, - "status": "200 OK", - "headers": { - "Content-Length": [ - "77" - ], - "Content-Type": [ - "application/json" - ] - }, - "body_size": 77 - } - } - } - }, - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "duration_load_nanoseconds": 1, - "duration_load_pretty": "1ns", - "single_flight_used": true, - "single_flight_shared_response": false, - "load_skipped": false, - "load_stats": { - "get_conn": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "host_port": "" - }, - "got_conn": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "reused": false, - "was_idle": false, - "idle_time_nanoseconds": 0, - "idle_time_pretty": "" - }, - "got_first_response_byte": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - }, - "dns_start": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "", - "host": "" - }, - "dns_done": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "connect_start": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "network": "", - "addr": "" - }, - "connect_done": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns", - "network": "", - "addr": "" - }, - "tls_handshake_start": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "tls_handshake_done": { - "duration_since_start_nanoseconds": 0, - "duration_since_start_pretty": "" - }, - "wrote_headers": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - }, - "wrote_request": { - "duration_since_start_nanoseconds": 1, - "duration_since_start_pretty": "1ns" - } - } - } + "source_id": "0" } } ] diff --git a/router/core/context.go b/router/core/context.go index 7423d2fba6..6ddd453b79 100644 --- a/router/core/context.go +++ b/router/core/context.go @@ -4,10 +4,12 @@ import ( "context" "net/http" "net/url" + "strconv" "sync" "time" graphqlmetrics "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1" + "github.com/wundergraph/cosmo/router/pkg/otel" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient" "go.opentelemetry.io/otel/attribute" @@ -373,6 +375,29 @@ type operationContext struct { typeFieldUsageInfo []*graphqlmetrics.TypeFieldUsageInfo argumentUsageInfo []*graphqlmetrics.ArgumentUsageInfo inputUsageInfo []*graphqlmetrics.InputUsageInfo + + attributes []attribute.KeyValue +} + +func (o *operationContext) setAttributes() { + numberOfAttributes := 6 + if o.persistedID != "" { + numberOfAttributes += 1 + } + o.attributes = make([]attribute.KeyValue, numberOfAttributes) + o.attributes[0] = otel.WgClientName.String(o.clientInfo.Name) + o.attributes[1] = otel.WgClientVersion.String(o.clientInfo.Version) + o.attributes[2] = otel.WgOperationName.String(o.Name()) + o.attributes[3] = otel.WgOperationType.String(o.Type()) + o.attributes[4] = otel.WgOperationProtocol.String(o.Protocol().String()) + o.attributes[5] = otel.WgOperationHash.String(strconv.FormatUint(o.Hash(), 10)) + if o.persistedID != "" { + o.attributes[6] = otel.WgOperationPersistedID.String(o.PersistedID()) + } +} + +func (o *operationContext) Attributes() []attribute.KeyValue { + return o.attributes } func (o *operationContext) Variables() []byte { diff --git a/router/core/engine_loader_hooks.go b/router/core/engine_loader_hooks.go index b90f60d44e..8d98dfe4f0 100644 --- a/router/core/engine_loader_hooks.go +++ b/router/core/engine_loader_hooks.go @@ -4,6 +4,9 @@ import ( "context" "errors" "fmt" + "slices" + "strings" + "github.com/wundergraph/cosmo/router/pkg/metric" rotel "github.com/wundergraph/cosmo/router/pkg/otel" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" @@ -12,8 +15,6 @@ import ( "go.opentelemetry.io/otel/codes" semconv "go.opentelemetry.io/otel/semconv/v1.21.0" "go.opentelemetry.io/otel/trace" - "slices" - "strings" ) var ( @@ -97,7 +98,7 @@ func (f *EngineLoaderHooks) OnFinished(ctx context.Context, statusCode int, data } // Ensure common attributes are set - baseAttributes = append(baseAttributes, getAttributesFromOperationContext(reqContext.operation)...) + baseAttributes = append(baseAttributes, reqContext.operation.Attributes()...) if attributes := baseAttributesFromContext(reqContext.Request().Context()); attributes != nil { baseAttributes = append(baseAttributes, attributes...) diff --git a/router/core/operation_metrics.go b/router/core/operation_metrics.go index b01c174ec1..e19a3aee76 100644 --- a/router/core/operation_metrics.go +++ b/router/core/operation_metrics.go @@ -2,7 +2,6 @@ package core import ( "context" - "strconv" "time" "github.com/wundergraph/cosmo/router/pkg/otel" @@ -111,27 +110,3 @@ func newOperationMetrics(opts OperationMetricsOptions) *OperationMetrics { trackUsageInfo: opts.TrackUsageInfo, } } - -// getAttributesFromOperationContext returns the attributes that are common to both metrics and traces. -func getAttributesFromOperationContext(operationContext *operationContext) []attribute.KeyValue { - if operationContext == nil { - return nil - } - - var baseMetricAttributeValues []attribute.KeyValue - - // Fields that are always present in the metrics and traces - baseMetricAttributeValues = append(baseMetricAttributeValues, otel.WgClientName.String(operationContext.clientInfo.Name)) - baseMetricAttributeValues = append(baseMetricAttributeValues, otel.WgClientVersion.String(operationContext.clientInfo.Version)) - baseMetricAttributeValues = append(baseMetricAttributeValues, otel.WgOperationName.String(operationContext.Name())) - baseMetricAttributeValues = append(baseMetricAttributeValues, otel.WgOperationType.String(operationContext.Type())) - baseMetricAttributeValues = append(baseMetricAttributeValues, otel.WgOperationProtocol.String(operationContext.Protocol().String())) - baseMetricAttributeValues = append(baseMetricAttributeValues, otel.WgOperationHash.String(strconv.FormatUint(operationContext.Hash(), 10))) - - // Common Field that will be present in both metrics and traces if not empty - if operationContext.PersistedID() != "" { - baseMetricAttributeValues = append(baseMetricAttributeValues, otel.WgOperationPersistedID.String(operationContext.PersistedID())) - } - - return baseMetricAttributeValues -} diff --git a/router/core/operation_planner.go b/router/core/operation_planner.go index 5e1fca0861..0706c67aba 100644 --- a/router/core/operation_planner.go +++ b/router/core/operation_planner.go @@ -131,10 +131,10 @@ func (p *OperationPlanner) plan(operation *ParsedOperation, options PlanOptions) normalizationCacheHit: operation.NormalizationCacheHit, executionOptions: options.ExecutionOptions, } - if operation.IsPersistedOperation { opContext.persistedID = operation.GraphQLRequestExtensions.PersistedQuery.Sha256Hash } + opContext.setAttributes() // if we have tracing enabled or want to include a query plan in the response we always prepare a new plan // this is because in case of tracing, we're writing trace data to the plan diff --git a/router/core/transport.go b/router/core/transport.go index 2116db8854..b0818271fd 100644 --- a/router/core/transport.go +++ b/router/core/transport.go @@ -12,6 +12,7 @@ import ( "time" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient" + "go.opentelemetry.io/otel/attribute" sdktrace "go.opentelemetry.io/otel/sdk/trace" "github.com/wundergraph/cosmo/router/pkg/metric" @@ -26,7 +27,6 @@ import ( "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" otrace "go.opentelemetry.io/otel/trace" "go.uber.org/zap" - "golang.org/x/sync/singleflight" ) type TransportPreHandler func(req *http.Request, ctx RequestContext) (*http.Request, *http.Response) @@ -39,7 +39,15 @@ type CustomTransport struct { metricStore metric.Provider logger *zap.Logger - sf *singleflight.Group + sf map[uint64]*sfCacheItem + sfMu *sync.RWMutex +} + +type sfCacheItem struct { + loaded chan struct{} + response *http.Response + body []byte + err error } func NewCustomTransport( @@ -59,7 +67,8 @@ func NewCustomTransport( ct.roundTripper = roundTripper } if enableSingleFlight { - ct.sf = &singleflight.Group{} + ct.sf = make(map[uint64]*sfCacheItem) + ct.sfMu = &sync.RWMutex{} } return ct @@ -68,9 +77,10 @@ func NewCustomTransport( func (ct *CustomTransport) measureSubgraphMetrics(req *http.Request) func(err error, resp *http.Response) { reqContext := getRequestContext(req.Context()) - baseFields := getAttributesFromOperationContext(reqContext.operation) - activeSubgraph := reqContext.ActiveSubgraph(req) + + var baseFields []attribute.KeyValue + if activeSubgraph != nil { baseFields = append(baseFields, otel.WgSubgraphName.String(activeSubgraph.Name)) baseFields = append(baseFields, otel.WgSubgraphID.String(activeSubgraph.Id)) @@ -80,6 +90,8 @@ func (ct *CustomTransport) measureSubgraphMetrics(req *http.Request) func(err er baseFields = append(baseFields, attributes...) } + baseFields = append(baseFields, reqContext.operation.Attributes()...) + inFlightDone := ct.metricStore.MeasureInFlight(req.Context(), baseFields...) ct.metricStore.MeasureRequestSize(req.Context(), req.ContentLength, baseFields...) @@ -162,11 +174,6 @@ func (ct *CustomTransport) RoundTrip(req *http.Request) (resp *http.Response, er return resp, err } -type responseWithBody struct { - res *http.Response - body []byte -} - func (ct *CustomTransport) isUpgradeError(req *http.Request, res *http.Response) bool { return req.Header.Get("Upgrade") != "" && res.StatusCode != http.StatusSwitchingProtocols } @@ -214,55 +221,83 @@ func releaseBuffer(buf *bytes.Buffer) { func (ct *CustomTransport) roundTripSingleFlight(req *http.Request) (*http.Response, error) { - // We need to use the single flight group to ensure that the request is only sent once - v, err, shared := ct.sf.Do(ct.singleFlightKey(req), func() (interface{}, error) { - res, err := ct.roundTripper.RoundTrip(req) - if err != nil { - return nil, err - } - // single flight is disallowed for mutations, including file uploads - // hence we don't need to worry about buffering the body here - buf := getBuffer() - defer releaseBuffer(buf) - _, err = buf.ReadFrom(res.Body) - if err != nil { - return nil, err - } - cp := make([]byte, buf.Len()) - copy(cp, buf.Bytes()) - return &responseWithBody{ - res: res, - body: cp, - }, nil - }) - if err != nil { - return nil, err - } + key := ct.singleFlightKey(req) + ct.sfMu.RLock() + item, shared := ct.sf[key] + ct.sfMu.RUnlock() sfStats := resolve.GetSingleFlightStats(req.Context()) if sfStats != nil { sfStats.SingleFlightUsed = true sfStats.SingleFlightSharedResponse = shared } - rwb := v.(*responseWithBody) - res := &http.Response{} - res.Status = rwb.res.Status - res.StatusCode = rwb.res.StatusCode - res.Header = rwb.res.Header.Clone() - res.Trailer = rwb.res.Trailer.Clone() - res.ContentLength = rwb.res.ContentLength - res.TransferEncoding = rwb.res.TransferEncoding - res.Close = rwb.res.Close - res.Uncompressed = rwb.res.Uncompressed - res.Request = req + + if shared { + select { + case <-item.loaded: + case <-req.Context().Done(): + return nil, req.Context().Err() + } + + res := &http.Response{} + res.Status = item.response.Status + res.StatusCode = item.response.StatusCode + res.Header = item.response.Header.Clone() + res.Trailer = item.response.Trailer.Clone() + res.ContentLength = item.response.ContentLength + res.TransferEncoding = item.response.TransferEncoding + res.Close = item.response.Close + res.Uncompressed = item.response.Uncompressed + res.Request = req + + // Restore the body + res.Body = io.NopCloser(bytes.NewReader(item.body)) + return res, item.err + } + + if sfStats != nil { + sfStats.SingleFlightUsed = true + sfStats.SingleFlightSharedResponse = false + } + + item = &sfCacheItem{ + loaded: make(chan struct{}), + } + ct.sfMu.Lock() + ct.sf[key] = item + ct.sfMu.Unlock() + defer func() { + close(item.loaded) + ct.sfMu.Lock() + delete(ct.sf, key) + ct.sfMu.Unlock() + }() + + res, err := ct.roundTripper.RoundTrip(req) + if err != nil { + item.err = err + return nil, err + } + + buf := getBuffer() + defer releaseBuffer(buf) + _, err = buf.ReadFrom(res.Body) + if err != nil { + item.err = err + return nil, err + } + + item.response = res + item.body = make([]byte, buf.Len()) + copy(item.body, buf.Bytes()) // Restore the body - res.Body = io.NopCloser(bytes.NewReader(rwb.body)) + res.Body = io.NopCloser(bytes.NewReader(item.body)) return res, nil } -func (ct *CustomTransport) singleFlightKey(req *http.Request) string { +func (ct *CustomTransport) singleFlightKey(req *http.Request) uint64 { keyGen := pool.Hash64.Get() defer pool.Hash64.Put(keyGen) @@ -283,7 +318,7 @@ func (ct *CustomTransport) singleFlightKey(req *http.Request) string { } sum := keyGen.Sum64() - return strconv.FormatUint(sum, 10) + return sum } type TransportFactory struct { @@ -337,9 +372,8 @@ func (t TransportFactory) RoundTripper(enableSingleFlight bool, transport http.R trace.WithPreHandler(func(r *http.Request) { span := otrace.SpanFromContext(r.Context()) reqContext := getRequestContext(r.Context()) - operation := reqContext.operation - commonAttributeValues := getAttributesFromOperationContext(operation) + var commonAttributeValues []attribute.KeyValue subgraph := reqContext.ActiveSubgraph(r) if subgraph != nil { @@ -351,6 +385,8 @@ func (t TransportFactory) RoundTripper(enableSingleFlight bool, transport http.R commonAttributeValues = append(commonAttributeValues, attributes...) } + commonAttributeValues = append(commonAttributeValues, reqContext.operation.Attributes()...) + span.SetAttributes(commonAttributeValues...) }),