diff --git a/src/pipeline-tasks/models/pipeline-task-log-message.module.ts b/src/pipeline-tasks/models/pipeline-task-log-message.module.ts index 3d66018..09bf4ca 100644 --- a/src/pipeline-tasks/models/pipeline-task-log-message.module.ts +++ b/src/pipeline-tasks/models/pipeline-task-log-message.module.ts @@ -12,13 +12,21 @@ export class PipelineTaskLogMessage { time: Date; @Field() message: string; + @Field() + isError: boolean; - static create(task: PipelineTask, unit: PipelineUnits, message: string) { + static create( + task: PipelineTask, + unit: PipelineUnits, + message: string, + isError: boolean, + ) { return Object.assign(new PipelineTaskLogMessage(), { task, message, time: new Date(), unit, + isError, }); } } diff --git a/src/pipeline-tasks/pipeline-task-logs.service.ts b/src/pipeline-tasks/pipeline-task-logs.service.ts index d515296..328ff64 100644 --- a/src/pipeline-tasks/pipeline-task-logs.service.ts +++ b/src/pipeline-tasks/pipeline-task-logs.service.ts @@ -1,8 +1,12 @@ import { Injectable } from '@nestjs/common'; +import { log } from 'console'; import { PubSub } from 'graphql-subscriptions'; import { RedisService } from 'nestjs-redis'; -import { omit } from 'ramda'; +import { find, omit, propEq } from 'ramda'; +import { PipelineUnits } from './enums/pipeline-units.enum'; +import { TaskStatuses } from './enums/task-statuses.enum'; import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module'; +import { PipelineTaskLogs } from './models/pipeline-task-logs.model'; import { PipelineTask } from './pipeline-task.entity'; const LOG_TIMEOUT_SECONDS = 10_000; @@ -33,7 +37,7 @@ export class PipelineTaskLogsService { ]); } - async readLogs(task: PipelineTask): Promise { + async readLog(task: PipelineTask): Promise { return await this.redis.lrange(this.getKeys(task), 0, -1).then((items) => items.map((item) => { const log = JSON.parse(item) as PipelineTaskLogMessage; @@ -44,6 +48,30 @@ export class PipelineTaskLogsService { ); } + async readLogsAsPipelineTaskLogs( + task: PipelineTask, + ): Promise { + const logs = await this.readLog(task); + const taskLogs: PipelineTaskLogs[] = []; + for (const log of logs) { + const taskLog = find( + propEq('unit', log.unit), + taskLogs, + ); + if (!taskLog) { + taskLogs.push({ + unit: (log.unit as unknown) as PipelineUnits, + status: TaskStatuses.working, + startedAt: log.time, + logs: log.message, + }); + } else { + taskLog.logs += log.message; + } + } + return taskLogs; + } + watchLogs(task: PipelineTask) { return this.pubSub.asyncIterator(this.getKeys(task)); } diff --git a/src/pipeline-tasks/pipeline-task.consumer.spec.ts b/src/pipeline-tasks/pipeline-task.consumer.spec.ts index ef703ae..25aa753 100644 --- a/src/pipeline-tasks/pipeline-task.consumer.spec.ts +++ b/src/pipeline-tasks/pipeline-task.consumer.spec.ts @@ -1,22 +1,22 @@ -import { PIPELINE_TASK_LOG_QUEUE } from './pipeline-tasks.constants'; import { Test, TestingModule } from '@nestjs/testing'; -import { Job, Queue } from 'bull'; +import { Job } from 'bull'; import { join } from 'path'; import { ReposService } from '../repos/repos.service'; import { PipelineUnits } from './enums/pipeline-units.enum'; import { PipelineTaskConsumer } from './pipeline-task.consumer'; import { PipelineTask } from './pipeline-task.entity'; import { PipelineTasksService } from './pipeline-tasks.service'; -import { getQueueToken } from '@nestjs/bull'; import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module'; import { Pipeline } from '../pipelines/pipeline.entity'; import { Project } from '../projects/project.entity'; import { TaskStatuses } from './enums/task-statuses.enum'; +import { PipelineTaskLogsService } from './pipeline-task-logs.service'; +import { ApplicationException } from '../commons/exceptions/application.exception'; describe('PipelineTaskConsumer', () => { let consumer: PipelineTaskConsumer; let tasksService: PipelineTasksService; - let logQueue: Queue; + let logsService: PipelineTaskLogsService; const getJob = () => ({ data: { @@ -42,19 +42,20 @@ describe('PipelineTaskConsumer', () => { checkout: async () => undefined, }, }, - PipelineTaskConsumer, { - provide: getQueueToken(PIPELINE_TASK_LOG_QUEUE), + provide: PipelineTaskLogsService, useValue: { - add: () => undefined, + recordLog: async () => undefined, + readLogsAsPipelineTaskLogs: async () => [], }, }, + PipelineTaskConsumer, ], }).compile(); tasksService = module.get(PipelineTasksService); + logsService = module.get(PipelineTaskLogsService); consumer = module.get(PipelineTaskConsumer); - logQueue = module.get(getQueueToken(PIPELINE_TASK_LOG_QUEUE)); }); it('should be defined', () => { @@ -71,31 +72,42 @@ describe('PipelineTaskConsumer', () => { }); describe('runScript', () => { + let logText: string; + let errorText: string; + let recordLog: jest.SpyInstance; + beforeEach(() => { + logText = ''; + errorText = ''; + recordLog = jest + .spyOn(logsService, 'recordLog') + .mockImplementation(async (log: PipelineTaskLogMessage) => { + logText += log.message; + if (log.isError) { + errorText += log.message; + } + }); + }); it('should success and log right message', async () => { - const add = jest.spyOn(logQueue, 'add'); - await expect( - consumer - .runScript( - 'node one-second-work.js', - join(__dirname, '../../test/data'), - ) - .then((arr) => arr.join('')), - ).resolves.toMatch(/10.+20.+30.+40.+50.+60.+70.+80.+90/s); - // expect(add).toHaveBeenCalledTimes(10); + await consumer.runScript( + 'node one-second-work.js', + join(__dirname, '../../test/data'), + ); + expect(logText).toMatch(/10.+20.+30.+40.+50.+60.+70.+80.+90/s); + expect(recordLog).toHaveBeenCalledTimes(10); expect( - ((add.mock.calls[8][0] as unknown) as PipelineTaskLogMessage).message, + ((recordLog.mock.calls[8][0] as unknown) as PipelineTaskLogMessage) + .message, ).toMatch(/^90/); }); it('should failed and log right message', async () => { - const add = jest.spyOn(logQueue, 'add'); await expect( consumer.runScript( 'node bad-work.js', join(__dirname, '../../test/data'), ), - ).rejects.toThrowError(/Error Message/); - // expect(add).toHaveBeenCalledTimes(8); - const logs = add.mock.calls + ).rejects.toThrowError(/exec script failed/); + expect(errorText).toMatch(/Error Message/); + const logs = recordLog.mock.calls .map((call) => ((call[0] as unknown) as PipelineTaskLogMessage).message) .join(''); expect(logs).toMatch(/10.+20.+30.+40.+50/s); @@ -104,16 +116,19 @@ describe('PipelineTaskConsumer', () => { const task = new PipelineTask(); task.id = 'test'; - const add = jest.spyOn(logQueue, 'add'); + const recordLog = jest.spyOn(logsService, 'recordLog'); await expect( consumer.runScript( 'node bad-work.js', join(__dirname, '../../test/data'), task, ), - ).rejects.toThrowError(/Error Message 2/); + ).rejects.toThrowError(/exec script failed/); + + expect(errorText).toMatch(/Error Message 2/); expect( - ((add.mock.calls[2][0] as unknown) as PipelineTaskLogMessage).task, + ((recordLog.mock.calls[2][0] as unknown) as PipelineTaskLogMessage) + .task, ).toMatchObject(task); }); }); @@ -145,6 +160,43 @@ describe('PipelineTaskConsumer', () => { task.pipeline.project = new Project(); task.pipeline.project.name = 'test-project'; }); + + it('success and update task on db', async () => { + const job: Job = ({ + data: task, + update: jest.fn().mockImplementation(() => undefined), + } as unknown) as Job; + + jest + .spyOn(consumer, 'runScript') + .mockImplementation(async () => undefined); + const updateTask = jest.spyOn(tasksService, 'updateTask'); + + await consumer.doTask(job); + + expect(updateTask).toHaveBeenCalledTimes(2); + expect(updateTask.mock.calls[0][0].startedAt).toBeDefined(); + expect(updateTask.mock.calls[1][0].endedAt).toBeDefined(); + expect(updateTask.mock.calls[1][0].status).toEqual(TaskStatuses.success); + }); + it('failed and update task on db', async () => { + const job: Job = ({ + data: task, + update: jest.fn().mockImplementation(() => undefined), + } as unknown) as Job; + + jest.spyOn(consumer, 'runScript').mockImplementation(async () => { + throw new ApplicationException('exec script failed'); + }); + const updateTask = jest.spyOn(tasksService, 'updateTask'); + + await consumer.doTask(job); + + expect(updateTask).toHaveBeenCalledTimes(2); + expect(updateTask.mock.calls[0][0].startedAt).toBeDefined(); + expect(updateTask.mock.calls[1][0].endedAt).toBeDefined(); + expect(updateTask.mock.calls[1][0].status).toEqual(TaskStatuses.failed); + }); it('should do all task', async () => { const job: Job = ({ data: task, @@ -153,18 +205,17 @@ describe('PipelineTaskConsumer', () => { const runScript = jest .spyOn(consumer, 'runScript') - .mockImplementation(async () => []); + .mockImplementation(async () => undefined); const updateTask = jest.spyOn(tasksService, 'updateTask'); await consumer.doTask(job); expect(runScript).toHaveBeenCalledTimes(1); - expect(updateTask).toHaveBeenCalledTimes(1); + expect(updateTask).toHaveBeenCalledTimes(2); const taskDto: PipelineTask = updateTask.mock.calls[0][0]; expect(taskDto.logs).toHaveLength(2); expect(taskDto.logs[0].status).toEqual(TaskStatuses.success); expect(taskDto.logs[0].unit).toEqual(PipelineUnits.checkout); - expect(taskDto.logs[1].logs).toMatch(/Hello, Fennec!/); }); it('should log error message', async () => { const job: Job = ({ @@ -181,12 +232,11 @@ describe('PipelineTaskConsumer', () => { await consumer.doTask(job); - expect(updateTask).toHaveBeenCalledTimes(1); + expect(updateTask).toHaveBeenCalledTimes(2); const taskDto: PipelineTask = updateTask.mock.calls[0][0]; expect(taskDto.logs).toHaveLength(2); expect(taskDto.logs[0].status).toEqual(TaskStatuses.success); expect(taskDto.logs[1].status).toEqual(TaskStatuses.failed); - expect(taskDto.logs[1].logs).toMatch(/bad message/); }); }); }); diff --git a/src/pipeline-tasks/pipeline-task.consumer.ts b/src/pipeline-tasks/pipeline-task.consumer.ts index 48df00c..5f099a0 100644 --- a/src/pipeline-tasks/pipeline-task.consumer.ts +++ b/src/pipeline-tasks/pipeline-task.consumer.ts @@ -1,18 +1,10 @@ import { PipelineTaskLogs } from './models/pipeline-task-logs.model'; import { ReposService } from './../repos/repos.service'; -import { - InjectQueue, - OnQueueCompleted, - Process, - Processor, -} from '@nestjs/bull'; -import { Job, Queue } from 'bull'; +import { OnQueueCompleted, Process, Processor } from '@nestjs/bull'; +import { Job } from 'bull'; import { spawn } from 'child_process'; import { PipelineTask } from './pipeline-task.entity'; -import { - PIPELINE_TASK_LOG_QUEUE, - PIPELINE_TASK_QUEUE, -} from './pipeline-tasks.constants'; +import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; import { PipelineTasksService } from './pipeline-tasks.service'; import { ApplicationException } from '../commons/exceptions/application.exception'; import { PipelineUnits } from './enums/pipeline-units.enum'; @@ -27,13 +19,19 @@ export class PipelineTaskConsumer { private readonly logsService: PipelineTaskLogsService, ) {} @Process() - async doTask({ data: task, update }: Job) { + async doTask(job: Job) { + let task = job.data; if (task.pipeline.workUnitMetadata.version !== 1) { throw new ApplicationException( 'work unit metadata version is not match.', ); } + task.startedAt = new Date(); + task.status = TaskStatuses.working; + task = await this.service.updateTask(task); + await job.update(task); + const workspaceRoot = this.reposService.getWorkspaceRootByTask(task); const units = task.units.map( @@ -49,21 +47,14 @@ export class PipelineTaskConsumer { unitLog.unit = unit.type; unitLog.startedAt = new Date(); try { - // 检出代码时,不执行其他脚本。 + // 检出代码前执行 git checkout if (unit.type === PipelineUnits.checkout) { await this.reposService.checkout(task, workspaceRoot); unitLog.status = TaskStatuses.success; - continue; } for (const script of unit.scripts) { unitLog.logs += `[RUN SCRIPT] ${script}`; - const messages = await this.runScript( - script, - workspaceRoot, - task, - unit.type, - ); - unitLog.logs += messages.join(''); + await this.runScript(script, workspaceRoot, task, unit.type); } unitLog.status = TaskStatuses.success; } catch (err) { @@ -72,15 +63,25 @@ export class PipelineTaskConsumer { throw err; } finally { unitLog.endedAt = new Date(); + unitLog.logs = await this.logsService + .readLogsAsPipelineTaskLogs(task) + .then( + (taskLogs) => + taskLogs.find((tl) => tl.unit === unit.type)?.logs ?? '', + ); task.logs.push(unitLog); - // await update(task); + await job.update(task); } } + + task.status = TaskStatuses.success; } catch (err) { + task.status = TaskStatuses.failed; console.log(err); } finally { + task.endedAt = new Date(); task = await this.service.updateTask(task); - await update(task); + await job.update(task); } } @@ -89,35 +90,29 @@ export class PipelineTaskConsumer { workspaceRoot: string, task?: PipelineTask, unit?: PipelineUnits, - ): Promise { + ): Promise { return new Promise((resolve, reject) => { - const errorMessages: string[] = []; - const logs: string[] = []; const sub = spawn(script, { shell: true, cwd: workspaceRoot, }); sub.stderr.on('data', (data: Buffer) => { const str = data.toString(); - errorMessages.push(str); - logs.push(str); this.logsService.recordLog( - PipelineTaskLogMessage.create(task, unit, str), + PipelineTaskLogMessage.create(task, unit, str, true), ); }); sub.stdout.on('data', (data: Buffer) => { const str = data.toString(); - logs.push(str); this.logsService.recordLog( - PipelineTaskLogMessage.create(task, unit, str), + PipelineTaskLogMessage.create(task, unit, str, false), ); }); sub.addListener('close', (code) => { if (code === 0) { - sub.stdout; - return resolve(logs); + return resolve(); } - return reject(new ApplicationException(errorMessages.join(''))); + return reject(new ApplicationException('exec script failed')); }); }); }