import { ReposService } from '../repos/repos.service';
import { spawn, ChildProcessWithoutNullStreams } from 'child_process';
import { PipelineTask } from './pipeline-task.entity';
import { ApplicationException } from '../commons/exceptions/application.exception';
import { PipelineUnits } from './enums/pipeline-units.enum';
import { TaskStatuses } from './enums/task-statuses.enum';
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
import {
  AmqpConnection,
  RabbitRPC,
  RabbitSubscribe,
} from '@golevelup/nestjs-rabbitmq';
import { PipelineTaskEvent } from './models/pipeline-task-event';
import { last } from 'ramda';
import { Inject } from '@nestjs/common';
import {
  EXCHANGE_PIPELINE_TASK_TOPIC,
  QUEUE_PIPELINE_TASK_KILL,
  ROUTE_PIPELINE_TASK_KILL,
} from './pipeline-tasks.constants';
import {
  EXCHANGE_PIPELINE_TASK_FANOUT,
  ROUTE_PIPELINE_TASK_LOG,
} from './pipeline-tasks.constants';
import {
  getInstanceName,
  getSelfInstanceQueueKey,
  getSelfInstanceRouteKey,
} from '../commons/utils/rabbit-mq';
import { unlink } from 'fs/promises';
import { rename } from 'fs/promises';

type Spawn = typeof spawn;

/**
 * Runs pipeline tasks received over RabbitMQ: checks out the repository,
 * executes the scripts of every requested unit in a shell child process and
 * publishes a `PipelineTaskEvent` for each status change and each chunk of
 * script output.
 */
export class PipelineTaskRunner {
  /** Child process currently executing a script, keyed by task id. */
  readonly processes = new Map<
    PipelineTask['id'],
    ChildProcessWithoutNullStreams
  >();

  /**
   * Ids of tasks for which a stop was requested. Checked before each script
   * and when a script exits with a non-zero code.
   *
   * NOTE(review): an id is only removed by the timers started in
   * `onStopTask` when a child process was found; when no process was
   * running the id stays in the set — confirm whether that is intended.
   */
  readonly stopTaskIds = new Set<PipelineTask['id']>();

  constructor(
    private readonly reposService: ReposService,
    @InjectPinoLogger(PipelineTaskRunner.name)
    private readonly logger: PinoLogger,
    @Inject('spawn')
    private readonly spawn: Spawn,
    private readonly amqpConnection: AmqpConnection,
  ) {}

  /**
   * Queue handler for new tasks. Runs the task and logs (never rethrows) any
   * error that escapes `doTask`.
   *
   * @param task the task to run
   */
  @RabbitSubscribe({
    exchange: 'new-pipeline-task',
    routingKey: 'mac',
    queue: 'mac.new-pipeline-task',
  })
  async onNewTask(task: PipelineTask) {
    this.logger.info({ task }, 'on new task [%s].', task.id);
    try {
      await this.doTask(task);
    } catch (err) {
      this.logger.error({ task, err }, err.message);
    }
  }

  /**
   * RPC handler that requests a task to stop.
   *
   * Marks the task as stopped and, if a child process is registered for it,
   * sends SIGINT. After 10 seconds the same process is sent SIGKILL if it is
   * still registered; a further 10 seconds later the stop mark is cleared.
   *
   * @param task the task to stop
   * @returns always `true`
   */
  @RabbitRPC({
    exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
    routingKey: getSelfInstanceRouteKey(ROUTE_PIPELINE_TASK_KILL),
    queue: getSelfInstanceQueueKey(QUEUE_PIPELINE_TASK_KILL),
    queueOptions: {
      autoDelete: true,
      durable: true,
    },
  })
  async onStopTask(task: PipelineTask) {
    this.logger.info({ task }, 'on stop task [%s].', task.id);
    this.stopTaskIds.add(task.id);

    const child = this.processes.get(task.id);
    if (!child) {
      this.logger.info({ task }, 'child process is not running.');
      return true;
    }

    this.logger.info({ task }, 'send signal SIGINT to child process.');
    child.kill('SIGINT');

    setTimeout(() => {
      // Clear the stop mark 10s after the grace period has elapsed.
      setTimeout(() => {
        this.stopTaskIds.delete(task.id);
      }, 10_000);

      // The very same process is still registered: SIGINT was not enough.
      if (child === this.processes.get(task.id)) {
        this.logger.info({ task }, 'send signal SIGKILL to child process.');
        child.kill('SIGKILL');
        return;
      }

      // A different process is registered for this task: the task kept
      // running scripts after the stop request.
      if (this.processes.has(task.id)) {
        this.logger.error(
          { task },
          'this pipeline task not stop yet. there is a new process running, maybe is a bug about error capture',
        );
      }
    }, 10_000);

    return true;
  }

