diff --git a/src/pipeline-tasks/enums/task-statuses.enum.ts b/src/pipeline-tasks/enums/task-statuses.enum.ts index be408ae..b4cc6b3 100644 --- a/src/pipeline-tasks/enums/task-statuses.enum.ts +++ b/src/pipeline-tasks/enums/task-statuses.enum.ts @@ -11,3 +11,5 @@ registerEnumType(TaskStatuses, { name: 'TaskStatuses', description: '任务状态', }); + +export const terminalTaskStatuses = [TaskStatuses.success, TaskStatuses.failed]; diff --git a/src/pipeline-tasks/pipeline-task-flush.service.spec.ts b/src/pipeline-tasks/pipeline-task-flush.service.spec.ts new file mode 100644 index 0000000..fbaed53 --- /dev/null +++ b/src/pipeline-tasks/pipeline-task-flush.service.spec.ts @@ -0,0 +1,80 @@ +import { AmqpConnection } from '@golevelup/nestjs-rabbitmq'; +import { Test, TestingModule } from '@nestjs/testing'; +import { RedisService } from 'nestjs-redis'; +import { PipelineTaskFlushService } from './pipeline-task-flush.service'; +import { PipelineTaskEvent } from './models/pipeline-task-event'; +import { TaskStatuses } from './enums/task-statuses.enum'; +import { + EXCHANGE_PIPELINE_TASK_TOPIC, + ROUTE_PIPELINE_TASK_DONE, +} from './pipeline-tasks.constants'; + +describe('PipelineTaskFlushService', () => { + let service: PipelineTaskFlushService; + let redisService: RedisService; + let amqpConnection: AmqpConnection; + + beforeEach(async () => { + const redisClient = { + rpush: jest.fn(() => Promise.resolve()), + lrange: jest.fn(() => Promise.resolve()), + }; + const module: TestingModule = await Test.createTestingModule({ + providers: [ + PipelineTaskFlushService, + { + provide: RedisService, + useValue: { + getClient() { + return redisClient; + }, + }, + }, + { + provide: AmqpConnection, + useValue: { + request: jest.fn(() => Promise.resolve()), + }, + }, + ], + }).compile(); + + service = module.get(PipelineTaskFlushService); + redisService = module.get(RedisService); + amqpConnection = module.get(AmqpConnection); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); + + describe('write', () => { + it('normal', async () => { + const testEvent = new PipelineTaskEvent(); + testEvent.taskId = 'test'; + testEvent.status = TaskStatuses.working; + const rpush = jest.spyOn(redisService.getClient(), 'rpush'); + const request = jest.spyOn(amqpConnection, 'request'); + await service.write(testEvent); + expect(rpush).toBeCalledTimes(1); + expect(rpush.mock.calls[0][0]).toEqual('p-task:log:test'); + expect(rpush.mock.calls[0][1]).toEqual(JSON.stringify(testEvent)); + expect(request).toBeCalledTimes(0); + }); + it('event for which task done', async () => { + const testEvent = new PipelineTaskEvent(); + testEvent.taskId = 'test'; + testEvent.status = TaskStatuses.success; + const rpush = jest.spyOn(redisService.getClient(), 'rpush'); + const request = jest.spyOn(amqpConnection, 'request'); + await service.write(testEvent); + expect(rpush).toBeCalledTimes(1); + expect(request).toBeCalledTimes(1); + expect(request.mock.calls[0][0]).toMatchObject({ + exchange: EXCHANGE_PIPELINE_TASK_TOPIC, + routingKey: ROUTE_PIPELINE_TASK_DONE, + payload: { taskId: 'test', status: TaskStatuses.success }, + }); + }); + }); +}); diff --git a/src/pipeline-tasks/pipeline-task-flush.service.ts b/src/pipeline-tasks/pipeline-task-flush.service.ts new file mode 100644 index 0000000..b06675f --- /dev/null +++ b/src/pipeline-tasks/pipeline-task-flush.service.ts @@ -0,0 +1,57 @@ +import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; +import { Injectable } from '@nestjs/common'; +import { deserialize } from 'class-transformer'; +import { RedisService } from 'nestjs-redis'; +import { isNil } from 'ramda'; +import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq'; +import { terminalTaskStatuses } from './enums/task-statuses.enum'; +import { PipelineTaskEvent } from './models/pipeline-task-event'; +import { + EXCHANGE_PIPELINE_TASK_TOPIC, + ROUTE_PIPELINE_TASK_DONE, +} from './pipeline-tasks.constants'; +import { + EXCHANGE_PIPELINE_TASK_FANOUT, + ROUTE_PIPELINE_TASK_LOG, + QUEUE_WRITE_PIPELINE_TASK_LOG, +} from './pipeline-tasks.constants'; + +@Injectable() +export class PipelineTaskFlushService { + constructor( + private readonly redisService: RedisService, + private readonly amqpConnection: AmqpConnection, + ) {} + + @RabbitSubscribe({ + exchange: EXCHANGE_PIPELINE_TASK_FANOUT, + routingKey: ROUTE_PIPELINE_TASK_LOG, + queue: getSelfInstanceQueueKey(QUEUE_WRITE_PIPELINE_TASK_LOG), + queueOptions: { + autoDelete: true, + }, + }) + async write(message: PipelineTaskEvent) { + await this.redisService + .getClient() + .rpush(this.getKey(message.taskId), JSON.stringify(message)); + if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) { + this.amqpConnection.request({ + exchange: EXCHANGE_PIPELINE_TASK_TOPIC, + routingKey: ROUTE_PIPELINE_TASK_DONE, + payload: { taskId: message.taskId, status: message.status }, + }); + } + } + + async read(taskId: string) { + const raw = await this.redisService + .getClient() + .lrange(this.getKey(taskId), 0, -1); + return raw.map((it) => deserialize(PipelineTaskEvent, it)); + } + + private getKey(taskId: string) { + return `p-task:log:${taskId}`; + } +} diff --git a/src/pipeline-tasks/pipeline-task.logger.spec.ts b/src/pipeline-tasks/pipeline-task.logger.spec.ts index 0da2696..90e2528 100644 --- a/src/pipeline-tasks/pipeline-task.logger.spec.ts +++ b/src/pipeline-tasks/pipeline-task.logger.spec.ts @@ -23,13 +23,17 @@ describe('PipelineTaskRunner', () => { it('normal', async () => { const event = new PipelineTaskEvent(); event.taskId = 'test'; - event.emittedAt = new Date().toISOString() as any; + const emittedAt = new Date(); + event.emittedAt = emittedAt.toISOString() as any; const message$ = logger.getMessage$('test'); let receiveEvent; message$.pipe(take(1)).subscribe((value) => (receiveEvent = value)); await logger.handleEvent(event); - expect(receiveEvent).toEqual(event); + expect(receiveEvent).toMatchObject({ + ...event, + emittedAt, + }); }); it('no match', async () => { const event = new PipelineTaskEvent(); diff --git a/src/pipeline-tasks/pipeline-task.logger.ts b/src/pipeline-tasks/pipeline-task.logger.ts index 096b6c7..9b19a91 100644 --- a/src/pipeline-tasks/pipeline-task.logger.ts +++ b/src/pipeline-tasks/pipeline-task.logger.ts @@ -2,7 +2,7 @@ import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; import { Injectable, OnModuleDestroy } from '@nestjs/common'; import { plainToClass } from 'class-transformer'; import { Observable, Subject } from 'rxjs'; -import { filter, publish, tap } from 'rxjs/operators'; +import { filter } from 'rxjs/operators'; import { PipelineTaskEvent } from './models/pipeline-task-event'; import { EXCHANGE_PIPELINE_TASK_FANOUT, @@ -14,20 +14,21 @@ import { export class PipelineTaskLogger implements OnModuleDestroy { private readonly messageSubject = new Subject(); private readonly message$: Observable = this.messageSubject.pipe(); + @RabbitSubscribe({ exchange: EXCHANGE_PIPELINE_TASK_FANOUT, routingKey: ROUTE_PIPELINE_TASK_LOG, queue: QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT, + queueOptions: { + autoDelete: true, + }, }) async handleEvent(message: PipelineTaskEvent) { this.messageSubject.next(plainToClass(PipelineTaskEvent, message)); } getMessage$(taskId: string) { - return this.message$.pipe( - tap((val) => console.log(val)), - filter((event) => event.taskId === taskId), - ); + return this.message$.pipe(filter((event) => event.taskId === taskId)); } onModuleDestroy() { diff --git a/src/pipeline-tasks/pipeline-tasks.constants.ts b/src/pipeline-tasks/pipeline-tasks.constants.ts index dbd92fa..49101c3 100644 --- a/src/pipeline-tasks/pipeline-tasks.constants.ts +++ b/src/pipeline-tasks/pipeline-tasks.constants.ts @@ -2,3 +2,5 @@ export const EXCHANGE_PIPELINE_TASK_TOPIC = 'pipeline-task.topic'; export const EXCHANGE_PIPELINE_TASK_FANOUT = 'pipeline-task.fanout'; export const ROUTE_PIPELINE_TASK_LOG = 'pipeline-task-log'; export const QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT = 'pipeline-task-log'; +export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log'; +export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done'; diff --git a/src/pipeline-tasks/pipeline-tasks.module.ts b/src/pipeline-tasks/pipeline-tasks.module.ts index 428c72d..fa175eb 100644 --- a/src/pipeline-tasks/pipeline-tasks.module.ts +++ b/src/pipeline-tasks/pipeline-tasks.module.ts @@ -10,8 +10,12 @@ import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq'; import { ConfigModule, ConfigService } from '@nestjs/config'; import { PipelineTaskRunner } from './pipeline-task.runner'; import { spawn } from 'child_process'; -import { EXCHANGE_PIPELINE_TASK_FANOUT } from './pipeline-tasks.constants'; +import { + EXCHANGE_PIPELINE_TASK_FANOUT, + EXCHANGE_PIPELINE_TASK_TOPIC, +} from './pipeline-tasks.constants'; import { PipelineTaskLogger } from './pipeline-task.logger'; +import { PipelineTaskFlushService } from './pipeline-task-flush.service'; @Module({ imports: [ @@ -56,6 +60,14 @@ import { PipelineTaskLogger } from './pipeline-task.logger'; autoDelete: true, }, }, + { + name: EXCHANGE_PIPELINE_TASK_TOPIC, + type: 'topic', + options: { + durable: false, + autoDelete: true, + }, + }, ], }), inject: [ConfigService], @@ -70,6 +82,7 @@ import { PipelineTaskLogger } from './pipeline-task.logger'; provide: 'spawn', useValue: spawn, }, + PipelineTaskFlushService, ], exports: [PipelineTasksService], })