diff --git a/src/pipeline-tasks/models/pipeline-task-log-message.module.ts b/src/pipeline-tasks/models/pipeline-task-log-message.module.ts
new file mode 100644
index 0000000..4ea73d0
--- /dev/null
+++ b/src/pipeline-tasks/models/pipeline-task-log-message.module.ts
@@ -0,0 +1,15 @@
+import { PipelineTask } from './../pipeline-task.entity';
+
+export class PipelineTaskLogMessage {
+  task: PipelineTask;
+  time: Date;
+  message: string;
+
+  static create(task: PipelineTask, message: string) {
+    return Object.assign(new PipelineTaskLogMessage(), {
+      task,
+      message,
+      time: new Date(),
+    });
+  }
+}
diff --git a/src/pipeline-tasks/models/pipeline-task-logs.model.ts b/src/pipeline-tasks/models/pipeline-task-logs.model.ts
index 7178b73..3c7b373 100644
--- a/src/pipeline-tasks/models/pipeline-task-logs.model.ts
+++ b/src/pipeline-tasks/models/pipeline-task-logs.model.ts
@@ -5,5 +5,5 @@ export class PipelineTaskLogs {
   status: TaskStatuses;
   startedAt?: Date;
   endedAt?: Date;
-  logs: string[];
+  logs = '';
 }
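The PipelineTaskLogMessage objects above are what the consumer pushes onto the new PIPELINE_TASK_LOG_QUEUE introduced later in this diff. Nothing in this change consumes that queue yet; purely as an illustration (the class name and behaviour below are assumptions, not part of the commit), a minimal processor draining it could look like this:

import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { PIPELINE_TASK_LOG_QUEUE } from './pipeline-tasks.constants';
import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module';

// Hypothetical consumer, not part of this diff: prints each queued log chunk.
// A real implementation might forward the chunk to a websocket gateway instead.
@Processor(PIPELINE_TASK_LOG_QUEUE)
export class PipelineTaskLogConsumer {
  @Process()
  handleLog(job: Job<PipelineTaskLogMessage>) {
    const { task, message } = job.data;
    console.log(`[task ${task?.id ?? 'unknown'}] ${message}`);
  }
}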
diff --git a/src/pipeline-tasks/pipeline-task.consumer.spec.ts b/src/pipeline-tasks/pipeline-task.consumer.spec.ts
index 20f2402..ef703ae 100644
--- a/src/pipeline-tasks/pipeline-task.consumer.spec.ts
+++ b/src/pipeline-tasks/pipeline-task.consumer.spec.ts
@@ -1,14 +1,22 @@
+import { PIPELINE_TASK_LOG_QUEUE } from './pipeline-tasks.constants';
 import { Test, TestingModule } from '@nestjs/testing';
-import { Job } from 'bull';
+import { Job, Queue } from 'bull';
+import { join } from 'path';
 import { ReposService } from '../repos/repos.service';
 import { PipelineUnits } from './enums/pipeline-units.enum';
 import { PipelineTaskConsumer } from './pipeline-task.consumer';
 import { PipelineTask } from './pipeline-task.entity';
 import { PipelineTasksService } from './pipeline-tasks.service';
+import { getQueueToken } from '@nestjs/bull';
+import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module';
+import { Pipeline } from '../pipelines/pipeline.entity';
+import { Project } from '../projects/project.entity';
+import { TaskStatuses } from './enums/task-statuses.enum';
 
 describe('PipelineTaskConsumer', () => {
   let consumer: PipelineTaskConsumer;
   let tasksService: PipelineTasksService;
+  let logQueue: Queue;
 
   const getJob = () => ({
     data: {
@@ -24,20 +32,29 @@ describe('PipelineTaskConsumer', () => {
           provide: PipelineTasksService,
           useValue: {
             doNextTask: () => undefined,
+            updateTask: async (value) => value,
           },
         },
         {
           provide: ReposService,
           useValue: {
             getWorkspaceRootByTask: () => 'workspace-root',
+            checkout: async () => undefined,
           },
         },
         PipelineTaskConsumer,
+        {
+          provide: getQueueToken(PIPELINE_TASK_LOG_QUEUE),
+          useValue: {
+            add: () => undefined,
+          },
+        },
       ],
     }).compile();
 
     tasksService = module.get(PipelineTasksService);
     consumer = module.get(PipelineTaskConsumer);
+    logQueue = module.get(getQueueToken(PIPELINE_TASK_LOG_QUEUE));
   });
 
   it('should be defined', () => {
@@ -53,7 +70,123 @@ describe('PipelineTaskConsumer', () => {
     });
   });
 
+  describe('runScript', () => {
+    it('should succeed and log right message', async () => {
+      const add = jest.spyOn(logQueue, 'add');
+      await expect(
+        consumer
+          .runScript(
+            'node one-second-work.js',
+            join(__dirname, '../../test/data'),
+          )
+          .then((arr) => arr.join('')),
+      ).resolves.toMatch(/10.+20.+30.+40.+50.+60.+70.+80.+90/s);
+      // expect(add).toHaveBeenCalledTimes(10);
+      expect(
+        ((add.mock.calls[8][0] as unknown) as PipelineTaskLogMessage).message,
+      ).toMatch(/^90/);
+    });
+    it('should fail and log right message', async () => {
+      const add = jest.spyOn(logQueue, 'add');
+      await expect(
+        consumer.runScript(
+          'node bad-work.js',
+          join(__dirname, '../../test/data'),
+        ),
+      ).rejects.toThrowError(/Error Message/);
+      // expect(add).toHaveBeenCalledTimes(8);
+      const logs = add.mock.calls
+        .map((call) => ((call[0] as unknown) as PipelineTaskLogMessage).message)
+        .join('');
+      expect(logs).toMatch(/10.+20.+30.+40.+50/s);
+    });
+    it('should log with task', async () => {
+      const task = new PipelineTask();
+      task.id = 'test';
+
+      const add = jest.spyOn(logQueue, 'add');
+      await expect(
+        consumer.runScript(
+          'node bad-work.js',
+          join(__dirname, '../../test/data'),
+          task,
+        ),
+      ).rejects.toThrowError(/Error Message 2/);
+      expect(
+        ((add.mock.calls[2][0] as unknown) as PipelineTaskLogMessage).task,
+      ).toMatchObject(task);
+    });
+  });
+
   describe('doTask', () => {
-    it('should do all task', () => {});
+    let task: PipelineTask;
+
+    beforeEach(() => {
+      task = new PipelineTask();
+      task.id = 'test-id';
+      task.logs = [];
+      task.pipeline = new Pipeline();
+      task.pipeline.workUnitMetadata = {
+        version: 1,
+        units: [
+          {
+            type: PipelineUnits.checkout,
+            scripts: [],
+          },
+          {
+            type: PipelineUnits.installDependencies,
+            scripts: ["echo ' Hello, Fennec!'"],
+          },
+        ],
+      };
+      task.units = task.pipeline.workUnitMetadata.units.map(
+        (unit) => unit.type,
+      );
+      task.pipeline.project = new Project();
+      task.pipeline.project.name = 'test-project';
+    });
+    it('should do all task', async () => {
+      const job: Job = ({
+        data: task,
+        update: jest.fn().mockImplementation(() => undefined),
+      } as unknown) as Job;
+
+      const runScript = jest
+        .spyOn(consumer, 'runScript')
+        .mockImplementation(async () => []);
+      const updateTask = jest.spyOn(tasksService, 'updateTask');
+
+      await consumer.doTask(job);
+
+      expect(runScript).toHaveBeenCalledTimes(1);
+      expect(updateTask).toHaveBeenCalledTimes(1);
+      const taskDto: PipelineTask = updateTask.mock.calls[0][0];
+      expect(taskDto.logs).toHaveLength(2);
+      expect(taskDto.logs[0].status).toEqual(TaskStatuses.success);
+      expect(taskDto.logs[0].unit).toEqual(PipelineUnits.checkout);
+      expect(taskDto.logs[1].logs).toMatch(/Hello, Fennec!/);
+    });
+    it('should log error message', async () => {
+      const job: Job = ({
+        data: task,
+        update: jest.fn().mockImplementation(() => undefined),
+      } as unknown) as Job;
+
+      const runScript = jest
+        .spyOn(consumer, 'runScript')
+        .mockImplementation(async () => {
+          throw new Error('bad message');
+        });
+      const updateTask = jest.spyOn(tasksService, 'updateTask');
+
+      await consumer.doTask(job);
+
+      expect(updateTask).toHaveBeenCalledTimes(1);
+      const taskDto: PipelineTask = updateTask.mock.calls[0][0];
+      expect(taskDto.logs).toHaveLength(2);
+      expect(taskDto.logs[0].status).toEqual(TaskStatuses.success);
+      expect(taskDto.logs[1].status).toEqual(TaskStatuses.failed);
+      expect(taskDto.logs[1].logs).toMatch(/bad message/);
+    });
   });
 });
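The new runScript tests above shell out to real child processes using the fixture scripts added at the bottom of this diff (test/data/one-second-work.js and test/data/bad-work.js), while the doTask tests stub runScript out. Assuming the project's usual Nest/Jest setup, the suite can be run on its own with something like:

npx jest src/pipeline-tasks/pipeline-task.consumer.spec.ts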
diff --git a/src/pipeline-tasks/pipeline-task.consumer.ts b/src/pipeline-tasks/pipeline-task.consumer.ts
index 5758be4..1f2a0d0 100644
--- a/src/pipeline-tasks/pipeline-task.consumer.ts
+++ b/src/pipeline-tasks/pipeline-task.consumer.ts
@@ -1,20 +1,39 @@
+import { PipelineTaskLogs } from './models/pipeline-task-logs.model';
 import { ReposService } from './../repos/repos.service';
-import { OnQueueCompleted, Process, Processor } from '@nestjs/bull';
-import { Job } from 'bull';
+import {
+  InjectQueue,
+  OnQueueCompleted,
+  Process,
+  Processor,
+} from '@nestjs/bull';
+import { Job, Queue } from 'bull';
 import { spawn } from 'child_process';
 import { PipelineTask } from './pipeline-task.entity';
-import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants';
+import {
+  PIPELINE_TASK_LOG_QUEUE,
+  PIPELINE_TASK_QUEUE,
+} from './pipeline-tasks.constants';
 import { PipelineTasksService } from './pipeline-tasks.service';
 import { ApplicationException } from '../commons/exceptions/application.exception';
 import { PipelineUnits } from './enums/pipeline-units.enum';
+import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module';
+import { TaskStatuses } from './enums/task-statuses.enum';
 
 @Processor(PIPELINE_TASK_QUEUE)
 export class PipelineTaskConsumer {
   constructor(
     private readonly service: PipelineTasksService,
     private readonly reposService: ReposService,
+    @InjectQueue(PIPELINE_TASK_LOG_QUEUE)
+    private readonly logQueue: Queue,
   ) {}
   @Process()
-  async doTask({ data: task }: Job) {
+  async doTask({ data: task, update }: Job) {
+    if (task.pipeline.workUnitMetadata.version !== 1) {
+      throw new ApplicationException(
+        'work unit metadata version does not match.',
+      );
+    }
+
     const workspaceRoot = this.reposService.getWorkspaceRootByTask(task);
     const units = task.units.map(
@@ -24,31 +43,69 @@ export class PipelineTaskConsumer {
       ) ?? { type: type, scripts: [] },
     );
 
-    for (const unit of units) {
-      // Do not run other scripts while checking out the code.
-      if (unit.type === PipelineUnits.checkout) {
-        await this.reposService.checkout(task, workspaceRoot);
-        continue;
-      }
-      for (const script of unit.scripts) {
-        await this.runScript(task, script, workspaceRoot);
+    try {
+      for (const unit of units) {
+        const unitLog = new PipelineTaskLogs();
+        unitLog.unit = unit.type;
+        unitLog.startedAt = new Date();
+        try {
+          // Do not run other scripts while checking out the code.
+          if (unit.type === PipelineUnits.checkout) {
+            await this.reposService.checkout(task, workspaceRoot);
+            unitLog.status = TaskStatuses.success;
+            continue;
+          }
+          for (const script of unit.scripts) {
+            unitLog.logs += `[RUN SCRIPT] ${script}`;
+            const messages = await this.runScript(script, workspaceRoot, task);
+            unitLog.logs += messages.join('');
+          }
+          unitLog.status = TaskStatuses.success;
+        } catch (err) {
+          unitLog.status = TaskStatuses.failed;
+          unitLog.logs += err.message;
+          throw err;
+        } finally {
+          unitLog.endedAt = new Date();
+          task.logs.push(unitLog);
+        }
       }
+    } catch (err) {
+      console.log(err);
+    } finally {
+      task = await this.service.updateTask(task);
+      update(task);
     }
   }
 
-  async runScript(task: PipelineTask, script: string, workspaceRoot: string) {
+  async runScript(
+    script: string,
+    workspaceRoot: string,
+    task?: PipelineTask,
+  ): Promise<string[]> {
     return new Promise((resolve, reject) => {
       const errorMessages: string[] = [];
+      const logs: string[] = [];
       const sub = spawn(script, {
         shell: true,
         cwd: workspaceRoot,
       });
-      sub.stderr.on('data', (data) => errorMessages.push(data));
+      sub.stderr.on('data', (data: Buffer) => {
+        const str = data.toString();
+        errorMessages.push(str);
+        logs.push(str);
+        this.logQueue.add(PipelineTaskLogMessage.create(task, str));
+      });
+      sub.stdout.on('data', (data: Buffer) => {
+        const str = data.toString();
+        logs.push(str);
+        this.logQueue.add(PipelineTaskLogMessage.create(task, str));
+      });
       sub.addListener('close', (code) => {
         if (code === 0) {
-          return resolve(code);
+          return resolve(logs);
         }
-        return reject(new ApplicationException(errorMessages.join('\n')));
+        return reject(new ApplicationException(errorMessages.join('')));
       });
     });
   }
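With the reworked signature, runScript can be called without a task and resolves with every output chunk it captured; each chunk is also mirrored to the log queue as a PipelineTaskLogMessage, and a non-zero exit rejects with an ApplicationException built from stderr. A small usage sketch, inside some async context and with a purely illustrative script, path, and consumer instance (for example the one built in the spec above):

const task = new PipelineTask();
task.id = 'example';

// Resolves with the collected stdout/stderr chunks in arrival order;
// each chunk was also queued as a PipelineTaskLogMessage along the way.
const chunks = await consumer.runScript('echo hello', '/tmp/workspace', task);
console.log(chunks.join(''));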
diff --git a/src/pipeline-tasks/pipeline-tasks.constants.ts b/src/pipeline-tasks/pipeline-tasks.constants.ts
index 08e8f67..777e284 100644
--- a/src/pipeline-tasks/pipeline-tasks.constants.ts
+++ b/src/pipeline-tasks/pipeline-tasks.constants.ts
@@ -1 +1,2 @@
 export const PIPELINE_TASK_QUEUE = 'PIPELINE_TASK_QUEUE';
+export const PIPELINE_TASK_LOG_QUEUE = 'PIPELINE_TASK_LOG_QUEUE';
diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts
index 770a9c7..a5d6475 100644
--- a/src/pipeline-tasks/pipeline-tasks.service.ts
+++ b/src/pipeline-tasks/pipeline-tasks.service.ts
@@ -62,6 +62,10 @@ export class PipelineTasksService {
     }
   }
 
+  async updateTask(task: PipelineTask) {
+    return await this.repository.save(task);
+  }
+
   getRedisTokens(pipeline: Pipeline): [string, string] {
     return [`pipeline-${pipeline.id}:lck`, `pipeline-${pipeline.id}:tasks`];
   }
diff --git a/test/data/bad-work.js b/test/data/bad-work.js
new file mode 100644
index 0000000..caffb9c
--- /dev/null
+++ b/test/data/bad-work.js
@@ -0,0 +1,8 @@
+for (let i = 1; i <= 5; i++) {
+  console.log(i * 10);
+}
+console.error('Error Message');
+console.error('Error Message 2');
+console.log('Bye-bye');
+
+process.exit(1);
diff --git a/test/data/one-second-work.js b/test/data/one-second-work.js
new file mode 100644
index 0000000..be81207
--- /dev/null
+++ b/test/data/one-second-work.js
@@ -0,0 +1,7 @@
+let timer;
+let count = 0;
+setTimeout(() => clearInterval(timer), 1_000);
+
+timer = setInterval(() => {
+  console.log(++count * 10);
+}, 95);
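One piece of wiring is not part of this diff: for @InjectQueue(PIPELINE_TASK_LOG_QUEUE) to resolve, the queue has to be registered wherever the consumer's module is declared. A sketch of what that registration might look like, with the module name and file assumed rather than taken from the repository:

import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import {
  PIPELINE_TASK_LOG_QUEUE,
  PIPELINE_TASK_QUEUE,
} from './pipeline-tasks.constants';

// Hypothetical excerpt from pipeline-tasks.module.ts; the actual module is not
// shown in this diff and may already register additional providers.
@Module({
  imports: [
    BullModule.registerQueue(
      { name: PIPELINE_TASK_QUEUE },
      { name: PIPELINE_TASK_LOG_QUEUE },
    ),
  ],
})
export class PipelineTasksModule {}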