import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { plainToClass } from 'class-transformer';
import { Observable, Subject } from 'rxjs';
import { filter } from 'rxjs/operators';
import { PipelineTaskEvent } from './models/pipeline-task-event';
import {
  EXCHANGE_PIPELINE_TASK_FANOUT,
  QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT,
  ROUTE_PIPELINE_TASK_LOG,
} from './pipeline-tasks.constants';

/**
 * Relays pipeline task events received from RabbitMQ to in-process
 * subscribers as an RxJS stream.
 *
 * Events are pushed through a plain `Subject`, so nothing is buffered or
 * replayed: a subscriber only sees events emitted after it subscribes.
 */
@Injectable()
export class PipelineTaskLogger implements OnModuleDestroy {
  /** Write side of the stream; only `handleEvent` pushes into it. */
  private readonly messageSubject = new Subject<PipelineTaskEvent>();

  /** Read-only view of `messageSubject` that per-task streams are derived from. */
  private readonly message$: Observable<PipelineTaskEvent> =
    this.messageSubject.asObservable();

  /**
   * RabbitMQ handler bound to the pipeline task log queue.
   *
   * Converts the incoming payload into a `PipelineTaskEvent` instance with
   * `plainToClass` and emits it to all current subscribers.
   *
   * @param message Payload delivered by the subscription.
   */
  @RabbitSubscribe({
    exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
    routingKey: ROUTE_PIPELINE_TASK_LOG,
    queue: QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT,
    queueOptions: {
      autoDelete: true,
    },
  })
  async handleEvent(message: PipelineTaskEvent): Promise<void> {
    this.messageSubject.next(plainToClass(PipelineTaskEvent, message));
  }

  /**
   * Returns a stream limited to the events of a single task.
   *
   * @param taskId Identifier compared (strictly) against each event's `taskId`.
   * @returns Observable emitting only the events whose `taskId` matches.
   */
  getMessage$(taskId: string): Observable<PipelineTaskEvent> {
    return this.message$.pipe(filter((event) => event.taskId === taskId));
  }

  /** Completes the underlying subject so every derived stream completes too. */
  onModuleDestroy(): void {
    this.messageSubject.complete();
  }
}