  /**
   * Runs a whole task: checkout first, then every other requested unit in
   * order. Failures after the initial "start" event are reported through a
   * `failed` event and a log entry; they are not rethrown.
   *
   * @param task the task to run
   * @throws ApplicationException when the work unit metadata version is not 1
   */
  async doTask(task: PipelineTask) {
    if (task.pipeline.workUnitMetadata.version !== 1) {
      throw new ApplicationException(
        'work unit metadata version is not match.',
      );
    }
    await this.emitEvent(
      task,
      null,
      TaskStatuses.working,
      `[start task]`,
      'stdout',
    );
    this.logger.info('running task [%s].', task.id);

    try {
      let workspaceRoot = await this.checkout(task);

      // Checkout already ran above; resolve the remaining unit types to their
      // metadata, falling back to a unit without scripts.
      const units = task.units
        .filter((unit) => unit !== PipelineUnits.checkout)
        .map(
          (type) =>
            task.pipeline.workUnitMetadata.units.find(
              (unit) => unit.type === type,
            ) ?? { type: type, scripts: [] },
        );

      this.logger.info({ units }, 'begin run units.');
      for (const unit of units) {
        if (unit.type === PipelineUnits.deploy) {
          // Move the checked-out workspace to the deploy root before the
          // deploy scripts run; later units use the deploy root as cwd.
          const oldRoot = workspaceRoot;
          workspaceRoot = this.reposService.getDeployRoot(task);
          if (oldRoot !== workspaceRoot) {
            // Best effort: a failed unlink is ignored on purpose.
            await unlink(workspaceRoot).catch(() => void 0);
            await rename(oldRoot, workspaceRoot);
          }
          await this.emitEvent(
            task,
            unit.type,
            TaskStatuses.success,
            `[deploy] change deploy folder content success`,
            'stdout',
          );
        }
        await this.doTaskUnit(unit.type, unit.scripts, task, workspaceRoot);
      }

      await this.emitEvent(
        task,
        null,
        TaskStatuses.success,
        `[finished task] success`,
        'stdout',
      );
      this.logger.info({ task }, 'task [%s] completed.', task.id);
    } catch (err) {
      await this.emitEvent(
        task,
        null,
        TaskStatuses.failed,
        `[finished unit] ${err.message}`,
        'stderr',
      );
      this.logger.error({ task, error: err }, 'task [%s] failed.', task.id);
    }
  }

  /**
   * Runs the scripts of one unit sequentially, emitting begin/finished events.
   *
   * @param unit the unit being executed
   * @param scripts shell commands to run, in order
   * @param task the owning task
   * @param workspaceRoot working directory for the scripts
   * @throws ApplicationException when the task was stopped or a script failed
   */
  async doTaskUnit(
    unit: PipelineUnits,
    scripts: string[],
    task: PipelineTask,
    workspaceRoot: string,
  ) {
    await this.emitEvent(
      task,
      unit,
      TaskStatuses.working,
      `[begin unit] ${unit}`,
      'stdin',
    );
    this.logger.info({ task }, 'curr unit is %s', unit);
    try {
      for (const script of scripts) {
        this.logger.debug('begin runScript %s', script);
        // A stop request may arrive between two scripts, when no child
        // process is registered; honour it before spawning the next one.
        if (this.stopTaskIds.has(task.id)) {
          throw new ApplicationException('Task is be KILLED');
        }
        await this.runScript(script, workspaceRoot, task, unit);
        this.logger.debug('end runScript %s', script);
      }

      await this.emitEvent(
        task,
        unit,
        TaskStatuses.success,
        `[finished unit] ${unit}`,
        'stdout',
      );
    } catch (err) {
      await this.emitEvent(
        task,
        unit,
        TaskStatuses.failed,
        `[finished unit] ${err.message}`,
        'stderr',
      );
      throw err;
    }
  }

