feat(pipeline-task): update task info in the database from the task-done message.

Ivan 2021-06-08 14:52:10 +08:00
parent ead32a1204
commit 0c3310d3a5
3 changed files with 82 additions and 4 deletions
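
The handlers in this commit read several fields off each queued event. The PipelineTaskEvent model itself is not touched by the diff; as a rough sketch of the shape the code below assumes (field names taken from the usages in the hunks, types guessed):

// Sketch only, inferred from message.taskId / event.unit / event.status /
// event.message / event.emittedAt below; the real model may differ.
interface PipelineTaskEvent {
  taskId: string;
  unit: string | null; // null marks a task-level event, otherwise the unit name
  status: TaskStatuses;
  message: string;
  emittedAt: Date;
}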

View File

@@ -29,12 +29,13 @@ export class PipelineTaskFlushService {
     queue: getSelfInstanceQueueKey(QUEUE_WRITE_PIPELINE_TASK_LOG),
     queueOptions: {
       autoDelete: true,
+      durable: true,
     },
   })
   async write(message: PipelineTaskEvent) {
-    await this.redisService
-      .getClient()
-      .rpush(this.getKey(message.taskId), JSON.stringify(message));
+    const client = this.redisService.getClient();
+    await client.rpush(this.getKey(message.taskId), JSON.stringify(message));
+    await client.expire(this.getKey(message.taskId), 600); // ten minutes
     if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) {
       this.amqpConnection.request({
         exchange: EXCHANGE_PIPELINE_TASK_TOPIC,

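The read side of this buffer is not part of the diff: updateByEvent in pipeline-tasks.service calls this.eventFlushService.read(id), which presumably drains the same Redis list written above. A minimal sketch, assuming the same ioredis client and key scheme:

// Sketch, not the committed implementation: returns every event buffered by
// write() for one task, parsed back into objects, oldest first.
async read(taskId: string): Promise<PipelineTaskEvent[]> {
  const client = this.redisService.getClient();
  const raw = await client.lrange(this.getKey(taskId), 0, -1);
  return raw.map((item) => JSON.parse(item) as PipelineTaskEvent);
}
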
View File

@@ -4,3 +4,4 @@ export const ROUTE_PIPELINE_TASK_LOG = 'pipeline-task-log';
 export const QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT = 'pipeline-task-log';
 export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log';
 export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done';
+export const QUEUE_PIPELINE_TASK_DONE = 'pipeline-task-done';

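QUEUE_PIPELINE_TASK_DONE is the new constant here; it pairs the producer's RPC request with the @RabbitRPC handler added in the next file. The request call itself is cut off by the first hunk (and the visible line targets EXCHANGE_PIPELINE_TASK_TOPIC while the handler binds to EXCHANGE_PIPELINE_TASK_FANOUT), so the following is illustrative wiring only, using the @golevelup/nestjs-rabbitmq request API:

// Illustrative only; exchange/routingKey must match however the handler is bound.
await this.amqpConnection.request<void>({
  exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
  routingKey: ROUTE_PIPELINE_TASK_DONE,
  payload: { id: message.taskId },
  timeout: 10_000,
});
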
View File

@@ -5,7 +5,21 @@ import { Repository } from 'typeorm';
 import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
 import { Pipeline } from '../pipelines/pipeline.entity';
 import debug from 'debug';
-import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
+import {
+  AmqpConnection,
+  RabbitRPC,
+  RabbitSubscribe,
+} from '@golevelup/nestjs-rabbitmq';
+import {
+  EXCHANGE_PIPELINE_TASK_FANOUT,
+  QUEUE_PIPELINE_TASK_DONE,
+  ROUTE_PIPELINE_TASK_DONE,
+} from './pipeline-tasks.constants';
+import { PipelineTaskFlushService } from './pipeline-task-flush.service';
+import { find, isNil, propEq } from 'ramda';
+import { PipelineTaskLogs } from './models/pipeline-task-logs.model';
+import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum';
+import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
 const log = debug('fennec:pipeline-tasks:service');
@@ -17,6 +31,9 @@ export class PipelineTasksService {
     @InjectRepository(Pipeline)
     private readonly pipelineRepository: Repository<Pipeline>,
     private readonly amqpConnection: AmqpConnection,
+    private readonly eventFlushService: PipelineTaskFlushService,
+    @InjectPinoLogger(PipelineTasksService.name)
+    private readonly logger: PinoLogger,
   ) {}
   async addTask(dto: CreatePipelineTaskInput) {
     const pipeline = await this.pipelineRepository.findOneOrFail({
@@ -60,4 +77,63 @@ export class PipelineTasksService {
   getRedisTokens(pipeline: Pipeline): [string, string] {
     return [`pipeline-${pipeline.id}:lck`, `pipeline-${pipeline.id}:tasks`];
   }
+
+  @RabbitRPC({
+    exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
+    routingKey: ROUTE_PIPELINE_TASK_DONE,
+    queue: QUEUE_PIPELINE_TASK_DONE,
+    queueOptions: {
+      autoDelete: true,
+      durable: true,
+    },
+  })
+  async updateByEvent({ id }: { id: string }) {
+    try {
+      const [events, task] = await Promise.all([
+        this.eventFlushService.read(id),
+        this.findTaskById(id),
+      ]);
+      this.logger.info('[updateByEvent] start. taskId: %s', id);
+      for (const event of events) {
+        if (isNil(event.unit)) {
+          if (
+            event.status !== TaskStatuses.pending &&
+            task.status === TaskStatuses.pending
+          ) {
+            task.startedAt = event.emittedAt;
+          } else if (terminalTaskStatuses.includes(event.status)) {
+            task.endedAt = event.emittedAt;
+          }
+          task.status = event.status;
+        } else {
+          let l: PipelineTaskLogs = find<PipelineTaskLogs>(
+            propEq('unit', event.unit),
+            task.logs,
+          );
+          if (isNil(l)) {
+            l = {
+              unit: event.unit,
+              startedAt: event.emittedAt,
+              endedAt: null,
+              logs: event.message,
+              status: event.status,
+            };
+            task.logs.push(l);
+          }
+          if (terminalTaskStatuses.includes(event.status)) {
+            l.endedAt = event.emittedAt;
+          }
+          l.status = event.status;
+        }
+      }
+      await this.repository.update({ id }, task);
+      this.logger.info('[updateByEvent] success. taskId: %s', id);
+    } catch (error) {
+      this.logger.error({ error }, '[updateByEvent] failed. taskId: %s', id);
+    }
+  }
 }
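
A worked trace of the merge logic above, assuming the task row starts in TaskStatuses.pending and that a succeeded status is in terminalTaskStatuses (status names other than pending are illustrative):

// Hypothetical buffered events for one task, in order:
//   { unit: null,       status: working,   emittedAt: t0 }
//   { unit: 'checkout', status: working,   emittedAt: t1, message: 'cloning...' }
//   { unit: 'checkout', status: succeeded, emittedAt: t2, message: 'done' }
//   { unit: null,       status: succeeded, emittedAt: t3 }
// updateByEvent walks these in order, so the task row ends up with
//   startedAt = t0, endedAt = t3, status = succeeded,
// and task.logs gains one entry for unit 'checkout' with
//   startedAt = t1, endedAt = t2, status = succeeded, logs = 'cloning...'.
// Note that, as written, a later message for an already-seen unit is not
// appended to that entry's logs.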