diff --git a/src/pipeline-tasks/pipeline-task.consumer.ts b/src/pipeline-tasks/pipeline-task.consumer.ts index 5f099a0..bfa76f1 100644 --- a/src/pipeline-tasks/pipeline-task.consumer.ts +++ b/src/pipeline-tasks/pipeline-task.consumer.ts @@ -1,6 +1,11 @@ import { PipelineTaskLogs } from './models/pipeline-task-logs.model'; import { ReposService } from './../repos/repos.service'; -import { OnQueueCompleted, Process, Processor } from '@nestjs/bull'; +import { + OnQueueCompleted, + OnQueueFailed, + Process, + Processor, +} from '@nestjs/bull'; import { Job } from 'bull'; import { spawn } from 'child_process'; import { PipelineTask } from './pipeline-task.entity'; @@ -121,4 +126,9 @@ export class PipelineTaskConsumer { onCompleted(job: Job) { this.service.doNextTask(job.data.pipeline); } + + @OnQueueFailed() + onFailed(job: Job) { + this.service.doNextTask(job.data.pipeline); + } } diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts index f01cc51..ea36f2b 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.ts @@ -1,7 +1,7 @@ -import { Injectable } from '@nestjs/common'; +import { ConflictException, Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { PipelineTask } from './pipeline-task.entity'; -import { Repository } from 'typeorm'; +import { In, Repository } from 'typeorm'; import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input'; import { RedisService } from 'nestjs-redis'; import { Pipeline } from '../pipelines/pipeline.entity'; @@ -10,6 +10,8 @@ import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; import { Queue } from 'bull'; import { LockFailedException } from '../commons/exceptions/lock-failed.exception'; import { PubSub } from 'apollo-server-express'; +import { TaskStatuses } from './enums/task-statuses.enum'; +import { isNil } from 'ramda'; @Injectable() export class PipelineTasksService { @@ -28,6 +30,17 @@ export class PipelineTasksService { where: { id: dto.pipelineId }, relations: ['project'], }); + // const hasUnfinishedTask = await this.repository + // .findOne({ + // pipelineId: pipeline.id, + // status: In([TaskStatuses.pending, TaskStatuses.working]), + // }) + // .then((val) => !isNil(val)); + // if (hasUnfinishedTask) { + // throw new ConflictException( + // 'there are still unfinished task in the current pipeline.', + // ); + // } const task = await this.repository.save(this.repository.create(dto)); task.pipeline = pipeline;