import { PipelineTaskLogs } from './models/pipeline-task-logs.model';
import { ReposService } from './../repos/repos.service';
import {
  OnQueueCompleted,
  OnQueueFailed,
  Process,
  Processor,
} from '@nestjs/bull';
import { Job } from 'bull';
import { spawn } from 'child_process';
import { PipelineTask } from './pipeline-task.entity';
import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants';
import { PipelineTasksService } from './pipeline-tasks.service';
import { ApplicationException } from '../commons/exceptions/application.exception';
import { PipelineUnits } from './enums/pipeline-units.enum';
import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module';
import { TaskStatuses } from './enums/task-statuses.enum';
import { PipelineTaskLogsService } from './pipeline-task-logs.service';
import debug from 'debug';

const log = debug('fennec:pipeline-tasks:consumer');

/**
 * Bull queue processor for PIPELINE_TASK_QUEUE.
 *
 * Each job carries a pipeline task in `job.data`. The consumer runs the
 * task's units one after another (an optional repository checkout followed by
 * the unit's shell scripts), records per-unit results on the task, and asks
 * the tasks service to start the next task once the job completes or fails.
 */
@Processor(PIPELINE_TASK_QUEUE)
export class PipelineTaskConsumer {
  constructor(
    private readonly service: PipelineTasksService,
    private readonly reposService: ReposService,
    private readonly logsService: PipelineTaskLogsService,
  ) {}

  /**
   * Runs every unit of the task carried by `job`.
   *
   * A failing unit stops the remaining units and marks the task as failed;
   * that failure is logged and NOT rethrown, so the job itself still
   * completes. The task is persisted through the tasks service and mirrored
   * into the job data when it starts, after each unit, and when it ends.
   *
   * @param job Bull job whose `data` is the pipeline task to run.
   * @throws ApplicationException when the work unit metadata version is not 1.
   */
  @Process()
  async doTask(job: Job): Promise<void> {
    let task = job.data;
    if (task.pipeline.workUnitMetadata.version !== 1) {
      throw new ApplicationException(
        'work unit metadata version is not match.',
      );
    }

    task.startedAt = new Date();
    task.status = TaskStatuses.working;
    task = await this.service.updateTask(task);
    log('start job');
    await job.update(task);

    const workspaceRoot = this.reposService.getWorkspaceRootByTask(task);

    // Resolve each requested unit type to its metadata entry; a unit without
    // metadata still runs, just with no scripts.
    const units = task.units.map(
      (type) =>
        task.pipeline.workUnitMetadata.units.find(
          (unit) => unit.type === type,
        ) ?? { type, scripts: [] },
    );
    log('task have [%o] units', units);

    try {
      for (const unit of units) {
        const unitLog = new PipelineTaskLogs();
        unitLog.unit = unit.type;
        unitLog.startedAt = new Date();
        log('curr unit is %s', unit.type);
        try {
          // For the checkout unit, run git checkout before its scripts.
          if (unit.type === PipelineUnits.checkout) {
            log('begin checkout');
            await this.reposService.checkout(task, workspaceRoot);
            log('end checkout');
          }
          for (const script of unit.scripts) {
            unitLog.logs += `[RUN SCRIPT] ${script}`;
            log('begin runScript %s', script);
            await this.runScript(script, workspaceRoot, task, unit.type);
            log('end runScript %s', script);
          }
          unitLog.status = TaskStatuses.success;
        } catch (err) {
          unitLog.status = TaskStatuses.failed;
          unitLog.logs += err instanceof Error ? err.message : String(err);
          // Rethrow so the outer handler stops the remaining units.
          throw err;
        } finally {
          unitLog.endedAt = new Date();
          // NOTE(review): this assignment replaces whatever was appended to
          // unitLog.logs above (script markers, error message) with the logs
          // read back from the logs service — confirm that is intended.
          const taskLogs = await this.logsService.readLogsAsPipelineTaskLogs(
            task,
          );
          unitLog.logs =
            taskLogs.find((tl) => tl.unit === unit.type)?.logs ?? '';
          task.logs.push(unitLog);
          await job.update(task);
        }
      }
      task.status = TaskStatuses.success;
    } catch (err) {
      task.status = TaskStatuses.failed;
      log('task is failed', err);
    } finally {
      task.endedAt = new Date();
      task = await this.service.updateTask(task);
      await job.update(task);
    }
  }

  /**
   * Runs `script` through a shell with `workspaceRoot` as the working
   * directory and forwards every stdout/stderr chunk to the logs service.
   *
   * @param script Shell command line to execute.
   * @param workspaceRoot Working directory for the child process.
   * @param task Task the output is recorded against.
   * @param unit Unit the output is recorded against.
   * @returns A promise that resolves when the process closes with exit code 0.
   * @throws ApplicationException when the process cannot be spawned or closes
   *   with any other exit code.
   */
  async runScript(
    script: string,
    workspaceRoot: string,
    task?: PipelineTask,
    unit?: PipelineUnits,
  ): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const sub = spawn(script, {
        shell: true,
        cwd: workspaceRoot,
      });

      // Without an 'error' listener a spawn failure (e.g. a missing cwd) is
      // thrown as an uncaught exception instead of failing this script.
      sub.once('error', (err: Error) => {
        reject(new ApplicationException(`exec script failed: ${err.message}`));
      });

      // The last argument of create() is true for stderr, false for stdout.
      sub.stderr.on('data', (data: Buffer) => {
        this.logsService.recordLog(
          PipelineTaskLogMessage.create(task, unit, data.toString(), true),
        );
      });
      sub.stdout.on('data', (data: Buffer) => {
        this.logsService.recordLog(
          PipelineTaskLogMessage.create(task, unit, data.toString(), false),
        );
      });

      sub.addListener('close', (code) => {
        if (code === 0) {
          resolve();
          return;
        }
        // If 'error' already rejected the promise this call is a no-op.
        reject(new ApplicationException('exec script failed'));
      });
    });
  }

  /**
   * Queue "completed" hook: asks the tasks service to start the next task of
   * the finished job's pipeline.
   */
  @OnQueueCompleted()
  onCompleted(job: Job) {
    log('queue onCompleted');
    this.service.doNextTask(job.data.pipeline);
  }

  /**
   * Queue "failed" hook: asks the tasks service to start the next task of the
   * failed job's pipeline, so a failure does not stall the pipeline.
   */
  @OnQueueFailed()
  onFailed(job: Job) {
    log('queue onFailed');
    this.service.doNextTask(job.data.pipeline);
  }
}