Compare commits

..

3 Commits

Author SHA1 Message Date
Ivan Li
ead32a1204 feat(pipeline-task): flush service. 2021-06-06 22:21:03 +08:00
Ivan Li
20612d4301 feat(pipeline-task): add pipelineTaskEvent api. 2021-06-06 08:40:25 +08:00
Ivan Li
7091f9df6a feat(pipeline-task): logger. 2021-06-05 19:11:39 +08:00
13 changed files with 338 additions and 10 deletions

View File

@ -11,3 +11,5 @@ registerEnumType(TaskStatuses, {
name: 'TaskStatuses', name: 'TaskStatuses',
description: '任务状态', description: '任务状态',
}); });
export const terminalTaskStatuses = [TaskStatuses.success, TaskStatuses.failed];

View File

@ -1,13 +1,25 @@
import { Field, ObjectType } from '@nestjs/graphql';
import { PipelineUnits } from '../enums/pipeline-units.enum'; import { PipelineUnits } from '../enums/pipeline-units.enum';
import { TaskStatuses } from '../enums/task-statuses.enum'; import { TaskStatuses } from '../enums/task-statuses.enum';
import { Type } from 'class-transformer';
@ObjectType()
export class PipelineTaskEvent { export class PipelineTaskEvent {
@Field()
taskId: string; taskId: string;
@Field()
pipelineId: string; pipelineId: string;
@Field()
projectId: string; projectId: string;
@Field(() => PipelineUnits, { nullable: true })
unit: PipelineUnits | null; unit: PipelineUnits | null;
@Field()
@Type(() => Date)
emittedAt: Date; emittedAt: Date;
@Field()
message: string; message: string;
@Field()
messageType: 'stdout' | 'stderr' | 'stdin'; messageType: 'stdout' | 'stderr' | 'stdin';
@Field(() => TaskStatuses)
status: TaskStatuses; status: TaskStatuses;
} }

View File

@ -0,0 +1,80 @@
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { Test, TestingModule } from '@nestjs/testing';
import { RedisService } from 'nestjs-redis';
import { PipelineTaskFlushService } from './pipeline-task-flush.service';
import { PipelineTaskEvent } from './models/pipeline-task-event';
import { TaskStatuses } from './enums/task-statuses.enum';
import {
EXCHANGE_PIPELINE_TASK_TOPIC,
ROUTE_PIPELINE_TASK_DONE,
} from './pipeline-tasks.constants';
describe('PipelineTaskFlushService', () => {
let service: PipelineTaskFlushService;
let redisService: RedisService;
let amqpConnection: AmqpConnection;
beforeEach(async () => {
const redisClient = {
rpush: jest.fn(() => Promise.resolve()),
lrange: jest.fn(() => Promise.resolve()),
};
const module: TestingModule = await Test.createTestingModule({
providers: [
PipelineTaskFlushService,
{
provide: RedisService,
useValue: {
getClient() {
return redisClient;
},
},
},
{
provide: AmqpConnection,
useValue: {
request: jest.fn(() => Promise.resolve()),
},
},
],
}).compile();
service = module.get<PipelineTaskFlushService>(PipelineTaskFlushService);
redisService = module.get<RedisService>(RedisService);
amqpConnection = module.get<AmqpConnection>(AmqpConnection);
});
it('should be defined', () => {
expect(service).toBeDefined();
});
describe('write', () => {
it('normal', async () => {
const testEvent = new PipelineTaskEvent();
testEvent.taskId = 'test';
testEvent.status = TaskStatuses.working;
const rpush = jest.spyOn(redisService.getClient(), 'rpush');
const request = jest.spyOn(amqpConnection, 'request');
await service.write(testEvent);
expect(rpush).toBeCalledTimes(1);
expect(rpush.mock.calls[0][0]).toEqual('p-task:log:test');
expect(rpush.mock.calls[0][1]).toEqual(JSON.stringify(testEvent));
expect(request).toBeCalledTimes(0);
});
it('event for which task done', async () => {
const testEvent = new PipelineTaskEvent();
testEvent.taskId = 'test';
testEvent.status = TaskStatuses.success;
const rpush = jest.spyOn(redisService.getClient(), 'rpush');
const request = jest.spyOn(amqpConnection, 'request');
await service.write(testEvent);
expect(rpush).toBeCalledTimes(1);
expect(request).toBeCalledTimes(1);
expect(request.mock.calls[0][0]).toMatchObject({
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
routingKey: ROUTE_PIPELINE_TASK_DONE,
payload: { taskId: 'test', status: TaskStatuses.success },
});
});
});
});

View File

