Skip to content

Commit

Permalink
Merge pull request #59 from outfoxx/fix/event_source_pings
Browse files Browse the repository at this point in the history
Filter EventSource logging and ensure support for “pings”
  • Loading branch information
kdubb authored Nov 20, 2023
2 parents cc8582a + 807c46b commit 1f28bd7
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 25 deletions.
77 changes: 52 additions & 25 deletions src/fetch-event-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { EMPTY, map, Observable, of, Subscription, switchMap } from 'rxjs';
import { fromReadableStreamLike } from 'rxjs/internal/observable/innerFrom';
import { EventInfo, EventParser } from './event-parser';
import { validate } from './fetch';
import { Logger } from './logger';
import { levelLogger, Logger, LogLevel } from './logger';
import { MediaType } from './media-type';
import { ExtEventSource } from './request-factory';
import { unknownGet, unknownSet } from './util/any';
Expand All @@ -28,6 +28,7 @@ export interface FetchEventSource {
listener: (this: EventSource, ev: EventSourceEventMap[K]) => any,
options?: boolean | AddEventListenerOptions,
): void;

removeEventListener<K extends keyof EventSourceEventMap>(
type: K,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand All @@ -38,7 +39,8 @@ export interface FetchEventSource {

export class FetchEventSource extends EventTarget implements ExtEventSource {
private static LAST_EVENT_ID_HEADER = 'Last-Event-ID';
private static MAX_RETRY_TIME_MULTIPLE = 30;
private static MAX_RETRY_TIME_MULTIPLIER = 12;
private static RETRY_EXPONENT = 2.6;
private static EVENT_TIMEOUT_DEFAULT = 75;
private static EVENT_TIMEOUT_CHECK_INTERVAL = 2;

Expand All @@ -64,7 +66,7 @@ export class FetchEventSource extends EventTarget implements ExtEventSource {
private connectionSubscription?: Subscription;
private internalRetryTime = 100;
private retryAttempt = 0;
private connectionAttemptTime = 0;
private connectionAttemptTime: number | undefined;
private connectionOrigin?: string;
private reconnectTimeoutHandle?: number;
private lastEventId?: string;
Expand All @@ -90,7 +92,7 @@ export class FetchEventSource extends EventTarget implements ExtEventSource {
this.eventTimeout =
eventSourceInit?.eventTimeout ??
FetchEventSource.EVENT_TIMEOUT_DEFAULT * 1000;
this.logger = eventSourceInit?.logger;
this.logger = levelLogger(LogLevel.Info, eventSourceInit?.logger);
}

//
Expand Down Expand Up @@ -191,7 +193,7 @@ export class FetchEventSource extends EventTarget implements ExtEventSource {
return;
}

this.logger?.debug?.('close requested');
this.logger?.debug?.('closing');

this.readyState = this.CLOSED;

Expand All @@ -211,14 +213,18 @@ export class FetchEventSource extends EventTarget implements ExtEventSource {
// Event Timeout
//

private updateLastEventReceived(time: number = Date.now()) {
this.lastEventReceivedTime = time;
}

private startEventTimeoutCheck(lastEventReceivedTime: number) {
this.stopEventTimeoutCheck();

if (!this.eventTimeout) {
return;
}

this.lastEventReceivedTime = lastEventReceivedTime;
this.updateLastEventReceived(lastEventReceivedTime);

this.logger?.trace?.('starting event timeout checks');

Expand Down Expand Up @@ -274,7 +280,7 @@ export class FetchEventSource extends EventTarget implements ExtEventSource {
return;
}

this.logger?.debug?.('opened');
this.logger?.info?.('opened');

this.connectionOrigin = response.url;
this.retryAttempt = 0;
Expand Down Expand Up @@ -336,21 +342,15 @@ export class FetchEventSource extends EventTarget implements ExtEventSource {
private scheduleReconnect() {
this.internalClose();

// calculate total delay
const backOffDelay = Math.pow(this.retryAttempt, 2) * this.retryTime;
let retryDelay = Math.min(
this.retryTime + backOffDelay,
this.retryTime * FetchEventSource.MAX_RETRY_TIME_MULTIPLE,
);
const lastConnectionTime = this.connectionAttemptTime
? Date.now() - this.connectionAttemptTime
: 0;

// Adjust delay by amount of time last connect
// cycle took, except on the first attempt
if (this.retryAttempt > 0) {
const connectionTime = Date.now() - this.connectionAttemptTime;
// Ensure delay is at least as large as
// minimum retry time interval
retryDelay = Math.max(retryDelay - connectionTime, this.retryTime);
}
const retryDelay = FetchEventSource.calculateRetryTime(
this.retryAttempt,
this.retryTime,
lastConnectionTime,
);

this.retryAttempt++;
this.readyState = this.CONNECTING;
Expand All @@ -363,6 +363,32 @@ export class FetchEventSource extends EventTarget implements ExtEventSource {
);
}

private static calculateRetryTime(
retryAttempt: number,
retryTime: number,
lastConnectTime: number,
): number {
const retryMultiplier = Math.min(
retryAttempt,
this.MAX_RETRY_TIME_MULTIPLIER,
);

// calculate total delay
let retryDelay = Math.pow(retryMultiplier, this.RETRY_EXPONENT) * retryTime;

// Adjust delay by amount of time last connect
// cycle took, except on the first attempt
if (retryAttempt > 0) {
retryDelay -= lastConnectTime;

// Ensure delay is at least as large as
// minimum retry time interval
retryDelay = Math.max(retryDelay, retryTime);
}

return retryDelay;
}

private clearReconnect() {
if (this.reconnectTimeoutHandle) {
clearTimeout(this.reconnectTimeoutHandle);
Expand All @@ -375,7 +401,7 @@ export class FetchEventSource extends EventTarget implements ExtEventSource {
//

private dispatchParsedEvent = (eventInfo: EventInfo) => {
this.lastEventReceivedTime = Date.now();
this.updateLastEventReceived();

if (eventInfo.retry) {
const retryTime = Number.parseInt(eventInfo.retry, 10);
Expand All @@ -385,7 +411,7 @@ export class FetchEventSource extends EventTarget implements ExtEventSource {

this.internalRetryTime = retryTime;
} else {
this.logger?.debug?.('ignoring invalid retry timeout event', {
this.logger?.warn?.('ignoring invalid retry timeout event', {
eventInfo,
});
}
Expand All @@ -398,6 +424,7 @@ export class FetchEventSource extends EventTarget implements ExtEventSource {
eventInfo.data == null
) {
// skip empty event
this.logger?.trace?.('skipping empty event');
return;
}

Expand All @@ -407,8 +434,8 @@ export class FetchEventSource extends EventTarget implements ExtEventSource {
if (eventInfo.id.indexOf('\0') == -1) {
this.lastEventId = eventInfo.id;
} else {
this.logger?.debug?.(
'event id contains null, unable to use for last-event-id',
this.logger?.warn?.(
'event id contains NULL byte, unable to use for last-event-id',
);
}
}
Expand Down
26 changes: 26 additions & 0 deletions src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,29 @@ export interface Logger {
warn?(...data: unknown[]): void;
error?(...data: unknown[]): void;
}

export enum LogLevel {
Trace = 4,
Debug = 3,
Info = 2,
Warn = 1,
Error = 0,
None = -1,
}

export function levelLogger(
level: LogLevel,
logger?: Logger,
): Logger | undefined {
if (!logger) {
return undefined;
}
return {
log: logger?.log?.bind(logger),
trace: level >= LogLevel.Trace ? logger?.trace?.bind(logger) : undefined,
debug: level >= LogLevel.Debug ? logger?.debug?.bind(logger) : undefined,
info: level >= LogLevel.Info ? logger?.info?.bind(logger) : undefined,
warn: level >= LogLevel.Warn ? logger?.warn?.bind(logger) : undefined,
error: level >= LogLevel.Error ? logger?.error?.bind(logger) : undefined,
};
}
44 changes: 44 additions & 0 deletions test/fetch-event-source.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,50 @@ describe('FetchEventSource', () => {
}, 250);
});

it('counts comment only pings as events but does not dispatch', (done) => {
const eventStream = new TextEncoder().encode(
': ping\n\n: ping\n\nevent: hello\nid: 12345\ndata: Hello World!\n\n',
).buffer;

fetchMock.getOnce(
'http://example.com',
() =>
new Response(new Blob([eventStream]), {
headers: { 'content-type': MediaType.EventStream.toString() },
}),
);
fetchMock.get(
'http://example.com',
{ status: 503 },
{ overwriteRoutes: false },
);

const eventSource = new FetchEventSource('http://example.com');

const lastEventReceivedTimeSet = spyOn(
eventSource,
'updateLastEventReceived' as any, // eslint-disable-line @typescript-eslint/no-explicit-any
).and.callThrough();

const dispatchEventSpy = spyOn(
eventSource,
'dispatchEvent',
).and.callThrough();

eventSource.onmessage = (ev) => {
expect(ev.type).toEqual('hello');
expect(ev.data).toEqual('Hello World!');

if (ev.type === 'hello') {
expect(dispatchEventSpy).toHaveBeenCalledTimes(1);
expect(lastEventReceivedTimeSet).toHaveBeenCalledTimes(4);
done();
}
};

eventSource.connect();
});

xit('survives disconnections & close/connect cycles', (done) => {
let messagesReceived = 0;
const eventSource = new FetchEventSource('http://localhost:5555/stream', {
Expand Down

0 comments on commit 1f28bd7

Please sign in to comment.