From 7091f9df6adbcdbb4f8bb8025e5ef20cf70577a7 Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Sat, 5 Jun 2021 19:11:39 +0800 Subject: [PATCH] feat(pipeline-task): logger. --- .../pipeline-task.logger.spec.ts | 69 +++++++++++++++++++ src/pipeline-tasks/pipeline-task.logger.ts | 35 ++++++++++ .../pipeline-task.runner.spec.ts | 5 ++ src/pipeline-tasks/pipeline-task.runner.ts | 16 ++++- .../pipeline-tasks.constants.ts | 4 ++ src/pipeline-tasks/pipeline-tasks.module.ts | 12 ++++ 6 files changed, 140 insertions(+), 1 deletion(-) create mode 100644 src/pipeline-tasks/pipeline-task.logger.spec.ts create mode 100644 src/pipeline-tasks/pipeline-task.logger.ts create mode 100644 src/pipeline-tasks/pipeline-tasks.constants.ts diff --git a/src/pipeline-tasks/pipeline-task.logger.spec.ts b/src/pipeline-tasks/pipeline-task.logger.spec.ts new file mode 100644 index 0000000..7a2da27 --- /dev/null +++ b/src/pipeline-tasks/pipeline-task.logger.spec.ts @@ -0,0 +1,69 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { PipelineTaskLogger } from './pipeline-task.logger'; +import { PipelineTaskEvent } from './models/pipeline-task-event'; +import { take, timeout } from 'rxjs/operators'; + +describe('PipelineTaskRunner', () => { + let logger: PipelineTaskLogger; + let module: TestingModule; + + beforeEach(async () => { + module = await Test.createTestingModule({ + providers: [PipelineTaskLogger], + }).compile(); + + logger = module.get(PipelineTaskLogger); + }); + + it('should be defined', () => { + expect(logger).toBeDefined(); + }); + + describe('getMessage$', () => { + it('normal', async () => { + const event = new PipelineTaskEvent(); + event.taskId = 'test'; + const message$ = logger.getMessage$('test'); + + let receiveEvent; + message$.pipe(take(1)).subscribe((value) => (receiveEvent = value)); + await logger.handleEvent(event); + expect(receiveEvent).toEqual(event); + }); + it('no match', async () => { + const event = new PipelineTaskEvent(); + event.taskId = 'test'; + const message$ = logger.getMessage$('other'); + setTimeout(() => { + logger.handleEvent(event); + }); + expect(message$.pipe(take(1), timeout(100)).toPromise()).rejects.toMatch( + 'timeout', + ); + }); + it('multiple subscribers', async () => { + const event = new PipelineTaskEvent(); + event.taskId = 'test'; + const message$ = logger.getMessage$('test'); + const message2$ = logger.getMessage$('test'); + setTimeout(() => { + logger.handleEvent(event); + }); + expect(message$.pipe(take(1), timeout(100)).toPromise()).resolves.toEqual( + event, + ); + expect( + message2$.pipe(take(1), timeout(100)).toPromise(), + ).resolves.toEqual(event); + }); + }); + + describe('onModuleDestroy', () => { + it('complete observable when destroying module', async () => { + logger.onModuleDestroy(); + await expect( + (logger as any).message$.toPromise(), + ).resolves.toBeUndefined(); + }); + }); +}); diff --git a/src/pipeline-tasks/pipeline-task.logger.ts b/src/pipeline-tasks/pipeline-task.logger.ts new file mode 100644 index 0000000..e729450 --- /dev/null +++ b/src/pipeline-tasks/pipeline-task.logger.ts @@ -0,0 +1,35 @@ +import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; +import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import { Observable, Subject } from 'rxjs'; +import { filter, publish, tap } from 'rxjs/operators'; +import { PipelineTaskEvent } from './models/pipeline-task-event'; +import { + EXCHANGE_PIPELINE_TASK_FANOUT, + QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT, + ROUTE_PIPELINE_TASK_LOG, +} from './pipeline-tasks.constants'; + +@Injectable() +export class PipelineTaskLogger implements OnModuleDestroy { + private readonly messageSubject = new Subject(); + private readonly message$: Observable = this.messageSubject.pipe(); + @RabbitSubscribe({ + exchange: EXCHANGE_PIPELINE_TASK_FANOUT, + routingKey: ROUTE_PIPELINE_TASK_LOG, + queue: QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT, + }) + async handleEvent(message: PipelineTaskEvent) { + this.messageSubject.next(message); + } + + getMessage$(taskId: string) { + return this.message$.pipe( + tap((val) => console.log(val)), + filter((event) => event.taskId === taskId), + ); + } + + onModuleDestroy() { + this.messageSubject.complete(); + } +} diff --git a/src/pipeline-tasks/pipeline-task.runner.spec.ts b/src/pipeline-tasks/pipeline-task.runner.spec.ts index 83fde2f..4fafc1a 100644 --- a/src/pipeline-tasks/pipeline-task.runner.spec.ts +++ b/src/pipeline-tasks/pipeline-task.runner.spec.ts @@ -8,6 +8,7 @@ import { TaskStatuses } from './enums/task-statuses.enum'; import { getLoggerToken, PinoLogger } from 'nestjs-pino'; import { PipelineTaskRunner } from './pipeline-task.runner'; import { WorkUnitMetadata } from './models/work-unit-metadata.model'; +import { AmqpConnection } from '@golevelup/nestjs-rabbitmq'; describe('PipelineTaskRunner', () => { let runner: PipelineTaskRunner; let reposService: ReposService; @@ -31,6 +32,10 @@ describe('PipelineTaskRunner', () => { useValue: () => undefined, }, PipelineTaskRunner, + { + provide: AmqpConnection, + useValue: {}, + }, ], }).compile(); diff --git a/src/pipeline-tasks/pipeline-task.runner.ts b/src/pipeline-tasks/pipeline-task.runner.ts index 2db8e58..571ffa6 100644 --- a/src/pipeline-tasks/pipeline-task.runner.ts +++ b/src/pipeline-tasks/pipeline-task.runner.ts @@ -5,10 +5,14 @@ import { ApplicationException } from '../commons/exceptions/application.exceptio import { PipelineUnits } from './enums/pipeline-units.enum'; import { TaskStatuses } from './enums/task-statuses.enum'; import { InjectPinoLogger, PinoLogger } from 'nestjs-pino'; -import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; +import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; import { PipelineTaskEvent } from './models/pipeline-task-event'; import { last } from 'ramda'; import { Inject } from '@nestjs/common'; +import { + EXCHANGE_PIPELINE_TASK_FANOUT, + ROUTE_PIPELINE_TASK_LOG, +} from './pipeline-tasks.constants'; type Spawn = typeof spawn; @@ -21,6 +25,7 @@ export class PipelineTaskRunner { private readonly logger: PinoLogger, @Inject('spawn') private readonly spawn: Spawn, + private readonly amqpConnection: AmqpConnection, ) {} @RabbitSubscribe({ exchange: 'new-pipeline-task', @@ -201,6 +206,15 @@ export class PipelineTaskRunner { messageType, status, }; + this.amqpConnection + .publish(EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, event) + .catch((error) => { + this.logger.error( + { error, event }, + 'send event message to queue failed. %s', + error.message, + ); + }); } async runScript( diff --git a/src/pipeline-tasks/pipeline-tasks.constants.ts b/src/pipeline-tasks/pipeline-tasks.constants.ts new file mode 100644 index 0000000..dbd92fa --- /dev/null +++ b/src/pipeline-tasks/pipeline-tasks.constants.ts @@ -0,0 +1,4 @@ +export const EXCHANGE_PIPELINE_TASK_TOPIC = 'pipeline-task.topic'; +export const EXCHANGE_PIPELINE_TASK_FANOUT = 'pipeline-task.fanout'; +export const ROUTE_PIPELINE_TASK_LOG = 'pipeline-task-log'; +export const QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT = 'pipeline-task-log'; diff --git a/src/pipeline-tasks/pipeline-tasks.module.ts b/src/pipeline-tasks/pipeline-tasks.module.ts index be5d0b9..1067c27 100644 --- a/src/pipeline-tasks/pipeline-tasks.module.ts +++ b/src/pipeline-tasks/pipeline-tasks.module.ts @@ -10,6 +10,7 @@ import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq'; import { ConfigModule, ConfigService } from '@nestjs/config'; import { PipelineTaskRunner } from './pipeline-task.runner'; import { spawn } from 'child_process'; +import { EXCHANGE_PIPELINE_TASK_FANOUT } from './pipeline-tasks.constants'; @Module({ imports: [ @@ -27,6 +28,7 @@ import { spawn } from 'child_process'; type: 'fanout', options: { durable: true, + autoDelete: true, }, }, { @@ -34,6 +36,7 @@ import { spawn } from 'child_process'; type: 'fanout', options: { durable: true, + autoDelete: true, }, }, { @@ -41,6 +44,15 @@ import { spawn } from 'child_process'; type: 'fanout', options: { durable: false, + autoDelete: true, + }, + }, + { + name: EXCHANGE_PIPELINE_TASK_FANOUT, + type: 'fanout', + options: { + durable: false, + autoDelete: true, }, }, ],