From 246623b5dbff15fde55f7d8e4c648cc67c99b403 Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Sun, 20 Jun 2021 10:39:53 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BB=BB=E5=8A=A1=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E6=94=B9=E5=8F=98=E6=97=B6=EF=BC=8C=E8=87=AA=E5=8A=A8=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E6=95=B0=E6=8D=AE=E5=BA=93=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pipeline-task-flush.service.ts | 4 ++-- src/pipeline-tasks/pipeline-task.entity.ts | 3 +++ src/pipeline-tasks/pipeline-task.runner.ts | 19 ++++++++++++++++--- .../pipeline-tasks.constants.ts | 2 ++ 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/src/pipeline-tasks/pipeline-task-flush.service.ts b/src/pipeline-tasks/pipeline-task-flush.service.ts index 22bddac..9146d48 100644 --- a/src/pipeline-tasks/pipeline-task-flush.service.ts +++ b/src/pipeline-tasks/pipeline-task-flush.service.ts @@ -4,7 +4,7 @@ import { deserialize } from 'class-transformer'; import { RedisService } from 'nestjs-redis'; import { isNil } from 'ramda'; import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq'; -import { terminalTaskStatuses } from './enums/task-statuses.enum'; +import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum'; import { PipelineTaskEvent } from './models/pipeline-task-event'; import { EXCHANGE_PIPELINE_TASK_TOPIC, @@ -36,7 +36,7 @@ export class PipelineTaskFlushService { const client = this.redisService.getClient(); await client.rpush(this.getKey(message.taskId), JSON.stringify(message)); await client.expire(this.getKey(message.taskId), 600); // ten minutes - if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) { + if (isNil(message.unit)) { try { await this.amqpConnection.request({ exchange: EXCHANGE_PIPELINE_TASK_TOPIC, diff --git a/src/pipeline-tasks/pipeline-task.entity.ts b/src/pipeline-tasks/pipeline-task.entity.ts index 1fad725..f878b06 100644 --- a/src/pipeline-tasks/pipeline-task.entity.ts +++ b/src/pipeline-tasks/pipeline-task.entity.ts @@ -36,4 +36,7 @@ export class PipelineTask extends AppBaseEntity { @Column({ nullable: true }) endedAt?: Date; + + @Column({ nullable: true }) + runOn: string; } diff --git a/src/pipeline-tasks/pipeline-task.runner.ts b/src/pipeline-tasks/pipeline-task.runner.ts index 571ffa6..732b525 100644 --- a/src/pipeline-tasks/pipeline-task.runner.ts +++ b/src/pipeline-tasks/pipeline-task.runner.ts @@ -9,10 +9,19 @@ import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; import { PipelineTaskEvent } from './models/pipeline-task-event'; import { last } from 'ramda'; import { Inject } from '@nestjs/common'; +import { + EXCHANGE_PIPELINE_TASK_TOPIC, + QUEUE_PIPELINE_TASK_KILL, + ROUTE_PIPELINE_TASK_KILL, +} from './pipeline-tasks.constants'; import { EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, } from './pipeline-tasks.constants'; +import { + getSelfInstanceQueueKey, + getSelfInstanceRouteKey, +} from '../commons/utils/rabbit-mq'; type Spawn = typeof spawn; @@ -41,9 +50,13 @@ export class PipelineTaskRunner { } } @RabbitSubscribe({ - exchange: 'stop-pipeline-task', - routingKey: 'mac', - queue: 'mac.stop-pipeline-task', + exchange: EXCHANGE_PIPELINE_TASK_TOPIC, + routingKey: getSelfInstanceRouteKey(ROUTE_PIPELINE_TASK_KILL), + queue: getSelfInstanceQueueKey(QUEUE_PIPELINE_TASK_KILL), + queueOptions: { + autoDelete: true, + durable: true, + }, }) async onStopTask(task: PipelineTask) { this.logger.info({ task }, 'on stop task [%s].', task.id); diff --git a/src/pipeline-tasks/pipeline-tasks.constants.ts b/src/pipeline-tasks/pipeline-tasks.constants.ts index 42d10f9..036525d 100644 --- a/src/pipeline-tasks/pipeline-tasks.constants.ts +++ b/src/pipeline-tasks/pipeline-tasks.constants.ts @@ -5,3 +5,5 @@ export const QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT = 'pipeline-task-log'; export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log'; export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done'; export const QUEUE_PIPELINE_TASK_DONE = 'pipeline-task-done'; +export const ROUTE_PIPELINE_TASK_KILL = 'pipeline-task-kill'; +export const QUEUE_PIPELINE_TASK_KILL = 'pipeline-task-kill';