fix(pipeline-tasks): 修复日志有概率丢失最后几条的问题
日志记录结束前,任务信息已入库存档。
This commit is contained in:
parent
24a2f80e46
commit
246c0bd8f8
@ -223,9 +223,7 @@ describe('PipelineTaskConsumer', () => {
|
|||||||
update: jest.fn().mockImplementation(() => undefined),
|
update: jest.fn().mockImplementation(() => undefined),
|
||||||
} as unknown) as Job;
|
} as unknown) as Job;
|
||||||
|
|
||||||
const runScript = jest
|
jest.spyOn(consumer, 'runScript').mockImplementation(async () => {
|
||||||
.spyOn(consumer, 'runScript')
|
|
||||||
.mockImplementation(async () => {
|
|
||||||
throw new Error('bad message');
|
throw new Error('bad message');
|
||||||
});
|
});
|
||||||
const updateTask = jest.spyOn(tasksService, 'updateTask');
|
const updateTask = jest.spyOn(tasksService, 'updateTask');
|
||||||
|
@ -113,19 +113,30 @@ export class PipelineTaskConsumer {
|
|||||||
shell: true,
|
shell: true,
|
||||||
cwd: workspaceRoot,
|
cwd: workspaceRoot,
|
||||||
});
|
});
|
||||||
|
let loggingCount = 0; // semaphore
|
||||||
|
|
||||||
sub.stderr.on('data', (data: Buffer) => {
|
sub.stderr.on('data', (data: Buffer) => {
|
||||||
const str = data.toString();
|
const str = data.toString();
|
||||||
this.logsService.recordLog(
|
loggingCount++;
|
||||||
PipelineTaskLogMessage.create(task, unit, str, true),
|
this.logsService
|
||||||
);
|
.recordLog(PipelineTaskLogMessage.create(task, unit, str, true))
|
||||||
|
.finally(() => loggingCount--);
|
||||||
});
|
});
|
||||||
sub.stdout.on('data', (data: Buffer) => {
|
sub.stdout.on('data', (data: Buffer) => {
|
||||||
const str = data.toString();
|
const str = data.toString();
|
||||||
this.logsService.recordLog(
|
loggingCount++;
|
||||||
PipelineTaskLogMessage.create(task, unit, str, false),
|
this.logsService
|
||||||
);
|
.recordLog(PipelineTaskLogMessage.create(task, unit, str, false))
|
||||||
|
.finally(() => loggingCount--);
|
||||||
|
});
|
||||||
|
sub.addListener('close', async (code) => {
|
||||||
|
await new Promise<void>(async (resolve) => {
|
||||||
|
for (let i = 0; i < 10 && loggingCount > 0; i++) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 500));
|
||||||
|
log('waiting logging... (%dx500ms)', i);
|
||||||
|
}
|
||||||
|
resolve();
|
||||||
});
|
});
|
||||||
sub.addListener('close', (code) => {
|
|
||||||
if (code === 0) {
|
if (code === 0) {
|
||||||
return resolve();
|
return resolve();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user