refactor(pipeline, repo): rabbitmq

This commit is contained in:
Ivan Li
2021-05-30 22:36:06 +08:00
parent b3a2b11db9
commit 3ee41ece67
35 changed files with 287 additions and 1227 deletions

View File

@ -1,14 +0,0 @@
import { ReposService } from './repos.service';
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
import { ListLogsOption } from './models/list-logs.options';
import { LIST_LOGS_TASK } from './repos.constants';
@Processor(LIST_LOGS_TASK)
export class ListLogsConsumer {
constructor(private readonly service: ReposService) {}
@Process()
async listLogs(job: Job<ListLogsOption>) {
const logs = await this.service.listLogs(job.data);
return logs;
}
}

View File

@ -1,3 +1,5 @@
export const LIST_LOGS_TASK = 'LIST_LOGS_TASK';
export const LIST_LOGS_PUB_SUB = 'LIST_LOGS_PUB_SUB';
export const LIST_LOGS_DONE = 'LIST_LOGS_DONE';
export const EXCHANGE_REPO = 'fennec.repo';
export const ROUTE_FETCH = 'fetch';
export const ROUTE_LIST_COMMITS = 'list-commits';
export const QUEUE_LIST_COMMITS = 'list-commits';
export const QUEUE_FETCH = 'repo-fetch';

View File

@ -3,22 +3,35 @@ import { TypeOrmModule } from '@nestjs/typeorm';
import { Project } from '../projects/project.entity';
import { ReposResolver } from './repos.resolver';
import { ReposService } from './repos.service';
import { ConfigModule } from '@nestjs/config';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { ProjectsModule } from '../projects/projects.module';
import { BullModule } from '@nestjs/bull';
import { LIST_LOGS_TASK } from './repos.constants';
import { ListLogsConsumer } from './list-logs.consumer';
import { EXCHANGE_REPO } from './repos.constants';
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
@Module({
imports: [
TypeOrmModule.forFeature([Project]),
ConfigModule,
ProjectsModule,
BullModule.registerQueue({
name: LIST_LOGS_TASK,
RabbitMQModule.forRootAsync(RabbitMQModule, {
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
uri: configService.get<string>('db.rabbitmq.uri'),
exchanges: [
{
name: EXCHANGE_REPO,
type: 'topic',
options: {
durable: true,
autoDelete: true,
},
},
],
}),
inject: [ConfigService],
}),
],
providers: [ReposResolver, ReposService, ListLogsConsumer],
providers: [ReposResolver, ReposService],
exports: [ReposService],
})
export class ReposModule {}

View File

@ -9,6 +9,9 @@ import configuration from '../commons/config/configuration';
import { PipelineTask } from '../pipeline-tasks/pipeline-task.entity';
import { join } from 'path';
import { readFile } from 'fs/promises';
import { getLoggerToken, PinoLogger } from 'nestjs-pino';
import { Nack } from '@golevelup/nestjs-rabbitmq';
import { getInstanceName } from '../commons/utils/rabbit-mq';
const getTest1Project = () =>
({
@ -45,6 +48,10 @@ describe('ReposService', () => {
provide: getRepositoryToken(Project),
useFactory: repositoryMockFactory,
},
{
provide: getLoggerToken(ReposService.name),
useValue: new PinoLogger({}),
},
],
}).compile();
@ -139,4 +146,57 @@ describe('ReposService', () => {
);
});
});
describe('fetch', () => {
it('success', async () => {
const project = new Project();
const pipeline = new Pipeline();
pipeline.branch = 'test';
const fetch = jest.fn((_: any) => Promise.resolve());
pipeline.project = project;
const getGit = jest.spyOn(service, 'getGit').mockImplementation(() =>
Promise.resolve({
fetch,
} as any),
);
await expect(service.fetch(pipeline)).resolves.toEqual(getInstanceName());
expect(getGit).toBeCalledTimes(1);
expect(getGit.mock.calls[0]?.[0]).toEqual(project);
expect(fetch).toBeCalledTimes(1);
expect(fetch.mock.calls[0]?.[0]).toMatchObject([
'origin',
'test',
'--depth=100',
]);
});
it('failed a', async () => {
const project = new Project();
const pipeline = new Pipeline();
pipeline.branch = 'test';
const fetch = jest.fn((_: any) => Promise.resolve());
pipeline.project = project;
const getGit = jest
.spyOn(service, 'getGit')
.mockImplementation(() => Promise.reject('error'));
await expect(service.fetch(pipeline)).resolves.toMatchObject(new Nack());
expect(getGit).toBeCalledTimes(1);
expect(getGit.mock.calls[0]?.[0]).toEqual(project);
expect(fetch).toBeCalledTimes(0);
});
it('failed b', async () => {
const project = new Project();
const pipeline = new Pipeline();
pipeline.branch = 'test';
const fetch = jest.fn((_: any) => Promise.reject('error'));
pipeline.project = project;
const getGit = jest.spyOn(service, 'getGit').mockImplementation(() =>
Promise.resolve({
fetch,
} as any),
);
await expect(service.fetch(pipeline)).resolves.toMatchObject(new Nack());
expect(getGit).toBeCalledTimes(1);
expect(fetch).toBeCalledTimes(1);
});
});
});

