feat(pipeline-tasks): debug log.

This commit is contained in:
Ivan
2021-03-25 18:09:45 +08:00
parent 607a4f57de
commit 713f5b2426
5 changed files with 203 additions and 28 deletions

View File

@ -51,7 +51,7 @@ import { RedisModule } from 'nestjs-redis';
host: configService.get<string>('db.redis.host', 'localhost'),
port: configService.get<number>('db.redis.port', 6379),
password: configService.get<string>('db.redis.password', ''),
keyPrefix: configService.get<string>('db.redis.prefix', 'fennec'),
keyPrefix: configService.get<string>('db.redis.prefix', 'fennec') + ':',
}),
inject: [ConfigService],
}),

View File

@ -16,6 +16,10 @@ import { PipelineUnits } from './enums/pipeline-units.enum';
import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module';
import { TaskStatuses } from './enums/task-statuses.enum';
import { PipelineTaskLogsService } from './pipeline-task-logs.service';
import debug from 'debug';
const log = debug('fennec:pipeline-tasks:consumer');
@Processor(PIPELINE_TASK_QUEUE)
export class PipelineTaskConsumer {
constructor(
@ -35,6 +39,7 @@ export class PipelineTaskConsumer {
task.startedAt = new Date();
task.status = TaskStatuses.working;
task = await this.service.updateTask(task);
log('start job');
await job.update(task);
const workspaceRoot = this.reposService.getWorkspaceRootByTask(task);
@ -46,20 +51,26 @@ export class PipelineTaskConsumer {
) ?? { type: type, scripts: [] },
);
log('task have [%o] units', units);
try {
for (const unit of units) {
const unitLog = new PipelineTaskLogs();
unitLog.unit = unit.type;
unitLog.startedAt = new Date();
log('curr unit is %s', unit.type);
try {
// 检出代码前执行 git checkout
if (unit.type === PipelineUnits.checkout) {
log('begin checkout');
await this.reposService.checkout(task, workspaceRoot);
unitLog.status = TaskStatuses.success;
log('end checkout');
}
for (const script of unit.scripts) {
unitLog.logs += `[RUN SCRIPT] ${script}`;
log('begin runScript %s', script);
await this.runScript(script, workspaceRoot, task, unit.type);
log('end runScript %s', script);
}
unitLog.status = TaskStatuses.success;
} catch (err) {
@ -82,7 +93,7 @@ export class PipelineTaskConsumer {
task.status = TaskStatuses.success;
} catch (err) {
task.status = TaskStatuses.failed;
console.log(err);
log('task is failed', err);
} finally {
task.endedAt = new Date();
task = await this.service.updateTask(task);
@ -124,11 +135,13 @@ export class PipelineTaskConsumer {
@OnQueueCompleted()
onCompleted(job: Job<PipelineTask>) {
log('queue onCompleted');
this.service.doNextTask(job.data.pipeline);
}
@OnQueueFailed()
onFailed(job: Job<PipelineTask>) {
log('queue onFailed');
this.service.doNextTask(job.data.pipeline);
}
}

View File

@ -12,6 +12,9 @@ import { LockFailedException } from '../commons/exceptions/lock-failed.exception
import { PubSub } from 'apollo-server-express';
import { TaskStatuses } from './enums/task-statuses.enum';
import { isNil } from 'ramda';
import debug from 'debug';
const log = debug('fennec:pipeline-tasks:service');
@Injectable()
export class PipelineTasksService {
@ -30,23 +33,30 @@ export class PipelineTasksService {
where: { id: dto.pipelineId },
relations: ['project'],
});
// const hasUnfinishedTask = await this.repository
// .findOne({
// pipelineId: pipeline.id,
// status: In([TaskStatuses.pending, TaskStatuses.working]),
// })
// .then((val) => !isNil(val));
// if (hasUnfinishedTask) {
// throw new ConflictException(
// 'there are still unfinished task in the current pipeline.',
// );
// }
const hasUnfinishedTask = await this.repository
.findOne({
pipelineId: dto.pipelineId,
commit: dto.commit,
status: In([TaskStatuses.pending, TaskStatuses.working]),
})
.then((val) => !isNil(val));
if (hasUnfinishedTask) {
throw new ConflictException(
'There are the same tasks among the unfinished tasks!',
);
}
const task = await this.repository.save(this.repository.create(dto));
task.pipeline = pipeline;
const tasksKey = this.getRedisTokens(pipeline)[1];
const redis = this.redis.getClient();
await redis.lpush(tasksKey, JSON.stringify(task));
log(
'add task %s:%s-%s',
task.id,
task.pipeline.branch,
task.commit.slice(0, 6),
);
await this.doNextTask(pipeline);
return task;
}
@ -63,6 +73,7 @@ export class PipelineTasksService {
const [lckKey, tasksKey] = this.getRedisTokens(pipeline);
const redis = this.redis.getClient();
log('doNextTask()');
const unLck = await new Promise<() => Promise<void>>(
async (resolve, reject) => {
const lckValue = Date.now().toString();
@ -90,7 +101,15 @@ export class PipelineTasksService {
(await redis.rpop(tasksKey).finally(() => unLck())) ?? 'null',
);
if (task) {
log(
'add task (%s:%s-%s) to queue',
task.id,
task.pipeline.branch,
task.commit.slice(0, 6),
);
await this.queue.add(task);
} else {
log('task is empty');
}
}