Compare commits
3 Commits
37f8ae19be
...
5b5a657651
Author | SHA1 | Date | |
---|---|---|---|
|
5b5a657651 | ||
|
a510f411a7 | ||
|
246623b5db |
29
package-lock.json
generated
29
package-lock.json
generated
@ -16,6 +16,7 @@
|
|||||||
"@nestjs/graphql": "^7.9.8",
|
"@nestjs/graphql": "^7.9.8",
|
||||||
"@nestjs/platform-express": "^7.5.1",
|
"@nestjs/platform-express": "^7.5.1",
|
||||||
"@nestjs/typeorm": "^7.1.5",
|
"@nestjs/typeorm": "^7.1.5",
|
||||||
|
"@types/amqplib": "^0.8.0",
|
||||||
"@types/bull": "^3.15.0",
|
"@types/bull": "^3.15.0",
|
||||||
"@types/ramda": "^0.27.38",
|
"@types/ramda": "^0.27.38",
|
||||||
"apollo-server-express": "^2.19.2",
|
"apollo-server-express": "^2.19.2",
|
||||||
@ -2758,6 +2759,15 @@
|
|||||||
"@types/node": "*"
|
"@types/node": "*"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/@types/amqplib": {
|
||||||
|
"version": "0.8.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.8.0.tgz",
|
||||||
|
"integrity": "sha512-RDojJ8WACs43HIfWSQGnAVwgNzjMGx4YMNeW7jptgAFgkG1EpNQqts+cND5HYWdYgTM58b+RHe675b0i4A9WpQ==",
|
||||||
|
"dependencies": {
|
||||||
|
"@types/bluebird": "*",
|
||||||
|
"@types/node": "*"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/@types/babel__core": {
|
"node_modules/@types/babel__core": {
|
||||||
"version": "7.1.14",
|
"version": "7.1.14",
|
||||||
"resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.1.14.tgz",
|
"resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.1.14.tgz",
|
||||||
@ -2799,6 +2809,11 @@
|
|||||||
"@babel/types": "^7.3.0"
|
"@babel/types": "^7.3.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/@types/bluebird": {
|
||||||
|
"version": "3.5.35",
|
||||||
|
"resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.35.tgz",
|
||||||
|
"integrity": "sha512-2WeeXK7BuQo7yPI4WGOBum90SzF/f8rqlvpaXx4rjeTmNssGRDHWf7fgDUH90xMB3sUOu716fUK5d+OVx0+ncQ=="
|
||||||
|
},
|
||||||
"node_modules/@types/body-parser": {
|
"node_modules/@types/body-parser": {
|
||||||
"version": "1.19.0",
|
"version": "1.19.0",
|
||||||
"resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.0.tgz",
|
"resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.0.tgz",
|
||||||
@ -17553,6 +17568,15 @@
|
|||||||
"@types/node": "*"
|
"@types/node": "*"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"@types/amqplib": {
|
||||||
|
"version": "0.8.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.8.0.tgz",
|
||||||
|
"integrity": "sha512-RDojJ8WACs43HIfWSQGnAVwgNzjMGx4YMNeW7jptgAFgkG1EpNQqts+cND5HYWdYgTM58b+RHe675b0i4A9WpQ==",
|
||||||
|
"requires": {
|
||||||
|
"@types/bluebird": "*",
|
||||||
|
"@types/node": "*"
|
||||||
|
}
|
||||||
|
},
|
||||||
"@types/babel__core": {
|
"@types/babel__core": {
|
||||||
"version": "7.1.14",
|
"version": "7.1.14",
|
||||||
"resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.1.14.tgz",
|
"resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.1.14.tgz",
|
||||||
@ -17594,6 +17618,11 @@
|
|||||||
"@babel/types": "^7.3.0"
|
"@babel/types": "^7.3.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"@types/bluebird": {
|
||||||
|
"version": "3.5.35",
|
||||||
|
"resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.35.tgz",
|
||||||
|
"integrity": "sha512-2WeeXK7BuQo7yPI4WGOBum90SzF/f8rqlvpaXx4rjeTmNssGRDHWf7fgDUH90xMB3sUOu716fUK5d+OVx0+ncQ=="
|
||||||
|
},
|
||||||
"@types/body-parser": {
|
"@types/body-parser": {
|
||||||
"version": "1.19.0",
|
"version": "1.19.0",
|
||||||
"resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.0.tgz",
|
"resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.0.tgz",
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
"@nestjs/graphql": "^7.9.8",
|
"@nestjs/graphql": "^7.9.8",
|
||||||
"@nestjs/platform-express": "^7.5.1",
|
"@nestjs/platform-express": "^7.5.1",
|
||||||
"@nestjs/typeorm": "^7.1.5",
|
"@nestjs/typeorm": "^7.1.5",
|
||||||
|
"@types/amqplib": "^0.8.0",
|
||||||
"@types/bull": "^3.15.0",
|
"@types/bull": "^3.15.0",
|
||||||
"@types/ramda": "^0.27.38",
|
"@types/ramda": "^0.27.38",
|
||||||
"apollo-server-express": "^2.19.2",
|
"apollo-server-express": "^2.19.2",
|
||||||
|
@ -50,17 +50,20 @@ describe('PipelineTaskFlushService', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
describe('write', () => {
|
describe('write', () => {
|
||||||
|
const amqpMsg = {
|
||||||
|
properties: { headers: { sender: 'test' } },
|
||||||
|
} as any;
|
||||||
it('normal', async () => {
|
it('normal', async () => {
|
||||||
const testEvent = new PipelineTaskEvent();
|
const testEvent = new PipelineTaskEvent();
|
||||||
testEvent.taskId = 'test';
|
testEvent.taskId = 'test';
|
||||||
testEvent.status = TaskStatuses.working;
|
testEvent.status = TaskStatuses.working;
|
||||||
const rpush = jest.spyOn(redisService.getClient(), 'rpush');
|
const rpush = jest.spyOn(redisService.getClient(), 'rpush');
|
||||||
const request = jest.spyOn(amqpConnection, 'request');
|
const request = jest.spyOn(amqpConnection, 'request');
|
||||||
await service.write(testEvent);
|
await service.write(testEvent, amqpMsg);
|
||||||
expect(rpush).toBeCalledTimes(1);
|
expect(rpush).toBeCalledTimes(1);
|
||||||
expect(rpush.mock.calls[0][0]).toEqual('p-task:log:test');
|
expect(rpush.mock.calls[0][0]).toEqual('p-task:log:test');
|
||||||
expect(rpush.mock.calls[0][1]).toEqual(JSON.stringify(testEvent));
|
expect(rpush.mock.calls[0][1]).toEqual(JSON.stringify(testEvent));
|
||||||
expect(request).toBeCalledTimes(0);
|
expect(request).toBeCalledTimes(1);
|
||||||
});
|
});
|
||||||
it('event for which task done', async () => {
|
it('event for which task done', async () => {
|
||||||
const testEvent = new PipelineTaskEvent();
|
const testEvent = new PipelineTaskEvent();
|
||||||
@ -68,13 +71,17 @@ describe('PipelineTaskFlushService', () => {
|
|||||||
testEvent.status = TaskStatuses.success;
|
testEvent.status = TaskStatuses.success;
|
||||||
const rpush = jest.spyOn(redisService.getClient(), 'rpush');
|
const rpush = jest.spyOn(redisService.getClient(), 'rpush');
|
||||||
const request = jest.spyOn(amqpConnection, 'request');
|
const request = jest.spyOn(amqpConnection, 'request');
|
||||||
await service.write(testEvent);
|
await service.write(testEvent, amqpMsg);
|
||||||
expect(rpush).toBeCalledTimes(1);
|
expect(rpush).toBeCalledTimes(1);
|
||||||
expect(request).toBeCalledTimes(1);
|
expect(request).toBeCalledTimes(1);
|
||||||
expect(request.mock.calls[0][0]).toMatchObject({
|
expect(request.mock.calls[0][0]).toMatchObject({
|
||||||
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
routingKey: ROUTE_PIPELINE_TASK_DONE,
|
routingKey: ROUTE_PIPELINE_TASK_DONE,
|
||||||
payload: { taskId: 'test', status: TaskStatuses.success },
|
payload: {
|
||||||
|
taskId: 'test',
|
||||||
|
status: TaskStatuses.success,
|
||||||
|
runOn: 'test',
|
||||||
|
},
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
|
import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
|
||||||
import { Injectable } from '@nestjs/common';
|
import { Injectable } from '@nestjs/common';
|
||||||
|
import { ConsumeMessage } from 'amqplib';
|
||||||
import { deserialize } from 'class-transformer';
|
import { deserialize } from 'class-transformer';
|
||||||
import { RedisService } from 'nestjs-redis';
|
import { RedisService } from 'nestjs-redis';
|
||||||
import { isNil } from 'ramda';
|
import { isNil } from 'ramda';
|
||||||
import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq';
|
import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq';
|
||||||
import { terminalTaskStatuses } from './enums/task-statuses.enum';
|
|
||||||
import { PipelineTaskEvent } from './models/pipeline-task-event';
|
import { PipelineTaskEvent } from './models/pipeline-task-event';
|
||||||
import {
|
import {
|
||||||
EXCHANGE_PIPELINE_TASK_TOPIC,
|
EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
@ -32,16 +32,20 @@ export class PipelineTaskFlushService {
|
|||||||
durable: true,
|
durable: true,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
async write(message: PipelineTaskEvent) {
|
async write(message: PipelineTaskEvent, amqpMsg: ConsumeMessage) {
|
||||||
const client = this.redisService.getClient();
|
const client = this.redisService.getClient();
|
||||||
await client.rpush(this.getKey(message.taskId), JSON.stringify(message));
|
await client.rpush(this.getKey(message.taskId), JSON.stringify(message));
|
||||||
await client.expire(this.getKey(message.taskId), 600); // ten minutes
|
await client.expire(this.getKey(message.taskId), 600); // ten minutes
|
||||||
if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) {
|
if (isNil(message.unit)) {
|
||||||
try {
|
try {
|
||||||
await this.amqpConnection.request({
|
await this.amqpConnection.request({
|
||||||
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
routingKey: ROUTE_PIPELINE_TASK_DONE,
|
routingKey: ROUTE_PIPELINE_TASK_DONE,
|
||||||
payload: { taskId: message.taskId, status: message.status },
|
payload: {
|
||||||
|
taskId: message.taskId,
|
||||||
|
status: message.status,
|
||||||
|
runOn: amqpMsg.properties.headers.sender,
|
||||||
|
},
|
||||||
});
|
});
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.log(error);
|
console.log(error);
|
||||||
|
@ -36,4 +36,7 @@ export class PipelineTask extends AppBaseEntity {
|
|||||||
|
|
||||||
@Column({ nullable: true })
|
@Column({ nullable: true })
|
||||||
endedAt?: Date;
|
endedAt?: Date;
|
||||||
|
|
||||||
|
@Column({ nullable: true })
|
||||||
|
runOn: string;
|
||||||
}
|
}
|
||||||
|
@ -5,19 +5,34 @@ import { ApplicationException } from '../commons/exceptions/application.exceptio
|
|||||||
import { PipelineUnits } from './enums/pipeline-units.enum';
|
import { PipelineUnits } from './enums/pipeline-units.enum';
|
||||||
import { TaskStatuses } from './enums/task-statuses.enum';
|
import { TaskStatuses } from './enums/task-statuses.enum';
|
||||||
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
|
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
|
||||||
import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
|
import {
|
||||||
|
AmqpConnection,
|
||||||
|
RabbitRPC,
|
||||||
|
RabbitSubscribe,
|
||||||
|
} from '@golevelup/nestjs-rabbitmq';
|
||||||
import { PipelineTaskEvent } from './models/pipeline-task-event';
|
import { PipelineTaskEvent } from './models/pipeline-task-event';
|
||||||
import { last } from 'ramda';
|
import { last } from 'ramda';
|
||||||
import { Inject } from '@nestjs/common';
|
import { Inject } from '@nestjs/common';
|
||||||
|
import {
|
||||||
|
EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
|
QUEUE_PIPELINE_TASK_KILL,
|
||||||
|
ROUTE_PIPELINE_TASK_KILL,
|
||||||
|
} from './pipeline-tasks.constants';
|
||||||
import {
|
import {
|
||||||
EXCHANGE_PIPELINE_TASK_FANOUT,
|
EXCHANGE_PIPELINE_TASK_FANOUT,
|
||||||
ROUTE_PIPELINE_TASK_LOG,
|
ROUTE_PIPELINE_TASK_LOG,
|
||||||
} from './pipeline-tasks.constants';
|
} from './pipeline-tasks.constants';
|
||||||
|
import {
|
||||||
|
getInstanceName,
|
||||||
|
getSelfInstanceQueueKey,
|
||||||
|
getSelfInstanceRouteKey,
|
||||||
|
} from '../commons/utils/rabbit-mq';
|
||||||
|
|
||||||
type Spawn = typeof spawn;
|
type Spawn = typeof spawn;
|
||||||
|
|
||||||
export class PipelineTaskRunner {
|
export class PipelineTaskRunner {
|
||||||
readonly processes = new Map<string, ChildProcessWithoutNullStreams>();
|
readonly processes = new Map<string, ChildProcessWithoutNullStreams>();
|
||||||
|
readonly stopTaskIds = new Set<string>();
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly reposService: ReposService,
|
private readonly reposService: ReposService,
|
||||||
@ -40,19 +55,27 @@ export class PipelineTaskRunner {
|
|||||||
this.logger.error({ task, err }, err.message);
|
this.logger.error({ task, err }, err.message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@RabbitSubscribe({
|
@RabbitRPC({
|
||||||
exchange: 'stop-pipeline-task',
|
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
routingKey: 'mac',
|
routingKey: getSelfInstanceRouteKey(ROUTE_PIPELINE_TASK_KILL),
|
||||||
queue: 'mac.stop-pipeline-task',
|
queue: getSelfInstanceQueueKey(QUEUE_PIPELINE_TASK_KILL),
|
||||||
|
queueOptions: {
|
||||||
|
autoDelete: true,
|
||||||
|
durable: true,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
async onStopTask(task: PipelineTask) {
|
async onStopTask(task: PipelineTask) {
|
||||||
this.logger.info({ task }, 'on stop task [%s].', task.id);
|
this.logger.info({ task }, 'on stop task [%s].', task.id);
|
||||||
|
this.stopTaskIds.add(task.id);
|
||||||
const process = this.processes.get(task.id);
|
const process = this.processes.get(task.id);
|
||||||
if (process) {
|
if (process) {
|
||||||
this.logger.info({ task }, 'send signal SIGINT to child process.');
|
this.logger.info({ task }, 'send signal SIGINT to child process.');
|
||||||
process.kill('SIGINT');
|
process.kill('SIGINT');
|
||||||
|
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
|
setTimeout(() => {
|
||||||
|
this.stopTaskIds.delete(task.id);
|
||||||
|
}, 10_000);
|
||||||
if (process === this.processes.get(task.id)) {
|
if (process === this.processes.get(task.id)) {
|
||||||
this.logger.info({ task }, 'send signal SIGKILL to child process.');
|
this.logger.info({ task }, 'send signal SIGKILL to child process.');
|
||||||
process.kill('SIGKILL');
|
process.kill('SIGKILL');
|
||||||
@ -68,6 +91,7 @@ export class PipelineTaskRunner {
|
|||||||
} else {
|
} else {
|
||||||
this.logger.info({ task }, 'child process is not running.');
|
this.logger.info({ task }, 'child process is not running.');
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
async doTask(task: PipelineTask) {
|
async doTask(task: PipelineTask) {
|
||||||
@ -137,6 +161,9 @@ export class PipelineTaskRunner {
|
|||||||
try {
|
try {
|
||||||
for (const script of scripts) {
|
for (const script of scripts) {
|
||||||
this.logger.debug('begin runScript %s', script);
|
this.logger.debug('begin runScript %s', script);
|
||||||
|
if (this.stopTaskIds.has(task.id)) {
|
||||||
|
throw new ApplicationException('Task is be KILLED');
|
||||||
|
}
|
||||||
await this.runScript(script, workspaceRoot, task, unit);
|
await this.runScript(script, workspaceRoot, task, unit);
|
||||||
this.logger.debug('end runScript %s', script);
|
this.logger.debug('end runScript %s', script);
|
||||||
}
|
}
|
||||||
@ -207,7 +234,11 @@ export class PipelineTaskRunner {
|
|||||||
status,
|
status,
|
||||||
};
|
};
|
||||||
this.amqpConnection
|
this.amqpConnection
|
||||||
.publish(EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, event)
|
.publish(EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, event, {
|
||||||
|
headers: {
|
||||||
|
sender: getInstanceName(),
|
||||||
|
},
|
||||||
|
})
|
||||||
.catch((error) => {
|
.catch((error) => {
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
{ error, event },
|
{ error, event },
|
||||||
@ -260,6 +291,9 @@ export class PipelineTaskRunner {
|
|||||||
if (code === 0) {
|
if (code === 0) {
|
||||||
return resolve();
|
return resolve();
|
||||||
}
|
}
|
||||||
|
if (this.stopTaskIds.has(task.id)) {
|
||||||
|
throw reject(new ApplicationException('Task is be KILLED'));
|
||||||
|
}
|
||||||
return reject(new ApplicationException('exec script failed'));
|
return reject(new ApplicationException('exec script failed'));
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -5,3 +5,5 @@ export const QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT = 'pipeline-task-log';
|
|||||||
export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log';
|
export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log';
|
||||||
export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done';
|
export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done';
|
||||||
export const QUEUE_PIPELINE_TASK_DONE = 'pipeline-task-done';
|
export const QUEUE_PIPELINE_TASK_DONE = 'pipeline-task-done';
|
||||||
|
export const ROUTE_PIPELINE_TASK_KILL = 'pipeline-task-kill';
|
||||||
|
export const QUEUE_PIPELINE_TASK_KILL = 'pipeline-task-kill';
|
||||||
|
@ -55,5 +55,7 @@ export class PipelineTasksResolver {
|
|||||||
@Mutation(() => Boolean)
|
@Mutation(() => Boolean)
|
||||||
async stopPipelineTask(@Args('id') id: string) {
|
async stopPipelineTask(@Args('id') id: string) {
|
||||||
const task = await this.service.findTaskById(id);
|
const task = await this.service.findTaskById(id);
|
||||||
|
await this.service.stopTask(task);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
import { Injectable } from '@nestjs/common';
|
import { BadRequestException, Injectable } from '@nestjs/common';
|
||||||
import { InjectRepository } from '@nestjs/typeorm';
|
import { InjectRepository } from '@nestjs/typeorm';
|
||||||
import { PipelineTask } from './pipeline-task.entity';
|
import { PipelineTask } from './pipeline-task.entity';
|
||||||
import { Repository } from 'typeorm';
|
import { Repository } from 'typeorm';
|
||||||
@ -7,7 +7,6 @@ import { Pipeline } from '../pipelines/pipeline.entity';
|
|||||||
import debug from 'debug';
|
import debug from 'debug';
|
||||||
import { AmqpConnection, RabbitRPC } from '@golevelup/nestjs-rabbitmq';
|
import { AmqpConnection, RabbitRPC } from '@golevelup/nestjs-rabbitmq';
|
||||||
import {
|
import {
|
||||||
EXCHANGE_PIPELINE_TASK_FANOUT,
|
|
||||||
EXCHANGE_PIPELINE_TASK_TOPIC,
|
EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
QUEUE_PIPELINE_TASK_DONE,
|
QUEUE_PIPELINE_TASK_DONE,
|
||||||
ROUTE_PIPELINE_TASK_DONE,
|
ROUTE_PIPELINE_TASK_DONE,
|
||||||
@ -17,6 +16,8 @@ import { find, isNil, propEq } from 'ramda';
|
|||||||
import { PipelineTaskLogs } from './models/pipeline-task-logs.model';
|
import { PipelineTaskLogs } from './models/pipeline-task-logs.model';
|
||||||
import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum';
|
import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum';
|
||||||
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
|
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
|
||||||
|
import { getAppInstanceRouteKey } from '../commons/utils/rabbit-mq';
|
||||||
|
import { ROUTE_PIPELINE_TASK_KILL } from './pipeline-tasks.constants';
|
||||||
|
|
||||||
const log = debug('fennec:pipeline-tasks:service');
|
const log = debug('fennec:pipeline-tasks:service');
|
||||||
|
|
||||||
@ -84,7 +85,7 @@ export class PipelineTasksService {
|
|||||||
durable: true,
|
durable: true,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
async updateByEvent({ taskId }: { taskId: string }) {
|
async updateByEvent({ taskId, runOn }: { taskId: string; runOn: string }) {
|
||||||
try {
|
try {
|
||||||
const [events, task] = await Promise.all([
|
const [events, task] = await Promise.all([
|
||||||
this.eventFlushService.read(taskId),
|
this.eventFlushService.read(taskId),
|
||||||
@ -127,9 +128,10 @@ export class PipelineTasksService {
|
|||||||
l.status = event.status;
|
l.status = event.status;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
task.runOn = runOn;
|
||||||
await this.repository.update({ id: taskId }, task);
|
await this.repository.update({ id: taskId }, task);
|
||||||
return task;
|
|
||||||
this.logger.info('[updateByEvent] success. taskId: %s', taskId);
|
this.logger.info('[updateByEvent] success. taskId: %s', taskId);
|
||||||
|
return task;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
{ error },
|
{ error },
|
||||||
@ -138,4 +140,17 @@ export class PipelineTasksService {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async stopTask(task: PipelineTask) {
|
||||||
|
if (isNil(task.runOn)) {
|
||||||
|
throw new BadRequestException(
|
||||||
|
"the task have not running instance on database. field 'runOn' is nil",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
await this.amqpConnection.request({
|
||||||
|
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
|
routingKey: getAppInstanceRouteKey(ROUTE_PIPELINE_TASK_KILL, task.runOn),
|
||||||
|
payload: task,
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user