diff --git a/src/pipeline-tasks/pipeline-task.consumer.spec.ts b/src/pipeline-tasks/pipeline-task.consumer.spec.ts index 8418d0f..453059d 100644 --- a/src/pipeline-tasks/pipeline-task.consumer.spec.ts +++ b/src/pipeline-tasks/pipeline-task.consumer.spec.ts @@ -223,11 +223,9 @@ describe('PipelineTaskConsumer', () => { update: jest.fn().mockImplementation(() => undefined), } as unknown) as Job; - const runScript = jest - .spyOn(consumer, 'runScript') - .mockImplementation(async () => { - throw new Error('bad message'); - }); + jest.spyOn(consumer, 'runScript').mockImplementation(async () => { + throw new Error('bad message'); + }); const updateTask = jest.spyOn(tasksService, 'updateTask'); await consumer.doTask(job); diff --git a/src/pipeline-tasks/pipeline-task.consumer.ts b/src/pipeline-tasks/pipeline-task.consumer.ts index 72d2d17..8439718 100644 --- a/src/pipeline-tasks/pipeline-task.consumer.ts +++ b/src/pipeline-tasks/pipeline-task.consumer.ts @@ -113,19 +113,30 @@ export class PipelineTaskConsumer { shell: true, cwd: workspaceRoot, }); + let loggingCount = 0; // semaphore + sub.stderr.on('data', (data: Buffer) => { const str = data.toString(); - this.logsService.recordLog( - PipelineTaskLogMessage.create(task, unit, str, true), - ); + loggingCount++; + this.logsService + .recordLog(PipelineTaskLogMessage.create(task, unit, str, true)) + .finally(() => loggingCount--); }); sub.stdout.on('data', (data: Buffer) => { const str = data.toString(); - this.logsService.recordLog( - PipelineTaskLogMessage.create(task, unit, str, false), - ); + loggingCount++; + this.logsService + .recordLog(PipelineTaskLogMessage.create(task, unit, str, false)) + .finally(() => loggingCount--); }); - sub.addListener('close', (code) => { + sub.addListener('close', async (code) => { + await new Promise(async (resolve) => { + for (let i = 0; i < 10 && loggingCount > 0; i++) { + await new Promise((resolve) => setTimeout(resolve, 500)); + log('waiting logging... (%dx500ms)', i); + } + resolve(); + }); if (code === 0) { return resolve(); }