From 9bdd991cfbdc43bf0d79358ee747447ce51ef034 Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Wed, 24 Mar 2021 20:35:24 +0800 Subject: [PATCH] =?UTF-8?q?feat(pipeline-tasks):=20=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=8E=A8=E9=80=81=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/pipeline-tasks/pipeline-tasks.resolver.ts | 9 +++++++++ src/pipeline-tasks/pipeline-tasks.service.ts | 7 +++++++ 2 files changed, 16 insertions(+) diff --git a/src/pipeline-tasks/pipeline-tasks.resolver.ts b/src/pipeline-tasks/pipeline-tasks.resolver.ts index b3e3d77..603f39e 100644 --- a/src/pipeline-tasks/pipeline-tasks.resolver.ts +++ b/src/pipeline-tasks/pipeline-tasks.resolver.ts @@ -29,6 +29,15 @@ export class PipelineTasksResolver { return asyncIterator; } + @Subscription(() => PipelineTask, { + resolve: (value) => { + return value; + }, + }) + async pipelineTaskChanged(@Args('id') id: string) { + return await this.service.watchTaskUpdated(id); + } + @Query(() => [PipelineTask]) async listPipelineTaskByPipelineId(@Args('pipelineId') pipelineId: string) { return await this.service.listTasksByPipelineId(pipelineId); diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts index 8258aad..f01cc51 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.ts @@ -9,9 +9,11 @@ import { InjectQueue } from '@nestjs/bull'; import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; import { Queue } from 'bull'; import { LockFailedException } from '../commons/exceptions/lock-failed.exception'; +import { PubSub } from 'apollo-server-express'; @Injectable() export class PipelineTasksService { + pubSub = new PubSub(); constructor( @InjectRepository(PipelineTask) private readonly repository: Repository, @@ -80,9 +82,14 @@ export class PipelineTasksService { } async updateTask(task: PipelineTask) { + this.pubSub.publish(task.id, task); return await this.repository.save(task); } + async watchTaskUpdated(id: string) { + return this.pubSub.asyncIterator(id); + } + getRedisTokens(pipeline: Pipeline): [string, string] { return [`pipeline-${pipeline.id}:lck`, `pipeline-${pipeline.id}:tasks`]; }