feat(pipelines): 添加消息队列
This commit is contained in:
parent
22d9bf47d3
commit
31a200206f
16
src/pipeline-tasks/pipeline-task.consumer.ts
Normal file
16
src/pipeline-tasks/pipeline-task.consumer.ts
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
import { OnQueueCompleted, Process, Processor } from '@nestjs/bull';
|
||||||
|
import { Job } from 'bull';
|
||||||
|
import { PipelineTask } from './pipeline-task.entity';
|
||||||
|
import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants';
|
||||||
|
import { PipelineTasksService } from './pipeline-tasks.service';
|
||||||
|
@Processor(PIPELINE_TASK_QUEUE)
|
||||||
|
export class PipelineTaskConsumer {
|
||||||
|
constructor(private readonly service: PipelineTasksService) {}
|
||||||
|
@Process()
|
||||||
|
async doTask() {}
|
||||||
|
|
||||||
|
@OnQueueCompleted()
|
||||||
|
onCompleted(job: Job<PipelineTask>) {
|
||||||
|
this.service.doNextTask(job.data.pipeline);
|
||||||
|
}
|
||||||
|
}
|
@ -2,6 +2,10 @@ import { Test, TestingModule } from '@nestjs/testing';
|
|||||||
import { PipelineTasksService } from './pipeline-tasks.service';
|
import { PipelineTasksService } from './pipeline-tasks.service';
|
||||||
import { getRepositoryToken } from '@nestjs/typeorm';
|
import { getRepositoryToken } from '@nestjs/typeorm';
|
||||||
import { PipelineTask } from './pipeline-task.entity';
|
import { PipelineTask } from './pipeline-task.entity';
|
||||||
|
import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants';
|
||||||
|
import { getQueueToken } from '@nestjs/bull';
|
||||||
|
import { RedisService } from 'nestjs-redis';
|
||||||
|
import { Pipeline } from '../pipelines/pipeline.entity';
|
||||||
|
|
||||||
describe('PipelineTasksService', () => {
|
describe('PipelineTasksService', () => {
|
||||||
let service: PipelineTasksService;
|
let service: PipelineTasksService;
|
||||||
@ -14,6 +18,19 @@ describe('PipelineTasksService', () => {
|
|||||||
provide: getRepositoryToken(PipelineTask),
|
provide: getRepositoryToken(PipelineTask),
|
||||||
useValue: {},
|
useValue: {},
|
||||||
},
|
},
|
||||||
|
PipelineTasksService,
|
||||||
|
{
|
||||||
|
provide: getRepositoryToken(Pipeline),
|
||||||
|
useValue: {},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: getQueueToken(PIPELINE_TASK_QUEUE),
|
||||||
|
useValue: {},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: RedisService,
|
||||||
|
useValue: {},
|
||||||
|
},
|
||||||
],
|
],
|
||||||
}).compile();
|
}).compile();
|
||||||
|
|
||||||
|
@ -44,13 +44,12 @@ export class PipelineTasksService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async doTask(task: PipelineTask) {
|
async doNextTask(pipeline: Pipeline) {
|
||||||
const tasksKey = this.getRedisTokens(task.pipeline)[1];
|
const tasksKey = this.getRedisTokens(pipeline)[1];
|
||||||
|
|
||||||
const redis = this.redis.getClient();
|
const redis = this.redis.getClient();
|
||||||
const nextTask = await redis.rpop(tasksKey);
|
const task = JSON.parse((await redis.rpop(tasksKey)) ?? 'null');
|
||||||
if (nextTask) {
|
if (task) {
|
||||||
this.doTask(task).then();
|
this.queue.add(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user