View File

@ -1,5 +1,4 @@
import { ListLogsOption } from './models/list-logs.options';
import { Pipeline } from './../pipelines/pipeline.entity';
import { PipelineTask } from './../pipeline-tasks/pipeline-task.entity';
import { Injectable, NotFoundException } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
@ -12,25 +11,32 @@ import { Project } from '../projects/project.entity';
import { ListBranchesArgs } from './dtos/list-branches.args';
import { ConfigService } from '@nestjs/config';
import { Commit } from './dtos/log-list.model';
import { Nack, RabbitRPC } from '@golevelup/nestjs-rabbitmq';
import { Pipeline } from '../pipelines/pipeline.entity';
import { InjectPinoLogger, Logger } from 'nestjs-pino';
import {
EXCHANGE_REPO,
QUEUE_FETCH,
QUEUE_LIST_COMMITS,
ROUTE_FETCH,
ROUTE_LIST_COMMITS,
} from './repos.constants';
import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq';
import {
getInstanceName,
getSelfInstanceRouteKey,
} from '../commons/utils/rabbit-mq';
const DEFAULT_REMOTE_NAME = 'origin';
const INFO_PATH = '@info';
@Injectable()
export class ReposService {
async listCommits(project: Project, branch?: string) {
const git = await this.getGit(project, undefined, { fetch: false });
const data = await git.log(
branch ? ['--branches', `remotes/origin/${branch}`, '--'] : ['--all'],
);
return data.all.map((it) => ({
...it,
date: new Date(it.date),
}));
}
constructor(
@InjectRepository(Project)
private readonly projectRepository: Repository<Project>,
private readonly configService: ConfigService,
@InjectPinoLogger(ReposService.name)
private readonly logger: Logger,
) {}
getWorkspaceRoot(project: Project): string {
@ -111,4 +117,64 @@ export class ReposService {
await this.checkout(task, path);
return path;
}
@RabbitRPC({
exchange: EXCHANGE_REPO,
routingKey: [
ROUTE_LIST_COMMITS,
getSelfInstanceRouteKey(ROUTE_LIST_COMMITS),
],
queue: getSelfInstanceQueueKey(QUEUE_LIST_COMMITS),
queueOptions: {
autoDelete: true,
},
})
async listCommits(pipeline: Pipeline): Promise<Commit[] | Nack> {
const git = await this.getGit(pipeline.project, undefined, {
fetch: false,
});
try {
const data = await git.log([
'-100',
'--branches',
`remotes/origin/${pipeline.branch}`,
'--',
]);
return data.all.map(
(it) =>
({
...it,
date: new Date(it.date),
} as Commit),
);
} catch (error) {
this.logger.error(
{ error, pipeline },
'[listCommits] %s',
error?.message,
);
return new Nack();
}
}
@RabbitRPC({
exchange: EXCHANGE_REPO,
routingKey: [ROUTE_FETCH, getSelfInstanceRouteKey(ROUTE_FETCH)],
queue: getSelfInstanceQueueKey(QUEUE_FETCH),
queueOptions: {
autoDelete: true,
},
})
async fetch(pipeline: Pipeline): Promise<string | null | Nack> {
try {
const git = await this.getGit(pipeline.project, undefined, {
fetch: false,
});
await git.fetch(['origin', pipeline.branch, '--depth=100']);
return getInstanceName();
} catch (error) {
this.logger.error({ error, pipeline }, '[fetch] %s', error?.message);
return new Nack();
}
}
}