feat_the_progress_of_tasks #5

Merged
Ivan merged 20 commits from feat_the_progress_of_tasks into master 2021-06-27 19:57:14 +08:00
10 changed files with 15310 additions and 48 deletions
Showing only changes of commit 86c8bce9ea - Show all commits

0
.vscode/launch.json vendored Normal file
View File

15263
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -41,9 +41,11 @@
"graphql-tools": "^7.0.2", "graphql-tools": "^7.0.2",
"ioredis": "^4.25.0", "ioredis": "^4.25.0",
"js-yaml": "^4.0.0", "js-yaml": "^4.0.0",
"nestjs-pino": "^1.4.0",
"nestjs-redis": "^1.2.8", "nestjs-redis": "^1.2.8",
"observable-to-async-generator": "^1.0.1-rc", "observable-to-async-generator": "^1.0.1-rc",
"pg": "^8.5.1", "pg": "^8.5.1",
"pino-pretty": "^4.7.1",
"ramda": "^0.27.1", "ramda": "^0.27.1",
"reflect-metadata": "^0.1.13", "reflect-metadata": "^0.1.13",
"rimraf": "^3.0.2", "rimraf": "^3.0.2",
@ -62,6 +64,7 @@
"@types/jest": "^26.0.15", "@types/jest": "^26.0.15",
"@types/js-yaml": "^4.0.0", "@types/js-yaml": "^4.0.0",
"@types/node": "^14.14.6", "@types/node": "^14.14.6",
"@types/pino-pretty": "^4.7.0",
"@types/supertest": "^2.0.10", "@types/supertest": "^2.0.10",
"@typescript-eslint/eslint-plugin": "^4.6.1", "@typescript-eslint/eslint-plugin": "^4.6.1",
"@typescript-eslint/parser": "^4.6.1", "@typescript-eslint/parser": "^4.6.1",

View File

