From 133439bb49ca54b4dab4d0a7299f80bfceb3486c Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Sun, 20 Jun 2021 00:05:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BB=BB=E5=8A=A1=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E5=90=8E=E6=9B=B4=E6=96=B0=E6=95=B0=E6=8D=AE=E5=BA=93=E4=B8=AD?= =?UTF-8?q?=E7=9A=84=E6=95=B0=E6=8D=AE=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pipeline-task-flush.service.spec.ts | 1 + .../pipeline-task-flush.service.ts | 14 ++++++---- .../pipeline-tasks.service.spec.ts | 23 +++++++-------- src/pipeline-tasks/pipeline-tasks.service.ts | 28 ++++++++++--------- 4 files changed, 35 insertions(+), 31 deletions(-) diff --git a/src/pipeline-tasks/pipeline-task-flush.service.spec.ts b/src/pipeline-tasks/pipeline-task-flush.service.spec.ts index fbaed53..fa2a26f 100644 --- a/src/pipeline-tasks/pipeline-task-flush.service.spec.ts +++ b/src/pipeline-tasks/pipeline-task-flush.service.spec.ts @@ -18,6 +18,7 @@ describe('PipelineTaskFlushService', () => { const redisClient = { rpush: jest.fn(() => Promise.resolve()), lrange: jest.fn(() => Promise.resolve()), + expire: jest.fn(() => Promise.resolve()), }; const module: TestingModule = await Test.createTestingModule({ providers: [ diff --git a/src/pipeline-tasks/pipeline-task-flush.service.ts b/src/pipeline-tasks/pipeline-task-flush.service.ts index 18a0e6d..22bddac 100644 --- a/src/pipeline-tasks/pipeline-task-flush.service.ts +++ b/src/pipeline-tasks/pipeline-task-flush.service.ts @@ -37,11 +37,15 @@ export class PipelineTaskFlushService { await client.rpush(this.getKey(message.taskId), JSON.stringify(message)); await client.expire(this.getKey(message.taskId), 600); // ten minutes if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) { - this.amqpConnection.request({ - exchange: EXCHANGE_PIPELINE_TASK_TOPIC, - routingKey: ROUTE_PIPELINE_TASK_DONE, - payload: { taskId: message.taskId, status: message.status }, - }); + try { + await this.amqpConnection.request({ + exchange: EXCHANGE_PIPELINE_TASK_TOPIC, + routingKey: ROUTE_PIPELINE_TASK_DONE, + payload: { taskId: message.taskId, status: message.status }, + }); + } catch (error) { + console.log(error); + } } } diff --git a/src/pipeline-tasks/pipeline-tasks.service.spec.ts b/src/pipeline-tasks/pipeline-tasks.service.spec.ts index a391d66..2113ad3 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.spec.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.spec.ts @@ -4,26 +4,15 @@ import { getRepositoryToken } from '@nestjs/typeorm'; import { PipelineTask } from './pipeline-task.entity'; import { Pipeline } from '../pipelines/pipeline.entity'; import { Repository } from 'typeorm'; -import { Queue } from 'bull'; import { AmqpConnection } from '@golevelup/nestjs-rabbitmq'; +import { PipelineTaskFlushService } from './pipeline-task-flush.service'; +import { getLoggerToken, PinoLogger } from 'nestjs-pino'; describe('PipelineTasksService', () => { let service: PipelineTasksService; let module: TestingModule; let taskRepository: Repository; let pipelineRepository: Repository; - const getBasePipeline = () => - ({ - id: 'test', - name: '测试流水线', - branch: 'master', - workUnitMetadata: {}, - project: { - id: 'test-project', - }, - } as Pipeline); - let redisClient; - let taskQueue: Queue; beforeEach(async () => { module = await Test.createTestingModule({ @@ -41,6 +30,14 @@ describe('PipelineTasksService', () => { provide: AmqpConnection, useValue: {}, }, + { + provide: PipelineTaskFlushService, + useValue: {}, + }, + { + provide: getLoggerToken(PipelineTasksService.name), + useValue: new PinoLogger({}), + }, ], }).compile(); diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts index fdc1aa2..a729547 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.ts @@ -5,13 +5,10 @@ import { Repository } from 'typeorm'; import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input'; import { Pipeline } from '../pipelines/pipeline.entity'; import debug from 'debug'; -import { - AmqpConnection, - RabbitRPC, - RabbitSubscribe, -} from '@golevelup/nestjs-rabbitmq'; +import { AmqpConnection, RabbitRPC } from '@golevelup/nestjs-rabbitmq'; import { EXCHANGE_PIPELINE_TASK_FANOUT, + EXCHANGE_PIPELINE_TASK_TOPIC, QUEUE_PIPELINE_TASK_DONE, ROUTE_PIPELINE_TASK_DONE, } from './pipeline-tasks.constants'; @@ -79,7 +76,7 @@ export class PipelineTasksService { } @RabbitRPC({ - exchange: EXCHANGE_PIPELINE_TASK_FANOUT, + exchange: EXCHANGE_PIPELINE_TASK_TOPIC, routingKey: ROUTE_PIPELINE_TASK_DONE, queue: QUEUE_PIPELINE_TASK_DONE, queueOptions: { @@ -87,13 +84,13 @@ export class PipelineTasksService { durable: true, }, }) - async updateByEvent({ id }: { id: string }) { + async updateByEvent({ taskId }: { taskId: string }) { try { const [events, task] = await Promise.all([ - this.eventFlushService.read(id), - this.findTaskById(id), + this.eventFlushService.read(taskId), + this.findTaskById(taskId), ]); - this.logger.info('[updateByEvent] start. taskId: %s', id); + this.logger.info('[updateByEvent] start. taskId: %s', taskId); for (const event of events) { if (isNil(event.unit)) { @@ -130,10 +127,15 @@ export class PipelineTasksService { l.status = event.status; } } - await this.repository.update({ id }, task); - this.logger.info('[updateByEvent] success. taskId: %s', id); + await this.repository.update({ id: taskId }, task); + return task; + this.logger.info('[updateByEvent] success. taskId: %s', taskId); } catch (error) { - this.logger.error({ error }, '[updateByEvent] failed. taskId: %s', id); + this.logger.error( + { error }, + '[updateByEvent] failed. taskId: %s', + taskId, + ); } } }