@ -0,0 +1,57 @@
import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';
import { deserialize } from 'class-transformer';
import { RedisService } from 'nestjs-redis';
import { isNil } from 'ramda';
import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq';
import { terminalTaskStatuses } from './enums/task-statuses.enum';
import { PipelineTaskEvent } from './models/pipeline-task-event';
import {
EXCHANGE_PIPELINE_TASK_TOPIC,
ROUTE_PIPELINE_TASK_DONE,
} from './pipeline-tasks.constants';
import {
EXCHANGE_PIPELINE_TASK_FANOUT,
ROUTE_PIPELINE_TASK_LOG,
QUEUE_WRITE_PIPELINE_TASK_LOG,
} from './pipeline-tasks.constants';
@Injectable()
export class PipelineTaskFlushService {
constructor(
private readonly redisService: RedisService,
private readonly amqpConnection: AmqpConnection,
) {}
@RabbitSubscribe({
exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
routingKey: ROUTE_PIPELINE_TASK_LOG,
queue: getSelfInstanceQueueKey(QUEUE_WRITE_PIPELINE_TASK_LOG),
queueOptions: {
autoDelete: true,
},
})
async write(message: PipelineTaskEvent) {
await this.redisService
.getClient()
.rpush(this.getKey(message.taskId), JSON.stringify(message));
if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) {
this.amqpConnection.request({
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
routingKey: ROUTE_PIPELINE_TASK_DONE,
payload: { taskId: message.taskId, status: message.status },
});
}
}
async read(taskId: string) {
const raw = await this.redisService
.getClient()
.lrange(this.getKey(taskId), 0, -1);
return raw.map((it) => deserialize(PipelineTaskEvent, it));
}
private getKey(taskId: string) {
return `p-task:log:${taskId}`;
}
}

View File

@ -0,0 +1,74 @@
import { Test, TestingModule } from '@nestjs/testing';
import { PipelineTaskLogger } from './pipeline-task.logger';
import { PipelineTaskEvent } from './models/pipeline-task-event';
import { take, timeout } from 'rxjs/operators';
describe('PipelineTaskRunner', () => {
let logger: PipelineTaskLogger;
let module: TestingModule;
beforeEach(async () => {
module = await Test.createTestingModule({
providers: [PipelineTaskLogger],
}).compile();
logger = module.get(PipelineTaskLogger);
});
it('should be defined', () => {
expect(logger).toBeDefined();
});
describe('getMessage$', () => {
it('normal', async () => {
const event = new PipelineTaskEvent();
event.taskId = 'test';
const emittedAt = new Date();
event.emittedAt = emittedAt.toISOString() as any;
const message$ = logger.getMessage$('test');
let receiveEvent;
message$.pipe(take(1)).subscribe((value) => (receiveEvent = value));
await logger.handleEvent(event);
expect(receiveEvent).toMatchObject({
...event,
emittedAt,
});
});
it('no match', async () => {
const event = new PipelineTaskEvent();
event.taskId = 'test';
const message$ = logger.getMessage$('other');
setTimeout(() => {
logger.handleEvent(event);
});
expect(message$.pipe(take(1), timeout(100)).toPromise()).rejects.toMatch(
'timeout',
);
});
it('multiple subscribers', async () => {
const event = new PipelineTaskEvent();
event.taskId = 'test';
const message$ = logger.getMessage$('test');
const message2$ = logger.getMessage$('test');
setTimeout(() => {
logger.handleEvent(event);
});
expect(message$.pipe(take(1), timeout(100)).toPromise()).resolves.toEqual(
event,
);
expect(
message2$.pipe(take(1), timeout(100)).toPromise(),
).resolves.toEqual(event);
});
});
describe('onModuleDestroy', () => {
it('complete observable when destroying module', async () => {
logger.onModuleDestroy();
await expect(
(logger as any).message$.toPromise(),
).resolves.toBeUndefined();
});
});
});

View File

@ -0,0 +1,37 @@
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { plainToClass } from 'class-transformer';
import { Observable, Subject } from 'rxjs';
import { filter } from 'rxjs/operators';
import { PipelineTaskEvent } from './models/pipeline-task-event';
import {
EXCHANGE_PIPELINE_TASK_FANOUT,
QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT,
ROUTE_PIPELINE_TASK_LOG,
} from './pipeline-tasks.constants';
@Injectable()
export class PipelineTaskLogger implements OnModuleDestroy {
private readonly messageSubject = new Subject<PipelineTaskEvent>();
private readonly message$: Observable<PipelineTaskEvent> = this.messageSubject.pipe();
@RabbitSubscribe({
exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
routingKey: ROUTE_PIPELINE_TASK_LOG,
queue: QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT,
queueOptions: {
autoDelete: true,
},
})
async handleEvent(message: PipelineTaskEvent) {
this.messageSubject.next(plainToClass(PipelineTaskEvent, message));
}
getMessage$(taskId: string) {
return this.message$.pipe(filter((event) => event.taskId === taskId));
}
onModuleDestroy() {
this.messageSubject.complete();
}
}

