diff --git a/src/pipeline-tasks/pipeline-task-flush.service.ts b/src/pipeline-tasks/pipeline-task-flush.service.ts index 22bddac..9146d48 100644 --- a/src/pipeline-tasks/pipeline-task-flush.service.ts +++ b/src/pipeline-tasks/pipeline-task-flush.service.ts @@ -4,7 +4,7 @@ import { deserialize } from 'class-transformer'; import { RedisService } from 'nestjs-redis'; import { isNil } from 'ramda'; import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq'; -import { terminalTaskStatuses } from './enums/task-statuses.enum'; +import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum'; import { PipelineTaskEvent } from './models/pipeline-task-event'; import { EXCHANGE_PIPELINE_TASK_TOPIC, @@ -36,7 +36,7 @@ export class PipelineTaskFlushService { const client = this.redisService.getClient(); await client.rpush(this.getKey(message.taskId), JSON.stringify(message)); await client.expire(this.getKey(message.taskId), 600); // ten minutes - if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) { + if (isNil(message.unit)) { try { await this.amqpConnection.request({ exchange: EXCHANGE_PIPELINE_TASK_TOPIC, diff --git a/src/pipeline-tasks/pipeline-task.entity.ts b/src/pipeline-tasks/pipeline-task.entity.ts index 1fad725..f878b06 100644 --- a/src/pipeline-tasks/pipeline-task.entity.ts +++ b/src/pipeline-tasks/pipeline-task.entity.ts @@ -36,4 +36,7 @@ export class PipelineTask extends AppBaseEntity { @Column({ nullable: true }) endedAt?: Date; + + @Column({ nullable: true }) + runOn: string; } diff --git a/src/pipeline-tasks/pipeline-task.runner.ts b/src/pipeline-tasks/pipeline-task.runner.ts index 571ffa6..732b525 100644 --- a/src/pipeline-tasks/pipeline-task.runner.ts +++ b/src/pipeline-tasks/pipeline-task.runner.ts @@ -9,10 +9,19 @@ import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; import { PipelineTaskEvent } from './models/pipeline-task-event'; import { last } from 'ramda'; import { Inject } from '@nestjs/common'; +import { + EXCHANGE_PIPELINE_TASK_TOPIC, + QUEUE_PIPELINE_TASK_KILL, + ROUTE_PIPELINE_TASK_KILL, +} from './pipeline-tasks.constants'; import { EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, } from './pipeline-tasks.constants'; +import { + getSelfInstanceQueueKey, + getSelfInstanceRouteKey, +} from '../commons/utils/rabbit-mq'; type Spawn = typeof spawn; @@ -41,9 +50,13 @@ export class PipelineTaskRunner { } } @RabbitSubscribe({ - exchange: 'stop-pipeline-task', - routingKey: 'mac', - queue: 'mac.stop-pipeline-task', + exchange: EXCHANGE_PIPELINE_TASK_TOPIC, + routingKey: getSelfInstanceRouteKey(ROUTE_PIPELINE_TASK_KILL), + queue: getSelfInstanceQueueKey(QUEUE_PIPELINE_TASK_KILL), + queueOptions: { + autoDelete: true, + durable: true, + }, }) async onStopTask(task: PipelineTask) { this.logger.info({ task }, 'on stop task [%s].', task.id); diff --git a/src/pipeline-tasks/pipeline-tasks.constants.ts b/src/pipeline-tasks/pipeline-tasks.constants.ts index 42d10f9..036525d 100644 --- a/src/pipeline-tasks/pipeline-tasks.constants.ts +++ b/src/pipeline-tasks/pipeline-tasks.constants.ts @@ -5,3 +5,5 @@ export const QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT = 'pipeline-task-log'; export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log'; export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done'; export const QUEUE_PIPELINE_TASK_DONE = 'pipeline-task-done'; +export const ROUTE_PIPELINE_TASK_KILL = 'pipeline-task-kill'; +export const QUEUE_PIPELINE_TASK_KILL = 'pipeline-task-kill';