@ -17,12 +17,31 @@ import { GiteaWebhooksController } from './webhooks/gitea-webhooks.controller';
import { ParseBodyMiddleware } from './commons/middlewares/parse-body.middleware'; import { ParseBodyMiddleware } from './commons/middlewares/parse-body.middleware';
import { BullModule } from '@nestjs/bull'; import { BullModule } from '@nestjs/bull';
import { PubSubModule } from './commons/pub-sub/pub-sub.module'; import { PubSubModule } from './commons/pub-sub/pub-sub.module';
import { LoggerModule } from 'nestjs-pino';
import pinoPretty from 'pino-pretty';
@Module({ @Module({
imports: [ imports: [
ConfigModule.forRoot({ ConfigModule.forRoot({
load: [configuration], load: [configuration],
}), }),
LoggerModule.forRootAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService) => {
const isDev = configService.get<'dev' | 'prod'>('env') === 'dev';
return {
pinoHttp: {
prettyPrint: isDev
? {
levelFirst: true,
}
: false,
prettifier: pinoPretty,
},
};
},
inject: [ConfigService],
}),
TypeOrmModule.forRootAsync({ TypeOrmModule.forRootAsync({
imports: [ConfigModule], imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({ useFactory: (configService: ConfigService) => ({

View File

@ -12,6 +12,7 @@ import { Project } from '../projects/project.entity';
import { TaskStatuses } from './enums/task-statuses.enum'; import { TaskStatuses } from './enums/task-statuses.enum';
import { PipelineTaskLogsService } from './pipeline-task-logs.service'; import { PipelineTaskLogsService } from './pipeline-task-logs.service';
import { ApplicationException } from '../commons/exceptions/application.exception'; import { ApplicationException } from '../commons/exceptions/application.exception';
import { getLoggerToken, PinoLogger } from 'nestjs-pino';
describe('PipelineTaskConsumer', () => { describe('PipelineTaskConsumer', () => {
let consumer: PipelineTaskConsumer; let consumer: PipelineTaskConsumer;
@ -49,6 +50,10 @@ describe('PipelineTaskConsumer', () => {
readLogsAsPipelineTaskLogs: async () => [], readLogsAsPipelineTaskLogs: async () => [],
}, },
}, },
{
provide: getLoggerToken(PipelineTaskConsumer.name),
useValue: new PinoLogger({}),
},
PipelineTaskConsumer, PipelineTaskConsumer,
], ],
}).compile(); }).compile();
@ -92,12 +97,10 @@ describe('PipelineTaskConsumer', () => {
'node one-second-work.js', 'node one-second-work.js',
join(__dirname, '../../test/data'), join(__dirname, '../../test/data'),
); );
expect(logText).toMatch(/10.+20.+30.+40.+50.+60.+70.+80.+90/s); expect(logText).toMatch(
expect(recordLog).toHaveBeenCalledTimes(10); /node one-second-work\.js.+10.+20.+30.+40.+50.+60.+70.+80.+90/s,
expect( );
((recordLog.mock.calls[8][0] as unknown) as PipelineTaskLogMessage) expect(recordLog).toHaveBeenCalled();
.message,
).toMatch(/^90/);
}); });
it('should failed and log right message', async () => { it('should failed and log right message', async () => {
await expect( await expect(

View File

@ -16,9 +16,7 @@ import { PipelineUnits } from './enums/pipeline-units.enum';
import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module'; import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module';
import { TaskStatuses } from './enums/task-statuses.enum'; import { TaskStatuses } from './enums/task-statuses.enum';
import { PipelineTaskLogsService } from './pipeline-task-logs.service'; import { PipelineTaskLogsService } from './pipeline-task-logs.service';
import debug from 'debug'; import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
const log = debug('fennec:pipeline-tasks:consumer');
@Processor(PIPELINE_TASK_QUEUE) @Processor(PIPELINE_TASK_QUEUE)
export class PipelineTaskConsumer { export class PipelineTaskConsumer {
@ -26,6 +24,8 @@ export class PipelineTaskConsumer {
private readonly service: PipelineTasksService, private readonly service: PipelineTasksService,
private readonly reposService: ReposService, private readonly reposService: ReposService,
private readonly logsService: PipelineTaskLogsService, private readonly logsService: PipelineTaskLogsService,
@InjectPinoLogger(PipelineTaskConsumer.name)
private readonly logger: PinoLogger,
) {} ) {}
@Process() @Process()
async doTask(job: Job<PipelineTask>) { async doTask(job: Job<PipelineTask>) {
@ -39,7 +39,7 @@ export class PipelineTaskConsumer {
task.startedAt = new Date(); task.startedAt = new Date();
task.status = TaskStatuses.working; task.status = TaskStatuses.working;
task = await this.service.updateTask(task); task = await this.service.updateTask(task);
log('start job'); this.logger.info({ task }, 'running task [%s].', task.id);
await job.update(task); await job.update(task);
const workspaceRoot = this.reposService.getWorkspaceRootByTask(task); const workspaceRoot = this.reposService.getWorkspaceRootByTask(task);
@ -51,26 +51,26 @@ export class PipelineTaskConsumer {
) ?? { type: type, scripts: [] }, ) ?? { type: type, scripts: [] },
); );
log('task have [%o] units', units); this.logger.info({ units }, 'begin run units.');
try { try {
for (const unit of units) { for (const unit of units) {
const unitLog = new PipelineTaskLogs(); const unitLog = new PipelineTaskLogs();
unitLog.unit = unit.type; unitLog.unit = unit.type;
unitLog.startedAt = new Date(); unitLog.startedAt = new Date();
log('curr unit is %s', unit.type); this.logger.info('curr unit is %s', unit.type);
try { try {
// 检出代码前执行 git checkout // 检出代码前执行 git checkout
if (unit.type === PipelineUnits.checkout) { if (unit.type === PipelineUnits.checkout) {
log('begin checkout'); this.logger.debug('begin checkout');
await this.reposService.checkout(task, workspaceRoot); await this.reposService.checkout(task, workspaceRoot);
unitLog.status = TaskStatuses.success; unitLog.status = TaskStatuses.success;
log('end checkout'); this.logger.debug('end checkout');
} }
for (const script of unit.scripts) { for (const script of unit.scripts) {
unitLog.logs += `[RUN SCRIPT] ${script}`; unitLog.logs += `[RUN SCRIPT] ${script}`;
log('begin runScript %s', script); this.logger.debug('begin runScript %s', script);
await this.runScript(script, workspaceRoot, task, unit.type); await this.runScript(script, workspaceRoot, task, unit.type);
log('end runScript %s', script); this.logger.debug('end runScript %s', script);
} }
unitLog.status = TaskStatuses.success; unitLog.status = TaskStatuses.success;
} catch (err) { } catch (err) {
@ -92,9 +92,10 @@ export class PipelineTaskConsumer {
} }
task.status = TaskStatuses.success; task.status = TaskStatuses.success;
this.logger.info({ task }, 'task [%s] completed.', task.id);
} catch (err) { } catch (err) {
task.status = TaskStatuses.failed; task.status = TaskStatuses.failed;
log('task is failed', err); this.logger.error({ task, error: err }, 'task [%s] failed.', task.id);
} finally { } finally {
task.endedAt = new Date(); task.endedAt = new Date();
task = await this.service.updateTask(task); task = await this.service.updateTask(task);
@ -108,6 +109,9 @@ export class PipelineTaskConsumer {
task?: PipelineTask, task?: PipelineTask,
unit?: PipelineUnits, unit?: PipelineUnits,
): Promise<void> { ): Promise<void> {
await this.logsService.recordLog(
PipelineTaskLogMessage.create(task, unit, script + '\n', false),
);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const sub = spawn(script, { const sub = spawn(script, {
shell: true, shell: true,
@ -133,7 +137,7 @@ export class PipelineTaskConsumer {
await new Promise<void>(async (resolve) => { await new Promise<void>(async (resolve) => {
for (let i = 0; i < 10 && loggingCount > 0; i++) { for (let i = 0; i < 10 && loggingCount > 0; i++) {
await new Promise((resolve) => setTimeout(resolve, 500)); await new Promise((resolve) => setTimeout(resolve, 500));
log('waiting logging... (%dx500ms)', i); this.logger.debug('waiting logging... (%dx500ms)', i);
} }
resolve(); resolve();
}); });
@ -147,13 +151,11 @@ export class PipelineTaskConsumer {
@OnQueueCompleted() @OnQueueCompleted()
onCompleted(job: Job<PipelineTask>) { onCompleted(job: Job<PipelineTask>) {
log('queue onCompleted');
this.service.doNextTask(job.data.pipeline); this.service.doNextTask(job.data.pipeline);
} }
@OnQueueFailed() @OnQueueFailed()
onFailed(job: Job<PipelineTask>) { onFailed(job: Job<PipelineTask>) {
log('queue onFailed');
this.service.doNextTask(job.data.pipeline); this.service.doNextTask(job.data.pipeline);
} }
} }

View File

@ -21,7 +21,10 @@ export class CreateProjectInput {
comment: string; comment: string;
@Matches( @Matches(
/^(?:ssh:\/\/)?(?:[\w\d-_]+@)(?:[\w\d-_]+\.)*\w{2,10}(?::\d{1,5})?(?:\/[\w\d-_.]+)*/, /^(?:ssh:\/\/)?(?:[\w\d-_]+@)?(?:[\w\d-_]+\.)*\w{2,10}(?::\d{1,5})?(?:\/[\w\d-_.]+)*/,
{
message: 'wrong ssh url',
},
) )
@MaxLength(256) @MaxLength(256)
sshUrl: string; sshUrl: string;

View File

@ -1,5 +1,9 @@
import { InputType } from '@nestjs/graphql'; import { InputType } from '@nestjs/graphql';
import { IsUUID } from 'class-validator';
import { CreateProjectInput } from './create-project.input'; import { CreateProjectInput } from './create-project.input';
@InputType() @InputType()
export class UpdateProjectInput extends CreateProjectInput {} export class UpdateProjectInput extends CreateProjectInput {
@IsUUID()
id: string;
}

View File

@ -8,12 +8,12 @@ import { ProjectsService } from './projects.service';
export class ProjectsResolver { export class ProjectsResolver {
constructor(private readonly service: ProjectsService) {} constructor(private readonly service: ProjectsService) {}
@Query(() => [Project]) @Query(() => [Project])
async findProjects() { async projects() {
return await this.service.list(); return await this.service.list();
} }
@Query(() => Project) @Query(() => Project)
async findProject(@Args('id', { type: () => String }) id: string) { async project(@Args('id', { type: () => String }) id: string) {
return await this.service.findOne(id); return await this.service.findOne(id);
} }
@ -26,18 +26,17 @@ export class ProjectsResolver {
} }
@Mutation(() => Project) @Mutation(() => Project)
async modifyProject( async updateProject(
@Args('id', { type: () => String }) id: string,
@Args('project', { type: () => UpdateProjectInput }) @Args('project', { type: () => UpdateProjectInput })
dto: UpdateProjectInput, dto: UpdateProjectInput,
) { ) {
const tmp = await this.service.update(id, dto); const tmp = await this.service.update(dto);
console.log(tmp); console.log(tmp);
return tmp; return tmp;
} }
@Mutation(() => Number) @Mutation(() => Number)
async deleteProject(@Args('id', { type: () => String }) id: string) { async removeProject(@Args('id', { type: () => String }) id: string) {
return await this.service.remove(id); return await this.service.remove(id);
} }
} }

View File

@ -25,9 +25,9 @@ export class ProjectsService extends BaseDbService<Project> {
return await this.repository.save(this.repository.create(dto)); return await this.repository.save(this.repository.create(dto));
} }
async update(id: string, dto: UpdateProjectInput) { async update(dto: UpdateProjectInput) {
await this.isDuplicateEntityForUpdate(id, dto); await this.isDuplicateEntityForUpdate(dto.id, dto);
const old = await this.findOne(id); const old = await this.findOne(dto.id);
return await this.repository.save(this.repository.merge(old, dto)); return await this.repository.save(this.repository.merge(old, dto));
} }