From a510f411a78d2aa7c1bf17e314a9ecb22ba516b3 Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Sun, 20 Jun 2021 10:51:43 +0800 Subject: [PATCH] =?UTF-8?q?feat(pipeline-tasks):=20=E5=9C=A8=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E4=B8=AD=E4=BF=9D=E5=AD=98=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E8=80=85=E6=89=80=E5=9C=A8=E5=AE=9E=E4=BE=8B?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package-lock.json | 29 +++++++++++++++++++ package.json | 1 + .../pipeline-task-flush.service.ts | 10 +++++-- src/pipeline-tasks/pipeline-task.runner.ts | 7 ++++- src/pipeline-tasks/pipeline-tasks.service.ts | 6 ++-- 5 files changed, 46 insertions(+), 7 deletions(-) diff --git a/package-lock.json b/package-lock.json index 484716c..b8962f6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "@nestjs/graphql": "^7.9.8", "@nestjs/platform-express": "^7.5.1", "@nestjs/typeorm": "^7.1.5", + "@types/amqplib": "^0.8.0", "@types/bull": "^3.15.0", "@types/ramda": "^0.27.38", "apollo-server-express": "^2.19.2", @@ -2758,6 +2759,15 @@ "@types/node": "*" } }, + "node_modules/@types/amqplib": { + "version": "0.8.0", + "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.8.0.tgz", + "integrity": "sha512-RDojJ8WACs43HIfWSQGnAVwgNzjMGx4YMNeW7jptgAFgkG1EpNQqts+cND5HYWdYgTM58b+RHe675b0i4A9WpQ==", + "dependencies": { + "@types/bluebird": "*", + "@types/node": "*" + } + }, "node_modules/@types/babel__core": { "version": "7.1.14", "resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.1.14.tgz", @@ -2799,6 +2809,11 @@ "@babel/types": "^7.3.0" } }, + "node_modules/@types/bluebird": { + "version": "3.5.35", + "resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.35.tgz", + "integrity": "sha512-2WeeXK7BuQo7yPI4WGOBum90SzF/f8rqlvpaXx4rjeTmNssGRDHWf7fgDUH90xMB3sUOu716fUK5d+OVx0+ncQ==" + }, "node_modules/@types/body-parser": { "version": "1.19.0", "resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.0.tgz", @@ -17553,6 +17568,15 @@ "@types/node": "*" } }, + "@types/amqplib": { + "version": "0.8.0", + "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.8.0.tgz", + "integrity": "sha512-RDojJ8WACs43HIfWSQGnAVwgNzjMGx4YMNeW7jptgAFgkG1EpNQqts+cND5HYWdYgTM58b+RHe675b0i4A9WpQ==", + "requires": { + "@types/bluebird": "*", + "@types/node": "*" + } + }, "@types/babel__core": { "version": "7.1.14", "resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.1.14.tgz", @@ -17594,6 +17618,11 @@ "@babel/types": "^7.3.0" } }, + "@types/bluebird": { + "version": "3.5.35", + "resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.35.tgz", + "integrity": "sha512-2WeeXK7BuQo7yPI4WGOBum90SzF/f8rqlvpaXx4rjeTmNssGRDHWf7fgDUH90xMB3sUOu716fUK5d+OVx0+ncQ==" + }, "@types/body-parser": { "version": "1.19.0", "resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.0.tgz", diff --git a/package.json b/package.json index 391e7b3..a288e49 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "@nestjs/graphql": "^7.9.8", "@nestjs/platform-express": "^7.5.1", "@nestjs/typeorm": "^7.1.5", + "@types/amqplib": "^0.8.0", "@types/bull": "^3.15.0", "@types/ramda": "^0.27.38", "apollo-server-express": "^2.19.2", diff --git a/src/pipeline-tasks/pipeline-task-flush.service.ts b/src/pipeline-tasks/pipeline-task-flush.service.ts index 9146d48..8d61391 100644 --- a/src/pipeline-tasks/pipeline-task-flush.service.ts +++ b/src/pipeline-tasks/pipeline-task-flush.service.ts @@ -1,10 +1,10 @@ import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; import { Injectable } from '@nestjs/common'; +import { ConsumeMessage } from 'amqplib'; import { deserialize } from 'class-transformer'; import { RedisService } from 'nestjs-redis'; import { isNil } from 'ramda'; import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq'; -import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum'; import { PipelineTaskEvent } from './models/pipeline-task-event'; import { EXCHANGE_PIPELINE_TASK_TOPIC, @@ -32,7 +32,7 @@ export class PipelineTaskFlushService { durable: true, }, }) - async write(message: PipelineTaskEvent) { + async write(message: PipelineTaskEvent, amqpMsg: ConsumeMessage) { const client = this.redisService.getClient(); await client.rpush(this.getKey(message.taskId), JSON.stringify(message)); await client.expire(this.getKey(message.taskId), 600); // ten minutes @@ -41,7 +41,11 @@ export class PipelineTaskFlushService { await this.amqpConnection.request({ exchange: EXCHANGE_PIPELINE_TASK_TOPIC, routingKey: ROUTE_PIPELINE_TASK_DONE, - payload: { taskId: message.taskId, status: message.status }, + payload: { + taskId: message.taskId, + status: message.status, + runOn: amqpMsg.properties.headers.sender, + }, }); } catch (error) { console.log(error); diff --git a/src/pipeline-tasks/pipeline-task.runner.ts b/src/pipeline-tasks/pipeline-task.runner.ts index 732b525..cfbf0fe 100644 --- a/src/pipeline-tasks/pipeline-task.runner.ts +++ b/src/pipeline-tasks/pipeline-task.runner.ts @@ -19,6 +19,7 @@ import { ROUTE_PIPELINE_TASK_LOG, } from './pipeline-tasks.constants'; import { + getInstanceName, getSelfInstanceQueueKey, getSelfInstanceRouteKey, } from '../commons/utils/rabbit-mq'; @@ -220,7 +221,11 @@ export class PipelineTaskRunner { status, }; this.amqpConnection - .publish(EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, event) + .publish(EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, event, { + headers: { + sender: getInstanceName(), + }, + }) .catch((error) => { this.logger.error( { error, event }, diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts index a729547..5a04ebe 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.ts @@ -7,7 +7,6 @@ import { Pipeline } from '../pipelines/pipeline.entity'; import debug from 'debug'; import { AmqpConnection, RabbitRPC } from '@golevelup/nestjs-rabbitmq'; import { - EXCHANGE_PIPELINE_TASK_FANOUT, EXCHANGE_PIPELINE_TASK_TOPIC, QUEUE_PIPELINE_TASK_DONE, ROUTE_PIPELINE_TASK_DONE, @@ -84,7 +83,7 @@ export class PipelineTasksService { durable: true, }, }) - async updateByEvent({ taskId }: { taskId: string }) { + async updateByEvent({ taskId, runOn }: { taskId: string; runOn: string }) { try { const [events, task] = await Promise.all([ this.eventFlushService.read(taskId), @@ -127,9 +126,10 @@ export class PipelineTasksService { l.status = event.status; } } + task.runOn = runOn; await this.repository.update({ id: taskId }, task); - return task; this.logger.info('[updateByEvent] success. taskId: %s', taskId); + return task; } catch (error) { this.logger.error( { error },