import { PipelineTaskLogs } from './models/pipeline-task-logs.model';
import { ReposService } from './../repos/repos.service';
import {
  InjectQueue,
  OnQueueCompleted,
  Process,
  Processor,
} from '@nestjs/bull';
import { Job, Queue } from 'bull';
import { spawn } from 'child_process';
import { PipelineTask } from './pipeline-task.entity';
import {
  PIPELINE_TASK_LOG_QUEUE,
  PIPELINE_TASK_QUEUE,
} from './pipeline-tasks.constants';
import { PipelineTasksService } from './pipeline-tasks.service';
import { ApplicationException } from '../commons/exceptions/application.exception';
import { PipelineUnits } from './enums/pipeline-units.enum';
import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module';
import { TaskStatuses } from './enums/task-statuses.enum';
import { PipelineTaskLogsService } from './pipeline-task-logs.service';

/**
 * Queue consumer for PIPELINE_TASK_QUEUE.
 *
 * For each job it resolves the requested work units against the pipeline's
 * work unit metadata, runs them in order (checkout or shell scripts), collects
 * a log entry per unit, and finally persists the task and updates the job data.
 */
@Processor(PIPELINE_TASK_QUEUE)
export class PipelineTaskConsumer {
  constructor(
    private readonly service: PipelineTasksService,
    private readonly reposService: ReposService,
    private readonly logsService: PipelineTaskLogsService,
  ) {}

  /**
   * Processes a single pipeline task job.
   *
   * @param job Queue job whose `data` is the task to execute.
   * @throws ApplicationException when the work unit metadata version is not 1.
   *
   * Failures of individual units are recorded in the unit's log entry, logged
   * to the console and stop the remaining units; they are not rethrown from
   * this method.
   */
  @Process()
  async doTask(job: Job): Promise<void> {
    const task = job.data;

    if (task.pipeline.workUnitMetadata.version !== 1) {
      throw new ApplicationException(
        'work unit metadata version is not match.',
      );
    }

    const workspaceRoot = this.reposService.getWorkspaceRootByTask(task);

    // Units that have no metadata entry fall back to an empty script list.
    const units = task.units.map(
      (type) =>
        task.pipeline.workUnitMetadata.units.find(
          (unit) => unit.type === type,
        ) ?? { type: type, scripts: [] },
    );

    try {
      for (const unit of units) {
        const unitLog = new PipelineTaskLogs();
        unitLog.unit = unit.type;
        unitLog.startedAt = new Date();
        try {
          // When checking out code, no other scripts are executed.
          if (unit.type === PipelineUnits.checkout) {
            await this.reposService.checkout(task, workspaceRoot);
            unitLog.status = TaskStatuses.success;
            continue;
          }
          for (const script of unit.scripts) {
            unitLog.logs += `[RUN SCRIPT] ${script}`;
            const messages = await this.runScript(script, workspaceRoot, task);
            unitLog.logs += messages.join('');
          }
          unitLog.status = TaskStatuses.success;
        } catch (err) {
          unitLog.status = TaskStatuses.failed;
          unitLog.logs += err instanceof Error ? err.message : String(err);
          // Rethrow so the remaining units are skipped.
          throw err;
        } finally {
          // Runs for success, failure and the checkout `continue` alike.
          unitLog.endedAt = new Date();
          task.logs.push(unitLog);
        }
      }
    } catch (err) {
      console.log(err);
    } finally {
      const savedTask = await this.service.updateTask(task);
      // Call `update` on the job itself: it is a prototype method that relies
      // on `this`, so a destructured, unbound reference cannot be used.
      await job.update(savedTask);
    }
  }

  /**
   * Runs a script through the shell inside the workspace directory.
   *
   * Every stdout/stderr chunk is forwarded to the logs service as it arrives.
   *
   * @param script Command line to execute (spawned with `shell: true`).
   * @param workspaceRoot Working directory for the child process.
   * @param task Task the log messages are created for.
   * @returns All stdout and stderr chunks in arrival order, when the process
   *   closes with exit code 0.
   * @throws ApplicationException carrying the collected stderr output when the
   *   process closes with any other code, or the spawn error message when the
   *   process could not be started.
   */
  async runScript(
    script: string,
    workspaceRoot: string,
    task?: PipelineTask,
  ): Promise<string[]> {
    return new Promise<string[]>((resolve, reject) => {
      const errorMessages: string[] = [];
      const logs: string[] = [];
      const sub = spawn(script, {
        shell: true,
        cwd: workspaceRoot,
      });

      // Without an 'error' listener a spawn failure would be thrown as an
      // uncaught exception and this promise would never settle.
      sub.once('error', (err: Error) => {
        reject(new ApplicationException(err.message));
      });

      sub.stderr.on('data', (data: Buffer) => {
        const str = data.toString();
        errorMessages.push(str);
        logs.push(str);
        this.logsService.recordLog(PipelineTaskLogMessage.create(task, str));
      });

      sub.stdout.on('data', (data: Buffer) => {
        const str = data.toString();
        logs.push(str);
        this.logsService.recordLog(PipelineTaskLogMessage.create(task, str));
      });

      sub.addListener('close', (code) => {
        if (code === 0) {
          return resolve(logs);
        }
        // Settling again after an earlier 'error' rejection is a no-op.
        return reject(new ApplicationException(errorMessages.join('')));
      });
    });
  }

  /**
   * Queue "completed" hook: asks the tasks service to start the next task of
   * the pipeline the finished job belonged to.
   *
   * @param job The job that has just completed.
   */
  @OnQueueCompleted()
  onCompleted(job: Job) {
    this.service.doNextTask(job.data.pipeline);
  }
}