From 246c0bd8f82b44e943002273313ec064ddeacec7 Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Tue, 6 Apr 2021 22:54:51 +0800 Subject: [PATCH] =?UTF-8?q?fix(pipeline-tasks):=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E6=9C=89=E6=A6=82=E7=8E=87=E4=B8=A2=E5=A4=B1?= =?UTF-8?q?=E6=9C=80=E5=90=8E=E5=87=A0=E6=9D=A1=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 日志记录结束前,任务信息已入库存档。 --- .../pipeline-task.consumer.spec.ts | 8 +++--- src/pipeline-tasks/pipeline-task.consumer.ts | 25 +++++++++++++------ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/src/pipeline-tasks/pipeline-task.consumer.spec.ts b/src/pipeline-tasks/pipeline-task.consumer.spec.ts index 8418d0f..453059d 100644 --- a/src/pipeline-tasks/pipeline-task.consumer.spec.ts +++ b/src/pipeline-tasks/pipeline-task.consumer.spec.ts @@ -223,11 +223,9 @@ describe('PipelineTaskConsumer', () => { update: jest.fn().mockImplementation(() => undefined), } as unknown) as Job; - const runScript = jest - .spyOn(consumer, 'runScript') - .mockImplementation(async () => { - throw new Error('bad message'); - }); + jest.spyOn(consumer, 'runScript').mockImplementation(async () => { + throw new Error('bad message'); + }); const updateTask = jest.spyOn(tasksService, 'updateTask'); await consumer.doTask(job); diff --git a/src/pipeline-tasks/pipeline-task.consumer.ts b/src/pipeline-tasks/pipeline-task.consumer.ts index 72d2d17..8439718 100644 --- a/src/pipeline-tasks/pipeline-task.consumer.ts +++ b/src/pipeline-tasks/pipeline-task.consumer.ts @@ -113,19 +113,30 @@ export class PipelineTaskConsumer { shell: true, cwd: workspaceRoot, }); + let loggingCount = 0; // semaphore + sub.stderr.on('data', (data: Buffer) => { const str = data.toString(); - this.logsService.recordLog( - PipelineTaskLogMessage.create(task, unit, str, true), - ); + loggingCount++; + this.logsService + .recordLog(PipelineTaskLogMessage.create(task, unit, str, true)) + .finally(() => loggingCount--); }); sub.stdout.on('data', (data: Buffer) => { const str = data.toString(); - this.logsService.recordLog( - PipelineTaskLogMessage.create(task, unit, str, false), - ); + loggingCount++; + this.logsService + .recordLog(PipelineTaskLogMessage.create(task, unit, str, false)) + .finally(() => loggingCount--); }); - sub.addListener('close', (code) => { + sub.addListener('close', async (code) => { + await new Promise(async (resolve) => { + for (let i = 0; i < 10 && loggingCount > 0; i++) { + await new Promise((resolve) => setTimeout(resolve, 500)); + log('waiting logging... (%dx500ms)', i); + } + resolve(); + }); if (code === 0) { return resolve(); }