Skip to content

Commit

Permalink
fix: error handling for Observable, #7
Browse files Browse the repository at this point in the history
  • Loading branch information
xavierchow committed Mar 29, 2022
1 parent 1edbfd8 commit 47b78f7
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 12 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,13 @@ export class SomeService {
@EventPattern({
topic: 'topic1',
channel: 'channel1',
options: { // optional
maxAttempts: 3
}
})
messageHandlerForTopic1(@Payload() payload: any, @Ctx() context: NsqContext)
// Handle messages
// Notes: if you throw an Error from this messageHandler, it's better to use `RpcException` so that we have explicit log in nsq-transporter.
}
}
```
Expand Down
5 changes: 5 additions & 0 deletions src/interfaces/nsq-options.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,8 @@ export interface NsqOptions {
deserializer?: ConsumerDeserializer;
logger?: LoggerService | LogLevel[] | false;
}

export type NsqConsumerOptions = Pick<
NsqOptions,
'discardHandler' | 'requeueDelay' | 'maxAttempts' | 'lookupdPollInterval'
>;
54 changes: 44 additions & 10 deletions src/responder/server-nsq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ import { Server, CustomTransportStrategy } from '@nestjs/microservices';
import { Consumer } from 'nsq-strategies';
import { Logger } from '@nestjs/common';

import { NsqOptions } from '../interfaces/nsq-options.interface';
import {
NsqConsumerOptions,
NsqOptions,
} from '../interfaces/nsq-options.interface';
import { InboundMessageDeserializer } from './inbound-message-deserializer';
import { NsqContext } from './nsq-context';
import { firstValueFrom, isObservable, EMPTY, catchError } from 'rxjs';
const CONSUME_ERR = Symbol();

export class ServerNsq extends Server implements CustomTransportStrategy {
private nsqConsumers: Consumer[];
Expand Down Expand Up @@ -42,11 +47,12 @@ export class ServerNsq extends Server implements CustomTransportStrategy {
private createConsumer(
topic: string,
channel: string,
options: any,
options: NsqConsumerOptions,
): Consumer {
const c = new Consumer(topic, channel, options || this.options);
if (this.options.discardHandler) {
c.reader.on('discard', this.options.discardHandler);
const c = new Consumer(topic, channel, options);

if (options.discardHandler) {
c.reader.on('discard', options.discardHandler);
}
c.reader.on('error', (err: Error) => {
this.logger.error(
Expand Down Expand Up @@ -104,7 +110,11 @@ export class ServerNsq extends Server implements CustomTransportStrategy {
if (handler.isEventHandler) {
const { topic, channel, options } = JSON.parse(pattern);

const c = this.createConsumer(topic, channel, options);
const consumerOptions = this.transformToConsumerOptions(options);

const DEFAULT_REQUEUE_DELAY = 90 * 1000;
const c = this.createConsumer(topic, channel, consumerOptions);

this.nsqConsumers.push(c);
c.consume(async (msg: any) => {
this.logger.log(
Expand All @@ -115,14 +125,34 @@ export class ServerNsq extends Server implements CustomTransportStrategy {
topic,
channel,
});
let source;
try {
await handler(packet.data, nsqCtx);
msg.finish();
source = await handler(packet.data, nsqCtx);
if (!isObservable(source)) {
return msg.finish();
}
} catch (err) {
this.logger.error(
`consumer reader failed to process message with error: ${err}, topic: ${topic}, channel: ${channel}`,
`consumer reader failed to process message with error: ${err.message}, topic: ${topic}, channel: ${channel}`,
);
msg.requeue(this.options.requeueDelay || 90 * 1000);
msg.requeue(consumerOptions.requeueDelay || DEFAULT_REQUEUE_DELAY);
}

const wrappedErrObservable = source.pipe(
catchError((err, caught) => {
this.logger.error(
`consumer reader failed to process message with Observable error: ${err.message}, topic: ${topic}, channel: ${channel}`,
);
return EMPTY;
}),
);
const first = await firstValueFrom(wrappedErrObservable, {
defaultValue: CONSUME_ERR,
});
if (first === CONSUME_ERR) {
msg.requeue(consumerOptions.requeueDelay || DEFAULT_REQUEUE_DELAY);
} else {
msg.finish();
}
});
} else {
Expand All @@ -131,6 +161,10 @@ export class ServerNsq extends Server implements CustomTransportStrategy {
});
}

private transformToConsumerOptions(options: NsqOptions): NsqConsumerOptions {
return Object.assign({}, this.options, options);
}

/**
* close() is required by `CustomTransportStrategy`
*/
Expand Down
18 changes: 18 additions & 0 deletions test/app.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ describe('AppController (e2e)', () => {
let app: INestApplication;
let controller: AppController;
let nsqd;
let mockDiscardHdlr;
beforeAll(async () => {
const nsqdHTTPAddress = 'http://localhost:4151';
const lookupdHttpAddrs = ['http://localhost:4161'];
Expand All @@ -30,10 +31,14 @@ describe('AppController (e2e)', () => {
}).compile();
controller = moduleFixture.get<AppController>(AppController);
app = moduleFixture.createNestApplication();
mockDiscardHdlr = jest.fn();
app.connectMicroservice<MicroserviceOptions>({
strategy: new ServerNsq({
lookupdHTTPAddresses: lookupdHttpAddrs,
lookupdPollInterval: 1,
maxAttempts: 1,
discardHandler: mockDiscardHdlr,
requeueDelay: 200,
}),
});

Expand All @@ -55,6 +60,19 @@ describe('AppController (e2e)', () => {
expect(onEventPatternCall).toBeDefined();
expect(onEventPatternCall.payload).toEqual({ eventId, foo: 'bar' });
});
it('should be able to catch and re-queue', async () => {
const topic = 'topic01';
await nsqd.createTopic('topic01'); // otherwise it fails on the 1st time as topic does not exist
await setTimeout(1500); // wait 1.5s for consumer polls the topic
const eventId = uuid() + 'thrown';
await nsqd.publish(topic, { eventId, foo: 'bar' });
let discardCalled = mockDiscardHdlr.mock.calls.length;
while (discardCalled === 0) {
await setTimeout(500);
discardCalled = mockDiscardHdlr.mock.calls.length;
}
expect(discardCalled).toBe(1);
}, 10000);

it('should be able to dispatch event', async () => {
await request(app.getHttpServer())
Expand Down
12 changes: 10 additions & 2 deletions test/test-app/app.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@ import {
Body,
HttpCode,
} from '@nestjs/common';
import { EventPattern, Ctx, Payload } from '@nestjs/microservices';
import { Observable } from 'rxjs';
import {
EventPattern,
Ctx,
Payload,
RpcException,
} from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';

import { NsqContext, ClientNsq } from '../../src';

Expand Down Expand Up @@ -42,6 +47,9 @@ export class AppController {
channel: 'channel01',
})
onEventPattern(@Payload() payload: any, @Ctx() context: NsqContext): string {
if (payload.eventId.endsWith('thrown')) {
throw new RpcException('a thrown error for test');
}
if (payload.eventId) {
this.onEventPatternCalls.set(payload.eventId, {
payload: payload,
Expand Down

0 comments on commit 47b78f7

Please sign in to comment.