fennec-be/src/pipeline-tasks/pipeline-task-flush.service.ts

67 lines
2.0 KiB
TypeScript

import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable, Logger } from '@nestjs/common';
import { ConsumeMessage } from 'amqplib';
import { deserialize } from 'class-transformer';
import { RedisService } from 'nestjs-redis';
import { isNil } from 'ramda';
import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq';
import { PipelineTaskEvent } from './models/pipeline-task-event';
import {
  EXCHANGE_PIPELINE_TASK_TOPIC,
  ROUTE_PIPELINE_TASK_DONE,
} from './pipeline-tasks.constants';
import {
  EXCHANGE_PIPELINE_TASK_FANOUT,
  ROUTE_PIPELINE_TASK_LOG,
  QUEUE_WRITE_PIPELINE_TASK_LOG,
} from './pipeline-tasks.constants';
/**
 * Stores pipeline task events in Redis as they arrive over RabbitMQ and lets
 * callers read a task's stored events back.
 *
 * Each task's events are kept in one Redis list keyed by the task id.
 */
@Injectable()
export class PipelineTaskFlushService {
  /** Seconds a task's event list survives after its latest write (ten minutes). */
  private static readonly LOG_TTL_SECONDS = 600;

  private readonly logger = new Logger(PipelineTaskFlushService.name);

  constructor(
    private readonly redisService: RedisService,
    private readonly amqpConnection: AmqpConnection,
  ) {}

  /**
   * Appends an incoming task event to the task's Redis list and refreshes the
   * list's expiry.
   *
   * When the event carries no `unit` (null or undefined), a request is also
   * sent to the task-done route of the topic exchange with the task id, the
   * event status and the `sender` header of the AMQP message as `runOn`.
   * A failure of that request is logged and not rethrown; failures of the
   * Redis commands are not caught here.
   *
   * @param message the task event delivered by the subscription
   * @param amqpMsg the raw AMQP message; its `sender` header is forwarded
   */
  @RabbitSubscribe({
    exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
    routingKey: ROUTE_PIPELINE_TASK_LOG,
    queue: getSelfInstanceQueueKey(QUEUE_WRITE_PIPELINE_TASK_LOG),
    queueOptions: {
      autoDelete: true,
      durable: true,
    },
  })
  async write(
    message: PipelineTaskEvent,
    amqpMsg: ConsumeMessage,
  ): Promise<void> {
    const client = this.redisService.getClient();
    const key = this.getKey(message.taskId);

    await client.rpush(key, JSON.stringify(message));
    // Re-arming the TTL on every write makes the expiry count from the most
    // recent event rather than from the first one.
    await client.expire(key, PipelineTaskFlushService.LOG_TTL_SECONDS);

    if (!isNil(message.unit)) {
      return;
    }

    try {
      await this.amqpConnection.request({
        exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
        routingKey: ROUTE_PIPELINE_TASK_DONE,
        payload: {
          taskId: message.taskId,
          status: message.status,
          runOn: amqpMsg.properties.headers.sender,
        },
      });
    } catch (error) {
      // Best effort: the event is already stored, so only report the failure.
      this.logger.error(
        `Failed to send done notification for pipeline task ${message.taskId}`,
        error instanceof Error ? error.stack : String(error),
      );
    }
  }

  /**
   * Reads every event stored for a task, oldest first.
   *
   * @param taskId id of the task whose events are requested
   * @returns the stored events deserialized into `PipelineTaskEvent`
   *   instances; empty when the list does not exist
   */
  async read(taskId: string): Promise<PipelineTaskEvent[]> {
    const raw = await this.redisService
      .getClient()
      .lrange(this.getKey(taskId), 0, -1);
    return raw.map((it) => deserialize(PipelineTaskEvent, it));
  }

  /** Builds the Redis key of the list holding a task's events. */
  private getKey(taskId: string): string {
    return `p-task:log:${taskId}`;
  }
}