Skip to content

Commit

Permalink
Merge pull request #114 from launchdarkly/eb/ch60771/forward-diagnostics
Browse files Browse the repository at this point in the history
forward diagnostic events
  • Loading branch information
eli-darkly authored Jan 22, 2020
2 parents d10a5c1 + beaa62a commit 0e31bf3
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 61 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -441,11 +441,14 @@ Endpoint | Method | Auth Header | Description
/sdk/flags/*flagKey* | GET | sdk | For [PHP SDK](#using-with-php)
/sdk/segments/*segmentKey* | GET | sdk | For [PHP SDK](#using-with-php)
/sdk/goals/*clientId* | GET | n/a | For JS and other client-side SDKs
/mobile/events | POST | mobile | For receiving events from mobile SDKs
/mobile | POST | mobile | For receiving events from mobile SDKs
/mobile/events | POST | mobile | Same as above
/mobile/events/bulk | POST | mobile | Same as above
/mobile | POST | mobile | Same as above
/mobile/events/diagnostic | POST | mobile | Same as above
/bulk | POST | sdk | For receiving events from server-side SDKs
/diagnostic | POST | sdk | Same as above
/events/bulk/*clientId* | POST, OPTIONS | n/a | For receiving events from JS and other client-side SDKs
/events/diagnostic/*clientId* | POST, OPTIONS | n/a | Same as above
/a/*clientId*.gif?d=*events* | GET, OPTIONS | n/a | Same as above
/all | GET | sdk | SSE stream for all data
/flags | GET | sdk | Legacy SSE stream for flag data
Expand Down
3 changes: 3 additions & 0 deletions endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ func pollSegmentHandler(w http.ResponseWriter, req *http.Request) {

// Event-recorder endpoints:
// events.ld.com/bulk (server-side)
// events.ld.com/diagnostic (server-side diagnostic)
// events.ld.com/mobile, events.ld.com/mobile/events, events.ld.com/mobileevents/bulk (mobile)
// events.ld.com/mobile/events/diagnostic (mobile diagnostic)
// events.ld.com/events/bulk/{envId} (JS)
// events.ld.com/events/diagnostic/{envId} (JS)
func bulkEventHandler(endpoint events.Endpoint) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
clientCtx := getClientContext(req)
Expand Down
173 changes: 129 additions & 44 deletions internal/events/event-relay.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -16,7 +17,6 @@ import (

"gopkg.in/launchdarkly/ld-relay.v5/httpconfig"
"gopkg.in/launchdarkly/ld-relay.v5/internal/util"
"gopkg.in/launchdarkly/ld-relay.v5/logging"
)

// EventRelay configuration - used in the config file struct in relay.go
Expand All @@ -35,15 +35,21 @@ type Endpoint interface {
}

type (
serverSDKEventsEndpoint struct{}
mobileSDKEventsEndpoint struct{}
javaScriptSDKEventsEndpoint struct{}
serverSDKEventsEndpoint struct{}
mobileSDKEventsEndpoint struct{}
javaScriptSDKEventsEndpoint struct{}
serverDiagnosticEventsEndpoint struct{}
mobileDiagnosticEventsEndpoint struct{}
javaScriptDiagnosticEventsEndpoint struct{}
)

var (
ServerSDKEventsEndpoint = &serverSDKEventsEndpoint{}
MobileSDKEventsEndpoint = &mobileSDKEventsEndpoint{}
JavaScriptSDKEventsEndpoint = &javaScriptSDKEventsEndpoint{}
ServerSDKEventsEndpoint = &serverSDKEventsEndpoint{}
MobileSDKEventsEndpoint = &mobileSDKEventsEndpoint{}
JavaScriptSDKEventsEndpoint = &javaScriptSDKEventsEndpoint{}
ServerSDKDiagnosticEventsEndpoint = &serverDiagnosticEventsEndpoint{}
MobileSDKDiagnosticEventsEndpoint = &mobileDiagnosticEventsEndpoint{}
JavaScriptSDKDiagnosticEventsEndpoint = &javaScriptDiagnosticEventsEndpoint{}
)

type eventVerbatimRelay struct {
Expand All @@ -68,10 +74,14 @@ const (

// EventDispatcher relays events to LaunchDarkly for an environment
type EventDispatcher struct {
endpoints map[Endpoint]*eventEndpointDispatcher
endpoints map[Endpoint]eventEndpointDispatcher
}

type eventEndpointDispatcher struct {
type eventEndpointDispatcher interface {
dispatch(w http.ResponseWriter, req *http.Request)
}

type analyticsEventEndpointDispatcher struct {
config Config
httpClient *http.Client
httpConfig httpconfig.HTTPConfig
Expand All @@ -84,6 +94,12 @@ type eventEndpointDispatcher struct {
mu sync.Mutex
}

type diagnosticEventEndpointDispatcher struct {
httpClient *http.Client
remoteEndpointURI string
loggers ldlog.Loggers
}

func (e *serverSDKEventsEndpoint) String() string {
return "ServerSDKEventsEndpoint"
}
Expand All @@ -96,19 +112,98 @@ func (e *javaScriptSDKEventsEndpoint) String() string {
return "JavaScriptSDKEventsEndpoint"
}

func (e *serverDiagnosticEventsEndpoint) String() string {
return "ServerDiagnosticEventsEndpoint"
}

func (e *mobileDiagnosticEventsEndpoint) String() string {
return "MobileDiagnosticEventsEndpoint"
}

func (e *javaScriptDiagnosticEventsEndpoint) String() string {
return "JavaScriptDiagnosticEventsEndpoint"
}

func (r *EventDispatcher) GetHandler(endpoint Endpoint) func(w http.ResponseWriter, req *http.Request) {
d := r.endpoints[endpoint]
if d != nil {
return d.dispatchEvents
return d.dispatch
}
return nil
}

func (r *eventEndpointDispatcher) dispatchEvents(w http.ResponseWriter, req *http.Request) {
func (r *analyticsEventEndpointDispatcher) dispatch(w http.ResponseWriter, req *http.Request) {
consumeEvents(w, req, r.loggers, func(body []byte) {
evts := make([]json.RawMessage, 0)
err := json.Unmarshal(body, &evts)
if err != nil {
r.loggers.Errorf("Error unmarshaling event post body: %+v", err)
return
}

payloadVersion, _ := strconv.Atoi(req.Header.Get(EventSchemaHeader))
if payloadVersion == 0 {
payloadVersion = 1
}
r.loggers.Debugf("Received %d events (v%d) to be proxied to %s", len(evts), payloadVersion, r.remotePath)
if payloadVersion >= SummaryEventsSchemaVersion {
// New-style events that have already gone through summarization - deliver them as-is
r.getVerbatimRelay().enqueue(evts)
} else {
r.getSummarizingRelay().enqueue(evts, payloadVersion)
}
})
}

func (d *diagnosticEventEndpointDispatcher) dispatch(w http.ResponseWriter, req *http.Request) {
consumeEvents(w, req, d.loggers, func(body []byte) {
// We are just operating as a reverse proxy and passing the request on verbatim to LD; we do not
// need to parse the JSON.
d.loggers.Debugf("Received diagnostic event to be proxied to %s", d.remoteEndpointURI)

for attempt := 0; attempt < 2; attempt++ { // use the same retry logic that the SDK uses
if attempt > 0 {
d.loggers.Warn("Will retry posting diagnostic event after 1 second")
time.Sleep(1 * time.Second)
}

forwardReq, reqErr := http.NewRequest("POST", d.remoteEndpointURI, bytes.NewReader(body))
if reqErr != nil {
d.loggers.Errorf("Unexpected error while creating event request: %+v", reqErr)
return
}
forwardReq.Header.Add("Content-Type", "application/json")

// Copy the Authorization header, if any (used only for server-side and mobile); also copy User-Agent
if authKey := req.Header.Get("Authorization"); authKey != "" {
forwardReq.Header.Add("Authorization", authKey)
}
if userAgent := req.Header.Get("User-Agent"); userAgent != "" {
forwardReq.Header.Add("User-Agent", userAgent)
}
// diagnostic events do not have schema or payload ID headers

resp, respErr := d.httpClient.Do(forwardReq)
if resp != nil && resp.Body != nil {
_, _ = ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
}
if respErr != nil {
d.loggers.Warnf("Unexpected error while sending events: %+v", respErr)
} else if resp.StatusCode >= 400 {
d.loggers.Warnf("Received error status %d when sending events", resp.StatusCode)
} else {
break
}
}
})
}

func consumeEvents(w http.ResponseWriter, req *http.Request, loggers ldlog.Loggers, thenExecute func([]byte)) {
body, bodyErr := ioutil.ReadAll(req.Body)

if bodyErr != nil {
r.loggers.Errorf("Error reading event post body: %+v", bodyErr)
loggers.Errorf("Error reading event post body: %+v", bodyErr)
w.WriteHeader(http.StatusBadRequest)
w.Write(util.ErrorJsonMsg("unable to read request body"))
return
Expand All @@ -126,35 +221,14 @@ func (r *eventEndpointDispatcher) dispatchEvents(w http.ResponseWriter, req *htt
go func() {
defer func() {
if err := recover(); err != nil {
r.loggers.Errorf("Unexpected panic in event relay: %+v", err)
loggers.Errorf("Unexpected panic in event relay: %+v", err)
}
}()

evts := make([]json.RawMessage, 0)
err := json.Unmarshal(body, &evts)
if err != nil {
r.loggers.Errorf("Error unmarshaling event post body: %+v", err)
return
}

payloadVersion, _ := strconv.Atoi(req.Header.Get(EventSchemaHeader))
if payloadVersion == 0 {
payloadVersion = 1
}
// This debug-level log message goes to logging.GlobalLoggers, not to r.loggers, because it is more of a
// message from ld-relay itself about a client request, rather than SDK logging about requests
// that ld-relay makes.
logging.GlobalLoggers.Debugf("Received %d events (v%d) to be proxied to %s", len(evts), payloadVersion, r.remotePath)
if payloadVersion >= SummaryEventsSchemaVersion {
// New-style events that have already gone through summarization - deliver them as-is
r.getVerbatimRelay().enqueue(evts)
} else {
r.getSummarizingRelay().enqueue(evts, payloadVersion)
}
thenExecute(body)
}()
}

func (r *eventEndpointDispatcher) getVerbatimRelay() *eventVerbatimRelay {
func (r *analyticsEventEndpointDispatcher) getVerbatimRelay() *eventVerbatimRelay {
r.mu.Lock()
defer r.mu.Unlock()
if r.verbatimRelay == nil {
Expand All @@ -163,7 +237,7 @@ func (r *eventEndpointDispatcher) getVerbatimRelay() *eventVerbatimRelay {
return r.verbatimRelay
}

func (r *eventEndpointDispatcher) getSummarizingRelay() *eventSummarizingRelay {
func (r *analyticsEventEndpointDispatcher) getSummarizingRelay() *eventSummarizingRelay {
r.mu.Lock()
defer r.mu.Unlock()
if r.summarizingRelay == nil {
Expand All @@ -176,22 +250,33 @@ func (r *eventEndpointDispatcher) getSummarizingRelay() *eventSummarizingRelay {
func NewEventDispatcher(sdkKey string, mobileKey *string, envID *string, loggers ldlog.Loggers, config Config, httpConfig httpconfig.HTTPConfig, featureStore ld.FeatureStore) *EventDispatcher {
httpClient := httpConfig.Client()
ep := &EventDispatcher{
endpoints: map[Endpoint]*eventEndpointDispatcher{
ServerSDKEventsEndpoint: newEventEndpointDispatcher(sdkKey, config, httpConfig, httpClient, featureStore, loggers, "/bulk"),
endpoints: map[Endpoint]eventEndpointDispatcher{
ServerSDKEventsEndpoint: newAnalyticsEventEndpointDispatcher(sdkKey, config, httpConfig, httpClient, featureStore, loggers, "/bulk"),
},
}
ep.endpoints[ServerSDKDiagnosticEventsEndpoint] = newDiagnosticEventEndpointDispatcher(config, httpClient, loggers, "/diagnostic")
if mobileKey != nil {
ep.endpoints[MobileSDKEventsEndpoint] = newEventEndpointDispatcher(*mobileKey, config, httpConfig, httpClient, featureStore, loggers, "/mobile")
ep.endpoints[MobileSDKEventsEndpoint] = newAnalyticsEventEndpointDispatcher(*mobileKey, config, httpConfig, httpClient, featureStore, loggers, "/mobile")
ep.endpoints[MobileSDKDiagnosticEventsEndpoint] = newDiagnosticEventEndpointDispatcher(config, httpClient, loggers, "/mobile/events/diagnostic")
}
if envID != nil {
ep.endpoints[JavaScriptSDKEventsEndpoint] = newEventEndpointDispatcher("", config, httpConfig, httpClient, featureStore, loggers, "/events/bulk/"+*envID)
ep.endpoints[JavaScriptSDKEventsEndpoint] = newAnalyticsEventEndpointDispatcher("", config, httpConfig, httpClient, featureStore, loggers, "/events/bulk/"+*envID)
ep.endpoints[JavaScriptSDKDiagnosticEventsEndpoint] = newDiagnosticEventEndpointDispatcher(config, httpClient, loggers, "/events/diagnostic/"+*envID)
}
return ep
}

func newEventEndpointDispatcher(authKey string, config Config, httpConfig httpconfig.HTTPConfig,
httpClient *http.Client, featureStore ld.FeatureStore, loggers ldlog.Loggers, remotePath string) *eventEndpointDispatcher {
return &eventEndpointDispatcher{
func newDiagnosticEventEndpointDispatcher(config Config, httpClient *http.Client, loggers ldlog.Loggers, remotePath string) *diagnosticEventEndpointDispatcher {
return &diagnosticEventEndpointDispatcher{
httpClient: httpClient,
remoteEndpointURI: strings.TrimRight(config.EventsUri, "/") + remotePath,
loggers: loggers,
}
}

func newAnalyticsEventEndpointDispatcher(authKey string, config Config, httpConfig httpconfig.HTTPConfig,
httpClient *http.Client, featureStore ld.FeatureStore, loggers ldlog.Loggers, remotePath string) *analyticsEventEndpointDispatcher {
return &analyticsEventEndpointDispatcher{
authKey: authKey,
config: config,
httpConfig: httpConfig,
Expand Down
6 changes: 6 additions & 0 deletions relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,16 @@ func (r *Relay) makeHandler(withRequestLogging bool) http.Handler {
mobileEventsRouter.HandleFunc("/events/bulk", bulkEventHandler(events.MobileSDKEventsEndpoint)).Methods("POST")
mobileEventsRouter.HandleFunc("/events", bulkEventHandler(events.MobileSDKEventsEndpoint)).Methods("POST")
mobileEventsRouter.HandleFunc("", bulkEventHandler(events.MobileSDKEventsEndpoint)).Methods("POST")
mobileEventsRouter.HandleFunc("/events/diagnostic", bulkEventHandler(events.MobileSDKDiagnosticEventsEndpoint)).Methods("POST")

clientSideBulkEventsRouter := router.PathPrefix("/events/bulk/{envId}").Subrouter()
clientSideBulkEventsRouter.Use(clientSideMiddlewareStack, mux.CORSMethodMiddleware(clientSideBulkEventsRouter))
clientSideBulkEventsRouter.HandleFunc("", bulkEventHandler(events.JavaScriptSDKEventsEndpoint)).Methods("POST", "OPTIONS")

clientSideDiagnosticEventsRouter := router.PathPrefix("/events/diagnostic/{envId}").Subrouter()
clientSideDiagnosticEventsRouter.Use(clientSideMiddlewareStack, mux.CORSMethodMiddleware(clientSideBulkEventsRouter))
clientSideDiagnosticEventsRouter.HandleFunc("", bulkEventHandler(events.JavaScriptSDKDiagnosticEventsEndpoint)).Methods("POST", "OPTIONS")

clientSideImageEventsRouter := router.PathPrefix("/a/{envId}.gif").Subrouter()
clientSideImageEventsRouter.Use(clientSideMiddlewareStack, mux.CORSMethodMiddleware(clientSideImageEventsRouter))
clientSideImageEventsRouter.HandleFunc("", getEventsImage).Methods("GET", "OPTIONS")
Expand All @@ -376,6 +381,7 @@ func (r *Relay) makeHandler(withRequestLogging bool) http.Handler {
serverSideRouter.HandleFunc("/all", countServerConns(allStreamHandler)).Methods("GET")
serverSideRouter.HandleFunc("/flags", countServerConns(flagsStreamHandler)).Methods("GET")
serverSideRouter.HandleFunc("/bulk", bulkEventHandler(events.ServerSDKEventsEndpoint)).Methods("POST")
serverSideRouter.HandleFunc("/diagnostic", bulkEventHandler(events.ServerSDKDiagnosticEventsEndpoint)).Methods("POST")
return router
}

Expand Down
Loading

0 comments on commit 0e31bf3

Please sign in to comment.