refactor(pipeline-tasks-runner): rabbitmq

This commit is contained in:
Ivan Li
2021-05-23 17:51:14 +08:00
parent 4041a6fd2a
commit b3a2b11db9
12 changed files with 982 additions and 23 deletions

View File

@ -0,0 +1,253 @@
import { ReposService } from '../repos/repos.service';
import { spawn, ChildProcessWithoutNullStreams } from 'child_process';
import { PipelineTask } from './pipeline-task.entity';
import { ApplicationException } from '../commons/exceptions/application.exception';
import { PipelineUnits } from './enums/pipeline-units.enum';
import { TaskStatuses } from './enums/task-statuses.enum';
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { PipelineTaskEvent } from './models/pipeline-task-event';
import { last } from 'ramda';
import { Inject } from '@nestjs/common';
type Spawn = typeof spawn;
export class PipelineTaskRunner {
readonly processes = new Map<string, ChildProcessWithoutNullStreams>();
constructor(
private readonly reposService: ReposService,
@InjectPinoLogger(PipelineTaskRunner.name)
private readonly logger: PinoLogger,
@Inject('spawn')
private readonly spawn: Spawn,
) {}
@RabbitSubscribe({
exchange: 'new-pipeline-task',
routingKey: 'mac',
queue: 'mac.new-pipeline-task',
})
async onNewTask(task: PipelineTask) {
this.logger.info({ task }, 'on new task [%s].', task.id);
try {
await this.doTask(task);
} catch (err) {
this.logger.error({ task, err }, err.message);
}
}
@RabbitSubscribe({
exchange: 'stop-pipeline-task',
routingKey: 'mac',
queue: 'mac.stop-pipeline-task',
})
async onStopTask(task: PipelineTask) {
this.logger.info({ task }, 'on stop task [%s].', task.id);
const process = this.processes.get(task.id);
if (process) {
this.logger.info({ task }, 'send signal SIGINT to child process.');
process.kill('SIGINT');
setTimeout(() => {
if (process === this.processes.get(task.id)) {
this.logger.info({ task }, 'send signal SIGKILL to child process.');
process.kill('SIGKILL');
return;
}
if (this.processes.has(task.id)) {
this.logger.error(
{ task },
'this pipeline task not stop yet. there is a new process running, maybe is a bug about error capture',
);
}
}, 10_000);
} else {
this.logger.info({ task }, 'child process is not running.');
}
}
async doTask(task: PipelineTask) {
if (task.pipeline.workUnitMetadata.version !== 1) {
throw new ApplicationException(
'work unit metadata version is not match.',
);
}
await this.emitEvent(
task,
null,
TaskStatuses.working,
`[start task]`,
'stdout',
);
this.logger.info('running task [%s].', task.id);
try {
const workspaceRoot = await this.checkout(task);
const units = task.units
.filter((unit) => unit !== PipelineUnits.checkout)
.map(
(type) =>
task.pipeline.workUnitMetadata.units.find(
(unit) => unit.type === type,
) ?? { type: type, scripts: [] },
);
this.logger.info({ units }, 'begin run units.');
for (const unit of units) {
await this.doTaskUnit(unit.type, unit.scripts, task, workspaceRoot);
}
await this.emitEvent(
task,
null,
TaskStatuses.success,
`[finished task] success`,
'stdout',
);
this.logger.info({ task }, 'task [%s] completed.', task.id);
} catch (err) {
await this.emitEvent(
task,
null,
TaskStatuses.failed,
`[finished unit] ${err.message}`,
'stderr',
);
this.logger.error({ task, error: err }, 'task [%s] failed.', task.id);
} finally {
}
}
async doTaskUnit(
unit: PipelineUnits,
scripts: string[],
task: PipelineTask,
workspaceRoot: string,
) {
await this.emitEvent(
task,
unit,
TaskStatuses.working,
`[begin unit] ${unit}`,
'stdin',
);
this.logger.info({ task }, 'curr unit is %s', unit);
try {
for (const script of scripts) {
this.logger.debug('begin runScript %s', script);
await this.runScript(script, workspaceRoot, task, unit);
this.logger.debug('end runScript %s', script);
}
await this.emitEvent(
task,
unit,
TaskStatuses.success,
`[finished unit] ${unit}`,
'stdout',
);
} catch (err) {
await this.emitEvent(
task,
unit,
TaskStatuses.failed,
`[finished unit] ${err.message}`,
'stderr',
);
throw err;
}
}
async checkout(task: PipelineTask) {
await this.emitEvent(
task,
PipelineUnits.checkout,
TaskStatuses.working,
'[begin unit] checkout',
'stdin',
);
try {
const path = await this.reposService.checkout4Task(task);
await this.emitEvent(
task,
PipelineUnits.checkout,
TaskStatuses.success,
'checkout success.',
'stdout',
);
return path;
} catch (err) {
await this.emitEvent(
task,
PipelineUnits.checkout,
TaskStatuses.failed,
'checkout failed.',
'stderr',
);
}
}
async emitEvent(
task: PipelineTask,
unit: PipelineUnits | null,
status: TaskStatuses,
message: string,
messageType: 'stderr' | 'stdout' | 'stdin',
) {
const event: PipelineTaskEvent = {
taskId: task.id,
pipelineId: task.pipeline.id,
projectId: task.pipeline.project.id,
unit,
emittedAt: new Date(),
message: last(message) === '\n' ? message : message + '\n',
messageType,
status,
};
}
async runScript(
script: string,
workspaceRoot: string,
task: PipelineTask,
unit: PipelineUnits,
): Promise<void> {
await this.emitEvent(task, unit, TaskStatuses.working, script, 'stdin');
return new Promise((resolve, reject) => {
const sub = this.spawn(script, {
shell: true,
cwd: workspaceRoot,
});
this.processes.set(task.id, sub);
let loggingCount = 0; // semaphore
sub.stderr.on('data', (data: Buffer) => {
const str = data.toString();
loggingCount++;
this.emitEvent(task, unit, TaskStatuses.working, str, 'stdout').finally(
() => loggingCount--,
);
});
sub.stdout.on('data', (data: Buffer) => {
const str = data.toString();
loggingCount++;
this.emitEvent(task, unit, TaskStatuses.working, str, 'stderr').finally(
() => loggingCount--,
);
});
sub.addListener('close', async (code) => {
this.processes.delete(task.id);
await new Promise<void>(async (resolve) => {
for (let i = 0; i < 10 && loggingCount > 0; i++) {
await new Promise((resolve) => setTimeout(resolve, 500));
this.logger.debug('waiting logging... (%dx500ms)', i);
}
resolve();
});
if (code === 0) {
return resolve();
}
return reject(new ApplicationException('exec script failed'));
});
});
}
}