  /**
   * Checks out the repository for a task, emitting begin/success/failed
   * events for the checkout unit.
   *
   * @param task the task whose repository is checked out
   * @returns the path returned by `ReposService.checkout4Task`
   * @throws whatever `checkout4Task` throws, after the `failed` event has
   *   been emitted, so that the caller does not continue without a workspace
   */
  async checkout(task: PipelineTask) {
    await this.emitEvent(
      task,
      PipelineUnits.checkout,
      TaskStatuses.working,
      '[begin unit] checkout',
      'stdin',
    );
    try {
      const path = await this.reposService.checkout4Task(task);
      await this.emitEvent(
        task,
        PipelineUnits.checkout,
        TaskStatuses.success,
        'checkout success.',
        'stdout',
      );
      return path;
    } catch (err) {
      await this.emitEvent(
        task,
        PipelineUnits.checkout,
        TaskStatuses.failed,
        'checkout failed.',
        'stderr',
      );
      // Propagate: swallowing here would hand `undefined` to the caller as
      // the workspace root.
      throw err;
    }
  }

  /**
   * Publishes a task event to the log exchange.
   *
   * The publish is not awaited: the returned promise settles as soon as the
   * publish has been started, and a publish failure is only logged.
   *
   * @param task the task the event belongs to
   * @param unit the unit the event belongs to, or `null` for task-level events
   * @param status status carried by the event
   * @param message text of the event; a trailing newline is appended if missing
   * @param messageType stream label carried by the event
   */
  async emitEvent(
    task: PipelineTask,
    unit: PipelineUnits | null,
    status: TaskStatuses,
    message: string,
    messageType: 'stderr' | 'stdout' | 'stdin',
  ): Promise<void> {
    const event: PipelineTaskEvent = {
      taskId: task.id,
      pipelineId: task.pipeline.id,
      projectId: task.pipeline.project.id,
      unit,
      emittedAt: new Date(),
      message: last(message) === '\n' ? message : message + '\n',
      messageType,
      status,
    };
    this.amqpConnection
      .publish(EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, event, {
        headers: {
          sender: getInstanceName(),
        },
      })
      .catch((error) => {
        this.logger.error(
          { error, event },
          'send event message to queue failed. %s',
          error.message,
        );
      });
  }

  /**
   * Runs one script in a shell child process and forwards its output as
   * events.
   *
   * @param script shell command line to execute
   * @param workspaceRoot working directory of the child process
   * @param task the owning task
   * @param unit the unit the script belongs to
   * @returns a promise that resolves when the process closes with exit code 0
   * @throws ApplicationException `'Task is be KILLED'` when the process closed
   *   with a non-zero code after a stop request, `'exec script failed'` for
   *   any other non-zero code or when the process could not be spawned
   */
  async runScript(
    script: string,
    workspaceRoot: string,
    task: PipelineTask,
    unit: PipelineUnits,
  ): Promise<void> {
    await this.emitEvent(task, unit, TaskStatuses.working, script, 'stdin');
    return new Promise<void>((resolve, reject) => {
      const sub = this.spawn(script, {
        shell: true,
        cwd: workspaceRoot,
      });
      this.processes.set(task.id, sub);

      // Number of output events whose emitEvent call has not settled yet.
      let loggingCount = 0;
      const forward =
        (messageType: 'stdout' | 'stderr') =>
        (data: Buffer): void => {
          loggingCount++;
          this.emitEvent(
            task,
            unit,
            TaskStatuses.working,
            data.toString(),
            messageType,
          ).finally(() => loggingCount--);
        };

      // Each stream is reported under its own label.
      sub.stderr.on('data', forward('stderr'));
      sub.stdout.on('data', forward('stdout'));

      // Without a listener an 'error' event would be thrown as an uncaught
      // exception.
      sub.addListener('error', (error: Error) => {
        this.logger.error(
          { task, error },
          'child process error. %s',
          error.message,
        );
        // No pid means the process never started, so fail the script right
        // away. Other errors (e.g. a failed kill) leave the process running
        // and are only logged.
        if (sub.pid === undefined) {
          if (this.processes.get(task.id) === sub) {
            this.processes.delete(task.id);
          }
          reject(new ApplicationException('exec script failed'));
        }
      });

      sub.addListener('close', async (code) => {
        this.processes.delete(task.id);

        // Give pending output events up to 10 x 500ms to settle.
        for (let i = 0; i < 10 && loggingCount > 0; i++) {
          await new Promise<void>((done) => setTimeout(done, 500));
          this.logger.debug('waiting logging... (%dx500ms)', i);
        }

        if (code === 0) {
          return resolve();
        }
        if (this.stopTaskIds.has(task.id)) {
          return reject(new ApplicationException('Task is be KILLED'));
        }
        return reject(new ApplicationException('exec script failed'));
      });
    });
  }
}