import { ConflictException, Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { PipelineTask } from './pipeline-task.entity';
import { In, Repository } from 'typeorm';
import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
import { RedisService } from 'nestjs-redis';
import { Pipeline } from '../pipelines/pipeline.entity';
import { InjectQueue } from '@nestjs/bull';
import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants';
import { Queue } from 'bull';
import { LockFailedException } from '../commons/exceptions/lock-failed.exception';
import { TaskStatuses } from './enums/task-statuses.enum';
import { isNil } from 'ramda';
import debug from 'debug';
import { InjectPubSub } from '../commons/pub-sub/decorators/inject-pub-sub.decorator';
import { PubSub } from '../commons/pub-sub/pub-sub';
import { observableToAsyncIterable } from '@graphql-tools/utils';

const log = debug('fennec:pipeline-tasks:service');

@Injectable()
export class PipelineTasksService {
  constructor(
    @InjectRepository(PipelineTask)
    private readonly repository: Repository<PipelineTask>,
    @InjectRepository(Pipeline)
    private readonly pipelineRepository: Repository<Pipeline>,
    @InjectQueue(PIPELINE_TASK_QUEUE)
    private readonly queue: Queue,
    private readonly redis: RedisService,
    @InjectPubSub()
    private readonly pubSub: PubSub,
  ) {}

  async addTask(dto: CreatePipelineTaskInput) {
    const pipeline = await this.pipelineRepository.findOneOrFail({
      where: { id: dto.pipelineId },
      relations: ['project'],
    });
    // Reject the request if an identical task is still pending or running.
    const hasUnfinishedTask = await this.repository
      .findOne({
        pipelineId: dto.pipelineId,
        commit: dto.commit,
        status: In([TaskStatuses.pending, TaskStatuses.working]),
      })
      .then((val) => !isNil(val));
    if (hasUnfinishedTask) {
      throw new ConflictException(
        'There is an identical task among the unfinished tasks!',
      );
    }
    const task = await this.repository.save(this.repository.create(dto));
    task.pipeline = pipeline;
    // Push the task onto the pipeline's Redis waiting list, then try to dispatch.
    const tasksKey = this.getRedisTokens(pipeline)[1];
    const redis = this.redis.getClient();
    await redis.lpush(tasksKey, JSON.stringify(task));
    log(
      'add task %s:%s-%s',
      task.id,
      task.pipeline.branch,
      task.commit.slice(0, 6),
    );
    await this.doNextTask(pipeline);
    return task;
  }

  async findTaskById(id: string) {
    return await this.repository.findOneOrFail({ id });
  }

  async listTasksByPipelineId(pipelineId: string) {
    return await this.repository.find({ pipelineId });
  }

  async listTasksByCommitHash(hash: string) {
    return await this.repository.find({ commit: hash });
  }

  async doNextTask(pipeline: Pipeline) {
    const [lckKey, tasksKey] = this.getRedisTokens(pipeline);
    const redis = this.redis.getClient();
    log('doNextTask()');
    // Acquire a Redis lock with SET NX plus an expiry, retrying up to 5 times.
    const unLck = await new Promise<() => Promise<void>>(
      async (resolve, reject) => {
        const lckValue = Date.now().toString();
        for (let i = 0; i < 5; i++) {
          if (
            await redis
              // NOTE: the 10-second TTL is an assumed value; tune it to the
              // expected lock hold time. SET resolves with 'OK' only when the
              // NX condition succeeds, so check the result explicitly.
              .set(lckKey, lckValue, 'EX', 10, 'NX')
              .then((result) => result === 'OK')
              .catch(() => false)
          ) {
            resolve(async () => {
              // Only release the lock if it is still held by this caller.
              if ((await redis.get(lckKey)) === lckValue) {
                await redis.del(lckKey);
              }
            });
            return;
          }
          await new Promise((resolve) => setTimeout(resolve, 2000));
        }
        reject(new LockFailedException(lckKey));
      },
    );
    // Pop the next task from the waiting list and always release the lock.
    const task = JSON.parse(
      (await redis.rpop(tasksKey).finally(() => unLck())) ?? 'null',
    );
    if (task) {
      log(
        'add task (%s:%s-%s) to queue',
        task.id,
        task.pipeline.branch,
        task.commit.slice(0, 6),
      );
      await this.queue.add(task);
    } else {
      log('task is empty');
    }
  }

  async updateTask(task: PipelineTask) {
    this.pubSub.publish(`task:${task.id}`, task);
    return await this.repository.save(task);
  }

  async watchTaskUpdated(id: string) {
    return observableToAsyncIterable(this.pubSub.message$(`task:${id}`));
  }

  getRedisTokens(pipeline: Pipeline): [string, string] {
    return [`pipeline-${pipeline.id}:lck`, `pipeline-${pipeline.id}:tasks`];
  }
}