From 607a4f57dee1b2b985b09a290d51d1aebfeec5d8 Mon Sep 17 00:00:00 2001 From: Ivan Date: Thu, 25 Mar 2021 11:38:14 +0800 Subject: [PATCH] =?UTF-8?q?feat(pipeline-tasks):=20=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E5=90=8E=EF=BC=8C=E4=B9=9F=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E4=B8=8B=E4=B8=80=E4=B8=AA=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/pipeline-tasks/pipeline-task.consumer.ts | 12 +++++++++++- src/pipeline-tasks/pipeline-tasks.service.ts | 17 +++++++++++++++-- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/pipeline-tasks/pipeline-task.consumer.ts b/src/pipeline-tasks/pipeline-task.consumer.ts index 5f099a0..bfa76f1 100644 --- a/src/pipeline-tasks/pipeline-task.consumer.ts +++ b/src/pipeline-tasks/pipeline-task.consumer.ts @@ -1,6 +1,11 @@ import { PipelineTaskLogs } from './models/pipeline-task-logs.model'; import { ReposService } from './../repos/repos.service'; -import { OnQueueCompleted, Process, Processor } from '@nestjs/bull'; +import { + OnQueueCompleted, + OnQueueFailed, + Process, + Processor, +} from '@nestjs/bull'; import { Job } from 'bull'; import { spawn } from 'child_process'; import { PipelineTask } from './pipeline-task.entity'; @@ -121,4 +126,9 @@ export class PipelineTaskConsumer { onCompleted(job: Job) { this.service.doNextTask(job.data.pipeline); } + + @OnQueueFailed() + onFailed(job: Job) { + this.service.doNextTask(job.data.pipeline); + } } diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts index f01cc51..ea36f2b 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.ts @@ -1,7 +1,7 @@ -import { Injectable } from '@nestjs/common'; +import { ConflictException, Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { PipelineTask } from './pipeline-task.entity'; -import { Repository } from 'typeorm'; +import { In, Repository } from 'typeorm'; import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input'; import { RedisService } from 'nestjs-redis'; import { Pipeline } from '../pipelines/pipeline.entity'; @@ -10,6 +10,8 @@ import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; import { Queue } from 'bull'; import { LockFailedException } from '../commons/exceptions/lock-failed.exception'; import { PubSub } from 'apollo-server-express'; +import { TaskStatuses } from './enums/task-statuses.enum'; +import { isNil } from 'ramda'; @Injectable() export class PipelineTasksService { @@ -28,6 +30,17 @@ export class PipelineTasksService { where: { id: dto.pipelineId }, relations: ['project'], }); + // const hasUnfinishedTask = await this.repository + // .findOne({ + // pipelineId: pipeline.id, + // status: In([TaskStatuses.pending, TaskStatuses.working]), + // }) + // .then((val) => !isNil(val)); + // if (hasUnfinishedTask) { + // throw new ConflictException( + // 'there are still unfinished task in the current pipeline.', + // ); + // } const task = await this.repository.save(this.repository.create(dto)); task.pipeline = pipeline;