import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { PipelineTask } from './pipeline-task.entity';
import { Repository } from 'typeorm';
import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
import { RedisService } from 'nestjs-redis';
import { Pipeline } from '../pipelines/pipeline.entity';
import { InjectQueue } from '@nestjs/bull';
import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants';
import { Queue } from 'bull';
import { LockFailedException } from '../commons/exceptions/lock-failed.exception';

/** Expiry, in seconds, given to the lock counter when it is first created. */
const LOCK_TTL_SECONDS = 10;
/** How many times the lock is tried before giving up. */
const LOCK_MAX_ATTEMPTS = 5;
/** Pause, in milliseconds, between two lock attempts. */
const LOCK_RETRY_DELAY_MS = 2000;

/**
 * Persists pipeline tasks, buffers them in a per-pipeline Redis list and
 * hands them over to the Bull queue one at a time.
 *
 * Popping from the Redis list is guarded by a per-pipeline counter key:
 * whoever increments it to exactly 1 holds the lock and must decrement it
 * again when done.
 */
@Injectable()
export class PipelineTasksService {
  constructor(
    @InjectRepository(PipelineTask)
    private readonly repository: Repository<PipelineTask>,
    @InjectRepository(Pipeline)
    private readonly pipelineRepository: Repository<Pipeline>,
    private readonly redis: RedisService,
    @InjectQueue(PIPELINE_TASK_QUEUE)
    private readonly queue: Queue,
  ) {}

  /**
   * Saves a new task for the pipeline named by `dto.pipelineId`, pushes its
   * JSON form onto the pipeline's Redis task list and then tries to dispatch
   * the next pending task.
   *
   * @param dto Input describing the task to create.
   * @throws Whatever `findOneOrFail` throws when the pipeline does not exist.
   * @throws LockFailedException When the dispatch step cannot get the lock.
   */
  async addTask(dto: CreatePipelineTaskInput): Promise<void> {
    const pipeline = await this.pipelineRepository.findOneOrFail({
      where: { id: dto.pipelineId },
      relations: ['project'],
    });
    const task = await this.repository.save(this.repository.create(dto));
    task.pipeline = pipeline;

    const [, tasksKey] = this.getRedisTokens(pipeline);
    // The lock counter is deliberately left alone here: this method never
    // increments it, so decrementing would push it below zero and make every
    // later acquisition attempt fail. LPUSH is a single Redis command and
    // needs no extra guarding.
    await this.redis.getClient().lpush(tasksKey, JSON.stringify(task));

    await this.doNextTask(pipeline);
  }

  /**
   * Pops the oldest buffered task of the pipeline (if any) while holding the
   * pipeline lock and adds it to the Bull queue.
   *
   * @param pipeline Pipeline whose task list is consumed.
   * @throws LockFailedException When the lock is not obtained within
   *   `LOCK_MAX_ATTEMPTS` tries.
   */
  async doNextTask(pipeline: Pipeline): Promise<void> {
    const [lckKey, tasksKey] = this.getRedisTokens(pipeline);
    const redis = this.redis.getClient();

    await this.acquireLock(lckKey);

    let raw: string | null;
    try {
      raw = await redis.rpop(tasksKey);
    } finally {
      // Release the lock whether or not the pop succeeded.
      await redis.decr(lckKey);
    }

    // An empty list yields null; 'null' parses to null and is skipped below.
    const task = JSON.parse(raw ?? 'null');
    if (task) {
      await this.queue.add(task);
    }
  }

  /**
   * Builds the Redis key names used for a pipeline.
   *
   * @param pipeline Pipeline the keys belong to.
   * @returns A tuple `[lockKey, taskListKey]`.
   */
  getRedisTokens(pipeline: Pipeline): [string, string] {
    return [`pipeline-${pipeline.id}:lck`, `pipeline-${pipeline.id}:tasks`];
  }

  /**
   * Tries to take the counter-based lock stored under `lckKey`.
   *
   * The counter is created with value 0 and an expiry if it does not exist
   * yet. Each attempt increments it; a result of 1 means the lock is ours.
   * Any other result is undone with a decrement before waiting and retrying.
   * Errors from Redis propagate to the caller as ordinary rejections.
   *
   * @param lckKey Redis key of the lock counter.
   * @throws LockFailedException After `LOCK_MAX_ATTEMPTS` failed attempts.
   */
  private async acquireLock(lckKey: string): Promise<void> {
    const redis = this.redis.getClient();
    await redis.set(lckKey, 0, 'EX', LOCK_TTL_SECONDS, 'NX');

    for (let attempt = 1; attempt <= LOCK_MAX_ATTEMPTS; attempt++) {
      if ((await redis.incr(lckKey)) === 1) {
        return;
      }
      // Someone else holds the lock: take our increment back.
      await redis.decr(lckKey);
      // No point in waiting after the final attempt; fail right away.
      if (attempt < LOCK_MAX_ATTEMPTS) {
        await new Promise<void>((resolve) =>
          setTimeout(resolve, LOCK_RETRY_DELAY_MS),
        );
      }
    }

    throw new LockFailedException(lckKey);
  }
}