feat(pipeline-tasks): 流水线人物。
This commit is contained in:
60
src/pipeline-tasks/pipeline-tasks.service.ts
Normal file
60
src/pipeline-tasks/pipeline-tasks.service.ts
Normal file
@ -0,0 +1,60 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { PipelineTask } from './pipeline-task.entity';
|
||||
import { Repository } from 'typeorm';
|
||||
import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
|
||||
import { RedisService } from 'nestjs-redis';
|
||||
import { Pipeline } from '../pipelines/pipeline.entity';
|
||||
import { InjectQueue } from '@nestjs/bull';
|
||||
import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants';
|
||||
import { Queue } from 'bull';
|
||||
|
||||
@Injectable()
|
||||
export class PipelineTasksService {
|
||||
constructor(
|
||||
@InjectRepository(PipelineTask)
|
||||
private readonly repository: Repository<PipelineTask>,
|
||||
@InjectRepository(Pipeline)
|
||||
private readonly pipelineRepository: Repository<Pipeline>,
|
||||
private readonly redis: RedisService,
|
||||
@InjectQueue(PIPELINE_TASK_QUEUE)
|
||||
private readonly queue: Queue<PipelineTask>,
|
||||
) {}
|
||||
async addTask(dto: CreatePipelineTaskInput) {
|
||||
const pipeline = await this.pipelineRepository.findOneOrFail({
|
||||
where: { id: dto.pipelineId },
|
||||
relations: ['project'],
|
||||
});
|
||||
const task = await this.repository.save(this.repository.create(dto));
|
||||
task.pipeline = pipeline;
|
||||
|
||||
const [lckKey, tasksKey] = this.getRedisTokens(pipeline);
|
||||
const redis = this.redis.getClient();
|
||||
await redis.set(lckKey, 0, 'EX', 10, 'NX');
|
||||
const lckSemaphore = await redis.incr(lckKey);
|
||||
if (lckSemaphore > 1) {
|
||||
await this.redis
|
||||
.getClient()
|
||||
.lpush(tasksKey, JSON.stringify(task))
|
||||
.finally(() => {
|
||||
return redis.decr(lckKey);
|
||||
});
|
||||
} else {
|
||||
this.queue.add(task);
|
||||
}
|
||||
}
|
||||
|
||||
async doTask(task: PipelineTask) {
|
||||
const tasksKey = this.getRedisTokens(task.pipeline)[1];
|
||||
|
||||
const redis = this.redis.getClient();
|
||||
const nextTask = await redis.rpop(tasksKey);
|
||||
if (nextTask) {
|
||||
this.doTask(task).then();
|
||||
}
|
||||
}
|
||||
|
||||
getRedisTokens(pipeline: Pipeline): [string, string] {
|
||||
return [`pipeline-${pipeline.id}:lck`, `pipeline-${pipeline.id}:tasks`];
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user