From 31a200206f22071374ebda4759a1e5c132a548fe Mon Sep 17 00:00:00 2001 From: Ivan Date: Tue, 2 Mar 2021 16:28:37 +0800 Subject: [PATCH] =?UTF-8?q?feat(pipelines):=20=E6=B7=BB=E5=8A=A0=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/pipeline-tasks/pipeline-task.consumer.ts | 16 ++++++++++++++++ .../pipeline-tasks.service.spec.ts | 17 +++++++++++++++++ src/pipeline-tasks/pipeline-tasks.service.ts | 11 +++++------ 3 files changed, 38 insertions(+), 6 deletions(-) create mode 100644 src/pipeline-tasks/pipeline-task.consumer.ts diff --git a/src/pipeline-tasks/pipeline-task.consumer.ts b/src/pipeline-tasks/pipeline-task.consumer.ts new file mode 100644 index 0000000..2315cb4 --- /dev/null +++ b/src/pipeline-tasks/pipeline-task.consumer.ts @@ -0,0 +1,16 @@ +import { OnQueueCompleted, Process, Processor } from '@nestjs/bull'; +import { Job } from 'bull'; +import { PipelineTask } from './pipeline-task.entity'; +import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; +import { PipelineTasksService } from './pipeline-tasks.service'; +@Processor(PIPELINE_TASK_QUEUE) +export class PipelineTaskConsumer { + constructor(private readonly service: PipelineTasksService) {} + @Process() + async doTask() {} + + @OnQueueCompleted() + onCompleted(job: Job) { + this.service.doNextTask(job.data.pipeline); + } +} diff --git a/src/pipeline-tasks/pipeline-tasks.service.spec.ts b/src/pipeline-tasks/pipeline-tasks.service.spec.ts index 01b8202..a53c2c6 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.spec.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.spec.ts @@ -2,6 +2,10 @@ import { Test, TestingModule } from '@nestjs/testing'; import { PipelineTasksService } from './pipeline-tasks.service'; import { getRepositoryToken } from '@nestjs/typeorm'; import { PipelineTask } from './pipeline-task.entity'; +import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; +import { getQueueToken } from '@nestjs/bull'; +import { RedisService } from 'nestjs-redis'; +import { Pipeline } from '../pipelines/pipeline.entity'; describe('PipelineTasksService', () => { let service: PipelineTasksService; @@ -14,6 +18,19 @@ describe('PipelineTasksService', () => { provide: getRepositoryToken(PipelineTask), useValue: {}, }, + PipelineTasksService, + { + provide: getRepositoryToken(Pipeline), + useValue: {}, + }, + { + provide: getQueueToken(PIPELINE_TASK_QUEUE), + useValue: {}, + }, + { + provide: RedisService, + useValue: {}, + }, ], }).compile(); diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts index 05b97ec..a619998 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.ts @@ -44,13 +44,12 @@ export class PipelineTasksService { } } - async doTask(task: PipelineTask) { - const tasksKey = this.getRedisTokens(task.pipeline)[1]; - + async doNextTask(pipeline: Pipeline) { + const tasksKey = this.getRedisTokens(pipeline)[1]; const redis = this.redis.getClient(); - const nextTask = await redis.rpop(tasksKey); - if (nextTask) { - this.doTask(task).then(); + const task = JSON.parse((await redis.rpop(tasksKey)) ?? 'null'); + if (task) { + this.queue.add(task); } }