diff --git a/src/pipeline-tasks/pipeline-tasks.service.spec.ts b/src/pipeline-tasks/pipeline-tasks.service.spec.ts index 8fb76ba..a47a082 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.spec.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.spec.ts @@ -38,8 +38,8 @@ describe('PipelineTasksService', () => { beforeEach(async () => { redisClient = (() => ({ set: jest.fn().mockImplementation(async () => 'OK'), - incr: jest.fn().mockImplementation(async () => 1), - decr: jest.fn().mockImplementation(async () => 0), + del: jest.fn().mockImplementation(async () => 'test'), + get: jest.fn().mockImplementation(async () => 'test'), lpush: jest.fn().mockImplementation(async () => 1), rpop: jest.fn().mockImplementation(async () => JSON.stringify(getTask())), }))() as any; @@ -103,6 +103,9 @@ describe('PipelineTasksService', () => { const save = jest .spyOn(taskRepository, 'save') .mockImplementation(async (data: any) => data); + jest + .spyOn(service, 'doNextTask') + .mockImplementation(async () => undefined); await service.addTask({ pipelineId: 'test', commit: 'test', units: [] }), expect(save.mock.calls[0][0]).toMatchObject({ pipelineId: 'test', @@ -113,6 +116,9 @@ describe('PipelineTasksService', () => { it('add task', async () => { const lpush = jest.spyOn(redisClient, 'lpush'); const doNextTask = jest.spyOn(service, 'doNextTask'); + jest + .spyOn(service, 'doNextTask') + .mockImplementation(async () => undefined); await service.addTask({ pipelineId: 'test', commit: 'test', units: [] }); expect(typeof lpush.mock.calls[0][1] === 'string').toBeTruthy(); expect(JSON.parse(lpush.mock.calls[0][1] as string)).toMatchObject({ @@ -127,15 +133,24 @@ describe('PipelineTasksService', () => { describe('doNextTask', () => { it('add task to queue', async () => { - const decr = jest.spyOn(redisClient, 'decr'); + let lckValue: string; + const set = jest + .spyOn(redisClient, 'set') + .mockImplementation(async (...args) => (lckValue = args[3] as string)); + const get = jest + .spyOn(redisClient, 'get') + .mockImplementation(async () => lckValue); + const del = jest.spyOn(redisClient, 'del'); const rpop = jest.spyOn(redisClient, 'rpop'); const add = jest.spyOn(taskQueue, 'add'); await service.doNextTask(getBasePipeline()); expect(add).toHaveBeenCalledWith(getTask()); - expect(decr).toHaveBeenCalledTimes(1); + expect(set).toHaveBeenCalledTimes(1); expect(rpop).toHaveBeenCalledTimes(1); + expect(get).toHaveBeenCalledTimes(1); + expect(del).toHaveBeenCalledTimes(1); }); it('pipeline is busy', async () => { let remainTimes = 3; @@ -155,15 +170,21 @@ describe('PipelineTasksService', () => { expect(add).toHaveBeenCalledWith(getTask()); }); it('pipeline always busy and timeout', async () => { - const incr = jest.spyOn(redisClient, 'incr').mockImplementation(() => 3); - const decr = jest.spyOn(redisClient, 'decr'); + const set = jest + .spyOn(redisClient, 'set') + .mockImplementation(async () => { + throw new Error(); + }); + const get = jest.spyOn(redisClient, 'get'); + const del = jest.spyOn(redisClient, 'del'); await expect( service.doNextTask(getBasePipeline()), ).rejects.toBeInstanceOf(LockFailedException); - expect(decr).toHaveBeenCalledTimes(5); - expect(incr).toHaveBeenCalledTimes(5); + expect(set).toHaveBeenCalledTimes(5); + expect(get).toHaveBeenCalledTimes(0); + expect(del).toHaveBeenCalledTimes(0); }, 15_000); }); }); diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts index a5d6475..5e94020 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.ts @@ -29,33 +29,41 @@ export class PipelineTasksService { const task = await this.repository.save(this.repository.create(dto)); task.pipeline = pipeline; - const [lckKey, tasksKey] = this.getRedisTokens(pipeline); + const tasksKey = this.getRedisTokens(pipeline)[1]; const redis = this.redis.getClient(); - await redis.lpush(tasksKey, JSON.stringify(task)).finally(() => { - return redis.decr(lckKey); - }); + await redis.lpush(tasksKey, JSON.stringify(task)); await this.doNextTask(pipeline); } async doNextTask(pipeline: Pipeline) { const [lckKey, tasksKey] = this.getRedisTokens(pipeline); const redis = this.redis.getClient(); - await redis.set(lckKey, 0, 'EX', 10, 'NX'); - await new Promise(async (resolve, reject) => { - for (let i = 0; i < 5; i++) { - if ((await redis.incr(lckKey)) === 1) { - resolve(undefined); - return; + const unLck = await new Promise<() => Promise>( + async (resolve, reject) => { + const lckValue = Date.now().toString(); + for (let i = 0; i < 5; i++) { + if ( + await redis + .set(lckKey, 0, 'EX', lckValue, 'NX') + .then(() => true) + .catch(() => false) + ) { + resolve(async () => { + if ((await redis.get(lckKey)) === lckValue) { + await redis.del(lckKey); + } + }); + return; + } + await new Promise((resolve) => setTimeout(resolve, 2000)); } - await redis.decr(lckKey); - await new Promise((resolve) => setTimeout(resolve, 2000)); - } - reject(new LockFailedException(lckKey)); - }); + reject(new LockFailedException(lckKey)); + }, + ); const task = JSON.parse( - (await redis.rpop(tasksKey).finally(() => redis.decr(lckKey))) ?? 'null', + (await redis.rpop(tasksKey).finally(() => unLck())) ?? 'null', ); if (task) { await this.queue.add(task);