From a231a02c289036825cbc4a3f3fb1fd29eb0e8cf3 Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Sun, 27 Jun 2021 19:35:49 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BF=AE=E6=94=B9=E9=A1=B9=E7=9B=AE?= =?UTF-8?q?=E5=90=8E=E8=87=AA=E5=8A=A8=E5=88=A0=E9=99=A4=E9=A1=B9=E7=9B=AE?= =?UTF-8?q?=E5=B7=A5=E4=BD=9C=E5=8C=BA=E7=9B=B8=E5=85=B3=E7=9B=AE=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/settings.json | 1 + src/commons/commons.module.ts | 4 +- src/commons/redis-mutex/redis-mutex.module.ts | 10 +++ .../redis-mutex/redis-mutex.service.spec.ts | 18 +++++ .../redis-mutex/redis-mutex.service.ts | 71 +++++++++++++++++++ src/pipeline-tasks/pipeline-tasks.module.ts | 17 ----- src/pipelines/pipelines.service.ts | 2 +- src/projects/projects.constants.ts | 3 + src/projects/projects.module.ts | 24 ++++++- src/projects/projects.service.spec.ts | 5 ++ src/projects/projects.service.ts | 13 +++- src/repos/repos.constants.ts | 1 + src/repos/repos.module.ts | 2 + src/repos/repos.service.ts | 54 +++++++++++++- 14 files changed, 201 insertions(+), 24 deletions(-) create mode 100644 src/commons/redis-mutex/redis-mutex.module.ts create mode 100644 src/commons/redis-mutex/redis-mutex.service.spec.ts create mode 100644 src/commons/redis-mutex/redis-mutex.service.ts create mode 100644 src/projects/projects.constants.ts diff --git a/.vscode/settings.json b/.vscode/settings.json index ad38e62..9ac8376 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,5 +1,6 @@ { "cSpell.words": [ + "Mutex", "Repos", "amqp", "boardcat", diff --git a/src/commons/commons.module.ts b/src/commons/commons.module.ts index 12d0563..c266509 100644 --- a/src/commons/commons.module.ts +++ b/src/commons/commons.module.ts @@ -1,8 +1,10 @@ import { Module } from '@nestjs/common'; import { PasswordConverter } from './services/password-converter'; +import { RedisMutexModule } from './redis-mutex/redis-mutex.module'; @Module({ providers: [PasswordConverter], - exports: [PasswordConverter], + exports: [PasswordConverter, RedisMutexModule], + imports: [RedisMutexModule], }) export class CommonsModule {} diff --git a/src/commons/redis-mutex/redis-mutex.module.ts b/src/commons/redis-mutex/redis-mutex.module.ts new file mode 100644 index 0000000..2884158 --- /dev/null +++ b/src/commons/redis-mutex/redis-mutex.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; +import { RedisMutexService } from './redis-mutex.service'; +import { RedisModule } from 'nestjs-redis'; + +@Module({ + imports: [RedisModule], + providers: [RedisMutexService], + exports: [RedisMutexService], +}) +export class RedisMutexModule {} diff --git a/src/commons/redis-mutex/redis-mutex.service.spec.ts b/src/commons/redis-mutex/redis-mutex.service.spec.ts new file mode 100644 index 0000000..31daa7a --- /dev/null +++ b/src/commons/redis-mutex/redis-mutex.service.spec.ts @@ -0,0 +1,18 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { RedisMutexService } from './redis-mutex.service'; + +describe('RedisMutexService', () => { + let service: RedisMutexService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [RedisMutexService], + }).compile(); + + service = module.get(RedisMutexService); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); +}); diff --git a/src/commons/redis-mutex/redis-mutex.service.ts b/src/commons/redis-mutex/redis-mutex.service.ts new file mode 100644 index 0000000..32775cb --- /dev/null +++ b/src/commons/redis-mutex/redis-mutex.service.ts @@ -0,0 +1,71 @@ +import { Injectable } from '@nestjs/common'; +import { RedisService } from 'nestjs-redis'; +import * as uuid from 'uuid'; +import { ApplicationException } from '../exceptions/application.exception'; + +export interface RedisMutexOption { + /** + * seconds + */ + expires?: number; + /** + * seconds + */ + timeout?: number | null; + /** + * milliseconds + */ + retryDelay?: number; +} + +@Injectable() +export class RedisMutexService { + constructor(private readonly redisClient: RedisService) {} + + public async lock( + key: string, + { expires = 100, timeout = 10, retryDelay = 100 }: RedisMutexOption = { + expires: 100, + timeout: 10, + retryDelay: 100, + }, + ) { + const redisKey = `${'mutex-lock'}:${key}`; + const redis = this.redisClient.getClient(); + const value = uuid.v4(); + const timeoutAt = timeout ? Date.now() + timeout * 1000 : null; + + while ( + !(await redis + .set(redisKey, value, 'EX', expires, 'NX') + .then(() => true) + .catch(() => false)) + ) { + if (timeoutAt && timeoutAt > Date.now()) { + throw new ApplicationException('lock timeout'); + } + await new Promise((resolve) => setTimeout(resolve, retryDelay)); + } + + const renewTimer = setInterval(() => { + redis.expire(redisKey, expires); + }, (expires * 1000) / 2); + + return async () => { + clearInterval(renewTimer); + await redis.eval( + ` + if redis.call("get", KEYS[1]) == ARGV[1] + then + return redis.call("del", KEYS[1]) + else + return 0 + end + `, + 1, + redisKey, + value, + ); + }; + } +} diff --git a/src/pipeline-tasks/pipeline-tasks.module.ts b/src/pipeline-tasks/pipeline-tasks.module.ts index fa175eb..5c05339 100644 --- a/src/pipeline-tasks/pipeline-tasks.module.ts +++ b/src/pipeline-tasks/pipeline-tasks.module.ts @@ -22,7 +22,6 @@ import { PipelineTaskFlushService } from './pipeline-task-flush.service'; TypeOrmModule.forFeature([PipelineTask, Pipeline]), RedisModule, ReposModule, - RabbitMQModule.forRootAsync(RabbitMQModule, { imports: [ConfigModule], useFactory: (configService: ConfigService) => ({ @@ -36,22 +35,6 @@ import { PipelineTaskFlushService } from './pipeline-task-flush.service'; autoDelete: true, }, }, - { - name: 'stop-pipeline-task', - type: 'fanout', - options: { - durable: true, - autoDelete: true, - }, - }, - { - name: 'update-pipeline-task', - type: 'fanout', - options: { - durable: false, - autoDelete: true, - }, - }, { name: EXCHANGE_PIPELINE_TASK_FANOUT, type: 'fanout', diff --git a/src/pipelines/pipelines.service.ts b/src/pipelines/pipelines.service.ts index 14dcaef..1c7f8ee 100644 --- a/src/pipelines/pipelines.service.ts +++ b/src/pipelines/pipelines.service.ts @@ -57,7 +57,7 @@ export class PipelinesService extends BaseDbService { exchange: EXCHANGE_REPO, routingKey: getAppInstanceRouteKey(ROUTE_FETCH, appInstance), payload: pipeline, - timeout: 30_000, + timeout: 120_000, }); } async listCommits(pipeline: Pipeline) { diff --git a/src/projects/projects.constants.ts b/src/projects/projects.constants.ts new file mode 100644 index 0000000..3f412ab --- /dev/null +++ b/src/projects/projects.constants.ts @@ -0,0 +1,3 @@ +export const EXCHANGE_PROJECT_TOPIC = 'project.topic'; +export const EXCHANGE_PROJECT_FANOUT = 'project.fanout'; +export const ROUTE_PROJECT_CHANGE = 'project-change'; diff --git a/src/projects/projects.module.ts b/src/projects/projects.module.ts index 8dda814..f5441a8 100644 --- a/src/projects/projects.module.ts +++ b/src/projects/projects.module.ts @@ -3,9 +3,31 @@ import { ProjectsService } from './projects.service'; import { ProjectsResolver } from './projects.resolver'; import { TypeOrmModule } from '@nestjs/typeorm'; import { Project } from './project.entity'; +import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq'; +import { ConfigModule, ConfigService } from '@nestjs/config'; +import { EXCHANGE_PROJECT_FANOUT } from './projects.constants'; @Module({ - imports: [TypeOrmModule.forFeature([Project])], + imports: [ + TypeOrmModule.forFeature([Project]), + RabbitMQModule.forRootAsync(RabbitMQModule, { + imports: [ConfigModule], + useFactory: (configService: ConfigService) => ({ + uri: configService.get('db.rabbitmq.uri'), + exchanges: [ + { + name: EXCHANGE_PROJECT_FANOUT, + type: 'fanout', + options: { + durable: false, + autoDelete: true, + }, + }, + ], + }), + inject: [ConfigService], + }), + ], providers: [ProjectsService, ProjectsResolver], exports: [ProjectsService], }) diff --git a/src/projects/projects.service.spec.ts b/src/projects/projects.service.spec.ts index be02cbd..8262732 100644 --- a/src/projects/projects.service.spec.ts +++ b/src/projects/projects.service.spec.ts @@ -2,6 +2,7 @@ import { Test, TestingModule } from '@nestjs/testing'; import { ProjectsService } from './projects.service'; import { getRepositoryToken } from '@nestjs/typeorm'; import { Project } from './project.entity'; +import { AmqpConnection } from '@golevelup/nestjs-rabbitmq'; describe('ProjectsService', () => { let service: ProjectsService; @@ -14,6 +15,10 @@ describe('ProjectsService', () => { provide: getRepositoryToken(Project), useValue: {}, }, + { + provide: AmqpConnection, + useValue: {}, + }, ], }).compile(); diff --git a/src/projects/projects.service.ts b/src/projects/projects.service.ts index ae0009e..a8b7cea 100644 --- a/src/projects/projects.service.ts +++ b/src/projects/projects.service.ts @@ -5,6 +5,11 @@ import { Repository } from 'typeorm'; import { CreateProjectInput } from './dtos/create-project.input'; import { Project } from './project.entity'; import { UpdateProjectInput } from './dtos/update-project.input'; +import { AmqpConnection } from '@golevelup/nestjs-rabbitmq'; +import { + EXCHANGE_PROJECT_FANOUT, + ROUTE_PROJECT_CHANGE, +} from './projects.constants'; @Injectable() export class ProjectsService extends BaseDbService { @@ -12,6 +17,7 @@ export class ProjectsService extends BaseDbService { constructor( @InjectRepository(Project) readonly repository: Repository, + private readonly amqpConnection: AmqpConnection, ) { super(repository); } @@ -28,7 +34,12 @@ export class ProjectsService extends BaseDbService { async update(dto: UpdateProjectInput) { await this.isDuplicateEntityForUpdate(dto.id, dto); const old = await this.findOne(dto.id); - return await this.repository.save(this.repository.merge(old, dto)); + const project = await this.repository.save(this.repository.merge(old, dto)); + this.amqpConnection.publish(EXCHANGE_PROJECT_FANOUT, ROUTE_PROJECT_CHANGE, [ + project, + old, + ]); + return project; } async remove(id: string) { diff --git a/src/repos/repos.constants.ts b/src/repos/repos.constants.ts index efe2010..5946766 100644 --- a/src/repos/repos.constants.ts +++ b/src/repos/repos.constants.ts @@ -3,3 +3,4 @@ export const ROUTE_FETCH = 'fetch'; export const ROUTE_LIST_COMMITS = 'list-commits'; export const QUEUE_LIST_COMMITS = 'list-commits'; export const QUEUE_FETCH = 'repo-fetch'; +export const QUEUE_REFRESH_REPO = 'refresh-repo'; diff --git a/src/repos/repos.module.ts b/src/repos/repos.module.ts index 7c5802d..08edf4a 100644 --- a/src/repos/repos.module.ts +++ b/src/repos/repos.module.ts @@ -7,12 +7,14 @@ import { ConfigModule, ConfigService } from '@nestjs/config'; import { ProjectsModule } from '../projects/projects.module'; import { EXCHANGE_REPO } from './repos.constants'; import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq'; +import { CommonsModule } from '../commons/commons.module'; @Module({ imports: [ TypeOrmModule.forFeature([Project]), ConfigModule, ProjectsModule, + CommonsModule, RabbitMQModule.forRootAsync(RabbitMQModule, { imports: [ConfigModule], useFactory: (configService: ConfigService) => ({ diff --git a/src/repos/repos.service.ts b/src/repos/repos.service.ts index 83a6919..811c6a1 100644 --- a/src/repos/repos.service.ts +++ b/src/repos/repos.service.ts @@ -11,13 +11,14 @@ import { Project } from '../projects/project.entity'; import { ListBranchesArgs } from './dtos/list-branches.args'; import { ConfigService } from '@nestjs/config'; import { Commit } from './dtos/log-list.model'; -import { Nack, RabbitRPC } from '@golevelup/nestjs-rabbitmq'; +import { Nack, RabbitRPC, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; import { Pipeline } from '../pipelines/pipeline.entity'; -import { InjectPinoLogger, Logger } from 'nestjs-pino'; +import { InjectPinoLogger, PinoLogger } from 'nestjs-pino'; import { EXCHANGE_REPO, QUEUE_FETCH, QUEUE_LIST_COMMITS, + QUEUE_REFRESH_REPO, ROUTE_FETCH, ROUTE_LIST_COMMITS, } from './repos.constants'; @@ -27,6 +28,12 @@ import { getSelfInstanceRouteKey, } from '../commons/utils/rabbit-mq'; import { ApplicationException } from '../commons/exceptions/application.exception'; +import { + EXCHANGE_PROJECT_FANOUT, + ROUTE_PROJECT_CHANGE, +} from '../projects/projects.constants'; +import { RedisMutexService } from '../commons/redis-mutex/redis-mutex.service'; +import { rm } from 'fs/promises'; const DEFAULT_REMOTE_NAME = 'origin'; const INFO_PATH = '@info'; @@ -37,7 +44,8 @@ export class ReposService { private readonly projectRepository: Repository, private readonly configService: ConfigService, @InjectPinoLogger(ReposService.name) - private readonly logger: Logger, + private readonly logger: PinoLogger, + private readonly redisMutexService: RedisMutexService, ) {} getWorkspaceRoot(project: Project): string { @@ -170,6 +178,9 @@ export class ReposService { }, }) async fetch(pipeline: Pipeline): Promise { + const unlock = await this.redisMutexService.lock( + `repo-project-${pipeline.projectId}`, + ); try { const git = await this.getGit(pipeline.project, undefined, { fetch: false, @@ -179,6 +190,43 @@ export class ReposService { } catch (error) { this.logger.error({ error, pipeline }, '[fetch] %s', error?.message); return new Nack(); + } finally { + await unlock(); + } + } + + @RabbitSubscribe({ + exchange: EXCHANGE_PROJECT_FANOUT, + routingKey: ROUTE_PROJECT_CHANGE, + queue: QUEUE_REFRESH_REPO, + queueOptions: { + autoDelete: true, + durable: true, + }, + }) + async refreshRepo([project]: [Project]) { + this.logger.info({ project }, '[refreshRepo] start'); + const unlock = await this.redisMutexService.lock( + `repo-project-${project.id}`, + { + timeout: null, + }, + ); + try { + const path = join( + this.configService.get('workspaces.root'), + encodeURIComponent(project.name), + ); + await rm(path, { recursive: true }); + this.logger.info({ project }, '[refreshRepo] success'); + } catch (error) { + this.logger.error( + { project, error }, + '[refreshRepo] failed. $s', + error.message, + ); + } finally { + await unlock(); } } }