diff --git a/src/pipeline-tasks/pipeline-task-flush.service.ts b/src/pipeline-tasks/pipeline-task-flush.service.ts index b06675f..18a0e6d 100644 --- a/src/pipeline-tasks/pipeline-task-flush.service.ts +++ b/src/pipeline-tasks/pipeline-task-flush.service.ts @@ -29,12 +29,13 @@ export class PipelineTaskFlushService { queue: getSelfInstanceQueueKey(QUEUE_WRITE_PIPELINE_TASK_LOG), queueOptions: { autoDelete: true, + durable: true, }, }) async write(message: PipelineTaskEvent) { - await this.redisService - .getClient() - .rpush(this.getKey(message.taskId), JSON.stringify(message)); + const client = this.redisService.getClient(); + await client.rpush(this.getKey(message.taskId), JSON.stringify(message)); + await client.expire(this.getKey(message.taskId), 600); // ten minutes if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) { this.amqpConnection.request({ exchange: EXCHANGE_PIPELINE_TASK_TOPIC, diff --git a/src/pipeline-tasks/pipeline-tasks.constants.ts b/src/pipeline-tasks/pipeline-tasks.constants.ts index 49101c3..42d10f9 100644 --- a/src/pipeline-tasks/pipeline-tasks.constants.ts +++ b/src/pipeline-tasks/pipeline-tasks.constants.ts @@ -4,3 +4,4 @@ export const ROUTE_PIPELINE_TASK_LOG = 'pipeline-task-log'; export const QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT = 'pipeline-task-log'; export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log'; export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done'; +export const QUEUE_PIPELINE_TASK_DONE = 'pipeline-task-done'; diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts index 812a3c3..fdc1aa2 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.ts @@ -5,7 +5,21 @@ import { Repository } from 'typeorm'; import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input'; import { Pipeline } from '../pipelines/pipeline.entity'; import debug from 'debug'; -import { AmqpConnection } from '@golevelup/nestjs-rabbitmq'; +import { + AmqpConnection, + RabbitRPC, + RabbitSubscribe, +} from '@golevelup/nestjs-rabbitmq'; +import { + EXCHANGE_PIPELINE_TASK_FANOUT, + QUEUE_PIPELINE_TASK_DONE, + ROUTE_PIPELINE_TASK_DONE, +} from './pipeline-tasks.constants'; +import { PipelineTaskFlushService } from './pipeline-task-flush.service'; +import { find, isNil, propEq } from 'ramda'; +import { PipelineTaskLogs } from './models/pipeline-task-logs.model'; +import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum'; +import { InjectPinoLogger, PinoLogger } from 'nestjs-pino'; const log = debug('fennec:pipeline-tasks:service'); @@ -17,6 +31,9 @@ export class PipelineTasksService { @InjectRepository(Pipeline) private readonly pipelineRepository: Repository, private readonly amqpConnection: AmqpConnection, + private readonly eventFlushService: PipelineTaskFlushService, + @InjectPinoLogger(PipelineTasksService.name) + private readonly logger: PinoLogger, ) {} async addTask(dto: CreatePipelineTaskInput) { const pipeline = await this.pipelineRepository.findOneOrFail({ @@ -60,4 +77,63 @@ export class PipelineTasksService { getRedisTokens(pipeline: Pipeline): [string, string] { return [`pipeline-${pipeline.id}:lck`, `pipeline-${pipeline.id}:tasks`]; } + + @RabbitRPC({ + exchange: EXCHANGE_PIPELINE_TASK_FANOUT, + routingKey: ROUTE_PIPELINE_TASK_DONE, + queue: QUEUE_PIPELINE_TASK_DONE, + queueOptions: { + autoDelete: true, + durable: true, + }, + }) + async updateByEvent({ id }: { id: string }) { + try { + const [events, task] = await Promise.all([ + this.eventFlushService.read(id), + this.findTaskById(id), + ]); + this.logger.info('[updateByEvent] start. taskId: %s', id); + + for (const event of events) { + if (isNil(event.unit)) { + if ( + event.status !== TaskStatuses.pending && + task.status === TaskStatuses.pending + ) { + task.startedAt = event.emittedAt; + } else if (terminalTaskStatuses.includes(event.status)) { + task.endedAt = event.emittedAt; + } + task.status = event.status; + } else { + let l: PipelineTaskLogs = find( + propEq('unit', event.unit), + task.logs, + ); + + if (isNil(l)) { + l = { + unit: event.unit, + startedAt: event.emittedAt, + endedAt: null, + logs: event.message, + status: event.status, + }; + + task.logs.push(l); + } + + if (terminalTaskStatuses.includes(event.status)) { + l.endedAt = event.emittedAt; + } + l.status = event.status; + } + } + await this.repository.update({ id }, task); + this.logger.info('[updateByEvent] success. taskId: %s', id); + } catch (error) { + this.logger.error({ error }, '[updateByEvent] failed. taskId: %s', id); + } + } }