View File

@ -8,6 +8,7 @@ import { TaskStatuses } from './enums/task-statuses.enum';
import { getLoggerToken, PinoLogger } from 'nestjs-pino'; import { getLoggerToken, PinoLogger } from 'nestjs-pino';
import { PipelineTaskRunner } from './pipeline-task.runner'; import { PipelineTaskRunner } from './pipeline-task.runner';
import { WorkUnitMetadata } from './models/work-unit-metadata.model'; import { WorkUnitMetadata } from './models/work-unit-metadata.model';
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
describe('PipelineTaskRunner', () => { describe('PipelineTaskRunner', () => {
let runner: PipelineTaskRunner; let runner: PipelineTaskRunner;
let reposService: ReposService; let reposService: ReposService;
@ -31,6 +32,10 @@ describe('PipelineTaskRunner', () => {
useValue: () => undefined, useValue: () => undefined,
}, },
PipelineTaskRunner, PipelineTaskRunner,
{
provide: AmqpConnection,
useValue: {},
},
], ],
}).compile(); }).compile();

View File

@ -5,10 +5,14 @@ import { ApplicationException } from '../commons/exceptions/application.exceptio
import { PipelineUnits } from './enums/pipeline-units.enum'; import { PipelineUnits } from './enums/pipeline-units.enum';
import { TaskStatuses } from './enums/task-statuses.enum'; import { TaskStatuses } from './enums/task-statuses.enum';
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino'; import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { PipelineTaskEvent } from './models/pipeline-task-event'; import { PipelineTaskEvent } from './models/pipeline-task-event';
import { last } from 'ramda'; import { last } from 'ramda';
import { Inject } from '@nestjs/common'; import { Inject } from '@nestjs/common';
import {
EXCHANGE_PIPELINE_TASK_FANOUT,
ROUTE_PIPELINE_TASK_LOG,
} from './pipeline-tasks.constants';
type Spawn = typeof spawn; type Spawn = typeof spawn;
@ -21,6 +25,7 @@ export class PipelineTaskRunner {
private readonly logger: PinoLogger, private readonly logger: PinoLogger,
@Inject('spawn') @Inject('spawn')
private readonly spawn: Spawn, private readonly spawn: Spawn,
private readonly amqpConnection: AmqpConnection,
) {} ) {}
@RabbitSubscribe({ @RabbitSubscribe({
exchange: 'new-pipeline-task', exchange: 'new-pipeline-task',
@ -201,6 +206,15 @@ export class PipelineTaskRunner {
messageType, messageType,
status, status,
}; };
this.amqpConnection
.publish(EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, event)
.catch((error) => {
this.logger.error(
{ error, event },
'send event message to queue failed. %s',
error.message,
);
});
} }
async runScript( async runScript(

View File

@ -0,0 +1,6 @@
export const EXCHANGE_PIPELINE_TASK_TOPIC = 'pipeline-task.topic';
export const EXCHANGE_PIPELINE_TASK_FANOUT = 'pipeline-task.fanout';
export const ROUTE_PIPELINE_TASK_LOG = 'pipeline-task-log';
export const QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT = 'pipeline-task-log';
export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log';
export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done';

View File

@ -10,6 +10,12 @@ import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { ConfigModule, ConfigService } from '@nestjs/config'; import { ConfigModule, ConfigService } from '@nestjs/config';
import { PipelineTaskRunner } from './pipeline-task.runner'; import { PipelineTaskRunner } from './pipeline-task.runner';
import { spawn } from 'child_process'; import { spawn } from 'child_process';
import {
EXCHANGE_PIPELINE_TASK_FANOUT,
EXCHANGE_PIPELINE_TASK_TOPIC,
} from './pipeline-tasks.constants';
import { PipelineTaskLogger } from './pipeline-task.logger';
import { PipelineTaskFlushService } from './pipeline-task-flush.service';
@Module({ @Module({
imports: [ imports: [
@ -27,6 +33,7 @@ import { spawn } from 'child_process';
type: 'fanout', type: 'fanout',
options: { options: {
durable: true, durable: true,
autoDelete: true,
}, },
}, },
{ {
@ -34,6 +41,7 @@ import { spawn } from 'child_process';
type: 'fanout', type: 'fanout',
options: { options: {
durable: true, durable: true,
autoDelete: true,
}, },
}, },
{ {
@ -41,6 +49,23 @@ import { spawn } from 'child_process';
type: 'fanout', type: 'fanout',
options: { options: {
durable: false, durable: false,
autoDelete: true,
},
},
{
name: EXCHANGE_PIPELINE_TASK_FANOUT,
type: 'fanout',
options: {
durable: false,
autoDelete: true,
},
},
{
name: EXCHANGE_PIPELINE_TASK_TOPIC,
type: 'topic',
options: {
durable: false,
autoDelete: true,
}, },
}, },
], ],
@ -52,10 +77,12 @@ import { spawn } from 'child_process';
PipelineTasksService, PipelineTasksService,
PipelineTasksResolver, PipelineTasksResolver,
PipelineTaskRunner, PipelineTaskRunner,
PipelineTaskLogger,
{ {
provide: 'spawn', provide: 'spawn',
useValue: spawn, useValue: spawn,
}, },
PipelineTaskFlushService,
], ],
exports: [PipelineTasksService], exports: [PipelineTasksService],
}) })

