import { BadRequestException, Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { PipelineTask } from './pipeline-task.entity'; import { Repository } from 'typeorm'; import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input'; import { Pipeline } from '../pipelines/pipeline.entity'; import debug from 'debug'; import { AmqpConnection, RabbitRPC } from '@golevelup/nestjs-rabbitmq'; import { EXCHANGE_PIPELINE_TASK_TOPIC, QUEUE_PIPELINE_TASK_DONE, ROUTE_PIPELINE_TASK_DONE, } from './pipeline-tasks.constants'; import { PipelineTaskFlushService } from './pipeline-task-flush.service'; import { find, isNil, propEq } from 'ramda'; import { PipelineTaskLogs } from './models/pipeline-task-logs.model'; import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum'; import { InjectPinoLogger, PinoLogger } from 'nestjs-pino'; import { getAppInstanceRouteKey } from '../commons/utils/rabbit-mq'; import { ROUTE_PIPELINE_TASK_KILL } from './pipeline-tasks.constants'; const log = debug('fennec:pipeline-tasks:service'); @Injectable() export class PipelineTasksService { constructor( @InjectRepository(PipelineTask) private readonly repository: Repository, @InjectRepository(Pipeline) private readonly pipelineRepository: Repository, private readonly amqpConnection: AmqpConnection, private readonly eventFlushService: PipelineTaskFlushService, @InjectPinoLogger(PipelineTasksService.name) private readonly logger: PinoLogger, ) {} async addTask(dto: CreatePipelineTaskInput) { const pipeline = await this.pipelineRepository.findOneOrFail({ where: { id: dto.pipelineId }, relations: ['project'], }); // const hasUnfinishedTask = await this.repository // .findOne({ // pipelineId: dto.pipelineId, // commit: dto.commit, // status: In([TaskStatuses.pending, TaskStatuses.working]), // }) // .then((val) => !isNil(val)); // if (hasUnfinishedTask) { // throw new ConflictException( // 'There are the same tasks among the unfinished tasks!', // ); // } const task = await this.repository.save(this.repository.create(dto)); task.pipeline = pipeline; this.amqpConnection.publish('new-pipeline-task', 'mac', task); return task; } async findTaskById(id: string) { return await this.repository.findOneOrFail({ id }); } async listTasksByPipelineId(pipelineId: string) { return await this.repository.find({ pipelineId }); } async listTasksByCommitHash(hash: string) { return await this.repository.find({ where: { commit: hash }, order: { createdAt: 'DESC' }, }); } getRedisTokens(pipeline: Pipeline): [string, string] { return [`pipeline-${pipeline.id}:lck`, `pipeline-${pipeline.id}:tasks`]; } @RabbitRPC({ exchange: EXCHANGE_PIPELINE_TASK_TOPIC, routingKey: ROUTE_PIPELINE_TASK_DONE, queue: QUEUE_PIPELINE_TASK_DONE, queueOptions: { autoDelete: true, durable: true, }, }) async updateByEvent({ taskId, runOn }: { taskId: string; runOn: string }) { try { const [events, task] = await Promise.all([ this.eventFlushService.read(taskId), this.findTaskById(taskId), ]); this.logger.info('[updateByEvent] start. taskId: %s', taskId); for (const event of events) { if (isNil(event.unit)) { if ( event.status !== TaskStatuses.pending && task.status === TaskStatuses.pending ) { task.startedAt = event.emittedAt; } else if (terminalTaskStatuses.includes(event.status)) { task.endedAt = event.emittedAt; } task.status = event.status; } else { let l: PipelineTaskLogs = find( propEq('unit', event.unit), task.logs, ); if (isNil(l)) { l = { unit: event.unit, startedAt: event.emittedAt, endedAt: null, logs: event.message, status: event.status, }; task.logs.push(l); } else { l.logs += event.message; } if (terminalTaskStatuses.includes(event.status)) { l.endedAt = event.emittedAt; } l.status = event.status; } } task.runOn = runOn; await this.repository.update({ id: taskId }, task); this.logger.info('[updateByEvent] success. taskId: %s', taskId); return task; } catch (error) { this.logger.error( { error }, '[updateByEvent] failed. taskId: %s', taskId, ); } } async stopTask(task: PipelineTask) { if (isNil(task.runOn)) { throw new BadRequestException( "the task have not running instance on database. field 'runOn' is nil", ); } await this.amqpConnection.request({ exchange: EXCHANGE_PIPELINE_TASK_TOPIC, routingKey: getAppInstanceRouteKey(ROUTE_PIPELINE_TASK_KILL, task.runOn), payload: task, }); } }