feat-pipelines #1
| @@ -12,13 +12,21 @@ export class PipelineTaskLogMessage { | |||||||
|   time: Date; |   time: Date; | ||||||
|   @Field() |   @Field() | ||||||
|   message: string; |   message: string; | ||||||
|  |   @Field() | ||||||
|  |   isError: boolean; | ||||||
|  |  | ||||||
|   static create(task: PipelineTask, unit: PipelineUnits, message: string) { |   static create( | ||||||
|  |     task: PipelineTask, | ||||||
|  |     unit: PipelineUnits, | ||||||
|  |     message: string, | ||||||
|  |     isError: boolean, | ||||||
|  |   ) { | ||||||
|     return Object.assign(new PipelineTaskLogMessage(), { |     return Object.assign(new PipelineTaskLogMessage(), { | ||||||
|       task, |       task, | ||||||
|       message, |       message, | ||||||
|       time: new Date(), |       time: new Date(), | ||||||
|       unit, |       unit, | ||||||
|  |       isError, | ||||||
|     }); |     }); | ||||||
|   } |   } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -1,8 +1,12 @@ | |||||||
| import { Injectable } from '@nestjs/common'; | import { Injectable } from '@nestjs/common'; | ||||||
|  | import { log } from 'console'; | ||||||
| import { PubSub } from 'graphql-subscriptions'; | import { PubSub } from 'graphql-subscriptions'; | ||||||
| import { RedisService } from 'nestjs-redis'; | import { RedisService } from 'nestjs-redis'; | ||||||
| import { omit } from 'ramda'; | import { find, omit, propEq } from 'ramda'; | ||||||
|  | import { PipelineUnits } from './enums/pipeline-units.enum'; | ||||||
|  | import { TaskStatuses } from './enums/task-statuses.enum'; | ||||||
| import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module'; | import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module'; | ||||||
|  | import { PipelineTaskLogs } from './models/pipeline-task-logs.model'; | ||||||
| import { PipelineTask } from './pipeline-task.entity'; | import { PipelineTask } from './pipeline-task.entity'; | ||||||
|  |  | ||||||
| const LOG_TIMEOUT_SECONDS = 10_000; | const LOG_TIMEOUT_SECONDS = 10_000; | ||||||
| @@ -33,7 +37,7 @@ export class PipelineTaskLogsService { | |||||||
|     ]); |     ]); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   async readLogs(task: PipelineTask): Promise<PipelineTaskLogMessage[]> { |   async readLog(task: PipelineTask): Promise<PipelineTaskLogMessage[]> { | ||||||
|     return await this.redis.lrange(this.getKeys(task), 0, -1).then((items) => |     return await this.redis.lrange(this.getKeys(task), 0, -1).then((items) => | ||||||
|       items.map((item) => { |       items.map((item) => { | ||||||
|         const log = JSON.parse(item) as PipelineTaskLogMessage; |         const log = JSON.parse(item) as PipelineTaskLogMessage; | ||||||
| @@ -44,6 +48,30 @@ export class PipelineTaskLogsService { | |||||||
|     ); |     ); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|  |   async readLogsAsPipelineTaskLogs( | ||||||
|  |     task: PipelineTask, | ||||||
|  |   ): Promise<PipelineTaskLogs[]> { | ||||||
|  |     const logs = await this.readLog(task); | ||||||
|  |     const taskLogs: PipelineTaskLogs[] = []; | ||||||
|  |     for (const log of logs) { | ||||||
|  |       const taskLog = find<PipelineTaskLogs>( | ||||||
|  |         propEq('unit', log.unit), | ||||||
|  |         taskLogs, | ||||||
|  |       ); | ||||||
|  |       if (!taskLog) { | ||||||
|  |         taskLogs.push({ | ||||||
|  |           unit: (log.unit as unknown) as PipelineUnits, | ||||||
|  |           status: TaskStatuses.working, | ||||||
|  |           startedAt: log.time, | ||||||
|  |           logs: log.message, | ||||||
|  |         }); | ||||||
|  |       } else { | ||||||
|  |         taskLog.logs += log.message; | ||||||
|  |       } | ||||||
|  |     } | ||||||
|  |     return taskLogs; | ||||||
|  |   } | ||||||
|  |  | ||||||
|   watchLogs(task: PipelineTask) { |   watchLogs(task: PipelineTask) { | ||||||
|     return this.pubSub.asyncIterator(this.getKeys(task)); |     return this.pubSub.asyncIterator(this.getKeys(task)); | ||||||
|   } |   } | ||||||
|   | |||||||
| @@ -1,22 +1,22 @@ | |||||||
| import { PIPELINE_TASK_LOG_QUEUE } from './pipeline-tasks.constants'; |  | ||||||
| import { Test, TestingModule } from '@nestjs/testing'; | import { Test, TestingModule } from '@nestjs/testing'; | ||||||
| import { Job, Queue } from 'bull'; | import { Job } from 'bull'; | ||||||
| import { join } from 'path'; | import { join } from 'path'; | ||||||
| import { ReposService } from '../repos/repos.service'; | import { ReposService } from '../repos/repos.service'; | ||||||
| import { PipelineUnits } from './enums/pipeline-units.enum'; | import { PipelineUnits } from './enums/pipeline-units.enum'; | ||||||
| import { PipelineTaskConsumer } from './pipeline-task.consumer'; | import { PipelineTaskConsumer } from './pipeline-task.consumer'; | ||||||
| import { PipelineTask } from './pipeline-task.entity'; | import { PipelineTask } from './pipeline-task.entity'; | ||||||
| import { PipelineTasksService } from './pipeline-tasks.service'; | import { PipelineTasksService } from './pipeline-tasks.service'; | ||||||
| import { getQueueToken } from '@nestjs/bull'; |  | ||||||
| import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module'; | import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module'; | ||||||
| import { Pipeline } from '../pipelines/pipeline.entity'; | import { Pipeline } from '../pipelines/pipeline.entity'; | ||||||
| import { Project } from '../projects/project.entity'; | import { Project } from '../projects/project.entity'; | ||||||
| import { TaskStatuses } from './enums/task-statuses.enum'; | import { TaskStatuses } from './enums/task-statuses.enum'; | ||||||
|  | import { PipelineTaskLogsService } from './pipeline-task-logs.service'; | ||||||
|  | import { ApplicationException } from '../commons/exceptions/application.exception'; | ||||||
|  |  | ||||||
| describe('PipelineTaskConsumer', () => { | describe('PipelineTaskConsumer', () => { | ||||||
|   let consumer: PipelineTaskConsumer; |   let consumer: PipelineTaskConsumer; | ||||||
|   let tasksService: PipelineTasksService; |   let tasksService: PipelineTasksService; | ||||||
|   let logQueue: Queue<PipelineTaskLogMessage>; |   let logsService: PipelineTaskLogsService; | ||||||
|   const getJob = () => |   const getJob = () => | ||||||
|     ({ |     ({ | ||||||
|       data: { |       data: { | ||||||
| @@ -42,19 +42,20 @@ describe('PipelineTaskConsumer', () => { | |||||||
|             checkout: async () => undefined, |             checkout: async () => undefined, | ||||||
|           }, |           }, | ||||||
|         }, |         }, | ||||||
|         PipelineTaskConsumer, |  | ||||||
|         { |         { | ||||||
|           provide: getQueueToken(PIPELINE_TASK_LOG_QUEUE), |           provide: PipelineTaskLogsService, | ||||||
|           useValue: { |           useValue: { | ||||||
|             add: () => undefined, |             recordLog: async () => undefined, | ||||||
|  |             readLogsAsPipelineTaskLogs: async () => [], | ||||||
|           }, |           }, | ||||||
|         }, |         }, | ||||||
|  |         PipelineTaskConsumer, | ||||||
|       ], |       ], | ||||||
|     }).compile(); |     }).compile(); | ||||||
|  |  | ||||||
|     tasksService = module.get(PipelineTasksService); |     tasksService = module.get(PipelineTasksService); | ||||||
|  |     logsService = module.get(PipelineTaskLogsService); | ||||||
|     consumer = module.get(PipelineTaskConsumer); |     consumer = module.get(PipelineTaskConsumer); | ||||||
|     logQueue = module.get(getQueueToken(PIPELINE_TASK_LOG_QUEUE)); |  | ||||||
|   }); |   }); | ||||||
|  |  | ||||||
|   it('should be defined', () => { |   it('should be defined', () => { | ||||||
| @@ -71,31 +72,42 @@ describe('PipelineTaskConsumer', () => { | |||||||
|   }); |   }); | ||||||
|  |  | ||||||
|   describe('runScript', () => { |   describe('runScript', () => { | ||||||
|  |     let logText: string; | ||||||
|  |     let errorText: string; | ||||||
|  |     let recordLog: jest.SpyInstance; | ||||||
|  |     beforeEach(() => { | ||||||
|  |       logText = ''; | ||||||
|  |       errorText = ''; | ||||||
|  |       recordLog = jest | ||||||
|  |         .spyOn(logsService, 'recordLog') | ||||||
|  |         .mockImplementation(async (log: PipelineTaskLogMessage) => { | ||||||
|  |           logText += log.message; | ||||||
|  |           if (log.isError) { | ||||||
|  |             errorText += log.message; | ||||||
|  |           } | ||||||
|  |         }); | ||||||
|  |     }); | ||||||
|     it('should success and log right message', async () => { |     it('should success and log right message', async () => { | ||||||
|       const add = jest.spyOn(logQueue, 'add'); |       await consumer.runScript( | ||||||
|       await expect( |         'node one-second-work.js', | ||||||
|         consumer |         join(__dirname, '../../test/data'), | ||||||
|           .runScript( |       ); | ||||||
|             'node one-second-work.js', |       expect(logText).toMatch(/10.+20.+30.+40.+50.+60.+70.+80.+90/s); | ||||||
|             join(__dirname, '../../test/data'), |       expect(recordLog).toHaveBeenCalledTimes(10); | ||||||
|           ) |  | ||||||
|           .then((arr) => arr.join('')), |  | ||||||
|       ).resolves.toMatch(/10.+20.+30.+40.+50.+60.+70.+80.+90/s); |  | ||||||
|       // expect(add).toHaveBeenCalledTimes(10); |  | ||||||
|       expect( |       expect( | ||||||
|         ((add.mock.calls[8][0] as unknown) as PipelineTaskLogMessage).message, |         ((recordLog.mock.calls[8][0] as unknown) as PipelineTaskLogMessage) | ||||||
|  |           .message, | ||||||
|       ).toMatch(/^90/); |       ).toMatch(/^90/); | ||||||
|     }); |     }); | ||||||
|     it('should failed and log right message', async () => { |     it('should failed and log right message', async () => { | ||||||
|       const add = jest.spyOn(logQueue, 'add'); |  | ||||||
|       await expect( |       await expect( | ||||||
|         consumer.runScript( |         consumer.runScript( | ||||||
|           'node bad-work.js', |           'node bad-work.js', | ||||||
|           join(__dirname, '../../test/data'), |           join(__dirname, '../../test/data'), | ||||||
|         ), |         ), | ||||||
|       ).rejects.toThrowError(/Error Message/); |       ).rejects.toThrowError(/exec script failed/); | ||||||
|       // expect(add).toHaveBeenCalledTimes(8); |       expect(errorText).toMatch(/Error Message/); | ||||||
|       const logs = add.mock.calls |       const logs = recordLog.mock.calls | ||||||
|         .map((call) => ((call[0] as unknown) as PipelineTaskLogMessage).message) |         .map((call) => ((call[0] as unknown) as PipelineTaskLogMessage).message) | ||||||
|         .join(''); |         .join(''); | ||||||
|       expect(logs).toMatch(/10.+20.+30.+40.+50/s); |       expect(logs).toMatch(/10.+20.+30.+40.+50/s); | ||||||
| @@ -104,16 +116,19 @@ describe('PipelineTaskConsumer', () => { | |||||||
|       const task = new PipelineTask(); |       const task = new PipelineTask(); | ||||||
|       task.id = 'test'; |       task.id = 'test'; | ||||||
|  |  | ||||||
|       const add = jest.spyOn(logQueue, 'add'); |       const recordLog = jest.spyOn(logsService, 'recordLog'); | ||||||
|       await expect( |       await expect( | ||||||
|         consumer.runScript( |         consumer.runScript( | ||||||
|           'node bad-work.js', |           'node bad-work.js', | ||||||
|           join(__dirname, '../../test/data'), |           join(__dirname, '../../test/data'), | ||||||
|           task, |           task, | ||||||
|         ), |         ), | ||||||
|       ).rejects.toThrowError(/Error Message 2/); |       ).rejects.toThrowError(/exec script failed/); | ||||||
|  |  | ||||||
|  |       expect(errorText).toMatch(/Error Message 2/); | ||||||
|       expect( |       expect( | ||||||
|         ((add.mock.calls[2][0] as unknown) as PipelineTaskLogMessage).task, |         ((recordLog.mock.calls[2][0] as unknown) as PipelineTaskLogMessage) | ||||||
|  |           .task, | ||||||
|       ).toMatchObject(task); |       ).toMatchObject(task); | ||||||
|     }); |     }); | ||||||
|   }); |   }); | ||||||
| @@ -145,6 +160,43 @@ describe('PipelineTaskConsumer', () => { | |||||||
|       task.pipeline.project = new Project(); |       task.pipeline.project = new Project(); | ||||||
|       task.pipeline.project.name = 'test-project'; |       task.pipeline.project.name = 'test-project'; | ||||||
|     }); |     }); | ||||||
|  |  | ||||||
|  |     it('success and update task on db', async () => { | ||||||
|  |       const job: Job = ({ | ||||||
|  |         data: task, | ||||||
|  |         update: jest.fn().mockImplementation(() => undefined), | ||||||
|  |       } as unknown) as Job; | ||||||
|  |  | ||||||
|  |       jest | ||||||
|  |         .spyOn(consumer, 'runScript') | ||||||
|  |         .mockImplementation(async () => undefined); | ||||||
|  |       const updateTask = jest.spyOn(tasksService, 'updateTask'); | ||||||
|  |  | ||||||
|  |       await consumer.doTask(job); | ||||||
|  |  | ||||||
|  |       expect(updateTask).toHaveBeenCalledTimes(2); | ||||||
|  |       expect(updateTask.mock.calls[0][0].startedAt).toBeDefined(); | ||||||
|  |       expect(updateTask.mock.calls[1][0].endedAt).toBeDefined(); | ||||||
|  |       expect(updateTask.mock.calls[1][0].status).toEqual(TaskStatuses.success); | ||||||
|  |     }); | ||||||
|  |     it('failed and update task on db', async () => { | ||||||
|  |       const job: Job = ({ | ||||||
|  |         data: task, | ||||||
|  |         update: jest.fn().mockImplementation(() => undefined), | ||||||
|  |       } as unknown) as Job; | ||||||
|  |  | ||||||
|  |       jest.spyOn(consumer, 'runScript').mockImplementation(async () => { | ||||||
|  |         throw new ApplicationException('exec script failed'); | ||||||
|  |       }); | ||||||
|  |       const updateTask = jest.spyOn(tasksService, 'updateTask'); | ||||||
|  |  | ||||||
|  |       await consumer.doTask(job); | ||||||
|  |  | ||||||
|  |       expect(updateTask).toHaveBeenCalledTimes(2); | ||||||
|  |       expect(updateTask.mock.calls[0][0].startedAt).toBeDefined(); | ||||||
|  |       expect(updateTask.mock.calls[1][0].endedAt).toBeDefined(); | ||||||
|  |       expect(updateTask.mock.calls[1][0].status).toEqual(TaskStatuses.failed); | ||||||
|  |     }); | ||||||
|     it('should do all task', async () => { |     it('should do all task', async () => { | ||||||
|       const job: Job = ({ |       const job: Job = ({ | ||||||
|         data: task, |         data: task, | ||||||
| @@ -153,18 +205,17 @@ describe('PipelineTaskConsumer', () => { | |||||||
|  |  | ||||||
|       const runScript = jest |       const runScript = jest | ||||||
|         .spyOn(consumer, 'runScript') |         .spyOn(consumer, 'runScript') | ||||||
|         .mockImplementation(async () => []); |         .mockImplementation(async () => undefined); | ||||||
|       const updateTask = jest.spyOn(tasksService, 'updateTask'); |       const updateTask = jest.spyOn(tasksService, 'updateTask'); | ||||||
|  |  | ||||||
|       await consumer.doTask(job); |       await consumer.doTask(job); | ||||||
|  |  | ||||||
|       expect(runScript).toHaveBeenCalledTimes(1); |       expect(runScript).toHaveBeenCalledTimes(1); | ||||||
|       expect(updateTask).toHaveBeenCalledTimes(1); |       expect(updateTask).toHaveBeenCalledTimes(2); | ||||||
|       const taskDto: PipelineTask = updateTask.mock.calls[0][0]; |       const taskDto: PipelineTask = updateTask.mock.calls[0][0]; | ||||||
|       expect(taskDto.logs).toHaveLength(2); |       expect(taskDto.logs).toHaveLength(2); | ||||||
|       expect(taskDto.logs[0].status).toEqual(TaskStatuses.success); |       expect(taskDto.logs[0].status).toEqual(TaskStatuses.success); | ||||||
|       expect(taskDto.logs[0].unit).toEqual(PipelineUnits.checkout); |       expect(taskDto.logs[0].unit).toEqual(PipelineUnits.checkout); | ||||||
|       expect(taskDto.logs[1].logs).toMatch(/Hello, Fennec!/); |  | ||||||
|     }); |     }); | ||||||
|     it('should log error message', async () => { |     it('should log error message', async () => { | ||||||
|       const job: Job = ({ |       const job: Job = ({ | ||||||
| @@ -181,12 +232,11 @@ describe('PipelineTaskConsumer', () => { | |||||||
|  |  | ||||||
|       await consumer.doTask(job); |       await consumer.doTask(job); | ||||||
|  |  | ||||||
|       expect(updateTask).toHaveBeenCalledTimes(1); |       expect(updateTask).toHaveBeenCalledTimes(2); | ||||||
|       const taskDto: PipelineTask = updateTask.mock.calls[0][0]; |       const taskDto: PipelineTask = updateTask.mock.calls[0][0]; | ||||||
|       expect(taskDto.logs).toHaveLength(2); |       expect(taskDto.logs).toHaveLength(2); | ||||||
|       expect(taskDto.logs[0].status).toEqual(TaskStatuses.success); |       expect(taskDto.logs[0].status).toEqual(TaskStatuses.success); | ||||||
|       expect(taskDto.logs[1].status).toEqual(TaskStatuses.failed); |       expect(taskDto.logs[1].status).toEqual(TaskStatuses.failed); | ||||||
|       expect(taskDto.logs[1].logs).toMatch(/bad message/); |  | ||||||
|     }); |     }); | ||||||
|   }); |   }); | ||||||
| }); | }); | ||||||
|   | |||||||
| @@ -1,18 +1,10 @@ | |||||||
| import { PipelineTaskLogs } from './models/pipeline-task-logs.model'; | import { PipelineTaskLogs } from './models/pipeline-task-logs.model'; | ||||||
| import { ReposService } from './../repos/repos.service'; | import { ReposService } from './../repos/repos.service'; | ||||||
| import { | import { OnQueueCompleted, Process, Processor } from '@nestjs/bull'; | ||||||
|   InjectQueue, | import { Job } from 'bull'; | ||||||
|   OnQueueCompleted, |  | ||||||
|   Process, |  | ||||||
|   Processor, |  | ||||||
| } from '@nestjs/bull'; |  | ||||||
| import { Job, Queue } from 'bull'; |  | ||||||
| import { spawn } from 'child_process'; | import { spawn } from 'child_process'; | ||||||
| import { PipelineTask } from './pipeline-task.entity'; | import { PipelineTask } from './pipeline-task.entity'; | ||||||
| import { | import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; | ||||||
|   PIPELINE_TASK_LOG_QUEUE, |  | ||||||
|   PIPELINE_TASK_QUEUE, |  | ||||||
| } from './pipeline-tasks.constants'; |  | ||||||
| import { PipelineTasksService } from './pipeline-tasks.service'; | import { PipelineTasksService } from './pipeline-tasks.service'; | ||||||
| import { ApplicationException } from '../commons/exceptions/application.exception'; | import { ApplicationException } from '../commons/exceptions/application.exception'; | ||||||
| import { PipelineUnits } from './enums/pipeline-units.enum'; | import { PipelineUnits } from './enums/pipeline-units.enum'; | ||||||
| @@ -27,13 +19,19 @@ export class PipelineTaskConsumer { | |||||||
|     private readonly logsService: PipelineTaskLogsService, |     private readonly logsService: PipelineTaskLogsService, | ||||||
|   ) {} |   ) {} | ||||||
|   @Process() |   @Process() | ||||||
|   async doTask({ data: task, update }: Job<PipelineTask>) { |   async doTask(job: Job<PipelineTask>) { | ||||||
|  |     let task = job.data; | ||||||
|     if (task.pipeline.workUnitMetadata.version !== 1) { |     if (task.pipeline.workUnitMetadata.version !== 1) { | ||||||
|       throw new ApplicationException( |       throw new ApplicationException( | ||||||
|         'work unit metadata version is not match.', |         'work unit metadata version is not match.', | ||||||
|       ); |       ); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     task.startedAt = new Date(); | ||||||
|  |     task.status = TaskStatuses.working; | ||||||
|  |     task = await this.service.updateTask(task); | ||||||
|  |     await job.update(task); | ||||||
|  |  | ||||||
|     const workspaceRoot = this.reposService.getWorkspaceRootByTask(task); |     const workspaceRoot = this.reposService.getWorkspaceRootByTask(task); | ||||||
|  |  | ||||||
|     const units = task.units.map( |     const units = task.units.map( | ||||||
| @@ -49,21 +47,14 @@ export class PipelineTaskConsumer { | |||||||
|         unitLog.unit = unit.type; |         unitLog.unit = unit.type; | ||||||
|         unitLog.startedAt = new Date(); |         unitLog.startedAt = new Date(); | ||||||
|         try { |         try { | ||||||
|           // 检出代码时,不执行其他脚本。 |           // 检出代码前执行 git checkout | ||||||
|           if (unit.type === PipelineUnits.checkout) { |           if (unit.type === PipelineUnits.checkout) { | ||||||
|             await this.reposService.checkout(task, workspaceRoot); |             await this.reposService.checkout(task, workspaceRoot); | ||||||
|             unitLog.status = TaskStatuses.success; |             unitLog.status = TaskStatuses.success; | ||||||
|             continue; |  | ||||||
|           } |           } | ||||||
|           for (const script of unit.scripts) { |           for (const script of unit.scripts) { | ||||||
|             unitLog.logs += `[RUN SCRIPT] ${script}`; |             unitLog.logs += `[RUN SCRIPT] ${script}`; | ||||||
|             const messages = await this.runScript( |             await this.runScript(script, workspaceRoot, task, unit.type); | ||||||
|               script, |  | ||||||
|               workspaceRoot, |  | ||||||
|               task, |  | ||||||
|               unit.type, |  | ||||||
|             ); |  | ||||||
|             unitLog.logs += messages.join(''); |  | ||||||
|           } |           } | ||||||
|           unitLog.status = TaskStatuses.success; |           unitLog.status = TaskStatuses.success; | ||||||
|         } catch (err) { |         } catch (err) { | ||||||
| @@ -72,15 +63,25 @@ export class PipelineTaskConsumer { | |||||||
|           throw err; |           throw err; | ||||||
|         } finally { |         } finally { | ||||||
|           unitLog.endedAt = new Date(); |           unitLog.endedAt = new Date(); | ||||||
|  |           unitLog.logs = await this.logsService | ||||||
|  |             .readLogsAsPipelineTaskLogs(task) | ||||||
|  |             .then( | ||||||
|  |               (taskLogs) => | ||||||
|  |                 taskLogs.find((tl) => tl.unit === unit.type)?.logs ?? '', | ||||||
|  |             ); | ||||||
|           task.logs.push(unitLog); |           task.logs.push(unitLog); | ||||||
|           // await update(task); |           await job.update(task); | ||||||
|         } |         } | ||||||
|       } |       } | ||||||
|  |  | ||||||
|  |       task.status = TaskStatuses.success; | ||||||
|     } catch (err) { |     } catch (err) { | ||||||
|  |       task.status = TaskStatuses.failed; | ||||||
|       console.log(err); |       console.log(err); | ||||||
|     } finally { |     } finally { | ||||||
|  |       task.endedAt = new Date(); | ||||||
|       task = await this.service.updateTask(task); |       task = await this.service.updateTask(task); | ||||||
|       await update(task); |       await job.update(task); | ||||||
|     } |     } | ||||||
|   } |   } | ||||||
|  |  | ||||||
| @@ -89,35 +90,29 @@ export class PipelineTaskConsumer { | |||||||
|     workspaceRoot: string, |     workspaceRoot: string, | ||||||
|     task?: PipelineTask, |     task?: PipelineTask, | ||||||
|     unit?: PipelineUnits, |     unit?: PipelineUnits, | ||||||
|   ): Promise<string[]> { |   ): Promise<void> { | ||||||
|     return new Promise((resolve, reject) => { |     return new Promise((resolve, reject) => { | ||||||
|       const errorMessages: string[] = []; |  | ||||||
|       const logs: string[] = []; |  | ||||||
|       const sub = spawn(script, { |       const sub = spawn(script, { | ||||||
|         shell: true, |         shell: true, | ||||||
|         cwd: workspaceRoot, |         cwd: workspaceRoot, | ||||||
|       }); |       }); | ||||||
|       sub.stderr.on('data', (data: Buffer) => { |       sub.stderr.on('data', (data: Buffer) => { | ||||||
|         const str = data.toString(); |         const str = data.toString(); | ||||||
|         errorMessages.push(str); |  | ||||||
|         logs.push(str); |  | ||||||
|         this.logsService.recordLog( |         this.logsService.recordLog( | ||||||
|           PipelineTaskLogMessage.create(task, unit, str), |           PipelineTaskLogMessage.create(task, unit, str, true), | ||||||
|         ); |         ); | ||||||
|       }); |       }); | ||||||
|       sub.stdout.on('data', (data: Buffer) => { |       sub.stdout.on('data', (data: Buffer) => { | ||||||
|         const str = data.toString(); |         const str = data.toString(); | ||||||
|         logs.push(str); |  | ||||||
|         this.logsService.recordLog( |         this.logsService.recordLog( | ||||||
|           PipelineTaskLogMessage.create(task, unit, str), |           PipelineTaskLogMessage.create(task, unit, str, false), | ||||||
|         ); |         ); | ||||||
|       }); |       }); | ||||||
|       sub.addListener('close', (code) => { |       sub.addListener('close', (code) => { | ||||||
|         if (code === 0) { |         if (code === 0) { | ||||||
|           sub.stdout; |           return resolve(); | ||||||
|           return resolve(logs); |  | ||||||
|         } |         } | ||||||
|         return reject(new ApplicationException(errorMessages.join(''))); |         return reject(new ApplicationException('exec script failed')); | ||||||
|       }); |       }); | ||||||
|     }); |     }); | ||||||
|   } |   } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user