View File

@ -1,4 +1,5 @@
import { Test, TestingModule } from '@nestjs/testing'; import { Test, TestingModule } from '@nestjs/testing';
import { PipelineTaskLogger } from './pipeline-task.logger';
import { PipelineTasksResolver } from './pipeline-tasks.resolver'; import { PipelineTasksResolver } from './pipeline-tasks.resolver';
import { PipelineTasksService } from './pipeline-tasks.service'; import { PipelineTasksService } from './pipeline-tasks.service';
@ -13,6 +14,10 @@ describe('PipelineTasksResolver', () => {
provide: PipelineTasksService, provide: PipelineTasksService,
useValue: {}, useValue: {},
}, },
{
provide: PipelineTaskLogger,
useValue: {},
},
], ],
}).compile(); }).compile();

View File

@ -2,29 +2,35 @@ import { Resolver, Args, Mutation, Subscription, Query } from '@nestjs/graphql';
import { PipelineTask } from './pipeline-task.entity'; import { PipelineTask } from './pipeline-task.entity';
import { PipelineTasksService } from './pipeline-tasks.service'; import { PipelineTasksService } from './pipeline-tasks.service';
import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input'; import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module';
import { PipelineTaskLogArgs } from './dtos/pipeline-task-log.args'; import { PipelineTaskLogArgs } from './dtos/pipeline-task-log.args';
import { plainToClass } from 'class-transformer'; import { plainToClass } from 'class-transformer';
import { PipelineTaskLogger } from './pipeline-task.logger';
import { observableToAsyncIterable } from '@graphql-tools/utils';
import { PipelineTaskEvent } from './models/pipeline-task-event';
@Resolver() @Resolver()
export class PipelineTasksResolver { export class PipelineTasksResolver {
constructor(private readonly service: PipelineTasksService) {} constructor(
private readonly service: PipelineTasksService,
private readonly taskLogger: PipelineTaskLogger,
) {}
@Mutation(() => PipelineTask) @Mutation(() => PipelineTask)
async createPipelineTask(@Args('task') taskDto: CreatePipelineTaskInput) { async createPipelineTask(@Args('task') taskDto: CreatePipelineTaskInput) {
return await this.service.addTask(taskDto); return await this.service.addTask(taskDto);
} }
@Subscription(() => PipelineTaskLogMessage, { @Subscription(() => PipelineTaskEvent, {
resolve: (value) => { resolve: (value) => {
const data = plainToClass(PipelineTaskLogMessage, value); const data = plainToClass(PipelineTaskEvent, value);
return data; return data;
}, },
}) })
async pipelineTaskLog(@Args() args: PipelineTaskLogArgs) { async pipelineTaskEvent(@Args() args: PipelineTaskLogArgs) {
// const task = await this.service.findTaskById(args.taskId); const task = await this.service.findTaskById(args.taskId);
// const asyncIterator = this.logsService.watchLogs(task); return observableToAsyncIterable<PipelineTaskEvent>(
// return asyncIterator; this.taskLogger.getMessage$(task.id),
);
} }
@Subscription(() => PipelineTask, { @Subscription(() => PipelineTask, {

View File

@ -51,7 +51,10 @@ export class PipelineTasksService {
} }
async listTasksByCommitHash(hash: string) { async listTasksByCommitHash(hash: string) {
return await this.repository.find({ commit: hash }); return await this.repository.find({
where: { commit: hash },
order: { createdAt: 'DESC' },
});
} }
getRedisTokens(pipeline: Pipeline): [string, string] { getRedisTokens(pipeline: Pipeline): [string, string] {