feat: 任务状态改变时,自动更新数据库。

This commit is contained in:
Ivan Li 2021-06-20 10:39:53 +08:00
parent 37f8ae19be
commit 246623b5db
4 changed files with 23 additions and 5 deletions

View File

@ -4,7 +4,7 @@ import { deserialize } from 'class-transformer';
import { RedisService } from 'nestjs-redis'; import { RedisService } from 'nestjs-redis';
import { isNil } from 'ramda'; import { isNil } from 'ramda';
import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq'; import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq';
import { terminalTaskStatuses } from './enums/task-statuses.enum'; import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum';
import { PipelineTaskEvent } from './models/pipeline-task-event'; import { PipelineTaskEvent } from './models/pipeline-task-event';
import { import {
EXCHANGE_PIPELINE_TASK_TOPIC, EXCHANGE_PIPELINE_TASK_TOPIC,
@ -36,7 +36,7 @@ export class PipelineTaskFlushService {
const client = this.redisService.getClient(); const client = this.redisService.getClient();
await client.rpush(this.getKey(message.taskId), JSON.stringify(message)); await client.rpush(this.getKey(message.taskId), JSON.stringify(message));
await client.expire(this.getKey(message.taskId), 600); // ten minutes await client.expire(this.getKey(message.taskId), 600); // ten minutes
if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) { if (isNil(message.unit)) {
try { try {
await this.amqpConnection.request({ await this.amqpConnection.request({
exchange: EXCHANGE_PIPELINE_TASK_TOPIC, exchange: EXCHANGE_PIPELINE_TASK_TOPIC,

View File

@ -36,4 +36,7 @@ export class PipelineTask extends AppBaseEntity {
@Column({ nullable: true }) @Column({ nullable: true })
endedAt?: Date; endedAt?: Date;
@Column({ nullable: true })
runOn: string;
} }

View File

@ -9,10 +9,19 @@ import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { PipelineTaskEvent } from './models/pipeline-task-event'; import { PipelineTaskEvent } from './models/pipeline-task-event';
import { last } from 'ramda'; import { last } from 'ramda';
import { Inject } from '@nestjs/common'; import { Inject } from '@nestjs/common';
import {
EXCHANGE_PIPELINE_TASK_TOPIC,
QUEUE_PIPELINE_TASK_KILL,
ROUTE_PIPELINE_TASK_KILL,
} from './pipeline-tasks.constants';
import { import {
EXCHANGE_PIPELINE_TASK_FANOUT, EXCHANGE_PIPELINE_TASK_FANOUT,
ROUTE_PIPELINE_TASK_LOG, ROUTE_PIPELINE_TASK_LOG,
} from './pipeline-tasks.constants'; } from './pipeline-tasks.constants';
import {
getSelfInstanceQueueKey,
getSelfInstanceRouteKey,
} from '../commons/utils/rabbit-mq';
type Spawn = typeof spawn; type Spawn = typeof spawn;
@ -41,9 +50,13 @@ export class PipelineTaskRunner {
} }
} }
@RabbitSubscribe({ @RabbitSubscribe({
exchange: 'stop-pipeline-task', exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
routingKey: 'mac', routingKey: getSelfInstanceRouteKey(ROUTE_PIPELINE_TASK_KILL),
queue: 'mac.stop-pipeline-task', queue: getSelfInstanceQueueKey(QUEUE_PIPELINE_TASK_KILL),
queueOptions: {
autoDelete: true,
durable: true,
},
}) })
async onStopTask(task: PipelineTask) { async onStopTask(task: PipelineTask) {
this.logger.info({ task }, 'on stop task [%s].', task.id); this.logger.info({ task }, 'on stop task [%s].', task.id);

View File

@ -5,3 +5,5 @@ export const QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT = 'pipeline-task-log';
export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log'; export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log';
export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done'; export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done';
export const QUEUE_PIPELINE_TASK_DONE = 'pipeline-task-done'; export const QUEUE_PIPELINE_TASK_DONE = 'pipeline-task-done';
export const ROUTE_PIPELINE_TASK_KILL = 'pipeline-task-kill';
export const QUEUE_PIPELINE_TASK_KILL = 'pipeline-task-kill';