import { Test, TestingModule } from '@nestjs/testing'; import { PipelineTasksService } from './pipeline-tasks.service'; import { getRepositoryToken } from '@nestjs/typeorm'; import { PipelineTask } from './pipeline-task.entity'; import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; import { getQueueToken } from '@nestjs/bull'; import { RedisService } from 'nestjs-redis'; import { Pipeline } from '../pipelines/pipeline.entity'; import { EntityNotFoundError } from 'typeorm/error/EntityNotFoundError'; import { Repository } from 'typeorm'; import { Queue } from 'bull'; import { LockFailedException } from '../commons/exceptions/lock-failed.exception'; describe('PipelineTasksService', () => { let service: PipelineTasksService; let module: TestingModule; let taskRepository: Repository; let pipelineRepository: Repository; const getBasePipeline = () => ({ id: 'test', name: '测试流水线', branch: 'master', workUnitMetadata: {}, project: { id: 'test-project', }, } as Pipeline); let redisClient; let taskQueue: Queue; const getTask = () => ({ pipelineId: 'test', commit: 'test', units: [], } as PipelineTask); beforeEach(async () => { redisClient = (() => ({ set: jest.fn().mockImplementation(async () => 'OK'), incr: jest.fn().mockImplementation(async () => 1), decr: jest.fn().mockImplementation(async () => 0), lpush: jest.fn().mockImplementation(async () => 1), rpop: jest.fn().mockImplementation(async () => JSON.stringify(getTask())), }))() as any; taskQueue = (() => ({ add: jest.fn().mockImplementation(async () => null), }))() as any; module = await Test.createTestingModule({ providers: [ PipelineTasksService, { provide: getRepositoryToken(PipelineTask), useValue: new Repository(), }, { provide: getRepositoryToken(Pipeline), useValue: new Repository(), }, { provide: getQueueToken(PIPELINE_TASK_QUEUE), useValue: taskQueue, }, { provide: RedisService, useValue: { getClient: jest.fn(() => redisClient), }, }, ], }).compile(); service = module.get(PipelineTasksService); taskRepository = module.get(getRepositoryToken(PipelineTask)); pipelineRepository = module.get(getRepositoryToken(Pipeline)); jest .spyOn(taskRepository, 'save') .mockImplementation(async (data: any) => data); jest .spyOn(taskRepository, 'create') .mockImplementation((data: any) => data); }); it('should be defined', () => { expect(service).toBeDefined(); }); describe('addTask', () => { beforeEach(() => { jest .spyOn(pipelineRepository, 'findOneOrFail') .mockImplementation(async () => getBasePipeline()); }); it('pipeline not found', async () => { jest.spyOn(taskRepository, 'findOneOrFail').mockImplementation(() => { throw new EntityNotFoundError(Pipeline, {}); }); await expect( service.addTask({ pipelineId: 'test', commit: 'test', units: [] }), ).rejects; }); it('create task on db', async () => { const save = jest .spyOn(taskRepository, 'save') .mockImplementation(async (data: any) => data); await service.addTask({ pipelineId: 'test', commit: 'test', units: [] }), expect(save.mock.calls[0][0]).toMatchObject({ pipelineId: 'test', commit: 'test', units: [], }); }); it('add task', async () => { const lpush = jest.spyOn(redisClient, 'lpush'); const doNextTask = jest.spyOn(service, 'doNextTask'); await service.addTask({ pipelineId: 'test', commit: 'test', units: [] }); expect(typeof lpush.mock.calls[0][1] === 'string').toBeTruthy(); expect(JSON.parse(lpush.mock.calls[0][1] as string)).toMatchObject({ pipelineId: 'test', commit: 'test', units: [], pipeline: getBasePipeline(), }); expect(doNextTask).toHaveBeenCalledWith(getBasePipeline()); }); }); describe('doNextTask', () => { it('add task to queue', async () => { const decr = jest.spyOn(redisClient, 'decr'); const rpop = jest.spyOn(redisClient, 'rpop'); const add = jest.spyOn(taskQueue, 'add'); await service.doNextTask(getBasePipeline()); expect(add).toHaveBeenCalledWith(getTask()); expect(decr).toHaveBeenCalledTimes(1); expect(rpop).toHaveBeenCalledTimes(1); }); it('pipeline is busy', async () => { let remainTimes = 3; const incr = jest .spyOn(redisClient, 'incr') .mockImplementation(() => remainTimes--); const rpop = jest.spyOn(redisClient, 'rpop'); const decr = jest.spyOn(redisClient, 'decr'); const add = jest.spyOn(taskQueue, 'add'); await service.doNextTask(getBasePipeline()); expect(rpop).toHaveBeenCalledTimes(1); expect(incr).toHaveBeenCalledTimes(3); expect(decr).toHaveBeenCalledTimes(3); expect(add).toHaveBeenCalledWith(getTask()); }); it('pipeline always busy and timeout', async () => { const incr = jest.spyOn(redisClient, 'incr').mockImplementation(() => 3); const decr = jest.spyOn(redisClient, 'decr'); await expect( service.doNextTask(getBasePipeline()), ).rejects.toBeInstanceOf(LockFailedException); expect(decr).toHaveBeenCalledTimes(5); expect(incr).toHaveBeenCalledTimes(5); }, 15_000); }); });