From af372ed12bfe4fd0b8e859074512900fa6129bb9 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 22 Jan 2020 10:25:54 -0800 Subject: [PATCH 1/3] forward diagnostic events --- endpoints.go | 3 + internal/events/event-relay.go | 173 ++++++++++++++++++++++++--------- relay.go | 6 ++ relay_test.go | 80 ++++++++++++--- 4 files changed, 203 insertions(+), 59 deletions(-) diff --git a/endpoints.go b/endpoints.go index 6317fcef..9880246e 100644 --- a/endpoints.go +++ b/endpoints.go @@ -78,8 +78,11 @@ func pollSegmentHandler(w http.ResponseWriter, req *http.Request) { // Event-recorder endpoints: // events.ld.com/bulk (server-side) +// events.ld.com/diagnostic (server-side diagnostic) // events.ld.com/mobile, events.ld.com/mobile/events, events.ld.com/mobileevents/bulk (mobile) +// events.ld.com/mobile/events/diagnostic (mobile diagnostic) // events.ld.com/events/bulk/{envId} (JS) +// events.ld.com/events/diagnostic/{envId} (JS) func bulkEventHandler(endpoint events.Endpoint) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, req *http.Request) { clientCtx := getClientContext(req) diff --git a/internal/events/event-relay.go b/internal/events/event-relay.go index f5470e0f..ca82b873 100644 --- a/internal/events/event-relay.go +++ b/internal/events/event-relay.go @@ -1,6 +1,7 @@ package events import ( + "bytes" "encoding/json" "fmt" "io/ioutil" @@ -16,7 +17,6 @@ import ( "gopkg.in/launchdarkly/ld-relay.v5/httpconfig" "gopkg.in/launchdarkly/ld-relay.v5/internal/util" - "gopkg.in/launchdarkly/ld-relay.v5/logging" ) // EventRelay configuration - used in the config file struct in relay.go @@ -35,15 +35,21 @@ type Endpoint interface { } type ( - serverSDKEventsEndpoint struct{} - mobileSDKEventsEndpoint struct{} - javaScriptSDKEventsEndpoint struct{} + serverSDKEventsEndpoint struct{} + mobileSDKEventsEndpoint struct{} + javaScriptSDKEventsEndpoint struct{} + serverDiagnosticEventsEndpoint struct{} + mobileDiagnosticEventsEndpoint struct{} + javaScriptDiagnosticEventsEndpoint struct{} ) var ( - ServerSDKEventsEndpoint = &serverSDKEventsEndpoint{} - MobileSDKEventsEndpoint = &mobileSDKEventsEndpoint{} - JavaScriptSDKEventsEndpoint = &javaScriptSDKEventsEndpoint{} + ServerSDKEventsEndpoint = &serverSDKEventsEndpoint{} + MobileSDKEventsEndpoint = &mobileSDKEventsEndpoint{} + JavaScriptSDKEventsEndpoint = &javaScriptSDKEventsEndpoint{} + ServerSDKDiagnosticEventsEndpoint = &serverDiagnosticEventsEndpoint{} + MobileSDKDiagnosticEventsEndpoint = &mobileDiagnosticEventsEndpoint{} + JavaScriptSDKDiagnosticEventsEndpoint = &javaScriptDiagnosticEventsEndpoint{} ) type eventVerbatimRelay struct { @@ -68,10 +74,14 @@ const ( // EventDispatcher relays events to LaunchDarkly for an environment type EventDispatcher struct { - endpoints map[Endpoint]*eventEndpointDispatcher + endpoints map[Endpoint]eventEndpointDispatcher } -type eventEndpointDispatcher struct { +type eventEndpointDispatcher interface { + dispatch(w http.ResponseWriter, req *http.Request) +} + +type analyticsEventEndpointDispatcher struct { config Config httpClient *http.Client httpConfig httpconfig.HTTPConfig @@ -84,6 +94,12 @@ type eventEndpointDispatcher struct { mu sync.Mutex } +type diagnosticEventEndpointDispatcher struct { + httpClient *http.Client + remoteEndpointURI string + loggers ldlog.Loggers +} + func (e *serverSDKEventsEndpoint) String() string { return "ServerSDKEventsEndpoint" } @@ -96,19 +112,98 @@ func (e *javaScriptSDKEventsEndpoint) String() string { return "JavaScriptSDKEventsEndpoint" } +func (e *serverDiagnosticEventsEndpoint) String() string { + return "ServerDiagnosticEventsEndpoint" +} + +func (e *mobileDiagnosticEventsEndpoint) String() string { + return "MobileDiagnosticEventsEndpoint" +} + +func (e *javaScriptDiagnosticEventsEndpoint) String() string { + return "JavaScriptDiagnosticEventsEndpoint" +} + func (r *EventDispatcher) GetHandler(endpoint Endpoint) func(w http.ResponseWriter, req *http.Request) { d := r.endpoints[endpoint] if d != nil { - return d.dispatchEvents + return d.dispatch } return nil } -func (r *eventEndpointDispatcher) dispatchEvents(w http.ResponseWriter, req *http.Request) { +func (r *analyticsEventEndpointDispatcher) dispatch(w http.ResponseWriter, req *http.Request) { + consumeEvents(w, req, r.loggers, func(body []byte) { + evts := make([]json.RawMessage, 0) + err := json.Unmarshal(body, &evts) + if err != nil { + r.loggers.Errorf("Error unmarshaling event post body: %+v", err) + return + } + + payloadVersion, _ := strconv.Atoi(req.Header.Get(EventSchemaHeader)) + if payloadVersion == 0 { + payloadVersion = 1 + } + r.loggers.Debugf("Received %d events (v%d) to be proxied to %s", len(evts), payloadVersion, r.remotePath) + if payloadVersion >= SummaryEventsSchemaVersion { + // New-style events that have already gone through summarization - deliver them as-is + r.getVerbatimRelay().enqueue(evts) + } else { + r.getSummarizingRelay().enqueue(evts, payloadVersion) + } + }) +} + +func (d *diagnosticEventEndpointDispatcher) dispatch(w http.ResponseWriter, req *http.Request) { + consumeEvents(w, req, d.loggers, func(body []byte) { + // We are just operating as a reverse proxy and passing the request on verbatim to LD; we do not + // need to parse the JSON. + d.loggers.Debugf("Received diagnostic event to be proxied to %s", d.remoteEndpointURI) + + for attempt := 0; attempt < 2; attempt++ { // use the same retry logic that the SDK uses + if attempt > 0 { + d.loggers.Warn("Will retry posting diagnostic event after 1 second") + time.Sleep(1 * time.Second) + } + + forwardReq, reqErr := http.NewRequest("POST", d.remoteEndpointURI, bytes.NewReader(body)) + if reqErr != nil { + d.loggers.Errorf("Unexpected error while creating event request: %+v", reqErr) + return + } + forwardReq.Header.Add("Content-Type", "application/json") + + // Copy the Authorization header, if any (used only for server-side and mobile); also copy User-Agent + if authKey := req.Header.Get("Authorization"); authKey != "" { + forwardReq.Header.Add("Authorization", authKey) + } + if userAgent := req.Header.Get("User-Agent"); userAgent != "" { + forwardReq.Header.Add("User-Agent", userAgent) + } + // diagnostic events do not have schema or payload ID headers + + resp, respErr := d.httpClient.Do(forwardReq) + if resp != nil && resp.Body != nil { + _, _ = ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + } + if respErr != nil { + d.loggers.Warnf("Unexpected error while sending events: %+v", respErr) + } else if resp.StatusCode >= 400 { + d.loggers.Warnf("Received error status %d when sending events", resp.StatusCode) + } else { + break + } + } + }) +} + +func consumeEvents(w http.ResponseWriter, req *http.Request, loggers ldlog.Loggers, thenExecute func([]byte)) { body, bodyErr := ioutil.ReadAll(req.Body) if bodyErr != nil { - r.loggers.Errorf("Error reading event post body: %+v", bodyErr) + loggers.Errorf("Error reading event post body: %+v", bodyErr) w.WriteHeader(http.StatusBadRequest) w.Write(util.ErrorJsonMsg("unable to read request body")) return @@ -126,35 +221,14 @@ func (r *eventEndpointDispatcher) dispatchEvents(w http.ResponseWriter, req *htt go func() { defer func() { if err := recover(); err != nil { - r.loggers.Errorf("Unexpected panic in event relay: %+v", err) + loggers.Errorf("Unexpected panic in event relay: %+v", err) } }() - - evts := make([]json.RawMessage, 0) - err := json.Unmarshal(body, &evts) - if err != nil { - r.loggers.Errorf("Error unmarshaling event post body: %+v", err) - return - } - - payloadVersion, _ := strconv.Atoi(req.Header.Get(EventSchemaHeader)) - if payloadVersion == 0 { - payloadVersion = 1 - } - // This debug-level log message goes to logging.GlobalLoggers, not to r.loggers, because it is more of a - // message from ld-relay itself about a client request, rather than SDK logging about requests - // that ld-relay makes. - logging.GlobalLoggers.Debugf("Received %d events (v%d) to be proxied to %s", len(evts), payloadVersion, r.remotePath) - if payloadVersion >= SummaryEventsSchemaVersion { - // New-style events that have already gone through summarization - deliver them as-is - r.getVerbatimRelay().enqueue(evts) - } else { - r.getSummarizingRelay().enqueue(evts, payloadVersion) - } + thenExecute(body) }() } -func (r *eventEndpointDispatcher) getVerbatimRelay() *eventVerbatimRelay { +func (r *analyticsEventEndpointDispatcher) getVerbatimRelay() *eventVerbatimRelay { r.mu.Lock() defer r.mu.Unlock() if r.verbatimRelay == nil { @@ -163,7 +237,7 @@ func (r *eventEndpointDispatcher) getVerbatimRelay() *eventVerbatimRelay { return r.verbatimRelay } -func (r *eventEndpointDispatcher) getSummarizingRelay() *eventSummarizingRelay { +func (r *analyticsEventEndpointDispatcher) getSummarizingRelay() *eventSummarizingRelay { r.mu.Lock() defer r.mu.Unlock() if r.summarizingRelay == nil { @@ -176,22 +250,33 @@ func (r *eventEndpointDispatcher) getSummarizingRelay() *eventSummarizingRelay { func NewEventDispatcher(sdkKey string, mobileKey *string, envID *string, loggers ldlog.Loggers, config Config, httpConfig httpconfig.HTTPConfig, featureStore ld.FeatureStore) *EventDispatcher { httpClient := httpConfig.Client() ep := &EventDispatcher{ - endpoints: map[Endpoint]*eventEndpointDispatcher{ - ServerSDKEventsEndpoint: newEventEndpointDispatcher(sdkKey, config, httpConfig, httpClient, featureStore, loggers, "/bulk"), + endpoints: map[Endpoint]eventEndpointDispatcher{ + ServerSDKEventsEndpoint: newAnalyticsEventEndpointDispatcher(sdkKey, config, httpConfig, httpClient, featureStore, loggers, "/bulk"), }, } + ep.endpoints[ServerSDKDiagnosticEventsEndpoint] = newDiagnosticEventEndpointDispatcher(config, httpClient, loggers, "/diagnostic") if mobileKey != nil { - ep.endpoints[MobileSDKEventsEndpoint] = newEventEndpointDispatcher(*mobileKey, config, httpConfig, httpClient, featureStore, loggers, "/mobile") + ep.endpoints[MobileSDKEventsEndpoint] = newAnalyticsEventEndpointDispatcher(*mobileKey, config, httpConfig, httpClient, featureStore, loggers, "/mobile") + ep.endpoints[MobileSDKDiagnosticEventsEndpoint] = newDiagnosticEventEndpointDispatcher(config, httpClient, loggers, "/mobile/events/diagnostic") } if envID != nil { - ep.endpoints[JavaScriptSDKEventsEndpoint] = newEventEndpointDispatcher("", config, httpConfig, httpClient, featureStore, loggers, "/events/bulk/"+*envID) + ep.endpoints[JavaScriptSDKEventsEndpoint] = newAnalyticsEventEndpointDispatcher("", config, httpConfig, httpClient, featureStore, loggers, "/events/bulk/"+*envID) + ep.endpoints[JavaScriptSDKDiagnosticEventsEndpoint] = newDiagnosticEventEndpointDispatcher(config, httpClient, loggers, "/events/diagnostic/"+*envID) } return ep } -func newEventEndpointDispatcher(authKey string, config Config, httpConfig httpconfig.HTTPConfig, - httpClient *http.Client, featureStore ld.FeatureStore, loggers ldlog.Loggers, remotePath string) *eventEndpointDispatcher { - return &eventEndpointDispatcher{ +func newDiagnosticEventEndpointDispatcher(config Config, httpClient *http.Client, loggers ldlog.Loggers, remotePath string) *diagnosticEventEndpointDispatcher { + return &diagnosticEventEndpointDispatcher{ + httpClient: httpClient, + remoteEndpointURI: strings.TrimRight(config.EventsUri, "/") + remotePath, + loggers: loggers, + } +} + +func newAnalyticsEventEndpointDispatcher(authKey string, config Config, httpConfig httpconfig.HTTPConfig, + httpClient *http.Client, featureStore ld.FeatureStore, loggers ldlog.Loggers, remotePath string) *analyticsEventEndpointDispatcher { + return &analyticsEventEndpointDispatcher{ authKey: authKey, config: config, httpConfig: httpConfig, diff --git a/relay.go b/relay.go index c066b474..e0df5dd6 100644 --- a/relay.go +++ b/relay.go @@ -362,11 +362,16 @@ func (r *Relay) makeHandler(withRequestLogging bool) http.Handler { mobileEventsRouter.HandleFunc("/events/bulk", bulkEventHandler(events.MobileSDKEventsEndpoint)).Methods("POST") mobileEventsRouter.HandleFunc("/events", bulkEventHandler(events.MobileSDKEventsEndpoint)).Methods("POST") mobileEventsRouter.HandleFunc("", bulkEventHandler(events.MobileSDKEventsEndpoint)).Methods("POST") + mobileEventsRouter.HandleFunc("/events/diagnostic", bulkEventHandler(events.MobileSDKDiagnosticEventsEndpoint)).Methods("POST") clientSideBulkEventsRouter := router.PathPrefix("/events/bulk/{envId}").Subrouter() clientSideBulkEventsRouter.Use(clientSideMiddlewareStack, mux.CORSMethodMiddleware(clientSideBulkEventsRouter)) clientSideBulkEventsRouter.HandleFunc("", bulkEventHandler(events.JavaScriptSDKEventsEndpoint)).Methods("POST", "OPTIONS") + clientSideDiagnosticEventsRouter := router.PathPrefix("/events/diagnostic/{envId}").Subrouter() + clientSideDiagnosticEventsRouter.Use(clientSideMiddlewareStack, mux.CORSMethodMiddleware(clientSideBulkEventsRouter)) + clientSideDiagnosticEventsRouter.HandleFunc("", bulkEventHandler(events.JavaScriptSDKDiagnosticEventsEndpoint)).Methods("POST", "OPTIONS") + clientSideImageEventsRouter := router.PathPrefix("/a/{envId}.gif").Subrouter() clientSideImageEventsRouter.Use(clientSideMiddlewareStack, mux.CORSMethodMiddleware(clientSideImageEventsRouter)) clientSideImageEventsRouter.HandleFunc("", getEventsImage).Methods("GET", "OPTIONS") @@ -376,6 +381,7 @@ func (r *Relay) makeHandler(withRequestLogging bool) http.Handler { serverSideRouter.HandleFunc("/all", countServerConns(allStreamHandler)).Methods("GET") serverSideRouter.HandleFunc("/flags", countServerConns(flagsStreamHandler)).Methods("GET") serverSideRouter.HandleFunc("/bulk", bulkEventHandler(events.ServerSDKEventsEndpoint)).Methods("POST") + serverSideRouter.HandleFunc("/diagnostic", bulkEventHandler(events.ServerSDKDiagnosticEventsEndpoint)).Methods("POST") return router } diff --git a/relay_test.go b/relay_test.go index b6cac0c8..3fdc720a 100644 --- a/relay_test.go +++ b/relay_test.go @@ -611,21 +611,39 @@ func TestRelay(t *testing.T) { } }) - t.Run("server-side events proxy", func(t *testing.T) { - w := httptest.NewRecorder() - body := makeTestEventBuffer("me") - bodyBuffer := bytes.NewBuffer(body) - r, _ := http.NewRequest("POST", "http://localhost/bulk", bodyBuffer) - r.Header.Set("Content-Type", "application/json") - r.Header.Set("Authorization", sdkKey) - r.Header.Set(events.EventSchemaHeader, strconv.Itoa(events.SummaryEventsSchemaVersion)) - relay.ServeHTTP(w, r) - result := w.Result() - if assert.Equal(t, http.StatusAccepted, result.StatusCode) { - event := requirePublishedEvent(body) - assert.Equal(t, "/bulk", event.url) - assert.Equal(t, sdkKey, event.authKey) - } + t.Run("server-side events proxies", func(t *testing.T) { + t.Run("bulk post", func(t *testing.T) { + w := httptest.NewRecorder() + body := makeTestEventBuffer("me") + bodyBuffer := bytes.NewBuffer(body) + r, _ := http.NewRequest("POST", "http://localhost/bulk", bodyBuffer) + r.Header.Set("Content-Type", "application/json") + r.Header.Set("Authorization", sdkKey) + r.Header.Set(events.EventSchemaHeader, strconv.Itoa(events.SummaryEventsSchemaVersion)) + relay.ServeHTTP(w, r) + result := w.Result() + if assert.Equal(t, http.StatusAccepted, result.StatusCode) { + event := requirePublishedEvent(body) + assert.Equal(t, "/bulk", event.url) + assert.Equal(t, sdkKey, event.authKey) + } + }) + + t.Run("diagnostics forwarding", func(t *testing.T) { + w := httptest.NewRecorder() + body := makeTestEventBuffer("me") + bodyBuffer := bytes.NewBuffer(body) + r, _ := http.NewRequest("POST", "http://localhost/diagnostic", bodyBuffer) + r.Header.Set("Content-Type", "application/json") + r.Header.Set("Authorization", sdkKey) + relay.ServeHTTP(w, r) + result := w.Result() + if assert.Equal(t, http.StatusAccepted, result.StatusCode) { + event := requirePublishedEvent(body) + assert.Equal(t, "/diagnostic", event.url) + assert.Equal(t, sdkKey, event.authKey) + } + }) }) t.Run("mobile events proxies", func(t *testing.T) { @@ -654,6 +672,22 @@ func TestRelay(t *testing.T) { assert.Equal(t, mobileKey, event.authKey) } } + + t.Run("diagnostics forwarding", func(t *testing.T) { + w := httptest.NewRecorder() + body := makeTestEventBuffer("me") + bodyBuffer := bytes.NewBuffer(body) + r, _ := http.NewRequest("POST", "http://localhost/mobile/events/diagnostic", bodyBuffer) + r.Header.Set("Content-Type", "application/json") + r.Header.Set("Authorization", mobileKey) + relay.ServeHTTP(w, r) + result := w.Result() + if assert.Equal(t, http.StatusAccepted, result.StatusCode) { + event := requirePublishedEvent(body) + assert.Equal(t, "/mobile/events/diagnostic", event.url) + assert.Equal(t, mobileKey, event.authKey) + } + }) }) t.Run("client-side events proxies", func(t *testing.T) { @@ -688,6 +722,22 @@ func TestRelay(t *testing.T) { assert.Equal(t, "", event.authKey) } }) + + t.Run("diagnostics forwarding", func(t *testing.T) { + w := httptest.NewRecorder() + body := makeTestEventBuffer("me") + bodyBuffer := bytes.NewBuffer(body) + expectedPath := fmt.Sprintf("/events/bulk/%s", envId) + r, _ := http.NewRequest("POST", "http://localhost"+expectedPath, bodyBuffer) + r.Header.Set("Content-Type", "application/json") + relay.ServeHTTP(w, r) + result := w.Result() + if assert.Equal(t, http.StatusAccepted, result.StatusCode) { + event := requirePublishedEvent(body) + assert.Equal(t, expectedPath, event.url) + assert.Equal(t, "", event.authKey) + } + }) }) t.Run("PHP endpoints", func(t *testing.T) { From fb2a79e2afb172244e33f7d2d43205636a67409b Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 22 Jan 2020 11:02:09 -0800 Subject: [PATCH 2/3] update readme --- README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index fd068cb2..a9e57247 100644 --- a/README.md +++ b/README.md @@ -441,11 +441,14 @@ Endpoint | Method | Auth Header | Description /sdk/flags/*flagKey* | GET | sdk | For [PHP SDK](#using-with-php) /sdk/segments/*segmentKey* | GET | sdk | For [PHP SDK](#using-with-php) /sdk/goals/*clientId* | GET | n/a | For JS and other client-side SDKs +/mobile | POST | mobile | Same as above /mobile/events | POST | mobile | For receiving events from mobile SDKs /mobile/events/bulk | POST | mobile | Same as above -/mobile | POST | mobile | Same as above +/mobile/events/diagnostic | POST | mobile | Same as above /bulk | POST | sdk | For receiving events from server-side SDKs +/diagnostic | POST | sdk | Same as above /events/bulk/*clientId* | POST, OPTIONS | n/a | For receiving events from JS and other client-side SDKs +/events/diagnostic/*clientId* | POST, OPTIONS | n/a | Same as above /a/*clientId*.gif?d=*events* | GET, OPTIONS | n/a | Same as above /all | GET | sdk | SSE stream for all data /flags | GET | sdk | Legacy SSE stream for flag data From beaa62af7f7db77478994c6d97a0028976a2eea4 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 22 Jan 2020 11:26:40 -0800 Subject: [PATCH 3/3] readme fix --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a9e57247..4a38e251 100644 --- a/README.md +++ b/README.md @@ -441,8 +441,8 @@ Endpoint | Method | Auth Header | Description /sdk/flags/*flagKey* | GET | sdk | For [PHP SDK](#using-with-php) /sdk/segments/*segmentKey* | GET | sdk | For [PHP SDK](#using-with-php) /sdk/goals/*clientId* | GET | n/a | For JS and other client-side SDKs -/mobile | POST | mobile | Same as above -/mobile/events | POST | mobile | For receiving events from mobile SDKs +/mobile | POST | mobile | For receiving events from mobile SDKs +/mobile/events | POST | mobile | Same as above /mobile/events/bulk | POST | mobile | Same as above /mobile/events/diagnostic | POST | mobile | Same as above /bulk | POST | sdk | For receiving events from server-side SDKs