feat(pipeline-tasks): 在数据库中保存任务执行者所在实例。

This commit is contained in:
Ivan Li
2021-06-20 10:51:43 +08:00
parent 246623b5db
commit a510f411a7
5 changed files with 46 additions and 7 deletions

View File

@ -1,10 +1,10 @@
import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';
import { ConsumeMessage } from 'amqplib';
import { deserialize } from 'class-transformer';
import { RedisService } from 'nestjs-redis';
import { isNil } from 'ramda';
import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq';
import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum';
import { PipelineTaskEvent } from './models/pipeline-task-event';
import {
EXCHANGE_PIPELINE_TASK_TOPIC,
@ -32,7 +32,7 @@ export class PipelineTaskFlushService {
durable: true,
},
})
async write(message: PipelineTaskEvent) {
async write(message: PipelineTaskEvent, amqpMsg: ConsumeMessage) {
const client = this.redisService.getClient();
await client.rpush(this.getKey(message.taskId), JSON.stringify(message));
await client.expire(this.getKey(message.taskId), 600); // ten minutes
@ -41,7 +41,11 @@ export class PipelineTaskFlushService {
await this.amqpConnection.request({
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
routingKey: ROUTE_PIPELINE_TASK_DONE,
payload: { taskId: message.taskId, status: message.status },
payload: {
taskId: message.taskId,
status: message.status,
runOn: amqpMsg.properties.headers.sender,
},
});
} catch (error) {
console.log(error);

View File

@ -19,6 +19,7 @@ import {
ROUTE_PIPELINE_TASK_LOG,
} from './pipeline-tasks.constants';
import {
getInstanceName,
getSelfInstanceQueueKey,
getSelfInstanceRouteKey,
} from '../commons/utils/rabbit-mq';
@ -220,7 +221,11 @@ export class PipelineTaskRunner {
status,
};
this.amqpConnection
.publish(EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, event)
.publish(EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, event, {
headers: {
sender: getInstanceName(),
},
})
.catch((error) => {
this.logger.error(
{ error, event },

View File

@ -7,7 +7,6 @@ import { Pipeline } from '../pipelines/pipeline.entity';
import debug from 'debug';
import { AmqpConnection, RabbitRPC } from '@golevelup/nestjs-rabbitmq';
import {
EXCHANGE_PIPELINE_TASK_FANOUT,
EXCHANGE_PIPELINE_TASK_TOPIC,
QUEUE_PIPELINE_TASK_DONE,
ROUTE_PIPELINE_TASK_DONE,
@ -84,7 +83,7 @@ export class PipelineTasksService {
durable: true,
},
})
async updateByEvent({ taskId }: { taskId: string }) {
async updateByEvent({ taskId, runOn }: { taskId: string; runOn: string }) {
try {
const [events, task] = await Promise.all([
this.eventFlushService.read(taskId),
@ -127,9 +126,10 @@ export class PipelineTasksService {
l.status = event.status;
}
}
task.runOn = runOn;
await this.repository.update({ id: taskId }, task);
return task;
this.logger.info('[updateByEvent] success. taskId: %s', taskId);
return task;
} catch (error) {
this.logger.error(
{ error },