fennec-be/src/pipeline-tasks/pipeline-tasks.service.ts

138 lines
4.3 KiB
TypeScript
Raw Normal View History

import { ConflictException, Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { PipelineTask } from './pipeline-task.entity';
import { In, Repository } from 'typeorm';
import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
import { RedisService } from 'nestjs-redis';
import { Pipeline } from '../pipelines/pipeline.entity';
import { InjectQueue } from '@nestjs/bull';
import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants';
import { Queue } from 'bull';
import { LockFailedException } from '../commons/exceptions/lock-failed.exception';
import { TaskStatuses } from './enums/task-statuses.enum';
import { isNil } from 'ramda';
2021-03-25 18:09:45 +08:00
import debug from 'debug';
import { InjectPubSub } from '../commons/pub-sub/decorators/inject-pub-sub.decorator';
import { PubSub } from '../commons/pub-sub/pub-sub';
import { observableToAsyncIterable } from '@graphql-tools/utils';
2021-03-25 18:09:45 +08:00
const log = debug('fennec:pipeline-tasks:service');
@Injectable()
export class PipelineTasksService {
constructor(
@InjectRepository(PipelineTask)
private readonly repository: Repository<PipelineTask>,
@InjectRepository(Pipeline)
private readonly pipelineRepository: Repository<Pipeline>,
@InjectQueue(PIPELINE_TASK_QUEUE)
private readonly queue: Queue<PipelineTask>,
2021-03-06 12:23:55 +08:00
private readonly redis: RedisService,
@InjectPubSub()
private readonly pubSub: PubSub,
) {}
async addTask(dto: CreatePipelineTaskInput) {
const pipeline = await this.pipelineRepository.findOneOrFail({
where: { id: dto.pipelineId },
relations: ['project'],
});
2021-03-25 18:09:45 +08:00
const hasUnfinishedTask = await this.repository
.findOne({
pipelineId: dto.pipelineId,
commit: dto.commit,
status: In([TaskStatuses.pending, TaskStatuses.working]),
})
.then((val) => !isNil(val));
if (hasUnfinishedTask) {
throw new ConflictException(
'There are the same tasks among the unfinished tasks!',
);
}
const task = await this.repository.save(this.repository.create(dto));
task.pipeline = pipeline;
2021-03-05 18:12:34 +08:00
const tasksKey = this.getRedisTokens(pipeline)[1];
const redis = this.redis.getClient();
2021-03-05 18:12:34 +08:00
await redis.lpush(tasksKey, JSON.stringify(task));
2021-03-25 18:09:45 +08:00
log(
'add task %s:%s-%s',
task.id,
task.pipeline.branch,
task.commit.slice(0, 6),
);
await this.doNextTask(pipeline);
return task;
}
async findTaskById(id: string) {
return await this.repository.findOneOrFail({ id });
}
async listTasksByPipelineId(pipelineId: string) {
return await this.repository.find({ pipelineId });
}
async listTasksByCommitHash(hash: string) {
return await this.repository.find({ commit: hash });
}
2021-03-02 16:28:37 +08:00
async doNextTask(pipeline: Pipeline) {
const [lckKey, tasksKey] = this.getRedisTokens(pipeline);
const redis = this.redis.getClient();
2021-03-25 18:09:45 +08:00
log('doNextTask()');
2021-03-05 18:12:34 +08:00
const unLck = await new Promise<() => Promise<void>>(
async (resolve, reject) => {
const lckValue = Date.now().toString();
for (let i = 0; i < 5; i++) {
if (
await redis
.set(lckKey, 0, 'EX', lckValue, 'NX')
.then(() => true)
.catch(() => false)
) {
resolve(async () => {
if ((await redis.get(lckKey)) === lckValue) {
await redis.del(lckKey);
}
});
return;
}
await new Promise((resolve) => setTimeout(resolve, 2000));
}
2021-03-05 18:12:34 +08:00
reject(new LockFailedException(lckKey));
},
);
const task = JSON.parse(
2021-03-05 18:12:34 +08:00
(await redis.rpop(tasksKey).finally(() => unLck())) ?? 'null',
);
2021-03-02 16:28:37 +08:00
if (task) {
2021-03-25 18:09:45 +08:00
log(
'add task (%s:%s-%s) to queue',
task.id,
task.pipeline.branch,
task.commit.slice(0, 6),
);
await this.queue.add(task);
2021-03-25 18:09:45 +08:00
} else {
log('task is empty');
}
}
2021-03-05 17:12:06 +08:00
async updateTask(task: PipelineTask) {
this.pubSub.publish(`pipeline-task:${task.id}`, task);
2021-03-05 17:12:06 +08:00
return await this.repository.save(task);
}
async watchTaskUpdated(id: string) {
return observableToAsyncIterable(
this.pubSub.message$(`pipeline-task:${id}`),
);
}
getRedisTokens(pipeline: Pipeline): [string, string] {
return [`pipeline-${pipeline.id}:lck`, `pipeline-${pipeline.id}:tasks`];
}
}