import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { PipelineTask } from './pipeline-task.entity';
import { Repository } from 'typeorm';
import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
import { RedisService } from 'nestjs-redis';
import { Pipeline } from '../pipelines/pipeline.entity';
import { InjectQueue } from '@nestjs/bull';
import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants';
import { Queue } from 'bull';
import { LockFailedException } from '../commons/exceptions/lock-failed.exception';

/** How many times the per-pipeline lock is attempted before giving up. */
const LOCK_MAX_ATTEMPTS = 5;
/** Pause between two lock attempts, in milliseconds. */
const LOCK_RETRY_DELAY_MS = 2000;
/**
 * Expiry of the lock key, in seconds. The critical section is a single RPOP,
 * so a short TTL is enough; it only exists so that a holder which dies before
 * releasing cannot block the pipeline forever.
 */
const LOCK_TTL_SECONDS = 5;

/**
 * Persists pipeline tasks, buffers them in a per-pipeline Redis list and
 * moves them one at a time onto the bull queue.
 */
@Injectable()
export class PipelineTasksService {
  constructor(
    @InjectRepository(PipelineTask)
    private readonly repository: Repository<PipelineTask>,
    @InjectRepository(Pipeline)
    private readonly pipelineRepository: Repository<Pipeline>,
    @InjectQueue(PIPELINE_TASK_QUEUE)
    private readonly queue: Queue,
    private readonly redis: RedisService,
  ) {}

  /**
   * Saves a new task, pushes its JSON form (with the loaded pipeline attached)
   * onto the pipeline's Redis list and then triggers {@link doNextTask}.
   *
   * @param dto Input describing the task; `dto.pipelineId` selects the pipeline.
   * @returns The saved task, with `pipeline` set to the loaded pipeline.
   * @throws If the pipeline does not exist (`findOneOrFail` rejects).
   * @throws LockFailedException If the pipeline lock cannot be acquired.
   */
  async addTask(dto: CreatePipelineTaskInput): Promise<PipelineTask> {
    const pipeline = await this.pipelineRepository.findOneOrFail({
      where: { id: dto.pipelineId },
      relations: ['project'],
    });
    const task = await this.repository.save(this.repository.create(dto));
    // Attached after saving so the serialized payload carries the pipeline
    // (including its `project` relation).
    task.pipeline = pipeline;

    const tasksKey = this.getRedisTokens(pipeline)[1];
    const redis = this.redis.getClient();
    await redis.lpush(tasksKey, JSON.stringify(task));
    await this.doNextTask(pipeline);
    return task;
  }

  /**
   * Loads a single task by id.
   *
   * @throws If no task with that id exists (`findOneOrFail` rejects).
   */
  async findTaskById(id: string): Promise<PipelineTask> {
    return await this.repository.findOneOrFail({ id });
  }

  /** Returns every stored task whose `pipelineId` matches. */
  async listTasksByPipelineId(pipelineId: string): Promise<PipelineTask[]> {
    return await this.repository.find({ pipelineId });
  }

  /**
   * Pops the oldest pending task of the pipeline from its Redis list and adds
   * it to the bull queue. The pop happens under the pipeline's Redis lock; the
   * lock is released before the task is enqueued. Does nothing when the list
   * is empty.
   *
   * @throws LockFailedException If the lock cannot be acquired after
   *   {@link LOCK_MAX_ATTEMPTS} attempts.
   */
  async doNextTask(pipeline: Pipeline): Promise<void> {
    const [lckKey, tasksKey] = this.getRedisTokens(pipeline);
    const redis = this.redis.getClient();

    const unlock = await this.acquireLock(lckKey);
    let raw: string | null;
    try {
      raw = await redis.rpop(tasksKey);
    } finally {
      // Release on success and on failure alike.
      await unlock();
    }

    if (raw == null) {
      return;
    }
    const task: unknown = JSON.parse(raw);
    if (task) {
      await this.queue.add(task);
    }
  }

  /** Saves the given task and returns the persisted entity. */
  async updateTask(task: PipelineTask): Promise<PipelineTask> {
    return await this.repository.save(task);
  }

  /**
   * Builds the Redis key names used for a pipeline.
   *
   * @returns A `[lockKey, tasksListKey]` tuple.
   */
  getRedisTokens(pipeline: Pipeline): [string, string] {
    return [`pipeline-${pipeline.id}:lck`, `pipeline-${pipeline.id}:tasks`];
  }

  /**
   * Tries to take the lock stored under `lckKey` with `SET key token EX ttl NX`,
   * retrying up to {@link LOCK_MAX_ATTEMPTS} times with a pause of
   * {@link LOCK_RETRY_DELAY_MS} between attempts. A Redis error during an
   * attempt is treated like a failed attempt.
   *
   * @returns A release function that deletes the key only while it still holds
   *   this caller's token, so an expired lock re-taken by someone else is left
   *   untouched.
   * @throws LockFailedException When every attempt fails.
   */
  private async acquireLock(lckKey: string): Promise<() => Promise<void>> {
    const redis = this.redis.getClient();
    const lckValue = Date.now().toString();

    for (let attempt = 1; attempt <= LOCK_MAX_ATTEMPTS; attempt++) {
      let acquired = false;
      try {
        // SET ... NX resolves with 'OK' when the key was created and with
        // null when it already exists; it does not reject on contention.
        const reply = await redis.set(
          lckKey,
          lckValue,
          'EX',
          LOCK_TTL_SECONDS,
          'NX',
        );
        acquired = reply === 'OK';
      } catch {
        acquired = false;
      }

      if (acquired) {
        return async () => {
          if ((await redis.get(lckKey)) === lckValue) {
            await redis.del(lckKey);
          }
        };
      }

      if (attempt < LOCK_MAX_ATTEMPTS) {
        await new Promise<void>((resolve) =>
          setTimeout(resolve, LOCK_RETRY_DELAY_MS),
        );
      }
    }

    throw new LockFailedException(lckKey);
  }
}