feat(pipeline-tasks): 完善流程和测试用例。
This commit is contained in:
@ -8,6 +8,7 @@ import { Pipeline } from '../pipelines/pipeline.entity';
|
||||
import { InjectQueue } from '@nestjs/bull';
|
||||
import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants';
|
||||
import { Queue } from 'bull';
|
||||
import { LockFailedException } from '../commons/exceptions/lock-failed.exception';
|
||||
|
||||
@Injectable()
|
||||
export class PipelineTasksService {
|
||||
@ -30,26 +31,34 @@ export class PipelineTasksService {
|
||||
|
||||
const [lckKey, tasksKey] = this.getRedisTokens(pipeline);
|
||||
const redis = this.redis.getClient();
|
||||
await redis.set(lckKey, 0, 'EX', 10, 'NX');
|
||||
const lckSemaphore = await redis.incr(lckKey);
|
||||
if (lckSemaphore > 1) {
|
||||
await this.redis
|
||||
.getClient()
|
||||
.lpush(tasksKey, JSON.stringify(task))
|
||||
.finally(() => {
|
||||
return redis.decr(lckKey);
|
||||
});
|
||||
} else {
|
||||
this.queue.add(task);
|
||||
}
|
||||
await redis.lpush(tasksKey, JSON.stringify(task)).finally(() => {
|
||||
return redis.decr(lckKey);
|
||||
});
|
||||
await this.doNextTask(pipeline);
|
||||
}
|
||||
|
||||
async doNextTask(pipeline: Pipeline) {
|
||||
const tasksKey = this.getRedisTokens(pipeline)[1];
|
||||
const [lckKey, tasksKey] = this.getRedisTokens(pipeline);
|
||||
const redis = this.redis.getClient();
|
||||
const task = JSON.parse((await redis.rpop(tasksKey)) ?? 'null');
|
||||
await redis.set(lckKey, 0, 'EX', 10, 'NX');
|
||||
|
||||
await new Promise(async (resolve, reject) => {
|
||||
for (let i = 0; i < 5; i++) {
|
||||
if ((await redis.incr(lckKey)) === 1) {
|
||||
resolve(undefined);
|
||||
return;
|
||||
}
|
||||
await redis.decr(lckKey);
|
||||
await new Promise((resolve) => setTimeout(resolve, 2000));
|
||||
}
|
||||
reject(new LockFailedException(lckKey));
|
||||
});
|
||||
|
||||
const task = JSON.parse(
|
||||
(await redis.rpop(tasksKey).finally(() => redis.decr(lckKey))) ?? 'null',
|
||||
);
|
||||
if (task) {
|
||||
this.queue.add(task);
|
||||
await this.queue.add(task);
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user