feat(pipeline-task): logger.

This commit is contained in:
Ivan Li 2021-06-05 19:11:39 +08:00
parent 3ee41ece67
commit 7091f9df6a
6 changed files with 140 additions and 1 deletions

View File

@ -0,0 +1,69 @@
import { Test, TestingModule } from '@nestjs/testing';
import { PipelineTaskLogger } from './pipeline-task.logger';
import { PipelineTaskEvent } from './models/pipeline-task-event';
import { take, timeout } from 'rxjs/operators';
describe('PipelineTaskRunner', () => {
let logger: PipelineTaskLogger;
let module: TestingModule;
beforeEach(async () => {
module = await Test.createTestingModule({
providers: [PipelineTaskLogger],
}).compile();
logger = module.get(PipelineTaskLogger);
});
it('should be defined', () => {
expect(logger).toBeDefined();
});
describe('getMessage$', () => {
it('normal', async () => {
const event = new PipelineTaskEvent();
event.taskId = 'test';
const message$ = logger.getMessage$('test');
let receiveEvent;
message$.pipe(take(1)).subscribe((value) => (receiveEvent = value));
await logger.handleEvent(event);
expect(receiveEvent).toEqual(event);
});
it('no match', async () => {
const event = new PipelineTaskEvent();
event.taskId = 'test';
const message$ = logger.getMessage$('other');
setTimeout(() => {
logger.handleEvent(event);
});
expect(message$.pipe(take(1), timeout(100)).toPromise()).rejects.toMatch(
'timeout',
);
});
it('multiple subscribers', async () => {
const event = new PipelineTaskEvent();
event.taskId = 'test';
const message$ = logger.getMessage$('test');
const message2$ = logger.getMessage$('test');
setTimeout(() => {
logger.handleEvent(event);
});
expect(message$.pipe(take(1), timeout(100)).toPromise()).resolves.toEqual(
event,
);
expect(
message2$.pipe(take(1), timeout(100)).toPromise(),
).resolves.toEqual(event);
});
});
describe('onModuleDestroy', () => {
it('complete observable when destroying module', async () => {
logger.onModuleDestroy();
await expect(
(logger as any).message$.toPromise(),
).resolves.toBeUndefined();
});
});
});

View File

@ -0,0 +1,35 @@
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { Observable, Subject } from 'rxjs';
import { filter, publish, tap } from 'rxjs/operators';
import { PipelineTaskEvent } from './models/pipeline-task-event';
import {
EXCHANGE_PIPELINE_TASK_FANOUT,
QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT,
ROUTE_PIPELINE_TASK_LOG,
} from './pipeline-tasks.constants';
@Injectable()
export class PipelineTaskLogger implements OnModuleDestroy {
private readonly messageSubject = new Subject<PipelineTaskEvent>();
private readonly message$: Observable<PipelineTaskEvent> = this.messageSubject.pipe();
@RabbitSubscribe({
exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
routingKey: ROUTE_PIPELINE_TASK_LOG,
queue: QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT,
})
async handleEvent(message: PipelineTaskEvent) {
this.messageSubject.next(message);
}
getMessage$(taskId: string) {
return this.message$.pipe(
tap((val) => console.log(val)),
filter((event) => event.taskId === taskId),
);
}
onModuleDestroy() {
this.messageSubject.complete();
}
}

View File

@ -8,6 +8,7 @@ import { TaskStatuses } from './enums/task-statuses.enum';
import { getLoggerToken, PinoLogger } from 'nestjs-pino';
import { PipelineTaskRunner } from './pipeline-task.runner';
import { WorkUnitMetadata } from './models/work-unit-metadata.model';
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
describe('PipelineTaskRunner', () => {
let runner: PipelineTaskRunner;
let reposService: ReposService;
@ -31,6 +32,10 @@ describe('PipelineTaskRunner', () => {
useValue: () => undefined,
},
PipelineTaskRunner,
{
provide: AmqpConnection,
useValue: {},
},
],
}).compile();

View File

@ -5,10 +5,14 @@ import { ApplicationException } from '../commons/exceptions/application.exceptio
import { PipelineUnits } from './enums/pipeline-units.enum';
import { TaskStatuses } from './enums/task-statuses.enum';
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { PipelineTaskEvent } from './models/pipeline-task-event';
import { last } from 'ramda';
import { Inject } from '@nestjs/common';
import {
EXCHANGE_PIPELINE_TASK_FANOUT,
ROUTE_PIPELINE_TASK_LOG,
} from './pipeline-tasks.constants';
type Spawn = typeof spawn;
@ -21,6 +25,7 @@ export class PipelineTaskRunner {
private readonly logger: PinoLogger,
@Inject('spawn')
private readonly spawn: Spawn,
private readonly amqpConnection: AmqpConnection,
) {}
@RabbitSubscribe({
exchange: 'new-pipeline-task',
@ -201,6 +206,15 @@ export class PipelineTaskRunner {
messageType,
status,
};
this.amqpConnection
.publish(EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, event)
.catch((error) => {
this.logger.error(
{ error, event },
'send event message to queue failed. %s',
error.message,
);
});
}
async runScript(

View File

@ -0,0 +1,4 @@
export const EXCHANGE_PIPELINE_TASK_TOPIC = 'pipeline-task.topic';
export const EXCHANGE_PIPELINE_TASK_FANOUT = 'pipeline-task.fanout';
export const ROUTE_PIPELINE_TASK_LOG = 'pipeline-task-log';
export const QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT = 'pipeline-task-log';

View File

@ -10,6 +10,7 @@ import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { PipelineTaskRunner } from './pipeline-task.runner';
import { spawn } from 'child_process';
import { EXCHANGE_PIPELINE_TASK_FANOUT } from './pipeline-tasks.constants';
@Module({
imports: [
@ -27,6 +28,7 @@ import { spawn } from 'child_process';
type: 'fanout',
options: {
durable: true,
autoDelete: true,
},
},
{
@ -34,6 +36,7 @@ import { spawn } from 'child_process';
type: 'fanout',
options: {
durable: true,
autoDelete: true,
},
},
{
@ -41,6 +44,15 @@ import { spawn } from 'child_process';
type: 'fanout',
options: {
durable: false,
autoDelete: true,
},
},
{
name: EXCHANGE_PIPELINE_TASK_FANOUT,
type: 'fanout',
options: {
durable: false,
autoDelete: true,
},
},
],