This commit is contained in:
Ivan 2021-03-05 18:12:34 +08:00
parent 7913184174
commit 8901c49bb3
2 changed files with 53 additions and 24 deletions

View File

@ -38,8 +38,8 @@ describe('PipelineTasksService', () => {
beforeEach(async () => { beforeEach(async () => {
redisClient = (() => ({ redisClient = (() => ({
set: jest.fn().mockImplementation(async () => 'OK'), set: jest.fn().mockImplementation(async () => 'OK'),
incr: jest.fn().mockImplementation(async () => 1), del: jest.fn().mockImplementation(async () => 'test'),
decr: jest.fn().mockImplementation(async () => 0), get: jest.fn().mockImplementation(async () => 'test'),
lpush: jest.fn().mockImplementation(async () => 1), lpush: jest.fn().mockImplementation(async () => 1),
rpop: jest.fn().mockImplementation(async () => JSON.stringify(getTask())), rpop: jest.fn().mockImplementation(async () => JSON.stringify(getTask())),
}))() as any; }))() as any;
@ -103,6 +103,9 @@ describe('PipelineTasksService', () => {
const save = jest const save = jest
.spyOn(taskRepository, 'save') .spyOn(taskRepository, 'save')
.mockImplementation(async (data: any) => data); .mockImplementation(async (data: any) => data);
jest
.spyOn(service, 'doNextTask')
.mockImplementation(async () => undefined);
await service.addTask({ pipelineId: 'test', commit: 'test', units: [] }), await service.addTask({ pipelineId: 'test', commit: 'test', units: [] }),
expect(save.mock.calls[0][0]).toMatchObject({ expect(save.mock.calls[0][0]).toMatchObject({
pipelineId: 'test', pipelineId: 'test',
@ -113,6 +116,9 @@ describe('PipelineTasksService', () => {
it('add task', async () => { it('add task', async () => {
const lpush = jest.spyOn(redisClient, 'lpush'); const lpush = jest.spyOn(redisClient, 'lpush');
const doNextTask = jest.spyOn(service, 'doNextTask'); const doNextTask = jest.spyOn(service, 'doNextTask');
jest
.spyOn(service, 'doNextTask')
.mockImplementation(async () => undefined);
await service.addTask({ pipelineId: 'test', commit: 'test', units: [] }); await service.addTask({ pipelineId: 'test', commit: 'test', units: [] });
expect(typeof lpush.mock.calls[0][1] === 'string').toBeTruthy(); expect(typeof lpush.mock.calls[0][1] === 'string').toBeTruthy();
expect(JSON.parse(lpush.mock.calls[0][1] as string)).toMatchObject({ expect(JSON.parse(lpush.mock.calls[0][1] as string)).toMatchObject({
@ -127,15 +133,24 @@ describe('PipelineTasksService', () => {
describe('doNextTask', () => { describe('doNextTask', () => {
it('add task to queue', async () => { it('add task to queue', async () => {
const decr = jest.spyOn(redisClient, 'decr'); let lckValue: string;
const set = jest
.spyOn(redisClient, 'set')
.mockImplementation(async (...args) => (lckValue = args[3] as string));
const get = jest
.spyOn(redisClient, 'get')
.mockImplementation(async () => lckValue);
const del = jest.spyOn(redisClient, 'del');
const rpop = jest.spyOn(redisClient, 'rpop'); const rpop = jest.spyOn(redisClient, 'rpop');
const add = jest.spyOn(taskQueue, 'add'); const add = jest.spyOn(taskQueue, 'add');
await service.doNextTask(getBasePipeline()); await service.doNextTask(getBasePipeline());
expect(add).toHaveBeenCalledWith(getTask()); expect(add).toHaveBeenCalledWith(getTask());
expect(decr).toHaveBeenCalledTimes(1); expect(set).toHaveBeenCalledTimes(1);
expect(rpop).toHaveBeenCalledTimes(1); expect(rpop).toHaveBeenCalledTimes(1);
expect(get).toHaveBeenCalledTimes(1);
expect(del).toHaveBeenCalledTimes(1);
}); });
it('pipeline is busy', async () => { it('pipeline is busy', async () => {
let remainTimes = 3; let remainTimes = 3;
@ -155,15 +170,21 @@ describe('PipelineTasksService', () => {
expect(add).toHaveBeenCalledWith(getTask()); expect(add).toHaveBeenCalledWith(getTask());
}); });
it('pipeline always busy and timeout', async () => { it('pipeline always busy and timeout', async () => {
const incr = jest.spyOn(redisClient, 'incr').mockImplementation(() => 3); const set = jest
const decr = jest.spyOn(redisClient, 'decr'); .spyOn(redisClient, 'set')
.mockImplementation(async () => {
throw new Error();
});
const get = jest.spyOn(redisClient, 'get');
const del = jest.spyOn(redisClient, 'del');
await expect( await expect(
service.doNextTask(getBasePipeline()), service.doNextTask(getBasePipeline()),
).rejects.toBeInstanceOf(LockFailedException); ).rejects.toBeInstanceOf(LockFailedException);
expect(decr).toHaveBeenCalledTimes(5); expect(set).toHaveBeenCalledTimes(5);
expect(incr).toHaveBeenCalledTimes(5); expect(get).toHaveBeenCalledTimes(0);
expect(del).toHaveBeenCalledTimes(0);
}, 15_000); }, 15_000);
}); });
}); });

View File

@ -29,33 +29,41 @@ export class PipelineTasksService {
const task = await this.repository.save(this.repository.create(dto)); const task = await this.repository.save(this.repository.create(dto));
task.pipeline = pipeline; task.pipeline = pipeline;
const [lckKey, tasksKey] = this.getRedisTokens(pipeline); const tasksKey = this.getRedisTokens(pipeline)[1];
const redis = this.redis.getClient(); const redis = this.redis.getClient();
await redis.lpush(tasksKey, JSON.stringify(task)).finally(() => { await redis.lpush(tasksKey, JSON.stringify(task));
return redis.decr(lckKey);
});
await this.doNextTask(pipeline); await this.doNextTask(pipeline);
} }
async doNextTask(pipeline: Pipeline) { async doNextTask(pipeline: Pipeline) {
const [lckKey, tasksKey] = this.getRedisTokens(pipeline); const [lckKey, tasksKey] = this.getRedisTokens(pipeline);
const redis = this.redis.getClient(); const redis = this.redis.getClient();
await redis.set(lckKey, 0, 'EX', 10, 'NX');
await new Promise(async (resolve, reject) => { const unLck = await new Promise<() => Promise<void>>(
for (let i = 0; i < 5; i++) { async (resolve, reject) => {
if ((await redis.incr(lckKey)) === 1) { const lckValue = Date.now().toString();
resolve(undefined); for (let i = 0; i < 5; i++) {
return; if (
await redis
.set(lckKey, 0, 'EX', lckValue, 'NX')
.then(() => true)
.catch(() => false)
) {
resolve(async () => {
if ((await redis.get(lckKey)) === lckValue) {
await redis.del(lckKey);
}
});
return;
}
await new Promise((resolve) => setTimeout(resolve, 2000));
} }
await redis.decr(lckKey); reject(new LockFailedException(lckKey));
await new Promise((resolve) => setTimeout(resolve, 2000)); },
} );
reject(new LockFailedException(lckKey));
});
const task = JSON.parse( const task = JSON.parse(
(await redis.rpop(tasksKey).finally(() => redis.decr(lckKey))) ?? 'null', (await redis.rpop(tasksKey).finally(() => unLck())) ?? 'null',
); );
if (task) { if (task) {
await this.queue.add(task); await this.queue.add(task);