diff --git a/cmd/connect-client/commands/publish.go b/cmd/connect-client/commands/publish.go index bc9bfdde2..8b015e105 100644 --- a/cmd/connect-client/commands/publish.go +++ b/cmd/connect-client/commands/publish.go @@ -7,6 +7,7 @@ import ( "io/fs" "os" + "github.com/r3labs/sse/v2" "github.com/rstudio/connect-client/internal/apptypes" "github.com/rstudio/connect-client/internal/bundles" "github.com/rstudio/connect-client/internal/bundles/gitignore" @@ -252,7 +253,10 @@ func (cmd *PublishUICmd) Run(args *cli_types.CommonArgs, ctx *cli_types.CLIConte if err != nil { return err } - log := events.NewLoggerWithSSE(args.Debug) + eventServer := sse.New() + eventServer.CreateStream("messages") + + log := events.NewLoggerWithSSE(args.Debug, eventServer) svc := ui.NewUIService( "/", cmd.UIArgs, @@ -260,6 +264,7 @@ func (cmd *PublishUICmd) Run(args *cli_types.CommonArgs, ctx *cli_types.CLIConte ctx.LocalToken, ctx.Fs, ctx.Accounts, - log) + log, + eventServer) return svc.Run() } diff --git a/internal/events/sse_logger.go b/internal/events/sse_logger.go index d33e0c792..9df13c24b 100644 --- a/internal/events/sse_logger.go +++ b/internal/events/sse_logger.go @@ -24,10 +24,8 @@ func NewLogger(debug bool) logging.Logger { return logging.FromStdLogger(slog.New(stderrHandler)) } -func NewLoggerWithSSE(debug bool) logging.Logger { +func NewLoggerWithSSE(debug bool, eventServer *sse.Server) logging.Logger { level := logLevel(debug) - eventServer := sse.New() - eventServer.CreateStream("messages") stderrHandler := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level}) sseHandler := NewSSEHandler(eventServer, &SSEHandlerOptions{Level: level}) multiHandler := logging.NewMultiHandler(stderrHandler, sseHandler) diff --git a/internal/events/sse_logger_test.go b/internal/events/sse_logger_test.go index 204ffb265..9b9356241 100644 --- a/internal/events/sse_logger_test.go +++ b/internal/events/sse_logger_test.go @@ -7,6 +7,7 @@ import ( "log/slog" "testing" + "github.com/r3labs/sse/v2" "github.com/rstudio/connect-client/internal/logging" "github.com/rstudio/connect-client/internal/util/utiltest" "github.com/stretchr/testify/suite" @@ -35,6 +36,7 @@ func (s *LoggerSuite) TestNewLoggerDebug() { } func (s *LoggerSuite) TestNewLoggerWithSSE() { - log := NewLoggerWithSSE(false) + sseServer := sse.New() + log := NewLoggerWithSSE(false, sseServer) s.IsType(log.Handler(), &logging.MultiHandler{}) } diff --git a/internal/services/ui/ui_service.go b/internal/services/ui/ui_service.go index 1784e35d5..55f636fdf 100644 --- a/internal/services/ui/ui_service.go +++ b/internal/services/ui/ui_service.go @@ -18,6 +18,7 @@ import ( "github.com/rstudio/connect-client/web" "github.com/gorilla/mux" + "github.com/r3labs/sse/v2" "github.com/spf13/afero" ) @@ -30,9 +31,10 @@ func NewUIService( token services.LocalToken, fs afero.Fs, lister accounts.AccountList, - log logging.Logger) *api.Service { + log logging.Logger, + eventServer *sse.Server) *api.Service { - handler := RouterHandlerFunc(fs, publish, lister, log) + handler := RouterHandlerFunc(fs, publish, lister, log, eventServer) return api.NewService( publish.State, @@ -50,7 +52,7 @@ func NewUIService( ) } -func RouterHandlerFunc(afs afero.Fs, publishArgs *cli_types.PublishArgs, lister accounts.AccountList, log logging.Logger) http.HandlerFunc { +func RouterHandlerFunc(afs afero.Fs, publishArgs *cli_types.PublishArgs, lister accounts.AccountList, log logging.Logger, eventServer *sse.Server) http.HandlerFunc { deployment := publishArgs.State base := deployment.SourceDir @@ -63,6 +65,9 @@ func RouterHandlerFunc(afs afero.Fs, publishArgs *cli_types.PublishArgs, lister r.Handle(ToPath("accounts"), api.GetAccountsHandlerFunc(lister, log)). Methods(http.MethodGet) + // GET /api/events + r.HandleFunc(ToPath("events"), eventServer.ServeHTTP) + // GET /api/files r.Handle(ToPath("files"), api.GetFileHandlerFunc(base, filesService, pathsService, log)). Methods(http.MethodGet) diff --git a/web/src/App.vue b/web/src/App.vue index 3e2e82ab5..654d05888 100644 --- a/web/src/App.vue +++ b/web/src/App.vue @@ -28,6 +28,8 @@ /> @@ -36,7 +38,7 @@ diff --git a/web/src/api/resources/EventStream.ts b/web/src/api/resources/EventStream.ts new file mode 100644 index 000000000..f286aad1f --- /dev/null +++ b/web/src/api/resources/EventStream.ts @@ -0,0 +1,247 @@ +// Copyright (C) 2023 by Posit Software, PBC. + +import camelcaseKeys from 'camelcase-keys'; + +import { + OnMessageEventSourceCallback, + MethodResult, + EventStatus, + EventStreamMessage, + isEventStreamMessage, + EventSubscriptionTarget, + CallbackQueueEntry, +} from 'src/api/types/events.ts'; + +export class EventStream { + private eventSource = null; + private isOpen = false; + private lastError = null; + private debugEnabled = false; + + private subscriptions = []; + + private logMsg(msg: string) { + if (this.debugEnabled) { + console.log(`DEBUG: ${msg}`); + } + } + + private logError(msg: string, error: MethodResult): MethodResult { + this.logMsg(`${msg}: error = ${error?.error}`); + return error; + } + + private matchEvent( + subscriptionType: EventSubscriptionTarget, + incomingEventType: EventSubscriptionTarget + ) { + this.logMsg(`MatchEvent: subscription type: ${subscriptionType}, incomingType: ${incomingEventType}`); + if (subscriptionType.indexOf('*') === 0) { + this.logMsg('matched on *'); + return true; + } + const wildCardIndex = subscriptionType.indexOf('/*'); + // Does the wildcard live at the very end of the subscription type? + if (wildCardIndex > 0 && subscriptionType.length === wildCardIndex + 2) { + const basePath = subscriptionType.substring(0, wildCardIndex); + if (incomingEventType.indexOf(basePath) === 0) { + this.logMsg('matched on start of string'); + return true; + } + } + // Are we using a glob, which is meant to be in the middle of two strings + // which need to be matched + const globIndex = subscriptionType.indexOf('/**/'); + if (globIndex > 0) { + // split our subscription type string into two parts (before and after the glob characters) + const parts = subscriptionType.split('/**/'); + // to match, we must make sure we find that the incoming event type starts + // exactly with our first part and ends with exactly our second part, regardless of how + // many characters in the incoming event type are "consumed" by our glob query. + if ( + incomingEventType.indexOf(parts[0]) === 0 && + incomingEventType.indexOf(parts[1]) === incomingEventType.length - parts[1].length + ) { + this.logMsg('matched on glob'); + return true; + } + } + + // no wild-card. Must match exactly + this.logMsg(`attempt to match on exact string. Result = ${subscriptionType === incomingEventType}`); + return subscriptionType === incomingEventType; + } + + private dispatchMessage(msg: EventStreamMessage) { + let numMatched = 0; + this.subscriptions.forEach(entry => { + if (this.matchEvent(entry.eventType, msg.type)) { + numMatched++; + entry.callback(msg); + } + }); + if (numMatched === 0 && msg.type !== 'errors/open') { + this.logMsg(`WARNING! No subscriber/handler found for msg: ${JSON.stringify}`); + this.dispatchMessage({ + type: 'errors/unknownEvent', + time: new Date().toString(), + data: { + event: msg, + }, + }); + } + } + + private onRawOpenCallback() { + this.logMsg(`received RawOpenCallback`); + this.isOpen = true; + this.dispatchMessage({ + type: 'open/sse', + time: new Date().toString(), + data: {}, + }); + } + + private onErrorRawCallback(e: Event) { + // errors are fatal, connection is down. + // not receiving anything of value from calling parameters. only : {"isTrusted":true} + this.logMsg(`received ErrorRawCallback: ${JSON.stringify(e)}`); + this.isOpen = false; + this.lastError = `unknown error with connection ${Date.now()}`; + const now = new Date(); + this.dispatchMessage({ + type: 'errors/open', + time: now.toString(), + data: { msg: `${this.lastError}` }, + }); + } + + private parseMessageData(data: string) : EventStreamMessage | null { + const rawObj = JSON.parse(data); + const obj = camelcaseKeys(rawObj); + if (isEventStreamMessage(obj)) { + return obj; + } + return null; + } + + private onMessageRawCallback(msg: MessageEvent) { + this.logMsg(`received MessageRawCallback (for real): ${msg.data}`); + const parsed = this.parseMessageData(msg.data); + if (!parsed) { + const errorMsg = `Invalid EventStreamMessage received: ${msg.data}`; + const now = new Date(); + this.dispatchMessage({ + type: 'errors/open', + time: now.toString(), + data: { msg: `${errorMsg}` }, + }); + return; + } + this.logMsg(`Received event type = ${parsed.type}`); + this.dispatchMessage(parsed); + } + + private initializeConnection(url: string, withCredentials: boolean): MethodResult { + this.logMsg(`initializing connection to ${url}, with credentials: ${withCredentials}`); + this.eventSource = new EventSource(url, { withCredentials: withCredentials }); + this.eventSource.onopen = () => this.onRawOpenCallback(); + // nothing good seems to come with the error data. Only get {"isTrusted":true} + this.eventSource.onerror = (e) => this.onErrorRawCallback(e); + this.eventSource.onmessage = (msg: MessageEvent) => this.onMessageRawCallback(msg); + return { + ok: true, + }; + } + + public open(url: string, withCredentials = false): MethodResult { + this.logMsg(`opening connection ${url}, with credentials: ${withCredentials}}`); + if (this.isOpen) { + return this.logError( + `failure opening connection`, + { + ok: false, + error: `EventStream instance has already been initialized to ${url}.`, + } + ); + } + if (!url) { + return this.logError( + `failure opening connection`, + { + ok: false, + error: `URL parameter must be a non-empty string.`, + } + ); + } + return this.initializeConnection(url, withCredentials); + } + + public close(): MethodResult { + if (this.isOpen && this.eventSource !== null) { + this.eventSource.close(); + this.eventSource = null; + this.isOpen = false; + return { + ok: true, + }; + } + return this.logError( + `failure closing connection`, + { + ok: false, + error: `EventSource is not open.`, + } + ); + } + + public addEventMonitorCallback( + targets: EventSubscriptionTarget[], + cb: OnMessageEventSourceCallback + ) { + for (const t in targets) { + this.subscriptions.push({ + eventType: targets[t], + callback: cb, + }); + } + } + + public delEventFilterCallback(cb: OnMessageEventSourceCallback) { + let found = false; + let index = -1; + // We may have multiple events being delivered to same callback + // so we have to search until we do not find anything + do { + index = this.subscriptions.findIndex(entry => entry.callback === cb); + if (index >= 0) { + this.subscriptions.splice(index, 1); + found = true; + } + } while (index >= 0); + if (found) { + this.logMsg(`delEventFilterCallback found at least one match!`); + } else { + this.logMsg(`delEventFilterCallback did NOT match any subcription callbacks!`); + } + return found; + } + + public status(): EventStatus { + return { + withCredentials: this.eventSource?.withCredentials, + readyState: this.eventSource?.readyState, + url: this.eventSource ? this.eventSource.url : null, + lastError: this.lastError, + isOpen: this.isOpen, + eventSource: this.eventSource ? 'eventSource has been initialized' : 'eventSource not yet initialized', + }; + } + + public setDebugMode(val: boolean) { + this.debugEnabled = val; + if (val) { + this.logMsg(`debug logging is enabled!`); + } + } +} diff --git a/web/src/api/types/events.ts b/web/src/api/types/events.ts new file mode 100644 index 000000000..c882308dc --- /dev/null +++ b/web/src/api/types/events.ts @@ -0,0 +1,82 @@ +// Copyright (C) 2023 by Posit Software, PBC. + +export enum EventSourceReadyState { + CONNECTING = 0, + OPEN = 1, + CLOSED = 2, +} + +export enum EventStreamMessageType { + ERROR = 'error', + LOG = 'log', +} + +export type EventStreamMessage = { + type: EventSubscriptionTarget, + time: string, + data: object, +} + +export function isEventStreamMessage(o: object): o is EventStreamMessage { + return ( + 'type' in o && + 'time' in o && + 'data' in o + ); +} + +export type OnMessageEventSourceCallback = (msg: EventStreamMessage) => void; + +export type MethodResult = { + ok: boolean, + error?: string, +} + +export type EventStatus = { + isOpen?: boolean, + eventSource: string, + withCredentials?: boolean, + readyState?: EventSourceReadyState, + url: string | null, + lastError: string | null, +} + +export type CallbackQueueEntry = { + eventType: EventSubscriptionTarget, + callback: OnMessageEventSourceCallback, +} + +export type EventSubscriptionTarget = + '*' | // all events + + 'agent/log' | // agent console log messages + + 'errors/*' | // all errors + 'errors/sse' | + 'errors/open' | + 'errors/unknownEvent' | + + 'open/*' | // open events + 'open/sse' | + + 'publish/createBundle/start' | + 'publish/createBundle/success' | + + 'publish/createDeployment/start' | + 'publish/createDeployment/success' | + + 'publish/uploadBundle/start' | + 'publish/uploadBundle/success' | + + 'publish/deployBundle/start' | + 'publish/deployBundle/success' | + + 'publish/**/log' | + + 'publish/restorePythonEnv/log' | + 'publish/restorePythonEnv/success' | + + 'publish/runContent/log' | + 'publish/runContent/success' | + + 'publish/success'; diff --git a/web/src/components/publishProcess/PublishProcess.vue b/web/src/components/publishProcess/PublishProcess.vue index f221b4d35..9f866ee1e 100644 --- a/web/src/components/publishProcess/PublishProcess.vue +++ b/web/src/components/publishProcess/PublishProcess.vue @@ -8,21 +8,75 @@ >
+
+

Temporary Event Display for Publishing Process

+
+ {{ eventItem }} +
+

+   +

+